Skip to content

Commit c9f7fcb

Browse files
author
Daniil Fedotov
committed
wip: migrating to vhost base messge store
1 parent 3187ea3 commit c9f7fcb

File tree

3 files changed

+70
-46
lines changed

3 files changed

+70
-46
lines changed

src/rabbit_msg_store.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
708708

709709
Dir = filename:join(BaseDir, atom_to_list(Server)),
710710

711-
{ok, IndexModule} = application:get_env(msg_store_index_module),
711+
{ok, IndexModule} = application:get_env(rabbit,msg_store_index_module),
712712
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
713713

714714
AttemptFileSummaryRecovery =
@@ -746,7 +746,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
746746
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
747747
FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
748748

749-
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
749+
{ok, FileSizeLimit} = application:get_env(rabbit,msg_store_file_size_limit),
750750

751751
{ok, GCPid} = rabbit_msg_store_gc:start_link(
752752
#gc_state { dir = Dir,

src/rabbit_queue_index.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
2424

2525
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
26+
-export([scan_queue_segments/3]).
2627

2728
-define(CLEAN_FILENAME, "clean.dot").
2829

@@ -664,20 +665,19 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
664665
end.
665666

666667
queue_index_walker_reader(QueueName, Gatherer) ->
667-
State = blank_state(QueueName),
668-
ok = scan_segments(
668+
ok = scan_queue_segments(
669669
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
670670
when is_binary(MsgId) ->
671671
gatherer:sync_in(Gatherer, {MsgId, 1});
672672
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
673673
_IsAcked, Acc) ->
674674
Acc
675-
end, ok, State),
675+
end, ok, QueueName),
676676
ok = gatherer:finish(Gatherer).
677677

678-
scan_segments(Fun, Acc, State) ->
679-
State1 = #qistate { segments = Segments, dir = Dir } =
680-
recover_journal(State),
678+
scan_queue_segments(Fun, Acc, QueueName) ->
679+
State = #qistate { segments = Segments, dir = Dir } =
680+
recover_journal(blank_state(QueueName)),
681681
Result = lists:foldr(
682682
fun (Seg, AccN) ->
683683
segment_entries_foldr(
@@ -686,8 +686,8 @@ scan_segments(Fun, Acc, State) ->
686686
Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
687687
IsPersistent, IsDelivered, IsAcked, AccM)
688688
end, AccN, segment_find_or_new(Seg, Dir, Segments))
689-
end, Acc, all_segment_nums(State1)),
690-
{_SegmentCounts, _State} = terminate(State1),
689+
end, Acc, all_segment_nums(State)),
690+
{_SegmentCounts, _State} = terminate(State),
691691
Result.
692692

693693
%%----------------------------------------------------------------------------

src/rabbit_variable_queue.erl

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
%% exported for testing only
3434
-export([start_msg_store/2, stop_msg_store/0, init/6]).
3535

36+
-export([move_messages_to_vhost_store/0]).
37+
-include_lib("stdlib/include/qlc.hrl").
38+
3639
%%----------------------------------------------------------------------------
3740
%% Messages, and their position in the queue, can be in memory or on
3841
%% disk, or both. Persistent messages will have both message and
@@ -330,8 +333,11 @@
330333
}).
331334

332335
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
336+
-define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost).
337+
-define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost).
333338
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
334339
-define(TRANSIENT_MSG_STORE, msg_store_transient).
340+
335341
-define(QUEUE, lqueue).
336342

337343
-include("rabbit.hrl").
@@ -340,7 +346,8 @@
340346
%%----------------------------------------------------------------------------
341347

342348
-rabbit_upgrade({multiple_routing_keys, local, []}).
343-
-rabbit_upgrade({move_messages_to_vhost_store, local, []}).
349+
% -rabbit_upgrade({move_messages_to_vhost_store, local, []}). requires mnesia, requires rabbit_sup, requires worker_pool, requires fhc
350+
-compile(export_all).
344351

345352
-ifdef(use_specs).
346353

@@ -436,6 +443,8 @@
436443
%% Public API
437444
%%----------------------------------------------------------------------------
438445

446+
447+
439448
start(DurableQueues) ->
440449
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
441450
start_msg_store(
@@ -454,23 +463,23 @@ stop() ->
454463

455464
start_msg_store(Refs, StartFunState) ->
456465
VHosts = rabbit_vhost:list(),
457-
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
458-
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
466+
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
467+
[?TRANSIENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
459468
undefined, {fun (ok) -> finished end, ok}]),
460-
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
461-
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
469+
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
470+
[?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
462471
Refs, StartFunState]),
463472
lists:foreach(
464473
fun(VHost) ->
465-
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
466-
rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
474+
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
475+
rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost)
467476
end,
468477
VHosts),
469478
ok.
470479

471480
stop_msg_store() ->
472-
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
473-
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
481+
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
482+
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP).
474483

475484
init(Queue, Recover, Callback) ->
476485
init(
@@ -488,11 +497,11 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
488497
VHost = QueueName#resource.virtual_host,
489498
init(IsDurable, IndexState, 0, 0, [],
490499
case IsDurable of
491-
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
500+
true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP,
492501
MsgOnDiskFun, AsyncCallback, VHost);
493502
false -> undefined
494503
end,
495-
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
504+
msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
496505
AsyncCallback, VHost));
497506

