@@ -230,18 +230,23 @@ start_cluster(Q) ->
230
230
{error , {too_long , N }} ->
231
231
rabbit_data_coercion :to_atom (ra :new_uid (N ))
232
232
end ,
233
- {Leader , Followers } = rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
234
- LeaderId = {RaName , Leader },
233
+ {LeaderNode , FollowerNodes } =
234
+ rabbit_queue_location :select_leader_and_followers (Q , QuorumSize ),
235
+ LeaderId = {RaName , LeaderNode },
235
236
NewQ0 = amqqueue :set_pid (Q , LeaderId ),
236
- NewQ1 = amqqueue :set_type_state (NewQ0 , #{nodes => [Leader | Followers ]}),
237
+ NewQ1 = amqqueue :set_type_state (NewQ0 ,
238
+ #{nodes => [LeaderNode | FollowerNodes ]}),
237
239
238
240
rabbit_log :debug (" Will start up to ~w replicas for quorum ~ts with leader on node '~ts '" ,
239
- [QuorumSize , rabbit_misc :rs (QName ), Leader ]),
241
+ [QuorumSize , rabbit_misc :rs (QName ), LeaderNode ]),
240
242
case rabbit_amqqueue :internal_declare (NewQ1 , false ) of
241
243
{created , NewQ } ->
242
244
RaConfs = [make_ra_conf (NewQ , ServerId )
243
245
|| ServerId <- members (NewQ )],
244
- try erpc_call (Leader , ra , start_cluster ,
246
+
247
+ % % khepri projections on remote nodes are eventually consistent
248
+ wait_for_projections (LeaderNode , QName ),
249
+ try erpc_call (LeaderNode , ra , start_cluster ,
245
250
[? RA_SYSTEM , RaConfs , ? START_CLUSTER_TIMEOUT ],
246
251
? START_CLUSTER_RPC_TIMEOUT ) of
247
252
{ok , _ , _ } ->
@@ -266,10 +271,10 @@ start_cluster(Q) ->
266
271
ActingUser }]),
267
272
{new , NewQ };
268
273
{error , Error } ->
269
- declare_queue_error (Error , NewQ , Leader , ActingUser )
274
+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
270
275
catch
271
276
error :Error ->
272
- declare_queue_error (Error , NewQ , Leader , ActingUser )
277
+ declare_queue_error (Error , NewQ , LeaderNode , ActingUser )
273
278
end ;
274
279
{existing , _ } = Ex ->
275
280
Ex
@@ -359,26 +364,28 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
359
364
end .
360
365
361
366
%% Invoked synchronously when a ra node becomes leader, so the actual work
%% is pushed into a separate process: blocking here could prevent the ra
%% node from establishing its leadership. Returns the pid of that process.
become_leader(QName, Name) ->
    spawn(fun() -> become_leader0(QName, Name) end).

%% Records {Name, node()} as the pid of queue QName and sets its state to
%% live, then calls rpc_delete_metrics for QName on every other node
%% returned by get_nodes/1. Always returns ok.
become_leader0(QName, Name) ->
    MarkLive = fun(Queue) ->
                       Relocated = amqqueue:set_pid(Queue, {Name, node()}),
                       amqqueue:set_state(Relocated, live)
               end,
    _ = rabbit_amqqueue:update(QName, MarkLive),
    case rabbit_amqqueue:lookup(QName) of
        {ok, Q0} when ?is_amqqueue(Q0) ->
            Self = node(),
            %% the result of each remote call is deliberately ignored
            lists:foreach(
              fun(Peer) when Peer =:= Self ->
                      ok;
                 (Peer) ->
                      _ = erpc_call(Peer, ?MODULE, rpc_delete_metrics,
                                    [QName], ?RPC_TIMEOUT),
                      ok
              end, get_nodes(Q0)),
            ok;
        _ ->
            ok
    end.
382
389
383
390
-spec all_replica_states () -> {node (), #{atom () => atom ()}}.
384
391
all_replica_states () ->
@@ -550,7 +557,7 @@ handle_tick(QName,
550
557
catch
551
558
_ :Err ->
552
559
rabbit_log :debug (" ~ts : handle tick failed with ~p " ,
553
- [rabbit_misc :rs (QName ), Err ]),
560
+ [rabbit_misc :rs (QName ), Err ]),
554
561
ok
555
562
end
556
563
end ).
@@ -566,7 +573,7 @@ repair_leader_record(QName, Self) ->
566
573
rabbit_log :debug (" ~ts : repairing leader record" ,
567
574
[rabbit_misc :rs (QName )]),
568
575
{_ , Name } = erlang :process_info (Self , registered_name ),
569
- become_leader (QName , Name ),
576
+ ok = become_leader0 (QName , Name ),
570
577
ok
571
578
end ,
572
579
ok .
@@ -633,7 +640,7 @@ recover(_Vhost, Queues) ->
633
640
Err1 == name_not_registered ->
634
641
rabbit_log :warning (" Quorum queue recovery: configured member of ~ts was not found on this node. Starting member as a new one. "
635
642
" Context: ~s " ,
636
- [rabbit_misc :rs (QName ), Err1 ]),
643
+ [rabbit_misc :rs (QName ), Err1 ]),
637
644
% queue was never started on this node
638
645
% so needs to be started from scratch.
639
646
case start_server (make_ra_conf (Q0 , ServerId )) of
@@ -1901,3 +1908,23 @@ force_all_queues_shrink_member_to_current_member() ->
1901
1908
%% Returns true when the members in `Up' are too few to form a majority
%% of the members in `All' (a majority being length(All) div 2 + 1).
is_minority(All, Up) ->
    Required = (length(All) div 2) + 1,
    Present = length(Up),
    Present < Required.
1911
+
1912
%% Waits until `Node' can look up `QName' via rabbit_amqqueue:lookup/1.
%% Nothing is awaited when the khepri_db feature flag is disabled or when
%% `Node' is the local node; otherwise up to 256 lookups are attempted.
%% Returns ok, or exits with {wait_for_projections_timed_out, Node, QName}
%% once the attempts are used up.
wait_for_projections(Node, QName) ->
    IsRemoteKhepri = rabbit_feature_flags:is_enabled(khepri_db)
                     andalso Node =/= node(),
    case IsRemoteKhepri of
        false ->
            ok;
        true ->
            wait_for_projections(Node, QName, 256)
    end.

%% The third argument is the number of lookup attempts left.
wait_for_projections(Node, QName, 0) ->
    exit({wait_for_projections_timed_out, Node, QName});
wait_for_projections(Node, QName, AttemptsLeft) ->
    %% 100 is handed to erpc_call/5 as its last argument
    %% (presumably a timeout in milliseconds - confirm against erpc_call)
    LookupResult = erpc_call(Node, rabbit_amqqueue, lookup, [QName], 100),
    case LookupResult of
        {ok, _} ->
            ok;
        _ ->
            %% anything else counts as "not visible yet": pause 100 ms, retry
            timer:sleep(100),
            wait_for_projections(Node, QName, AttemptsLeft - 1)
    end.
0 commit comments