Skip to content

Commit 6b9589b

Browse files
committed
Handle stream arguments in add_super_stream command
max-age, leader-locator, etc.
1 parent ecbd969 commit 6b9589b

File tree

6 files changed

+295
-11
lines changed

6 files changed

+295
-11
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_date_time).
9+
10+
-export([parse_duration/1]).
11+
12+
-type datetime_plist() :: list({atom(), integer()}).
13+
14+
% from https://github.com/erlsci/iso8601/blob/main/src/iso8601.erl
15+
-spec gi(string()) -> integer().
16+
gi(DS) ->
17+
{Int, _Rest} = string:to_integer(DS),
18+
case Int of
19+
error ->
20+
0;
21+
_ ->
22+
Int
23+
end.
24+
25+
-spec parse_duration(string()) -> datetime_plist().
26+
parse_duration(Bin)
27+
when is_binary(Bin) -> %TODO extended format
28+
parse_duration(binary_to_list(Bin));
29+
parse_duration(Str) ->
30+
case re:run(Str,
31+
"^(?<sign>-|\\+)?P(?:(?<years>[0-9]+)Y)?(?:(?<months>[0"
32+
"-9]+)M)?(?:(?<days>[0-9]+)D)?(T(?:(?<hours>[0-9]+)H)?("
33+
"?:(?<minutes>[0-9]+)M)?(?:(?<seconds>[0-9]+(?:\\.[0-9]"
34+
"+)?)S)?)?$",
35+
[{capture, [sign, years, months, days, hours, minutes, seconds],
36+
list}])
37+
of
38+
{match, [Sign, Years, Months, Days, Hours, Minutes, Seconds]} ->
39+
{ok, [{sign, Sign},
40+
{years, gi(Years)},
41+
{months, gi(Months)},
42+
{days, gi(Days)},
43+
{hours, gi(Hours)},
44+
{minutes, gi(Minutes)},
45+
{seconds, gi(Seconds)}]};
46+
nomatch ->
47+
error
48+
end.

deps/rabbit_common/test/unit_SUITE.erl

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ groups() ->
4444
frame_encoding_does_not_fail_with_empty_binary_payload,
4545
amqp_table_conversion,
4646
name_type,
47-
get_erl_path
47+
get_erl_path,
48+
date_time_parse_duration
4849
]},
4950
{parse_mem_limit, [parallel], [
5051
parse_mem_limit_relative_exactly_max,
@@ -460,3 +461,23 @@ get_erl_path(_) ->
460461
?assertNotMatch(nomatch, string:find(Exe, "erl"))
461462
end,
462463
ok.
464+
465+
date_time_parse_duration(_) ->
466+
?assertEqual(
467+
{ok, [{sign, "+"}, {years, 6}, {months, 3}, {days, 1}, {hours, 1}, {minutes, 1}, {seconds, 1}]},
468+
rabbit_date_time:parse_duration("+P6Y3M1DT1H1M1.1S")
469+
),
470+
?assertEqual(
471+
{ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 6}, {seconds, 0}]},
472+
rabbit_date_time:parse_duration("PT6M")
473+
),
474+
?assertEqual(
475+
{ok, [{sign, []}, {years, 0}, {months, 0}, {days, 0}, {hours, 0}, {minutes, 10}, {seconds, 30}]},
476+
rabbit_date_time:parse_duration("PT10M30S")
477+
),
478+
?assertEqual(
479+
{ok, [{sign, []}, {years, 0}, {months, 0}, {days, 5}, {hours, 8}, {minutes, 0}, {seconds, 0}]},
480+
rabbit_date_time:parse_duration("P5DT8H")
481+
),
482+
?assertEqual(error, rabbit_date_time:parse_duration("foo")),
483+
ok.

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

Lines changed: 148 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,13 @@ description() ->
4343
<<"Add a super stream (experimental feature)">>.
4444

4545
switches() ->
46-
[{partitions, integer}, {routing_keys, string}].
46+
[{partitions, integer},
47+
{routing_keys, string},
48+
{max_length_bytes, string},
49+
{max_age, string},
50+
{stream_max_segment_size_bytes, string},
51+
{leader_locator, string},
52+
{initial_cluster_size, integer}].
4753

4854
help_section() ->
4955
{plugin, stream}.
@@ -55,11 +61,73 @@ validate([_Name], #{partitions := _, routing_keys := _}) ->
5561
"Specify --partitions or routing-keys, not both."};
5662
validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
5763
{validation_failure, "The partition number must be greater than 0"};
58-
validate([_Name], _Opts) ->
59-
ok;
64+
validate([_Name], Opts) ->
65+
validate_stream_arguments(Opts);
6066
validate(_, _Opts) ->
6167
{validation_failure, too_many_args}.
6268

