Skip to content

Commit d1eecf2

Browse files
Merge pull request #1158 from rabbitmq/rabbitmq-server-1146-full
Per-vhost supervisors.
2 parents 2ee4ef2 + 4cdad4b commit d1eecf2

24 files changed

+897
-444
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,9 @@ define PROJECT_ENV
117117
%% rabbitmq-server-589
118118
{proxy_protocol, false},
119119
{disk_monitor_failure_retries, 10},
120-
{disk_monitor_failure_retry_interval, 120000}
120+
{disk_monitor_failure_retry_interval, 120000},
121+
%% either "stop_node" or "ignore"
122+
{vhost_restart_strategy, stop_node}
121123
]
122124
endef
123125

priv/schema/rabbitmq.schema

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,12 @@ end}.
949949
{mapping, "proxy_protocol", "rabbit.proxy_protocol",
950950
[{datatype, {enum, [true, false]}}]}.
951951

952+
%% Whether to stop the rabbit application if VHost data
953+
%% cannot be recovered.
954+
955+
{mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy",
956+
[{datatype, {enum, [stop_node, ignore]}}]}.
957+
952958
% ==========================
953959
% Lager section
954960
% ==========================

src/rabbit.erl

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,6 @@
147147
[{description, "core initialized"},
148148
{requires, kernel_ready}]}).
149149

