Skip to content

Commit b914cdb

Browse files
authored
Merge pull request #1800 from rabbitmq/qq-ghost-consumer-fix
Ensure quorum queue consumers are cleaned up
2 parents df52e8d + 0a21df1 commit b914cdb

File tree

3 files changed

+29
-49
lines changed

3 files changed

+29
-49
lines changed

src/rabbit_fifo.erl

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,6 @@
4545
dehydrate_state/1
4646
]).
4747

48-
-ifdef(TEST).
49-
-export([
50-
metrics_handler/1
51-
]).
52-
-endif.
53-
5448
-type raw_msg() :: term().
5549
%% The raw message. It is opaque to rabbit_fifo.
5650

@@ -153,6 +147,7 @@
153147

154148
-record(state,
155149
{name :: atom(),
150+
queue_resource :: rabbit_types:r('queue'),
156151
shadow_copy_interval = ?SHADOW_COPY_INTERVAL :: non_neg_integer(),
157152
% unassigned messages
158153
messages = #{} :: #{msg_in_id() => indexed_msg()},
@@ -183,9 +178,7 @@
183178
% needs to be part of snapshot
184179
service_queue = queue:new() :: queue:queue(consumer_id()),
185180
dead_letter_handler :: maybe(applied_mfa()),
186-
cancel_consumer_handler :: maybe(applied_mfa()),
187181
become_leader_handler :: maybe(applied_mfa()),
188-
metrics_handler :: maybe(applied_mfa()),
189182
%% This is a special field that is only used for snapshots
190183
%% It represents the number of queued messages at the time the
191184
%% dehydrated snapshot state was cached.
@@ -205,8 +198,6 @@
205198
-type config() :: #{name := atom(),
206199
dead_letter_handler => applied_mfa(),
207200
become_leader_handler => applied_mfa(),
208-
cancel_consumer_handler => applied_mfa(),
209-
metrics_handler => applied_mfa(),
210201
shadow_copy_interval => non_neg_integer()}.
211202

212203
-export_type([protocol/0,
@@ -223,19 +214,17 @@
223214
config/0]).
224215

225216
-spec init(config()) -> {state(), ra_machine:effects()}.
226-
init(#{name := Name} = Conf) ->
227-
update_state(Conf, #state{name = Name}).
217+
init(#{name := Name,
218+
queue_resource := Resource} = Conf) ->
219+
update_state(Conf, #state{name = Name,
220+
queue_resource = Resource}).
228221

229222
update_state(Conf, State) ->
230223
DLH = maps:get(dead_letter_handler, Conf, undefined),
231-
CCH = maps:get(cancel_consumer_handler, Conf, undefined),
232224
BLH = maps:get(become_leader_handler, Conf, undefined),
233-
MH = maps:get(metrics_handler, Conf, undefined),
234225
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
235226
State#state{dead_letter_handler = DLH,
236-
cancel_consumer_handler = CCH,
237227
become_leader_handler = BLH,
238-
metrics_handler = MH,
239228
shadow_copy_interval = SHI}.
240229

241230
% msg_ids are scoped per consumer
@@ -504,21 +493,17 @@ state_enter(_, _) ->
504493

505494
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
506495
tick(_Ts, #state{name = Name,
496+
queue_resource = QName,
507497
messages = Messages,
508498
ra_indexes = Indexes,
509-
metrics_handler = MH,
510499
consumers = Cons} = State) ->
511500
Metrics = {Name,
512501
maps:size(Messages), % Ready
513502
num_checked_out(State), % checked out
514503
rabbit_fifo_index:size(Indexes), %% Total
515504
maps:size(Cons)}, % Consumers
516-
case MH of
517-
undefined ->
518-
[{aux, emit}];
519-
{Mod, Fun, Args} ->
520-
[{mod_call, Mod, Fun, Args ++ [Metrics]}, {aux, emit}]
521-
end.
505+
[{mod_call, rabbit_quorum_queue,
506+
update_metrics, [QName, Metrics]}, {aux, emit}].
522507

