Skip to content

Commit c6aaa50

Browse files
Merge pull request #12023 from rabbitmq/mergify/bp/v4.0.x/pr-11964
QQ: checkpointing frequency improvements (backport #11964)
2 parents 4d003f9 + 212afbc commit c6aaa50

File tree

7 files changed

+165
-118
lines changed

7 files changed

+165
-118
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ rabbitmq_suite(
714714
"@gen_batch_server//:erlang_app",
715715
"@meck//:erlang_app",
716716
"@ra//:erlang_app",
717+
"//deps/rabbitmq_ct_helpers:erlang_app",
717718
],
718719
)
719720

deps/rabbit/app.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1332,7 +1332,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
13321332
outs = ["test/rabbit_fifo_int_SUITE.beam"],
13331333
app_name = "rabbit",
13341334
erlc_opts = "//:test_erlc_opts",
1335-
deps = ["//deps/rabbit_common:erlang_app"],
1335+
deps = ["//deps/rabbit_common:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
13361336
)
13371337
erlang_bytecode(
13381338
name = "rabbit_fifo_prop_SUITE_beam_files",

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 100 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ init(#{name := Name,
192192
update_config(Conf, State) ->
193193
DLH = maps:get(dead_letter_handler, Conf, undefined),
194194
BLH = maps:get(become_leader_handler, Conf, undefined),
195-
RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
196195
Overflow = maps:get(overflow_strategy, Conf, drop_head),
197196
MaxLength = maps:get(max_length, Conf, undefined),
198197
MaxBytes = maps:get(max_bytes, Conf, undefined),
@@ -206,11 +205,9 @@ update_config(Conf, State) ->
206205
competing
207206
end,
208207
Cfg = State#?STATE.cfg,
209-
RCISpec = {RCI, RCI},
210208

211209
LastActive = maps:get(created, Conf, undefined),
212-
State#?STATE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
213-
dead_letter_handler = DLH,
210+
State#?STATE{cfg = Cfg#cfg{dead_letter_handler = DLH,
214211
become_leader_handler = BLH,
215212
overflow_strategy = Overflow,
216213
max_length = MaxLength,
@@ -485,7 +482,7 @@ apply(#{index := Index}, #purge{},
485482
returns = lqueue:new(),
486483
msg_bytes_enqueue = 0
487484
},
488-
Effects0 = [garbage_collection],
485+
Effects0 = [{aux, force_checkpoint}, garbage_collection],
489486
Reply = {purge, NumReady},
490487
{State, _, Effects} = evaluate_limit(Index, false, State0,
491488
State1, Effects0),
@@ -580,9 +577,8 @@ apply(#{system_time := Ts} = Meta,
580577
Effects = [{monitor, node, Node} | Effects1],
581578
checkout(Meta, State0, State#?STATE{enqueuers = Enqs,
582579
last_active = Ts}, Effects);
583-
apply(#{index := _Idx} = Meta, {down, Pid, _Info}, State0) ->
584-
{State1, Effects1} = activate_next_consumer(
585-
handle_down(Meta, Pid, State0)),
580+
apply(Meta, {down, Pid, _Info}, State0) ->
581+
{State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)),
586582
checkout(Meta, State0, State1, Effects1);
587583
apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
588584
enqueuers = Enqs0,
@@ -670,7 +666,8 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
670666
end, Returns0)),
671667

672668
Messages = rabbit_fifo_q:from_lqueue(Messages0),
673-
#?STATE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
669+
Cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
670+
#?STATE{cfg = Cfg#cfg{unused_1 = ?NIL},
674671
messages = Messages,
675672
messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3),
676673
returns = Returns,
@@ -813,8 +810,7 @@ state_enter0(_, _, Effects) ->
813810
Effects.
814811

