Skip to content

Commit 492e23b

Browse files
author
Daniil Fedotov
committed
wip: migrating to vhost base messge store
1 parent b3fc71a commit 492e23b

File tree

3 files changed

+70
-46
lines changed

3 files changed

+70
-46
lines changed

src/rabbit_msg_store.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
718718

719719
Dir = filename:join(BaseDir, atom_to_list(Server)),
720720

721-
{ok, IndexModule} = application:get_env(msg_store_index_module),
721+
{ok, IndexModule} = application:get_env(rabbit,msg_store_index_module),
722722
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
723723

724724
AttemptFileSummaryRecovery =
@@ -758,7 +758,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
758758
DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
759759
[set, public, {keypos, #dying_client.client_ref}]),
760760

761-
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
761+
{ok, FileSizeLimit} = application:get_env(rabbit,msg_store_file_size_limit),
762762

763763
{ok, GCPid} = rabbit_msg_store_gc:start_link(
764764
#gc_state { dir = Dir,

src/rabbit_queue_index.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
2424

2525
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
26+
-export([scan_queue_segments/3]).
2627

2728
-define(CLEAN_FILENAME, "clean.dot").
2829

@@ -660,20 +661,19 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
660661
end.
661662

662663
queue_index_walker_reader(QueueName, Gatherer) ->
663-
State = blank_state(QueueName),
664-
ok = scan_segments(
664+
ok = scan_queue_segments(
665665
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
666666
when is_binary(MsgId) ->
667667
gatherer:sync_in(Gatherer, {MsgId, 1});
668668
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
669669
_IsAcked, Acc) ->
670670
Acc
671-
end, ok, State),
671+
end, ok, QueueName),
672672
ok = gatherer:finish(Gatherer).
673673

674-
scan_segments(Fun, Acc, State) ->
675-
State1 = #qistate { segments = Segments, dir = Dir } =
676-
recover_journal(State),
674+
scan_queue_segments(Fun, Acc, QueueName) ->
675+
State = #qistate { segments = Segments, dir = Dir } =
676+
recover_journal(blank_state(QueueName)),
677677
Result = lists:foldr(
678678
fun (Seg, AccN) ->
679679
segment_entries_foldr(
@@ -682,8 +682,8 @@ scan_segments(Fun, Acc, State) ->
682682
Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
683683
IsPersistent, IsDelivered, IsAcked, AccM)
684684
end, AccN, segment_find_or_new(Seg, Dir, Segments))
685-
end, Acc, all_segment_nums(State1)),
686-
{_SegmentCounts, _State} = terminate(State1),
685+
end, Acc, all_segment_nums(State)),
686+
{_SegmentCounts, _State} = terminate(State),
687687
Result.
688688

689689
%%----------------------------------------------------------------------------

src/rabbit_variable_queue.erl

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
%% exported for testing only
3434
-export([start_msg_store/2, stop_msg_store/0, init/6]).
3535

36+
-export([move_messages_to_vhost_store/0]).
37+
-include_lib("stdlib/include/qlc.hrl").
38+
3639
%%----------------------------------------------------------------------------
3740
%% Messages, and their position in the queue, can be in memory or on
3841
%% disk, or both. Persistent messages will have both message and
@@ -334,8 +337,11 @@
334337
}).
335338

336339
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
340+
-define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost).
341+
-define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost).
337342
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
338343
-define(TRANSIENT_MSG_STORE, msg_store_transient).
344+
339345
-define(QUEUE, lqueue).
340346

341347
-include("rabbit.hrl").
@@ -344,7 +350,8 @@
344350
%%----------------------------------------------------------------------------
345351

346352
-rabbit_upgrade({multiple_routing_keys, local, []}).
347-
-rabbit_upgrade({move_messages_to_vhost_store, local, []}).
353+
% -rabbit_upgrade({move_messages_to_vhost_store, local, []}). requires mnesia, requires rabbit_sup, requires worker_pool, requires fhc
354+
-compile(export_all).
348355

349356
-type seq_id() :: non_neg_integer().
350357

@@ -452,6 +459,8 @@
452459
%% Public API
453460
%%----------------------------------------------------------------------------
454461

462+
463+
455464
start(DurableQueues) ->
456465
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
457466
start_msg_store(
@@ -470,23 +479,23 @@ stop() ->
470479

471480
start_msg_store(Refs, StartFunState) ->
472481
VHosts = rabbit_vhost:list(),
473-
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
474-
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
482+
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
483+
[?TRANSIENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
475484
undefined, {fun (ok) -> finished end, ok}]),
476-
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
477-
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
485+
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
486+
[?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
478487
Refs, StartFunState]),
479488
lists:foreach(
480489
fun(VHost) ->
481-
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
482-
rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
490+
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
491+
rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost)
483492
end,
484493
VHosts),
485494
ok.
486495

487496
stop_msg_store() ->
488-
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
489-
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
497+
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
498+
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP).
490499

491500
init(Queue, Recover, Callback) ->
492501
init(
@@ -504,11 +513,11 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
504513
VHost = QueueName#resource.virtual_host,
505514
init(IsDurable, IndexState, 0, 0, [],
506515
case IsDurable of
507-
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
516+
true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP,
508517
MsgOnDiskFun, AsyncCallback, VHost);
509518
false -> undefined
510519
end,
511-
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
520+
msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
512521
AsyncCallback, VHost));
513522

