@@ -234,12 +234,21 @@ stop() ->
234
234
ok = rabbit_queue_index :stop ().
235
235
236
236
start_msg_store (Refs , StartFunState ) ->
237
- ok = rabbit_sup :start_child (? TRANSIENT_MSG_STORE , rabbit_msg_store ,
237
+ ok = rabbit_sup :start_child (? TRANSIENT_MSG_STORE , rabbit_msg_store_vhost_sup ,
238
238
[? TRANSIENT_MSG_STORE , rabbit_mnesia :dir (),
239
239
undefined , {fun (ok ) -> finished end , ok }]),
240
- ok = rabbit_sup :start_child (? PERSISTENT_MSG_STORE , rabbit_msg_store ,
240
+ ok = rabbit_sup :start_child (? PERSISTENT_MSG_STORE , rabbit_msg_store_vhost_sup ,
241
241
[? PERSISTENT_MSG_STORE , rabbit_mnesia :dir (),
242
- Refs , StartFunState ]).
242
+ Refs , StartFunState ]),
243
+ % % Start message store for all known vhosts
244
+ VHosts = rabbit_vhost :list (),
245
+ lists :foreach (
246
+ fun (VHost ) ->
247
+ rabbit_msg_store_vhost_sup :add_vhost (? TRANSIENT_MSG_STORE , VHost ),
248
+ rabbit_msg_store_vhost_sup :add_vhost (? PERSISTENT_MSG_STORE , VHost )
249
+ end ,
250
+ VHosts ),
251
+ ok .
243
252
244
253
stop_msg_store () ->
245
254
ok = rabbit_sup :stop_child (? PERSISTENT_MSG_STORE ),
@@ -258,22 +267,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
258
267
AsyncCallback , MsgOnDiskFun , MsgIdxOnDiskFun , MsgAndIdxOnDiskFun ) ->
259
268
IndexState = rabbit_queue_index :init (QueueName ,
260
269
MsgIdxOnDiskFun , MsgAndIdxOnDiskFun ),
270
+ VHost = QueueName # resource .virtual_host ,
261
271
init (IsDurable , IndexState , 0 , 0 , [],
262
272
case IsDurable of
263
273
true -> msg_store_client_init (? PERSISTENT_MSG_STORE ,
264
- MsgOnDiskFun , AsyncCallback );
274
+ MsgOnDiskFun , AsyncCallback ,
275
+ VHost );
265
276
false -> undefined
266
277
end ,
267
- msg_store_client_init (? TRANSIENT_MSG_STORE , undefined , AsyncCallback ));
278
+ msg_store_client_init (? TRANSIENT_MSG_STORE , undefined , AsyncCallback , VHost ));
268
279
269
280
% % We can be recovering a transient queue if it crashed
270
281
init (# amqqueue { name = QueueName , durable = IsDurable }, Terms ,
271
282
AsyncCallback , MsgOnDiskFun , MsgIdxOnDiskFun , MsgAndIdxOnDiskFun ) ->
272
283
{PRef , RecoveryTerms } = process_recovery_terms (Terms ),
284
+ VHost = QueueName # resource .virtual_host ,
273
285
{PersistentClient , ContainsCheckFun } =
274
286
case IsDurable of
275
287
true -> C = msg_store_client_init (? PERSISTENT_MSG_STORE , PRef ,
276
- MsgOnDiskFun , AsyncCallback ),
288
+ MsgOnDiskFun , AsyncCallback ,
289
+ VHost ),
277
290
{C , fun (MsgId ) when is_binary (MsgId ) ->
278
291
rabbit_msg_store :contains (MsgId , C );
279
292
(# basic_message {is_persistent = Persistent }) ->
@@ -282,11 +295,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
282
295
false -> {undefined , fun (_MsgId ) -> false end }
283
296
end ,
284
297
TransientClient = msg_store_client_init (? TRANSIENT_MSG_STORE ,
285
- undefined , AsyncCallback ),
298
+ undefined , AsyncCallback ,
299
+ VHost ),
286
300
{DeltaCount , DeltaBytes , IndexState } =
287
301
rabbit_queue_index :recover (
288
302
QueueName , RecoveryTerms ,
289
- rabbit_msg_store :successfully_recovered_state (? PERSISTENT_MSG_STORE ),
303
+ rabbit_msg_store_vhost_sup :successfully_recovered_state (? PERSISTENT_MSG_STORE ),
290
304
ContainsCheckFun , MsgIdxOnDiskFun , MsgAndIdxOnDiskFun ),
291
305
init (IsDurable , IndexState , DeltaCount , DeltaBytes , RecoveryTerms ,
292
306
PersistentClient , TransientClient ).
@@ -961,14 +975,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
961
975
end ),
962
976
Res .
963
977
964
- msg_store_client_init (MsgStore , MsgOnDiskFun , Callback ) ->
978
+ msg_store_client_init (MsgStore , MsgOnDiskFun , Callback , VHost ) ->
965
979
msg_store_client_init (MsgStore , rabbit_guid :gen (), MsgOnDiskFun ,
966
- Callback ).
980
+ Callback , VHost ).
967
981
968
- msg_store_client_init (MsgStore , Ref , MsgOnDiskFun , Callback ) ->
982
+ msg_store_client_init (MsgStore , Ref , MsgOnDiskFun , Callback , VHost ) ->
969
983
CloseFDsFun = msg_store_close_fds_fun (MsgStore =:= ? PERSISTENT_MSG_STORE ),
970
- rabbit_msg_store :client_init (MsgStore , Ref , MsgOnDiskFun ,
971
- fun () -> Callback (? MODULE , CloseFDsFun ) end ).
984
+ rabbit_msg_store_vhost_sup :client_init (
985
+ MsgStore , Ref , MsgOnDiskFun ,
986
+ fun () -> Callback (? MODULE , CloseFDsFun ) end ,
987
+ VHost ).
972
988
973
989
msg_store_write (MSCState , IsPersistent , MsgId , Msg ) ->
974
990
with_immutable_msg_store_state (
0 commit comments