@@ -2648,16 +2648,19 @@ transform_store(Store, TransformFun) ->
2648
2648
2649
2649
move_messages_to_vhost_store () ->
2650
2650
Queues = list_persistent_queues (),
2651
- OldStore = run_old_persistent_store (),
2651
+ % Maybe recover old store.
2652
+ {RecoveryTerms , StartFunState } = start_recovery_terms (Queues ),
2653
+ OldStore = run_old_persistent_store (RecoveryTerms , StartFunState ),
2654
+ NewStoreSup = start_new_store_sup (),
2652
2655
Migrations = spawn_for_each (fun (Queue ) ->
2653
- migrate_queue (Queue , OldStore )
2656
+ migrate_queue (Queue , OldStore , NewStoreSup )
2654
2657
end , Queues ),
2655
2658
wait (Migrations ),
2656
2659
delete_old_store (OldStore ).
2657
2660
2658
- migrate_queue (Queue , OldStore ) ->
2659
- OldStoreClient = get_client (OldStore ),
2660
- NewStoreClient = get_new_store_client (Queue ),
2661
+ migrate_queue (Queue , OldStore , NewStoreSup ) ->
2662
+ OldStoreClient = get_old_client (OldStore ),
2663
+ NewStoreClient = get_new_store_client (Queue , NewStoreSup ),
2661
2664
walk_queue_index (
2662
2665
fun (MessageIdInStore ) ->
2663
2666
Msg = get_msg_from_store (OldStoreClient ),
@@ -2666,11 +2669,22 @@ migrate_queue(Queue, OldStore) ->
2666
2669
Queue ).
2667
2670
2668
2671
2669
- get_new_store_client (Queue ) ->
2672
+ get_new_store_client (Queue , NewStoreSup ) ->
2670
2673
Vhost = queue_vhost (Queue ),
2671
- Store = run_persistent_store (Vhost ),
2672
- get_client (Store ).
2674
+ get_new_client (NewStoreSup , Vhost ).
2673
2675
2676
+ get_new_client (NewStoreSup , VHost ) ->
2677
+ rabbit_msg_store_vhost_sup :client_init (NewStoreSup ,
2678
+ rabbit_guid :gen (),
2679
+ fun (_ ,_ ) -> ok end ,
2680
+ fun () -> ok end ,
2681
+ VHost ).
2682
+
2683
+ get_old_client (OldStore ) ->
2684
+ rabbit_msg_store :client_init (OldStore ,
2685
+ rabbit_guid :gen (),
2686
+ fun (_ ,_ ) -> ok end ,
2687
+ fun () -> ok end ).
2674
2688
2675
2689
list_persistent_queues () ->
2676
2690
Node = node (),
@@ -2683,3 +2697,32 @@ list_persistent_queues() ->
2683
2697
mnesia :read (rabbit_queue , Name , read ) =:= []]))
2684
2698
end ).
2685
2699
2700
+ start_recovery_terms (Queues ) ->
2701
+ {AllTerms , StartFunState } = rabbit_queue_index :start (Queues ),
2702
+ Refs = [Ref || Terms <- AllTerms ,
2703
+ Terms /= non_clean_shutdown ,
2704
+ begin
2705
+ Ref = proplists :get_value (persistent_ref , Terms ),
2706
+ Ref =/= undefined
2707
+ end ],
2708
+ {Refs , StartFunState }.
2709
+
2710
+ run_old_persistent_store (Refs , StartFunState ) ->
2711
+ OldStoreName = old_persistent_msg_store .
2712
+ ok = rabbit_sup :start_child (OldStoreName , rabbit_msg_store ,
2713
+ [OldStoreName , rabbit_mnesia :dir (),
2714
+ Refs , StartFunState ]),
2715
+ OldStoreName .
2716
+
2717
+ run_persistent_store (Vhost ) ->
2718
+
2719
+
2720
+ ? PERSISTENT_MSG_STORE .
2721
+
2722
+ start_new_store_sup () ->
2723
+ % Start persistent store sup without recovery.
2724
+ ok = rabbit_sup :start_child (? PERSISTENT_MSG_STORE , rabbit_msg_store_vhost_sup ,
2725
+ [? PERSISTENT_MSG_STORE , rabbit_mnesia :dir (),
2726
+ undefined , {fun (ok ) -> finished end , ok }]),
2727
+ ? PERSISTENT_MSG_STORE .
2728
+
0 commit comments