Skip to content

DO NOT MERGE Pluggable message stores experiment #1142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ define PROJECT_ENV
{memory_monitor_interval, 2500},
{disk_free_limit, 50000000}, %% 50MB
{msg_store_index_module, rabbit_msg_store_ets_index},
{msg_store_module, rabbit_msg_store},
{backing_queue_module, rabbit_variable_queue},
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client
Expand Down
4 changes: 2 additions & 2 deletions src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

-module(rabbit_msg_store).

-behaviour(gen_server2).

-behaviour(rabbit_msg_store_behaviour).
-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
Expand All @@ -28,6 +27,7 @@

-export([transform_dir/3, force_recovery/2]). %% upgrade

-behaviour(gen_server2).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_call/4, prioritise_cast/3,
prioritise_info/3, format_message_queue/2]).
Expand Down
65 changes: 65 additions & 0 deletions src/rabbit_msg_store_behaviour.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%


%% Behaviour listing the callbacks a pluggable message store module
%% must export. Only types and -callback declarations live here; there
%% is no executable code in this module.
-module(rabbit_msg_store_behaviour).

%% Generator fun over a caller-supplied state A: each call returns
%% either 'finished' or {MsgId, Count, NextState}.
%% NOTE(review): Count is presumably a reference-count delta used
%% while recovering the store -- confirm against the implementation.
-type msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A}).
%% Optional callback receiving a set plus a 'written' | 'ignored' tag.
%% NOTE(review): the set presumably holds message ids -- confirm.
-type maybe_msg_id_fun() ::
'undefined' | fun ((gb_sets:set(), 'written' | 'ignored') -> any()).

%% Optional zero-argument callback returning 'ok'.
-type maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok').

%% A store process, addressed by registered name or by pid.
-type server() :: atom() | pid().

%% Identifier of a store client.
-type client_ref() :: binary().

%% Message payloads are not constrained by this behaviour.
-type msg() :: any().

-export_type([msg_ref_delta_gen/1,
maybe_msg_id_fun/0,
maybe_close_fds_fun/0,
server/0,
client_ref/0,
msg/0]).

