Skip to content

Support configure max sync throughput in CMQs #3925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,14 @@ suites = [
size = "medium",
flaky = True,
),
rabbitmq_suite(
name = "rabbit_mirror_queue_sync_SUITE",
size = "small",
),
rabbitmq_suite(
name = "rabbit_mirror_queue_misc_SUITE",
size = "small",
),
]

assert_suites(
Expand Down
17 changes: 17 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,23 @@ end}.
{mapping, "mirroring_sync_batch_size", "rabbit.mirroring_sync_batch_size",
[{datatype, bytesize}, {validators, ["mirroring_sync_batch_size"]}]}.

%% Mirror sync max throughput (in bytes) per second.
%% Supported unit symbols:
%% k, kiB: kibibytes (2^10 - 1,024 bytes)
%% M, MiB: mebibytes (2^20 - 1,048,576 bytes)
%% G, GiB: gibibytes (2^30 - 1,073,741,824 bytes)
%% kB: kilobytes (10^3 - 1,000 bytes)
%% MB: megabytes (10^6 - 1,000,000 bytes)
%% GB: gigabytes (10^9 - 1,000,000,000 bytes)
%%
%% 0 means "no limit".
%%
%% {mirroring_sync_max_throughput, 0},

{mapping, "mirroring_sync_max_throughput", "rabbit.mirroring_sync_max_throughput", [
{datatype, [integer, string]}
]}.

%% Peer discovery backend used by cluster formation.
%%

Expand Down
10 changes: 8 additions & 2 deletions deps/rabbit/src/rabbit_mirror_queue_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,14 @@ sync_mirrors(HandleInfo, EmitStats,
{ok, Q} = rabbit_amqqueue:lookup(QName),
SPids = amqqueue:get_slave_pids(Q),
SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
Log("batch size: ~p", [SyncBatchSize]),
SyncThroughput = rabbit_mirror_queue_misc:default_max_sync_throughput(),
log_mirror_sync_config(Log, SyncBatchSize, SyncThroughput),
Ref = make_ref(),
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, SyncThroughput, BQ, BQS) of
{cancelled, BQS1} -> Log(" synchronisation cancelled ", []),
{ok, S(BQS1)};
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
Expand All @@ -173,6 +174,11 @@ sync_mirrors(HandleInfo, EmitStats,
{ok, S(BQS1)}
end.

log_mirror_sync_config(Log, SyncBatchSize, 0) ->
Log("batch size: ~p", [SyncBatchSize]);
log_mirror_sync_config(Log, SyncBatchSize, SyncThroughput) ->
Log("max batch size: ~p; max sync throughput: ~p bytes/s", [SyncBatchSize, SyncThroughput]).

terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
Expand Down
22 changes: 21 additions & 1 deletion deps/rabbit/src/rabbit_mirror_queue_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
is_mirrored/1, is_mirrored_ha_nodes/1,
update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
sync_batch_size/1, default_max_sync_throughput/0,
log_info/3, log_warning/3]).
-export([stop_all_slaves/5]).

-export([sync_queue/1, cancel_sync_queue/1, queue_length/1]).
Expand Down Expand Up @@ -506,6 +507,25 @@ default_batch_size() ->
rabbit_misc:get_env(rabbit, mirroring_sync_batch_size,
?DEFAULT_BATCH_SIZE).

-define(DEFAULT_MAX_SYNC_THROUGHPUT, 0).

default_max_sync_throughput() ->
case application:get_env(rabbit, mirroring_sync_max_throughput) of
{ok, Value} ->
case rabbit_resource_monitor_misc:parse_information_unit(Value) of
{ok, ParsedThroughput} ->
ParsedThroughput;
{error, parse_error} ->
rabbit_log:warning(
"The configured value for the mirroring_sync_max_throughput is "
"not a valid value: ~p. Disabled sync throughput control. ",
[Value]),
?DEFAULT_MAX_SYNC_THROUGHPUT
end;
undefined ->
?DEFAULT_MAX_SYNC_THROUGHPUT
end.

-spec update_mirrors
(amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'.

Expand Down
70 changes: 57 additions & 13 deletions deps/rabbit/src/rabbit_mirror_queue_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@

-include_lib("rabbit_common/include/rabbit.hrl").

-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
-export([master_prepare/4, master_go/9, slave/7, conserve_resources/3]).

%% Export for UTs
-export([maybe_master_batch_send/2, get_time_diff/3, append_to_acc/4]).

-define(SYNC_PROGRESS_INTERVAL, 1000000).

-define(SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS, 50).

%% There are three processes around, the master, the syncer and the
%% slave(s). The syncer is an intermediary, linked to the master in
%% order to make sure we do not mess with the master's credit flow or
Expand Down Expand Up @@ -67,23 +72,24 @@ master_prepare(Ref, QName, Log, SPids) ->
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
non_neg_integer(),
non_neg_integer(),
bq(), bqs()) ->
{'already_synced', bqs()} | {'ok', bqs()} |
{'cancelled', bqs()} |
{'shutdown', any(), bqs()} |
{'sync_died', any(), bqs()}.

