Skip to content

Commit eee851b

Browse files
ansdmergify[bot]
authored andcommitted
Make classic_queue_consumer_unsent_message_limit configurable
Similar to other RabbitMQ internal credit flow configurations such as `credit_flow_default_credit` and `msg_store_credit_disc_bound`, this commit makes the `classic_queue_consumer_unsent_message_limit` configurable via `advanced.config`. See #11822 for the original motivation to make this setting configurable. (cherry picked from commit c771b24)
1 parent 8246ad4 commit eee851b

File tree

1 file changed

+28
-20
lines changed

1 file changed

+28
-20
lines changed

deps/rabbit/src/rabbit_queue_consumers.erl

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222

2323
-define(QUEUE, lqueue).
2424

25-
-define(UNSENT_MESSAGE_LIMIT, 200).
25+
-define(KEY_UNSENT_MESSAGE_LIMIT, classic_queue_consumer_unsent_message_limit).
26+
-define(DEFAULT_UNSENT_MESSAGE_LIMIT, 200).
2627

2728
%% Utilisation average calculations are all in μs.
2829
-define(USE_AVG_HALF_LIFE, 1000000.0).
@@ -72,10 +73,15 @@
7273

7374
-spec new() -> state().
7475

75-
new() -> #state{consumers = priority_queue:new(),
76-
use = {active,
77-
erlang:monotonic_time(micro_seconds),
78-
1.0}}.
76+
new() ->
77+
Val = application:get_env(rabbit,
78+
?KEY_UNSENT_MESSAGE_LIMIT,
79+
?DEFAULT_UNSENT_MESSAGE_LIMIT),
80+
persistent_term:put(?KEY_UNSENT_MESSAGE_LIMIT, Val),
81+
#state{consumers = priority_queue:new(),
82+
use = {active,
83+
erlang:monotonic_time(microsecond),
84+
1.0}}.
7985

8086
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
8187

@@ -286,7 +292,6 @@ deliver_to_consumer(FetchFun,
286292
E = {ChPid, Consumer = #consumer{tag = CTag}},
287293
QName) ->
288294
C = #cr{link_states = LinkStates} = lookup_ch(ChPid),
289-
ChBlocked = is_ch_blocked(C),
290295
case LinkStates of
291296
#{CTag := #link_state{delivery_count = DeliveryCount0,
292297
credit = Credit} = LinkState0} ->
@@ -308,22 +313,24 @@ deliver_to_consumer(FetchFun,
308313
block_consumer(C, E),
309314
undelivered
310315
end;
311-
_ when ChBlocked ->
312-
%% not a link credit consumer, use credit flow
313-
block_consumer(C, E),
314-
undelivered;
315316
_ ->
316317
%% not a link credit consumer, use credit flow
317-
case rabbit_limiter:can_send(C#cr.limiter,
318-
Consumer#consumer.ack_required,
319-
CTag) of
320-
{suspend, Limiter} ->
321-
block_consumer(C#cr{limiter = Limiter}, E),
318+
case is_ch_blocked(C) of
319+
true ->
320+
block_consumer(C, E),
322321
undelivered;
323-
{continue, Limiter} ->
324-
{delivered, deliver_to_consumer(
325-
FetchFun, Consumer,
326-
C#cr{limiter = Limiter}, QName)}
322+
false ->
323+
case rabbit_limiter:can_send(C#cr.limiter,
324+
Consumer#consumer.ack_required,
325+
CTag) of
326+
{suspend, Limiter} ->
327+
block_consumer(C#cr{limiter = Limiter}, E),
328+
undelivered;
329+
{continue, Limiter} ->
330+
{delivered, deliver_to_consumer(
331+
FetchFun, Consumer,
332+
C#cr{limiter = Limiter}, QName)}
333+
end
327334
end
328335
end.
329336

@@ -653,7 +660,8 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
653660
update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}).
654661

655662
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
656-
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
663+
UnsentMessageLimit = persistent_term:get(?KEY_UNSENT_MESSAGE_LIMIT),
664+
Count >= UnsentMessageLimit orelse rabbit_limiter:is_suspended(Limiter).
657665

658666
tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].
659667

0 commit comments

Comments
 (0)