Skip to content

Commit 3716131

Browse files
author
Daniil Fedotov
committed
Use wrapper supervisor one level above vhost supervisors
A wrapper supervisor makes it possible to restart each vhost exactly N times without the vhosts interfering with each other. Because a vhost must run recovery every time it is restarted, and recovery includes dynamically adding message stores, it cannot be restarted using one_for_all. So the vhost supervisor will simply fail if its child fails, and the vhost supervisor wrapper will restart it with recovery.
1 parent dd2a79d commit 3716131

File tree

4 files changed

+154
-58
lines changed

4 files changed

+154
-58
lines changed

src/rabbit_vhost.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ add(VHostPath, ActingUser) ->
106106
{<<"amq.rabbitmq.trace">>, topic, true}]],
107107
ok
108108
end),
109+
ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath),
109110
rabbit_event:notify(vhost_created, info(VHostPath)
110111
++ [{user_who_performed_action, ActingUser}]),
111112
R.

src/rabbit_vhost_sup_sup.erl

Lines changed: 84 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,14 @@
2323
-export([init/1]).
2424

2525
-export([start_link/0, start/0]).
26-
-export([vhost_sup/1, vhost_sup/2]).
27-
-export([start_vhost/1, stop_and_delete_vhost/1, delete_on_all_nodes/1]).
26+
-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
27+
-export([delete_on_all_nodes/1]).
28+
-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]).
29+
30+
%% Internal
31+
-export([stop_and_delete_vhost/1]).
32+
33+
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
2834

2935
start() ->
3036
rabbit_sup:start_supervisor_child(?MODULE).
@@ -33,39 +39,67 @@ start_link() ->
3339
supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
3440

