Skip to content

Commit d2a294e

Browse files
committed
QQ: tweaks to checkpointing for use cases with fewer larger messages.
Lower the min_checkpoint_interval substantially to allow quorum queues better control over when checkpoints are taken. Track bytes enqueued in the aux state and suggest a checkpoint after every 64MB enqueued. This should help with more timely checkpointing when very large messages is used.
1 parent bb20885 commit d2a294e

File tree

4 files changed

+54
-26
lines changed

4 files changed

+54
-26
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ which_module(5) -> ?MODULE.
932932
smallest_index :: undefined | ra:index(),
933933
messages_total :: non_neg_integer(),
934934
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
935-
unused_1 = ?NIL}).
935+
bytes_in = 0 :: non_neg_integer()}).
936936
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
937937
-record(aux, {name :: atom(),
938938
capacity :: term(),
@@ -943,7 +943,9 @@ which_module(5) -> ?MODULE.
943943
gc = #aux_gc{} :: #aux_gc{},
944944
tick_pid :: undefined | pid(),
945945
cache = #{} :: map(),
946-
last_checkpoint :: #checkpoint{}}).
946+
last_checkpoint :: #checkpoint{},
947+
bytes_in = 0 :: non_neg_integer(),
948+
bytes_out = 0 :: non_neg_integer()}).
947949

948950
init_aux(Name) when is_atom(Name) ->
949951
%% TODO: catch specific exception throw if table already exists
@@ -956,7 +958,7 @@ init_aux(Name) when is_atom(Name) ->
956958
last_checkpoint = #checkpoint{index = 0,
957959
timestamp = erlang:system_time(millisecond),
958960
messages_total = 0,
959-
unused_1 = ?NIL}}.
961+
bytes_in = 0}}.
960962

961963
handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
962964
capacity = Cap,
@@ -973,13 +975,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
973975
handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux);
974976
handle_aux(leader, cast, eval,
975977
#?AUX{last_decorators_state = LastDec,
978+
bytes_in = BytesIn,
976979
last_checkpoint = Check0} = Aux0,
977980
RaAux) ->
978981
#?STATE{cfg = #cfg{resource = QName}} = MacState =
979982
ra_aux:machine_state(RaAux),
980983

981984
Ts = erlang:system_time(millisecond),
982-
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false),
985+
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false),
983986

984987
%% this is called after each batch of commands have been applied
985988
%% set timer for message expire
@@ -995,11 +998,16 @@ handle_aux(leader, cast, eval,
995998
last_decorators_state = NewLast}, RaAux, Effects}
996999
end;
9971000
handle_aux(_RaftState, cast, eval,
998-
#?AUX{last_checkpoint = Check0} = Aux0,
1001+
#?AUX{last_checkpoint = Check0,
1002+
bytes_in = BytesIn} = Aux0,
9991003
RaAux) ->
10001004
Ts = erlang:system_time(millisecond),
1001-
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false),
1005+
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false),
10021006
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
1007+
handle_aux(_RaftState, cast, {bytes_in, {MetaSize, BodySize}},
1008+
#?AUX{bytes_in = Bytes} = Aux0,
1009+
RaAux) ->
1010+
{no_reply, Aux0#?AUX{bytes_in = Bytes + MetaSize + BodySize}, RaAux, []};
10031011
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
10041012
consumer_key = Key} = Ret, Corr, Pid},
10051013
Aux0, RaAux0) ->
@@ -1129,12 +1137,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
11291137
handle_aux(_, _, garbage_collection, Aux, RaAux) ->
11301138
{no_reply, force_eval_gc(RaAux, Aux), RaAux};
11311139
handle_aux(_RaState, _, force_checkpoint,
1132-
#?AUX{last_checkpoint = Check0} = Aux, RaAux) ->
1140+
#?AUX{last_checkpoint = Check0,
1141+
bytes_in = BytesIn} = Aux, RaAux) ->
11331142
Ts = erlang:system_time(millisecond),
11341143
#?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux),
11351144
rabbit_log:debug("~ts: rabbit_fifo: forcing checkpoint at ~b",
11361145
[rabbit_misc:rs(QR), ra_aux:last_applied(RaAux)]),
1137-
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true),
1146+
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, true),
11381147
{no_reply, Aux#?AUX{last_checkpoint = Check}, RaAux, Effects};
11391148
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
11401149
#?STATE{dlx = DlxState,
@@ -1578,7 +1587,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
15781587
apply_enqueue(#{index := RaftIdx,
15791588
system_time := Ts} = Meta, From,
15801589
Seq, RawMsg, Size, State0) ->
1581-
case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size, [], State0) of
1590+
Effects0 = [{aux, {bytes_in, Size}}],
1591+
case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size,
1592+
Effects0, State0) of
15821593
{ok, State1, Effects1} ->
15831594
checkout(Meta, State0, State1, Effects1);
15841595
{out_of_sequence, State, Effects} ->
@@ -2918,11 +2929,12 @@ priority_tag(Msg) ->
29182929
end.
29192930