master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) ->
master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, SyncThroughput, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
{ready, Syncer} -> EmitStats({syncing, 0}),
master_batch_go0(Args, SyncBatchSize,
master_batch_go0(Args, SyncBatchSize, SyncThroughput,
BQ, BQS)
end.

master_batch_go0(Args, BatchSize, BQ, BQS) ->
master_batch_go0(Args, BatchSize, SyncThroughput, BQ, BQS) ->
FoldFun =
fun (Msg, MsgProps, Unacked, Acc) ->
Acc1 = append_to_acc(Msg, MsgProps, Unacked, Acc),
Expand All @@ -92,24 +98,27 @@ master_batch_go0(Args, BatchSize, BQ, BQS) ->
false -> {cont, Acc1}
end
end,
FoldAcc = {[], 0, {0, BQ:depth(BQS)}, erlang:monotonic_time()},
FoldAcc = {[], 0, {0, erlang:monotonic_time(), SyncThroughput}, {0, BQ:depth(BQS)}, erlang:monotonic_time()},
bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).

master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
{Batch, I, {Curr, Len}, Last}) ->
{Batch, I, {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, Last}) ->
T = maybe_emit_stats(Last, I, EmitStats, Log),
HandleInfo({syncing, I}),
handle_set_maximum_since_use(),
SyncMsg = {msgs, Ref, lists:reverse(Batch)},
NewAcc = {[], I + length(Batch), {Curr, Len}, T},
NewAcc = {[], I + length(Batch), {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T},
master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).

%% Either send messages when we reach the last one in the queue or
%% whenever we have accumulated BatchSize messages.
maybe_master_batch_send({_, _, {Len, Len}, _}, _BatchSize) ->
maybe_master_batch_send({_, _, _, {Len, Len}, _}, _BatchSize) ->
true;
maybe_master_batch_send({_, _, _, {Curr, _Len}, _}, BatchSize)
when Curr rem BatchSize =:= 0 ->
true;
maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize)
when Curr rem BatchSize =:= 0 ->
maybe_master_batch_send({_, _, {TotalBytes, _, SyncThroughput}, {_Curr, _Len}, _}, _BatchSize)
when TotalBytes > SyncThroughput ->
true;
maybe_master_batch_send(_Acc, _BatchSize) ->
false.
Expand All @@ -121,8 +130,10 @@ bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) ->
{_, BQS1} -> master_done(Args, BQS1)
end.

append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) ->
{[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}.
append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {_, _, 0}, {Curr, Len}, T}) ->
{[{Msg, MsgProps, Unacked} | Batch], I, {0, 0, 0}, {Curr + 1, Len}, T};
append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T}) ->
{[{Msg, MsgProps, Unacked} | Batch], I, {TotalBytes + rabbit_basic:msg_size(Msg), LastCheck, SyncThroughput}, {Curr + 1, Len}, T}.

