Skip to content

Commit 0f1f27c

Browse files
committed
Qq: adjust checkpointing algo to something more like
it was in 3.13.x. Also add a force_checkpoint aux command that the purge operation emits - this can also be used to try to force a checkpoint
1 parent cabe873 commit 0f1f27c

File tree

7 files changed

+165
-115
lines changed

7 files changed

+165
-115
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ rabbitmq_suite(
714714
"@gen_batch_server//:erlang_app",
715715
"@meck//:erlang_app",
716716
"@ra//:erlang_app",
717+
"//deps/rabbitmq_ct_helpers:erlang_app",
717718
],
718719
)
719720

deps/rabbit/app.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1329,7 +1329,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
13291329
outs = ["test/rabbit_fifo_int_SUITE.beam"],
13301330
app_name = "rabbit",
13311331
erlc_opts = "//:test_erlc_opts",
1332-
deps = ["//deps/rabbit_common:erlang_app"],
1332+
deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
13331333
)
13341334
erlang_bytecode(
13351335
name = "rabbit_fifo_prop_SUITE_beam_files",

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 100 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ init(#{name := Name,
192192
update_config(Conf, State) ->
193193
DLH = maps:get(dead_letter_handler, Conf, undefined),
194194
BLH = maps:get(become_leader_handler, Conf, undefined),
195-
RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
196195
Overflow = maps:get(overflow_strategy, Conf, drop_head),
197196
MaxLength = maps:get(max_length, Conf, undefined),
198197
MaxBytes = maps:get(max_bytes, Conf, undefined),
@@ -206,11 +205,9 @@ update_config(Conf, State) ->
206205
competing
207206
end,
208207
Cfg = State#?STATE.cfg,
209-
RCISpec = {RCI, RCI},
210208

211209
LastActive = maps:get(created, Conf, undefined),
212-
State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
213-
dead_letter_handler = DLH,
210+
State#?STATE{cfg = Cfg#cfg{dead_letter_handler = DLH,
214211
become_leader_handler = BLH,
215212
overflow_strategy = Overflow,
216213
max_length = MaxLength,
@@ -485,7 +482,7 @@ apply(#{index := Index}, #purge{},
485482
returns = lqueue:new(),
486483
msg_bytes_enqueue = 0
487484
},
488-
Effects0 = [garbage_collection],
485+
Effects0 = [{aux, force_checkpoint}, garbage_collection],
489486
Reply = {purge, NumReady},
490487
{State, _, Effects} = evaluate_limit(Index, false, State0,
491488
State1, Effects0),
@@ -580,9 +577,8 @@ apply(#{system_time := Ts} = Meta,
580577
Effects = [{monitor, node, Node} | Effects1],
581578
checkout(Meta, State0, State#?STATE{enqueuers = Enqs,
582579
last_active = Ts}, Effects);
583-
apply(#{index := _Idx} = Meta, {down, Pid, _Info}, State0) ->
584-
{State1, Effects1} = activate_next_consumer(
585-
handle_down(Meta, Pid, State0)),
580+
apply(Meta, {down, Pid, _Info}, State0) ->
581+
{State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)),
586582
checkout(Meta, State0, State1, Effects1);
587583
apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
588584
enqueuers = Enqs0,
@@ -670,7 +666,8 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
670666
end, Returns0)),
671667

672668
Messages = rabbit_fifo_q:from_lqueue(Messages0),
673-
#?STATE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
669+
Cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
670+
#?STATE{cfg = Cfg#cfg{unused_1 = ?NIL},
674671
messages = Messages,
675672
messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3),
676673
returns = Returns,
@@ -813,8 +810,7 @@ state_enter0(_, _, Effects) ->
813810
Effects.
814811

