@@ -2662,17 +2662,44 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
2662
2662
OldStoreClient = get_old_client (OldStore ),
2663
2663
NewStoreClient = get_new_store_client (Queue , NewStoreSup ),
2664
2664
walk_queue_index (
2665
- fun (MessageIdInStore ) ->
2666
- Msg = get_msg_from_store (OldStoreClient ),
2667
- put_message_to_store (Msg , NewStoreClient )
2665
+ fun (MessageIdInStore , OldC ) ->
2666
+ case rabbit_msg_store :read (MessageIdInStore , OldStoreClient ) of
2667
+ {{ok , Msg }, OldC1 } ->
2668
+ ok = rabbit_msg_store :write (MessageIdInStore , Msg , NewStoreClient ),
2669
+ OldC1 ;
2670
+ _ -> OldC
2671
+ end
2668
2672
end ,
2669
2673
Queue ).
2670
2674
2675
+ spawn_for_each (Fun , List ) ->
2676
+ Ref = erlang :make_ref (),
2677
+ Self = self (),
2678
+ Processes = lists :map (
2679
+ fun (El ) ->
2680
+ spawn_link (
2681
+ fun () ->
2682
+ Fun (El ),
2683
+ Self ! {ok , self (), Ref }
2684
+ end )
2685
+ end ,
2686
+ List ),
2687
+ {Ref , Processes }.
2688
+
2689
+ wait ({Ref , Processes }) ->
2690
+ lists :foreach (
2691
+ fun (Proc ) ->
2692
+ receive {ok , Proc , Ref } -> ok
2693
+ end
2694
+ end ,
2695
+ Processes ).
2671
2696
2672
2697
get_new_store_client (Queue , NewStoreSup ) ->
2673
2698
Vhost = queue_vhost (Queue ),
2674
2699
get_new_client (NewStoreSup , Vhost ).
2675
2700
2701
+ queue_vhost (# amqqueue {name = # resource {virtual_host = VHost }}) -> VHost .
2702
+
2676
2703
get_new_client (NewStoreSup , VHost ) ->
2677
2704
rabbit_msg_store_vhost_sup :client_init (NewStoreSup ,
2678
2705
rabbit_guid :gen (),
@@ -2708,7 +2735,7 @@ start_recovery_terms(Queues) ->
2708
2735
{Refs , StartFunState }.
2709
2736
2710
2737
run_old_persistent_store (Refs , StartFunState ) ->
2711
- OldStoreName = old_persistent_msg_store .
2738
+ OldStoreName = old_persistent_msg_store ,
2712
2739
ok = rabbit_sup :start_child (OldStoreName , rabbit_msg_store ,
2713
2740
[OldStoreName , rabbit_mnesia :dir (),
2714
2741
Refs , StartFunState ]),
@@ -2726,3 +2753,12 @@ start_new_store_sup() ->
2726
2753
undefined , {fun (ok ) -> finished end , ok }]),
2727
2754
? PERSISTENT_MSG_STORE .
2728
2755
2756
+ delete_old_store (OldStore ) ->
2757
+ gen_server :stop (OldStore ),
2758
+ rabbit_file :recursive_delete (
2759
+ filename :join ([rabbit_mnesia :dir (), ? PERSISTENT_MSG_STORE ])).
2760
+
2761
+
2762
+
2763
+
2764
+
0 commit comments