Skip to content

Commit 300757d

Browse files
authored
Merge pull request #3466 from rabbitmq/mergify/bp/v3.8.x/pr-3460
Quorum Queue consumer cancellation fixes (backport #3448) (backport #3460)
2 parents d5f4408 + 8433ef5 commit 300757d

File tree

2 files changed

+41
-23
lines changed

2 files changed

+41
-23
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -340,10 +340,13 @@ apply(#{index := Index,
340340
{State, Reply, Effects}
341341
end
342342
end;
343-
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
344-
{State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
345-
consumer_cancel),
346-
checkout(Meta, State0, State, Effects);
343+
apply(#{index := Idx} = Meta,
344+
#checkout{spec = cancel,
345+
consumer_id = ConsumerId}, State0) ->
346+
{State1, Effects1} = cancel_consumer(Meta, ConsumerId, State0, [],
347+
consumer_cancel),
348+
{State, Reply, Effects} = checkout(Meta, State0, State1, Effects1),
349+
update_smallest_raft_index(Idx, Reply, State, Effects);
347350
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
348351
consumer_id = {_, Pid} = ConsumerId},
349352
State0) ->
@@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{},
372375
{State, _, Effects} = evaluate_limit(Index, false, State0,
373376
State1, Effects0),
374377
update_smallest_raft_index(Index, Reply, State, Effects);
375-
apply(_Meta, #garbage_collection{}, State) ->
376-
{State, ok, [{aux, garbage_collection}]};
378+
apply(#{index := Idx}, #garbage_collection{}, State) ->
379+
update_smallest_raft_index(Idx, ok, State, [{aux, garbage_collection}]);
377380
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
378381
#?MODULE{consumers = Cons0,
379382
cfg = #cfg{consumer_strategy = single_active},
@@ -506,13 +509,14 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
506509
checkout(Meta, State0, State, Effects);
507510
apply(_, {nodedown, _Node}, State) ->
508511
{State, ok};
509-
apply(Meta, #purge_nodes{nodes = Nodes}, State0) ->
512+
apply(#{index := Idx} = Meta, #purge_nodes{nodes = Nodes}, State0) ->
510513
{State, Effects} = lists:foldl(fun(Node, {S, E}) ->
511514
purge_node(Meta, Node, S, E)
512515
end, {State0, []}, Nodes),
513-
{State, ok, Effects};
514-
apply(Meta, #update_config{config = Conf}, State) ->
515-
checkout(Meta, State, update_config(Conf, State), []);
516+
update_smallest_raft_index(Idx, ok, State, Effects);
517+
apply(#{index := Idx} = Meta, #update_config{config = Conf}, State0) ->
518+
{State, Reply, Effects} = checkout(Meta, State0, update_config(Conf, State0), []),
519+
update_smallest_raft_index(Idx, Reply, State, Effects);
516520
apply(_Meta, {machine_version, 0, 1}, V0State) ->
517521
State = convert_v0_to_v1(V0State),
518522
{State, ok, []};
@@ -1735,25 +1739,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17351739
messages = Messages0,
17361740
consumers = Cons0} = InitState) ->
17371741
case priority_queue:out(SQ0) of
1738-
{{value, ConsumerId}, SQ1} ->
1742+
{{value, ConsumerId}, SQ1}
1743+
when is_map_key(ConsumerId, Cons0) ->
17391744
case take_next_msg(InitState) of
17401745
{ConsumerMsg, State0} ->
17411746
%% there are consumers waiting to be serviced
17421747
%% process consumer checkout
1743-
case maps:find(ConsumerId, Cons0) of
1744-
{ok, #consumer{credit = 0}} ->
1748+
case maps:get(ConsumerId, Cons0) of
1749+
#consumer{credit = 0} ->
17451750
%% no credit but was still on queue
17461751
%% can happen when draining
17471752
%% recurse without consumer on queue
17481753
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1749-
{ok, #consumer{status = cancelled}} ->
1754+
#consumer{status = cancelled} ->
17501755
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1751-
{ok, #consumer{status = suspected_down}} ->
1756+
#consumer{status = suspected_down} ->
17521757
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
1753-
{ok, #consumer{checked_out = Checked0,
1754-
next_msg_id = Next,
1755-
credit = Credit,
1756-
delivery_count = DelCnt} = Con0} ->
1758+
#consumer{checked_out = Checked0,
1759+
next_msg_id = Next,
1760+
credit = Credit,
1761+
delivery_count = DelCnt} = Con0 ->
17571762
Checked = maps:put(Next, ConsumerMsg, Checked0),
17581763
Con = Con0#consumer{checked_out = Checked,
17591764
next_msg_id = Next + 1,
@@ -1780,14 +1785,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17801785
add_bytes_checkout(Header, State1)),
17811786
M}
17821787
end,
1783-
{success, ConsumerId, Next, Msg, State};
1784-
error ->
1785-
%% consumer did not exist but was queued, recurse
1786-
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
1788+
{success, ConsumerId, Next, Msg, State}
17871789
end;
17881790
empty ->
17891791
{nochange, InitState}
17901792
end;
1793+
{{value, _ConsumerId}, SQ1} ->
1794+
%% consumer did not exist but was queued, recurse
1795+
checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
17911796
{empty, _} ->
17921797
case lqueue:len(Messages0) of
17931798
0 -> {nochange, InitState};

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,18 @@ return_auto_checked_out_test(_) ->
392392
Effects),
393393
ok.
394394

395+
cancelled_checkout_empty_queue_test(_) ->
396+
Cid = {<<"cid">>, self()},
397+
{State1, _} = check_auto(Cid, 2, test_init(test)),
398+
% cancelled checkout should clear out service_queue also, else we'd get a
399+
% build up of these
400+
{State2, _, Effects} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
401+
?assertEqual(0, map_size(State2#rabbit_fifo.consumers)),
402+
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
403+
ct:pal("Effs: ~p", [Effects]),
404+
?ASSERT_EFF({release_cursor, _, _}, Effects),
405+
ok.
406+
395407
cancelled_checkout_out_test(_) ->
396408
Cid = {<<"cid">>, self()},
397409
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -401,6 +413,7 @@ cancelled_checkout_out_test(_) ->
401413
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
402414
?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
403415
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
416+
?assertEqual(0, priority_queue:len(State2#rabbit_fifo.service_queue)),
404417

405418
{State3, {dequeue, empty}} =
406419
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, settled}, #{}), State2),

0 commit comments

Comments
 (0)