Skip to content

Commit 3b087f1

Browse files
authored
Merge pull request #3460 from rabbitmq/mergify/bp/v3.9.x/pr-3448
Quorum Queue consumer cancellation fixes (backport #3448)
2 parents dc10074 + dc43970 commit 3b087f1

File tree

2 files changed

+41
-23
lines changed

2 files changed

+41
-23
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -340,10 +340,13 @@ apply(#{index := Index,
340340
{State, Reply, Effects}
341341
end
342342
end;
343-
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
344-
{State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
345-
consumer_cancel),
346-
checkout(Meta, State0, State, Effects);
343+
apply(#{index := Idx} = Meta,
344+
#checkout{spec = cancel,
345+
consumer_id = ConsumerId}, State0) ->
346+
{State1, Effects1} = cancel_consumer(Meta, ConsumerId, State0, [],
347+
consumer_cancel),
348+
{State, Reply, Effects} = checkout(Meta, State0, State1, Effects1),
349+
update_smallest_raft_index(Idx, Reply, State, Effects);
347350
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
348351
consumer_id = {_, Pid} = ConsumerId},
349352
State0) ->
@@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{},
372375
{State, _, Effects} = evaluate_limit(Index, false, State0,
373376
State1, Effects0),
374377
update_smallest_raft_index(Index, Reply, State, Effects);
375-
apply(_Meta, #garbage_collection{}, State) ->
376-
{State, ok, [{aux, garbage_collection}]};
378+
apply(#{index := Idx}, #garbage_collection{}, State) ->
379+
update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]);
377380
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
378381
#?MODULE{consumers = Cons0,
379382
cfg = #cfg{consumer_strategy = single_active},
@@ -506,13 +509,14 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
506509
checkout(Meta, State0, State, Effects);
507510
apply(_, {nodedown, _Node}, State) ->
508511
{State, ok};
509-
apply(Meta, #purge_nodes{nodes = Nodes}, State0) ->
512+
apply(#{index := Idx} = Meta, #purge_nodes{nodes = Nodes}, State0) ->
510513
{State, Effects} = lists:foldl(fun(Node, {S, E}) ->
511514
purge_node(Meta, Node, S, E)
512515
end, {State0, []}, Nodes),
513-
{State, ok, Effects};
514-
apply(Meta, #update_config{config = Conf}, State) ->
515-
checkout(Meta, State, update_config(Conf, State), []);
516+
update_smallest_raft_index(Idx, ok, State, Effects);
517+
apply(#{index := Idx} = Meta, #update_config{config = Conf}, State0) ->
518+
{State, Reply, Effects} = checkout(Meta, State0, update_config(Conf, State0), []),
519+
update_smallest_raft_index(Idx, Reply, State, Effects);
516520
apply(_Meta, {machine_version, 0, 1}, V0State) ->
517521
State = convert_v0_to_v1(V0State),
518522
{State, ok, []};
@@ -1750,25 +1754,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17501754
messages = Messages0,
17511755
consumers = Cons0} = InitState) ->
17521756
case priority_queue:out(SQ0) of
1753-
{{value, ConsumerId}, SQ1} ->
1757+
{{value, ConsumerId}, SQ1}
1758+
when is_map_key(ConsumerId, Cons0) ->
17541759
case take_next_msg(InitState) of
17551760
{ConsumerMsg, State0} ->
17561761
%% there are consumers waiting to be serviced
17571762
%% process consumer checkout
1758-
case maps:find(ConsumerId, Cons0) of
1759-
{ok, #consumer{credit = 0}} ->
1763+
case maps:get(ConsumerId, Cons0) of
1764+
#consumer{credit = 0} ->
17601765
%% no credit but was still on queue
17611766
%% can happen when draining
17621767
%% recurse without consumer on queue
17631768
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1764-
{ok, #consumer{status = cancelled}} ->
1769+
#consumer{status = cancelled} ->
17651770
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1766-
{ok, #consumer{status = suspected_down}} ->
1771+
#consumer{status = suspected_down} ->
17671772
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1768-
{ok, #consumer{checked_out = Checked0,
1769-
next_msg_id = Next,
1770-
credit = Credit,
1771-
delivery_count = DelCnt} = Con0} ->
1773+
#consumer{checked_out = Checked0,
1774+
next_msg_id = Next,
1775+
credit = Credit,
1776+
delivery_count = DelCnt} = Con0 ->
17721777
Checked = maps:put(Next, ConsumerMsg, Checked0),
17731778
Con = Con0#consumer{checked_out = Checked,
17741779
next_msg_id = Next + 1,
@@ -1795,14 +1800,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17951800
add_bytes_checkout(Header, State1)),
17961801
M}
17971802
end,
1798-
{success, ConsumerId, Next, Msg, State};
1799-
error ->
1800-
%% consumer did not exist but was queued, recurse
1801-
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
1803+
{success, ConsumerId, Next, Msg, State}
18021804
end;
18031805
empty ->
18041806
{nochange, InitState}
18051807
end;
1808+
{{value, _ConsumerId}, SQ1} ->
1809+
%% consumer did not exist but was queued, recurse
1810+
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
18061811
{empty, _} ->
18071812
case lqueue:len(Messages0) of
18081813
0 -> {nochange, InitState};

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,18 @@ return_auto_checked_out_test(_) ->
386386
Effects),
387387
ok.
388388

389+
cancelled_checkout_empty_queue_test(_) ->
390+
Cid = {<<"cid">>, self()},
391+
{State1, _} = check_auto(Cid, 2, test_init(test)),
392+
% cancelled checkout should clear out service_queue also, else we'd get a
393+
% build up of these
394+
{State2, _, Effects} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
395+
?assertEqual(0, map_size(State2#rabbit_fifo.consumers)),
396+
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
397+
ct:pal("Effs: ~p", [Effects]),
398+
?ASSERT_EFF({release_cursor, _, _}, Effects),
399+
ok.
400+
389401
cancelled_checkout_out_test(_) ->
390402
Cid = {<<"cid">>, self()},
391403
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -395,6 +407,7 @@ cancelled_checkout_out_test(_) ->
395407
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
396408
?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
397409
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
410+
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
398411

399412
{State3, {dequeue, empty}} =
400413
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2),

0 commit comments

Comments
 (0)