815812
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
816-
tick(Ts, #?STATE{cfg = #cfg{name = _Name,
817-
resource = QName}} = State) ->
813+
tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) ->
818814
case is_expired(Ts, State) of
819815
true ->
820816
[{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
@@ -835,7 +831,6 @@ overview(#?STATE{consumers = Cons,
835831
waiting_consumers = WaitingConsumers} = State) ->
836832
Conf = #{name => Cfg#cfg.name,
837833
resource => Cfg#cfg.resource,
838-
release_cursor_interval => Cfg#cfg.release_cursor_interval,
839834
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
840835
max_length => Cfg#cfg.max_length,
841836
max_bytes => Cfg#cfg.max_bytes,
@@ -908,9 +903,10 @@ which_module(4) -> ?MODULE.
908903

909904
-record(checkpoint, {index :: ra:index(),
910905
timestamp :: milliseconds(),
911-
enqueue_count :: non_neg_integer(),
912906
smallest_index :: undefined | ra:index(),
913-
messages_total :: non_neg_integer()}).
907+
messages_total :: non_neg_integer(),
908+
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
909+
unused_1 = ?NIL}).
914910
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
915911
-record(aux, {name :: atom(),
916912
capacity :: term(),
@@ -921,7 +917,6 @@ which_module(4) -> ?MODULE.
921917
gc = #aux_gc{} :: #aux_gc{},
922918
tick_pid :: undefined | pid(),
923919
cache = #{} :: map(),
924-
%% TODO: we need a state conversion for this
925920
last_checkpoint :: #checkpoint{}}).
926921

927922
init_aux(Name) when is_atom(Name) ->
@@ -934,8 +929,8 @@ init_aux(Name) when is_atom(Name) ->
934929
capacity = {inactive, Now, 1, 1.0},
935930
last_checkpoint = #checkpoint{index = 0,
936931
timestamp = erlang:system_time(millisecond),
937-
enqueue_count = 0,
938-
messages_total = 0}}.
932+
messages_total = 0,
933+
unused_1 = ?NIL}}.
939934