498507
%% We can be recovering a transient queue if it crashed
@@ -502,7 +511,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
502511
VHost = QueueName#resource.virtual_host,
503512
{PersistentClient, ContainsCheckFun} =
504513
case IsDurable of
505-
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
514+
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef,
506515
MsgOnDiskFun, AsyncCallback,
507516
VHost),
508517
{C, fun (MsgId) when is_binary(MsgId) ->
@@ -512,14 +521,14 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
512521
end};
513522
false -> {undefined, fun(_MsgId) -> false end}
514523
end,
515-
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
524+
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
516525
undefined, AsyncCallback,
517526
VHost),
518527
{DeltaCount, DeltaBytes, IndexState} =
519528
rabbit_queue_index:recover(
520529
QueueName, RecoveryTerms,
521530
rabbit_msg_store_vhost_sup:successfully_recovered_state(
522-
?PERSISTENT_MSG_STORE, VHost),
531+
?PERSISTENT_MSG_STORE_SUP, VHost),
523532
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
524533
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
525534
PersistentClient, TransientClient).
@@ -1192,7 +1201,7 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
11921201
Callback, VHost).
11931202

11941203
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
1195-
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
1204+
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
11961205
rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
11971206
fun () ->
11981207
Callback(?MODULE, CloseFDsFun)
@@ -2639,23 +2648,26 @@ multiple_routing_keys() ->
26392648

26402649
%% Assumes message store is not running
26412650
transform_storage(TransformFun) ->
2642-
transform_store(?PERSISTENT_MSG_STORE, TransformFun),
2643-
transform_store(?TRANSIENT_MSG_STORE, TransformFun).
2651+
transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun),
2652+
transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun).
26442653

26452654
transform_store(Store, TransformFun) ->
26462655
rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
26472656
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
26482657

26492658
move_messages_to_vhost_store() ->
2650-
Queues = list_persistent_queues(),
2659+
Queues = rabbit_variable_queue:list_persistent_queues(),
26512660
% Maybe recover old store.
2652-
{RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
2653-
OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
2654-
NewStoreSup = start_new_store_sup(),
2655-
Migrations = spawn_for_each(fun(Queue) ->
2661+
{RecoveryTerms, StartFunState} = rabbit_variable_queue:start_recovery_terms(Queues),
2662+
OldStore = rabbit_variable_queue:run_old_persistent_store(RecoveryTerms, StartFunState),
2663+
NewStoreSup = rabbit_variable_queue:start_new_store_sup(),
2664+
lists:map(fun(Queue) ->
26562665
migrate_queue(Queue, OldStore, NewStoreSup)
26572666
end, Queues),
2658-
wait(Migrations),
2667+
% Migrations = spawn_for_each(fun(Queue) ->
2668+
% migrate_queue(Queue, OldStore, NewStoreSup)
2669+
% end, Queues),
2670+
% wait(Migrations),
26592671
delete_old_store(OldStore).
26602672

26612673
migrate_queue(Queue, OldStore, NewStoreSup) ->
@@ -2670,8 +2682,19 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
26702682
_ -> OldC
26712683
end
26722684
end,
2685+
OldStoreClient,
26732686
Queue).
26742687

2688+
walk_queue_index(Fun, Client, #amqqueue{name = QueueName}) ->
2689+
% WARNING: State is being recovered and terminated. This can cause side effects!
2690+
rabbit_queue_index:scan_queue_segments(
2691+
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, _IsAcked, ClientState)
2692+
when is_binary(MsgId) ->
2693+
Fun(MsgId, ClientState);
2694+
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, ClientState) ->
2695+
ClientState
2696+
end, Client, QueueName).
2697+
26752698
spawn_for_each(Fun, List) ->
26762699
Ref = erlang:make_ref(),
26772700
Self = self(),
@@ -2725,7 +2748,8 @@ list_persistent_queues() ->
27252748
end).
27262749

27272750
start_recovery_terms(Queues) ->
2728-
{AllTerms, StartFunState} = rabbit_queue_index:start(Queues),
2751+
QueueNames = [Name || #amqqueue{name = Name} <- Queues],
2752+
{AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames),
27292753
Refs = [Ref || Terms <- AllTerms,
27302754
Terms /= non_clean_shutdown,
27312755
begin
@@ -2735,30 +2759,30 @@ start_recovery_terms(Queues) ->
27352759
{Refs, StartFunState}.
27362760

27372761
run_old_persistent_store(Refs, StartFunState) ->
2738-
OldStoreName = old_persistent_msg_store,
2762+
OldStoreName = ?PERSISTENT_MSG_STORE,
27392763
ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
27402764
[OldStoreName, rabbit_mnesia:dir(),
27412765
Refs, StartFunState]),
27422766
OldStoreName.
27432767

2744-
run_persistent_store(Vhost) ->
2745-
2746-
2747-
?PERSISTENT_MSG_STORE.
2748-
27492768
start_new_store_sup() ->
27502769
% Start persistent store sup without recovery.
2751-
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
2752-
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
2770+
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
2771+
[?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
27532772
undefined, {fun (ok) -> finished end, ok}]),
2754-
?PERSISTENT_MSG_STORE.
2773+
?PERSISTENT_MSG_STORE_SUP.
27552774

27562775
delete_old_store(OldStore) ->
27572776
gen_server:stop(OldStore),
2758-
rabbit_file:recursive_delete(
2759-
filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])).
2760-
2777+
rabbit_file:recursive_delete([filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]).
27612778

27622779

27632780

2781+
setup() ->
2782+
application:load(rabbit),
2783+
mnesia:start(),
2784+
rabbit_sup:start_link(),
2785+
rabbit:start_fhc(),
2786+
rabbit_sup:start_restartable_child(rabbit_guid),
2787+
rabbit_sup:start_supervisor_child(worker_pool_sup).
27642788

0 commit comments

Comments
 (0)