815812
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
816-
tick(Ts, #?STATE{cfg = #cfg{name = _Name,
817-
resource = QName}} = State) ->
813+
tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) ->
818814
case is_expired(Ts, State) of
819815
true ->
820816
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
@@ -835,7 +831,6 @@ overview(#?STATE{consumers = Cons,
835831
waiting_consumers = WaitingConsumers} = State) ->
836832
Conf = #{name => Cfg#cfg.name,
837833
resource => Cfg#cfg.resource,
838-
release_cursor_interval => Cfg#cfg.release_cursor_interval,
839834
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
840835
max_length => Cfg#cfg.max_length,
841836
max_bytes => Cfg#cfg.max_bytes,
@@ -908,9 +903,10 @@ which_module(4) -> ?MODULE.
908903

909904
-record(checkpoint, {index :: ra:index(),
910905
timestamp :: milliseconds(),
911-
enqueue_count :: non_neg_integer(),
912906
smallest_index :: undefined | ra:index(),
913-
messages_total :: non_neg_integer()}).
907+
messages_total :: non_neg_integer(),
908+
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
909+
unused_1 = ?NIL}).
914910
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
915911
-record(aux, {name :: atom(),
916912
capacity :: term(),
@@ -934,8 +930,8 @@ init_aux(Name) when is_atom(Name) ->
934930
capacity = {inactive, Now, 1, 1.0},
935931
last_checkpoint = #checkpoint{index = 0,
936932
timestamp = erlang:system_time(millisecond),
937-
enqueue_count = 0,
938-
messages_total = 0}}.
933+
messages_total = 0,
934+
unused_1 = ?NIL}}.
939935

