Skip to content

Commit 1282c00

Browse files
author
Daniil Fedotov
committed
Change vhost supervision recovery process.
In order to be aware if vhost is alive or not, introduce a rabbit_vhost_process gen_server, which manages recovery and teardown of a vhost. Also aliveness of the process can be used to determine a vhost state. Vhost process termination emits an event to close all the vhost connections. Addresses [#145106713]
1 parent 272a5c9 commit 1282c00

8 files changed

+135
-61
lines changed

src/rabbit_connection_tracking.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
3535
clear_tracked_connection_tables_for_this_node/0,
3636
register_connection/1, unregister_connection/1,
37-
list/0, list/1, list_on_node/1, list_of_user/1,
37+
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
3838
tracked_connection_from_connection_created/1,
3939
tracked_connection_from_connection_state/1,
4040
count_connections_in/1]).
@@ -217,6 +217,16 @@ list_on_node(Node) ->
217217
catch exit:{aborted, {no_exists, _}} -> []
218218
end.
219219

220+
-spec list_on_node(node(), rabbit_types:vhsot()) -> [rabbit_types:tracked_connection()].
221+
222+
list_on_node(Node, VHost) ->
223+
try mnesia:dirty_match_object(
224+
tracked_connection_table_name_for(Node),
225+
#tracked_connection{vhost = VHost, _ = '_'})
226+
catch exit:{aborted, {no_exists, _}} -> []
227+
end.
228+
229+
220230
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
221231

222232
list_of_user(Username) ->

src/rabbit_connection_tracking_handler.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
8282
close_connections(rabbit_connection_tracking:list(VHost),
8383
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
8484
{ok, State};
85+
handle_event(#event{type = vhost_down, props = Details}, State) ->
86+
VHost = pget(name, Details),
87+
Node = pget(node, Details),
88+
rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
89+
" because the vhost database has stopped working",
90+
[VHost, Node]),
91+
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
92+
rabbit_misc:format("vhost '~s' is down", [VHost])),
93+
{ok, State};
8594
handle_event(#event{type = user_deleted, props = Details}, State) ->
8695
Username = pget(name, Details),
8796
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),

src/rabbit_msg_store.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
10101010
ok;
10111011
{error, RTErr} ->
10121012
rabbit_log:error("Unable to save message store recovery terms"
1013-
"for directory ~p~nError: ~p~n",
1013+
" for directory ~p~nError: ~p~n",
10141014
[Dir, RTErr])
10151015
end,
10161016
State3 #msstate { index_state = undefined,

src/rabbit_vhost.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
2727
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
2828
-export([delete_storage/1]).
29+
-export([vhost_down/1]).
2930

3031
-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
3132
-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
@@ -144,6 +145,12 @@ delete(VHostPath, ActingUser) ->
144145
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
145146
ok.
146147

148+
vhost_down(VHostPath) ->
149+
ok = rabbit_event:notify(vhost_down,
150+
[{name, VHostPath},
151+
{node, node()},
152+
{user_who_performed_action, ?INTERNAL_USER}]).
153+
147154
delete_storage(VHost) ->
148155
VhostDir = msg_store_dir_path(VHost),
149156
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),

src/rabbit_vhost_sup_watcher.erl renamed to src/rabbit_vhost_process.erl

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,21 @@
1414
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
1515
%%
1616

17-
%% This module implements a watcher process which should stop
18-
%% the parent supervisor if its vhost is missing from the mnesia DB
17+
%% This module implements a vhost identity process.
1918

20-
-module(rabbit_vhost_sup_watcher).
19+
%% On start this process will try to recover the vhost data and
20+
%% processes structure (queues and message stores).
21+
%% If recovered successfully, the process will save it's PID
22+
%% to vhost process registry. If vhost process PID is in the registry and the
23+
%% process is alive - the vhost is considered running.
24+
25+
%% On termination, the ptocess will notify of vhost going down.
26+
27+
%% The process will also check periodically if the vhost still
28+
%% present in mnesia DB and stop the vhost supervision tree when it
29+
%% desapears.
30+
31+
-module(rabbit_vhost_process).
2132

2233
-include("rabbit.hrl").
2334

@@ -29,15 +40,26 @@
2940
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
3041
code_change/3]).
3142

32-
3343
start_link(VHost) ->
3444
gen_server2:start_link(?MODULE, [VHost], []).
3545

3646

