Skip to content

Commit 1d246e3

Browse files
author
Daniil Fedotov
committed
Migrating to per-vhost supervisor message store.
Support reading/saving recovery terms from global storage to per-vhost storages.
1 parent e4d6c27 commit 1d246e3

7 files changed

+88
-33
lines changed

src/rabbit_amqqueue_sup_sup.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
-behaviour(supervisor2).
2020

2121
-export([start_link/0, start_queue_process/3]).
22-
-export([start_for_vhost/1, stop_for_vhost/1, find_for_vhost/2]).
22+
-export([start_for_vhost/1, stop_for_vhost/1,
23+
find_for_vhost/2, find_for_vhost/1]).
2324

2425
-export([init/1]).
2526

@@ -50,13 +51,19 @@ init([]) ->
5051
[{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
5152
temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.
5253

54+
-spec find_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
55+
find_for_vhost(VHost) ->
56+
find_for_vhost(VHost, node()).
57+
58+
-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
5359
find_for_vhost(VHost, Node) ->
5460
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
5561
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
5662
[QSup] -> {ok, QSup};
5763
Result -> {error, {queue_supervisor_not_found, Result}}
5864
end.
5965

66+
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
6067
start_for_vhost(VHost) ->
6168
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
6269
supervisor2:start_child(
@@ -65,6 +72,7 @@ start_for_vhost(VHost) ->
6572
{rabbit_amqqueue_sup_sup, start_link, []},
6673
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
6774

75+
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
6876
stop_for_vhost(VHost) ->
6977
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
7078
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),

src/rabbit_mirror_queue_slave.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,13 @@ stop_pending_slaves(QName, Pids) ->
194194
[begin
195195
rabbit_mirror_queue_misc:log_warning(
196196
QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]),
197-
%TODO: per-vhost supervisor
198197
case erlang:process_info(Pid, dictionary) of
199198
undefined -> ok;
200199
{dictionary, Dict} ->
200+
Vhost = QName#resource.virtual_host,
201+
{ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost),
201202
case proplists:get_value('$ancestors', Dict) of
202-
[Sup, rabbit_amqqueue_sup_sup | _] ->
203+
[Sup, AmqQSup | _] ->
203204
exit(Sup, kill),
204205
exit(Pid, kill);
205206
_ ->

src/rabbit_queue_index.erl

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
-export([scan_queue_segments/3]).
2727

2828
%% Migrates from global to per-vhost message stores
29-
-export([move_to_per_vhost_stores/1, update_recovery_term/2]).
29+
-export([move_to_per_vhost_stores/1,
30+
update_recovery_term/2,
31+
read_global_recovery_terms/1,
32+
cleanup_global_recovery_terms/0]).
3033

3134
-define(CLEAN_FILENAME, "clean.dot").
3235

@@ -516,6 +519,33 @@ start(VHost, DurableQueueNames) ->
516519
OrderedTerms = lists:reverse(DurableTerms),
517520
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
518521

522+
523+
read_global_recovery_terms(DurableQueueNames) ->
524+
ok = rabbit_recovery_terms:open_global_table(),
525+
526+
DurableTerms =
527+
lists:foldl(
528+
fun(QName, RecoveryTerms) ->
529+
DirName = queue_name_to_dir_name(QName),
530+
RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
531+
{error, _} -> non_clean_shutdown;
532+
{ok, Terms} -> Terms
533+
end,
534+
[RecoveryInfo | RecoveryTerms]
535+
end, [], DurableQueueNames),
536+
537+
ok = rabbit_recovery_terms:close_global_table(),
538+
%% The backing queue interface requires that the queue recovery terms
539+
%% which come back from start/1 are in the same order as DurableQueueNames
540+
OrderedTerms = lists:reverse(DurableTerms),
541+
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
542+
543+
cleanup_global_recovery_terms() ->
544+
rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
545+
rabbit_recovery_terms:delete_global_table(),
546+
ok.
547+
548+
519549
stop(VHost) -> rabbit_recovery_terms:stop(VHost).
520550

521551
all_queue_directory_names(VHost) ->

src/rabbit_recovery_terms.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
terminate/2, code_change/3]).
2929

3030
-export([upgrade_recovery_terms/0, persistent_bytes/0]).
31+
-export([open_global_table/0, close_global_table/0,
32+
read_global/1, delete_global_table/0]).
33+
-export([open_table/1, close_table/1]).
3134

3235
-rabbit_upgrade({upgrade_recovery_terms, local, []}).
3336
-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}).
@@ -119,12 +122,19 @@ open_global_table() ->
119122
File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
120123
{ok, _} = dets:open_file(?MODULE, [{file, File},
121124
{ram_file, true},
122-
{auto_save, infinity}]).
125+
{auto_save, infinity}]),
126+
ok.
123127

124128
close_global_table() ->
125129
ok = dets:sync(?MODULE),
126130
ok = dets:close(?MODULE).
127131

132+
read_global(DirBaseName) ->
133+
read(?MODULE, DirBaseName).
134+
135+
delete_global_table() ->
136+
file:delete(filename:join(rabbit_mnesia:dir(), "recovery.dets")).
137+
128138
%%----------------------------------------------------------------------------
129139

130140
init([VHost]) ->

src/rabbit_variable_queue.erl

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2753,7 +2753,7 @@ transform_store(Store, TransformFun) ->
27532753