940936
handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
941937
capacity = Cap,
@@ -950,6 +946,35 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
950946
Name = element(2, AuxV2),
951947
AuxV3 = init_aux(Name),
952948
handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux);
949+
handle_aux(leader, cast, eval,
950+
#?AUX{last_decorators_state = LastDec,
951+
last_checkpoint = Check0} = Aux0,
952+
RaAux) ->
953+
#?STATE{cfg = #cfg{resource = QName}} = MacState =
954+
ra_aux:machine_state(RaAux),
955+
956+
Ts = erlang:system_time(millisecond),
957+
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false),
958+
959+
%% this is called after each batch of commands have been applied
960+
%% set timer for message expire
961+
%% should really be the last applied index ts but this will have to do
962+
Effects1 = timer_effect(Ts, MacState, Effects0),
963+
case query_notify_decorators_info(MacState) of
964+
LastDec ->
965+
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
966+
{MaxActivePriority, IsEmpty} = NewLast ->
967+
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
968+
| Effects1],
969+
{no_reply, Aux0#?AUX{last_checkpoint = Check,
970+
last_decorators_state = NewLast}, RaAux, Effects}
971+
end;
972+
handle_aux(_RaftState, cast, eval,
973+
#?AUX{last_checkpoint = Check0} = Aux0,
974+
RaAux) ->
975+
Ts = erlang:system_time(millisecond),
976+
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false),
977+
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
953978
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
954979
consumer_key = Key} = Ret, Corr, Pid},
955980
Aux0, RaAux0) ->
@@ -959,18 +984,18 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
959984
case find_consumer(Key, Consumers) of
960985
{ConsumerKey, #consumer{checked_out = Checked}} ->
961986
{RaAux, ToReturn} =
962-
maps:fold(
963-
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
964-
%% it is possible this is not found if the consumer
965-
%% crashed and the message got removed
966-
case ra_aux:log_fetch(Idx, RA0) of
967-
{{_Term, _Meta, Cmd}, RA} ->
968-
Msg = get_msg(Cmd),
969-
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
970-
{undefined, RA} ->
971-
{RA, Acc}
972-
end
973-
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
987+
maps:fold(
988+
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
989+
%% it is possible this is not found if the consumer
990+
%% crashed and the message got removed
991+
case ra_aux:log_fetch(Idx, RA0) of
992+
{{_Term, _Meta, Cmd}, RA} ->
993+
Msg = get_msg(Cmd),
994+
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
995+
{undefined, RA} ->
996+
{RA, Acc}
997+
end
998+
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
974999

9751000
Appends = make_requeue(ConsumerKey, {notify, Corr, Pid},
9761001
lists:sort(ToReturn), []),
@@ -1020,35 +1045,6 @@ handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) ->
10201045
_ ->
10211046
{reply, {error, consumer_not_found}, Aux0, RaAux0}
10221047
end;
1023-
handle_aux(leader, cast, eval,
1024-
#?AUX{last_decorators_state = LastDec,
1025-
last_checkpoint = Check0} = Aux0,
1026-
RaAux) ->
1027-
#?STATE{cfg = #cfg{resource = QName}} = MacState =
1028-
ra_aux:machine_state(RaAux),
1029-
1030-
Ts = erlang:system_time(millisecond),
1031-
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux),
1032-
1033-
%% this is called after each batch of commands have been applied
1034-
%% set timer for message expire
1035-
%% should really be the last applied index ts but this will have to do
1036-
Effects1 = timer_effect(Ts, MacState, Effects0),
1037-
case query_notify_decorators_info(MacState) of
1038-
LastDec ->
1039-
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
1040-
{MaxActivePriority, IsEmpty} = NewLast ->
1041-
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
1042-
| Effects1],
1043-
{no_reply, Aux0#?AUX{last_checkpoint = Check,
1044-
last_decorators_state = NewLast}, RaAux, Effects}
1045-
end;
1046-
handle_aux(_RaftState, cast, eval,
1047-
#?AUX{last_checkpoint = Check0} = Aux0,
1048-
RaAux) ->
1049-
Ts = erlang:system_time(millisecond),
1050-
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux),
1051-
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
10521048
handle_aux(_RaState, cast, Cmd, #?AUX{capacity = Use0} = Aux0, RaAux)
10531049
when Cmd == active orelse Cmd == inactive ->
10541050
{no_reply, Aux0#?AUX{capacity = update_use(Use0, Cmd)}, RaAux};
@@ -1107,6 +1103,11 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
11071103
end;
11081104
handle_aux(_, _, garbage_collection, Aux, RaAux) ->
11091105
{no_reply, force_eval_gc(RaAux, Aux), RaAux};
1106+
handle_aux(_RaState, _, force_checkpoint,
1107+
#?AUX{last_checkpoint = Check0} = Aux, RaAux) ->
1108+
Ts = erlang:system_time(millisecond),
1109+
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true),
1110+
{no_reply, Aux#?AUX{last_checkpoint= Check}, RaAux, Effects};
11101111
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
11111112
#?STATE{dlx = DlxState,
11121113
cfg = #cfg{dead_letter_handler = DLH,
@@ -2639,8 +2640,8 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0,
26392640
end, Enqs, WaitingConsumers0).
26402641

26412642
is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires},
2642-
last_active = LastActive,
2643-
consumers = Consumers})
2643+
last_active = LastActive,
2644+
consumers = Consumers})
26442645
when is_number(LastActive) andalso is_number(Expires) ->
26452646
%% TODO: should it be active consumers?
26462647
Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
@@ -2845,53 +2846,53 @@ priority_tag(Msg) ->
28452846
lo
28462847
end.
28472848

2848-
-define(CHECK_ENQ_MIN_INTERVAL_MS, 500).
2849-
-define(CHECK_ENQ_MIN_INDEXES, 4096).
2850-
-define(CHECK_MIN_INTERVAL_MS, 5000).
2851-
-define(CHECK_MIN_INDEXES, 65456).
28522849

