Skip to content

Commit a73b1a3

Browse files
committed
Add add_super_stream CLI command
1 parent 1476590 commit a73b1a3

File tree

3 files changed

+212
-8
lines changed

3 files changed

+212
-8
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
17+
18+
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
19+
20+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
21+
22+
-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
23+
{'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0},
24+
{'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]).
25+
26+
-export([scopes/0,
27+
usage/0,
28+
usage_additional/0,
29+
usage_doc_guides/0,
30+
switches/0,
31+
banner/2,
32+
validate/2,
33+
merge_defaults/2,
34+
run/2,
35+
output/2,
36+
description/0,
37+
help_section/0]).
38+
39+
scopes() ->
40+
[ctl, streams].
41+
42+
description() ->
43+
<<"Add a super stream (experimental feature)">>.
44+
45+
switches() ->
46+
[{partitions, integer}, {routing_keys, string}].
47+
48+
help_section() ->
49+
{plugin, stream}.
50+
51+
validate([], _Opts) ->
52+
{validation_failure, not_enough_args};
53+
validate([_Name], #{partitions := _, routing_keys := _}) ->
54+
{validation_failure,
55+
"Specify --partitions or routing-keys, not both."};
56+
validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
57+
{validation_failure, "The partition number must be greater than 0"};
58+
validate([_Name], _Opts) ->
59+
ok;
60+
validate(_, _Opts) ->
61+
{validation_failure, too_many_args}.
62+
63+
merge_defaults(_Args, #{routing_keys := _V} = Opts) ->
64+
{_Args, maps:merge(#{vhost => <<"/">>}, Opts)};
65+
merge_defaults(_Args, Opts) ->
66+
{_Args, maps:merge(#{partitions => 3, vhost => <<"/">>}, Opts)}.
67+
68+
usage() ->
69+
<<"add_super_stream <name> [--vhost <vhost>] [--partition"
70+
"s <partitions>] [--routing-keys <routing-keys>]">>.
71+
72+
usage_additional() ->
73+
[["<name>", "The name of the super stream."],
74+
["--vhost <vhost>", "The virtual host the super stream is added to."],
75+
["--partitions <partitions>",
76+
"The number of partitions, default is 3. Mutually "
77+
"exclusive with --routing-keys."],
78+
["--routing-keys <routing-keys>",
79+
"Comma-separated list of routing keys. Mutually "
80+
"exclusive with --partitions."]].
81+
82+
usage_doc_guides() ->
83+
[?STREAM_GUIDE_URL].
84+
85+
run([SuperStream],
86+
#{node := NodeName,
87+
vhost := VHost,
88+
timeout := Timeout,
89+
partitions := Partitions}) ->
90+
Streams =
91+
[list_to_binary(binary_to_list(SuperStream)
92+
++ "-"
93+
++ integer_to_list(K))
94+
|| K <- lists:seq(0, Partitions - 1)],
95+
RoutingKeys =
96+
[integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)],
97+
create_super_stream(NodeName,
98+
Timeout,
99+
VHost,
100+
SuperStream,
101+
Streams,
102+
RoutingKeys).
103+
104+
create_super_stream(NodeName,
105+
Timeout,
106+
VHost,
107+
SuperStream,
108+
Streams,
109+
RoutingKeys) ->
110+
case rabbit_misc:rpc_call(NodeName,
111+
rabbit_stream_manager,
112+
create_super_stream,
113+
[VHost,
114+
SuperStream,
115+
Streams,
116+
[],
117+
RoutingKeys,
118+
cli_acting_user()],
119+
Timeout)
120+
of
121+
ok ->
122+
{ok,
123+
rabbit_misc:format("Super stream ~s has been created",
124+
[SuperStream])};
125+
Error ->
126+
Error
127+
end.
128+
129+
banner(_, _) ->
130+
<<"Adding a super stream ...">>.
131+
132+
output({error, Msg}, _Opts) ->
133+
{error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
134+
output({ok, Msg}, _Opts) ->
135+
{ok, Msg}.
136+
137+
cli_acting_user() ->
138+
'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -559,12 +559,12 @@ validate_super_stream_creation(VirtualHost, Name, Partitions) ->
559559
{error, validation_failed} ->
560560
{error,
561561
{validation_failed,
562-
rabbit_misc:format("~p is not a correct name for a super stream",
562+
rabbit_misc:format("~s is not a correct name for a super stream",
563563
[Name])}};
564564
{ok, true} ->
565565
{error,
566566
{reference_already_exists,
567-
rabbit_misc:format("there is already an exchange named ~p",
567+
rabbit_misc:format("there is already an exchange named ~s",
568568
[Name])}};
569569
{ok, false} ->
570570
case check_already_existing_queue(VirtualHost, Partitions) of
@@ -615,11 +615,11 @@ check_already_existing_queue0(VirtualHost, [Q | T], _Error) ->
615615
{ok, true} ->
616616
{error,
617617
{reference_already_exists,
618-
rabbit_misc:format("there is already a queue named ~p", [Q])}};
618+
rabbit_misc:format("there is already a queue named ~s", [Q])}};
619619
{error, validation_failed} ->
620620
{error,
621621
{validation_failed,
622-
rabbit_misc:format("~p is not a correct name for a queue", [Q])}}
622+
rabbit_misc:format("~s is not a correct name for a queue", [Q])}}
623623
end.
624624

625625
declare_super_stream_exchange(VirtualHost, Name, Username) ->
@@ -736,7 +736,7 @@ add_super_stream_binding(VirtualHost,
736736
{error, {resources_missing, [{absent, Q, _Reason} | _]}} ->
737737
{error,
738738
{stream_not_found,
739-
rabbit_misc:format("stream ~p does not exists (absent", [Q])}};
739+
rabbit_misc:format("stream ~s does not exists (absent)", [Q])}};
740740
{error, binding_not_found} ->
741741
{error,
742742
{not_found,

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand').
2424
-define(COMMAND_LIST_PUBLISHERS,
2525
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
26+
-define(COMMAND_ADD_SUPER_STREAM,
27+
'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
2628

2729
all() ->
28-
[{group, list_connections}, {group, list_consumers},
29-
{group, list_publishers}].
30+
[{group, list_connections},
31+
{group, list_consumers},
32+
{group, list_publishers},
33+
{super_streams}].
3034

3135
groups() ->
3236
[{list_connections, [],
@@ -35,7 +39,10 @@ groups() ->
3539
{list_consumers, [],
3640
[list_consumers_merge_defaults, list_consumers_run]},
3741
{list_publishers, [],
38-
[list_publishers_merge_defaults, list_publishers_run]}].
42+
[list_publishers_merge_defaults, list_publishers_run]},
43+
{super_streams, [],
44+
[add_super_stream_merge_defaults, add_super_stream_validate,
45+
add_super_stream_run]}].
3946

4047
init_per_suite(Config) ->
4148
case rabbit_ct_helpers:is_mixed_versions() of
@@ -309,6 +316,65 @@ list_publishers_run(Config) ->
309316
?awaitMatch(0, publisher_count(Config), ?WAIT),
310317
ok.
311318

319+
add_super_stream_merge_defaults(_Config) ->
320+
?assertMatch({[<<"super-stream">>],
321+
#{partitions := 3, vhost := <<"/">>}},
322+
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
323+
#{})),
324+
325+
?assertMatch({[<<"super-stream">>],
326+
#{partitions := 5, vhost := <<"/">>}},
327+
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
328+
#{partitions => 5})),
329+
330+
DefaultWithRoutingKeys =
331+
?COMMAND_ADD_SUPER_STREAM:merge_defaults([<<"super-stream">>],
332+
#{routing_keys =>
333+
<<"amer,emea,apac">>}),
334+
?assertMatch({[<<"super-stream">>],
335+
#{routing_keys := <<"amer,emea,apac">>, vhost := <<"/">>}},
336+
DefaultWithRoutingKeys),
337+
338+
{_, Opts} = DefaultWithRoutingKeys,
339+
?assertEqual(false, maps:is_key(partitions, Opts)).
340+
341+
add_super_stream_validate(_Config) ->
342+
?assertMatch({validation_failure, not_enough_args},
343+
?COMMAND_ADD_SUPER_STREAM:validate([], #{})),
344+
?assertMatch({validation_failure, too_many_args},
345+
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>, <<"b">>], #{})),
346+
?assertMatch({validation_failure, _},
347+
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
348+
#{partitions => 1,
349+
routing_keys =>
350+
<<"a,b,c">>})),
351+
?assertMatch({validation_failure, _},
352+
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
353+
#{partitions => 0})),
354+
?assertEqual(ok,
355+
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
356+
#{partitions => 5})),
357+
?assertEqual(ok,
358+
?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>],
359+
#{routing_keys =>
360+
<<"a,b,c">>})),
361+
ok.
362+
363+
add_super_stream_run(Config) ->
364+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
365+
Opts =
366+
#{node => Node,
367+
timeout => 10000,
368+
vhost => <<"/">>,
369+
partitions => 3},
370+
371+
?assertMatch({ok, _},
372+
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], Opts)),
373+
?assertEqual({ok,
374+
[<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
375+
rabbit_stream_manager_SUITE:partitions(Config,
376+
<<"invoices">>)).
377+
312378
create_stream(S, Stream, C0) ->
313379
rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0).
314380

0 commit comments

Comments
 (0)