3747
init([VHost]) ->
38-
Interval = interval(),
39-
timer:send_interval(Interval, check_vhost),
40-
{ok, VHost}.
48+
process_flag(trap_exit, true),
49+
rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]),
50+
try
51+
%% Recover the vhost data and save it to vhost registry.
52+
ok = rabbit_vhost:recover(VHost),
53+
rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
54+
Interval = interval(),
55+
timer:send_interval(Interval, check_vhost),
56+
{ok, VHost}
57+
catch _:Reason ->
58+
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
59+
" Stacktrace ~p",
60+
[VHost, Reason, erlang:get_stacktrace()]),
61+
{stop, Reason}
62+
end.
4163

4264
handle_call(_,_,VHost) ->
4365
{reply, ok, VHost}.
@@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) ->
6486
handle_info(_, VHost) ->
6587
{noreply, VHost}.
6688

67-
terminate(_, _) -> ok.
89+
terminate(shutdown, VHost) ->
90+
%% Notify that vhost is stopped.
91+
rabbit_vhost:vhost_down(VHost);
92+
terminate(_, _VHost) ->
93+
ok.
6894

6995
code_change(_OldVsn, VHost, _Extra) ->
7096
{ok, VHost}.

src/rabbit_vhost_sup.erl

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,5 @@ start_link(VHost) ->
2828
supervisor2:start_link(?MODULE, [VHost]).
2929

3030
init([VHost]) ->
31-
{ok, {{one_for_all, 0, 1},
32-
[{rabbit_vhost_sup_watcher,
33-
{rabbit_vhost_sup_watcher, start_link, [VHost]},
34-
intrinsic, ?WORKER_WAIT, worker,
35-
[rabbit_vhost_sup]}]}}.
31+
rabbit_log:error("Starting VHost sup ~p~n", [VHost]),
32+
{ok, {{one_for_all, 0, 1}, []}}.

src/rabbit_vhost_sup_sup.erl

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
-export([start_link/0, start/0]).
2626
-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
2727
-export([delete_on_all_nodes/1]).
28-
-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]).
28+
-export([start_on_all_nodes/1]).
29+
30+
-export([save_vhost_process/2]).
31+
-export([is_vhost_alive/1]).
2932

3033
%% Internal
3134
-export([stop_and_delete_vhost/1]).
3235

33-
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
36+
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
3437

3538
start() ->
3639
case supervisor:start_child(rabbit_sup, {?MODULE,
@@ -56,48 +59,18 @@ init([]) ->
5659
[rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.
5760

5861
start_on_all_nodes(VHost) ->
59-
[ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
62+
[ {ok, _} = vhost_sup(VHost, Node) || Node <- rabbit_nodes:all_running() ],
6063
ok.
6164

6265
delete_on_all_nodes(VHost) ->
6366
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
6467
ok.
6568

66-
start_vhost(VHost, Node) when Node == node(self()) ->
67-
start_vhost(VHost);
68-
start_vhost(VHost, Node) ->
69-
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
70-
{ok, Pid} when is_pid(Pid) ->
71-
{ok, Pid};
72-
{badrpc, RpcErr} ->
73-
{error, RpcErr}
74-
end.
75-
76-
start_vhost(VHost) ->
77-
case rabbit_vhost:exists(VHost) of
78-
false -> {error, {no_such_vhost, VHost}};
79-
true ->
80-
case vhost_sup_pid(VHost) of
81-
no_pid ->
82-
case supervisor2:start_child(?MODULE, [VHost]) of
83-
{ok, _} -> ok;
84-
{error, {already_started, _}} -> ok;
85-
Error ->
86-
rabbit_log:error("Could not start process tree "
87-
"for vhost '~s': ~p", [VHost, Error]),
88-
throw(Error)
89-
end,
90-
{ok, _} = vhost_sup_pid(VHost);
91-
{ok, Pid} when is_pid(Pid) ->
92-
{ok, Pid}
93-
end
94-
end.
95-
9669
stop_and_delete_vhost(VHost) ->
9770
case get_vhost_sup(VHost) of
9871
not_found -> ok;
9972
#vhost_sup{wrapper_pid = WrapperPid,
100-
vhost_sup_pid = VHostSupPid} = VHostSup ->
73+
vhost_sup_pid = VHostSupPid} ->
10174
case is_process_alive(WrapperPid) of
10275
false -> ok;
10376
true ->
@@ -106,7 +79,7 @@ stop_and_delete_vhost(VHost) ->
10679
[VHostSupPid, VHost]),
10780
case supervisor2:terminate_child(?MODULE, WrapperPid) of
10881
ok ->
109-
ets:delete_object(?MODULE, VHostSup),
82+
ets:delete(?MODULE, VHost),
11083
ok = rabbit_vhost:delete_storage(VHost);
11184
Other ->
11285
Other
@@ -128,7 +101,7 @@ stop_and_delete_vhost(VHost, Node) ->
128101
{error, RpcErr}
129102
end.
130103

131-
-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}.
104+
-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
132105
vhost_sup(VHost, Local) when Local == node(self()) ->
133106
vhost_sup(VHost);
134107
vhost_sup(VHost, Node) ->
@@ -139,9 +112,44 @@ vhost_sup(VHost, Node) ->
139112
{error, RpcErr}
140113
end.
141114

