Skip to content

Commit 6cb3d32

Browse files
author
Daniil Fedotov
committed
Per-vhost supervision trees for queues and message stores.
Per-vhost message stores can be restarted, but queues hold references to the old message stores in their message store client data, and queues also rely on the message store process to report confirms for messages on disk. After a message store restart, queues will not get any confirms and will fail with a badarg error when trying to access the message store with an old client, so queue processes should be restarted together with message stores. Queue processes cannot monitor the message store because of the backing_queue mechanism, so they should be controlled by a supervision tree. One tree will contain the queues supervisor and the message store processes. The per-vhost supervisor will restart if any of its children dies. On restart, the per-vhost supervisor will recover queue and message store data the same way the pre-3.7 global message store did, just with VHost as an argument and in a vhost data directory.
1 parent 6fa3ee4 commit 6cb3d32

20 files changed

+3738
-362
lines changed

src/rabbit.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -813,10 +813,7 @@ boot_delegate() ->
813813

814814
recover() ->
815815
rabbit_policy:recover(),
816-
Qs = rabbit_amqqueue:recover(),
817-
ok = rabbit_binding:recover(rabbit_exchange:recover(),
818-
[QName || #amqqueue{name = QName} <- Qs]),
819-
rabbit_amqqueue:start(Qs).
816+
rabbit_vhost:recover().
820817

821818
maybe_insert_default_data() ->
822819
case rabbit_table:needs_default_data() of

src/rabbit_amqqueue_sup_sup.erl

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
-behaviour(supervisor2).
2020

2121
-export([start_link/0, start_queue_process/3]).
22+
-export([start_for_vhost/1, stop_for_vhost/1, find_for_vhost/2]).
2223

2324
-export([init/1]).
2425

@@ -36,14 +37,35 @@
3637
%%----------------------------------------------------------------------------
3738

3839
start_link() ->
39-
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
40+
supervisor2:start_link(?MODULE, []).
4041

4142
start_queue_process(Node, Q, StartMode) ->
42-
{ok, _SupPid, QPid} = supervisor2:start_child(
43-
{?SERVER, Node}, [Q, StartMode]),
43+
#amqqueue{name = #resource{virtual_host = VHost}} = Q,
44+
{ok, Sup} = find_for_vhost(VHost, Node),
45+
{ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]),
4446
QPid.
4547

4648
init([]) ->
4749
{ok, {{simple_one_for_one, 10, 10},
4850
[{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
4951
temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.
52+
53+
find_for_vhost(VHost, Node) ->
54+
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
55+
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
56+
[QSup] -> {ok, QSup};
57+
Result -> {error, {queue_supervisor_not_found, Result}}
58+
end.
59+
60+
start_for_vhost(VHost) ->
61+
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
62+
supervisor2:start_child(
63+
VHostSup,
64+
{rabbit_amqqueue_sup_sup,
65+
{rabbit_amqqueue_sup_sup, start_link, []},
66+
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
67+
68+
stop_for_vhost(VHost) ->
69+
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
70+
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
71+
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).

src/rabbit_exchange.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
-include("rabbit.hrl").
1919
-include("rabbit_framing.hrl").
2020

21-
-export([recover/0, policy_changed/2, callback/4, declare/7,
21+
-export([recover/1, policy_changed/2, callback/4, declare/7,
2222
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
2323
lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
2424
update_scratch/3, update_decorators/1, immutable/1,
@@ -36,7 +36,7 @@
3636
-type type() :: atom().
3737
-type fun_name() :: atom().
3838

39-
-spec recover() -> [name()].
39+
-spec recover(rabbit_types:vhost()) -> [name()].
4040
-spec callback
4141
(rabbit_types:exchange(), fun_name(),
4242
fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.
@@ -107,10 +107,11 @@
107107
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
108108
policy, user_who_performed_action]).
109109

110-
recover() ->
110+
recover(VHost) ->
111111
Xs = rabbit_misc:table_filter(
112112
fun (#exchange{name = XName}) ->
113-
mnesia:read({rabbit_exchange, XName}) =:= []
113+
XName#resource.virtual_host =:= VHost andalso
114+
mnesia:read({rabbit_exchange, XName}) =:= []
114115
end,
115116
fun (X, Tx) ->
116117
X1 = case Tx of

src/rabbit_mirror_queue_master.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
2727
zip_msgs_and_acks/4]).
2828

29-
-export([start/1, stop/0, delete_crashed/1]).
29+
-export([start/2, stop/1, delete_crashed/1]).
3030

3131
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
3232

@@ -81,12 +81,12 @@
8181
%% Backing queue
8282
%% ---------------------------------------------------------------------------
8383

84-
start(_DurableQueues) ->
84+
start(_Vhost, _DurableQueues) ->
8585
%% This will never get called as this module will never be
8686
%% installed as the default BQ implementation.
8787
exit({not_valid_for_generic_backing_queue, ?MODULE}).
8888

89-
stop() ->
89+
stop(_Vhost) ->
9090
%% Same as start/2.
9191
exit({not_valid_for_generic_backing_queue, ?MODULE}).
9292

src/rabbit_mirror_queue_slave.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ stop_pending_slaves(QName, Pids) ->
194194
[begin
195195
rabbit_mirror_queue_misc:log_warning(
196196
QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]),
197+
%TODO: per-vhost supervisor
197198
case erlang:process_info(Pid, dictionary) of
198199
undefined -> ok;
199200
{dictionary, Dict} ->

src/rabbit_msg_store_vhost_sup.erl

Lines changed: 0 additions & 103 deletions
This file was deleted.

src/rabbit_priority_queue.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
-export([enable/0]).
3232

33-
-export([start/1, stop/0]).
33+
-export([start/2, stop/1]).
3434

3535
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
3636
purge/1, purge_acks/1,
@@ -83,22 +83,22 @@ enable() ->
8383

8484
%%----------------------------------------------------------------------------
8585

86-
start(QNames) ->
86+
start(VHost, QNames) ->
8787
BQ = bq(),
8888
%% TODO this expand-collapse dance is a bit ridiculous but it's what
8989
%% rabbit_amqqueue:recover/0 expects. We could probably simplify
9090
%% this if we rejigged recovery a bit.
9191
{DupNames, ExpNames} = expand_queues(QNames),
92-
case BQ:start(ExpNames) of
92+
case BQ:start(VHost, ExpNames) of
9393
{ok, ExpRecovery} ->
9494
{ok, collapse_recovery(QNames, DupNames, ExpRecovery)};
9595
Else ->
9696
Else
9797
end.
9898

99-
stop() ->
99+
stop(VHost) ->
100100
BQ = bq(),
101-
BQ:stop().
101+
BQ:stop(VHost).
102102

103103
%%----------------------------------------------------------------------------
104104

src/rabbit_queue_index.erl

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
-module(rabbit_queue_index).
1818

1919
-export([erase/1, init/3, reset_state/1, recover/6,
20-
terminate/2, delete_and_terminate/1,
20+
terminate/3, delete_and_terminate/1,
2121
pre_publish/7, flush_pre_publish_cache/2,
2222
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
23-
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
23+
read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
2424

2525
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
2626
-export([scan_queue_segments/3]).
@@ -261,7 +261,7 @@
261261
on_sync_fun(), on_sync_fun()) ->
262262
{'undefined' | non_neg_integer(),
263263
'undefined' | non_neg_integer(), qistate()}.
264-
-spec terminate([any()], qistate()) -> qistate().
264+
%% terminate/3: the first argument is the vhost under which the queue's
%% recovery terms are stored.
-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
265265
-spec delete_and_terminate(qistate()) -> qistate().
266266
-spec publish(rabbit_types:msg_id(), seq_id(),
267267
rabbit_types:message_properties(), boolean(),
@@ -278,7 +278,7 @@
278278
-spec next_segment_boundary(seq_id()) -> seq_id().
279279
-spec bounds(qistate()) ->
280280
{non_neg_integer(), non_neg_integer(), qistate()}.
281-
-spec start([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
281+
%% start/2: the first argument is the vhost whose recovery terms and queue
%% directories are processed for the given durable queue names.
-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) ->
          {[[any()]], {walker(A), A}}.
282282

283283
-spec add_queue_ttl() -> 'ok'.
284284

@@ -321,9 +321,9 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
321321
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
322322
end.
323323

324-
terminate(Terms, State = #qistate { dir = Dir }) ->
324+
terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
325325
{SegmentCounts, State1} = terminate(State),
326-
rabbit_recovery_terms:store(filename:basename(Dir),
326+
rabbit_recovery_terms:store(VHost, filename:basename(Dir),
327327
[{segments, SegmentCounts} | Terms]),
328328
State1.
329329

@@ -491,34 +491,36 @@ bounds(State = #qistate { segments = Segments }) ->
491491
end,
492492
{LowSeqId, NextSeqId, State}.
493493

494-
start(DurableQueueNames) ->
495-
ok = rabbit_recovery_terms:start(),
494+
start(VHost, DurableQueueNames) ->
495+
ok = rabbit_recovery_terms:start(VHost),
496496
{DurableTerms, DurableDirectories} =
497497
lists:foldl(
498498
fun(QName, {RecoveryTerms, ValidDirectories}) ->
499499
DirName = queue_name_to_dir_name(QName),
500-
RecoveryInfo = case rabbit_recovery_terms:read(DirName) of
500+
RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
501501
{error, _} -> non_clean_shutdown;
502502
{ok, Terms} -> Terms
503503
end,
504504
{[RecoveryInfo | RecoveryTerms],
505505
sets:add_element(DirName, ValidDirectories)}
506506
end, {[], sets:new()}, DurableQueueNames),
507-
508507
%% Any queue directory we've not been asked to recover is considered garbage
509508
rabbit_file:recursive_delete(
510509
[DirName ||
511-
DirName <- all_queue_directory_names(),
510+
DirName <- all_queue_directory_names(VHost),
512511
not sets:is_element(filename:basename(DirName), DurableDirectories)]),
513-
514-
rabbit_recovery_terms:clear(),
512+
rabbit_recovery_terms:clear(VHost),
515513

516514
%% The backing queue interface requires that the queue recovery terms
517515
%% which come back from start/2 are in the same order as DurableQueueNames
518516
OrderedTerms = lists:reverse(DurableTerms),
519517
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
520518

521-
stop() -> rabbit_recovery_terms:stop().
519+
stop(VHost) -> rabbit_recovery_terms:stop(VHost).
520+
521+
all_queue_directory_names(VHost) ->
522+
filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost),
523+
"queues", "*"])).
522524

523525
all_queue_directory_names() ->
524526
filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(),
@@ -1447,6 +1449,6 @@ move_to_per_vhost_stores(#resource{} = QueueName) ->
14471449
end,
14481450
ok.
14491451

1450-
update_recovery_term(#resource{} = QueueName, Term) ->
1452+
update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
14511453
Key = queue_name_to_dir_name(QueueName),
1452-
rabbit_recovery_terms:store(Key, Term).
1454+
rabbit_recovery_terms:store(VHost, Key, Term).

0 commit comments

Comments
 (0)