29202931

2921-
do_checkpoints(Ts,
2922-
#checkpoint{index = ChIdx,
2923-
timestamp = ChTime,
2924-
smallest_index = LastSmallest,
2925-
indexes = MinIndexes} = Check0, RaAux, Force) ->
2932+
do_checkpoints(Ts, #checkpoint{index = ChIdx,
2933+
timestamp = ChTime,
2934+
smallest_index = LastSmallest,
2935+
bytes_in = LastBytesIn,
2936+
indexes = MinIndexes} = Check0,
2937+
RaAux, BytesIn, Force) ->
29262938
LastAppliedIdx = ra_aux:last_applied(RaAux),
29272939
IndexesSince = LastAppliedIdx - ChIdx,
29282940
#?STATE{} = MacState = ra_aux:machine_state(RaAux),
@@ -2934,21 +2946,30 @@ do_checkpoints(Ts,
29342946
Smallest
29352947
end,
29362948
MsgsTot = messages_total(MacState),
2949+
%% more than 64MB (by default) of message data has been written to the log
2950+
%% best take a checkpoint
2951+
EnoughDataWritten = BytesIn - LastBytesIn > ?CHECK_MAX_BYTES,
2952+
29372953
{CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} =
29382954
persistent_term:get(quorum_queue_checkpoint_config,
29392955
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
29402956
?CHECK_MAX_INDEXES}),
29412957
EnoughTimeHasPassed = TimeSince > CheckMinInterval,
29422958

2943-
%% enough time has passed and enough indexes have been committed
2944-
case (IndexesSince > MinIndexes andalso
2945-
EnoughTimeHasPassed) orelse
2946-
%% the queue is empty and some commands have been
2947-
%% applied since the last checkpoint
2948-
(MsgsTot == 0 andalso
2949-
IndexesSince > CheckMinIndexes andalso
2950-
EnoughTimeHasPassed) orelse
2951-
Force of
2959+
case EnoughTimeHasPassed andalso
2960+
(
2961+
%% condition 1: enough indexes have been committed since the last
2962+
%% checkpoint
2963+
(IndexesSince > MinIndexes) orelse
2964+
%% condition 2: the queue is empty and _some_ commands
2965+
%% have been applied since the last checkpoint
2966+
(MsgsTot == 0 andalso IndexesSince > 32) orelse
2967+
%% condition 3: enough message data has been written to warrant a new
2968+
%% checkpoint
2969+
EnoughDataWritten orelse
2970+
%% force was requested, e.g. after a purge
2971+
Force
2972+
) of
29522973
true ->
29532974
%% take fewer checkpoints the more messages there are on queue
29542975
NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes),
@@ -2957,6 +2978,7 @@ do_checkpoints(Ts,
29572978
timestamp = Ts,
29582979
smallest_index = NewSmallest,
29592980
messages_total = MsgsTot,
2981+
bytes_in = BytesIn,
29602982
indexes = NextIndexes},
29612983
[{checkpoint, LastAppliedIdx, MacState} |
29622984
release_cursor(LastSmallest, NewSmallest)]};

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,11 @@
100100
% represents a partially applied module call
101101

102102
-define(CHECK_MIN_INTERVAL_MS, 1000).
103-
-define(CHECK_MIN_INDEXES, 4096).
103+
-define(CHECK_MIN_INDEXES, 4096 * 2).
104104
-define(CHECK_MAX_INDEXES, 666_667).
105+
%% once these many bytes have been written since the last checkpoint
106+
%% we request a checkpoint irrespectively
107+
-define(CHECK_MAX_BYTES, 64_000_000).
105108

106109
-define(USE_AVG_HALF_LIFE, 10000.0).
107110
%% an average QQ without any message uses about 100KB so setting this limit

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,9 @@
145145
-define(DELETE_TIMEOUT, 5000).
146146
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
147147
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
148-
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
149-
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384
148+
%% setting a low default here to allow quorum queues to better chose themselves
149+
%% when to take a checkpoint
150+
-define(MIN_CHECKPOINT_INTERVAL, 64).
150151
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
151152
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
152153

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1527,6 +1527,8 @@ gh_12635(Config) ->
15271527
publish_confirm(Ch0, QQ),
15281528
publish_confirm(Ch0, QQ),
15291529

1530+
%% a QQ will not take checkpoints more frequently than every 1s
1531+
timer:sleep(1000),
15301532
%% force a checkpoint on leader
15311533
ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]),
15321534
rabbit_ct_helpers:await_condition(

0 commit comments

Comments
 (0)