@@ -582,6 +582,8 @@ transfer_leadership(Q, Destination) ->
582
582
% % Moves the primary replica (leader) of a classic mirrored queue to another node
583
583
% % which already hosts a replica of this queue. In this case we can stop
584
584
% % fewer replicas and reduce the load the operation has on the cluster.
585
+ % % Note that there is no guarantee that the queue will actually end up on the
586
+ % % destination node. The actual destination node is returned.
585
587
migrate_leadership_to_existing_replica (Q , Destination ) ->
586
588
QName = amqqueue :get_name (Q ),
587
589
{PreTransferPrimaryNode , PreTransferMirrorNodes , _PreTransferInSyncMirrorNodes } = actual_queue_nodes (Q ),
@@ -596,7 +598,7 @@ migrate_leadership_to_existing_replica(Q, Destination) ->
596
598
NodesToDropMirrorsOn = [PreTransferPrimaryNode ],
597
599
drop_mirrors (QName , NodesToDropMirrorsOn ),
598
600
599
- case wait_for_new_master (QName , Destination ) of
601
+ case wait_for_different_master (QName , PreTransferPrimaryNode ) of
600
602
not_migrated ->
601
603
{not_migrated , undefined };
602
604
{{not_migrated , Destination } = Result , _Q1 } ->
@@ -635,6 +637,36 @@ wait_for_new_master(QName, Destination, N) ->
635
637
end
636
638
end .
637
639
640
+ - spec wait_for_different_master (rabbit_amqqueue :name (), atom ()) -> {{migrated , node ()}, amqqueue :amqqueue ()} | {{not_migrated , node ()}, amqqueue :amqqueue ()} | not_migrated .
641
+ wait_for_different_master (QName , Source ) ->
642
+ wait_for_different_master (QName , Source , 100 ).
643
+
644
+ wait_for_different_master (QName , _ , 0 ) ->
645
+ case rabbit_amqqueue :lookup (QName ) of
646
+ {error , not_found } -> not_migrated ;
647
+ {ok , Q } -> {{not_migrated , undefined }, Q }
648
+ end ;
649
+ wait_for_different_master (QName , Source , N ) ->
650
+ case rabbit_amqqueue :lookup (QName ) of
651
+ {error , not_found } ->
652
+ not_migrated ;
653
+ {ok , Q } ->
654
+ case amqqueue :get_pid (Q ) of
655
+ none ->
656
+ timer :sleep (100 ),
657
+ wait_for_different_master (QName , Source , N - 1 );
658
+ Pid ->
659
+ case node (Pid ) of
660
+ Source ->
661
+ timer :sleep (100 ),
662
+ wait_for_different_master (QName , Source , N - 1 );
663
+ Destination ->
664
+ {{migrated , Destination }, Q }
665
+ end
666
+ end
667
+ end .
668
+
669
+
638
670
% % The arrival of a newly synced mirror may cause the master to die if
639
671
% % the policy does not want the master but it has been kept alive
640
672
% % because there were no synced mirrors.
0 commit comments