940935
handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
941936
capacity = Cap,
@@ -950,6 +945,35 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
950945
Name = element(2, AuxV2),
951946
AuxV3 = init_aux(Name),
952947
handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux);
948+
handle_aux(leader, cast, eval,
949+
#?AUX{last_decorators_state = LastDec,
950+
last_checkpoint = Check0} = Aux0,
951+
RaAux) ->
952+
#?STATE{cfg = #cfg{resource = QName}} = MacState =
953+
ra_aux:machine_state(RaAux),
954+
955+
Ts = erlang:system_time(millisecond),
956+
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false),
957+
958+
%% this is called after each batch of commands have been applied
959+
%% set timer for message expire
960+
%% should really be the last applied index ts but this will have to do
961+
Effects1 = timer_effect(Ts, MacState, Effects0),
962+
case query_notify_decorators_info(MacState) of
963+
LastDec ->
964+
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
965+
{MaxActivePriority, IsEmpty} = NewLast ->
966+
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
967+
| Effects1],
968+
{no_reply, Aux0#?AUX{last_checkpoint = Check,
969+
last_decorators_state = NewLast}, RaAux, Effects}
970+
end;
971+
handle_aux(_RaftState, cast, eval,
972+
#?AUX{last_checkpoint = Check0} = Aux0,
973+
RaAux) ->
974+
Ts = erlang:system_time(millisecond),
975+
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false),
976+
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
953977
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
954978
consumer_key = Key} = Ret, Corr, Pid},
955979
Aux0, RaAux0) ->
@@ -959,18 +983,18 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
959983
case find_consumer(Key, Consumers) of
960984
{ConsumerKey, #consumer{checked_out = Checked}} ->
961985
{RaAux, ToReturn} =
962-
maps:fold(
963-
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
964-
%% it is possible this is not found if the consumer
965-
%% crashed and the message got removed
966-
case ra_aux:log_fetch(Idx, RA0) of
967-
{{_Term, _Meta, Cmd}, RA} ->
968-
Msg = get_msg(Cmd),
969-
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
970-
{undefined, RA} ->
971-
{RA, Acc}
972-
end
973-
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
986+
maps:fold(
987+
fun (MsgId, ?MSG(Idx, Header), {RA0, Acc}) ->
988+
%% it is possible this is not found if the consumer
989+
%% crashed and the message got removed
990+
case ra_aux:log_fetch(Idx, RA0) of
991+
{{_Term, _Meta, Cmd}, RA} ->
992+
Msg = get_msg(Cmd),
993+
{RA, [{MsgId, Idx, Header, Msg} | Acc]};
994+
{undefined, RA} ->
995+
{RA, Acc}
996+
end
997+
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
974998

975999
Appends = make_requeue(ConsumerKey, {notify, Corr, Pid},
9761000
lists:sort(ToReturn), []),
@@ -1020,35 +1044,6 @@ handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) ->
10201044
_ ->
10211045
{reply, {error, consumer_not_found}, Aux0, RaAux0}
10221046
end;
1023-
handle_aux(leader, cast, eval,
1024-
#?AUX{last_decorators_state = LastDec,
1025-
last_checkpoint = Check0} = Aux0,
1026-
RaAux) ->
1027-
#?STATE{cfg = #cfg{resource = QName}} = MacState =
1028-
ra_aux:machine_state(RaAux),
1029-
1030-
Ts = erlang:system_time(millisecond),
1031-
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux),
1032-
1033-
%% this is called after each batch of commands have been applied
1034-
%% set timer for message expire
1035-
%% should really be the last applied index ts but this will have to do
1036-
Effects1 = timer_effect(Ts, MacState, Effects0),
1037-
case query_notify_decorators_info(MacState) of
1038-
LastDec ->
1039-
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects1};
1040-
{MaxActivePriority, IsEmpty} = NewLast ->
1041-
Effects = [notify_decorators_effect(QName, MaxActivePriority, IsEmpty)
1042-
| Effects1],
1043-
{no_reply, Aux0#?AUX{last_checkpoint = Check,
1044-
last_decorators_state = NewLast}, RaAux, Effects}
1045-
end;
1046-
handle_aux(_RaftState, cast, eval,
1047-
#?AUX{last_checkpoint = Check0} = Aux0,
1048-
RaAux) ->
1049-
Ts = erlang:system_time(millisecond),
1050-
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux),
1051-
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
10521047
handle_aux(_RaState, cast, Cmd, #?AUX{capacity = Use0} = Aux0, RaAux)
10531048
when Cmd == active orelse Cmd == inactive ->
10541049
{no_reply, Aux0#?AUX{capacity = update_use(Use0, Cmd)}, RaAux};
@@ -1107,6 +1102,11 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
11071102
end;
11081103
handle_aux(_, _, garbage_collection, Aux, RaAux) ->
11091104
{no_reply, force_eval_gc(RaAux, Aux), RaAux};
1105+
handle_aux(_RaState, _, force_checkpoint,
1106+
#?AUX{last_checkpoint = Check0} = Aux, RaAux) ->
1107+
Ts = erlang:system_time(millisecond),
1108+
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true),
1109+
{no_reply, Aux#?AUX{last_checkpoint= Check}, RaAux, Effects};
11101110
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
11111111
#?STATE{dlx = DlxState,
11121112
cfg = #cfg{dead_letter_handler = DLH,
@@ -2639,8 +2639,8 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0,
26392639
end, Enqs, WaitingConsumers0).
26402640

26412641
is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires},
2642-
last_active = LastActive,
2643-
consumers = Consumers})
2642+
last_active = LastActive,
2643+
consumers = Consumers})
26442644
when is_number(LastActive) andalso is_number(Expires) ->
26452645
%% TODO: should it be active consumers?
26462646
Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
@@ -2845,53 +2845,53 @@ priority_tag(Msg) ->
28452845
lo
28462846
end.
28472847

2848-
-define(CHECK_ENQ_MIN_INTERVAL_MS, 500).
2849-
-define(CHECK_ENQ_MIN_INDEXES, 4096).
2850-
-define(CHECK_MIN_INTERVAL_MS, 5000).
2851-
-define(CHECK_MIN_INDEXES, 65456).
28522848

