@@ -929,7 +929,7 @@ which_module(5) -> ?MODULE.
929
929
smallest_index :: undefined | ra :index (),
930
930
messages_total :: non_neg_integer (),
931
931
indexes = ? CHECK_MIN_INDEXES :: non_neg_integer (),
932
- unused_1 = ? NIL }).
932
+ bytes_in = 0 :: non_neg_integer () }).
933
933
%% aux_gc: garbage-collection bookkeeping embedded in the aux state.
%% Holds one field, last_raft_idx, typed ra:index() and defaulting to 0.
%% NOTE(review): presumably the raft index at which GC was last evaluated -
%% confirm against the code that updates this record.
-record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
934
934
-record (aux , {name :: atom (),
935
935
capacity :: term (),
@@ -940,7 +940,9 @@ which_module(5) -> ?MODULE.
940
940
gc = # aux_gc {} :: # aux_gc {},
941
941
tick_pid :: undefined | pid (),
942
942
cache = #{} :: map (),
943
- last_checkpoint :: # checkpoint {}}).
943
+ last_checkpoint :: # checkpoint {},
944
+ bytes_in = 0 :: non_neg_integer (),
945
+ bytes_out = 0 :: non_neg_integer ()}).
944
946
945
947
init_aux (Name ) when is_atom (Name ) ->
946
948
% % TODO: catch the specific exception thrown if the table already exists
@@ -953,7 +955,7 @@ init_aux(Name) when is_atom(Name) ->
953
955
last_checkpoint = # checkpoint {index = 0 ,
954
956
timestamp = erlang :system_time (millisecond ),
955
957
messages_total = 0 ,
956
- unused_1 = ? NIL }}.
958
+ bytes_in = 0 }}.
957
959
958
960
handle_aux (RaftState , Tag , Cmd , # aux {name = Name ,
959
961
capacity = Cap ,
@@ -970,13 +972,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
970
972
handle_aux (RaftState , Tag , Cmd , AuxV3 , RaAux );
971
973
handle_aux (leader , cast , eval ,
972
974
#? AUX {last_decorators_state = LastDec ,
975
+ bytes_in = BytesIn ,
973
976
last_checkpoint = Check0 } = Aux0 ,
974
977
RaAux ) ->
975
978
#? STATE {cfg = # cfg {resource = QName }} = MacState =
976
979
ra_aux :machine_state (RaAux ),
977
980
978
981
Ts = erlang :system_time (millisecond ),
979
- {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , false ),
982
+ {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
980
983
981
984
% % this is called after each batch of commands have been applied
982
985
% % set timer for message expire
@@ -992,11 +995,16 @@ handle_aux(leader, cast, eval,
992
995
last_decorators_state = NewLast }, RaAux , Effects }
993
996
end ;
994
997
handle_aux (_RaftState , cast , eval ,
995
- #? AUX {last_checkpoint = Check0 } = Aux0 ,
998
+ #? AUX {last_checkpoint = Check0 ,
999
+ bytes_in = BytesIn } = Aux0 ,
996
1000
RaAux ) ->
997
1001
Ts = erlang :system_time (millisecond ),
998
- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , false ),
1002
+ {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
999
1003
{no_reply , Aux0 #? AUX {last_checkpoint = Check }, RaAux , Effects };
1004
+ handle_aux (_RaftState , cast , {bytes_in , {MetaSize , BodySize }},
1005
+ #? AUX {bytes_in = Bytes } = Aux0 ,
1006
+ RaAux ) ->
1007
+ {no_reply , Aux0 #? AUX {bytes_in = Bytes + MetaSize + BodySize }, RaAux , []};
1000
1008
handle_aux (_RaftState , cast , {# return {msg_ids = MsgIds ,
1001
1009
consumer_key = Key } = Ret , Corr , Pid },
1002
1010
Aux0 , RaAux0 ) ->
@@ -1126,12 +1134,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
1126
1134
handle_aux (_ , _ , garbage_collection , Aux , RaAux ) ->
1127
1135
{no_reply , force_eval_gc (RaAux , Aux ), RaAux };
1128
1136
handle_aux (_RaState , _ , force_checkpoint ,
1129
- #? AUX {last_checkpoint = Check0 } = Aux , RaAux ) ->
1137
+ #? AUX {last_checkpoint = Check0 ,
1138
+ bytes_in = BytesIn } = Aux , RaAux ) ->
1130
1139
Ts = erlang :system_time (millisecond ),
1131
1140
#? STATE {cfg = # cfg {resource = QR }} = ra_aux :machine_state (RaAux ),
1132
1141
rabbit_log :debug (" ~ts : rabbit_fifo: forcing checkpoint at ~b " ,
1133
1142
[rabbit_misc :rs (QR ), ra_aux :last_applied (RaAux )]),
1134
- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , true ),
1143
+ {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , true ),
1135
1144
{no_reply , Aux #? AUX {last_checkpoint = Check }, RaAux , Effects };
1136
1145
handle_aux (RaState , _ , {dlx , _ } = Cmd , Aux0 , RaAux ) ->
1137
1146
#? STATE {dlx = DlxState ,
@@ -1575,7 +1584,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
1575
1584
apply_enqueue (#{index := RaftIdx ,
1576
1585
system_time := Ts } = Meta , From ,
1577
1586
Seq , RawMsg , Size , State0 ) ->
1578
- case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size , [], State0 ) of
1587
+ Effects0 = [{aux , {bytes_in , Size }}],
1588
+ case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size ,
1589
+ Effects0 , State0 ) of
1579
1590
{ok , State1 , Effects1 } ->
1580
1591
checkout (Meta , State0 , State1 , Effects1 );
1581
1592
{out_of_sequence , State , Effects } ->
@@ -2918,11 +2929,12 @@ priority_tag(Msg) ->
2918
2929
end .
2919
2930
2920
2931
2921
- do_checkpoints (Ts ,
2922
- # checkpoint {index = ChIdx ,
2923
- timestamp = ChTime ,
2924
- smallest_index = LastSmallest ,
2925
- indexes = MinIndexes } = Check0 , RaAux , Force ) ->
2932
+ do_checkpoints (Ts , # checkpoint {index = ChIdx ,
2933
+ timestamp = ChTime ,
2934
+ smallest_index = LastSmallest ,
2935
+ bytes_in = LastBytesIn ,
2936
+ indexes = MinIndexes } = Check0 ,
2937
+ RaAux , BytesIn , Force ) ->
2926
2938
LastAppliedIdx = ra_aux :last_applied (RaAux ),
2927
2939
IndexesSince = LastAppliedIdx - ChIdx ,
2928
2940
#? STATE {} = MacState = ra_aux :machine_state (RaAux ),
@@ -2934,21 +2946,35 @@ do_checkpoints(Ts,
2934
2946
Smallest
2935
2947
end ,
2936
2948
MsgsTot = messages_total (MacState ),
2949
+ % % if more than 64MB (by default) of message data has been written to the log
2950
+ % % since the last checkpoint, it is best to take a new checkpoint
2951
+
2937
2952
{CheckMinInterval , CheckMinIndexes , CheckMaxIndexes } =
2938
2953
persistent_term :get (quorum_queue_checkpoint_config ,
2939
2954
{? CHECK_MIN_INTERVAL_MS , ? CHECK_MIN_INDEXES ,
2940
2955
? CHECK_MAX_INDEXES }),
2956
+
2957
+ % % scale the bytes limit as the backlog increases
2958
+ MaxBytesFactor = max (1 , MsgsTot / CheckMaxIndexes ),
2959
+ EnoughDataWritten = BytesIn - LastBytesIn > (? CHECK_MAX_BYTES * MaxBytesFactor ),
2941
2960
EnoughTimeHasPassed = TimeSince > CheckMinInterval ,
2942
2961
2943
- % % enough time has passed and enough indexes have been committed
2944
- case (IndexesSince > MinIndexes andalso
2945
- EnoughTimeHasPassed ) orelse
2946
- % % the queue is empty and some commands have been
2947
- % % applied since the last checkpoint
2948
- (MsgsTot == 0 andalso
2949
- IndexesSince > CheckMinIndexes andalso
2950
- EnoughTimeHasPassed ) orelse
2951
- Force of
2962
+ case (EnoughTimeHasPassed andalso
2963
+ (
2964
+ % % condition 1: enough indexes have been committed since the last
2965
+ % % checkpoint
2966
+ (IndexesSince > MinIndexes ) orelse
2967
+ % % condition 2: the queue is empty and _some_ commands
2968
+ % % have been applied since the last checkpoint
2969
+ (MsgsTot == 0 andalso IndexesSince > 32 )
2970
+ )
2971
+ ) orelse
2972
+ % % condition 3: enough message data has been written to warrant a new
2973
+ % % checkpoint; this condition ignores the minimum time interval
2974
+ EnoughDataWritten orelse
2975
+ % % force was requested, e.g. after a purge
2976
+ Force
2977
+ of
2952
2978
true ->
2953
2979
% % take fewer checkpoints the more messages there are on the queue
2954
2980
NextIndexes = min (max (MsgsTot , CheckMinIndexes ), CheckMaxIndexes ),
@@ -2957,6 +2983,7 @@ do_checkpoints(Ts,
2957
2983
timestamp = Ts ,
2958
2984
smallest_index = NewSmallest ,
2959
2985
messages_total = MsgsTot ,
2986
+ bytes_in = BytesIn ,
2960
2987
indexes = NextIndexes },
2961
2988
[{checkpoint , LastAppliedIdx , MacState } |
2962
2989
release_cursor (LastSmallest , NewSmallest )]};
0 commit comments