Skip to content

Commit 38b51ac

Browse files
author
Daniil Fedotov
committed
Updated tests to support per-vhost message store
1 parent caef1f0 commit 38b51ac

File tree

2 files changed

+34
-18
lines changed

2 files changed

+34
-18
lines changed

test/channel_operation_timeout_test_queue.erl

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,21 @@ stop() ->
230230
ok = rabbit_queue_index:stop().
231231

232232
start_msg_store(Refs, StartFunState) ->
233-
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
233+
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
234234
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
235235
undefined, {fun (ok) -> finished end, ok}]),
236-
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
236+
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
237237
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
238-
Refs, StartFunState]).
238+
Refs, StartFunState]),
239+
%% Start message store for all known vhosts
240+
VHosts = rabbit_vhost:list(),
241+
lists:foreach(
242+
fun(VHost) ->
243+
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
244+
rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
245+
end,
246+
VHosts),
247+
ok.
239248

240249
stop_msg_store() ->
241250
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
@@ -254,22 +263,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
254263
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
255264
IndexState = rabbit_queue_index:init(QueueName,
256265
MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
266+
VHost = QueueName#resource.virtual_host,
257267
init(IsDurable, IndexState, 0, 0, [],
258268
case IsDurable of
259269
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
260-
MsgOnDiskFun, AsyncCallback);
270+
MsgOnDiskFun, AsyncCallback,
271+
VHost);
261272
false -> undefined
262273
end,
263-
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
274+
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost));
264275

265276
%% We can be recovering a transient queue if it crashed
266277
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
267278
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
268279
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
280+
VHost = QueueName#resource.virtual_host,
269281
{PersistentClient, ContainsCheckFun} =
270282
case IsDurable of
271283
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
272-
MsgOnDiskFun, AsyncCallback),
284+
MsgOnDiskFun, AsyncCallback,
285+
VHost),
273286
{C, fun (MsgId) when is_binary(MsgId) ->
274287
rabbit_msg_store:contains(MsgId, C);
275288
(#basic_message{is_persistent = Persistent}) ->
@@ -278,11 +291,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
278291
false -> {undefined, fun(_MsgId) -> false end}
279292
end,
280293
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
281-
undefined, AsyncCallback),
294+
undefined, AsyncCallback,
295+
VHost),
282296
{DeltaCount, DeltaBytes, IndexState} =
283297
rabbit_queue_index:recover(
284298
QueueName, RecoveryTerms,
285-
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
299+
rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE),
286300
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
287301
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
288302
PersistentClient, TransientClient).
@@ -957,14 +971,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
957971
end),
958972
Res.
959973

960-
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
974+
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
961975
msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
962-
Callback).
976+
Callback, VHost).
963977

964-
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
978+
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
965979
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
966-
rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
967-
fun () -> Callback(?MODULE, CloseFDsFun) end).
980+
rabbit_msg_store_vhost_sup:client_init(
981+
MsgStore, Ref, MsgOnDiskFun,
982+
fun () -> Callback(?MODULE, CloseFDsFun) end,
983+
VHost).
968984

969985
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
970986
with_immutable_msg_store_state(

test/unit_inbroker_SUITE.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -468,10 +468,10 @@ on_disk_stop(Pid) ->
468468

469469
msg_store_client_init_capture(MsgStore, Ref) ->
470470
Pid = spawn(fun on_disk_capture/0),
471-
{Pid, rabbit_msg_store:client_init(
471+
{Pid, rabbit_msg_store_vhost_sup:client_init(
472472
MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
473473
Pid ! {on_disk, MsgIds}
474-
end, undefined)}.
474+
end, undefined, <<"/">>)}.
475475

476476
msg_store_contains(Atom, MsgIds, MSCState) ->
477477
Atom = lists:foldl(
@@ -548,14 +548,14 @@ test_msg_store_confirm_timer() ->
548548
Ref = rabbit_guid:gen(),
549549
MsgId = msg_id_bin(1),
550550
Self = self(),
551-
MSCState = rabbit_msg_store:client_init(
551+
MSCState = rabbit_msg_store_vhost_sup:client_init(
552552
?PERSISTENT_MSG_STORE, Ref,
553553
fun (MsgIds, _ActionTaken) ->
554554
case gb_sets:is_member(MsgId, MsgIds) of
555555
true -> Self ! on_disk;
556556
false -> ok
557557
end
558-
end, undefined),
558+
end, undefined, <<"/">>),
559559
ok = msg_store_write([MsgId], MSCState),
560560
ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
561561
ok = msg_store_remove([MsgId], MSCState),
@@ -1424,7 +1424,7 @@ nop(_) -> ok.
14241424
nop(_, _) -> ok.
14251425

14261426
msg_store_client_init(MsgStore, Ref) ->
1427-
rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
1427+
rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>).
14281428

14291429
variable_queue_init(Q, Recover) ->
14301430
rabbit_variable_queue:init(

0 commit comments

Comments
 (0)