Skip to content

Commit b2bfee8

Browse files
kjnilssonmergify[bot]
authored andcommitted
QQ: Revise checkpointing logic
To take more frequent checkpoints for large message workload Lower the min_checkpoint_interval substantially to allow quorum queues better control over when checkpoints are taken. Track bytes enqueued in the aux state and suggest a checkpoint after every 64MB enqueued (this value is scaled according to backlog just like the indexes condition). This should help with more timely checkpointing when very large messages is used. Try evaluating byte size independently of time window also increase max size (cherry picked from commit 6695282)
1 parent 0eaa42b commit b2bfee8

File tree

4 files changed

+59
-26
lines changed

4 files changed

+59
-26
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ which_module(5) -> ?MODULE.
932932
smallest_index :: undefined | ra:index(),
933933
messages_total :: non_neg_integer(),
934934
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
935-
unused_1 = ?NIL}).
935+
bytes_in = 0 :: non_neg_integer()}).
936936
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
937937
-record(aux, {name :: atom(),
938938
capacity :: term(),
@@ -943,7 +943,9 @@ which_module(5) -> ?MODULE.
943943
gc = #aux_gc{} :: #aux_gc{},
944944
tick_pid :: undefined | pid(),
945945
cache = #{} :: map(),
946-
last_checkpoint :: #checkpoint{}}).
946+
last_checkpoint :: #checkpoint{},
947+
bytes_in = 0 :: non_neg_integer(),
948+
bytes_out = 0 :: non_neg_integer()}).
947949

948950
init_aux(Name) when is_atom(Name) ->
949951
%% TODO: catch specific exception throw if table already exists
@@ -956,7 +958,7 @@ init_aux(Name) when is_atom(Name) ->
956958
last_checkpoint = #checkpoint{index = 0,
957959
timestamp = erlang:system_time(millisecond),
958960
messages_total = 0,
959-
unused_1 = ?NIL}}.
961+
bytes_in = 0}}.
960962

961963
handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
962964
capacity = Cap,
@@ -973,13 +975,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
973975
handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux);
974976
handle_aux(leader, cast, eval,
975977
#?AUX{last_decorators_state = LastDec,
978+
bytes_in = BytesIn,
976979
last_checkpoint = Check0} = Aux0,
977980
RaAux) ->
978981
#?STATE{cfg = #cfg{resource = QName}} = MacState =
979982
ra_aux:machine_state(RaAux),
980983

981984
Ts = erlang:system_time(millisecond),
982-
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false),
985+
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false),
983986

984987
%% this is called after each batch of commands have been applied
985988
%% set timer for message expire
@@ -995,11 +998,16 @@ handle_aux(leader, cast, eval,
995998
last_decorators_state = NewLast}, RaAux, Effects}
996999
end;
9971000
handle_aux(_RaftState, cast, eval,
998-
#?AUX{last_checkpoint = Check0} = Aux0,
1001+
#?AUX{last_checkpoint = Check0,
1002+
bytes_in = BytesIn} = Aux0,
9991003
RaAux) ->
10001004
Ts = erlang:system_time(millisecond),
1001-
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false),
1005+
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false),
10021006
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
1007+
handle_aux(_RaftState, cast, {bytes_in, {MetaSize, BodySize}},
1008+
#?AUX{bytes_in = Bytes} = Aux0,
1009+
RaAux) ->
1010+
{no_reply, Aux0#?AUX{bytes_in = Bytes + MetaSize + BodySize}, RaAux, []};
10031011
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
10041012
consumer_key = Key} = Ret, Corr, Pid},
10051013
Aux0, RaAux0) ->
@@ -1129,12 +1137,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
11291137
handle_aux(_, _, garbage_collection, Aux, RaAux) ->
11301138
{no_reply, force_eval_gc(RaAux, Aux), RaAux};
11311139
handle_aux(_RaState, _, force_checkpoint,
1132-
#?AUX{last_checkpoint = Check0} = Aux, RaAux) ->
1140+
#?AUX{last_checkpoint = Check0,
1141+
bytes_in = BytesIn} = Aux, RaAux) ->
11331142
Ts = erlang:system_time(millisecond),
11341143
#?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux),
11351144
rabbit_log:debug("~ts: rabbit_fifo: forcing checkpoint at ~b",
11361145
[rabbit_misc:rs(QR), ra_aux:last_applied(RaAux)]),
1137-
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true),
1146+
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, true),
11381147
{no_reply, Aux#?AUX{last_checkpoint = Check}, RaAux, Effects};
11391148
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
11401149
#?STATE{dlx = DlxState,
@@ -1578,7 +1587,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
15781587
apply_enqueue(#{index := RaftIdx,
15791588
system_time := Ts} = Meta, From,
15801589
Seq, RawMsg, Size, State0) ->
1581-
case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size, [], State0) of
1590+
Effects0 = [{aux, {bytes_in, Size}}],
1591+
case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size,
1592+
Effects0, State0) of
15821593
{ok, State1, Effects1} ->
15831594
checkout(Meta, State0, State1, Effects1);
15841595
{out_of_sequence, State, Effects} ->
@@ -2918,11 +2929,12 @@ priority_tag(Msg) ->
29182929
end.
29192930

