Skip to content
This repository was archived by the owner on Nov 17, 2020. It is now read-only.

Support per-vhost stop/start API for backing queue behaviour. #188

Merged
merged 4 commits into from
May 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 35 additions & 23 deletions src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

-module(rabbit_amqqueue).

-export([recover/0, stop/0, start/1, declare/6, declare/7,
-export([warn_file_limit/0]).
-export([recover/1, stop/1, start/1, declare/6, declare/7,
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1, delete_crashed/1, delete_crashed/2,
delete_crashed_internal/2]).
Expand Down Expand Up @@ -70,8 +71,8 @@
{'absent', rabbit_types:amqqueue(),absent_reason()}.
-type not_found_or_absent() ::
'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}.
-spec recover() -> [rabbit_types:amqqueue()].
-spec stop() -> 'ok'.
-spec recover(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
-spec stop(rabbit_types:vhost()) -> 'ok'.
-spec start([rabbit_types:amqqueue()]) -> 'ok'.
-spec declare
(name(), boolean(), boolean(), rabbit_framing:amqp_table(),
Expand Down Expand Up @@ -210,10 +211,7 @@
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
arguments]).

recover() ->
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
on_node_down(node()),
warn_file_limit() ->
DurableQueues = find_durable_queues(),
L = length(DurableQueues),

Expand All @@ -226,27 +224,23 @@ recover() ->
[L, file_handle_cache:get_limit(), L]);
false ->
ok
end,
end.

recover(VHost) ->
Queues = find_durable_queues(VHost),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),

%% We rely on BQ:start/1 returning the recovery terms in the same
%% order as the supplied queue names, so that we can zip them together
%% for further processing in recover_durable_queues.
{ok, OrderedRecoveryTerms} =
BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup_sup,
{rabbit_amqqueue_sup_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}),
recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).

stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup),
ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup),
BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]),
{ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost),
recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)).

stop(VHost) ->
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop().
ok = BQ:stop(VHost).

start(Qs) ->
%% At this point all recovered queues and their bindings are
Expand All @@ -256,6 +250,24 @@ start(Qs) ->
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
ok.

find_durable_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
fun () ->
qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
vhost = VH,
pid = Pid}
<- mnesia:table(rabbit_durable_queue),
VH =:= VHost,
node(Pid) == Node andalso
%% Terminations on node down will not remove the rabbit_queue
%% record if it is a mirrored queue (such info is now obtained from
%% the policy). Thus, we must check if the local pid is alive
%% - if the record is present - in order to restart.
(mnesia:read(rabbit_queue, Name, read) =:= []
orelse not erlang:is_process_alive(Pid))]))
end).

find_durable_queues() ->
Node = node(),
mnesia:async_dirty(
Expand All @@ -268,8 +280,8 @@ find_durable_queues() ->
%% record if it is a mirrored queue (such info is now obtained from
%% the policy). Thus, we must check if the local pid is alive
%% - if the record is present - in order to restart.
(mnesia:read(rabbit_queue, Name, read) =:= []
orelse not erlang:is_process_alive(Pid))]))
(mnesia:read(rabbit_queue, Name, read) =:= []
orelse not erlang:is_process_alive(Pid))]))
end).

recover_durable_queues(QueuesAndRecoveryTerms) ->
Expand Down
10 changes: 5 additions & 5 deletions src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@

-spec info_keys() -> rabbit_types:info_keys().

%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
%% Called on startup with a vhost and a list of durable queue names on this vhost.
%% The queues aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
%%
%% The list of queue recovery terms returned as {ok, Terms} must be given
%% in the same order as the list of queue names supplied.
-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
-callback start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).

%% Called to tear down any state/resources. NB: Implementations should
%% Called to tear down any state/resources for vhost. NB: Implementations should
%% not depend on this function being called on shutdown and instead
%% should hook into the rabbit supervision hierarchy.
-callback stop() -> 'ok'.
-callback stop(rabbit_types:vhost()) -> 'ok'.

%% Initialise the backing queue and its state.
%%
Expand Down
25 changes: 24 additions & 1 deletion src/supervisor2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@
[ChildSpec :: child_spec()]}}
| ignore.

%% Optional callback prep_stop/0. It cannot be exported as Erlang/OTP doesn't
%% support definition of optional callbacks.
%%
%% Currently used to stop application dependencies.
%%
%% -callback prep_stop() -> ok.
%%

-define(restarting(_Pid_), {restarting,_Pid_}).

%%% ---------------------------------------------------
Expand Down Expand Up @@ -629,6 +637,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
{ok, State1} ->
{noreply, State1};
{shutdown, State1} ->
prep_stop(State1),
{stop, shutdown, State1}
end;

Expand Down Expand Up @@ -801,7 +810,8 @@ restart_child(Pid, Reason, State) ->
try_restart(RestartType, Reason, Child, State) ->
case handle_restart(RestartType, Reason, Child, State) of
{ok, NState} -> {noreply, NState};
{shutdown, State2} -> {stop, shutdown, State2}
{shutdown, State2} -> prep_stop(State2),
{stop, shutdown, State2}
end.

do_restart(RestartType, Reason, Child, State) ->
Expand Down Expand Up @@ -1504,3 +1514,16 @@ report_progress(Child, SupName) ->
Progress = [{supervisor, SupName},
{started, extract_child(Child)}],
error_logger:info_report(progress, Progress).

prep_stop(#state{module = Mod}) ->
%% Catch any exception - including non-existing prep_stop -
%% and continue stopping the supervision tree.
%% This is only executed when children are terminating,
%% because any other call from a top supervisor or application
%% will cause a deadlock stopping applications within prep_stop.
try
Mod:prep_stop()
catch
_:_ ->
ok
end.