Skip to content

Commit 9517d30

Browse files
SimonUngemergify[bot]
authored andcommitted
See #7389. Only one tick process per QQ
(cherry picked from commit 9363648)
1 parent 625553e commit 9517d30

File tree

2 files changed

+46
-34
lines changed

2 files changed

+46
-34
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -895,8 +895,7 @@ tick(Ts, #?MODULE{cfg = #cfg{name = _Name,
895895
true ->
896896
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
897897
false ->
898-
[{mod_call, rabbit_quorum_queue,
899-
handle_tick, [QName, overview(State), all_nodes(State)]}]
898+
[{aux, {handle_tick, [QName, overview(State), all_nodes(State)]}}]
900899
end.
901900

902901
-spec overview(state()) -> map().
@@ -979,7 +978,7 @@ which_module(3) -> ?MODULE.
979978
last_decorators_state :: term(),
980979
capacity :: term(),
981980
gc = #aux_gc{} :: #aux_gc{},
982-
unused,
981+
tick_pid,
983982
unused2}).
984983

985984
init_aux(Name) when is_atom(Name) ->
@@ -1029,6 +1028,18 @@ handle_aux(leader, cast, {#return{msg_ids = MsgIds,
10291028
_ ->
10301029
{no_reply, Aux0, Log0}
10311030
end;
1031+
handle_aux(leader, _, {handle_tick, Args},
1032+
#?AUX{tick_pid = Pid} = Aux, Log, _) ->
1033+
NewPid =
1034+
case process_is_alive(Pid) of
1035+
false ->
1036+
%% No active TICK pid
1037+
spawn(rabbit_quorum_queue, handle_tick, Args);
1038+
true ->
1039+
%% Active TICK pid, do nothing
1040+
Pid
1041+
end,
1042+
{no_reply, Aux#?AUX{tick_pid = NewPid}, Log};
10321043
handle_aux(_, _, {get_checked_out, ConsumerId, MsgIds},
10331044
Aux0, Log0, #?MODULE{cfg = #cfg{},
10341045
consumers = Consumers}) ->
@@ -1151,6 +1162,10 @@ force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}},
11511162
AuxState
11521163
end.
11531164

1165+
process_is_alive(Pid) when is_pid(Pid) ->
1166+
is_process_alive(Pid);
1167+
process_is_alive(_) ->
1168+
false.
11541169
%%% Queries
11551170

11561171
query_messages_ready(State) ->

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -608,18 +608,18 @@ tick_test(C) ->
608608
{S3, {_, _}} = deq(C, 4, Cid2, unsettled, Msg2, S2),
609609
{S4, _, _} = apply(meta(C, 5), rabbit_fifo:make_return(Cid, [MsgId]), S3),
610610

611-
[{mod_call, rabbit_quorum_queue, handle_tick,
612-
[#resource{},
613-
#{config := #{name := ?FUNCTION_NAME},
614-
num_consumers := 1,
615-
num_checked_out := 1,
616-
num_ready_messages := 1,
617-
num_messages := 2,
618-
enqueue_message_bytes := 3,
619-
checkout_message_bytes := 3,
620-
num_discarded := _Discards},
621-
[_Node]
622-
]}] = rabbit_fifo:tick(1, S4),
611+
[{aux, {handle_tick,
612+
[#resource{},
613+
#{config := #{name := ?FUNCTION_NAME},
614+
num_consumers := 1,
615+
num_checked_out := 1,
616+
num_ready_messages := 1,
617+
num_messages := 2,
618+
enqueue_message_bytes := 3,
619+
checkout_message_bytes := 3,
620+
num_discarded := _Discards},
621+
[_Node]
622+
]}}] = rabbit_fifo:tick(1, S4),
623623
ok.
624624

625625

@@ -1562,10 +1562,10 @@ purge_nodes_test(C) ->
15621562
{down, EnqPid, noconnection},
15631563
State3),
15641564
?assertMatch(
1565-
[{mod_call, rabbit_quorum_queue, handle_tick,
1565+
[{aux, {handle_tick,
15661566
[#resource{}, _Metrics,
15671567
[ThisNode, Node]
1568-
]}] , rabbit_fifo:tick(1, State4)),
1568+
]}}] , rabbit_fifo:tick(1, State4)),
15691569
%% assert there are both enqueuers and consumers
15701570
{State, _, _} = apply(meta(C, 5),
15711571
rabbit_fifo:make_purge_nodes([Node]),
@@ -1578,10 +1578,10 @@ purge_nodes_test(C) ->
15781578
?assertMatch(#rabbit_fifo{consumers = Cons} when map_size(Cons) == 0,
15791579
State),
15801580
?assertMatch(
1581-
[{mod_call, rabbit_quorum_queue, handle_tick,
1581+
[{aux, {handle_tick,
15821582
[#resource{}, _Metrics,
15831583
[ThisNode]
1584-
]}] , rabbit_fifo:tick(1, State)),
1584+
]}}] , rabbit_fifo:tick(1, State)),
15851585
ok.
15861586

