Skip to content
This repository was archived by the owner on Nov 17, 2020. It is now read-only.

Commit 52772a0

Browse files
author
Daniil Fedotov
committed
Support per-vhost stop/start API for backing queue behaviour.
1 parent 763bccc commit 52772a0

File tree

2 files changed

+44
-32
lines changed

2 files changed

+44
-32
lines changed

src/rabbit_amqqueue.erl

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
-module(rabbit_amqqueue).
1818

19-
-export([recover/0, stop/0, start/1, declare/6, declare/7,
19+
-export([warn_file_limit/0]).
20+
-export([recover/1, stop/1, start/1, declare/6, declare/7,
2021
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
2122
forget_all_durable/1, delete_crashed/1, delete_crashed/2,
2223
delete_crashed_internal/2]).
@@ -70,8 +71,8 @@
7071
{'absent', rabbit_types:amqqueue(),absent_reason()}.
7172
-type not_found_or_absent() ::
7273
'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}.
73-
-spec recover() -> [rabbit_types:amqqueue()].
74-
-spec stop() -> 'ok'.
74+
-spec recover(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
75+
-spec stop(rabbit_types:vhost()) -> 'ok'.
7576
-spec start([rabbit_types:amqqueue()]) -> 'ok'.
7677
-spec declare
7778
(name(), boolean(), boolean(), rabbit_framing:amqp_table(),
@@ -212,10 +213,7 @@
212213
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
213214
arguments]).
214215

215-
recover() ->
216-
%% Clear out remnants of old incarnation, in case we restarted
217-
%% faster than other nodes handled DOWN messages from us.
218-
on_node_down(node()),
216+
warn_file_limit() ->
219217
DurableQueues = find_durable_queues(),
220218
L = length(DurableQueues),
221219

@@ -228,27 +226,23 @@ recover() ->
228226
[L, file_handle_cache:get_limit(), L]);
229227
false ->
230228
ok
231-
end,
229+
end.
232230

231+
recover(VHost) ->
232+
Queues = find_durable_queues(VHost),
233233
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
234-
235234
%% We rely on BQ:start/1 returning the recovery terms in the same
236235
%% order as the supplied queue names, so that we can zip them together
237236
%% for further processing in recover_durable_queues.
238237
{ok, OrderedRecoveryTerms} =
239-
BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
240-
{ok,_} = supervisor:start_child(
241-
rabbit_sup,
242-
{rabbit_amqqueue_sup_sup,
243-
{rabbit_amqqueue_sup_sup, start_link, []},
244-
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}),
245-
recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
246-
247-
stop() ->
248-
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup),
249-
ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup),
238+
BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]),
239+
{ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost),
240+
recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)).
241+
242+
stop(VHost) ->
243+
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
250244
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
251-
ok = BQ:stop().
245+
ok = BQ:stop(VHost).
252246

253247
start(Qs) ->
254248
%% At this point all recovered queues and their bindings are
@@ -258,6 +252,24 @@ start(Qs) ->
258252
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
259253
ok.
260254

255+
find_durable_queues(VHost) ->
256+
Node = node(),
257+
mnesia:async_dirty(
258+
fun () ->
259+
qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
260+
vhost = VH,
261+
pid = Pid}
262+
<- mnesia:table(rabbit_durable_queue),
263+
VH =:= VHost,
264+
node(Pid) == Node andalso
265+
%% Terminations on node down will not remove the rabbit_queue
266+
%% record if it is a mirrored queue (such info is now obtained from
267+
%% the policy). Thus, we must check if the local pid is alive
268+
%% - if the record is present - in order to restart.
269+
(mnesia:read(rabbit_queue, Name, read) =:= []
270+
orelse not erlang:is_process_alive(Pid))]))
271+
end).
272+
261273
find_durable_queues() ->
262274
Node = node(),
263275
mnesia:async_dirty(
@@ -266,12 +278,12 @@ find_durable_queues() ->
266278
pid = Pid}
267279
<- mnesia:table(rabbit_durable_queue),
268280
node(Pid) == Node andalso
269-
%% Terminations on node down will not remove the rabbit_queue
270-
%% record if it is a mirrored queue (such info is now obtained from
271-
%% the policy). Thus, we must check if the local pid is alive
272-
%% - if the record is present - in order to restart.
273-
(mnesia:read(rabbit_queue, Name, read) =:= []
274-
orelse not erlang:is_process_alive(Pid))]))
281+
%% Terminations on node down will not remove the rabbit_queue
282+
%% record if it is a mirrored queue (such info is now obtained from
283+
%% the policy). Thus, we must check if the local pid is alive
284+
%% - if the record is present - in order to restart.
285+
(mnesia:read(rabbit_queue, Name, read) =:= []
286+
orelse not erlang:is_process_alive(Pid))]))
275287
end).
276288

277289
recover_durable_queues(QueuesAndRecoveryTerms) ->

src/rabbit_backing_queue.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,19 @@
5454

5555
-spec info_keys() -> rabbit_types:info_keys().
5656

57-
%% Called on startup with a list of durable queue names. The queues
58-
%% aren't being started at this point, but this call allows the
57+
%% Called on startup with a vhost and a list of durable queue names on this vhost.
58+
%% The queues aren't being started at this point, but this call allows the
5959
%% backing queue to perform any checking necessary for the consistency
6060
%% of those queues, or initialise any other shared resources.
6161
%%
6262
%% The list of queue recovery terms returned as {ok, Terms} must be given
6363
%% in the same order as the list of queue names supplied.
64-
-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
64+
-callback start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
6565

66-
%% Called to tear down any state/resources. NB: Implementations should
66+
%% Called to tear down any state/resources for vhost. NB: Implementations should
6767
%% not depend on this function being called on shutdown and instead
6868
%% should hook into the rabbit supervision hierarchy.
69-
-callback stop() -> 'ok'.
69+
-callback stop(rabbit_types:vhost()) -> 'ok'.
7070

7171
%% Initialise the backing queue and its state.
7272
%%

0 commit comments

Comments
 (0)