Skip to content

Commit a29a68a

Browse files
Merge branch 'thuandb-master'
2 parents 49809d1 + b569ab5 commit a29a68a

7 files changed

+226
-16
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,14 @@ suites = [
989989
size = "medium",
990990
flaky = True,
991991
),
992+
rabbitmq_suite(
993+
name = "rabbit_mirror_queue_sync_SUITE",
994+
size = "small",
995+
),
996+
rabbitmq_suite(
997+
name = "rabbit_mirror_queue_misc_SUITE",
998+
size = "small",
999+
),
9921000
]
9931001

9941002
assert_suites(

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,6 +1021,23 @@ end}.
10211021
{mapping, "mirroring_sync_batch_size", "rabbit.mirroring_sync_batch_size",
10221022
[{datatype, bytesize}, {validators, ["mirroring_sync_batch_size"]}]}.
10231023

1024+
%% Mirror sync max throughput (in bytes) per second.
1025+
%% Supported unit symbols:
1026+
%% k, kiB: kibibytes (2^10 - 1,024 bytes)
1027+
%% M, MiB: mebibytes (2^20 - 1,048,576 bytes)
1028+
%% G, GiB: gibibytes (2^30 - 1,073,741,824 bytes)
1029+
%% kB: kilobytes (10^3 - 1,000 bytes)
1030+
%% MB: megabytes (10^6 - 1,000,000 bytes)
1031+
%% GB: gigabytes (10^9 - 1,000,000,000 bytes)
1032+
%%
1033+
%% 0 means "no limit".
1034+
%%
1035+
%% {mirroring_sync_max_throughput, 0},
1036+
1037+
{mapping, "mirroring_sync_max_throughput", "rabbit.mirroring_sync_max_throughput", [
1038+
{datatype, [integer, string]}
1039+
]}.
1040+
10241041
%% Peer discovery backend used by cluster formation.
10251042
%%
10261043

deps/rabbit/src/rabbit_mirror_queue_master.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,14 @@ sync_mirrors(HandleInfo, EmitStats,
156156
{ok, Q} = rabbit_amqqueue:lookup(QName),
157157
SPids = amqqueue:get_slave_pids(Q),
158158
SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
159-
Log("batch size: ~p", [SyncBatchSize]),
159+
SyncThroughput = rabbit_mirror_queue_misc:default_max_sync_throughput(),
160+
log_mirror_sync_config(Log, SyncBatchSize, SyncThroughput),
160161
Ref = make_ref(),
161162
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
162163
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
163164
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
164165
case rabbit_mirror_queue_sync:master_go(
165-
Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
166+
Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, SyncThroughput, BQ, BQS) of
166167
{cancelled, BQS1} -> Log(" synchronisation cancelled ", []),
167168
{ok, S(BQS1)};
168169
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
@@ -173,6 +174,11 @@ sync_mirrors(HandleInfo, EmitStats,
173174
{ok, S(BQS1)}
174175
end.
175176

177+
log_mirror_sync_config(Log, SyncBatchSize, 0) ->
178+
Log("batch size: ~p", [SyncBatchSize]);
179+
log_mirror_sync_config(Log, SyncBatchSize, SyncThroughput) ->
180+
Log("max batch size: ~p; max sync throughput: ~p bytes/s", [SyncBatchSize, SyncThroughput]).
181+
176182
terminate({shutdown, dropped} = Reason,
177183
State = #state { backing_queue = BQ,
178184
backing_queue_state = BQS }) ->

deps/rabbit/src/rabbit_mirror_queue_misc.erl

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
is_mirrored/1, is_mirrored_ha_nodes/1,
1717
update_mirrors/2, update_mirrors/1, validate_policy/1,
1818
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
19-
sync_batch_size/1, log_info/3, log_warning/3]).
19+
sync_batch_size/1, default_max_sync_throughput/0,
20+
log_info/3, log_warning/3]).
2021
-export([stop_all_slaves/5]).
2122

2223
-export([sync_queue/1, cancel_sync_queue/1, queue_length/1]).
@@ -506,6 +507,25 @@ default_batch_size() ->
506507
rabbit_misc:get_env(rabbit, mirroring_sync_batch_size,
507508
?DEFAULT_BATCH_SIZE).
508509

