Skip to content

Commit 7dfebb9

Browse files
Merge pull request #9924 from rabbitmq/add-super-stream-binding-keys-alias
Add --binding-keys to add_super_stream command
2 parents edbf454 + 7399281 commit 7dfebb9

File tree

3 files changed

+53
-19
lines changed

3 files changed

+53
-19
lines changed

deps/rabbit/docs/rabbitmq-streams.8

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,16 +307,16 @@ for each producer:
307307
.sp
308308
.Dl rabbitmq-streams list_stream_publishers connection_pid publisher_id stream
309309
.\" ------------------------------------------------------------------
310-
.It Cm add_super_stream Ar super-stream Oo Fl -vhost Ar vhost Oc Oo Fl -partitions Ar partitions Oc Oo Fl -routing-keys Ar routing-keys Oc Oo Fl -max-length-bytes Ar max-length-bytes Oc Oo Fl -max-age Ar max-age Oc Oo Fl -stream-max-segment-size-bytes Ar stream-max-segment-size-bytes Oc Oo Fl -leader-locator Ar leader-locator Oc Oo Fl -initial-cluster-size Ar initial-cluster-size Oc
310+
.It Cm add_super_stream Ar super-stream Oo Fl -vhost Ar vhost Oc Oo Fl -partitions Ar partitions Oc Oo Fl -binding-keys Ar binding-keys Oc Oo Fl -max-length-bytes Ar max-length-bytes Oc Oo Fl -max-age Ar max-age Oc Oo Fl -stream-max-segment-size-bytes Ar stream-max-segment-size-bytes Oc Oo Fl -leader-locator Ar leader-locator Oc Oo Fl -initial-cluster-size Ar initial-cluster-size Oc
311311
.Bl -tag -width Ds
312312
.It Ar super-stream
313313
The name of the super stream to create.
314314
.It Ar vhost
315315
The name of the virtual host to create the super stream into.
316316
.It Ar partitions
317317
The number of partitions the super stream will have.
318-
.It Ar routing-keys
319-
Comma-separated list of routing keys.
318+
.It Ar binding-keys
319+
Comma-separated list of binding keys.
320320
.It Ar max-length-bytes
321321
The maximum size of partition streams, example values: 20gb, 500mb.
322322
.It Ar max-age

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ description() ->
4040

