Skip to content

Commit 2354527

Browse files
kjnilssonmergify-bot
authored andcommitted
QQ: fix memory leak when cancelling consumer
If the queue is empty when a consumer is cancelled it would leave the consumer id inside the service queue. If an application subscribes/unsubscibes in a loop from an empty queue this would cause the service queue to never be cleared up. NB: whenever we make a change to how the quorum queue state machien is calculated we need to consider how this effects determinism as during an upgrade different members may calculate a different service queue state. In this case it should be ok as they will eventually converge on the same state once all "dead" consumer ids have been removed from the queue. In any case it should not affect how messages are assigned to consumers. (cherry picked from commit 5779059) (cherry picked from commit 47ee288)
1 parent d5f4408 commit 2354527

File tree

2 files changed

+25
-13
lines changed

2 files changed

+25
-13
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1735,25 +1735,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17351735
messages = Messages0,
17361736
consumers = Cons0} = InitState) ->
17371737
case priority_queue:out(SQ0) of
1738-
{{value, ConsumerId}, SQ1} ->
1738+
{{value, ConsumerId}, SQ1}
1739+
when is_map_key(ConsumerId, Cons0) ->
17391740
case take_next_msg(InitState) of
17401741
{ConsumerMsg, State0} ->
17411742
%% there are consumers waiting to be serviced
17421743
%% process consumer checkout
1743-
case maps:find(ConsumerId, Cons0) of
1744-
{ok, #consumer{credit = 0}} ->
1744+
case maps:get(ConsumerId, Cons0) of
1745+
#consumer{credit = 0} ->
17451746
%% no credit but was still on queue
17461747
%% can happen when draining
17471748
%% recurse without consumer on queue
17481749
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1749-
{ok, #consumer{status = cancelled}} ->
1750+
#consumer{status = cancelled} ->
17501751
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1751-
{ok, #consumer{status = suspected_down}} ->
1752+
#consumer{status = suspected_down} ->
17521753
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1753-
{ok, #consumer{checked_out = Checked0,
1754-
next_msg_id = Next,
1755-
credit = Credit,
1756-
delivery_count = DelCnt} = Con0} ->
1754+
#consumer{checked_out = Checked0,
1755+
next_msg_id = Next,
1756+
credit = Credit,
1757+
delivery_count = DelCnt} = Con0 ->
17571758
Checked = maps:put(Next, ConsumerMsg, Checked0),
17581759
Con = Con0#consumer{checked_out = Checked,
17591760
next_msg_id = Next + 1,
@@ -1780,14 +1781,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17801781
add_bytes_checkout(Header, State1)),
17811782
M}
17821783
end,
1783-
{success, ConsumerId, Next, Msg, State};
1784-
error ->
1785-
%% consumer did not exist but was queued, recurse
1786-
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
1784+
{success, ConsumerId, Next, Msg, State}
17871785
end;
17881786
empty ->
17891787
{nochange, InitState}
17901788
end;
1789+
{{value, _ConsumerId}, SQ1} ->
1790+
%% consumer did not exist but was queued, recurse
1791+
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
17911792
{empty, _} ->
17921793
case lqueue:len(Messages0) of
17931794
0 -> {nochange, InitState};

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,16 @@ return_auto_checked_out_test(_) ->
392392
Effects),
393393
ok.
394394

395+
cancelled_checkout_empty_queue_test(_) ->
396+
Cid = {<<"cid">>, self()},
397+
{State1, _} = check_auto(Cid, 2, test_init(test)),
398+
% cancelled checkout should clear out service_queue also, else we'd get a
399+
% build up of these
400+
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
401+
?assertEqual(0, map_size(State2#rabbit_fifo.consumers)),
402+
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
403+
ok.
404+
395405
cancelled_checkout_out_test(_) ->
396406
Cid = {<<"cid">>, self()},
397407
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -401,6 +411,7 @@ cancelled_checkout_out_test(_) ->
401411
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
402412
?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
403413
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
414+
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
404415

405416
{State3, {dequeue, empty}} =
406417
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2),

0 commit comments

Comments
 (0)