142-
-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}.
115+
-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
143116
vhost_sup(VHost) ->
144-
start_vhost(VHost).
117+
case rabbit_vhost:exists(VHost) of
118+
false -> {error, {no_such_vhost, VHost}};
119+
true ->
120+
case vhost_sup_pid(VHost) of
121+
no_pid ->
122+
case supervisor2:start_child(?MODULE, [VHost]) of
123+
{ok, _} -> ok;
124+
{error, {already_started, _}} -> ok;
125+
Error -> throw(Error)
126+
end,
127+
{ok, _} = vhost_sup_pid(VHost);
128+
{ok, Pid} when is_pid(Pid) ->
129+
{ok, Pid}
130+
end
131+
end.
132+
133+
134+
-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().
135+
is_vhost_alive(VHost) ->
136+
%% A vhost is considered alive if it's supervision tree is alive and
137+
%% saved in the ETS table
138+
case get_vhost_sup(VHost) of
139+
#vhost_sup{wrapper_pid = WrapperPid,
140+
vhost_sup_pid = VHostSupPid,
141+
vhost_process_pid = VHostProcessPid}
142+
when is_pid(WrapperPid),
143+
is_pid(VHostSupPid),
144+
is_pid(VHostProcessPid) ->
145+
is_process_alive(WrapperPid)
146+
andalso
147+
is_process_alive(VHostSupPid)
148+
andalso
149+
is_process_alive(VHostProcessPid);
150+
_ -> false
151+
end.
152+
145153

146154
-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok.
147155
save_vhost_sup(VHost, WrapperPid, VHostPid) ->
@@ -150,6 +158,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) ->
150158
wrapper_pid = WrapperPid}),
151159
ok.
152160

161+
-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok.
162+
save_vhost_process(VHost, VHostProcessPid) ->
163+
true = ets:update_element(?MODULE, VHost,
164+
{#vhost_sup.vhost_process_pid, VHostProcessPid}),
165+
ok.
166+
153167
-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
154168
get_vhost_sup(VHost) ->
155169
case ets:lookup(?MODULE, VHost) of

src/rabbit_vhost_sup_wrapper.erl

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,35 @@
2727
-export([start_vhost_sup/1]).
2828

2929
start_link(VHost) ->
30-
supervisor2:start_link(?MODULE, [VHost]).
30+
%% Using supervisor, because supervisor2 does not stop a started child when
31+
%% another one fails to start. Bug?
32+
supervisor:start_link(?MODULE, [VHost]).
3133

3234
init([VHost]) ->
3335
%% 2 restarts in 5 minutes. One per message store.
3436
{ok, {{one_for_all, 2, 300},
35-
[{rabbit_vhost_sup,
36-
{rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
37-
permanent, infinity, supervisor,
38-
[rabbit_vhost_sup]}]}}.
37+
[
38+
%% rabbit_vhost_sup is an empty supervisor container for
39+
%% all data processes.
40+
{rabbit_vhost_sup,
41+
{rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
42+
permanent, infinity, supervisor,
43+
[rabbit_vhost_sup]},
44+
%% rabbit_vhost_process is a vhost identity process, which
45+
%% is responsible for data recovery and vhost aliveness status.
46+
%% See the module comments for more info.
47+
{rabbit_vhost_process,
48+
{rabbit_vhost_process, start_link, [VHost]},
49+
permanent, ?WORKER_WAIT, worker,
50+
[rabbit_vhost_process]}]}}.
51+
3952

4053
start_vhost_sup(VHost) ->
4154
case rabbit_vhost_sup:start_link(VHost) of
4255
{ok, Pid} ->
4356
%% Save vhost sup record with wrapper pid and vhost sup pid.
4457
ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid),
45-
%% We can start recover as soon as we have vhost_sup record saved
46-
ok = rabbit_vhost:recover(VHost),
4758
{ok, Pid};
4859
Other ->
4960
Other
50-
end.
61+
end.

0 commit comments

Comments
 (0)