150-
-rabbit_boot_step({empty_db_check,
151-
[{description, "empty DB check"},
152-
{mfa, {?MODULE, maybe_insert_default_data, []}},
153-
{requires, core_initialized},
154-
{enables, routing_ready}]}).
155-
156150
-rabbit_boot_step({upgrade_queues,
157151
[{description, "per-vhost message store migration"},
158152
{mfa, {rabbit_upgrade,
@@ -164,7 +158,13 @@
164158
-rabbit_boot_step({recovery,
165159
[{description, "exchange, queue and binding recovery"},
166160
{mfa, {rabbit, recover, []}},
167-
{requires, core_initialized},
161+
{requires, [core_initialized]},
162+
{enables, routing_ready}]}).
163+
164+
-rabbit_boot_step({empty_db_check,
165+
[{description, "empty DB check"},
166+
{mfa, {?MODULE, maybe_insert_default_data, []}},
167+
{requires, recovery},
168168
{enables, routing_ready}]}).
169169

170170
-rabbit_boot_step({mirrored_queues,
@@ -829,10 +829,7 @@ boot_delegate() ->
829829

830830
recover() ->
831831
rabbit_policy:recover(),
832-
Qs = rabbit_amqqueue:recover(),
833-
ok = rabbit_binding:recover(rabbit_exchange:recover(),
834-
[QName || #amqqueue{name = QName} <- Qs]),
835-
rabbit_amqqueue:start(Qs).
832+
rabbit_vhost:recover().
836833

837834
maybe_insert_default_data() ->
838835
case rabbit_table:needs_default_data() of

src/rabbit_amqqueue_sup_sup.erl

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
-behaviour(supervisor2).
2020

2121
-export([start_link/0, start_queue_process/3]).
22+
-export([start_for_vhost/1, stop_for_vhost/1,
23+
find_for_vhost/2, find_for_vhost/1]).
2224

2325
-export([init/1]).
2426

@@ -36,14 +38,42 @@
3638
%%----------------------------------------------------------------------------
3739

3840
start_link() ->
39-
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
41+
supervisor2:start_link(?MODULE, []).
4042

4143
start_queue_process(Node, Q, StartMode) ->
42-
{ok, _SupPid, QPid} = supervisor2:start_child(
43-
{?SERVER, Node}, [Q, StartMode]),
44+
#amqqueue{name = #resource{virtual_host = VHost}} = Q,
45+
{ok, Sup} = find_for_vhost(VHost, Node),
46+
{ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]),
4447
QPid.
4548

4649
init([]) ->
4750
{ok, {{simple_one_for_one, 10, 10},
4851
[{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
4952
temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.
53+
54+
-spec find_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
55+
find_for_vhost(VHost) ->
56+
find_for_vhost(VHost, node()).
57+
58+
-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
59+
find_for_vhost(VHost, Node) ->
60+
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
61+
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
62+
[QSup] -> {ok, QSup};
63+
Result -> {error, {queue_supervisor_not_found, Result}}
64+
end.
65+
66+
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
67+
start_for_vhost(VHost) ->
68+
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
69+
supervisor2:start_child(
70+
VHostSup,
71+
{rabbit_amqqueue_sup_sup,
72+
{rabbit_amqqueue_sup_sup, start_link, []},
73+
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
74+
75+
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
76+
stop_for_vhost(VHost) ->
77+
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
78+
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
79+
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).

src/rabbit_exchange.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
-include("rabbit.hrl").
1919
-include("rabbit_framing.hrl").
2020

21-
-export([recover/0, policy_changed/2, callback/4, declare/7,
21+
-export([recover/1, policy_changed/2, callback/4, declare/7,
2222
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
2323
lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
2424
update_scratch/3, update_decorators/1, immutable/1,
@@ -36,7 +36,7 @@
3636
-type type() :: atom().
3737
-type fun_name() :: atom().
3838

39-
-spec recover() -> [name()].
39+
-spec recover(rabbit_types:vhost()) -> [name()].
4040
-spec callback
4141
(rabbit_types:exchange(), fun_name(),
4242
fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.
@@ -107,10 +107,11 @@
107107
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
108108
policy, user_who_performed_action]).
109109

110-
recover() ->
110+
recover(VHost) ->
111111
Xs = rabbit_misc:table_filter(
112112
fun (#exchange{name = XName}) ->
113-
mnesia:read({rabbit_exchange, XName}) =:= []
113+
XName#resource.virtual_host =:= VHost andalso
114+
mnesia:read({rabbit_exchange, XName}) =:= []
114115
end,
115116
fun (X, Tx) ->
116117
X1 = case Tx of

src/rabbit_mirror_queue_master.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
2727
zip_msgs_and_acks/4]).
2828

29-
-export([start/1, stop/0, delete_crashed/1]).
29+
-export([start/2, stop/1, delete_crashed/1]).
3030

3131
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
3232

@@ -81,12 +81,12 @@
8181
%% Backing queue
8282
%% ---------------------------------------------------------------------------
8383

84-
start(_DurableQueues) ->
84+
start(_Vhost, _DurableQueues) ->
8585
%% This will never get called as this module will never be
8686
%% installed as the default BQ implementation.
8787
exit({not_valid_for_generic_backing_queue, ?MODULE}).
8888

89-
stop() ->
89+
stop(_Vhost) ->
9090
%% Same as start/1.
9191
exit({not_valid_for_generic_backing_queue, ?MODULE}).
9292

src/rabbit_mirror_queue_slave.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,10 @@ stop_pending_slaves(QName, Pids) ->
197197
case erlang:process_info(Pid, dictionary) of
198198
undefined -> ok;
199199
{dictionary, Dict} ->
200+
Vhost = QName#resource.virtual_host,
201+
{ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost),
200202
case proplists:get_value('$ancestors', Dict) of
201-
[Sup, rabbit_amqqueue_sup_sup | _] ->
203+
[Sup, AmqQSup | _] ->
202204
exit(Sup, kill),
203205
exit(Pid, kill);
204206
_ ->

src/rabbit_msg_store_vhost_sup.erl

Lines changed: 0 additions & 103 deletions
This file was deleted.

src/rabbit_priority_queue.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
-export([enable/0]).
3232

33-
-export([start/1, stop/0]).
33+
-export([start/2, stop/1]).
3434

3535
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
3636
purge/1, purge_acks/1,
@@ -83,22 +83,22 @@ enable() ->
8383

8484
%%----------------------------------------------------------------------------
8585

86-
start(QNames) ->
86+
start(VHost, QNames) ->
8787
BQ = bq(),
8888
%% TODO this expand-collapse dance is a bit ridiculous but it's what
8989
%% rabbit_amqqueue:recover/0 expects. We could probably simplify
9090
%% this if we rejigged recovery a bit.
9191
{DupNames, ExpNames} = expand_queues(QNames),
92-
case BQ:start(ExpNames) of
92+
case BQ:start(VHost, ExpNames) of
9393
{ok, ExpRecovery} ->
9494
{ok, collapse_recovery(QNames, DupNames, ExpRecovery)};
9595
Else ->
9696
Else
9797
end.
9898

99-
stop() ->
99+
stop(VHost) ->
100100
BQ = bq(),
101-
BQ:stop().
101+
BQ:stop(VHost).
102102

103103
%%----------------------------------------------------------------------------
104104

0 commit comments

Comments
 (0)