Skip to content

Commit 47ee288

Browse files
kjnilssonmergify-bot
authored andcommitted
QQ: fix memory leak when cancelling consumer
If the queue is empty when a consumer is cancelled it would leave the consumer id inside the service queue. If an application subscribes/unsubscibes in a loop from an empty queue this would cause the service queue to never be cleared up. NB: whenever we make a change to how the quorum queue state machien is calculated we need to consider how this effects determinism as during an upgrade different members may calculate a different service queue state. In this case it should be ok as they will eventually converge on the same state once all "dead" consumer ids have been removed from the queue. In any case it should not affect how messages are assigned to consumers. (cherry picked from commit 5779059)
1 parent d644976 commit 47ee288

File tree

2 files changed

+25
-13
lines changed

2 files changed

+25
-13
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1750,25 +1750,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17501750
messages = Messages0,
17511751
consumers = Cons0} = InitState) ->
17521752
case priority_queue:out(SQ0) of
1753-
{{value, ConsumerId}, SQ1} ->
1753+
{{value, ConsumerId}, SQ1}
1754+
when is_map_key(ConsumerId, Cons0) ->
17541755
case take_next_msg(InitState) of
17551756
{ConsumerMsg, State0} ->
17561757
%% there are consumers waiting to be serviced
17571758
%% process consumer checkout
1758-
case maps:find(ConsumerId, Cons0) of
1759-
{ok, #consumer{credit = 0}} ->
1759+
case maps:get(ConsumerId, Cons0) of
1760+
#consumer{credit = 0} ->
17601761
%% no credit but was still on queue
17611762
%% can happen when draining
17621763
%% recurse without consumer on queue
17631764
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1764-
{ok, #consumer{status = cancelled}} ->
1765+
#consumer{status = cancelled} ->
17651766
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1766-
{ok, #consumer{status = suspected_down}} ->
1767+
#consumer{status = suspected_down} ->
17671768
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1768-
{ok, #consumer{checked_out = Checked0,
1769-
next_msg_id = Next,
1770-
credit = Credit,
1771-
delivery_count = DelCnt} = Con0} ->
1769+
#consumer{checked_out = Checked0,
1770+
next_msg_id = Next,
1771+
credit = Credit,
1772+
delivery_count = DelCnt} = Con0 ->
17721773
Checked = maps:put(Next, ConsumerMsg, Checked0),
17731774
Con = Con0#consumer{checked_out = Checked,
17741775
next_msg_id = Next + 1,
@@ -1795,14 +1796,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17951796
add_bytes_checkout(Header, State1)),
17961797
M}
17971798
end,
1798-
{success, ConsumerId, Next, Msg, State};
1799-
error ->
1800-
%% consumer did not exist but was queued, recurse
1801-
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
1799+
{success, ConsumerId, Next, Msg, State}
18021800
end;
18031801
empty ->
18041802
{nochange, InitState}
18051803
end;
1804+
{{value, _ConsumerId}, SQ1} ->
1805+
%% consumer did not exist but was queued, recurse
1806+
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
18061807
{empty, _} ->
18071808
case lqueue:len(Messages0) of
18081809
0 -> {nochange, InitState};

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,16 @@ return_auto_checked_out_test(_) ->
386386
Effects),
387387
ok.
388388

389+
cancelled_checkout_empty_queue_test(_) ->
390+
Cid = {<<"cid">>, self()},
391+
{State1, _} = check_auto(Cid, 2, test_init(test)),
392+
% cancelled checkout should clear out service_queue also, else we'd get a
393+
% build up of these
394+
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
395+
?assertEqual(0, map_size(State2#rabbit_fifo.consumers)),
396+
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
397+
ok.
398+
389399
cancelled_checkout_out_test(_) ->
390400
Cid = {<<"cid">>, self()},
391401
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -395,6 +405,7 @@ cancelled_checkout_out_test(_) ->
395405
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
396406
?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
397407
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
408+
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
398409

399410
{State3, {dequeue, empty}} =
400411
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2),

0 commit comments

Comments
 (0)