@@ -193,18 +193,20 @@ start_cluster(Q) ->
193
193
{error , {too_long , N }} ->
194
194
rabbit_data_coercion :to_atom (ra :new_uid (N ))
195
195
end ,
196
- {Leader , Followers } = rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
197
- LeaderId = {RaName , Leader },
196
+ {LeaderNode , FollowerNodes } =
197
+ rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
198
+ LeaderId = {RaName , LeaderNode },
198
199
NewQ0 = amqqueue :set_pid (Q , LeaderId ),
199
- NewQ1 = amqqueue :set_type_state (NewQ0 , #{nodes => [Leader | Followers ]}),
200
+ NewQ1 = amqqueue :set_type_state (NewQ0 ,
201
+ #{nodes => [LeaderNode | FollowerNodes ]}),
200
202
201
203
rabbit_log :debug (" Will start up to ~w replicas for quorum ~ts with leader on node '~ts '" ,
202
- [QuorumSize , rabbit_misc :rs (QName ), Leader ]),
204
+ [QuorumSize , rabbit_misc :rs (QName ), LeaderNode ]),
203
205
case rabbit_amqqueue :internal_declare (NewQ1 , false ) of
204
206
{created , NewQ } ->
205
207
RaConfs = [make_ra_conf (NewQ , ServerId )
206
208
|| ServerId <- members (NewQ )],
207
- try erpc_call (Leader , ra , start_cluster ,
209
+ try erpc_call (LeaderNode , ra , start_cluster ,
208
210
[? RA_SYSTEM , RaConfs , ? START_CLUSTER_TIMEOUT ],
209
211
? START_CLUSTER_RPC_TIMEOUT ) of
210
212
{ok , _ , _ } ->
@@ -228,10 +230,10 @@ start_cluster(Q) ->
228
230
ActingUser }]),
229
231
{new , NewQ };
230
232
{error , Error } ->
231
- declare_queue_error (Error , NewQ , Leader , ActingUser )
233
+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
232
234
catch
233
235
error :Error ->
234
- declare_queue_error (Error , NewQ , Leader , ActingUser )
236
+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
235
237
end ;
236
238
{existing , _ } = Ex ->
237
239
Ex
@@ -321,26 +323,28 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
321
323
end .
322
324
323
325
become_leader (QName , Name ) ->
326
+ % % as this function is called synchronously when a ra node becomes leader
327
+ % % we need to ensure there is no chance of blocking as else the ra node
328
+ % % may not be able to establish its leadership
329
+ spawn (fun () -> become_leader0 (QName , Name ) end ).
330
+
331
+ become_leader0 (QName , Name ) ->
324
332
Fun = fun (Q1 ) ->
325
333
amqqueue :set_state (
326
334
amqqueue :set_pid (Q1 , {Name , node ()}),
327
335
live )
328
336
end ,
329
- % % as this function is called synchronously when a ra node becomes leader
330
- % % we need to ensure there is no chance of blocking as else the ra node
331
- % % may not be able to establish its leadership
332
- spawn (fun () ->
333
- _ = rabbit_amqqueue :update (QName , Fun ),
334
- case rabbit_amqqueue :lookup (QName ) of
335
- {ok , Q0 } when ? is_amqqueue (Q0 ) ->
336
- Nodes = get_nodes (Q0 ),
337
- [_ = erpc_call (Node , ? MODULE , rpc_delete_metrics ,
338
- [QName ], ? RPC_TIMEOUT )
339
- || Node <- Nodes , Node =/= node ()];
340
- _ ->
341
- ok
342
- end
343
- end ).
337
+ _ = rabbit_amqqueue :update (QName , Fun ),
338
+ case rabbit_amqqueue :lookup (QName ) of
339
+ {ok , Q0 } when ? is_amqqueue (Q0 ) ->
340
+ Nodes = get_nodes (Q0 ),
341
+ _ = [_ = erpc_call (Node , ? MODULE , rpc_delete_metrics ,
342
+ [QName ], ? RPC_TIMEOUT )
343
+ || Node <- Nodes , Node =/= node ()],
344
+ ok ;
345
+ _ ->
346
+ ok
347
+ end .
344
348
345
349
-spec all_replica_states () -> {node (), #{atom () => atom ()}}.
346
350
all_replica_states () ->
@@ -496,7 +500,7 @@ handle_tick(QName,
496
500
catch
497
501
_ :Err ->
498
502
rabbit_log :debug (" ~ts : handle tick failed with ~p " ,
499
- [rabbit_misc :rs (QName ), Err ]),
503
+ [rabbit_misc :rs (QName ), Err ]),
500
504
ok
501
505
end
502
506
end ).
@@ -512,7 +516,7 @@ repair_leader_record(QName, Self) ->
512
516
rabbit_log :debug (" ~ts : repairing leader record" ,
513
517
[rabbit_misc :rs (QName )]),
514
518
{_ , Name } = erlang :process_info (Self , registered_name ),
515
- become_leader (QName , Name ),
519
+ ok = become_leader0 (QName , Name ),
516
520
ok
517
521
end ,
518
522
ok .
@@ -579,7 +583,7 @@ recover(_Vhost, Queues) ->
579
583
Err1 == name_not_registered ->
580
584
rabbit_log :warning (" Quorum queue recovery: configured member of ~ts was not found on this node. Starting member as a new one. "
581
585
" Context: ~s " ,
582
- [rabbit_misc :rs (QName ), Err1 ]),
586
+ [rabbit_misc :rs (QName ), Err1 ]),
583
587
% queue was never started on this node
584
588
% so needs to be started from scratch.
585
589
case start_server (make_ra_conf (Q0 , ServerId )) of
0 commit comments