Skip to content

Commit 1cdfeb2

Browse files
committed
Correct types when using msg store clients with module tag
variable queue state stores msg store clients with message store module tags
1 parent 7f90953 commit 1cdfeb2

File tree

2 files changed

+27
-21
lines changed

2 files changed

+27
-21
lines changed

src/rabbit_variable_queue.erl

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,18 +1261,19 @@ trim_msg_status(MsgStatus) ->
12611261
queue_index -> MsgStatus
12621262
end.
12631263

1264-
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
1265-
{Result, MSCStateP1} = Fun(MSCStateP),
1266-
{Result, {MSCStateP1, MSCStateT}};
1267-
with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) ->
1268-
{Result, MSCStateT1} = Fun(MSCStateT),
1269-
{Result, {MSCStateP, MSCStateT1}}.
1264+
with_msg_store_state({{Mod, MSCStatePInternal}, MSCStateT}, true, Fun) ->
1265+
{Result, MSCStatePInternal1} = Fun(Mod, MSCStatePInternal),
1266+
{Result, {{Mod, MSCStatePInternal1}, MSCStateT}};
1267+
with_msg_store_state({MSCStateP, {Mod, MSCStateTInternal}}, false, Fun) ->
1268+
{Result, MSCStateTInternal1} = Fun(Mod, MSCStateTInternal),
1269+
{Result, {MSCStateP, {Mod, MSCStateTInternal1}}}.
12701270

12711271
with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
1272-
{Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
1273-
fun (MSCState1) ->
1274-
{Fun(MSCState1), MSCState1}
1275-
end),
1272+
{Res, MSCState} = with_msg_store_state(
1273+
MSCState, IsPersistent,
1274+
fun (Mod, MSCState1) ->
1275+
{Fun(Mod, MSCState1), MSCState1}
1276+
end),
12761277
Res.
12771278

12781279
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
@@ -1290,28 +1291,30 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
12901291
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
12911292
with_immutable_msg_store_state(
12921293
MSCState, IsPersistent,
1293-
fun ({Mod, MSCState1}) ->
1294-
Mod:write_flow(MsgId, Msg, MSCState1)
1294+
fun (Mod, MSCStateInternal) ->
1295+
Mod:write_flow(MsgId, Msg, MSCStateInternal)
12951296
end).
12961297

12971298
msg_store_read(MSCState, IsPersistent, MsgId) ->
12981299
with_msg_store_state(
12991300
MSCState, IsPersistent,
1300-
fun ({Mod, MSCState1}) ->
1301-
Mod:read(MsgId, MSCState1)
1301+
fun (Mod, MSCStateInternal) ->
1302+
Mod:read(MsgId, MSCStateInternal)
13021303
end).
13031304

13041305
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
13051306
with_immutable_msg_store_state(
13061307
MSCState, IsPersistent,
1307-
fun ({Mod, MSCState1}) ->
1308-
Mod:remove(MsgIds, MSCState1)
1308+
fun (Mod, MSCStateInternal) ->
1309+
Mod:remove(MsgIds, MSCStateInternal)
13091310
end).
13101311

13111312
msg_store_close_fds(MSCState, IsPersistent) ->
13121313
with_msg_store_state(
13131314
MSCState, IsPersistent,
1314-
fun ({Mod, MSCState1}) -> Mod:close_all_indicated(MSCState1) end).
1315+
fun (Mod, MSCStateInternal) ->
1316+
Mod:close_all_indicated(MSCStateInternal)
1317+
end).
13151318

13161319
msg_store_close_fds_fun(IsPersistent) ->
13171320
fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
@@ -2938,6 +2941,7 @@ start_new_store_sup() ->
29382941
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP,
29392942
rabbit_msg_store_vhost_sup,
29402943
[?PERSISTENT_MSG_STORE_SUP,
2944+
rabbit_msg_store,
29412945
undefined, {fun (ok) -> finished end, ok}]),
29422946
?PERSISTENT_MSG_STORE_SUP.
29432947

test/backing_queue_SUITE.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -376,10 +376,11 @@ on_disk_stop(Pid) ->
376376

377377
msg_store_client_init_capture(MsgStore, Ref) ->
378378
Pid = spawn(fun on_disk_capture/0),
379-
{Pid, rabbit_msg_store_vhost_sup:client_init(
379+
{rabbit_msg_store, MSCState} = rabbit_msg_store_vhost_sup:client_init(
380380
MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
381381
Pid ! {on_disk, MsgIds}
382-
end, undefined, <<"/">>)}.
382+
end, undefined, <<"/">>),
383+
{Pid, MSCState}.
383384

384385
msg_store_contains(Atom, MsgIds, MSCState) ->
385386
Atom = lists:foldl(
@@ -456,7 +457,7 @@ test_msg_store_confirm_timer() ->
456457
Ref = rabbit_guid:gen(),
457458
MsgId = msg_id_bin(1),
458459
Self = self(),
459-
MSCState = rabbit_msg_store_vhost_sup:client_init(
460+
{rabbit_msg_store, MSCState} = rabbit_msg_store_vhost_sup:client_init(
460461
?PERSISTENT_MSG_STORE, Ref,
461462
fun (MsgIds, _ActionTaken) ->
462463
case gb_sets:is_member(MsgId, MsgIds) of
@@ -1337,7 +1338,8 @@ nop(_) -> ok.
13371338
nop(_, _) -> ok.
13381339

13391340
msg_store_client_init(MsgStore, Ref) ->
1340-
rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>).
1341+
{rabbit_msg_store, MSCState} = rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>),
1342+
MSCState.
13411343

13421344
variable_queue_init(Q, Recover) ->
13431345
rabbit_variable_queue:init(

0 commit comments

Comments
 (0)