510+
-define(DEFAULT_MAX_SYNC_THROUGHPUT, 0).
511+
512+
default_max_sync_throughput() ->
513+
case application:get_env(rabbit, mirroring_sync_max_throughput) of
514+
{ok, Value} ->
515+
case rabbit_resource_monitor_misc:parse_information_unit(Value) of
516+
{ok, ParsedThroughput} ->
517+
ParsedThroughput;
518+
{error, parse_error} ->
519+
rabbit_log:warning(
520+
"The configured value for the mirroring_sync_max_throughput is "
521+
"not a valid value: ~p. Disabled sync throughput control. ",
522+
[Value]),
523+
?DEFAULT_MAX_SYNC_THROUGHPUT
524+
end;
525+
undefined ->
526+
?DEFAULT_MAX_SYNC_THROUGHPUT
527+
end.
528+
509529
-spec update_mirrors
510530
(amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'.
511531

deps/rabbit/src/rabbit_mirror_queue_sync.erl

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@
99

1010
-include_lib("rabbit_common/include/rabbit.hrl").
1111

12-
-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
12+
-export([master_prepare/4, master_go/9, slave/7, conserve_resources/3]).
13+
14+
%% Export for UTs
15+
-export([maybe_master_batch_send/2, get_time_diff/3, append_to_acc/4]).
1316

1417
-define(SYNC_PROGRESS_INTERVAL, 1000000).
1518

19+
-define(SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS, 50).
20+
1621
%% There are three processes around, the master, the syncer and the
1722
%% slave(s). The syncer is an intermediary, linked to the master in
1823
%% order to make sure we do not mess with the master's credit flow or
@@ -67,23 +72,24 @@ master_prepare(Ref, QName, Log, SPids) ->
6772
rabbit_mirror_queue_master:stats_fun(),
6873
rabbit_mirror_queue_master:stats_fun(),
6974
non_neg_integer(),
75+
non_neg_integer(),
7076
bq(), bqs()) ->
7177
{'already_synced', bqs()} | {'ok', bqs()} |
7278
{'cancelled', bqs()} |
7379
{'shutdown', any(), bqs()} |
7480
{'sync_died', any(), bqs()}.
7581

76-
master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) ->
82+
master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, SyncThroughput, BQ, BQS) ->
7783
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
7884
receive
7985
{'EXIT', Syncer, normal} -> {already_synced, BQS};
8086
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
8187
{ready, Syncer} -> EmitStats({syncing, 0}),
82-
master_batch_go0(Args, SyncBatchSize,
88+
master_batch_go0(Args, SyncBatchSize, SyncThroughput,
8389
BQ, BQS)
8490
end.
8591

86-
master_batch_go0(Args, BatchSize, BQ, BQS) ->
92+
master_batch_go0(Args, BatchSize, SyncThroughput, BQ, BQS) ->
8793
FoldFun =
8894
fun (Msg, MsgProps, Unacked, Acc) ->
8995
Acc1 = append_to_acc(Msg, MsgProps, Unacked, Acc),
@@ -92,24 +98,27 @@ master_batch_go0(Args, BatchSize, BQ, BQS) ->
9298
false -> {cont, Acc1}
9399
end
94100
end,
95-
FoldAcc = {[], 0, {0, BQ:depth(BQS)}, erlang:monotonic_time()},
101+
FoldAcc = {[], 0, {0, erlang:monotonic_time(), SyncThroughput}, {0, BQ:depth(BQS)}, erlang:monotonic_time()},
96102
bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).
97103

98104
master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
99-
{Batch, I, {Curr, Len}, Last}) ->
105+
{Batch, I, {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, Last}) ->
100106
T = maybe_emit_stats(Last, I, EmitStats, Log),
101107
HandleInfo({syncing, I}),
102108
handle_set_maximum_since_use(),
103109
SyncMsg = {msgs, Ref, lists:reverse(Batch)},
104-
NewAcc = {[], I + length(Batch), {Curr, Len}, T},
110+
NewAcc = {[], I + length(Batch), {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T},
105111
master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).
106112

107113
%% Either send messages when we reach the last one in the queue or
108114
%% whenever we have accumulated BatchSize messages.
109-
maybe_master_batch_send({_, _, {Len, Len}, _}, _BatchSize) ->
115+
maybe_master_batch_send({_, _, _, {Len, Len}, _}, _BatchSize) ->
116+
true;
117+
maybe_master_batch_send({_, _, _, {Curr, _Len}, _}, BatchSize)
118+
when Curr rem BatchSize =:= 0 ->
110119
true;
111-
maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize)
112-
when Curr rem BatchSize =:= 0 ->
120+
maybe_master_batch_send({_, _, {TotalBytes, _, SyncThroughput}, {_Curr, _Len}, _}, _BatchSize)
121+
when TotalBytes > SyncThroughput ->
113122
true;
114123
maybe_master_batch_send(_Acc, _BatchSize) ->
115124
false.
@@ -121,8 +130,10 @@ bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) ->
121130
{_, BQS1} -> master_done(Args, BQS1)
122131
end.
123132

