Skip to content

Commit c020b81

Browse files
author
Daniil Fedotov
committed
New upgrade scope queues started by boot step before queue recovery
1 parent c9f7fcb commit c020b81

File tree

3 files changed

+99
-83
lines changed

3 files changed

+99
-83
lines changed

src/rabbit.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@
146146
{requires, core_initialized},
147147
{enables, routing_ready}]}).
148148

149+
-rabbit_boot_step({upgrade_queues,
150+
[{description, "codec correctness check"},
151+
{mfa, {rabbit_upgrade,
152+
maybe_upgrade_queues,
153+
[]}},
154+
{requires, [core_initialized]},
155+
{enables, recovery}]}).
156+
149157
-rabbit_boot_step({recovery,
150158
[{description, "exchange, queue and binding recovery"},
151159
{mfa, {rabbit, recover, []}},

src/rabbit_upgrade.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
-module(rabbit_upgrade).
1818

1919
-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0,
20+
maybe_upgrade_queues/0,
2021
nodes_running/1, secondary_upgrade/1]).
2122

2223
-include("rabbit.hrl").
@@ -255,6 +256,18 @@ maybe_upgrade_local() ->
255256

256257
%% -------------------------------------------------------------------
257258

259+
maybe_upgrade_queues() ->
260+
case rabbit_version:upgrades_required(queues) of
261+
{error, version_not_available} -> version_not_available;
262+
{error, starting_from_scratch} -> starting_from_scratch;
263+
{error, _} = Err -> throw(Err);
264+
{ok, []} -> ok;
265+
{ok, Upgrades} -> apply_upgrades(queues, Upgrades,
266+
fun() -> ok end)
267+
end.
268+
269+
%% -------------------------------------------------------------------
270+
258271
apply_upgrades(Scope, Upgrades, Fun) ->
259272
ok = rabbit_file:lock_file(lock_filename()),
260273
info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),

src/rabbit_variable_queue.erl

Lines changed: 78 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,8 @@
346346
%%----------------------------------------------------------------------------
347347

348348
-rabbit_upgrade({multiple_routing_keys, local, []}).
349-
% -rabbit_upgrade({move_messages_to_vhost_store, local, []}). requires mnesia, requires rabbit_sup, requires worker_pool, requires fhc
349+
-rabbit_upgrade({move_messages_to_vhost_store, queues, []}).
350+
350351
-compile(export_all).
351352

352353
-ifdef(use_specs).
@@ -501,7 +502,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
501502
MsgOnDiskFun, AsyncCallback, VHost);
502503
false -> undefined
503504
end,
504-
msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
505+
msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
505506
AsyncCallback, VHost));
506507

507508
%% We can be recovering a transient queue if it crashed
@@ -522,7 +523,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
522523
false -> {undefined, fun(_MsgId) -> false end}
523524
end,
524525
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
525-
undefined, AsyncCallback,
526+
undefined, AsyncCallback,
526527
VHost),
527528
{DeltaCount, DeltaBytes, IndexState} =
528529
rabbit_queue_index:recover(
@@ -1203,8 +1204,8 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
12031204
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
12041205
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
12051206
rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
1206-
fun () ->
1207-
Callback(?MODULE, CloseFDsFun)
1207+
fun () ->
1208+
Callback(?MODULE, CloseFDsFun)
12081209
end,
12091210
VHost).
12101211

@@ -2656,84 +2657,83 @@ transform_store(Store, TransformFun) ->
26562657
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
26572658