4141
switches() ->
4242
[{partitions, integer},
43+
{binding_keys, string},
4344
{routing_keys, string},
4445
{max_length_bytes, string},
4546
{max_age, string},
@@ -52,9 +53,15 @@ help_section() ->
5253

5354
validate([], _Opts) ->
5455
{validation_failure, not_enough_args};
56+
validate([_Name], #{routing_keys := _, binding_keys := _}) ->
57+
{validation_failure,
58+
"Specify --binding-keys only."};
59+
validate([_Name], #{partitions := _, binding_keys := _}) ->
60+
{validation_failure,
61+
"Specify --partitions or --binding-keys, not both."};
5562
validate([_Name], #{partitions := _, routing_keys := _}) ->
5663
{validation_failure,
57-
"Specify --partitions or routing-keys, not both."};
64+
"Specify --partitions or --binding-keys, not both."};
5865
validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
5966
{validation_failure, "The partition number must be greater than 0"};
6067
validate([_Name], Opts) ->
@@ -128,14 +135,17 @@ validate_stream_arguments(#{initial_cluster_size := Value} = Opts) ->
128135
validate_stream_arguments(_) ->
129136
ok.
130137

131-
merge_defaults(_Args, #{routing_keys := _V} = Opts) ->
138+
merge_defaults(_Args, #{binding_keys := _V} = Opts) ->
132139
{_Args, maps:merge(#{vhost => <<"/">>}, Opts)};
140+
merge_defaults(_Args, #{routing_keys := RKs} = Opts) ->
141+
{_Args, maps:merge(#{vhost => <<"/">>, binding_keys => RKs},
142+
maps:remove(routing_keys, Opts))};
133143
merge_defaults(_Args, Opts) ->
134144
{_Args, maps:merge(#{partitions => 3, vhost => <<"/">>}, Opts)}.
135145

136146
usage() ->
137147
<<"add_super_stream <name> [--vhost <vhost>] [--partition"
138-
"s <partitions>] [--routing-keys <routing-keys>]">>.
148+
"s <partitions>] [--binding-keys <binding-keys>]">>.
139149

140150
usage_additional() ->
141151
[[<<"<name>">>,
@@ -144,8 +154,8 @@ usage_additional() ->
144154
<<"The virtual host the super stream is added to.">>],
145155
[<<"--partitions <partitions>">>,
146156
<<"The number of partitions, default is 3. Mutually exclusive with --routing-keys.">>],
147-
[<<"--routing-keys <routing-keys>">>,
148-
<<"Comma-separated list of routing keys. Mutually exclusive with --partitions.">>],
157+
[<<"--binding-keys <binding-keys>">>,
158+
<<"Comma-separated list of binding keys. Mutually exclusive with --partitions.">>],
149159
[<<"--max-length-bytes <max-length-bytes>">>,
150160
<<"The maximum size of partition streams, example values: 20gb, 500mb.">>],
151161
[<<"--max-age <max-age>">>,
@@ -184,26 +194,26 @@ run([SuperStream],
184194
#{node := NodeName,
185195
vhost := VHost,
186196
timeout := Timeout,
187-
routing_keys := RoutingKeysStr} =
197+
binding_keys := BindingKeysStr} =
188198
Opts) ->
189-
RoutingKeys =
199+
BindingKeys =
190200
[rabbit_data_coercion:to_binary(
191201
string:strip(K))
192202
|| K
193203
<- string:tokens(
194-
rabbit_data_coercion:to_list(RoutingKeysStr), ",")],
204+
rabbit_data_coercion:to_list(BindingKeysStr), ",")],
195205
Streams =
196206
[list_to_binary(binary_to_list(SuperStream)
197207
++ "-"
198208
++ binary_to_list(K))
199-
|| K <- RoutingKeys],
209+
|| K <- BindingKeys],
200210
create_super_stream(NodeName,
201211
Timeout,
202212
VHost,
203213
SuperStream,
204214
Streams,
205215
stream_arguments(Opts),
206-
RoutingKeys).
216+
BindingKeys).
207217

208218
stream_arguments(Opts) ->
209219
stream_arguments(#{}, Opts).

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -650,17 +650,27 @@ add_super_stream_merge_defaults(_Config) ->
650650
#{partitions := 5, vhost := <<"/">>}},
651651
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
652652
#{partitions => 5})),
653+
DefaultWithBindingKeys =
654+
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
655+
#{binding_keys =>
656+
<<"amer,emea,apac">>}),
657+
?assertMatch({[<<"super-stream">>],
658+
#{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
659+
DefaultWithBindingKeys),
660+
661+
{_, OptsBks} = DefaultWithBindingKeys,
662+
?assertEqual(false, maps:is_key(partitions, OptsBks)),
653663

654664
DefaultWithRoutingKeys =
655665
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
656666
#{routing_keys =>
657667
<<"amer,emea,apac">>}),
658668
?assertMatch({[<<"super-stream">>],
659-
#{routing_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
669+
#{binding_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
660670
DefaultWithRoutingKeys),
661671

662-
{_, Opts} = DefaultWithRoutingKeys,
663-
?assertEqual(false, maps:is_key(partitions, Opts)).
672+
{_, OptsRks} = DefaultWithRoutingKeys,
673+
?assertEqual(false, maps:is_key(partitions, OptsRks)).
664674

665675
add_super_stream_validate(_Config) ->
666676
?assertMatch({validation_failure, not_enough_args},
@@ -672,6 +682,17 @@ add_super_stream_validate(_Config) ->
672682
#{partitions => 1,
673683
routing_keys =>
674684
<<"a,b,c">>})),
685+
?assertMatch({validation_failure, _},
686+
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
687+
#{partitions => 1,
688+
binding_keys => <<"a,b,c">>})),
689+
690+
?assertMatch({validation_failure, _},
691+
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
692+
#{routing_keys => 1,
693+
binding_keys => <<"a,b,c">>}
694+
)),
695+
675696
?assertMatch({validation_failure, _},
676697
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
677698
#{partitions => 0})),
@@ -682,6 +703,10 @@ add_super_stream_validate(_Config) ->
682703
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
683704
#{routing_keys =>
684705
<<"a,b,c">>})),
706+
?assertEqual(ok,
707+
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
708+
#{binding_keys =>
709+
<<"a,b,c">>})),
685710

686711
[case Expected of
687712
ok ->
@@ -752,11 +777,10 @@ add_delete_super_stream_run(Config) ->
752777
?assertEqual({error, stream_not_found},
753778
partitions(Config, <<"invoices">>)),
754779

755-
% with routing keys
780+
% with binding keys
756781
?assertMatch({ok, _},
757782
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
758-
maps:merge(#{routing_keys =>
759-
<<" amer,emea , apac">>},
783+
maps:merge(#{binding_keys => <<" amer,emea , apac">>},
760784
Opts))),
761785
?assertEqual({ok,
762786
[<<"invoices-amer">>, <<"invoices-emea">>,

0 commit comments

Comments
 (0)