Skip to content
This repository was archived by the owner on Nov 17, 2020. It is now read-only.

Commit 9123889

Browse files
Merge pull request #188 from rabbitmq/rabbitmq-server-1146-full
Support per-vhost stop/start API for backing queue behaviour.
2 parents 5a6d690 + 883f554 commit 9123889

File tree

3 files changed

+64
-29
lines changed

3 files changed

+64
-29
lines changed

src/rabbit_amqqueue.erl

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
-module(rabbit_amqqueue).
1818

19-
-export([recover/0, stop/0, start/1, declare/6, declare/7,
19+
-export([warn_file_limit/0]).
20+
-export([recover/1, stop/1, start/1, declare/6, declare/7,
2021
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
2122
forget_all_durable/1, delete_crashed/1, delete_crashed/2,
2223
delete_crashed_internal/2]).
@@ -70,8 +71,8 @@
7071
{'absent', rabbit_types:amqqueue(),absent_reason()}.
7172
-type not_found_or_absent() ::
7273
'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}.
73-
-spec recover() -> [rabbit_types:amqqueue()].
74-
-spec stop() -> 'ok'.
74+
-spec recover(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
75+
-spec stop(rabbit_types:vhost()) -> 'ok'.
7576
-spec start([rabbit_types:amqqueue()]) -> 'ok'.
7677
-spec declare
7778
(name(), boolean(), boolean(), rabbit_framing:amqp_table(),
@@ -210,10 +211,7 @@
210211
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
211212
arguments]).
212213

213-
recover() ->
214-
%% Clear out remnants of old incarnation, in case we restarted
215-
%% faster than other nodes handled DOWN messages from us.
216-
on_node_down(node()),
214+
warn_file_limit() ->
217215
DurableQueues = find_durable_queues(),
218216
L = length(DurableQueues),
219217

@@ -226,27 +224,23 @@ recover() ->
226224
[L, file_handle_cache:get_limit(), L]);
227225
false ->
228226
ok
229-
end,
227+
end.
230228

229+
recover(VHost) ->
230+
Queues = find_durable_queues(VHost),
231231
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
232-
233232
%% We rely on BQ:start/1 returning the recovery terms in the same
234233
%% order as the supplied queue names, so that we can zip them together
235234
%% for further processing in recover_durable_queues.
236235
{ok, OrderedRecoveryTerms} =
237-
BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
238-
{ok,_} = supervisor:start_child(
239-
rabbit_sup,
240-
{rabbit_amqqueue_sup_sup,
241-
{rabbit_amqqueue_sup_sup, start_link, []},
242-
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}),
243-
recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
244-
245-
stop() ->
246-
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup),
247-
ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup),
236+
BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]),
237+
{ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost),
238+
recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)).
239+
240+
stop(VHost) ->
241+
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
248242
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
249-
ok = BQ:stop().
243+
ok = BQ:stop(VHost).
250244

251245
start(Qs) ->
252246
%% At this point all recovered queues and their bindings are
@@ -256,6 +250,24 @@ start(Qs) ->
256250
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
257251
ok.
258252

253+
find_durable_queues(VHost) ->
254+
Node = node(),
255+
mnesia:async_dirty(
256+
fun () ->
257+
qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
258+
vhost = VH,
259+
pid = Pid}
260+
<- mnesia:table(rabbit_durable_queue),
261+
VH =:= VHost,
262+
node(Pid) == Node andalso
263+
%% Terminations on node down will not remove the rabbit_queue
264+
%% record if it is a mirrored queue (such info is now obtained from
265+
%% the policy). Thus, we must check if the local pid is alive
266+
%% - if the record is present - in order to restart.
267+
(mnesia:read(rabbit_queue, Name, read) =:= []
268+
orelse not erlang:is_process_alive(Pid))]))
269+
end).
270+
259271
find_durable_queues() ->
260272
Node = node(),
261273
mnesia:async_dirty(
@@ -268,8 +280,8 @@ find_durable_queues() ->
268280
%% record if it is a mirrored queue (such info is now obtained from
269281
%% the policy). Thus, we must check if the local pid is alive
270282
%% - if the record is present - in order to restart.
271-
(mnesia:read(rabbit_queue, Name, read) =:= []
272-
orelse not erlang:is_process_alive(Pid))]))
283+
(mnesia:read(rabbit_queue, Name, read) =:= []
284+
orelse not erlang:is_process_alive(Pid))]))
273285
end).
274286

275287
recover_durable_queues(QueuesAndRecoveryTerms) ->

src/rabbit_backing_queue.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,19 @@
5454

5555
-spec info_keys() -> rabbit_types:info_keys().
5656

57-
%% Called on startup with a list of durable queue names. The queues
58-
%% aren't being started at this point, but this call allows the
57+
%% Called on startup with a vhost and a list of durable queue names on this vhost.
58+
%% The queues aren't being started at this point, but this call allows the
5959
%% backing queue to perform any checking necessary for the consistency
6060
%% of those queues, or initialise any other shared resources.
6161
%%
6262
%% The list of queue recovery terms returned as {ok, Terms} must be given
6363
%% in the same order as the list of queue names supplied.
64-
-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
64+
-callback start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
6565

66-
%% Called to tear down any state/resources. NB: Implementations should
66+
%% Called to tear down any state/resources for vhost. NB: Implementations should
6767
%% not depend on this function being called on shutdown and instead
6868
%% should hook into the rabbit supervision hierarchy.
69-
-callback stop() -> 'ok'.
69+
-callback stop(rabbit_types:vhost()) -> 'ok'.
7070

7171
%% Initialise the backing queue and its state.
7272
%%

src/supervisor2.erl

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,14 @@
142142
[ChildSpec :: child_spec()]}}
143143
| ignore.
144144

145+
%% Optional callback prep_stop/0. It cannot be exported as Erlang/OTP doesn't
146+
%% support definition of optional callbacks.
147+
%%
148+
%% Currently used to stop application dependencies.
149+
%%
150+
%% -callback prep_stop() -> ok.
151+
%%
152+
145153
-define(restarting(_Pid_), {restarting,_Pid_}).
146154

147155
%%% ---------------------------------------------------
@@ -629,6 +637,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
629637
{ok, State1} ->
630638
{noreply, State1};
631639
{shutdown, State1} ->
640+
prep_stop(State1),
632641
{stop, shutdown, State1}
633642
end;
634643

@@ -801,7 +810,8 @@ restart_child(Pid, Reason, State) ->
801810
try_restart(RestartType, Reason, Child, State) ->
802811
case handle_restart(RestartType, Reason, Child, State) of
803812
{ok, NState} -> {noreply, NState};
804-
{shutdown, State2} -> {stop, shutdown, State2}
813+
{shutdown, State2} -> prep_stop(State2),
814+
{stop, shutdown, State2}
805815
end.
806816

807817
do_restart(RestartType, Reason, Child, State) ->
@@ -1504,3 +1514,16 @@ report_progress(Child, SupName) ->
15041514
Progress = [{supervisor, SupName},
15051515
{started, extract_child(Child)}],
15061516
error_logger:info_report(progress, Progress).
1517+
1518+
prep_stop(#state{module = Mod}) ->
1519+
%% Catch any exception - including non-existing prep_stop -
1520+
%% and continue stopping the supervision tree.
1521+
%% This is only executed when children are terminating,
1522+
%% because any other call from a top supervisor or application
1523+
%% will cause a deadlock stopping applications within prep_stop.
1524+
try
1525+
Mod:prep_stop()
1526+
catch
1527+
_:_ ->
1528+
ok
1529+
end.

0 commit comments

Comments
 (0)