Skip to content

Commit 75b2a53

Browse files
authored
Merge pull request #3503 from rabbitmq/super-stream-cli
Add functions to create/delete super stream in manager
2 parents c935f9c + 6b9589b commit 75b2a53

10 files changed

+1283
-125
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_date_time).
9+
10+
-export([parse_duration/1]).
11+
12+
-type datetime_plist() :: list({atom(), integer()}).
13+
14+
% from https://github.com/erlsci/iso8601/blob/main/src/iso8601.erl
15+
-spec gi(string()) -> integer().
16+
gi(DS) ->
17+
{Int, _Rest} = string:to_integer(DS),
18+
case Int of
19+
error ->
20+
0;
21+
_ ->
22+
Int
23+
end.
24+
25+
-spec parse_duration(string()) -> datetime_plist().
26+
parse_duration(Bin)
27+
when is_binary(Bin) -> %TODO extended format
28+
parse_duration(binary_to_list(Bin));
29+
parse_duration(Str) ->
30+
case re:run(Str,
31+
"^(?<sign>-|\\+)?P(?:(?<years>[0-9]+)Y)?(?:(?<months>[0"
32+
"-9]+)M)?(?:(?<days>[0-9]+)D)?(T(?:(?<hours>[0-9]+)H)?("
33+
"?:(?<minutes>[0-9]+)M)?(?:(?<seconds>[0-9]+(?:\\.[0-9]"
34+
"+)?)S)?)?$",
35+
[{capture, [sign, years, months, days, hours, minutes, seconds],
36+
list}])
37+
of
38+
{match, [Sign, Years, Months, Days, Hours, Minutes, Seconds]} ->
39+
{ok, [{sign, Sign},
40+
{years, gi(Years)},
41+
{months, gi(Months)},
42+
{days, gi(Days)},
43+
{hours, gi(Hours)},
44+
{minutes, gi(Minutes)},
45+
{seconds, gi(Seconds)}]};
46+
nomatch ->
47+
error
48+
end.

deps/rabbit_common/test/unit_SUITE.erl

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ groups() ->
4444
frame_encoding_does_not_fail_with_empty_binary_payload,
4545
amqp_table_conversion,
4646
name_type,
47-
get_erl_path
47+
get_erl_path,
48+
date_time_parse_duration
4849
]},
4950
{parse_mem_limit, [parallel], [
5051
parse_mem_limit_relative_exactly_max,
@@ -460,3 +461,23 @@ get_erl_path(_) ->
460461
?assertNotMatch(nomatch, string:find(Exe, "erl"))
461462
end,
462463
ok.
464+
465+
date_time_parse_duration(_) ->
466+
?assertEqual(
467+
{ok, [{sign, "+"}, {years, 6}, {months, 3}, {days, 1}, {hours, 1}, {minutes, 1}, {seconds, 1}]},
468+
rabbit_date_time:parse_duration("+P6Y3M1DT1H1M1.1S")
469+
),
470+
?assertEqual(
471+
{ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 6}, {seconds, 0}]},
472+
rabbit_date_time:parse_duration("PT6M")
473+
),
474+
?assertEqual(
475+
{ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 10}, {seconds, 30}]},
476+
rabbit_date_time:parse_duration("PT10M30S")
477+
),
478+
?assertEqual(
479+
{ok, [{sign, []}, {years, 0}, {months, 0}, {days, 5}, {hours, 8}, {minutes, 0}, {seconds, 0}]},
480+
rabbit_date_time:parse_duration("P5DT8H")
481+
),
482+
?assertEqual(error, rabbit_date_time:parse_duration("foo")),
483+
ok.