26582659
move_messages_to_vhost_store() ->
2659-
Queues = rabbit_variable_queue:list_persistent_queues(),
2660-
% Maybe recover old store.
2661-
{RecoveryTerms, StartFunState} = rabbit_variable_queue:start_recovery_terms(Queues),
2662-
OldStore = rabbit_variable_queue:run_old_persistent_store(RecoveryTerms, StartFunState),
2663-
NewStoreSup = rabbit_variable_queue:start_new_store_sup(),
2664-
lists:map(fun(Queue) ->
2665-
migrate_queue(Queue, OldStore, NewStoreSup)
2666-
end, Queues),
2667-
% Migrations = spawn_for_each(fun(Queue) ->
2668-
% migrate_queue(Queue, OldStore, NewStoreSup)
2669-
% end, Queues),
2670-
% wait(Migrations),
2671-
delete_old_store(OldStore).
2660+
rabbit_log:error("MIGRATING!!"),
2661+
Queues = list_persistent_queues(),
2662+
%% Old msg_store may require recovery.
2663+
%% This upgrade step should only be started
2664+
%% if we are upgrading from version with old store.
2665+
{RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
2666+
OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
2667+
%% New store should not be recovered.
2668+
NewStoreSup = start_new_store_sup(),
2669+
lists:map(fun(Queue) ->
2670+
migrate_queue(Queue, OldStore, NewStoreSup)
2671+
end,
2672+
Queues),
2673+
2674+
{ok, Gatherer} = gatherer:start_link(),
2675+
lists:map(
2676+
fun(Queue) ->
2677+
ok = gatherer:fork(Gatherer),
2678+
ok = worker_pool:submit_async(
2679+
fun () ->
2680+
migrate_queue(Queue, OldStore, NewStoreSup),
2681+
gatherer:finish(Gatherer)
2682+
end)
2683+
end,
2684+
Queues),
2685+
empty = gatherer:out(Gatherer),
2686+
ok = gatherer:stop(Gatherer),
2687+
2688+
delete_old_store(OldStore),
2689+
2690+
ok = rabbit_queue_index:stop(),
2691+
ok = rabbit_sup:stop_child(NewStoreSup).
26722692

26732693
migrate_queue(Queue, OldStore, NewStoreSup) ->
26742694
OldStoreClient = get_old_client(OldStore),
26752695
NewStoreClient = get_new_store_client(Queue, NewStoreSup),
2676-
walk_queue_index(
2677-
fun(MessageIdInStore, OldC) ->
2678-
case rabbit_msg_store:read(MessageIdInStore, OldStoreClient) of
2679-
{{ok, Msg}, OldC1} ->
2680-
ok = rabbit_msg_store:write(MessageIdInStore, Msg, NewStoreClient),
2681-
OldC1;
2682-
_ -> OldC
2683-
end
2684-
end,
2685-
OldStoreClient,
2686-
Queue).
2687-
2688-
walk_queue_index(Fun, Client, #amqqueue{name = QueueName}) ->
2689-
% WARNING: State is being recovered and terminated. This can cause side effects!
2696+
#amqqueue{name = QueueName} = Queue,
2697+
%% WARNING: During scan_queue_segments queue index state is being recovered
2698+
%% and terminated. This can cause side effects!
26902699
rabbit_queue_index:scan_queue_segments(
2691-
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, _IsAcked, ClientState)
2700+
%% We migrate only persistent messages, which is stored in msg_store
2701+
%% and is not acked yet
2702+
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, false, OldC)
26922703
when is_binary(MsgId) ->
2693-
Fun(MsgId, ClientState);
2694-
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, ClientState) ->
2695-
ClientState
2696-
end, Client, QueueName).
2697-
2698-
spawn_for_each(Fun, List) ->
2699-
Ref = erlang:make_ref(),
2700-
Self = self(),
2701-
Processes = lists:map(
2702-
fun(El) ->
2703-
spawn_link(
2704-
fun() ->
2705-
Fun(El),
2706-
Self ! {ok, self(), Ref}
2707-
end)
2704+
migrate_message(MsgId, OldC, NewStoreClient);
2705+
(_SeqId, _MsgId, _MsgProps,
2706+
_IsPersistent, _IsDelivered, _IsAcked, OldC) ->
2707+
OldC
27082708
end,
2709-
List),
2710-
{Ref, Processes}.
2711-
2712-
wait({Ref, Processes}) ->
2713-
lists:foreach(
2714-
fun(Proc) ->
2715-
receive {ok, Proc, Ref} -> ok
2716-
end
2717-
end,
2718-
Processes).
2719-
2720-
get_new_store_client(Queue, NewStoreSup) ->
2721-
Vhost = queue_vhost(Queue),
2722-
get_new_client(NewStoreSup, Vhost).
2723-
2724-
queue_vhost(#amqqueue{name = #resource{virtual_host = VHost}}) -> VHost.
2709+
OldStoreClient,
2710+
QueueName).
2711+
2712+
migrate_message(MsgId, OldC, NewC) ->
2713+
case rabbit_msg_store:read(MsgId, OldC) of
2714+
{{ok, Msg}, OldC1} ->
2715+
case rabbit_msg_store:contains(MsgId, NewC) of
2716+
false -> ok = rabbit_msg_store:write(MsgId, Msg, NewC);
2717+
true -> ok
2718+
end,
2719+
% TODO: maybe remove in batches?
2720+
ok = rabbit_msg_store:remove([MsgId], OldC1),
2721+
OldC1;
2722+
_ -> OldC
2723+
end;
27252724

2726-
get_new_client(NewStoreSup, VHost) ->
2727-
rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
2725+
get_new_store_client(#amqqueue{name = #resource{virtual_host = VHost}},
2726+
NewStoreSup) ->
2727+
rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
27282728
rabbit_guid:gen(),
2729-
fun(_,_) -> ok end,
2730-
fun() -> ok end,
2729+
fun(_,_) -> ok end,
2730+
fun() -> ok end,
27312731
VHost).
27322732

27332733
get_old_client(OldStore) ->
27342734
rabbit_msg_store:client_init(OldStore,
27352735
rabbit_guid:gen(),
2736-
fun(_,_) -> ok end,
2736+
fun(_,_) -> ok end,
27372737
fun() -> ok end).
27382738

27392739
list_persistent_queues() ->
@@ -2758,7 +2758,7 @@ start_recovery_terms(Queues) ->
27582758
end],
27592759
{Refs, StartFunState}.
27602760

2761-
run_old_persistent_store(Refs, StartFunState) ->
2761+
run_old_persistent_store(Refs, StartFunState) ->
27622762
OldStoreName = ?PERSISTENT_MSG_STORE,
27632763
ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
27642764
[OldStoreName, rabbit_mnesia:dir(),
@@ -2767,22 +2767,17 @@ run_old_persistent_store(Refs, StartFunState) ->
27672767

27682768
start_new_store_sup() ->
27692769
% Start persistent store sup without recovery.
2770-
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
2770+
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP,
2771+
rabbit_msg_store_vhost_sup,
27712772
[?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
27722773
undefined, {fun (ok) -> finished end, ok}]),
27732774
?PERSISTENT_MSG_STORE_SUP.
27742775

27752776
delete_old_store(OldStore) ->
2776-
gen_server:stop(OldStore),
2777-
rabbit_file:recursive_delete([filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]).
2778-
2779-
2780-
2781-
setup() ->
2782-
application:load(rabbit),
2783-
mnesia:start(),
2784-
rabbit_sup:start_link(),
2785-
rabbit:start_fhc(),
2786-
rabbit_sup:start_restartable_child(rabbit_guid),
2787-
rabbit_sup:start_supervisor_child(worker_pool_sup).
2777+
ok = rabbit_sup:stop_child(OldStore),
2778+
rabbit_file:recursive_delete(
2779+
[filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
2780+
%% Delete old transient store as well
2781+
rabbit_file:recursive_delete(
2782+
[filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]).
27882783

0 commit comments

Comments
 (0)