29202931

2921-
do_checkpoints(Ts,
2922-
#checkpoint{index = ChIdx,
2923-
timestamp = ChTime,
2924-
smallest_index = LastSmallest,
2925-
indexes = MinIndexes} = Check0, RaAux, Force) ->
2932+
do_checkpoints(Ts, #checkpoint{index = ChIdx,
2933+
timestamp = ChTime,
2934+
smallest_index = LastSmallest,
2935+
bytes_in = LastBytesIn,
2936+
indexes = MinIndexes} = Check0,
2937+
RaAux, BytesIn, Force) ->
29262938
LastAppliedIdx = ra_aux:last_applied(RaAux),
29272939
IndexesSince = LastAppliedIdx - ChIdx,
29282940
#?STATE{} = MacState = ra_aux:machine_state(RaAux),
@@ -2934,21 +2946,35 @@ do_checkpoints(Ts,
29342946
Smallest
29352947
end,
29362948
MsgsTot = messages_total(MacState),
2949+
%% more than 64MB (by default) of message data has been written to the log
2950+
%% best take a checkpoint
2951+
29372952
{CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} =
29382953
persistent_term:get(quorum_queue_checkpoint_config,
29392954
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
29402955
?CHECK_MAX_INDEXES}),
2956+
2957+
%% scale the bytes limit as the backlog increases
2958+
MaxBytesFactor = max(1, MsgsTot / CheckMaxIndexes),
2959+
EnoughDataWritten = BytesIn - LastBytesIn > (?CHECK_MAX_BYTES * MaxBytesFactor),
29412960
EnoughTimeHasPassed = TimeSince > CheckMinInterval,
29422961

2943-
%% enough time has passed and enough indexes have been committed
2944-
case (IndexesSince > MinIndexes andalso
2945-
EnoughTimeHasPassed) orelse
2946-
%% the queue is empty and some commands have been
2947-
%% applied since the last checkpoint
2948-
(MsgsTot == 0 andalso
2949-
IndexesSince > CheckMinIndexes andalso
2950-
EnoughTimeHasPassed) orelse
2951-
Force of
2962+
case (EnoughTimeHasPassed andalso
2963+
(
2964+
%% condition 1: enough indexes have been committed since the last
2965+
%% checkpoint
2966+
(IndexesSince > MinIndexes) orelse
2967+
%% condition 2: the queue is empty and _some_ commands
2968+
%% have been applied since the last checkpoint
2969+
(MsgsTot == 0 andalso IndexesSince > 32)
2970+
)
2971+
) orelse
2972+
%% condition 3: enough message data has been written to warrant a new
2973+
%% checkpoint, this ignores the time windowing
2974+
EnoughDataWritten orelse
2975+
%% force was requested, e.g. after a purge
2976+
Force
2977+
of
29522978
true ->
29532979
%% take fewer checkpoints the more messages there are on queue
29542980
NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes),
@@ -2957,6 +2983,7 @@ do_checkpoints(Ts,
29572983
timestamp = Ts,
29582984
smallest_index = NewSmallest,
29592985
messages_total = MsgsTot,
2986+
bytes_in = BytesIn,
29602987
indexes = NextIndexes},
29612988
[{checkpoint, LastAppliedIdx, MacState} |
29622989
release_cursor(LastSmallest, NewSmallest)]};

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,11 @@
100100
% represents a partially applied module call
101101

102102
-define(CHECK_MIN_INTERVAL_MS, 1000).
103-
-define(CHECK_MIN_INDEXES, 4096).
103+
-define(CHECK_MIN_INDEXES, 4096 * 2).
104104
-define(CHECK_MAX_INDEXES, 666_667).
105+
%% once these many bytes have been written since the last checkpoint
106+
%% we request a checkpoint irrespectively
107+
-define(CHECK_MAX_BYTES, 128_000_000).
105108

106109
-define(USE_AVG_HALF_LIFE, 10000.0).
107110
%% an average QQ without any message uses about 100KB so setting this limit

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,9 @@
145145
-define(DELETE_TIMEOUT, 5000).
146146
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
147147
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
148-
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
149-
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384
148+
%% setting a low default here to allow quorum queues to better chose themselves
149+
%% when to take a checkpoint
150+
-define(MIN_CHECKPOINT_INTERVAL, 64).
150151
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
151152
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
152153

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1527,6 +1527,8 @@ gh_12635(Config) ->
15271527
publish_confirm(Ch0, QQ),
15281528
publish_confirm(Ch0, QQ),
15291529

1530+
%% a QQ will not take checkpoints more frequently than every 1s
1531+
timer:sleep(1000),
15301532
%% force a checkpoint on leader
15311533
ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]),
15321534
rabbit_ct_helpers:await_condition(

0 commit comments

Comments
 (0)