@@ -2675,16 +2675,19 @@ transform_store(Store, TransformFun) ->
2675
2675
2676
2676
move_messages_to_vhost_store () ->
2677
2677
Queues = list_persistent_queues (),
2678
- OldStore = run_old_persistent_store (),
2678
+ % Maybe recover old store.
2679
+ {RecoveryTerms , StartFunState } = start_recovery_terms (Queues ),
2680
+ OldStore = run_old_persistent_store (RecoveryTerms , StartFunState ),
2681
+ NewStoreSup = start_new_store_sup (),
2679
2682
Migrations = spawn_for_each (fun (Queue ) ->
2680
- migrate_queue (Queue , OldStore )
2683
+ migrate_queue (Queue , OldStore , NewStoreSup )
2681
2684
end , Queues ),
2682
2685
wait (Migrations ),
2683
2686
delete_old_store (OldStore ).
2684
2687
2685
- migrate_queue (Queue , OldStore ) ->
2686
- OldStoreClient = get_client (OldStore ),
2687
- NewStoreClient = get_new_store_client (Queue ),
2688
+ migrate_queue (Queue , OldStore , NewStoreSup ) ->
2689
+ OldStoreClient = get_old_client (OldStore ),
2690
+ NewStoreClient = get_new_store_client (Queue , NewStoreSup ),
2688
2691
walk_queue_index (
2689
2692
fun (MessageIdInStore ) ->
2690
2693
Msg = get_msg_from_store (OldStoreClient ),
@@ -2693,11 +2696,22 @@ migrate_queue(Queue, OldStore) ->
2693
2696
Queue ).
2694
2697
2695
2698
2696
- get_new_store_client (Queue ) ->
2699
+ get_new_store_client (Queue , NewStoreSup ) ->
2697
2700
Vhost = queue_vhost (Queue ),
2698
- Store = run_persistent_store (Vhost ),
2699
- get_client (Store ).
2701
+ get_new_client (NewStoreSup , Vhost ).
2700
2702
2703
+ get_new_client (NewStoreSup , VHost ) ->
2704
+ rabbit_msg_store_vhost_sup :client_init (NewStoreSup ,
2705
+ rabbit_guid :gen (),
2706
+ fun (_ ,_ ) -> ok end ,
2707
+ fun () -> ok end ,
2708
+ VHost ).
2709
+
2710
+ get_old_client (OldStore ) ->
2711
+ rabbit_msg_store :client_init (OldStore ,
2712
+ rabbit_guid :gen (),
2713
+ fun (_ ,_ ) -> ok end ,
2714
+ fun () -> ok end ).
2701
2715
2702
2716
list_persistent_queues () ->
2703
2717
Node = node (),
@@ -2710,3 +2724,32 @@ list_persistent_queues() ->
2710
2724
mnesia :read (rabbit_queue , Name , read ) =:= []]))
2711
2725
end ).
2712
2726
2727
+ start_recovery_terms (Queues ) ->
2728
+ {AllTerms , StartFunState } = rabbit_queue_index :start (Queues ),
2729
+ Refs = [Ref || Terms <- AllTerms ,
2730
+ Terms /= non_clean_shutdown ,
2731
+ begin
2732
+ Ref = proplists :get_value (persistent_ref , Terms ),
2733
+ Ref =/= undefined
2734
+ end ],
2735
+ {Refs , StartFunState }.
2736
+
2737
+ run_old_persistent_store (Refs , StartFunState ) ->
2738
+ OldStoreName = old_persistent_msg_store .
2739
+ ok = rabbit_sup :start_child (OldStoreName , rabbit_msg_store ,
2740
+ [OldStoreName , rabbit_mnesia :dir (),
2741
+ Refs , StartFunState ]),
2742
+ OldStoreName .
2743
+
2744
+ run_persistent_store (Vhost ) ->
2745
+
2746
+
2747
+ ? PERSISTENT_MSG_STORE .
2748
+
2749
+ start_new_store_sup () ->
2750
+ % Start persistent store sup without recovery.
2751
+ ok = rabbit_sup :start_child (? PERSISTENT_MSG_STORE , rabbit_msg_store_vhost_sup ,
2752
+ [? PERSISTENT_MSG_STORE , rabbit_mnesia :dir (),
2753
+ undefined , {fun (ok ) -> finished end , ok }]),
2754
+ ? PERSISTENT_MSG_STORE .
2755
+
0 commit comments