Skip to content

Commit 29b9124

Browse files
Merge pull request #4242 from rabbitmq/mk-rabbitmq-server-4127-followup
Make it possible to delete or restart a dynamic Shovel on/from any cluster node (take two)
2 parents 4a2f61a + a252093 commit 29b9124

9 files changed

+293
-83
lines changed

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
5+
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
66
%%
77

88
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand').
@@ -68,15 +68,32 @@ banner([Name], #{vhost := VHost}) ->
6868

6969
run([Name], #{node := Node, vhost := VHost}) ->
7070
ActingUser = 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user(),
71-
case rabbit_misc:rpc_call(Node, rabbit_shovel_util, delete_shovel, [VHost, Name, ActingUser]) of
71+
72+
case rabbit_misc:rpc_call(Node, rabbit_shovel_status, cluster_status_with_nodes, []) of
7273
{badrpc, _} = Error ->
7374
Error;
74-
{error, not_found} ->
75+
Xs when is_list(Xs) ->
7576
ErrMsg = rabbit_misc:format("Shovel with the given name was not found "
7677
"on the target node '~s' and / or virtual host '~s'",
7778
[Node, VHost]),
78-
{error, rabbit_data_coercion:to_binary(ErrMsg)};
79-
ok -> ok
79+
case rabbit_shovel_status:find_matching_shovel(VHost, Name, Xs) of
80+
undefined ->
81+
{error, rabbit_data_coercion:to_binary(ErrMsg)};
82+
Match ->
83+
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
84+
{_, HostingNode} = lists:keyfind(node, 1, Opts),
85+
case rabbit_misc:rpc_call(
86+
HostingNode, rabbit_shovel_util, delete_shovel, [VHost, Name, ActingUser]) of
87+
{badrpc, _} = Error ->
88+
Error;
89+
{error, not_found} ->
90+
ErrMsg = rabbit_misc:format("Shovel with the given name was not found "
91+
"on the target node '~s' and / or virtual host '~s'",
92+
[Node, VHost]),
93+
{error, rabbit_data_coercion:to_binary(ErrMsg)};
94+
ok -> ok
95+
end
96+
end
8097
end.
8198

8299
switches() ->

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,28 @@ banner([Name], #{node := Node, vhost := VHost}) ->
5454
<< " on node ">>, atom_to_binary(Node, utf8)]).
5555

5656
run([Name], #{node := Node, vhost := VHost}) ->
57-
case rabbit_misc:rpc_call(Node, rabbit_shovel_util, restart_shovel, [VHost, Name]) of
57+
case rabbit_misc:rpc_call(Node, rabbit_shovel_status, cluster_status_with_nodes, []) of
5858
{badrpc, _} = Error ->
5959
Error;
60-
{error, not_found} ->
60+
Xs when is_list(Xs) ->
6161
ErrMsg = rabbit_misc:format("Shovel with the given name was not found "
6262
"on the target node '~s' and / or virtual host '~s'",
6363
[Node, VHost]),
64-
{error, rabbit_data_coercion:to_binary(ErrMsg)};
65-
ok -> ok
64+
case rabbit_shovel_status:find_matching_shovel(VHost, Name, Xs) of
65+
undefined ->
66+
{error, rabbit_data_coercion:to_binary(ErrMsg)};
67+
Match ->
68+
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
69+
{_, HostingNode} = lists:keyfind(node, 1, Opts),
70+
case rabbit_misc:rpc_call(
71+
HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of
72+
{badrpc, _} = Error ->
73+
Error;
74+
{error, not_found} ->
75+
{error, rabbit_data_coercion:to_binary(ErrMsg)};
76+
ok -> ok
77+
end
78+
end
6679
end.
6780

6881
output(Output, _Opts) ->

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
5+
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
66
%%
77

88
-module(rabbit_shovel_status).
99
-behaviour(gen_server).
1010

1111
-export([start_link/0]).
1212

13-
-export([report/3, remove/1, status/0, lookup/1]).
13+
-export([report/3, remove/1, status/0, lookup/1, cluster_status/0, cluster_status_with_nodes/0]).
14+
-export([inject_node_info/2, find_matching_shovel/3]).
1415

1516
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
1617
terminate/2, code_change/3]).
@@ -34,6 +35,23 @@ remove(Name) ->
3435
status() ->
3536
gen_server:call(?SERVER, status, infinity).
3637

38+
cluster_status() ->
39+
Nodes = rabbit_nodes:all_running(),
40+
lists:usort(rabbit_misc:append_rpc_all_nodes(Nodes, ?MODULE, status, [])).
41+
42+
cluster_status_with_nodes() ->
43+
Nodes = rabbit_nodes:all_running(),
44+
lists:foldl(
45+
fun(Node, Acc) ->
46+
case rabbit_misc:rpc_call(Node, ?MODULE, status, []) of
47+
{badrpc, _} ->
48+
Acc;
49+
Xs0 when is_list(Xs0) ->
50+
Xs = inject_node_info(Node, Xs0),
51+
Acc ++ Xs
52+
end
53+
end, [], Nodes).
54+
3755
lookup(Name) ->
3856
gen_server:call(?SERVER, {lookup, Name}, infinity).
3957

@@ -83,6 +101,26 @@ terminate(_Reason, State) ->
83101
code_change(_OldVsn, State, _Extra) ->
84102
{ok, State}.
85103

104+
inject_node_info(Node, Shovels) ->
105+
lists:map(
106+
fun({Name, Type, {State, Opts}, Timestamp}) ->
107+
Opts1 = Opts ++ [{node, Node}],
108+
{Name, Type, {State, Opts1}, Timestamp}
109+
end, Shovels).
110+
111+
find_matching_shovel(VHost, Name, Shovels) ->
112+
case lists:filter(
113+
fun ({{V, S}, _Kind, _Status, _}) ->
114+
VHost =:= V andalso Name =:= S
115+
end, Shovels) of
116+
[] -> undefined;
117+
[S | _] -> S
118+
end.
119+
120+
%%
121+
%% Implementation
122+
%%
123+
86124
split_status({running, MoreInfo}) -> [{status, running} | MoreInfo];
87125
split_status({terminated, Reason}) -> [{status, terminated},
88126
{reason, Reason}];
@@ -94,4 +132,4 @@ split_name(Name) when is_atom(Name) -> [{name, Name}].
94132

95133
ensure_timer(State0) ->
96134
State1 = rabbit_misc:stop_timer(State0, #state.timer),
97-
rabbit_misc:ensure_timer(State1, #state.timer, ?CHECK_FREQUENCY, check).
135+
rabbit_misc:ensure_timer(State1, #state.timer, ?CHECK_FREQUENCY, check).

deps/rabbitmq_shovel_management/BUILD.bazel

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ load(
88
"assert_suites",
99
"rabbitmq_app",
1010
"rabbitmq_integration_suite",
11+
"rabbitmq_suite",
1112
)
1213

1314
APP_NAME = "rabbitmq_shovel_management"
@@ -71,6 +72,16 @@ suites = [
7172
PACKAGE,
7273
name = "http_SUITE",
7374
),
75+
rabbitmq_suite(
76+
name = "rabbit_shovel_mgmt_SUITE",
77+
runtime_deps = [
78+
"@meck//:erlang_app",
79+
],
80+
deps = [
81+
"//deps/rabbit_common:erlang_app",
82+
"//deps/rabbitmq_management_agent:erlang_app",
83+
],
84+
),
7485
]
7586

7687
assert_suites(

deps/rabbitmq_shovel_management/priv/www/js/shovel.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,15 @@ dispatcher_add(function(sammy) {
8787
if (sync_delete(this, '/shovels/vhost/:vhost/:name')) {
8888
go_to('#/dynamic-shovels');
8989
} else {
90-
show_popup('warn', 'Shovel not deleted because it is not running on this node.');
90+
show_popup('warn', 'Shovel could not be deleted');
9191
return false;
9292
}
9393
});
9494
sammy.del("#/shovel-restart-link", function(){
9595
if (sync_delete(this, '/shovels/vhost/:vhost/:name/restart')) {
9696
update();
9797
} else {
98-
show_popup('warn', 'Shovel not restarted because it is not running on this node.');
98+
show_popup('warn', 'Shovel could not be restarted');
9999
return false;
100100
}
101101
});

deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt.erl

Lines changed: 53 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111

1212
-export([dispatcher/0, web_ui/0]).
1313
-export([init/2, to_json/2, resource_exists/2, content_types_provided/2,
14-
is_authorized/2, allowed_methods/2, delete_resource/2]).
14+
is_authorized/2, allowed_methods/2, delete_resource/2, get_shovel_node/4]).
1515

1616
-import(rabbit_misc, [pget/2]).
1717

1818
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
1919
-include_lib("amqp_client/include/amqp_client.hrl").
20+
-include_lib("rabbit_shovel_mgmt.hrl").
2021

2122
dispatcher() -> [{"/shovels", ?MODULE, []},
2223
{"/shovels/:vhost", ?MODULE, []},
@@ -45,11 +46,10 @@ resource_exists(ReqData, Context) ->
4546
none -> true;
4647
Name ->
4748
%% Deleting or restarting a shovel
48-
case rabbit_shovel_status:lookup({VHost, Name}) of
49-
not_found ->
50-
rabbit_log:error("Shovel with the name '~s' was not found "
51-
"on the target node '~s' and / or virtual host '~s'",
52-
[Name, node(), VHost]),
49+
case get_shovel_node(VHost, Name, ReqData, Context) of
50+
undefined ->
51+
rabbit_log:error("Shovel with the name '~s' was not found on virtual host '~s'",
52+
[Name, VHost]),
5353
false;
5454
_ ->
5555
true
@@ -60,7 +60,7 @@ resource_exists(ReqData, Context) ->
6060

6161
to_json(ReqData, Context) ->
6262
rabbit_mgmt_util:reply_list(
63-
filter_vhost_req(status(ReqData, Context), ReqData), ReqData, Context).
63+
filter_vhost_req(rabbit_shovel_mgmt_util:status(ReqData, Context), ReqData), ReqData, Context).
6464

6565
is_authorized(ReqData, Context) ->
6666
rabbit_mgmt_util:is_authorized_monitor(ReqData, Context).
@@ -71,21 +71,28 @@ delete_resource(ReqData, #context{user = #user{username = Username}}=Context) ->
7171
none ->
7272
false;
7373
Name ->
74-
%% We must distinguish between a delete and restart
75-
case is_restart(ReqData) of
76-
true ->
77-
case rabbit_shovel_util:restart_shovel(VHost, Name) of
78-
{error, ErrMsg} ->
79-
rabbit_log:error("Error restarting shovel: ~s", [ErrMsg]),
80-
false;
81-
ok -> true
82-
end;
83-
_ ->
84-
case rabbit_shovel_util:delete_shovel(VHost, Name, Username) of
85-
{error, ErrMsg} ->
86-
rabbit_log:error("Error deleting shovel: ~s", [ErrMsg]),
87-
false;
88-
ok -> true
74+
case get_shovel_node(VHost, Name, ReqData, Context) of
75+
undefined -> rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
76+
false;
77+
Node ->
78+
%% We must distinguish between a delete and restart
79+
case is_restart(ReqData) of
80+
true ->
81+
rabbit_log:info("Asked to restart shovel '~s' in vhost '~s' on node '~s'", [Name, VHost, Node]),
82+
case rpc:call(Node, rabbit_shovel_util, restart_shovel, [VHost, Name], ?SHOVEL_CALLS_TIMEOUT_MS) of
83+
ok -> true;
84+
{_, Msg} -> rabbit_log:error(Msg),
85+
false
86+
end;
87+
88+
_ ->
89+
rabbit_log:info("Asked to delete shovel '~s' in vhost '~s' on node '~s'", [Name, VHost, Node]),
90+
case rpc:call(Node, rabbit_shovel_util, delete_shovel, [VHost, Name, Username], ?SHOVEL_CALLS_TIMEOUT_MS) of
91+
ok -> true;
92+
{_, Msg} -> rabbit_log:error(Msg),
93+
false
94+
end
95+
8996
end
9097
end
9198
end,
@@ -107,52 +114,29 @@ filter_vhost_req(List, ReqData) ->
107114
pget(vhost, I) =:= VHost]
108115
end.
109116

110-
%% Allow users to see things in the vhosts they are authorised. But
111-
%% static shovels do not have a vhost, so only allow admins (not
112-
%% monitors) to see them.
113-
filter_vhost_user(List, _ReqData, #context{user = User = #user{tags = Tags}}) ->
114-
VHosts = rabbit_mgmt_util:list_login_vhosts_names(User, undefined),
115-
[I || I <- List, case pget(vhost, I) of
116-
undefined -> lists:member(administrator, Tags);
117-
VHost -> lists:member(VHost, VHosts)
118-
end].
119-
120-
status(ReqData, Context) ->
121-
filter_vhost_user(
122-
lists:append([status(Node) || Node <- [node() | nodes()]]),
123-
ReqData, Context).
124-
125-
status(Node) ->
126-
case rpc:call(Node, rabbit_shovel_status, status, [], infinity) of
127-
{badrpc, {'EXIT', _}} ->
128-
[];
129-
Status ->
130-
[format(Node, I) || I <- Status]
117+
get_shovel_node(VHost, Name, ReqData, Context) ->
118+
Shovels = rabbit_shovel_mgmt_util:status(ReqData, Context),
119+
Match = find_matching_shovel(VHost, Name, Shovels),
120+
case Match of
121+
undefined -> undefined;
122+
Match ->
123+
{_, Node} = lists:keyfind(node, 1, Match),
124+
Node
131125
end.
132126

133-
format(Node, {Name, Type, Info, TS}) ->
134-
[{node, Node}, {timestamp, format_ts(TS)}] ++
135-
format_name(Type, Name) ++
136-
format_info(Info).
137-
138-
format_name(static, Name) -> [{name, Name},
139-
{type, static}];
140-
format_name(dynamic, {VHost, Name}) -> [{name, Name},
141-
{vhost, VHost},
142-
{type, dynamic}].
143-
144-
format_info(starting) ->
145-
[{state, starting}];
146-
147-
format_info({running, Props}) ->
148-
[{state, running}] ++ Props;
149-
150-
format_info({terminated, Reason}) ->
151-
[{state, terminated},
152-
{reason, print("~p", [Reason])}].
153-
154-
format_ts({{Y, M, D}, {H, Min, S}}) ->
155-
print("~w-~2.2.0w-~2.2.0w ~w:~2.2.0w:~2.2.0w", [Y, M, D, H, Min, S]).
156-
157-
print(Fmt, Val) ->
158-
list_to_binary(io_lib:format(Fmt, Val)).
127+
%% This is similar to rabbit_shovel_status:find_matching_shovel/3
128+
%% but operates on a different input (a proplist of Shovel attributes)
129+
-spec find_matching_shovel(VHost :: vhost:name(),
130+
Name :: binary(),
131+
Shovels :: list(list(tuple()))) -> 'undefined' | list(tuple()).
132+
find_matching_shovel(VHost, Name, Shovels) ->
133+
ShovelPred = fun (Attributes) ->
134+
lists:member({name, Name}, Attributes) andalso
135+
lists:member({vhost, VHost}, Attributes)
136+
end,
137+
case lists:search(ShovelPred, Shovels) of
138+
{value, Shovel} ->
139+
Shovel;
140+
_ ->
141+
undefined
142+
end.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-define(SHOVEL_CALLS_TIMEOUT_MS, 25000).

0 commit comments

Comments
 (0)