69+
validate_stream_arguments(#{max_length_bytes := Value} = Opts) ->
70+
case parse_information_unit(Value) of
71+
error ->
72+
{validation_failure,
73+
"Invalid value for --max-length-bytes, valid example "
74+
"values: 100gb, 50mb"};
75+
_ ->
76+
validate_stream_arguments(maps:remove(max_length_bytes, Opts))
77+
end;
78+
validate_stream_arguments(#{max_age := Value} = Opts) ->
79+
case rabbit_date_time:parse_duration(Value) of
80+
{ok, _} ->
81+
validate_stream_arguments(maps:remove(max_age, Opts));
82+
error ->
83+
{validation_failure,
84+
"Invalid value for --max-age, the value must a "
85+
"ISO 8601 duration, e.g. e.g. PT10M30S for 10 "
86+
"minutes 30 seconds, P5DT8H for 5 days 8 hours."}
87+
end;
88+
validate_stream_arguments(#{stream_max_segment_size_bytes := Value} =
89+
Opts) ->
90+
case parse_information_unit(Value) of
91+
error ->
92+
{validation_failure,
93+
"Invalid value for --stream-max-segment-size-bytes, "
94+
"valid example values: 100gb, 50mb"};
95+
_ ->
96+
validate_stream_arguments(maps:remove(stream_max_segment_size_bytes,
97+
Opts))
98+
end;
99+
validate_stream_arguments(#{leader_locator := <<"client-local">>} =
100+
Opts) ->
101+
validate_stream_arguments(maps:remove(leader_locator, Opts));
102+
validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) ->
103+
validate_stream_arguments(maps:remove(leader_locator, Opts));
104+
validate_stream_arguments(#{leader_locator := <<"least-leaders">>} =
105+
Opts) ->
106+
validate_stream_arguments(maps:remove(leader_locator, Opts));
107+
validate_stream_arguments(#{leader_locator := _}) ->
108+
{validation_failure,
109+
"Invalid value for --leader-locator, valid values "
110+
"are client-local, random, least-leaders."};
111+
validate_stream_arguments(#{initial_cluster_size := Value} = Opts) ->
112+
try
113+
case rabbit_data_coercion:to_integer(Value) of
114+
S when S > 0 ->
115+
validate_stream_arguments(maps:remove(initial_cluster_size,
116+
Opts));
117+
_ ->
118+
{validation_failure,
119+
"Invalid value for --initial-cluster-size, the "
120+
"value must be positive."}
121+
end
122+
catch
123+
error:_ ->
124+
{validation_failure,
125+
"Invalid value for --initial-cluster-size, the "
126+
"value must be a positive integer."}
127+
end;
128+
validate_stream_arguments(_) ->
129+
ok.
130+
63131
merge_defaults(_Args, #{routing_keys := _V} = Opts) ->
64132
{_Args, maps:merge(#{vhost => <<"/">>}, Opts)};
65133
merge_defaults(_Args, Opts) ->
@@ -77,7 +145,25 @@ usage_additional() ->
77145
"exclusive with --routing-keys."],
78146
["--routing-keys <routing-keys>",
79147
"Comma-separated list of routing keys. Mutually "
80-
"exclusive with --partitions."]].
148+
"exclusive with --partitions."],
149+
["--max-length-bytes <max-length-bytes>",
150+
"The maximum size of partition streams, example "
151+
"values: 20gb, 500mb."],
152+
["--max-age <max-age>",
153+
"The maximum age of partition stream segments, "
154+
"using the ISO 8601 duration format, e.g. PT10M30S "
155+
"for 10 minutes 30 seconds, P5DT8H for 5 days "
156+
"8 hours."],
157+
["--stream-max-segment-size-bytes <stream-max-segment-si"
158+
"ze-bytes>",
159+
"The maximum size of partition stream segments, "
160+
"example values: 500mb, 1gb."],
161+
["--leader-locator <leader-locator>",
162+
"Leader locator strategy for partition streams, "
163+
"possible values are client-local, least-leaders, "
164+
"random."],
165+
["--initial-cluster-size <initial-cluster-size>",
166+
"The initial cluster size of partition streams."]].
81167

82168
usage_doc_guides() ->
83169
[?STREAM_GUIDE_URL].
@@ -86,7 +172,8 @@ run([SuperStream],
86172
#{node := NodeName,
87173
vhost := VHost,
88174
timeout := Timeout,
89-
partitions := Partitions}) ->
175+
partitions := Partitions} =
176+
Opts) ->
90177
Streams =
91178
[list_to_binary(binary_to_list(SuperStream)
92179
++ "-"
@@ -99,12 +186,14 @@ run([SuperStream],
99186
VHost,
100187
SuperStream,
101188
Streams,
189+
stream_arguments(Opts),
102190
RoutingKeys);
103191
run([SuperStream],
104192
#{node := NodeName,
105193
vhost := VHost,
106194
timeout := Timeout,
107-
routing_keys := RoutingKeysStr}) ->
195+
routing_keys := RoutingKeysStr} =
196+
Opts) ->
108197
RoutingKeys =
109198
[rabbit_data_coercion:to_binary(
110199
string:strip(K))
@@ -121,21 +210,64 @@ run([SuperStream],
121210
VHost,
122211
SuperStream,
123212
Streams,
213+
stream_arguments(Opts),
124214
RoutingKeys).
125215

216+
stream_arguments(Opts) ->
217+
stream_arguments(#{}, Opts).
218+
219+
stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 ->
220+
Acc;
221+
stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) ->
222+
stream_arguments(maps:put(<<"max-length-bytes">>,
223+
parse_information_unit(Value), Acc),
224+
maps:remove(max_length_bytes, Arguments));
225+
stream_arguments(Acc, #{max_age := Value} = Arguments) ->
226+
{ok, Duration} = rabbit_date_time:parse_duration(Value),
227+
DurationInSeconds = duration_to_seconds(Duration),
228+
stream_arguments(maps:put(<<"max-age">>,
229+
list_to_binary(integer_to_list(DurationInSeconds)
230+
++ "s"),
231+
Acc),
232+
maps:remove(max_age, Arguments));
233+
stream_arguments(Acc,
234+
#{stream_max_segment_size_bytes := Value} = Arguments) ->
235+
stream_arguments(maps:put(<<"stream-max-segment-size-bytes">>,
236+
parse_information_unit(Value), Acc),
237+
maps:remove(stream_max_segment_size_bytes, Arguments));
238+
stream_arguments(Acc, #{initial_cluster_size := Value} = Arguments) ->
239+
stream_arguments(maps:put(<<"initial-cluster-size">>,
240+
rabbit_data_coercion:to_binary(Value), Acc),
241+
maps:remove(initial_cluster_size, Arguments));
242+
stream_arguments(Acc, #{leader_locator := Value} = Arguments) ->
243+
stream_arguments(maps:put(<<"queue-leader-locator">>, Value, Acc),
244+
maps:remove(leader_locator, Arguments));
245+
stream_arguments(ArgumentsAcc, _Arguments) ->
246+
ArgumentsAcc.
247+
248+
duration_to_seconds([{sign, _},
249+
{years, Y},
250+
{months, M},
251+
{days, D},
252+
{hours, H},
253+
{minutes, Mn},
254+
{seconds, S}]) ->
255+
Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S.
256+
126257
create_super_stream(NodeName,
127258
Timeout,
128259
VHost,
129260
SuperStream,
130261
Streams,
262+
Arguments,
131263
RoutingKeys) ->
132264
case rabbit_misc:rpc_call(NodeName,
133265
rabbit_stream_manager,
134266
create_super_stream,
135267
[VHost,
136268
SuperStream,
137269
Streams,
138-
[],
270+
Arguments,
139271
RoutingKeys,
140272
cli_acting_user()],
141273
Timeout)
@@ -149,7 +281,7 @@ create_super_stream(NodeName,
149281
end.
150282

151283
banner(_, _) ->
152-
<<"Adding a super stream ...">>.
284+
<<"Adding a super stream (experimental feature)...">>.
153285

154286
output({error, Msg}, _Opts) ->
155287
{error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
@@ -158,3 +290,11 @@ output({ok, Msg}, _Opts) ->
158290

159291
cli_acting_user() ->
160292
'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().
293+
294+
parse_information_unit(Value) ->
295+
case rabbit_resource_monitor_misc:parse_information_unit(Value) of
296+
{ok, R} ->
297+
integer_to_binary(R);
298+
{error, _} ->
299+
error
300+
end.

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ delete_super_stream(NodeName, Timeout, VHost, SuperStream) ->
8686
end.
8787

8888
banner(_, _) ->
89-
<<"Deleting a super stream ...">>.
89+
<<"Deleting a super stream (experimental feature)...">>.
9090

9191
output({error, Msg}, _Opts) ->
9292
{error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};

0 commit comments

Comments
 (0)