Skip to content

Commit 2795293

Browse files
Merge pull request #12712 from rabbitmq/gh_12608
QQ: reduce memory use when dropping many messages at once.
2 parents c78bc8a + bfa293a commit 2795293

File tree

2 files changed

+48
-1
lines changed

2 files changed

+48
-1
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1596,11 +1596,30 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
15961596
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
15971597
dlx = DlxState} = State = State3,
15981598
{_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState),
1599-
{State, DlxEffects ++ Effects};
1599+
{State, combine_effects(DlxEffects, Effects)};
16001600
empty ->
16011601
{State0, Effects}
16021602
end.
16031603

1604+
%% combine global counter update effects to avoid bulding a huge list of
1605+
%% effects if many messages are dropped at the same time as could happen
1606+
%% when the `max_length' is changed via a configuration update.
1607+
combine_effects([{mod_call,
1608+
rabbit_global_counters,
1609+
messages_dead_lettered,
1610+
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1611+
[{mod_call,
1612+
rabbit_global_counters,
1613+
messages_dead_lettered,
1614+
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
1615+
[{mod_call,
1616+
rabbit_global_counters,
1617+
messages_dead_lettered,
1618+
[Reason, rabbit_quorum_queue, Type, PrevLen + NewLen]} | Rem];
1619+
combine_effects(New, Old) ->
1620+
New ++ Old.
1621+
1622+
16041623
maybe_set_msg_ttl(Msg, RaCmdTs, Header,
16051624
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
16061625
case mc:is(Msg) of

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2185,6 +2185,34 @@ update_config_delivery_limit_test(Config) ->
21852185

21862186
ok.
21872187

2188+
update_config_max_length_test(Config) ->
2189+
QName = rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
2190+
InitConf = #{name => ?FUNCTION_NAME,
2191+
queue_resource => QName,
2192+
delivery_limit => 20
2193+
},
2194+
State0 = init(InitConf),
2195+
?assertMatch(#{config := #{delivery_limit := 20}},
2196+
rabbit_fifo:overview(State0)),
2197+
2198+
State1 = lists:foldl(fun (Num, FS0) ->
2199+
{FS, _} = enq(Config, Num, Num, Num, FS0),
2200+
FS
2201+
end, State0, lists:seq(1, 100)),
2202+
Conf = #{name => ?FUNCTION_NAME,
2203+
queue_resource => QName,
2204+
max_length => 2,
2205+
dead_letter_handler => undefined},
2206+
%% assert only one global counter effect is generated rather than 1 per
2207+
%% dropped message
2208+
{State, ok, Effects} = apply(meta(Config, ?LINE),
2209+
rabbit_fifo:make_update_config(Conf), State1),
2210+
?assertMatch([{mod_call, rabbit_global_counters, messages_dead_lettered,
2211+
[maxlen, rabbit_quorum_queue,disabled, 98]}], Effects),
2212+
?assertMatch(#{config := #{max_length := 2},
2213+
num_ready_messages := 2}, rabbit_fifo:overview(State)),
2214+
ok.
2215+
21882216
purge_nodes_test(Config) ->
21892217
Node = purged@node,
21902218
ThisNode = node(),

0 commit comments

Comments
 (0)