@@ -165,18 +165,27 @@ declare_policy_exactly(Config) ->
165
165
unset_location_config (Config ),
166
166
% Note:
167
167
% Node0 has 15 queues, Node1 has 8 and Node2 has 1
168
- Node1 = rabbit_ct_broker_helpers :get_node_config (Config , 1 , nodename ),
169
168
Policy = [{<<" queue-master-locator" >>, <<" min-masters" >>},
170
169
{<<" ha-mode" >>, <<" exactly" >>},
171
170
{<<" ha-params" >>, 2 }],
172
171
ok = rabbit_ct_broker_helpers :set_policy (Config , 0 , ? POLICY ,
173
172
<<" .*" >>, <<" queues" >>, Policy ),
174
- QueueName = rabbit_misc :r (<<" /" >>, queue , Q = <<" qm.test" >>),
175
- declare (Config , QueueName , false , false , _Args = [], none ),
176
- % Note: even though Node2 has the fewest masters, the "exactly" policy
177
- % chooses Node0 and Node1 as eligible, and then "min-masters"
178
- % chooses Node1
179
- verify_min_master (Config , Q , Node1 ).
173
+ QueueRec = rabbit_misc :r (<<" /" >>, queue , Q = <<" qm.test" >>),
174
+ declare (Config , QueueRec , false , false , _Args = [], none ),
175
+
176
+ Node0 = rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename ),
177
+ rabbit_ct_broker_helpers :control_action (sync_queue , Node0 ,
178
+ [binary_to_list (Q )], [{" -p" , " /" }]),
179
+ wait_for_sync (Config , Node0 , QueueRec , 1 ),
180
+
181
+ {ok , Queue } = rabbit_ct_broker_helpers :rpc (Config , Node0 ,
182
+ rabbit_amqqueue , lookup , [QueueRec ]),
183
+ ct :pal (" Queue after sync ~p~n " , [Queue ]),
184
+ {MNode , SNodes , SSNodes } = rabbit_ct_broker_helpers :rpc (Config , Node0 ,
185
+ rabbit_mirror_queue_misc ,
186
+ actual_queue_nodes , [Queue ]),
187
+ ct :pal (" MNode ~p SNodes ~p SSNodes ~p~n " , [MNode , SNodes , SSNodes ]),
188
+ verify_min_master (Config , Q , Node0 ).
180
189
181
190
declare_config (Config ) ->
182
191
setup_test_environment (Config ),
@@ -286,44 +295,66 @@ min_master_node(Config) ->
286
295
287
296
set_location_config (Config , Strategy ) ->
288
297
Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
289
- [ok = rpc :call (Node , application , set_env ,
290
- [rabbit , queue_master_locator , Strategy ]) || Node <- Nodes ],
298
+ [ok = rabbit_ct_broker_helpers :rpc (Config , Node ,
299
+ application , set_env ,
300
+ [rabbit , queue_master_locator , Strategy ]) || Node <- Nodes ],
291
301
ok .
292
302
293
303
unset_location_config (Config ) ->
294
304
Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
295
- [ok = rpc :call (Node , application , unset_env ,
296
- [rabbit , queue_master_locator ]) || Node <- Nodes ],
305
+ [ok = rabbit_ct_broker_helpers :rpc (Config , Node ,
306
+ application , unset_env ,
307
+ [rabbit , queue_master_locator ]) || Node <- Nodes ],
297
308
ok .
298
309
299
- declare (Config , QueueName , Durable , AutoDelete , Args , Owner ) ->
300
- Node = rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename ),
301
- {new , Queue } = rpc :call (Node , rabbit_amqqueue , declare ,
302
- [QueueName , Durable , AutoDelete , Args , Owner ]),
310
+ declare (Config , QueueName , Durable , AutoDelete , Args0 , Owner ) ->
311
+ Args1 = [QueueName , Durable , AutoDelete , Args0 , Owner ],
312
+ {new , Queue } = rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_amqqueue , declare , Args1 ),
303
313
Queue .
304
314
305
315
verify_min_master (Config , Q , MinMasterNode ) ->
306
- Node = rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename ) ,
307
- Rpc = rpc : call ( Node , rabbit_queue_master_location_misc ,
308
- lookup_master , [Q , ? DEFAULT_VHOST_PATH ]),
316
+ Rpc = rabbit_ct_broker_helpers :rpc (Config , 0 ,
317
+ rabbit_queue_master_location_misc ,
318
+ lookup_master , [Q , ? DEFAULT_VHOST_PATH ]),
309
319
? assertEqual ({ok , MinMasterNode }, Rpc ).
310
320
311
321
verify_min_master (Config , Q ) ->
312
322
MinMaster = min_master_node (Config ),
313
323
verify_min_master (Config , Q , MinMaster ).
314
324
315
325
verify_random (Config , Q ) ->
316
- [Node | _ ] = Nodes = rabbit_ct_broker_helpers :get_node_configs (Config ,
317
- nodename ) ,
318
- { ok , Master } = rpc : call ( Node , rabbit_queue_master_location_misc ,
319
- lookup_master , [Q , ? DEFAULT_VHOST_PATH ]),
326
+ [Node | _ ] = Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
327
+ { ok , Master } = rabbit_ct_broker_helpers : rpc ( Config , Node ,
328
+ rabbit_queue_master_location_misc ,
329
+ lookup_master , [Q , ? DEFAULT_VHOST_PATH ]),
320
330
? assert (lists :member (Master , Nodes )).
321
331
322
332
verify_client_local (Config , Q ) ->
323
333
Node = rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename ),
324
- ? assertEqual ({ok , Node }, rpc :call (Node , rabbit_queue_master_location_misc ,
325
- lookup_master , [Q , ? DEFAULT_VHOST_PATH ])).
334
+ Rpc = rabbit_ct_broker_helpers :rpc (Config , Node ,
335
+ rabbit_queue_master_location_misc ,
336
+ lookup_master , [Q , ? DEFAULT_VHOST_PATH ]),
337
+ ? assertEqual ({ok , Node }, Rpc ).
326
338
327
339
set_location_policy (Config , Name , Strategy ) ->
328
340
ok = rabbit_ct_broker_helpers :set_policy (Config , 0 ,
329
341
Name , <<" .*" >>, <<" queues" >>, [{<<" queue-master-locator" >>, Strategy }]).
342
+
343
+ wait_for_sync (Config , Nodename , Q , ExpectedSSPidLen ) ->
344
+ wait_for_sync (Config , Nodename , Q , ExpectedSSPidLen , 600 ).
345
+
346
+ wait_for_sync (_ , _ , _ , _ , 0 ) ->
347
+ throw (sync_timeout );
348
+ wait_for_sync (Config , Nodename , Q , ExpectedSSPidLen , N ) ->
349
+ case synced (Config , Nodename , Q , ExpectedSSPidLen ) of
350
+ true -> ok ;
351
+ false -> timer :sleep (100 ),
352
+ wait_for_sync (Config , Nodename , Q , ExpectedSSPidLen , N - 1 )
353
+ end .
354
+
355
+ synced (Config , Nodename , Q , ExpectedSSPidLen ) ->
356
+ Info = rabbit_ct_broker_helpers :rpc (Config , Nodename ,
357
+ rabbit_amqqueue , info_all , [<<" /" >>, [name , synchronised_slave_pids ]]),
358
+ ct :pal (" synced Info: ~p~n " , [Info ]),
359
+ [SSPids ] = [Pids || [{name , Q1 }, {synchronised_slave_pids , Pids }] <- Info , Q =:= Q1 ],
360
+ length (SSPids ) =:= ExpectedSSPidLen .
0 commit comments