Skip to content

Commit 6aa219f

Browse files
author
Daniil Fedotov
committed
After rebase fixes
1 parent 6759e2b commit 6aa219f

File tree

2 files changed

+18
-12
lines changed

2 files changed

+18
-12
lines changed

src/rabbit_variable_queue.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,8 @@ terminate(_Reason, State) ->
620620
purge_pending_ack(true, State),
621621
PRef = case MSCStateP of
622622
undefined -> undefined;
623-
{MP, CP} -> ok = MP:client_terminate(CP),
623+
{MP, CP} ->
624+
ok = maybe_client_terminate(MP, CP),
624625
MP:client_ref(CP)
625626
end,
626627
{MT, CT} = MSCStateT,
@@ -2838,7 +2839,7 @@ move_messages_to_vhost_store(Queues) ->
28382839
ok = delete_old_store(OldStore),
28392840
ok = rabbit_queue_index:cleanup_global_recovery_terms(),
28402841
[ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
2841-
ok = stop_new_store(NewMsgStore).
2842+
ok = stop_new_store(NewMsgStore),
28422843
rabbit_file:write_term_file(msg_store_module_file(), [rabbit_msg_store]),
28432844
ok.
28442845

@@ -2992,12 +2993,12 @@ log_upgrade_verbose(Msg) ->
29922993
log_upgrade_verbose(Msg, Args) ->
29932994
rabbit_log_upgrade:info(Msg, Args).
29942995

2995-
maybe_client_terminate(MSCStateP) ->
2996+
maybe_client_terminate(MP, CP) ->
29962997
%% Queue might have been asked to stop by the supervisor, it needs a clean
29972998
%% shutdown in order for the supervising strategy to work - if it reaches max
29982999
%% restarts might bring the vhost down.
29993000
try
3000-
rabbit_msg_store:client_terminate(MSCStateP)
3001+
MP:client_terminate(CP)
30013002
catch
30023003
_:_ ->
30033004
ok

src/rabbit_vhost_msg_store.erl

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@
2121
-export([start/4, stop/2, client_init/5, successfully_recovered_state/2]).
2222

2323

24-
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
25-
ClientRefs == undefined ->
24+
start(VHost, Type, ClientRefs, StartupFunState, MsgStoreModule)
25+
when is_list(ClientRefs); ClientRefs == undefined ->
2626
case rabbit_vhost_sup_sup:vhost_sup(VHost) of
2727
{ok, VHostSup} ->
28+
ets:insert(rabbit_vhost_sup_sup, {{VHost, Type}, MsgStoreModule}),
2829
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
2930
supervisor2:start_child(VHostSup,
3031
{Type, {rabbit_msg_store, start_link,
31-
[Type, VHostDir, ClientRefs, StartupFunState]},
32+
[Type, VHostDir, ClientRefs,
33+
StartupFunState, MsgStoreModule]},
3234
transient, ?WORKER_WAIT, worker, [rabbit_msg_store]});
3335
%% we can get here if a vhost is added and removed concurrently
3436
%% e.g. some integration tests do it
@@ -52,16 +54,19 @@ stop(VHost, Type) ->
5254
end.
5355

5456
client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) ->
55-
with_vhost_store(VHost, Type, fun(StorePid) ->
56-
rabbit_msg_store:client_init(StorePid, Ref, MsgOnDiskFun, CloseFDsFun)
57+
with_vhost_store(VHost, Type, fun(StorePid, MsgStoreModule) ->
58+
MsgStoreModule:client_init(StorePid, Ref, MsgOnDiskFun, CloseFDsFun)
5759
end).
5860

5961
with_vhost_store(VHost, Type, Fun) ->
6062
case vhost_store_pid(VHost, Type) of
6163
no_pid ->
6264
throw({message_store_not_started, Type, VHost});
6365
Pid when is_pid(Pid) ->
64-
Fun(Pid)
66+
case ets:lookup(rabbit_vhost_sup_sup, {VHost, Type}) of
67+
[MsgStoreModule] -> Fun(Pid, MsgStoreModule);
68+
[] -> error({message_store_module_not_found, {VHost, Type, Pid}})
69+
end
6570
end.
6671

6772
vhost_store_pid(VHost, Type) ->
@@ -72,6 +77,6 @@ vhost_store_pid(VHost, Type) ->
7277
end.
7378

7479
successfully_recovered_state(VHost, Type) ->
75-
with_vhost_store(VHost, Type, fun(StorePid) ->
76-
rabbit_msg_store:successfully_recovered_state(StorePid)
80+
with_vhost_store(VHost, Type, fun(StorePid, MsgStoreModule) ->
81+
MsgStoreModule:successfully_recovered_state(StorePid)
7782
end).

0 commit comments

Comments
 (0)