15871587
meta(Config, Idx) ->
@@ -1765,21 +1765,21 @@ queue_ttl_test(C) ->
17651765
expires => 1000},
17661766
S0 = rabbit_fifo:init(Conf),
17671767
Now = 1500,
1768-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
1768+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now, S0),
17691769
%% this should delete the queue
17701770
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
17711771
= rabbit_fifo:tick(Now + 1000, S0),
17721772
%% adding a consumer should not ever trigger deletion
17731773
Cid = {<<"cid1">>, self()},
17741774
{S1, _} = check_auto(C, Cid, 1, S0),
1775-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
1776-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
1775+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now, S1),
1776+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S1),
17771777
%% cancelling the consumer should then
17781778
{S2, _, _} = apply(meta(C, 2, Now),
17791779
rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
17801780
%% last_active should have been reset when consumer was cancelled
17811781
%% last_active = 2500
1782-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
1782+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S2),
17831783
%% but now it should be deleted
17841784
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
17851785
= rabbit_fifo:tick(Now + 2500, S2),
@@ -1789,7 +1789,7 @@ queue_ttl_test(C) ->
17891789
{down, self(), noconnection}, S1),
17901790
%% last_active should have been reset when consumer was cancelled
17911791
%% last_active = 2500
1792-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
1792+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S2D),
17931793
%% but now it should be deleted
17941794
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
17951795
= rabbit_fifo:tick(Now + 2500, S2D),
@@ -1800,7 +1800,7 @@ queue_ttl_test(C) ->
18001800
rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
18011801
S0),
18021802

1803-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1Deq),
1803+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S1Deq),
18041804
%% but now it should be deleted
18051805
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
18061806
= rabbit_fifo:tick(Now + 2500, S1Deq),
@@ -1818,11 +1818,10 @@ queue_ttl_test(C) ->
18181818
{wrap_reply, {dequeue, {MsgId, _}, _}}}] = Fun2([Msg]),
18191819
{E3, _, _} = apply(meta(C, 3, Now + 1000),
18201820
rabbit_fifo:make_settle(Deq, [MsgId]), E2),
1821-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1500, E3),
1821+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1500, E3),
18221822
%% but now it should be deleted
18231823
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
18241824
= rabbit_fifo:tick(Now + 3000, E3),
1825-
18261825
ok.
18271826

18281827
queue_ttl_with_single_active_consumer_test(C) ->
@@ -1834,35 +1833,33 @@ queue_ttl_with_single_active_consumer_test(C) ->
18341833
single_active_consumer_on => true},
18351834
S0 = rabbit_fifo:init(Conf),
18361835
Now = 1500,
1837-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
1836+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now, S0),
18381837
%% this should delete the queue
18391838
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
18401839
= rabbit_fifo:tick(Now + 1000, S0),
18411840
%% adding a consumer should not ever trigger deletion
18421841
Cid = {<<"cid1">>, self()},
18431842
{S1, _} = check_auto(C, Cid, 1, S0),
1844-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
1845-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
1843+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now, S1),
1844+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S1),
18461845
%% cancelling the consumer should then
18471846
{S2, _, _} = apply(meta(C, 2, Now),
18481847
rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
18491848
%% last_active should have been reset when consumer was cancelled
18501849
%% last_active = 2500
1851-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
1850+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S2),
18521851
%% but now it should be deleted
18531852
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
18541853
= rabbit_fifo:tick(Now + 2500, S2),
1855-
18561854
%% Same for downs
18571855
{S2D, _, _} = apply(meta(C, 2, Now),
18581856
{down, self(), noconnection}, S1),
18591857
%% last_active should have been reset when consumer was cancelled
18601858
%% last_active = 2500
1861-
[{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
1859+
[{aux, {handle_tick, [_, _, _]}}] = rabbit_fifo:tick(Now + 1000, S2D),
18621860
%% but now it should be deleted
18631861
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
18641862
= rabbit_fifo:tick(Now + 2500, S2D),
1865-
18661863
ok.
18671864

18681865
query_peek_test(C) ->

0 commit comments

Comments
 (0)