514523
%% We can be recovering a transient queue if it crashed
@@ -518,7 +527,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
518527
VHost = QueueName#resource.virtual_host,
519528
{PersistentClient, ContainsCheckFun} =
520529
case IsDurable of
521-
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
530+
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef,
522531
MsgOnDiskFun, AsyncCallback,
523532
VHost),
524533
{C, fun (MsgId) when is_binary(MsgId) ->
@@ -528,14 +537,14 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
528537
end};
529538
false -> {undefined, fun(_MsgId) -> false end}
530539
end,
531-
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
540+
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
532541
undefined, AsyncCallback,
533542
VHost),
534543
{DeltaCount, DeltaBytes, IndexState} =
535544
rabbit_queue_index:recover(
536545
QueueName, RecoveryTerms,
537546
rabbit_msg_store_vhost_sup:successfully_recovered_state(
538-
?PERSISTENT_MSG_STORE, VHost),
547+
?PERSISTENT_MSG_STORE_SUP, VHost),
539548
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
540549
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
541550
PersistentClient, TransientClient).
@@ -1208,7 +1217,7 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
12081217
Callback, VHost).
12091218

12101219
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
1211-
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
1220+
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
12121221
rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
12131222
fun () ->
12141223
Callback(?MODULE, CloseFDsFun)
@@ -2666,23 +2675,26 @@ multiple_routing_keys() ->
26662675

26672676
%% Assumes message store is not running
26682677
transform_storage(TransformFun) ->
2669-
transform_store(?PERSISTENT_MSG_STORE, TransformFun),
2670-
transform_store(?TRANSIENT_MSG_STORE, TransformFun).
2678+
transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun),
2679+
transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun).
26712680

26722681
transform_store(Store, TransformFun) ->
26732682
rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
26742683
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
26752684

26762685
move_messages_to_vhost_store() ->
2677-
Queues = list_persistent_queues(),
2686+
Queues = rabbit_variable_queue:list_persistent_queues(),
26782687
% Maybe recover old store.
2679-
{RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
2680-
OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
2681-
NewStoreSup = start_new_store_sup(),
2682-
Migrations = spawn_for_each(fun(Queue) ->
2688+
{RecoveryTerms, StartFunState} = rabbit_variable_queue:start_recovery_terms(Queues),
2689+
OldStore = rabbit_variable_queue:run_old_persistent_store(RecoveryTerms, StartFunState),
2690+
NewStoreSup = rabbit_variable_queue:start_new_store_sup(),
2691+
lists:map(fun(Queue) ->
26832692
migrate_queue(Queue, OldStore, NewStoreSup)
26842693
end, Queues),
2685-
wait(Migrations),
2694+
% Migrations = spawn_for_each(fun(Queue) ->
2695+
% migrate_queue(Queue, OldStore, NewStoreSup)
2696+
% end, Queues),
2697+
% wait(Migrations),
26862698
delete_old_store(OldStore).
26872699

26882700
migrate_queue(Queue, OldStore, NewStoreSup) ->
@@ -2697,8 +2709,19 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
26972709
_ -> OldC
26982710
end
26992711
end,
2712+
OldStoreClient,
27002713
Queue).
27012714

2715+
walk_queue_index(Fun, Client, #amqqueue{name = QueueName}) ->
2716+
% WARNING: State is being recovered and terminated. This can cause side effects!
2717+
rabbit_queue_index:scan_queue_segments(
2718+
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, _IsAcked, ClientState)
2719+
when is_binary(MsgId) ->
2720+
Fun(MsgId, ClientState);
2721+
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, ClientState) ->
2722+
ClientState
2723+
end, Client, QueueName).
2724+
27022725
spawn_for_each(Fun, List) ->
27032726
Ref = erlang:make_ref(),
27042727
Self = self(),
@@ -2752,7 +2775,8 @@ list_persistent_queues() ->
27522775
end).
27532776

27542777
start_recovery_terms(Queues) ->
2755-
{AllTerms, StartFunState} = rabbit_queue_index:start(Queues),
2778+
QueueNames = [Name || #amqqueue{name = Name} <- Queues],
2779+
{AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames),
27562780
Refs = [Ref || Terms <- AllTerms,
27572781
Terms /= non_clean_shutdown,
27582782
begin
@@ -2762,30 +2786,30 @@ start_recovery_terms(Queues) ->
27622786
{Refs, StartFunState}.
27632787

27642788
run_old_persistent_store(Refs, StartFunState) ->
2765-
OldStoreName = old_persistent_msg_store,
2789+
OldStoreName = ?PERSISTENT_MSG_STORE,
27662790
ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
27672791
[OldStoreName, rabbit_mnesia:dir(),
27682792
Refs, StartFunState]),
27692793
OldStoreName.
27702794

2771-
run_persistent_store(Vhost) ->
2772-
2773-
2774-
?PERSISTENT_MSG_STORE.
2775-
27762795
start_new_store_sup() ->
27772796
% Start persistent store sup without recovery.
2778-
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
2779-
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
2797+
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
2798+
[?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
27802799
undefined, {fun (ok) -> finished end, ok}]),
2781-
?PERSISTENT_MSG_STORE.
2800+
?PERSISTENT_MSG_STORE_SUP.
27822801

27832802
delete_old_store(OldStore) ->
27842803
gen_server:stop(OldStore),
2785-
rabbit_file:recursive_delete(
2786-
filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])).
2787-
2804+
rabbit_file:recursive_delete([filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]).
27882805

27892806

27902807

2808+
setup() ->
2809+
application:load(rabbit),
2810+
mnesia:start(),
2811+
rabbit_sup:start_link(),
2812+
rabbit:start_fhc(),
2813+
rabbit_sup:start_restartable_child(rabbit_guid),
2814+
rabbit_sup:start_supervisor_child(worker_pool_sup).
27912815

0 commit comments

Comments
 (0)