master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
receive
Expand All @@ -131,11 +142,44 @@ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
gen_server2:reply(From, ok),
{stop, cancelled};
{next, Ref} -> Syncer ! SyncMsg,
{cont, NewAcc};
{Msgs, I , {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T} = NewAcc,
{NewTotalBytes, NewLastCheck} = maybe_throttle_sync_throughput(TotalBytes, LastCheck, SyncThroughput),
{cont, {Msgs, I, {NewTotalBytes, NewLastCheck, SyncThroughput}, {Curr, Len}, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.

maybe_throttle_sync_throughput(_ , _, 0) ->
{0, erlang:monotonic_time()};
maybe_throttle_sync_throughput(TotalBytes, LastCheck, SyncThroughput) ->
Interval = erlang:convert_time_unit(erlang:monotonic_time() - LastCheck, native, milli_seconds),
case Interval > ?SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS of
true -> maybe_pause_sync(TotalBytes, Interval, SyncThroughput),
{0, erlang:monotonic_time()}; %% reset TotalBytes counter and LastCheck.;
false -> {TotalBytes, LastCheck}
end.

maybe_pause_sync(TotalBytes, Interval, SyncThroughput) ->
Delta = get_time_diff(TotalBytes, Interval, SyncThroughput),
pause_queue_sync(Delta).

pause_queue_sync(0) ->
rabbit_log_mirroring:debug("Sync throughput is ok.");
pause_queue_sync(Delta) ->
rabbit_log_mirroring:debug("Sync throughput exceeds threshold. Pause queue sync for ~p ms", [Delta]),
timer:sleep(Delta).

%% Sync throughput computation:
%% - Total bytes have been sent since last check: TotalBytes
%% - Used/Elapsed time since last check: Interval (in milliseconds)
%% - Effective/Used throughput in bytes/s: TotalBytes/Interval * 1000.
%% - When UsedThroughput > SyncThroughput -> we need to slow down to compensate over-used rate.
%% The amount of time to pause queue sync is the different between time needed to broadcast TotalBytes at max throughput
%% and the elapsed time (Interval).
get_time_diff(TotalBytes, Interval, SyncThroughput) ->
rabbit_log_mirroring:debug("Total ~p bytes has been sent over last ~p ms. Effective sync througput: ~p", [TotalBytes, Interval, round(TotalBytes * 1000 / Interval)]),
max(round(TotalBytes/SyncThroughput * 1000 - Interval), 0).

master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
receive
{'$gen_call', From,
Expand Down
29 changes: 29 additions & 0 deletions deps/rabbit/test/rabbit_mirror_queue_misc_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-module(rabbit_mirror_queue_misc_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

-compile(export_all).

all() ->
[
default_max_sync_throughput
].

default_max_sync_throughput(_Config) ->
?assertEqual(
0,
rabbit_mirror_queue_misc:default_max_sync_throughput()),
application:set_env(rabbit, mirroring_sync_max_throughput, 100),
?assertEqual(
100,
rabbit_mirror_queue_misc:default_max_sync_throughput()),
application:set_env(rabbit, mirroring_sync_max_throughput, "100MiB"),
?assertEqual(
100*1024*1024,
rabbit_mirror_queue_misc:default_max_sync_throughput()),
application:set_env(rabbit, mirroring_sync_max_throughput, "100MB"),
?assertEqual(
100000000,
rabbit_mirror_queue_misc:default_max_sync_throughput()),
ok.
86 changes: 86 additions & 0 deletions deps/rabbit/test/rabbit_mirror_queue_sync_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
-module(rabbit_mirror_queue_sync_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-compile(export_all).

all() ->
[
maybe_master_batch_send,
get_time_diff,
append_to_acc
].

maybe_master_batch_send(_Config) ->
SyncBatchSize = 4096,
SyncThroughput = 2000,
QueueLen = 10000,
?assertEqual(
true, %% Message reach the last one in the queue
rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {0, 0, SyncThroughput}, {QueueLen, QueueLen}, 0}, SyncBatchSize)),
?assertEqual(
true, %% # messages batched is less than batch size; and total message size has reached the batch size
rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {0, 0, SyncThroughput}, {SyncBatchSize, QueueLen}, 0}, SyncBatchSize)),
TotalBytes0 = SyncThroughput + 1,
Curr0 = 1,
?assertEqual(
true, %% Total batch size exceed max sync throughput
rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {TotalBytes0, 0, SyncThroughput}, {Curr0, QueueLen}, 0}, SyncBatchSize)),
TotalBytes1 = 1,
Curr1 = 1,
?assertEqual(
false, %% # messages batched is less than batch size; and total bytes is less than sync throughput
rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {TotalBytes1, 0, SyncThroughput}, {Curr1, QueueLen}, 0}, SyncBatchSize)),
ok.

get_time_diff(_Config) ->
TotalBytes0 = 100,
Interval0 = 1000, %% ms
MaxSyncThroughput0 = 100, %% bytes/s
?assertEqual(%% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; matched max throughput
0, %% => no need to pause queue sync
rabbit_mirror_queue_sync:get_time_diff(TotalBytes0, Interval0, MaxSyncThroughput0)),

TotalBytes1 = 100,
Interval1 = 1000, %% ms
MaxSyncThroughput1 = 200, %% bytes/s
?assertEqual( %% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; less than max throughput
0, %% => no need to pause queue sync
rabbit_mirror_queue_sync:get_time_diff(TotalBytes1, Interval1, MaxSyncThroughput1)),

TotalBytes2 = 100,
Interval2 = 1000, %% ms
MaxSyncThroughput2 = 50, %% bytes/s
?assertEqual( %% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; greater than max throughput
1000, %% => pause queue sync for 1000 ms
rabbit_mirror_queue_sync:get_time_diff(TotalBytes2, Interval2, MaxSyncThroughput2)),
ok.

append_to_acc(_Config) ->
Msg = #basic_message{
id = 1,
content = #content{
properties = #'P_basic'{
priority = 2
},
payload_fragments_rev = [[<<"1234567890">>]] %% 10 bytes
},
is_persistent = true
},
BQDepth = 10,
SyncThroughput_0 = 0,
FoldAcc1 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput_0}, {0, BQDepth}, erlang:monotonic_time()},
{_, _, {TotalBytes1, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc1),
?assertEqual(0, TotalBytes1), %% Skipping calculating TotalBytes for the pending batch as SyncThroughput is 0.

SyncThroughput = 100,
FoldAcc2 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput}, {0, BQDepth}, erlang:monotonic_time()},
{_, _, {TotalBytes2, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc2),
?assertEqual(10, TotalBytes2), %% Message size is added to existing TotalBytes

FoldAcc3 = {[], 0, {TotalBytes2, erlang:monotonic_time(), SyncThroughput}, {0, BQDepth}, erlang:monotonic_time()},
{_, _, {TotalBytes3, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc3),
?assertEqual(TotalBytes2 + 10, TotalBytes3), %% Message size is added to existing TotalBytes
ok.