523508
-spec overview(state()) -> map().
524509
overview(#state{consumers = Cons,
@@ -627,11 +612,11 @@ num_checked_out(#state{consumers = Cons}) ->
627612
end, 0, maps:values(Cons)).
628613

629614
cancel_consumer(ConsumerId,
630-
{Effects0, #state{consumers = C0, name = Name} = S0}) ->
615+
{Effects0, #state{consumers = C0} = S0}) ->
631616
case maps:take(ConsumerId, C0) of
632617
{#consumer{checked_out = Checked0}, Cons} ->
633618
S = return_all(S0, Checked0),
634-
Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
619+
Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
635620
case maps:size(Cons) of
636621
0 ->
637622
{[{aux, inactive} | Effects], S#state{consumers = Cons}};
@@ -787,13 +772,9 @@ dead_letter_effects(Discarded,
787772
end, [], Discarded),
788773
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
789774

790-
cancel_consumer_effects(_, _, #state{cancel_consumer_handler = undefined},
791-
Effects) ->
792-
Effects;
793-
cancel_consumer_effects(Pid, Name,
794-
#state{cancel_consumer_handler = {Mod, Fun, Args}},
795-
Effects) ->
796-
[{mod_call, Mod, Fun, Args ++ [Pid, Name]} | Effects].
775+
cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
776+
[{mod_call, rabbit_quorum_queue,
777+
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
797778

798779
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
799780
#state{ra_indexes = Indexes,
@@ -1098,11 +1079,8 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
10981079

10991080
test_init(Name) ->
11001081
init(#{name => Name,
1101-
shadow_copy_interval => 0,
1102-
metrics_handler => {?MODULE, metrics_handler, []}}).
1103-
1104-
metrics_handler(_) ->
1105-
ok.
1082+
queue_resource => queue_resource,
1083+
shadow_copy_interval => 0}).
11061084

11071085
enq_enq_checkout_test() ->
11081086
Cid = {<<"enq_enq_checkout_test">>, self()},
@@ -1347,7 +1325,7 @@ down_with_noproc_consumer_returns_unsettled_test() ->
13471325
Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
13481326
{State0, [_, _]} = enq(1, 1, second, test_init(test)),
13491327
{State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
1350-
{State2, [_, _], _} = apply(meta(3), {down, Pid, noproc}, [], State1),
1328+
{State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1),
13511329
{_State, Effects} = check(Cid, 4, State2),
13521330
?ASSERT_EFF({monitor, process, _}, Effects),
13531331
ok.
@@ -1425,6 +1403,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
14251403
discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
14261404
Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
14271405
State00 = init(#{name => test,
1406+
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
14281407
dead_letter_handler =>
14291408
{somemod, somefun, [somearg]}}),
14301409
{State0, [_, _]} = enq(1, 1, first, State00),
@@ -1447,14 +1426,12 @@ tick_test() ->
14471426
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
14481427
{S4, _, _} = apply(meta(5), {return, [MsgId], Cid}, [], S3),
14491428

1450-
[{mod_call, _, _, [{test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
1429+
[{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
14511430
ok.
14521431

14531432
enq_deq_snapshot_recover_test() ->
14541433
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
14551434
Cid = {Tag, self()},
1456-
% OthPid = spawn(fun () -> ok end),
1457-
% Oth = {<<"oth">>, OthPid},
14581435
Commands = [
14591436
{enqueue, self(), 1, one},
14601437
{enqueue, self(), 2, two},
@@ -1690,6 +1667,7 @@ duplicate_delivery_test() ->
16901667

16911668
state_enter_test() ->
16921669
S0 = init(#{name => the_name,
1670+
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
16931671
become_leader_handler => {m, f, [a]}}),
16941672
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
16951673
ok.
@@ -1789,7 +1767,9 @@ run_log(InitState, Entries) ->
17891767
aux_test() ->
17901768
_ = ra_machine_ets:start_link(),
17911769
Aux0 = init_aux(aux_test),
1792-
MacState = init(#{name => aux_test}),
1770+
MacState = init(#{name => aux_test,
1771+
queue_resource =>
1772+
rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
17931773
Log = undefined,
17941774
{no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
17951775
Log, MacState),

src/rabbit_quorum_queue.erl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
-export([dead_letter_publish/4]).
2727
-export([queue_name/1]).
2828
-export([cluster_state/1, status/2]).
29-
-export([cancel_consumer_handler/3, cancel_consumer/3]).
29+
-export([cancel_consumer_handler/2, cancel_consumer/3]).
3030
-export([become_leader/2, update_metrics/2]).
3131
-export([rpc_delete_metrics/1]).
3232
-export([format/1]).
@@ -150,20 +150,17 @@ declare(#amqqueue{name = QName,
150150
Ex
151151
end.
152152

153-
154-
155153
ra_machine(Q) ->
156154
{module, rabbit_fifo, ra_machine_config(Q)}.
157155

158156
ra_machine_config(Q = #amqqueue{name = QName}) ->
159157
#{dead_letter_handler => dlx_mfa(Q),
160-
cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
158+
queue_resource => QName,
161159
become_leader_handler => {?MODULE, become_leader, [QName]},
162160
metrics_handler => {?MODULE, update_metrics, [QName]}}.
163161

164-
cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
162+
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
165163
Node = node(ChPid),
166-
% QName = queue_name(Name),
167164
case Node == node() of
168165
true -> cancel_consumer(QName, ChPid, ConsumerTag);
169166
false ->

test/quorum_queue_SUITE.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1507,7 +1507,10 @@ basic_cancel(Config) ->
15071507
wait_for_messages_pending_ack(Servers, RaName, 1),
15081508
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
15091509
wait_for_messages_ready(Servers, RaName, 1),
1510-
wait_for_messages_pending_ack(Servers, RaName, 0)
1510+
wait_for_messages_pending_ack(Servers, RaName, 0),
1511+
[] = rpc:call(Server, ets, tab2list, [consumer_created])
1512+
after 5000 ->
1513+
exit(basic_deliver_timeout)
15111514
end.
15121515

15131516
purge(Config) ->

0 commit comments

Comments
 (0)