27542754
move_messages_to_vhost_store() ->
27552755
case list_persistent_queues() of
2756-
[] -> ok;
2756+
% [] -> ok;
27572757
Queues -> move_messages_to_vhost_store(Queues)
27582758
end.
27592759

@@ -2776,21 +2776,23 @@ move_messages_to_vhost_store(Queues) ->
27762776
VHosts = rabbit_vhost:list(),
27772777

27782778
%% New store should not be recovered.
2779-
ok = start_new_store(VHosts),
2779+
NewMsgStore = start_new_store(VHosts),
2780+
%% Recovery terms should be started for all vhosts for new store.
2781+
[{ok, _} = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts],
2782+
27802783
MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size,
27812784
?QUEUE_MIGRATION_BATCH_SIZE),
27822785
in_batches(MigrationBatchSize,
2783-
{rabbit_variable_queue, migrate_queue, [OldStore]},
2786+
{rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]},
27842787
QueuesWithTerms,
27852788
"message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n",
27862789
"message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
27872790

27882791
log_upgrade("Message store migration finished"),
2789-
delete_old_store(OldStore),
2790-
2791-
ok = rabbit_queue_index:stop(),
2792-
ok = stop_new_store(VHosts),
2793-
ok.
2792+
ok = delete_old_store(OldStore),
2793+
ok = rabbit_queue_index:cleanup_global_recovery_terms(),
2794+
[ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
2795+
ok = stop_new_store(NewMsgStore).
27942796

27952797
in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
27962798
in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).
@@ -2816,12 +2818,14 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
28162818
rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
28172819
in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
28182820

2819-
migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore) ->
2821+
migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name},
2822+
RecoveryTerm},
2823+
OldStore, NewStore) ->
28202824
log_upgrade_verbose(
28212825
"Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
28222826
[Name, VHost]),
28232827
OldStoreClient = get_global_store_client(OldStore),
2824-
NewStoreClient = get_per_vhost_store_client(QueueName),
2828+
NewStoreClient = get_per_vhost_store_client(QueueName, NewStore),
28252829
%% WARNING: During scan_queue_segments queue index state is being recovered
28262830
%% and terminated. This can cause side effects!
28272831
rabbit_queue_index:scan_queue_segments(
@@ -2857,11 +2861,10 @@ migrate_message(MsgId, OldC, NewC) ->
28572861
_ -> OldC
28582862
end.
28592863

2860-
get_per_vhost_store_client(#resource{virtual_host = VHost}) ->
2861-
rabbit_vhost_msg_store:client_init(VHost, ?PERSISTENT_MSG_STORE,
2862-
rabbit_guid:gen(),
2863-
fun(_,_) -> ok end,
2864-
fun() -> ok end).
2864+
get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) ->
2865+
{VHost, StorePid} = lists:keyfind(VHost, 1, NewStore),
2866+
rabbit_msg_store:client_init(StorePid, rabbit_guid:gen(),
2867+
fun(_,_) -> ok end, fun() -> ok end).
28652868

28662869
get_global_store_client(OldStore) ->
28672870
rabbit_msg_store:client_init(OldStore,
@@ -2902,19 +2905,22 @@ run_old_persistent_store(Refs, StartFunState) ->
29022905

29032906
start_new_store(VHosts) ->
29042907
%% Ensure vhost supervisor is started, so we can add vhsots to it.
2905-
%% TODO: Start message store for vhost without a supervisor.
2906-
lists:foreach(fun(VHost) ->
2907-
% Start persistent store without recovery.
2908-
{ok, _} = rabbit_vhost_msg_store:start(VHost, ?PERSISTENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE)
2908+
lists:map(fun(VHost) ->
2909+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
2910+
{ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE,
2911+
VHostDir,
2912+
undefined,
2913+
?EMPTY_START_FUN_STATE),
2914+
{VHost, Pid}
29092915
end,
2910-
VHosts),
2911-
ok.
2916+
VHosts).
29122917

2913-
stop_new_store(VHosts) ->
2914-
lists:foreach(fun(VHost) ->
2915-
ok = rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE)
2918+
stop_new_store(NewStore) ->
2919+
lists:foreach(fun({_VHost, StorePid}) ->
2920+
unlink(StorePid),
2921+
exit(StorePid, shutdown)
29162922
end,
2917-
VHosts),
2923+
NewStore),
29182924
ok.
29192925

29202926
delete_old_store(OldStore) ->
@@ -2923,7 +2929,8 @@ delete_old_store(OldStore) ->
29232929
[filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
29242930
%% Delete old transient store as well
29252931
rabbit_file:recursive_delete(
2926-
[filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]).
2932+
[filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]),
2933+
ok.
29272934

29282935
log_upgrade(Msg) ->
29292936
log_upgrade(Msg, []).

src/rabbit_vhost_sup_sup.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ stop_and_delete_vhost(VHost) ->
6464
end.
6565

6666
delete_on_all_nodes(VHost) ->
67-
%% TODO: failing nodes
6867
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
6968
ok.
7069

test/unit_inbroker_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,7 @@ bq_queue_recover1(Config) ->
829829
rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>),
830830
true, false, [], none, <<"acting-user">>),
831831
publish_and_confirm(Q, <<>>, Count),
832-
%% TODO: per-vhost supervisor
832+
833833
SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(Q),
834834
true = is_pid(SupPid),
835835
exit(SupPid, kill),

0 commit comments

Comments
 (0)