28532849
do_checkpoints(Ts,
28542850
#checkpoint{index = ChIdx,
28552851
timestamp = ChTime,
2856-
enqueue_count = ChEnqCnt,
28572852
smallest_index = LastSmallest,
2858-
messages_total = LastMsgsTot} = Check0, RaAux) ->
2853+
indexes = MinIndexes} = Check0, RaAux, Force) ->
28592854
LastAppliedIdx = ra_aux:last_applied(RaAux),
2860-
#?STATE{enqueue_count = EnqCnt} = MacState = ra_aux:machine_state(RaAux),
2855+
IndexesSince = LastAppliedIdx - ChIdx,
2856+
#?STATE{} = MacState = ra_aux:machine_state(RaAux),
2857+
TimeSince = Ts - ChTime,
2858+
NewSmallest = case smallest_raft_index(MacState) of
2859+
undefined ->
2860+
LastAppliedIdx;
2861+
Smallest ->
2862+
Smallest
2863+
end,
28612864
MsgsTot = messages_total(MacState),
2862-
Mult = case MsgsTot > 200_000 of
2863-
true ->
2864-
min(4, MsgsTot div 100_000);
2865-
false ->
2866-
1
2867-
end,
2868-
Since = Ts - ChTime,
2869-
NewSmallest = case smallest_raft_index(MacState) of
2870-
undefined ->
2871-
LastAppliedIdx;
2872-
Smallest ->
2873-
Smallest
2874-
end,
2875-
{Check, Effects} = case (EnqCnt - ChEnqCnt > ?CHECK_ENQ_MIN_INDEXES andalso
2876-
Since > (?CHECK_ENQ_MIN_INTERVAL_MS * Mult)) orelse
2877-
(LastAppliedIdx - ChIdx > ?CHECK_MIN_INDEXES andalso
2878-
Since > (?CHECK_MIN_INTERVAL_MS * Mult)) orelse
2879-
(LastMsgsTot > 0 andalso MsgsTot == 0) of
2880-
true ->
2881-
%% take a checkpoint;
2882-
{#checkpoint{index = LastAppliedIdx,
2883-
timestamp = Ts,
2884-
enqueue_count = EnqCnt,
2885-
smallest_index = NewSmallest,
2886-
messages_total = MsgsTot},
2887-
[{checkpoint, LastAppliedIdx, MacState} |
2888-
release_cursor(LastSmallest, NewSmallest)]};
2889-
false ->
2890-
{Check0#checkpoint{smallest_index = NewSmallest},
2891-
release_cursor(LastSmallest, NewSmallest)}
2892-
end,
2893-
2894-
{Check, Effects}.
2865+
{CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} =
2866+
persistent_term:get(quorum_queue_checkpoint_config,
2867+
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
2868+
?CHECK_MAX_INDEXES}),
2869+
EnoughTimeHasPassed = TimeSince > CheckMinInterval,
2870+
2871+
%% enough time has passed and enough indexes have been committed
2872+
case (IndexesSince > MinIndexes andalso
2873+
EnoughTimeHasPassed) orelse
2874+
%% the queue is empty and some commands have been
2875+
%% applied since the last checkpoint
2876+
(MsgsTot == 0 andalso
2877+
IndexesSince > CheckMinIndexes andalso
2878+
EnoughTimeHasPassed) orelse
2879+
Force of
2880+
true ->
2881+
%% take fewer checkpoints the more messages there are on queue
2882+
NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes),
2883+
%% take a checkpoint;
2884+
{#checkpoint{index = LastAppliedIdx,
2885+
timestamp = Ts,
2886+
smallest_index = NewSmallest,
2887+
messages_total = MsgsTot,
2888+
indexes = NextIndexes},
2889+
[{checkpoint, LastAppliedIdx, MacState} |
2890+
release_cursor(LastSmallest, NewSmallest)]};
2891+
false ->
2892+
{Check0#checkpoint{smallest_index = NewSmallest},
2893+
release_cursor(LastSmallest, NewSmallest)}
2894+
end.
28952895

28962896
release_cursor(LastSmallest, Smallest)
28972897
when is_integer(LastSmallest) andalso

0 commit comments

Comments
 (0)