@@ -2689,17 +2689,44 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
2689
2689
OldStoreClient = get_old_client (OldStore ),
2690
2690
NewStoreClient = get_new_store_client (Queue , NewStoreSup ),
2691
2691
walk_queue_index (
2692
- fun (MessageIdInStore ) ->
2693
- Msg = get_msg_from_store (OldStoreClient ),
2694
- put_message_to_store (Msg , NewStoreClient )
2692
+ fun (MessageIdInStore , OldC ) ->
2693
+ case rabbit_msg_store :read (MessageIdInStore , OldStoreClient ) of
2694
+ {{ok , Msg }, OldC1 } ->
2695
+ ok = rabbit_msg_store :write (MessageIdInStore , Msg , NewStoreClient ),
2696
+ OldC1 ;
2697
+ _ -> OldC
2698
+ end
2695
2699
end ,
2696
2700
Queue ).
2697
2701
2702
+ spawn_for_each (Fun , List ) ->
2703
+ Ref = erlang :make_ref (),
2704
+ Self = self (),
2705
+ Processes = lists :map (
2706
+ fun (El ) ->
2707
+ spawn_link (
2708
+ fun () ->
2709
+ Fun (El ),
2710
+ Self ! {ok , self (), Ref }
2711
+ end )
2712
+ end ,
2713
+ List ),
2714
+ {Ref , Processes }.
2715
+
2716
+ wait ({Ref , Processes }) ->
2717
+ lists :foreach (
2718
+ fun (Proc ) ->
2719
+ receive {ok , Proc , Ref } -> ok
2720
+ end
2721
+ end ,
2722
+ Processes ).
2698
2723
2699
2724
get_new_store_client (Queue , NewStoreSup ) ->
2700
2725
Vhost = queue_vhost (Queue ),
2701
2726
get_new_client (NewStoreSup , Vhost ).
2702
2727
2728
+ queue_vhost (# amqqueue {name = # resource {virtual_host = VHost }}) -> VHost .
2729
+
2703
2730
get_new_client (NewStoreSup , VHost ) ->
2704
2731
rabbit_msg_store_vhost_sup :client_init (NewStoreSup ,
2705
2732
rabbit_guid :gen (),
@@ -2735,7 +2762,7 @@ start_recovery_terms(Queues) ->
2735
2762
{Refs , StartFunState }.
2736
2763
2737
2764
run_old_persistent_store (Refs , StartFunState ) ->
2738
- OldStoreName = old_persistent_msg_store .
2765
+ OldStoreName = old_persistent_msg_store ,
2739
2766
ok = rabbit_sup :start_child (OldStoreName , rabbit_msg_store ,
2740
2767
[OldStoreName , rabbit_mnesia :dir (),
2741
2768
Refs , StartFunState ]),
@@ -2753,3 +2780,12 @@ start_new_store_sup() ->
2753
2780
undefined , {fun (ok ) -> finished end , ok }]),
2754
2781
? PERSISTENT_MSG_STORE .
2755
2782
2783
+ delete_old_store (OldStore ) ->
2784
+ gen_server :stop (OldStore ),
2785
+ rabbit_file :recursive_delete (
2786
+ filename :join ([rabbit_mnesia :dir (), ? PERSISTENT_MSG_STORE ])).
2787
+
2788
+
2789
+
2790
+
2791
+
0 commit comments