28532850
do_checkpoints(Ts,
28542851
#checkpoint{index = ChIdx,
28552852
timestamp = ChTime,
2856-
enqueue_count = ChEnqCnt,
28572853
smallest_index = LastSmallest,
2858-
messages_total = LastMsgsTot} = Check0, RaAux) ->
2854+
indexes = MinIndexes} = Check0, RaAux, Force) ->
28592855
LastAppliedIdx = ra_aux:last_applied(RaAux),
2860-
#?STATE{enqueue_count = EnqCnt} = MacState = ra_aux:machine_state(RaAux),
2856+
IndexesSince = LastAppliedIdx - ChIdx,
2857+
#?STATE{} = MacState = ra_aux:machine_state(RaAux),
2858+
TimeSince = Ts - ChTime,
2859+
NewSmallest = case smallest_raft_index(MacState) of
2860+
undefined ->
2861+
LastAppliedIdx;
2862+
Smallest ->
2863+
Smallest
2864+
end,
28612865
MsgsTot = messages_total(MacState),
2862-
Mult = case MsgsTot > 200_000 of
2863-
true ->
2864-
min(4, MsgsTot div 100_000);
2865-
false ->
2866-
1
2867-
end,
2868-
Since = Ts - ChTime,
2869-
NewSmallest = case smallest_raft_index(MacState) of
2870-
undefined ->
2871-
LastAppliedIdx;
2872-
Smallest ->
2873-
Smallest
2874-
end,
2875-
{Check, Effects} = case (EnqCnt - ChEnqCnt > ?CHECK_ENQ_MIN_INDEXES andalso
2876-
Since > (?CHECK_ENQ_MIN_INTERVAL_MS * Mult)) orelse
2877-
(LastAppliedIdx - ChIdx > ?CHECK_MIN_INDEXES andalso
2878-
Since > (?CHECK_MIN_INTERVAL_MS * Mult)) orelse
2879-
(LastMsgsTot > 0 andalso MsgsTot == 0) of
2880-
true ->
2881-
%% take a checkpoint;
2882-
{#checkpoint{index = LastAppliedIdx,
2883-
timestamp = Ts,
2884-
enqueue_count = EnqCnt,
2885-
smallest_index = NewSmallest,
2886-
messages_total = MsgsTot},
2887-
[{checkpoint, LastAppliedIdx, MacState} |
2888-
release_cursor(LastSmallest, NewSmallest)]};
2889-
false ->
2890-
{Check0#checkpoint{smallest_index = NewSmallest},
2891-
release_cursor(LastSmallest, NewSmallest)}
2892-
end,
2893-
2894-
{Check, Effects}.
2866+
{CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} =
2867+
persistent_term:get(quorum_queue_checkpoint_config,
2868+
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
2869+
?CHECK_MAX_INDEXES}),
2870+
EnoughTimeHasPassed = TimeSince > CheckMinInterval,
2871+
2872+
%% enough time has passed and enough indexes have been committed
2873+
case (IndexesSince > MinIndexes andalso
2874+
EnoughTimeHasPassed) orelse
2875+
%% the queue is empty and some commands have been
2876+
%% applied since the last checkpoint
2877+
(MsgsTot == 0 andalso
2878+
IndexesSince > CheckMinIndexes andalso
2879+
EnoughTimeHasPassed) orelse
2880+
Force of
2881+
true ->
2882+
%% take fewer checkpoints the more messages there are on queue
2883+
NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes),
2884+
%% take a checkpoint;
2885+
{#checkpoint{index = LastAppliedIdx,
2886+
timestamp = Ts,
2887+
smallest_index = NewSmallest,
2888+
messages_total = MsgsTot,
2889+
indexes = NextIndexes},
2890+
[{checkpoint, LastAppliedIdx, MacState} |
2891+
release_cursor(LastSmallest, NewSmallest)]};
2892+
false ->
2893+
{Check0#checkpoint{smallest_index = NewSmallest},
2894+
release_cursor(LastSmallest, NewSmallest)}
2895+
end.
28952896

28962897
release_cursor(LastSmallest, Smallest)
28972898
when is_integer(LastSmallest) andalso

0 commit comments

Comments
 (0)