@@ -180,7 +180,7 @@ is_compatible(_, _, _) ->
180
180
init (Q ) when ? is_amqqueue (Q ) ->
181
181
{ok , SoftLimit } = application :get_env (rabbit , quorum_commands_soft_limit ),
182
182
{Name , _ } = MaybeLeader = amqqueue :get_pid (Q ),
183
- Leader = case ra_leaderboard : lookup_leader ( Name ) of
183
+ Leader = case find_leader ( Q ) of
184
184
undefined ->
185
185
% % leader from queue record will have to suffice
186
186
MaybeLeader ;
@@ -1349,6 +1349,23 @@ shrink_all(Node) ->
1349
1349
case delete_member (Q , Node ) of
1350
1350
ok ->
1351
1351
{QName , {ok , Size - 1 }};
1352
+ {error , cluster_change_not_permitted } ->
1353
+ % % this could be timing related and due to a new leader just being
1354
+ % % elected but its noop command has not been committed yet.
1355
+ % % let's sleep and retry once
1356
+ rabbit_log :info (" ~ts : failed to remove member (replica) on node ~w "
1357
+ " as cluster change is not permitted. "
1358
+ " retrying once in 500ms" ,
1359
+ [rabbit_misc :rs (QName ), Node ]),
1360
+ timer :sleep (500 ),
1361
+ case delete_member (Q , Node ) of
1362
+ ok ->
1363
+ {QName , {ok , Size - 1 }};
1364
+ {error , Err } ->
1365
+ rabbit_log :warning (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1366
+ [rabbit_misc :rs (QName ), Node , Err ]),
1367
+ {QName , {error , Size , Err }}
1368
+ end ;
1352
1369
{error , Err } ->
1353
1370
rabbit_log :warning (" ~ts : failed to remove member (replica) on node ~w , error: ~w " ,
1354
1371
[rabbit_misc :rs (QName ), Node , Err ]),
@@ -1663,10 +1680,16 @@ open_files(Name) ->
1663
1680
end .
1664
1681
1665
1682
%% Returns the node hosting the leader of queue Q, or the empty atom ''
%% when find_leader/1 yields no candidate or when no process is registered
%% under the leader's name on that node.
leader(Q) when ?is_amqqueue(Q) ->
    alive_leader_node(find_leader(Q)).

%% Maps the result of find_leader/1 to a node name, or '' if there is no
%% candidate or the candidate process is not alive.
alive_leader_node(undefined) ->
    '';
alive_leader_node({Name, Node}) ->
    case is_process_alive(Name, Node) of
        false -> '';
        true -> Node
    end.
1671
1694
1672
1695
peek (Vhost , Queue , Pos ) ->
@@ -1742,12 +1765,6 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
1742
1765
{leader , LeaderNode },
1743
1766
{online , Online }].
1744
1767
1745
- is_process_alive (Name , Node ) ->
1746
- % % don't attempt rpc if node is not already connected
1747
- % % as this function is used for metrics and stats and the additional
1748
- % % latency isn't warranted
1749
- erlang :is_pid (erpc_call (Node , erlang , whereis , [Name ], ? RPC_TIMEOUT )).
1750
-
1751
1768
- spec quorum_messages (rabbit_amqqueue :name ()) -> non_neg_integer ().
1752
1769
1753
1770
quorum_messages (QName ) ->
@@ -1930,3 +1947,30 @@ wait_for_projections(Node, QName, N) ->
1930
1947
timer :sleep (100 ),
1931
1948
wait_for_projections (Node , QName , N - 1 )
1932
1949
end .
1950
+
1951
%% Picks the most plausible leader server id, {Name, Node}, for queue Q.
%%
%% Candidates are tried in order: the ra_leaderboard entry first (when
%% there is one), then the pid stored in the queue record. The queue record
%% is updated asynchronously after a leader change, so it is likely to be
%% more stale than the leaderboard.
%%
%% Only a candidate whose node is the local node or is listed by nodes()
%% is returned; if no candidate qualifies the result is 'undefined'.
find_leader(Q) when ?is_amqqueue(Q) ->
    {Name, _} = FromRecord = amqqueue:get_pid(Q),
    Candidates = case ra_leaderboard:lookup_leader(Name) of
                     undefined ->
                         %% nothing on the leaderboard: the leader from
                         %% the queue record will have to suffice
                         [FromRecord];
                     FromLeaderboard ->
                         [FromLeaderboard, FromRecord]
                 end,
    KnownNodes = [node() | nodes()],
    OnUnknownNode = fun({_, CandidateNode}) ->
                            not lists:member(CandidateNode, KnownNodes)
                    end,
    %% skip candidates until the first one that lives on a known node
    case lists:dropwhile(OnUnknownNode, Candidates) of
        [Leader | _] ->
            Leader;
        [] ->
            undefined
    end.
1971
+
1972
%% Reports whether a process is registered as Name on Node.
%%
%% The check is an erlang:whereis/1 lookup issued through erpc_call/5 with
%% ?RPC_TIMEOUT; any result that is not a pid yields 'false'.
%%
%% NOTE(review): the previous comment states that no rpc is attempted when
%% the node is not already connected, because this function is used for
%% metrics and stats and the additional latency isn't warranted. That
%% behaviour would live in erpc_call/5 - confirm there.
is_process_alive(Name, Node) ->
    Registered = erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT),
    erlang:is_pid(Registered).
0 commit comments