Skip to content

Per-vhost supervisors. #1158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
May 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ define PROJECT_ENV
%% rabbitmq-server-589
{proxy_protocol, false},
{disk_monitor_failure_retries, 10},
{disk_monitor_failure_retry_interval, 120000}
{disk_monitor_failure_retry_interval, 120000},
%% either "stop_node" or "ignore"
{vhost_restart_strategy, stop_node}
]
endef

Expand Down
6 changes: 6 additions & 0 deletions priv/schema/rabbitmq.schema
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,12 @@ end}.
{mapping, "proxy_protocol", "rabbit.proxy_protocol",
[{datatype, {enum, [true, false]}}]}.

%% Whether to stop the rabbit application if VHost data
%% cannot be recovered.

{mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy",
[{datatype, {enum, [stop_node, ignore]}}]}.

% ==========================
% Lager section
% ==========================
Expand Down
19 changes: 8 additions & 11 deletions src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,6 @@
[{description, "core initialized"},
{requires, kernel_ready}]}).

-rabbit_boot_step({empty_db_check,
[{description, "empty DB check"},
{mfa, {?MODULE, maybe_insert_default_data, []}},
{requires, core_initialized},
{enables, routing_ready}]}).

-rabbit_boot_step({upgrade_queues,
[{description, "per-vhost message store migration"},
{mfa, {rabbit_upgrade,
Expand All @@ -164,7 +158,13 @@
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
{requires, core_initialized},
{requires, [core_initialized]},
{enables, routing_ready}]}).

-rabbit_boot_step({empty_db_check,
[{description, "empty DB check"},
{mfa, {?MODULE, maybe_insert_default_data, []}},
{requires, recovery},
{enables, routing_ready}]}).

-rabbit_boot_step({mirrored_queues,
Expand Down Expand Up @@ -829,10 +829,7 @@ boot_delegate() ->

recover() ->
rabbit_policy:recover(),
Qs = rabbit_amqqueue:recover(),
ok = rabbit_binding:recover(rabbit_exchange:recover(),
[QName || #amqqueue{name = QName} <- Qs]),
rabbit_amqqueue:start(Qs).
rabbit_vhost:recover().

maybe_insert_default_data() ->
case rabbit_table:needs_default_data() of
Expand Down
36 changes: 33 additions & 3 deletions src/rabbit_amqqueue_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
-behaviour(supervisor2).

-export([start_link/0, start_queue_process/3]).
-export([start_for_vhost/1, stop_for_vhost/1,
find_for_vhost/2, find_for_vhost/1]).

-export([init/1]).

Expand All @@ -36,14 +38,42 @@
%%----------------------------------------------------------------------------

start_link() ->
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
supervisor2:start_link(?MODULE, []).

start_queue_process(Node, Q, StartMode) ->
{ok, _SupPid, QPid} = supervisor2:start_child(
{?SERVER, Node}, [Q, StartMode]),
#amqqueue{name = #resource{virtual_host = VHost}} = Q,
{ok, Sup} = find_for_vhost(VHost, Node),
{ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]),
QPid.

init([]) ->
{ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.

-spec find_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
find_for_vhost(VHost) ->
find_for_vhost(VHost, node()).

-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
find_for_vhost(VHost, Node) ->
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
[QSup] -> {ok, QSup};
Result -> {error, {queue_supervisor_not_found, Result}}
end.

-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
supervisor2:start_child(
VHostSup,
{rabbit_amqqueue_sup_sup,
{rabbit_amqqueue_sup_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).

-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).
9 changes: 5 additions & 4 deletions src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").

-export([recover/0, policy_changed/2, callback/4, declare/7,
-export([recover/1, policy_changed/2, callback/4, declare/7,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
update_scratch/3, update_decorators/1, immutable/1,
Expand All @@ -36,7 +36,7 @@
-type type() :: atom().
-type fun_name() :: atom().

-spec recover() -> [name()].
-spec recover(rabbit_types:vhost()) -> [name()].
-spec callback
(rabbit_types:exchange(), fun_name(),
fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.
Expand Down Expand Up @@ -107,10 +107,11 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
policy, user_who_performed_action]).

recover() ->
recover(VHost) ->
Xs = rabbit_misc:table_filter(
fun (#exchange{name = XName}) ->
mnesia:read({rabbit_exchange, XName}) =:= []
XName#resource.virtual_host =:= VHost andalso
mnesia:read({rabbit_exchange, XName}) =:= []
end,
fun (X, Tx) ->
X1 = case Tx of
Expand Down
6 changes: 3 additions & 3 deletions src/rabbit_mirror_queue_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).

-export([start/1, stop/0, delete_crashed/1]).
-export([start/2, stop/1, delete_crashed/1]).

-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).

Expand Down Expand Up @@ -81,12 +81,12 @@
%% Backing queue
%% ---------------------------------------------------------------------------

start(_DurableQueues) ->
start(_Vhost, _DurableQueues) ->
%% This will never get called as this module will never be
%% installed as the default BQ implementation.
exit({not_valid_for_generic_backing_queue, ?MODULE}).

stop() ->
stop(_Vhost) ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).

Expand Down
4 changes: 3 additions & 1 deletion src/rabbit_mirror_queue_slave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,10 @@ stop_pending_slaves(QName, Pids) ->
case erlang:process_info(Pid, dictionary) of
undefined -> ok;
{dictionary, Dict} ->
Vhost = QName#resource.virtual_host,
{ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost),
case proplists:get_value('$ancestors', Dict) of
[Sup, rabbit_amqqueue_sup_sup | _] ->
[Sup, AmqQSup | _] ->
exit(Sup, kill),
exit(Pid, kill);
_ ->
Expand Down
103 changes: 0 additions & 103 deletions src/rabbit_msg_store_vhost_sup.erl

This file was deleted.

10 changes: 5 additions & 5 deletions src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

-export([enable/0]).

-export([start/1, stop/0]).
-export([start/2, stop/1]).

-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
Expand Down Expand Up @@ -83,22 +83,22 @@ enable() ->

%%----------------------------------------------------------------------------

start(QNames) ->
start(VHost, QNames) ->
BQ = bq(),
%% TODO this expand-collapse dance is a bit ridiculous but it's what
%% rabbit_amqqueue:recover/0 expects. We could probably simplify
%% this if we rejigged recovery a bit.
{DupNames, ExpNames} = expand_queues(QNames),
case BQ:start(ExpNames) of
case BQ:start(VHost, ExpNames) of
{ok, ExpRecovery} ->
{ok, collapse_recovery(QNames, DupNames, ExpRecovery)};
Else ->
Else
end.

stop() ->
stop(VHost) ->
BQ = bq(),
BQ:stop().
BQ:stop(VHost).

%%----------------------------------------------------------------------------

Expand Down
Loading