37
37
-export ([stop_vhost_msg_store /1 ]).
38
38
-include_lib (" stdlib/include/qlc.hrl" ).
39
39
40
+ -define (QUEUE_MIGRATION_BATCH_SIZE , 100 ).
41
+
40
42
% %----------------------------------------------------------------------------
41
43
% % Messages, and their position in the queue, can be in memory or on
42
44
% % disk, or both. Persistent messages will have both message and
351
353
% %----------------------------------------------------------------------------
352
354
353
355
-rabbit_upgrade ({multiple_routing_keys , local , []}).
354
- -rabbit_upgrade ({move_messages_to_vhost_store , queues , []}).
356
+ -rabbit_upgrade ({move_messages_to_vhost_store , message_store , []}).
355
357
356
358
-compile (export_all ).
357
359
@@ -2716,7 +2718,7 @@ move_messages_to_vhost_store() ->
2716
2718
rabbit_log :info (" Moving messages to per-vhost message store" ),
2717
2719
Queues = list_persistent_queues (),
2718
2720
% % Move the queue index for each persistent queue to the new store
2719
- lists :map (
2721
+ lists :foreach (
2720
2722
fun (Queue ) ->
2721
2723
# amqqueue {name = QueueName } = Queue ,
2722
2724
rabbit_queue_index :move_to_per_vhost_stores (QueueName )
@@ -2725,39 +2727,49 @@ move_messages_to_vhost_store() ->
2725
2727
% % Legacy (global) msg_store may require recovery.
2726
2728
% % This upgrade step should only be started
2727
2729
% % if we are upgrading from a pre-3.7.0 version.
2728
- {RecoveryTerms , StartFunState } = start_recovery_terms (Queues ),
2729
- OldStore = run_old_persistent_store (RecoveryTerms , StartFunState ),
2730
+ {QueuesWithTerms , RecoveryRefs , StartFunState } = start_recovery_terms (Queues ),
2731
+
2732
+ OldStore = run_old_persistent_store (RecoveryRefs , StartFunState ),
2730
2733
% % New store should not be recovered.
2731
2734
NewStoreSup = start_new_store_sup (),
2735
+ in_batches (? QUEUE_MIGRATION_BATCH_SIZE ,
2736
+ {rabbit_variable_queue , migrate_queue , [OldStore , NewStoreSup ]},
2737
+ QueuesWithTerms ),
2732
2738
2733
- % {ok, Gatherer} = gatherer:start_link(),
2734
- lists :map (
2735
- fun (Queue ) ->
2736
- migrate_queue (Queue , OldStore , NewStoreSup ),
2737
- # amqqueue {name = QueueName } = Queue ,
2738
- rabbit_log :info (" Queue migration finished ~p " , [QueueName ])
2739
- % ok = gatherer:fork(Gatherer),
2740
- % ok = worker_pool:submit_async(
2741
- % fun () ->
2742
- % migrate_queue(Queue, OldStore, NewStoreSup),
2743
- % gatherer:finish(Gatherer)
2744
- % end)
2745
- end ,
2746
- Queues ),
2747
- % empty = gatherer:out(Gatherer),
2748
- % ok = gatherer:stop(Gatherer),
2749
-
2739
+ rabbit_log :info (" Message store migration finished" ),
2750
2740
delete_old_store (OldStore ),
2751
2741
2752
2742
ok = rabbit_queue_index :stop (),
2753
- ok = rabbit_sup :stop_child (NewStoreSup ).
2743
+ ok = rabbit_sup :stop_child (NewStoreSup ),
2744
+ ok .
2754
2745
2755
- migrate_queue (Queue , OldStore , NewStoreSup ) ->
2756
- # amqqueue {name = QueueName } = Queue ,
2746
+ in_batches (Size , MFA , List ) ->
2747
+ in_batches (Size , 1 , MFA , List ).
2748
+
2749
+ in_batches (_ , _ , _ , []) -> ok ;
2750
+ in_batches (Size , BatchNum , MFA , List ) ->
2751
+ {Batch , Tail } = case Size > length (List ) of
2752
+ true -> {List , []};
2753
+ false -> lists :split (Size , List )
2754
+ end ,
2755
+ rabbit_log :info (" Migrating batch ~p of ~p queues ~n " , [BatchNum , Size ]),
2756
+ {M , F , A } = MFA ,
2757
+ Keys = [ rpc :async_call (node (), M , F , [El | A ]) || El <- Batch ],
2758
+ lists :foreach (fun (Key ) ->
2759
+ case rpc :yield (Key ) of
2760
+ {badrpc , Err } -> throw (Err );
2761
+ _ -> ok
2762
+ end
2763
+ end ,
2764
+ Keys ),
2765
+ rabbit_log :info (" Batch ~p of ~p queues migrated ~n " , [BatchNum , Size ]),
2766
+ in_batches (Size , BatchNum + 1 , MFA , Tail ).
2767
+
2768
+ migrate_queue ({QueueName , RecoveryTerm }, OldStore , NewStoreSup ) ->
2757
2769
rabbit_log :info (" Migrating messages in queue ~s in vhost ~s to per-vhost message store~n " ,
2758
2770
[QueueName # resource .name , QueueName # resource .virtual_host ]),
2759
2771
OldStoreClient = get_global_store_client (OldStore ),
2760
- NewStoreClient = get_per_vhost_store_client (Queue , NewStoreSup ),
2772
+ NewStoreClient = get_per_vhost_store_client (QueueName , NewStoreSup ),
2761
2773
% % WARNING: During scan_queue_segments queue index state is being recovered
2762
2774
% % and terminated. This can cause side effects!
2763
2775
rabbit_queue_index :scan_queue_segments (
@@ -2771,23 +2783,25 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
2771
2783
OldC
2772
2784
end ,
2773
2785
OldStoreClient ,
2774
- QueueName ).
2786
+ QueueName ),
2787
+ rabbit_msg_store :client_terminate (OldStoreClient ),
2788
+ rabbit_msg_store :client_terminate (NewStoreClient ),
2789
+ NewClientRef = rabbit_msg_store :client_ref (NewStoreClient ),
2790
+ NewRecoveryTerm = lists :keyreplace (persistent_ref , 1 , RecoveryTerm ,
2791
+ {persistent_ref , NewClientRef }),
2792
+ rabbit_queue_index :update_recovery_term (QueueName , NewRecoveryTerm ),
2793
+ rabbit_log :info (" Queue migration finished ~p " , [QueueName ]),
2794
+ {QueueName , NewClientRef }.
2775
2795
2776
2796
migrate_message (MsgId , OldC , NewC ) ->
2777
2797
case rabbit_msg_store :read (MsgId , OldC ) of
2778
2798
{{ok , Msg }, OldC1 } ->
2779
- case rabbit_msg_store :contains (MsgId , NewC ) of
2780
- false -> ok = rabbit_msg_store :write (MsgId , Msg , NewC );
2781
- true -> ok
2782
- end ,
2783
- % TODO: maybe remove in batches?
2784
- ok = rabbit_msg_store :remove ([MsgId ], OldC1 ),
2799
+ ok = rabbit_msg_store :write (MsgId , Msg , NewC ),
2785
2800
OldC1 ;
2786
2801
_ -> OldC
2787
2802
end .
2788
2803
2789
- get_per_vhost_store_client (# amqqueue {name = # resource {virtual_host = VHost }},
2790
- NewStoreSup ) ->
2804
+ get_per_vhost_store_client (# resource {virtual_host = VHost }, NewStoreSup ) ->
2791
2805
rabbit_msg_store_vhost_sup :client_init (NewStoreSup ,
2792
2806
rabbit_guid :gen (),
2793
2807
fun (_ ,_ ) -> ok end ,
@@ -2820,7 +2834,7 @@ start_recovery_terms(Queues) ->
2820
2834
Ref = proplists :get_value (persistent_ref , Terms ),
2821
2835
Ref =/= undefined
2822
2836
end ],
2823
- {Refs , StartFunState }.
2837
+ {lists : zip ( QueueNames , AllTerms ), Refs , StartFunState }.
2824
2838
2825
2839
run_old_persistent_store (Refs , StartFunState ) ->
2826
2840
OldStoreName = ? PERSISTENT_MSG_STORE ,
0 commit comments