Skip to content

Commit b0bd5f8

Browse files
committed
Add delete_super_stream CLI command
1 parent a73b1a3 commit b0bd5f8

File tree

3 files changed

+170
-8
lines changed

3 files changed

+170
-8
lines changed

deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,28 @@ run([SuperStream],
9494
|| K <- lists:seq(0, Partitions - 1)],
9595
RoutingKeys =
9696
[integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)],
97+
create_super_stream(NodeName,
98+
Timeout,
99+
VHost,
100+
SuperStream,
101+
Streams,
102+
RoutingKeys);
103+
run([SuperStream],
104+
#{node := NodeName,
105+
vhost := VHost,
106+
timeout := Timeout,
107+
routing_keys := RoutingKeysStr}) ->
108+
RoutingKeys =
109+
[rabbit_data_coercion:to_binary(
110+
string:strip(K))
111+
|| K
112+
<- string:tokens(
113+
rabbit_data_coercion:to_list(RoutingKeysStr), ",")],
114+
Streams =
115+
[list_to_binary(binary_to_list(SuperStream)
116+
++ "-"
117+
++ binary_to_list(K))
118+
|| K <- RoutingKeys],
97119
create_super_stream(NodeName,
98120
Timeout,
99121
VHost,
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
17+
18+
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
19+
20+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
21+
22+
-ignore_xref([{'Elixir.RabbitMQ.CLI.DefaultOutput', output, 1},
23+
{'Elixir.RabbitMQ.CLI.Core.Helpers', cli_acting_user, 0},
24+
{'Elixir.RabbitMQ.CLI.Core.ExitCodes', exit_software, 0}]).
25+
26+
-export([scopes/0,
27+
usage/0,
28+
usage_additional/0,
29+
usage_doc_guides/0,
30+
banner/2,
31+
validate/2,
32+
merge_defaults/2,
33+
run/2,
34+
output/2,
35+
description/0,
36+
help_section/0]).
37+
38+
scopes() ->
39+
[ctl, streams].
40+
41+
description() ->
42+
<<"Delete a super stream (experimental feature)">>.
43+
44+
help_section() ->
45+
{plugin, stream}.
46+
47+
validate([], _Opts) ->
48+
{validation_failure, not_enough_args};
49+
validate([_Name], _Opts) ->
50+
ok;
51+
validate(_, _Opts) ->
52+
{validation_failure, too_many_args}.
53+
54+
merge_defaults(_Args, Opts) ->
55+
{_Args, maps:merge(#{vhost => <<"/">>}, Opts)}.
56+
57+
usage() ->
58+
<<"delete_super_stream <name> [--vhost <vhost>]">>.
59+
60+
usage_additional() ->
61+
[["<name>", "The name of the super stream to delete."],
62+
["--vhost <vhost>", "The virtual host of the super stream."]].
63+
64+
usage_doc_guides() ->
65+
[?STREAM_GUIDE_URL].
66+
67+
run([SuperStream],
68+
#{node := NodeName,
69+
vhost := VHost,
70+
timeout := Timeout}) ->
71+
delete_super_stream(NodeName, Timeout, VHost, SuperStream).
72+
73+
delete_super_stream(NodeName, Timeout, VHost, SuperStream) ->
74+
case rabbit_misc:rpc_call(NodeName,
75+
rabbit_stream_manager,
76+
delete_super_stream,
77+
[VHost, SuperStream, cli_acting_user()],
78+
Timeout)
79+
of
80+
ok ->
81+
{ok,
82+
rabbit_misc:format("Super stream ~s has been deleted",
83+
[SuperStream])};
84+
Error ->
85+
Error
86+
end.
87+
88+
banner(_, _) ->
89+
<<"Deleting a super stream ...">>.
90+
91+
output({error, Msg}, _Opts) ->
92+
{error, 'Elixir.RabbitMQ.CLI.Core.ExitCodes':exit_software(), Msg};
93+
output({ok, Msg}, _Opts) ->
94+
{ok, Msg}.
95+
96+
cli_acting_user() ->
97+
'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user().

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand').
2626
-define(COMMAND_ADD_SUPER_STREAM,
2727
'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand').
28+
-define(COMMAND_DELETE_SUPER_STREAM,
29+
'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand').
2830

2931
all() ->
3032
[{group, list_connections},
@@ -41,8 +43,11 @@ groups() ->
4143
{list_publishers, [],
4244
[list_publishers_merge_defaults, list_publishers_run]},
4345
{super_streams, [],
44-
[add_super_stream_merge_defaults, add_super_stream_validate,
45-
add_super_stream_run]}].
46+
[add_super_stream_merge_defaults,
47+
add_super_stream_validate,
48+
delete_super_stream_merge_defaults,
49+
delete_super_stream_validate,
50+
add_delete_super_stream_run]}].
4651

4752
init_per_suite(Config) ->
4853
case rabbit_ct_helpers:is_mixed_versions() of
@@ -360,20 +365,58 @@ add_super_stream_validate(_Config) ->
360365
<<"a,b,c">>})),
361366
ok.
362367

363-
add_super_stream_run(Config) ->
368+
delete_super_stream_merge_defaults(_Config) ->
369+
?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}},
370+
?COMMAND_DELETE_SUPER_STREAM:merge_defaults([<<"super-stream">>],
371+
#{})),
372+
ok.
373+
374+
delete_super_stream_validate(_Config) ->
375+
?assertMatch({validation_failure, not_enough_args},
376+
?COMMAND_DELETE_SUPER_STREAM:validate([], #{})),
377+
?assertMatch({validation_failure, too_many_args},
378+
?COMMAND_DELETE_SUPER_STREAM:validate([<<"a">>, <<"b">>],
379+
#{})),
380+
?assertEqual(ok, ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], #{})),
381+
ok.
382+
383+
add_delete_super_stream_run(Config) ->
364384
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
365385
Opts =
366386
#{node => Node,
367387
timeout => 10000,
368-
vhost => <<"/">>,
369-
partitions => 3},
388+
vhost => <<"/">>},
370389

371390
?assertMatch({ok, _},
372-
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], Opts)),
391+
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
392+
maps:merge(#{partitions => 3},
393+
Opts))),
373394
?assertEqual({ok,
374395
[<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
375-
rabbit_stream_manager_SUITE:partitions(Config,
376-
<<"invoices">>)).
396+
partitions(Config, <<"invoices">>)),
397+
?assertMatch({ok, _},
398+
?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
399+
?assertEqual({error, stream_not_found},
400+
partitions(Config, <<"invoices">>)),
401+
402+
?assertMatch({ok, _},
403+
?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>],
404+
maps:merge(#{routing_keys =>
405+
<<" amer,emea , apac">>},
406+
Opts))),
407+
?assertEqual({ok,
408+
[<<"invoices-amer">>, <<"invoices-emea">>,
409+
<<"invoices-apac">>]},
410+
partitions(Config, <<"invoices">>)),
411+
?assertMatch({ok, _},
412+
?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)),
413+
?assertEqual({error, stream_not_found},
414+
partitions(Config, <<"invoices">>)),
415+
416+
ok.
417+
418+
partitions(Config, SuperStream) ->
419+
rabbit_stream_manager_SUITE:partitions(Config, SuperStream).
377420

378421
create_stream(S, Stream, C0) ->
379422
rabbit_stream_SUITE:test_create_stream(gen_tcp, S, Stream, C0).

0 commit comments

Comments
 (0)