%% Starts the store process. Arguments: an atom, a file name, a list
%% of binaries or 'undefined', and a {GeneratorFun, InitialState} pair.
%% NOTE(review): presumably store type, store directory and recovered
%% client refs, in that order -- confirm against the call site.
-callback start_link (atom(), file:filename(),
[binary()] | 'undefined',
{msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error().

%% Boolean query against a running store.
%% NOTE(review): presumably true when the store recovered cleanly.
-callback successfully_recovered_state(server()) -> boolean().

%% Creates a client state for the given store. The client state is
%% typed as term() throughout this behaviour, so each implementation
%% may choose its own representation.
-callback client_init(server(), client_ref(), maybe_msg_id_fun(), maybe_close_fds_fun()) -> term().

%% Takes a client state and returns 'ok'.
-callback client_terminate(term()) -> 'ok'.

%% Takes a client state and returns 'ok'.
%% NOTE(review): presumably also discards the client's data -- confirm.
-callback client_delete_and_terminate(term()) -> 'ok'.

%% Returns the client_ref() belonging to a client state.
-callback client_ref(term()) -> client_ref().

%% Takes a client state and returns {ok, Term}.
%% NOTE(review): Term is presumably the updated client state -- confirm.
-callback close_all_indicated (term()) -> rabbit_types:ok(term()).

%% Writes a message under the given id using the client state.
-callback write(rabbit_types:msg_id(), msg(), term()) -> 'ok'.

%% Same signature as write/3.
%% NOTE(review): presumably the flow-controlled variant -- confirm.
-callback write_flow(rabbit_types:msg_id(), msg(), term()) -> 'ok'.

%% Looks a message up by id; returns {{ok, Msg} | not_found, Term}.
%% NOTE(review): Term is presumably the updated client state -- confirm.
-callback read(rabbit_types:msg_id(), term()) -> {rabbit_types:ok(msg()) | 'not_found', term()}.

%% Boolean membership test for a message id.
-callback contains(rabbit_types:msg_id(), term()) -> boolean().

%% Removes the listed message ids using the client state.
-callback remove([rabbit_types:msg_id()], term()) -> 'ok'.
118 changes: 77 additions & 41 deletions src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
-export([start/2, stop/1]).

%% exported for testing only
-export([start_msg_store/3, stop_msg_store/1, init/6]).
-export([start_msg_store/4, stop_msg_store/1, init/6]).

-export([move_messages_to_vhost_store/0]).

Expand All @@ -43,6 +43,7 @@

-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
-define(MSG_STORE_MODULE_FILE, "msg_store_module").

%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
Expand Down Expand Up @@ -480,28 +481,49 @@ explicit_gc_run_operation_threshold_for_mode(Mode) ->

%% Starts the queue index for VHost and then its message stores.
%%
%% The persistent client refs handed to the message store are taken
%% from the recovery terms returned by rabbit_queue_index:start/2:
%% entries equal to 'non_clean_shutdown' and entries without a
%% persistent_ref are skipped. The last argument of start_msg_store/4
%% tells it whether the vhost has no durable queues at all.
%%
%% Returns {ok, AllTerms}.
start(VHost, DurableQueues) ->
    {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
    %% Collect the persistent client refs of the recovered queues.
    ClientRefs = [Ref || Terms <- AllTerms,
                         Terms /= non_clean_shutdown,
                         begin
                             Ref = proplists:get_value(persistent_ref, Terms),
                             Ref =/= undefined
                         end],
    IsEmpty = DurableQueues =:= [],
    start_msg_store(VHost, ClientRefs, StartFunState, IsEmpty),
    {ok, AllTerms}.

%% Stops the message stores of VHost first and its queue index second.
%% Either step crashes with badmatch unless it returns ok.
stop(VHost) ->
    MsgStoreResult = stop_msg_store(VHost),
    ok = MsgStoreResult,
    IndexResult = rabbit_queue_index:stop(VHost),
    ok = IndexResult.

%% Starts the transient and the persistent message store for VHost
%% using the module configured under the rabbit application key
%% msg_store_module (default: rabbit_msg_store).
%%
%% Before starting anything, the configured module is compared with
%% the one recorded in the msg_store_module term file; see
%% check_msg_store_module/2. After both stores are up, the configured
%% module is written back to that file. A failed write crashes with
%% badmatch rather than being ignored, since an unrecorded module
%% would defeat the mismatch check on the next start.
%%
%% Refs is the list of persistent client refs (or undefined),
%% StartFunState the {Fun, State} pair passed to the persistent store,
%% IsEmpty whether the caller has no durable data to preserve.
%%
%% Returns ok.
start_msg_store(VHost, Refs, StartFunState, IsEmpty) when is_list(Refs); Refs == undefined ->
    rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
    MsgStoreModule = application:get_env(rabbit, msg_store_module, rabbit_msg_store),
    ok = check_msg_store_module(MsgStoreModule, IsEmpty),
    rabbit_log:info("Using ~p to provide message store", [MsgStoreModule]),
    do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE, MsgStoreModule),
    do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState, MsgStoreModule),
    ok = rabbit_file:write_term_file(msg_store_module_file(), [MsgStoreModule]).

%% Checks the configured message store module against the one recorded
%% on disk. Returns ok when they agree, when nothing has been recorded
%% yet, or when IsEmpty is true (no data in the old store, so switching
%% is safe). Raises {msg_store_module_mismatch, Configured, Recorded}
%% otherwise, and re-raises any other read result as an error.
check_msg_store_module(MsgStoreModule, IsEmpty) ->
    case rabbit_file:read_term_file(msg_store_module_file()) of
        %% The same message store module
        {ok, [MsgStoreModule]} ->
            ok;
        %% Fresh message store
        {error, enoent} ->
            ok;
        %% A different module was recorded, but there is nothing to lose
        {ok, [_OldModule]} when IsEmpty ->
            ok;
        {ok, [OldModule]} ->
            error({msg_store_module_mismatch, MsgStoreModule, OldModule});
        Other ->
            error(Other)
    end.

do_start_msg_store(VHost, Type, Refs, StartFunState) ->
case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of
do_start_msg_store(VHost, Type, Refs, StartFunState, MsgStoreModule) ->
case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState, MsgStoreModule) of
{ok, _} ->
rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]);
{error, {no_such_vhost, VHost}} = Err ->
Expand All @@ -514,6 +536,9 @@ do_start_msg_store(VHost, Type, Refs, StartFunState) ->
exit({error, Error})
end.

%% Full path of the ?MSG_STORE_MODULE_FILE term file, located in the
%% directory returned by rabbit_mnesia:dir().
msg_store_module_file() ->
    BaseDir = rabbit_mnesia:dir(),
    filename:join(BaseDir, ?MSG_STORE_MODULE_FILE).

%% Maps a message store type macro value to the short atom used in
%% log messages. Any other argument fails with function_clause.
abbreviated_type(?TRANSIENT_MSG_STORE) ->
    transient;
abbreviated_type(?PERSISTENT_MSG_STORE) ->
    persistent.

