@@ -193,18 +193,23 @@ start_cluster(Q) ->
193
193
{error , {too_long , N }} ->
194
194
rabbit_data_coercion :to_atom (ra :new_uid (N ))
195
195
end ,
196
- {Leader , Followers } = rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
197
- LeaderId = {RaName , Leader },
196
+ {LeaderNode , FollowerNodes } =
197
+ rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
198
+ LeaderId = {RaName , LeaderNode },
198
199
NewQ0 = amqqueue :set_pid (Q , LeaderId ),
199
- NewQ1 = amqqueue :set_type_state (NewQ0 , #{nodes => [Leader | Followers ]}),
200
+ NewQ1 = amqqueue :set_type_state (NewQ0 ,
201
+ #{nodes => [LeaderNode | FollowerNodes ]}),
200
202
201
203
rabbit_log :debug (" Will start up to ~w replicas for quorum ~ts with leader on node '~ts '" ,
202
- [QuorumSize , rabbit_misc :rs (QName ), Leader ]),
204
+ [QuorumSize , rabbit_misc :rs (QName ), LeaderNode ]),
203
205
case rabbit_amqqueue :internal_declare (NewQ1 , false ) of
204
206
{created , NewQ } ->
205
207
RaConfs = [make_ra_conf (NewQ , ServerId )
206
208
|| ServerId <- members (NewQ )],
207
- try erpc_call (Leader , ra , start_cluster ,
209
+
210
+ % % khepri projections on remote nodes are eventually consistent
211
+ wait_for_projections (LeaderNode , QName ),
212
+ try erpc_call (LeaderNode , ra , start_cluster ,
208
213
[? RA_SYSTEM , RaConfs , ? START_CLUSTER_TIMEOUT ],
209
214
? START_CLUSTER_RPC_TIMEOUT ) of
210
215
{ok , _ , _ } ->
@@ -228,10 +233,10 @@ start_cluster(Q) ->
228
233
ActingUser }]),
229
234
{new , NewQ };
230
235
{error , Error } ->
231
- declare_queue_error (Error , NewQ , Leader , ActingUser )
236
+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
232
237
catch
233
238
error :Error ->
234
- declare_queue_error (Error , NewQ , Leader , ActingUser )
239
+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
235
240
end ;
236
241
{existing , _ } = Ex ->
237
242
Ex
@@ -321,26 +326,28 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
321
326
end .
322
327
323
328
%% Called synchronously when a ra node becomes leader. The actual work is
%% handed to a separate process so that this call cannot block; blocking here
%% could prevent the ra node from establishing its leadership.
%% Returns the pid of the spawned process.
become_leader(QName, Name) ->
    %% spawn/1 does not link: a failure inside the worker is not propagated
    %% to the calling process.
    spawn(fun() -> become_leader0(QName, Name) end).

%% Records {Name, node()} as the pid of queue QName and marks the queue as
%% live, then asks every other node returned by get_nodes/1 to run
%% rpc_delete_metrics for the queue. Runs in the calling process (callers
%% that must not block should go through become_leader/2) and always
%% returns ok; the results of the update and of the remote calls are ignored.
become_leader0(QName, Name) ->
    Fun = fun(Q1) ->
                  amqqueue:set_state(
                    amqqueue:set_pid(Q1, {Name, node()}),
                    live)
          end,
    _ = rabbit_amqqueue:update(QName, Fun),
    %% Re-read the record to obtain the node list of the stored queue.
    case rabbit_amqqueue:lookup(QName) of
        {ok, Q0} when ?is_amqqueue(Q0) ->
            RemoteNodes = [Node || Node <- get_nodes(Q0), Node =/= node()],
            %% Called purely for the side effect; lists:foreach/2 returns ok.
            lists:foreach(
              fun(Node) ->
                      _ = erpc_call(Node, ?MODULE, rpc_delete_metrics,
                                    [QName], ?RPC_TIMEOUT),
                      ok
              end, RemoteNodes);
        _ ->
            ok
    end.
344
351
345
352
-spec all_replica_states () -> {node (), #{atom () => atom ()}}.
346
353
all_replica_states () ->
@@ -496,7 +503,7 @@ handle_tick(QName,
496
503
catch
497
504
_ :Err ->
498
505
rabbit_log :debug (" ~ts : handle tick failed with ~p " ,
499
- [rabbit_misc :rs (QName ), Err ]),
506
+ [rabbit_misc :rs (QName ), Err ]),
500
507
ok
501
508
end
502
509
end ).
@@ -512,7 +519,7 @@ repair_leader_record(QName, Self) ->
512
519
rabbit_log :debug (" ~ts : repairing leader record" ,
513
520
[rabbit_misc :rs (QName )]),
514
521
{_ , Name } = erlang :process_info (Self , registered_name ),
515
- become_leader (QName , Name ),
522
+ ok = become_leader0 (QName , Name ),
516
523
ok
517
524
end ,
518
525
ok .
@@ -579,7 +586,7 @@ recover(_Vhost, Queues) ->
579
586
Err1 == name_not_registered ->
580
587
rabbit_log :warning (" Quorum queue recovery: configured member of ~ts was not found on this node. Starting member as a new one. "
581
588
" Context: ~s " ,
582
- [rabbit_misc :rs (QName ), Err1 ]),
589
+ [rabbit_misc :rs (QName ), Err1 ]),
583
590
% queue was never started on this node
584
591
% so needs to be started from scratch.
585
592
case start_server (make_ra_conf (Q0 , ServerId )) of
0 commit comments