124-
append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) ->
125-
{[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}.
133+
append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {_, _, 0}, {Curr, Len}, T}) ->
134+
{[{Msg, MsgProps, Unacked} | Batch], I, {0, 0, 0}, {Curr + 1, Len}, T};
135+
append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T}) ->
136+
{[{Msg, MsgProps, Unacked} | Batch], I, {TotalBytes + rabbit_basic:msg_size(Msg), LastCheck, SyncThroughput}, {Curr + 1, Len}, T}.
126137

127138
master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
128139
receive
@@ -131,11 +142,44 @@ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
131142
gen_server2:reply(From, ok),
132143
{stop, cancelled};
133144
{next, Ref} -> Syncer ! SyncMsg,
134-
{cont, NewAcc};
145+
{Msgs, I , {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T} = NewAcc,
146+
{NewTotalBytes, NewLastCheck} = maybe_throttle_sync_throughput(TotalBytes, LastCheck, SyncThroughput),
147+
{cont, {Msgs, I, {NewTotalBytes, NewLastCheck, SyncThroughput}, {Curr, Len}, T}};
135148
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
136149
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
137150
end.
138151

152+
maybe_throttle_sync_throughput(_ , _, 0) ->
153+
{0, erlang:monotonic_time()};
154+
maybe_throttle_sync_throughput(TotalBytes, LastCheck, SyncThroughput) ->
155+
Interval = erlang:convert_time_unit(erlang:monotonic_time() - LastCheck, native, milli_seconds),
156+
case Interval > ?SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS of
157+
true -> maybe_pause_sync(TotalBytes, Interval, SyncThroughput),
158+
{0, erlang:monotonic_time()}; %% reset TotalBytes counter and LastCheck.;
159+
false -> {TotalBytes, LastCheck}
160+
end.
161+
162+
maybe_pause_sync(TotalBytes, Interval, SyncThroughput) ->
163+
Delta = get_time_diff(TotalBytes, Interval, SyncThroughput),
164+
pause_queue_sync(Delta).
165+
166+
pause_queue_sync(0) ->
167+
rabbit_log_mirroring:debug("Sync throughput is ok.");
168+
pause_queue_sync(Delta) ->
169+
rabbit_log_mirroring:debug("Sync throughput exceeds threshold. Pause queue sync for ~p ms", [Delta]),
170+
timer:sleep(Delta).
171+
172+
%% Sync throughput computation:
173+
%% - Total bytes have been sent since last check: TotalBytes
174+
%% - Used/Elapsed time since last check: Interval (in milliseconds)
175+
%% - Effective/Used throughput in bytes/s: TotalBytes/Interval * 1000.
176+
%% - When UsedThroughput > SyncThroughput -> we need to slow down to compensate over-used rate.
177+
%% The amount of time to pause queue sync is the different between time needed to broadcast TotalBytes at max throughput
178+
%% and the elapsed time (Interval).
179+
get_time_diff(TotalBytes, Interval, SyncThroughput) ->
180+
rabbit_log_mirroring:debug("Total ~p bytes has been sent over last ~p ms. Effective sync througput: ~p", [TotalBytes, Interval, round(TotalBytes * 1000 / Interval)]),
181+
max(round(TotalBytes/SyncThroughput * 1000 - Interval), 0).
182+
139183
master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
140184
receive
141185
{'$gen_call', From,
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
-module(unit_classic_mirrored_queue_sync_throttling_SUITE).
2+
3+
-include_lib("common_test/include/ct.hrl").
4+
-include_lib("eunit/include/eunit.hrl").
5+
-include_lib("amqp_client/include/amqp_client.hrl").
6+
7+
-compile(export_all).
8+
9+
all() ->
10+
[
11+
maybe_master_batch_send,
12+
get_time_diff,
13+
append_to_acc
14+
].
15+
16+
maybe_master_batch_send(_Config) ->
17+
SyncBatchSize = 4096,
18+
SyncThroughput = 2000,
19+
QueueLen = 10000,
20+
?assertEqual(
21+
true, %% Message reach the last one in the queue
22+
rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {0, 0, SyncThroughput}, {QueueLen, QueueLen}, 0}, SyncBatchSize)),
23+
?assertEqual(
24+
true, %% # messages batched is less than batch size; and total message size has reached the batch size
25+
rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {0, 0, SyncThroughput}, {SyncBatchSize, QueueLen}, 0}, SyncBatchSize)),
26+
TotalBytes0 = SyncThroughput + 1,
27+
Curr0 = 1,
28+
?assertEqual(
29+
true, %% Total batch size exceed max sync throughput
30+
rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {TotalBytes0, 0, SyncThroughput}, {Curr0, QueueLen}, 0}, SyncBatchSize)),
31+
TotalBytes1 = 1,
32+
Curr1 = 1,
33+
?assertEqual(
34+
false, %% # messages batched is less than batch size; and total bytes is less than sync throughput
35+
rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {TotalBytes1, 0, SyncThroughput}, {Curr1, QueueLen}, 0}, SyncBatchSize)),
36+
ok.
37+
38+
get_time_diff(_Config) ->
39+
TotalBytes0 = 100,
40+
Interval0 = 1000, %% ms
41+
MaxSyncThroughput0 = 100, %% bytes/s
42+
?assertEqual(%% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; matched max throughput
43+
0, %% => no need to pause queue sync
44+
rabbit_mirror_queue_sync:get_time_diff(TotalBytes0, Interval0, MaxSyncThroughput0)),
45+
46+
TotalBytes1 = 100,
47+
Interval1 = 1000, %% ms
48+
MaxSyncThroughput1 = 200, %% bytes/s
49+
?assertEqual( %% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; less than max throughput
50+
0, %% => no need to pause queue sync
51+
rabbit_mirror_queue_sync:get_time_diff(TotalBytes1, Interval1, MaxSyncThroughput1)),
52+
53+
TotalBytes2 = 100,
54+
Interval2 = 1000, %% ms
55+
MaxSyncThroughput2 = 50, %% bytes/s
56+
?assertEqual( %% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; greater than max throughput
57+
1000, %% => pause queue sync for 1000 ms
58+
rabbit_mirror_queue_sync:get_time_diff(TotalBytes2, Interval2, MaxSyncThroughput2)),
59+
ok.
60+
61+
append_to_acc(_Config) ->
62+
Msg = #basic_message{
63+
id = 1,
64+
content = #content{
65+
properties = #'P_basic'{
66+
priority = 2
67+
},
68+
payload_fragments_rev = [[<<"1234567890">>]] %% 10 bytes
69+
},
70+
is_persistent = true
71+
},
72+
BQDepth = 10,
73+
SyncThroughput_0 = 0,
74+
FoldAcc1 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput_0}, {0, BQDepth}, erlang:monotonic_time()},
75+
{_, _, {TotalBytes1, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc1),
76+
?assertEqual(0, TotalBytes1), %% Skipping calculating TotalBytes for the pending batch as SyncThroughput is 0.
77+
78+
SyncThroughput = 100,
79+
FoldAcc2 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput}, {0, BQDepth}, erlang:monotonic_time()},
80+
{_, _, {TotalBytes2, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc2),
81+
?assertEqual(10, TotalBytes2), %% Message size is added to existing TotalBytes
82+
83+
FoldAcc3 = {[], 0, {TotalBytes2, erlang:monotonic_time(), SyncThroughput}, {0, BQDepth}, erlang:monotonic_time()},
84+
{_, _, {TotalBytes3, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc3),
85+
?assertEqual(TotalBytes2 + 10, TotalBytes3), %% Message size is added to existing TotalBytes
86+
ok.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
-module(unit_classic_mirrored_queue_throughput_SUITE).
2+
3+
-include_lib("common_test/include/ct.hrl").
4+
-include_lib("eunit/include/eunit.hrl").
5+
6+
-compile(export_all).
7+
8+
all() ->
9+
[
10+
default_max_sync_throughput
11+
].
12+
13+
default_max_sync_throughput(_Config) ->
14+
?assertEqual(
15+
0,
16+
rabbit_mirror_queue_misc:default_max_sync_throughput()),
17+
application:set_env(rabbit, mirroring_sync_max_throughput, 100),
18+
?assertEqual(
19+
100,
20+
rabbit_mirror_queue_misc:default_max_sync_throughput()),
21+
application:set_env(rabbit, mirroring_sync_max_throughput, "100MiB"),
22+
?assertEqual(
23+
100*1024*1024,
24+
rabbit_mirror_queue_misc:default_max_sync_throughput()),
25+
application:set_env(rabbit, mirroring_sync_max_throughput, "100MB"),
26+
?assertEqual(
27+
100000000,
28+
rabbit_mirror_queue_misc:default_max_sync_throughput()),
29+
ok.

0 commit comments

Comments
 (0)