@@ -602,6 +602,8 @@ transfer_leadership(Q, Destination) ->
602
602
% % Moves the primary replica (leader) of a classic mirrored queue to another node
603
603
% % which already hosts a replica of this queue. In this case we can stop
604
604
% % fewer replicas and reduce the load the operation has on the cluster.
605
+ % % Note that there is no guarantee that the queue will actually end up on the
606
+ % % destination node. The actual destination node is returned.
605
607
migrate_leadership_to_existing_replica (Q , Destination ) ->
606
608
QName = amqqueue :get_name (Q ),
607
609
{PreTransferPrimaryNode , PreTransferMirrorNodes , _PreTransferInSyncMirrorNodes } = actual_queue_nodes (Q ),
@@ -616,7 +618,7 @@ migrate_leadership_to_existing_replica(Q, Destination) ->
616
618
NodesToDropMirrorsOn = [PreTransferPrimaryNode ],
617
619
drop_mirrors (QName , NodesToDropMirrorsOn ),
618
620
619
- case wait_for_new_master (QName , Destination ) of
621
+ case wait_for_different_master (QName , PreTransferPrimaryNode ) of
620
622
not_migrated ->
621
623
{not_migrated , undefined };
622
624
{{not_migrated , Destination } = Result , _Q1 } ->
@@ -655,6 +657,36 @@ wait_for_new_master(QName, Destination, N) ->
655
657
end
656
658
end .
657
659
660
+ - spec wait_for_different_master (rabbit_amqqueue :name (), atom ()) -> {{migrated , node ()}, amqqueue :amqqueue ()} | {{not_migrated , node ()}, amqqueue :amqqueue ()} | not_migrated .
661
+ wait_for_different_master (QName , Source ) ->
662
+ wait_for_different_master (QName , Source , 100 ).
663
+
664
+ wait_for_different_master (QName , _ , 0 ) ->
665
+ case rabbit_amqqueue :lookup (QName ) of
666
+ {error , not_found } -> not_migrated ;
667
+ {ok , Q } -> {{not_migrated , undefined }, Q }
668
+ end ;
669
+ wait_for_different_master (QName , Source , N ) ->
670
+ case rabbit_amqqueue :lookup (QName ) of
671
+ {error , not_found } ->
672
+ not_migrated ;
673
+ {ok , Q } ->
674
+ case amqqueue :get_pid (Q ) of
675
+ none ->
676
+ timer :sleep (100 ),
677
+ wait_for_different_master (QName , Source , N - 1 );
678
+ Pid ->
679
+ case node (Pid ) of
680
+ Source ->
681
+ timer :sleep (100 ),
682
+ wait_for_different_master (QName , Source , N - 1 );
683
+ Destination ->
684
+ {{migrated , Destination }, Q }
685
+ end
686
+ end
687
+ end .
688
+
689
+
658
690
% % The arrival of a newly synced mirror may cause the master to die if
659
691
% % the policy does not want the master but it has been kept alive
660
692
% % because there were no synced mirrors.
0 commit comments