21
21
22
22
-compile (export_all ).
23
23
24
- -define (PERSISTENT_MSG_STORE , msg_store_persistent_vhost ).
25
- -define (TRANSIENT_MSG_STORE , msg_store_transient_vhost ).
24
+ -define (PERSISTENT_MSG_STORE , msg_store_persistent ).
25
+ -define (TRANSIENT_MSG_STORE , msg_store_transient ).
26
26
27
27
-define (TIMEOUT , 30000 ).
28
+ -define (VHOST , <<" /" >>).
28
29
29
30
-define (VARIABLE_QUEUE_TESTCASES , [
30
31
variable_queue_dynamic_duration_change ,
@@ -253,9 +254,9 @@ msg_store1(_Config) ->
253
254
MSCState4 = msg_store_read (MsgIds2ndHalf , MSCState3 ),
254
255
ok = rabbit_msg_store :client_terminate (MSCState4 ),
255
256
% % stop and restart, preserving every other msg in 2nd half
256
- ok = rabbit_variable_queue :stop_msg_store (),
257
- ok = rabbit_variable_queue :start_msg_store (
258
- #{} , {fun ([]) -> finished ;
257
+ ok = rabbit_variable_queue :stop_msg_store (? VHOST ),
258
+ ok = rabbit_variable_queue :start_msg_store (? VHOST ,
259
+ [] , {fun ([]) -> finished ;
259
260
([MsgId |MsgIdsTail ])
260
261
when length (MsgIdsTail ) rem 2 == 0 ->
261
262
{MsgId , 1 , MsgIdsTail };
@@ -330,8 +331,8 @@ msg_store1(_Config) ->
330
331
passed .
331
332
332
333
restart_msg_store_empty () ->
333
- ok = rabbit_variable_queue :stop_msg_store (),
334
- ok = rabbit_variable_queue :start_msg_store (
334
+ ok = rabbit_variable_queue :stop_msg_store (? VHOST ),
335
+ ok = rabbit_variable_queue :start_msg_store (? VHOST ,
335
336
undefined , {fun (ok ) -> finished end , ok }).
336
337
337
338
msg_id_bin (X ) ->
@@ -376,10 +377,10 @@ on_disk_stop(Pid) ->
376
377
377
378
msg_store_client_init_capture (MsgStore , Ref ) ->
378
379
Pid = spawn (fun on_disk_capture /0 ),
379
- {Pid , rabbit_msg_store_vhost_sup :client_init (
380
- MsgStore , Ref , fun (MsgIds , _ActionTaken ) ->
381
- Pid ! {on_disk , MsgIds }
382
- end , undefined , << " / " >> )}.
380
+ {Pid , rabbit_vhost_msg_store :client_init (? VHOST , MsgStore , Ref ,
381
+ fun (MsgIds , _ActionTaken ) ->
382
+ Pid ! {on_disk , MsgIds }
383
+ end , undefined )}.
383
384
384
385
msg_store_contains (Atom , MsgIds , MSCState ) ->
385
386
Atom = lists :foldl (
@@ -456,14 +457,16 @@ test_msg_store_confirm_timer() ->
456
457
Ref = rabbit_guid :gen (),
457
458
MsgId = msg_id_bin (1 ),
458
459
Self = self (),
459
- MSCState = rabbit_msg_store_vhost_sup :client_init (
460
- ? PERSISTENT_MSG_STORE , Ref ,
461
- fun (MsgIds , _ActionTaken ) ->
462
- case gb_sets :is_member (MsgId , MsgIds ) of
463
- true -> Self ! on_disk ;
464
- false -> ok
465
- end
466
- end , undefined , <<" /" >>),
460
+ MSCState = rabbit_vhost_msg_store :client_init (
461
+ ? VHOST ,
462
+ ? PERSISTENT_MSG_STORE ,
463
+ Ref ,
464
+ fun (MsgIds , _ActionTaken ) ->
465
+ case gb_sets :is_member (MsgId , MsgIds ) of
466
+ true -> Self ! on_disk ;
467
+ false -> ok
468
+ end
469
+ end , undefined ),
467
470
ok = msg_store_write ([MsgId ], MSCState ),
468
471
ok = msg_store_keep_busy_until_confirm ([msg_id_bin (2 )], MSCState , false ),
469
472
ok = msg_store_remove ([MsgId ], MSCState ),
@@ -651,8 +654,8 @@ bq_queue_index1(_Config) ->
651
654
Qi8
652
655
end ),
653
656
654
- ok = rabbit_variable_queue :stop (),
655
- {ok , _ } = rabbit_variable_queue :start ([]),
657
+ ok = rabbit_variable_queue :stop (? VHOST ),
658
+ {ok , _ } = rabbit_variable_queue :start (? VHOST , []),
656
659
657
660
passed .
658
661
@@ -672,8 +675,8 @@ bq_queue_index_props1(_Config) ->
672
675
Qi2
673
676
end ),
674
677
675
- ok = rabbit_variable_queue :stop (),
676
- {ok , _ } = rabbit_variable_queue :start ([]),
678
+ ok = rabbit_variable_queue :stop (? VHOST ),
679
+ {ok , _ } = rabbit_variable_queue :start (? VHOST , []),
677
680
678
681
passed .
679
682
@@ -718,16 +721,16 @@ bq_queue_recover1(Config) ->
718
721
true , false , [], none , <<" acting-user" >>),
719
722
publish_and_confirm (Q , <<>>, Count ),
720
723
721
- SupPid = rabbit_ct_broker_helpers :get_queue_sup_pid (QPid ),
724
+ SupPid = rabbit_ct_broker_helpers :get_queue_sup_pid (Q ),
722
725
true = is_pid (SupPid ),
723
726
exit (SupPid , kill ),
724
727
exit (QPid , kill ),
725
728
MRef = erlang :monitor (process , QPid ),
726
729
receive {'DOWN' , MRef , process , QPid , _Info } -> ok
727
730
after 10000 -> exit (timeout_waiting_for_queue_death )
728
731
end ,
729
- rabbit_amqqueue :stop (),
730
- rabbit_amqqueue :start (rabbit_amqqueue :recover ()),
732
+ rabbit_amqqueue :stop (? VHOST ),
733
+ rabbit_amqqueue :start (rabbit_amqqueue :recover (? VHOST )),
731
734
{ok , Limiter } = rabbit_limiter :start_link (no_id ),
732
735
rabbit_amqqueue :with_or_die (
733
736
QName ,
@@ -1275,14 +1278,14 @@ init_test_queue(QName) ->
1275
1278
Res .
1276
1279
1277
1280
restart_test_queue (Qi , QName ) ->
1278
- _ = rabbit_queue_index :terminate ([], Qi ),
1279
- ok = rabbit_variable_queue :stop (),
1280
- {ok , _ } = rabbit_variable_queue :start ([QName ]),
1281
+ _ = rabbit_queue_index :terminate (? VHOST , [], Qi ),
1282
+ ok = rabbit_variable_queue :stop (? VHOST ),
1283
+ {ok , _ } = rabbit_variable_queue :start (? VHOST , [QName ]),
1281
1284
init_test_queue (QName ).
1282
1285
1283
1286
empty_test_queue (QName ) ->
1284
- ok = rabbit_variable_queue :stop (),
1285
- {ok , _ } = rabbit_variable_queue :start ([]),
1287
+ ok = rabbit_variable_queue :stop (? VHOST ),
1288
+ {ok , _ } = rabbit_variable_queue :start (? VHOST , []),
1286
1289
{0 , 0 , Qi } = init_test_queue (QName ),
1287
1290
_ = rabbit_queue_index :delete_and_terminate (Qi ),
1288
1291
ok .
@@ -1337,7 +1340,7 @@ nop(_) -> ok.
1337
1340
nop (_ , _ ) -> ok .
1338
1341
1339
1342
msg_store_client_init (MsgStore , Ref ) ->
1340
- rabbit_msg_store_vhost_sup :client_init (MsgStore , Ref , undefined , undefined , << " / " >> ).
1343
+ rabbit_vhost_msg_store :client_init (? VHOST , MsgStore , Ref , undefined , undefined ).
1341
1344
1342
1345
variable_queue_init (Q , Recover ) ->
1343
1346
rabbit_variable_queue :init (
0 commit comments