@@ -180,7 +180,7 @@ is_compatible(_, _, _) ->
180
180
init (Q ) when ? is_amqqueue (Q ) ->
181
181
{ok , SoftLimit } = application :get_env (rabbit , quorum_commands_soft_limit ),
182
182
{Name , _ } = MaybeLeader = amqqueue :get_pid (Q ),
183
- Leader = case ra_leaderboard : lookup_leader ( Name ) of
183
+ Leader = case find_leader ( Q ) of
184
184
undefined ->
185
185
% % leader from queue record will have to suffice
186
186
MaybeLeader ;
@@ -1663,10 +1663,16 @@ open_files(Name) ->
1663
1663
end .
1664
1664
1665
1665
leader (Q ) when ? is_amqqueue (Q ) ->
1666
- {Name , Leader } = amqqueue :get_pid (Q ),
1667
- case is_process_alive (Name , Leader ) of
1668
- true -> Leader ;
1669
- false -> ''
1666
+ case find_leader (Q ) of
1667
+ undefined ->
1668
+ '' ;
1669
+ {Name , LeaderNode } ->
1670
+ case is_process_alive (Name , LeaderNode ) of
1671
+ true ->
1672
+ LeaderNode ;
1673
+ false ->
1674
+ ''
1675
+ end
1670
1676
end .
1671
1677
1672
1678
peek (Vhost , Queue , Pos ) ->
@@ -1742,12 +1748,6 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
1742
1748
{leader , LeaderNode },
1743
1749
{online , Online }].
1744
1750
1745
- is_process_alive (Name , Node ) ->
1746
- % % don't attempt rpc if node is not already connected
1747
- % % as this function is used for metrics and stats and the additional
1748
- % % latency isn't warranted
1749
- erlang :is_pid (erpc_call (Node , erlang , whereis , [Name ], ? RPC_TIMEOUT )).
1750
-
1751
1751
- spec quorum_messages (rabbit_amqqueue :name ()) -> non_neg_integer ().
1752
1752
1753
1753
quorum_messages (QName ) ->
@@ -1930,3 +1930,30 @@ wait_for_projections(Node, QName, N) ->
1930
1930
timer :sleep (100 ),
1931
1931
wait_for_projections (Node , QName , N - 1 )
1932
1932
end .
1933
+
1934
+ find_leader (Q ) when ? is_amqqueue (Q ) ->
1935
+ % % the get_pid field in the queue record is updated async after a leader
1936
+ % % change, so is likely to be the more stale than the leaderboard
1937
+ {Name , _Node } = MaybeLeader = amqqueue :get_pid (Q ),
1938
+ Leaders = case ra_leaderboard :lookup_leader (Name ) of
1939
+ undefined ->
1940
+ % % leader from queue record will have to suffice
1941
+ [MaybeLeader ];
1942
+ LikelyLeader ->
1943
+ [LikelyLeader , MaybeLeader ]
1944
+ end ,
1945
+ Nodes = [node () | nodes ()],
1946
+ case lists :search (fun ({_Nm , Nd }) ->
1947
+ lists :member (Nd , Nodes )
1948
+ end , Leaders ) of
1949
+ {value , Leader } ->
1950
+ Leader ;
1951
+ false ->
1952
+ undefined
1953
+ end .
1954
+
1955
+ is_process_alive (Name , Node ) ->
1956
+ % % don't attempt rpc if node is not already connected
1957
+ % % as this function is used for metrics and stats and the additional
1958
+ % % latency isn't warranted
1959
+ erlang :is_pid (erpc_call (Node , erlang , whereis , [Name ], ? RPC_TIMEOUT )).
0 commit comments