3541
init([]) ->
36-
ets:new(?MODULE, [named_table, public]),
37-
{ok, {{simple_one_for_one, 1, 5},
38-
[{rabbit_vhost, {rabbit_vhost_sup_sup, start_vhost, []},
39-
transient, infinity, supervisor,
40-
[rabbit_vhost_sup_sup, rabbit_vhost_sup]}]}}.
42+
ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]),
43+
{ok, {{simple_one_for_one, 0, 5},
44+
[{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []},
45+
permanent, infinity, supervisor,
46+
[rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.
47+
48+
start_on_all_nodes(VHost) ->
49+
[ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
50+
ok.
4151

42-
start_vhost(VHost) ->
43-
case rabbit_vhost_sup:start_link(VHost) of
44-
{ok, Pid} ->
45-
ok = save_vhost_pid(VHost, Pid),
46-
ok = rabbit_vhost:recover(VHost),
52+
delete_on_all_nodes(VHost) ->
53+
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
54+
ok.
55+
56+
start_vhost(VHost, Node) when Node == node(self()) ->
57+
start_vhost(VHost);
58+
start_vhost(VHost, Node) ->
59+
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
60+
{ok, Pid} when is_pid(Pid) ->
4761
{ok, Pid};
48-
Other ->
49-
Other
62+
{badrpc, RpcErr} ->
63+
{error, RpcErr}
5064
end.
5165

52-
stop_and_delete_vhost(VHost) ->
53-
case vhost_pid(VHost) of
54-
no_pid -> ok;
55-
Pid when is_pid(Pid) ->
56-
rabbit_log:info("Stopping vhost supervisor ~p for vhost ~p~n",
57-
[Pid, VHost]),
58-
case supervisor2:terminate_child(?MODULE, Pid) of
59-
ok ->
60-
ok = rabbit_vhost:delete_storage(VHost);
61-
Other ->
62-
Other
66+
start_vhost(VHost) ->
67+
case rabbit_vhost:exists(VHost) of
68+
false -> {error, {no_such_vhost, VHost}};
69+
true ->
70+
case vhost_sup_pid(VHost) of
71+
no_pid ->
72+
case supervisor2:start_child(?MODULE, [VHost]) of
73+
{ok, _} -> ok;
74+
{error, {already_started, _}} -> ok;
75+
Error -> throw(Error)
76+
end,
77+
{ok, _} = vhost_sup_pid(VHost);
78+
{ok, Pid} when is_pid(Pid) ->
79+
{ok, Pid}
6380
end
6481
end.
6582

66-
delete_on_all_nodes(VHost) ->
67-
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
68-
ok.
83+
stop_and_delete_vhost(VHost) ->
84+
case get_vhost_sup(VHost) of
85+
not_found -> ok;
86+
#vhost_sup{wrapper_pid = WrapperPid,
87+
vhost_sup_pid = VHostSupPid} = VHostSup ->
88+
case is_process_alive(WrapperPid) of
89+
false -> ok;
90+
true ->
91+
rabbit_log:info("Stopping vhost supervisor ~p"
92+
" for vhost ~p~n",
93+
[VHostSupPid, VHost]),
94+
case supervisor2:terminate_child(?MODULE, WrapperPid) of
95+
ok ->
96+
ets:delete_object(?MODULE, VHostSup),
97+
ok = rabbit_vhost:delete_storage(VHost);
98+
Other ->
99+
Other
100+
end
101+
end
102+
end.
69103

70104
%% We take an optimistic approach when stopping a remote VHost supervisor.
71105
stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
@@ -81,6 +115,7 @@ stop_and_delete_vhost(VHost, Node) ->
81115
{error, RpcErr}
82116
end.
83117

118+
-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}.
84119
vhost_sup(VHost, Local) when Local == node(self()) ->
85120
vhost_sup(VHost);
86121
vhost_sup(VHost, Node) ->
@@ -93,34 +128,33 @@ vhost_sup(VHost, Node) ->
93128

94129
-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}.
95130
vhost_sup(VHost) ->
96-
case rabbit_vhost:exists(VHost) of
97-
false -> {error, {no_such_vhost, VHost}};
98-
true ->
99-
case vhost_pid(VHost) of
100-
no_pid ->
101-
case supervisor2:start_child(?MODULE, [VHost]) of
102-
{ok, Pid} -> {ok, Pid};
103-
{error, {already_started, Pid}} -> {ok, Pid};
104-
Error -> throw(Error)
105-
end;
106-
Pid when is_pid(Pid) ->
107-
{ok, Pid}
108-
end
109-
end.
131+
start_vhost(VHost).
110132

111-
save_vhost_pid(VHost, Pid) ->
112-
true = ets:insert(?MODULE, {VHost, Pid}),
133+
-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok.
134+
save_vhost_sup(VHost, WrapperPid, VHostPid) ->
135+
true = ets:insert(?MODULE, #vhost_sup{vhost = VHost,
136+
vhost_sup_pid = VHostPid,
137+
wrapper_pid = WrapperPid}),
113138
ok.
114139

115-
-spec vhost_pid(rabbit_types:vhost()) -> no_pid | pid().
116-
vhost_pid(VHost) ->
140+
-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
141+
get_vhost_sup(VHost) ->
117142
case ets:lookup(?MODULE, VHost) of
118-
[] -> no_pid;
119-
[{VHost, Pid}] ->
143+
[] -> not_found;
144+
[#vhost_sup{} = VHostSup] -> VHostSup
145+
end.
146+
147+
-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
148+
vhost_sup_pid(VHost) ->
149+
case get_vhost_sup(VHost) of
150+
not_found ->
151+
no_pid;
152+
#vhost_sup{vhost_sup_pid = Pid} = VHostSup ->
120153
case erlang:is_process_alive(Pid) of
121-
true -> Pid;
154+
true -> {ok, Pid};
122155
false ->
123-
ets:delete_object(?MODULE, {VHost, Pid}),
156+
ets:delete_object(?MODULE, VHostSup),
124157
no_pid
125158
end
126159
end.
160+

src/rabbit_vhost_sup_wrapper.erl

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
17+
-module(rabbit_vhost_sup_wrapper).
18+
19+
-include("rabbit.hrl").
20+
21+
-behaviour(supervisor2).
22+
-export([init/1]).
23+
-export([start_link/1]).
24+
-export([start_vhost_sup/1]).
25+
26+
start_link(VHost) ->
27+
supervisor2:start_link(?MODULE, [VHost]).
28+
29+
%% This module is a wrapper around vhost supervisor to
30+
%% provide exactly once restart.
31+
32+
%% The children of the rabbit_vhost_sup supervisor are added dynamically,
33+
%% so the one_for_all strategy cannot be used for rabbit_vhost_sup itself.
34+
35+
init([VHost]) ->
36+
{ok, {{one_for_all, 1, 10000000},
37+
[{rabbit_vhost_sup,
38+
{rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
39+
intrinsic, infinity, supervisor,
40+
[rabbit_vhost_sup]}]}}.
41+
42+
start_vhost_sup(VHost) ->
43+
case rabbit_vhost_sup:start_link(VHost) of
44+
{ok, Pid} ->
45+
%% Save vhost sup record with wrapper pid and vhost sup pid.
46+
ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid),
47+
%% We can start recovery as soon as the vhost_sup record has been saved
48+
ok = rabbit_vhost:recover(VHost),
49+
{ok, Pid};
50+
Other ->
51+
Other
52+
end.

src/rabbit_vm.erl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,23 @@ msg_stores() ->
161161
all_vhosts_children(msg_store_persistent).
162162

163163
all_vhosts_children(Name) ->
164-
lists:filter_map(
165-
fun({_, VHostSup, _, _}) ->
166-
case supervisor2:find_child(VHostSup, Name) of
167-
[QSup] -> {true, QSup};
168-
[] -> false
169-
end
170-
end,
171-
supervisor:which_children(rabbit_vhost_sup_sup)).
164+
case whereis(rabbit_vhost_sup_sup) of
165+
undefined -> [];
166+
Pid when is_pid(Pid) ->
167+
lists:filtermap(
168+
fun({_, VHostSupWrapper, _, _}) ->
169+
case supervisor2:find_child(VHostSupWrapper,
170+
rabbit_vhost_sup) of
171+
[] -> false;
172+
[VHostSup] ->
173+
case supervisor2:find_child(VHostSup, Name) of
174+
[QSup] -> {true, QSup};
175+
[] -> false
176+
end
177+
end
178+
end,
179+
supervisor:which_children(rabbit_vhost_sup_sup))
180+
end.
172181

173182
interesting_sups0() ->
174183
MsgIndexProcs = msg_stores(),

0 commit comments

Comments
 (0)