Expand Down Expand Up @@ -552,14 +577,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback,
VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
rabbit_msg_store:contains(MsgId, C);
(#basic_message{is_persistent = Persistent}) ->
Persistent
end};
true -> {M, C} = msg_store_client_init(?PERSISTENT_MSG_STORE,
PRef,
MsgOnDiskFun, AsyncCallback,
VHost),
{{M, C},
fun
(MsgId) when is_binary(MsgId) ->
M:contains(MsgId, C);
(#basic_message{is_persistent = Persistent}) ->
Persistent
end};
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
Expand Down Expand Up @@ -592,10 +620,12 @@ terminate(_Reason, State) ->
purge_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
_ -> ok = maybe_client_terminate(MSCStateP),
rabbit_msg_store:client_ref(MSCStateP)
{MP, CP} ->
ok = maybe_client_terminate(MP, CP),
MP:client_ref(CP)
end,
ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
{MT, CT} = MSCStateT,
ok = MT:client_delete_and_terminate(CT),
Terms = [{persistent_ref, PRef},
{persistent_count, PCount},
{persistent_bytes, PBytes}],
Expand All @@ -614,9 +644,10 @@ delete_and_terminate(_Reason, State) ->
purge_pending_ack_delete_and_terminate(State1),
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
{MP, CP} -> MP:client_delete_and_terminate(CP)
end,
rabbit_msg_store:client_delete_and_terminate(MSCStateT),
{MT, CT} = MSCStateT,
MT:client_delete_and_terminate(CT),
a(State2 #vqstate { msg_store_clients = undefined }).

delete_crashed(#amqqueue{name = QName}) ->
Expand Down Expand Up @@ -1236,18 +1267,19 @@ trim_msg_status(MsgStatus) ->
queue_index -> MsgStatus
end.

%% Runs Fun against one of the two message store clients and threads
%% the updated client state back into the pair.
%%
%% The first argument is {PersistentClient, TransientClient}; the
%% selected client must be a {Module, ClientState} tuple. The second
%% argument selects the persistent client (true) or the transient one
%% (false). Fun is called as Fun(Module, ClientState) and must return
%% {Result, NewClientState}.
%%
%% Returns {Result, UpdatedPair}. The client that was not selected is
%% passed through untouched and is not inspected.
with_msg_store_state({{Mod, PState}, TClient}, true, Fun) ->
    {Result, PState1} = Fun(Mod, PState),
    {Result, {{Mod, PState1}, TClient}};
with_msg_store_state({PClient, {Mod, TState}}, false, Fun) ->
    {Result, TState1} = Fun(Mod, TState),
    {Result, {PClient, {Mod, TState1}}}.

%% Variant of with_msg_store_state/3 for operations that do not change
%% the client state. Fun is called as Fun(Module, ClientState) and only
%% its result is returned; the client state handed to Fun is put back
%% as-is, and the match on MSCState asserts the pair is unchanged.
with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
    {Res, MSCState} =
        with_msg_store_state(
          MSCState, IsPersistent,
          fun (Mod, ClientState) ->
                  {Fun(Mod, ClientState), ClientState}
          end),
    Res.

msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
Expand All @@ -1265,28 +1297,30 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
%% Writes Msg under MsgId through the client selected by IsPersistent,
%% calling write_flow/3 of that client's message store module.
%% Returns whatever write_flow/3 returns.
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
    with_immutable_msg_store_state(
      MSCState, IsPersistent,
      fun (Mod, MSCStateInternal) ->
              Mod:write_flow(MsgId, Msg, MSCStateInternal)
      end).

%% Reads MsgId through the client selected by IsPersistent, calling
%% read/2 of that client's message store module.
%% Returns {ReadResult, UpdatedMSCState}, where the updated pair
%% carries the client state handed back by read/2.
msg_store_read(MSCState, IsPersistent, MsgId) ->
    with_msg_store_state(
      MSCState, IsPersistent,
      fun (Mod, MSCStateInternal) ->
              Mod:read(MsgId, MSCStateInternal)
      end).

%% Removes MsgIds through the client selected by IsPersistent, calling
%% remove/2 of that client's message store module.
%% Returns whatever remove/2 returns.
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
    with_immutable_msg_store_state(
      MSCState, IsPersistent,
      fun (Mod, MSCStateInternal) ->
              Mod:remove(MsgIds, MSCStateInternal)
      end).

%% Calls close_all_indicated/1 of the message store module belonging
%% to the client selected by IsPersistent.
%% Returns {Result, UpdatedMSCState}, where close_all_indicated/1 is
%% expected to return a {Result, NewClientState} pair.
msg_store_close_fds(MSCState, IsPersistent) ->
    with_msg_store_state(
      MSCState, IsPersistent,
      fun (Mod, MSCStateInternal) ->
              Mod:close_all_indicated(MSCStateInternal)
      end).

msg_store_close_fds_fun(IsPersistent) ->
fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
Expand Down Expand Up @@ -2805,7 +2839,9 @@ move_messages_to_vhost_store(Queues) ->
ok = delete_old_store(OldStore),
ok = rabbit_queue_index:cleanup_global_recovery_terms(),
[ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
ok = stop_new_store(NewMsgStore).
ok = stop_new_store(NewMsgStore),
rabbit_file:write_term_file(msg_store_module_file(), [rabbit_msg_store]),
ok.

in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).
Expand Down Expand Up @@ -2957,12 +2993,12 @@ log_upgrade_verbose(Msg) ->
log_upgrade_verbose(Msg, Args) ->
rabbit_log_upgrade:info(Msg, Args).

maybe_client_terminate(MSCStateP) ->
maybe_client_terminate(MP, CP) ->
%% Queue might have been asked to stop by the supervisor, it needs a clean
%% shutdown in order for the supervising strategy to work - if it reaches max
%% restarts might bring the vhost down.
try
rabbit_msg_store:client_terminate(MSCStateP)
MP:client_terminate(CP)
catch
_:_ ->
ok
Expand Down
29 changes: 18 additions & 11 deletions src/rabbit_vhost_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@

-include("rabbit.hrl").

-export([start/4, stop/2, client_init/5, successfully_recovered_state/2]).
-export([start/5, stop/2, client_init/5, successfully_recovered_state/2]).


start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
start(VHost, Type, ClientRefs, StartupFunState, MsgStoreModule)
when is_list(ClientRefs); ClientRefs == undefined ->
case rabbit_vhost_sup_sup:vhost_sup(VHost) of
{ok, VHostSup} ->
true = ets:insert(rabbit_vhost_sup_sup, {msg_store_module, {VHost, Type}, MsgStoreModule}),
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
supervisor2:start_child(VHostSup,
{Type, {rabbit_msg_store, start_link,
[Type, VHostDir, ClientRefs, StartupFunState]},
transient, ?WORKER_WAIT, worker, [rabbit_msg_store]});
{Type, {MsgStoreModule, start_link,
[Type, VHostDir, ClientRefs,
StartupFunState]},
transient, ?WORKER_WAIT, worker, [MsgStoreModule]});
%% we can get here if a vhost is added and removed concurrently
%% e.g. some integration tests do it
{error, {no_such_vhost, VHost}} = E ->
Expand All @@ -52,16 +54,21 @@ stop(VHost, Type) ->
end.

%% Initialises a client for the message store of the given Type in
%% VHost. The store pid and its implementing module are resolved by
%% with_vhost_store/3.
%%
%% Returns {MsgStoreModule, ClientState} so callers can dispatch later
%% operations to the module that created the state.
client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) ->
    with_vhost_store(
      VHost, Type,
      fun (StorePid, MsgStoreModule) ->
              ClientState = MsgStoreModule:client_init(StorePid, Ref,
                                                       MsgOnDiskFun,
                                                       CloseFDsFun),
              {MsgStoreModule, ClientState}
      end).

%% Resolves the running message store of the given Type in VHost and
%% calls Fun(StorePid, MsgStoreModule) with it.
%%
%% The module is looked up in the rabbit_vhost_sup_sup ETS table under
%% the key {VHost, Type}.
%%
%% Throws {message_store_not_started, Type, VHost} when no store pid
%% is available; raises {message_store_module_not_found, ...} when the
%% pid exists but no module entry is found for it.
with_vhost_store(VHost, Type, Fun) ->
    case vhost_store_pid(VHost, Type) of
        no_pid ->
            throw({message_store_not_started, Type, VHost});
        Pid when is_pid(Pid) ->
            case ets:lookup(rabbit_vhost_sup_sup, {VHost, Type}) of
                [{msg_store_module, _, MsgStoreModule}] ->
                    Fun(Pid, MsgStoreModule);
                [] ->
                    error({message_store_module_not_found, {VHost, Type, Pid}})
            end
    end.

vhost_store_pid(VHost, Type) ->
Expand All @@ -72,6 +79,6 @@ vhost_store_pid(VHost, Type) ->
end.

%% Asks the message store of the given Type in VHost for its recovery
%% status by calling successfully_recovered_state/1 on the module that
%% implements that store.
successfully_recovered_state(VHost, Type) ->
    with_vhost_store(
      VHost, Type,
      fun (StorePid, MsgStoreModule) ->
              MsgStoreModule:successfully_recovered_state(StorePid)
      end).
Loading