deps/rabbitmq_stream/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ suites = [
8484
PACKAGE,
8585
name = "rabbit_stream_utils_SUITE",
8686
),
87+
rabbitmq_integration_suite(
88+
PACKAGE,
89+
name = "rabbit_stream_manager_SUITE",
90+
),
8791
rabbitmq_integration_suite(
8892
PACKAGE,
8993
name = "rabbit_stream_SUITE",
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
17+
18+
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
19+
20+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
21+
22+
-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
23+
{'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0},
24+
{'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]).
25+
26+
-export([scopes/0,
27+
usage/0,
28+
usage_additional/0,
29+
usage_doc_guides/0,
30+
switches/0,
31+
banner/2,
32+
validate/2,
33+
merge_defaults/2,
34+
run/2,
35+
output/2,
36+
description/0,
37+
help_section/0]).
38+
39+
scopes() ->
40+
[ctl, streams].
41+
42+
description() ->
43+
<<"Add a super stream (experimental feature)">>.
44+
45+
switches() ->
46+
[{partitions, integer},
47+
{routing_keys, string},
48+
{max_length_bytes, string},
49+
{max_age, string},
50+
{stream_max_segment_size_bytes, string},
51+
{leader_locator, string},
52+
{initial_cluster_size, integer}].
53+
54+
help_section() ->
55+
{plugin, stream}.
56+
57+
validate([], _Opts) ->
58+
{validation_failure, not_enough_args};
59+
validate([_Name], #{partitions := _, routing_keys := _}) ->
60+
{validation_failure,
61+
"Specify --partitions or routing-keys, not both."};
62+
validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
63+
{validation_failure, "The partition number must be greater than 0"};
64+
validate([_Name], Opts) ->
65+
validate_stream_arguments(Opts);
66+
validate(_, _Opts) ->
67+
{validation_failure, too_many_args}.
68+
69+
validate_stream_arguments(#{max_length_bytes := Value} = Opts) ->
70+
case parse_information_unit(Value) of
71+
error ->
72+
{validation_failure,
73+
"Invalid value for --max-length-bytes, valid example "
74+
"values: 100gb, 50mb"};
75+
_ ->
76+
validate_stream_arguments(maps:remove(max_length_bytes, Opts))
77+
end;
78+
validate_stream_arguments(#{max_age := Value} = Opts) ->
79+
case rabbit_date_time:parse_duration(Value) of
80+
{ok, _} ->
81+
validate_stream_arguments(maps:remove(max_age, Opts));
82+
error ->
83+
{validation_failure,
84+
"Invalid value for --max-age, the value must a "
85+
"ISO 8601 duration, e.g. e.g. PT10M30S for 10 "
86+
"minutes 30 seconds, P5DT8H for 5 days 8 hours."}
87+
end;
88+
validate_stream_arguments(#{stream_max_segment_size_bytes := Value} =
89+
Opts) ->
90+
case parse_information_unit(Value) of
91+
error ->
92+
{validation_failure,
93+
"Invalid value for --stream-max-segment-size-bytes, "
94+
"valid example values: 100gb, 50mb"};
95+
_ ->
96+
validate_stream_arguments(maps:remove(stream_max_segment_size_bytes,
97+
Opts))
98+
end;
99+
validate_stream_arguments(#{leader_locator := <<"client-local">>} =
100+
Opts) ->
101+
validate_stream_arguments(maps:remove(leader_locator, Opts));
102+
validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) ->
103+
validate_stream_arguments(maps:remove(leader_locator, Opts));
104+
validate_stream_arguments(#{leader_locator := <<"least-leaders">>} =
105+
Opts) ->
106+
validate_stream_arguments(maps:remove(leader_locator, Opts));
107+
validate_stream_arguments(#{leader_locator := _}) ->
108+
{validation_failure,
109+
"Invalid value for --leader-locator, valid values "
110+
"are client-local, random, least-leaders."};
111+
validate_stream_arguments(#{initial_cluster_size := Value} = Opts) ->
112+
try
113+
case rabbit_data_coercion:to_integer(Value) of
114+
S when S > 0 ->
115+
validate_stream_arguments(maps:remove(initial_cluster_size,
116+
Opts));
117+
_ ->
118+
{validation_failure,
119+
"Invalid value for --initial-cluster-size, the "
120+
"value must be positive."}
121+
end
122+
catch
123+
error:_ ->
124+
{validation_failure,
125+
"Invalid value for --initial-cluster-size, the "
126+
"value must be a positive integer."}
127+
end;
128+
validate_stream_arguments(_) ->
129+
ok.
130+
131+
merge_defaults(_Args, #{routing_keys := _V} = Opts) ->
132+
{_Args, maps:merge(#{vhost => <<"/">>}, Opts)};
133+
merge_defaults(_Args, Opts) ->
134+
{_Args, maps:merge(#{partitions => 3, vhost => <<"/">>}, Opts)}.
135+
136+
usage() ->
137+
<<"add_super_stream <name> [--vhost <vhost>] [--partition"
138+
"s <partitions>] [--routing-keys <routing-keys>]">>.
139+
140+
usage_additional() ->
141+
[["<name>", "The name of the super stream."],
142+
["--vhost <vhost>", "The virtual host the super stream is added to."],
143+
["--partitions <partitions>",
144+
"The number of partitions, default is 3. Mutually "
145+
"exclusive with --routing-keys."],
146+
["--routing-keys <routing-keys>",
147+
"Comma-separated list of routing keys. Mutually "
148+
"exclusive with --partitions."],
149+
["--max-length-bytes <max-length-bytes>",
150+
"The maximum size of partition streams, example "
151+
"values: 20gb, 500mb."],
152+
["--max-age <max-age>",
153+
"The maximum age of partition stream segments, "
154+
"using the ISO 8601 duration format, e.g. PT10M30S "
155+
"for 10 minutes 30 seconds, P5DT8H for 5 days "
156+
"8 hours."],
157+
["--stream-max-segment-size-bytes <stream-max-segment-si"
158+
"ze-bytes>",
159+
"The maximum size of partition stream segments, "
160+
"example values: 500mb, 1gb."],
161+
["--leader-locator <leader-locator>",
162+
"Leader locator strategy for partition streams, "
163+
"possible values are client-local, least-leaders, "
164+
"random."],
165+
["--initial-cluster-size <initial-cluster-size>",
166+
"The initial cluster size of partition streams."]].
167+
168+
usage_doc_guides() ->
169+
[?STREAM_GUIDE_URL].
170+
171+
run([SuperStream],
172+
#{node := NodeName,
173+
vhost := VHost,
174+
timeout := Timeout,
175+
partitions := Partitions} =
176+
Opts) ->
177+
Streams =
178+
[list_to_binary(binary_to_list(SuperStream)
179+
++ "-"
180+
++ integer_to_list(K))
181+
|| K <- lists:seq(0, Partitions - 1)],
182+
RoutingKeys =
183+
[integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)],
184+
create_super_stream(NodeName,
185+
Timeout,
186+
VHost,
187+
SuperStream,
188+
Streams,
189+
stream_arguments(Opts),
190+
RoutingKeys);
191+
run([SuperStream],
192+
#{node := NodeName,
193+
vhost := VHost,
194+
timeout := Timeout,
195+
routing_keys := RoutingKeysStr} =
196+
Opts) ->
197+
RoutingKeys =
198+
[rabbit_data_coercion:to_binary(
199+
string:strip(K))
200+
|| K
201+
<- string:tokens(
202+
rabbit_data_coercion:to_list(RoutingKeysStr), ",")],
203+
Streams =
204+
[list_to_binary(binary_to_list(SuperStream)
205+
++ "-"
206+
++ binary_to_list(K))
207+
|| K <- RoutingKeys],
208+
create_super_stream(NodeName,
209+
Timeout,
210+
VHost,
211+
SuperStream,
212+
Streams,
213+
stream_arguments(Opts),
214+
RoutingKeys).
215+
216+
stream_arguments(Opts) ->
217+
stream_arguments(#{}, Opts).
218+
219+
stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 ->
220+
Acc;
221+
stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) ->
222+
stream_arguments(maps:put(<<"max-length-bytes">>,
223+
parse_information_unit(Value), Acc),
224+
maps:remove(max_length_bytes, Arguments));
225+
stream_arguments(Acc, #{max_age := Value} = Arguments) ->
226+
{ok, Duration} = rabbit_date_time:parse_duration(Value),
227+
DurationInSeconds = duration_to_seconds(Duration),
228+
stream_arguments(maps:put(<<"max-age">>,
229+
list_to_binary(integer_to_list(DurationInSeconds)
230+
++ "s"),
231+
Acc),
232+
maps:remove(max_age, Arguments));
233+
stream_arguments(Acc,
234+
#{stream_max_segment_size_bytes := Value} = Arguments) ->
235+
stream_arguments(maps:put(<<"stream-max-segment-size-bytes">>,
236+
parse_information_unit(Value), Acc),
237+
maps:remove(stream_max_segment_size_bytes, Arguments));
238+
stream_arguments(Acc, #{initial_cluster_size := Value} = Arguments) ->
239+
stream_arguments(maps:put(<<"initial-cluster-size">>,
240+
rabbit_data_coercion:to_binary(Value), Acc),
241+
maps:remove(initial_cluster_size, Arguments));
242+
stream_arguments(Acc, #{leader_locator := Value} = Arguments) ->
243+
stream_arguments(maps:put(<<"queue-leader-locator">>, Value, Acc),
244+
maps:remove(leader_locator, Arguments));
245+
stream_arguments(ArgumentsAcc, _Arguments) ->
246+
ArgumentsAcc.
247+
248+
duration_to_seconds([{sign, _},
249+
{years, Y},
250+
{months, M},
251+
{days, D},
252+
{hours, H},
253+
{minutes, Mn},
254+
{seconds, S}]) ->
255+
Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S.
256+
257+
create_super_stream(NodeName,
258+
Timeout,
259+
VHost,
260+
SuperStream,
261+
Streams,
262+
Arguments,
263+
RoutingKeys) ->
264+
case rabbit_misc:rpc_call(NodeName,
265+
rabbit_stream_manager,
266+
create_super_stream,
267+
[VHost,
268+
SuperStream,
269+
Streams,
270+
Arguments,
271+
RoutingKeys,
272+
cli_acting_user()],
273+
Timeout)
274+
of
275+
ok ->
276+
{ok,
277+
rabbit_misc:format("Super stream ~s has been created",
278+
[SuperStream])};
279+
Error ->
280+
Error
281+
end.
282+
283+
banner(_, _) ->
284+
<<"Adding a super stream (experimental feature)...">>.
285+
286+
output({error, Msg}, _Opts) ->
287+
{error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
288+
output({ok, Msg}, _Opts) ->
289+
{ok, Msg}.
290+
291+
cli_acting_user() ->
292+
'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().
293+
294+
parse_information_unit(Value) ->
295+
case rabbit_resource_monitor_misc:parse_information_unit(Value) of
296+
{ok, R} ->
297+
integer_to_binary(R);
298+
{error, _} ->
299+
error
300+
end.

0 commit comments

Comments
 (0)