Skip to content

Commit 89a815d

Browse files
committed
Make classic_queue_consumer_unsent_message_limit configurable
Similar to other RabbitMQ internal credit flow configurations such as `credit_flow_default_credit` and `msg_store_credit_disc_bound`, this commit makes the `classic_queue_consumer_unsent_message_limit` configurable via `advanced.config`. See #11822 for the original motivation to make this setting configurable. (cherry picked from commit c771b24) (cherry picked from commit eee851b) # Conflicts: # deps/rabbit/src/rabbit_queue_consumers.erl
1 parent a7a1db9 commit 89a815d

File tree

1 file changed

+60
-6
lines changed

1 file changed

+60
-6
lines changed

deps/rabbit/src/rabbit_queue_consumers.erl

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121

2222
-define(QUEUE, lqueue).
2323

24-
-define(UNSENT_MESSAGE_LIMIT, 200).
24+
-define(KEY_UNSENT_MESSAGE_LIMIT, classic_queue_consumer_unsent_message_limit).
25+
-define(DEFAULT_UNSENT_MESSAGE_LIMIT, 200).
2526

2627
%% Utilisation average calculations are all in μs.
2728
-define(USE_AVG_HALF_LIFE, 1000000.0).
@@ -63,10 +64,15 @@
6364

6465
-spec new() -> state().
6566

66-
new() -> #state{consumers = priority_queue:new(),
67-
use = {active,
68-
erlang:monotonic_time(micro_seconds),
69-
1.0}}.
67+
new() ->
68+
Val = application:get_env(rabbit,
69+
?KEY_UNSENT_MESSAGE_LIMIT,
70+
?DEFAULT_UNSENT_MESSAGE_LIMIT),
71+
persistent_term:put(?KEY_UNSENT_MESSAGE_LIMIT, Val),
72+
#state{consumers = priority_queue:new(),
73+
use = {active,
74+
erlang:monotonic_time(microsecond),
75+
1.0}}.
7076

7177
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
7278

@@ -246,6 +252,7 @@ deliver(FetchFun, QName, ConsumersChanged,
246252
end
247253
end.
248254

255+
<<<<<<< HEAD
249256
deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
250257
C = lookup_ch(ChPid),
251258
case is_ch_blocked(C) of
@@ -263,6 +270,52 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
263270
FetchFun, Consumer,
264271
C#cr{limiter = Limiter}, QName)}
265272
end
273+
=======
274+
deliver_to_consumer(FetchFun,
275+
E = {ChPid, Consumer = #consumer{tag = CTag}},
276+
QName) ->
277+
C = #cr{link_states = LinkStates} = lookup_ch(ChPid),
278+
case LinkStates of
279+
#{CTag := #link_state{delivery_count = DeliveryCount0,
280+
credit = Credit} = LinkState0} ->
281+
%% bypass credit flow for link credit consumers
282+
%% as it is handled separately
283+
case Credit > 0 of
284+
true ->
285+
DeliveryCount = case DeliveryCount0 of
286+
credit_api_v1 ->
287+
DeliveryCount0;
288+
_ ->
289+
serial_number:add(DeliveryCount0, 1)
290+
end,
291+
LinkState = LinkState0#link_state{delivery_count = DeliveryCount,
292+
credit = Credit - 1},
293+
C1 = C#cr{link_states = maps:update(CTag, LinkState, LinkStates)},
294+
{delivered, deliver_to_consumer(FetchFun, Consumer, C1, QName)};
295+
false ->
296+
block_consumer(C, E),
297+
undelivered
298+
end;
299+
_ ->
300+
%% not a link credit consumer, use credit flow
301+
case is_ch_blocked(C) of
302+
true ->
303+
block_consumer(C, E),
304+
undelivered;
305+
false ->
306+
case rabbit_limiter:can_send(C#cr.limiter,
307+
Consumer#consumer.ack_required,
308+
CTag) of
309+
{suspend, Limiter} ->
310+
block_consumer(C#cr{limiter = Limiter}, E),
311+
undelivered;
312+
{continue, Limiter} ->
313+
{delivered, deliver_to_consumer(
314+
FetchFun, Consumer,
315+
C#cr{limiter = Limiter}, QName)}
316+
end
317+
end
318+
>>>>>>> eee851bd90 (Make classic_queue_consumer_unsent_message_limit configurable)
266319
end.
267320

268321
deliver_to_consumer(FetchFun,
@@ -522,7 +575,8 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
522575
update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}).
523576

524577
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
525-
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
578+
UnsentMessageLimit = persistent_term:get(?KEY_UNSENT_MESSAGE_LIMIT),
579+
Count >= UnsentMessageLimit orelse rabbit_limiter:is_suspended(Limiter).
526580

527581
send_drained(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
528582
case rabbit_limiter:drained(Limiter) of

0 commit comments

Comments
 (0)