Skip to content

Commit fdf783f

Browse files
Merge pull request #766 from rabbitmq/rabbitmq-server-567
Per vhost message store
2 parents 72f7c8e + 1d4e939 commit fdf783f

18 files changed

+912
-164
lines changed

scripts/rabbitmq-env

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -236,9 +236,11 @@ rmq_normalize_path_var RABBITMQ_PLUGINS_DIR
236236
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
237237
[ "x" != "x$RABBITMQ_LOGS" ] && export RABBITMQ_LOGS_source=environment
238238
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
239+
[ "x" = "x$RABBITMQ_UPGRADE_LOG" ] && RABBITMQ_UPGRADE_LOG="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}_upgrade.log"
239240

240-
rmq_normalize_path_var \
241-
RABBITMQ_LOGS
241+
rmq_normalize_path_var RABBITMQ_LOGS
242+
243+
rmq_normalize_path_var RABBITMQ_UPGRADE_LOG
242244

243245
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
244246

@@ -254,7 +256,8 @@ rmq_check_if_shared_with_mnesia \
254256
RABBITMQ_PLUGINS_EXPAND_DIR \
255257
RABBITMQ_ENABLED_PLUGINS_FILE \
256258
RABBITMQ_PLUGINS_DIR \
257-
RABBITMQ_LOGS
259+
RABBITMQ_LOGS \
260+
RABBITMQ_UPGRADE_LOG
258261

259262
##--- End of overridden <var_name> variables
260263

scripts/rabbitmq-server

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -160,9 +160,11 @@ RABBITMQ_LISTEN_ARG=
160160
if [ "$RABBITMQ_LOGS" = '-' ]; then
161161
SASL_ERROR_LOGGER=tty
162162
RABBIT_LAGER_HANDLER=tty
163+
RABBITMQ_LAGER_HANDLER_UPGRADE=tty
163164
else
164165
SASL_ERROR_LOGGER=false
165166
RABBIT_LAGER_HANDLER='"'${RABBITMQ_LOGS}'"'
167+
RABBITMQ_LAGER_HANDLER_UPGRADE='"'${RABBITMQ_UPGRADE_LOG}'"'
166168
fi
167169

168170
# Bump ETS table limit to 50000
@@ -216,6 +218,7 @@ start_rabbitmq_server() {
216218
-sasl sasl_error_logger "$SASL_ERROR_LOGGER" \
217219
-rabbit lager_log_root "\"$RABBITMQ_LOG_BASE\"" \
218220
-rabbit lager_handler "$RABBIT_LAGER_HANDLER" \
221+
-rabbit lager_handler_upgrade "$RABBITMQ_LAGER_HANDLER_UPGRADE" \
219222
-rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \
220223
-rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
221224
-rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \

src/rabbit.erl

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -153,6 +153,14 @@
153153
{requires, core_initialized},
154154
{enables, routing_ready}]}).
155155

156+
-rabbit_boot_step({upgrade_queues,
157+
[{description, "per-vhost message store migration"},
158+
{mfa, {rabbit_upgrade,
159+
maybe_migrate_queues_to_per_vhost_storage,
160+
[]}},
161+
{requires, [core_initialized]},
162+
{enables, recovery}]}).
163+
156164
-rabbit_boot_step({recovery,
157165
[{description, "exchange, queue and binding recovery"},
158166
{mfa, {rabbit, recover, []}},

src/rabbit_lager.erl

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -210,7 +210,7 @@ configure_lager() ->
210210
%% messages to the default sink. To know the list of expected extra
211211
%% sinks, we look at the 'lager_extra_sinks' compilation option.
212212
Sinks0 = application:get_env(lager, extra_sinks, []),
213-
Sinks1 = configure_extra_sinks(Sinks0,
213+
Sinks1 = configure_extra_sinks(Sinks0,
214214
[error_logger | list_expected_sinks()]),
215215
%% TODO Waiting for basho/lager#303
216216
%% Sinks2 = lists:keystore(error_logger_lager_event, 1, Sinks1,
@@ -231,18 +231,25 @@ configure_lager() ->
231231
configure_extra_sinks(Sinks, [SinkName | Rest]) ->
232232
Sink0 = proplists:get_value(SinkName, Sinks, []),
233233
Sink1 = case proplists:is_defined(handlers, Sink0) of
234-
false -> lists:keystore(handlers, 1, Sink0,
235-
{handlers,
236-
[{lager_forwarder_backend,
237-
lager_util:make_internal_sink_name(lager)
238-
}]});
234+
false -> default_sink_config(SinkName, Sink0);
239235
true -> Sink0
240236
end,
241237
Sinks1 = lists:keystore(SinkName, 1, Sinks, {SinkName, Sink1}),
242238
configure_extra_sinks(Sinks1, Rest);
243239
configure_extra_sinks(Sinks, []) ->
244240
Sinks.
245241

242+
default_sink_config(rabbit_log_upgrade_lager_event, Sink) ->
243+
Handlers = lager_handlers(application:get_env(rabbit,
244+
lager_handler_upgrade,
245+
tty)),
246+
lists:keystore(handlers, 1, Sink, {handlers, Handlers});
247+
default_sink_config(_, Sink) ->
248+
lists:keystore(handlers, 1, Sink,
249+
{handlers,
250+
[{lager_forwarder_backend,
251+
lager_util:make_internal_sink_name(lager)}]}).
252+
246253
list_expected_sinks() ->
247254
case application:get_env(rabbit, lager_extra_sinks) of
248255
{ok, List} ->

src/rabbit_log.erl

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,7 @@ make_internal_sink_name(rabbit_log_channel) -> rabbit_log_channel_lager_event;
7878
make_internal_sink_name(rabbit_log_mirroring) -> rabbit_log_mirroring_lager_event;
7979
make_internal_sink_name(rabbit_log_queue) -> rabbit_log_queue_lager_event;
8080
make_internal_sink_name(rabbit_log_federation) -> rabbit_log_federation_lager_event;
81+
make_internal_sink_name(rabbit_log_upgrade) -> rabbit_log_upgrade_lager_event;
8182
make_internal_sink_name(Category) ->
8283
lager_util:make_internal_sink_name(Category).
8384

src/rabbit_msg_store.erl

Lines changed: 57 additions & 48 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818

1919
-behaviour(gen_server2).
2020

21-
-export([start_link/4, successfully_recovered_state/1,
21+
-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1,
2222
client_init/4, client_terminate/1, client_delete_and_terminate/1,
2323
client_ref/1, close_all_indicated/1,
2424
write/3, write_flow/3, read/2, contains/2, remove/2]).
@@ -63,7 +63,7 @@
6363
%% the module for index ops,
6464
%% rabbit_msg_store_ets_index by default
6565
index_module,
66-
%% %% where are messages?
66+
%% where are messages?
6767
index_state,
6868
%% current file name as number
6969
current_file,
@@ -91,8 +91,6 @@
9191
flying_ets,
9292
%% set of dying clients
9393
dying_clients,
94-
%% index of file positions for client death messages
95-
dying_client_index,
9694
%% map of references of all registered clients
9795
%% to callbacks
9896
clients,
@@ -474,15 +472,20 @@
474472
%% public API
475473
%%----------------------------------------------------------------------------
476474

477-
start_link(Server, Dir, ClientRefs, StartupFunState) ->
478-
gen_server2:start_link({local, Server}, ?MODULE,
479-
[Server, Dir, ClientRefs, StartupFunState],
475+
start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
476+
gen_server2:start_link(?MODULE,
477+
[Name, Dir, ClientRefs, StartupFunState],
478+
[{timeout, infinity}]).
479+
480+
start_global_store_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
481+
gen_server2:start_link({local, Name}, ?MODULE,
482+
[Name, Dir, ClientRefs, StartupFunState],
480483
[{timeout, infinity}]).
481484

482485
successfully_recovered_state(Server) ->
483486
gen_server2:call(Server, successfully_recovered_state, infinity).
484487

485-
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
488+
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) ->
486489
{IState, IModule, Dir, GCPid,
487490
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
488491
gen_server2:call(
@@ -522,7 +525,7 @@ write_flow(MsgId, Msg,
522525
%% rabbit_amqqueue_process process via the
523526
%% rabbit_variable_queue. We are accessing the
524527
%% rabbit_amqqueue_process process dictionary.
525-
credit_flow:send(whereis(Server), CreditDiscBound),
528+
credit_flow:send(Server, CreditDiscBound),
526529
client_write(MsgId, Msg, flow, CState).
527530

528531
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
@@ -548,7 +551,7 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
548551
[client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
549552
server_cast(CState, {remove, CRef, MsgIds}).
550553

551-
set_maximum_since_use(Server, Age) ->
554+
set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) ->
552555
gen_server2:cast(Server, {set_maximum_since_use, Age}).
553556

554557
%%----------------------------------------------------------------------------
@@ -699,27 +702,25 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
699702
end.
700703

701704
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
702-
dying_clients = DyingClients,
703-
dying_client_index = DyingIndex }) ->
704-
ets:delete(DyingIndex, CRef),
705+
dying_clients = DyingClients }) ->
705706
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
706-
dying_clients = sets:del_element(CRef, DyingClients) }.
707+
dying_clients = maps:remove(CRef, DyingClients) }.
707708

708709

709710
%%----------------------------------------------------------------------------
710711
%% gen_server callbacks
711712
%%----------------------------------------------------------------------------
712713

713-
init([Server, BaseDir, ClientRefs, StartupFunState]) ->
714+
init([Name, BaseDir, ClientRefs, StartupFunState]) ->
714715
process_flag(trap_exit, true),
715716

716717
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
717718
[self()]),
718719

719-
Dir = filename:join(BaseDir, atom_to_list(Server)),
720+
Dir = filename:join(BaseDir, atom_to_list(Name)),
720721

721-
{ok, IndexModule} = application:get_env(msg_store_index_module),
722-
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
722+
{ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
723+
rabbit_log:info("~tp: using ~p to provide index~n", [Dir, IndexModule]),
723724

724725
AttemptFileSummaryRecovery =
725726
case ClientRefs of
@@ -738,7 +739,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
738739

739740
{CleanShutdown, IndexState, ClientRefs1} =
740741
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
741-
ClientRefs, Dir, Server),
742+
ClientRefs, Dir),
742743
Clients = dict:from_list(
743744
[{CRef, {undefined, undefined, undefined}} ||
744745
CRef <- ClientRefs1]),
@@ -755,10 +756,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
755756
[ordered_set, public]),
756757
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
757758
FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
758-
DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
759-
[set, public, {keypos, #dying_client.client_ref}]),
760759

761-
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
760+
{ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit),
762761

763762
{ok, GCPid} = rabbit_msg_store_gc:start_link(
764763
#gc_state { dir = Dir,
@@ -787,8 +786,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
787786
file_summary_ets = FileSummaryEts,
788787
cur_file_cache_ets = CurFileCacheEts,
789788
flying_ets = FlyingEts,
790-
dying_clients = sets:new(),
791-
dying_client_index = DyingIndex,
789+
dying_clients = #{},
792790
clients = Clients,
793791
successfully_recovered = CleanShutdown,
794792
file_size_limit = FileSizeLimit,
@@ -866,14 +864,14 @@ handle_call({contains, MsgId}, From, State) ->
866864

867865
handle_cast({client_dying, CRef},
868866
State = #msstate { dying_clients = DyingClients,
869-
dying_client_index = DyingIndex,
870867
current_file_handle = CurHdl,
871868
current_file = CurFile }) ->
872-
DyingClients1 = sets:add_element(CRef, DyingClients),
873869
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
874-
true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef,
875-
file = CurFile,
876-
offset = CurOffset}),
870+
DyingClients1 = maps:put(CRef,
871+
#dying_client{client_ref = CRef,
872+
file = CurFile,
873+
offset = CurOffset},
874+
DyingClients),
877875
noreply(State #msstate { dying_clients = DyingClients1 });
878876

879877
handle_cast({client_delete, CRef},
@@ -995,12 +993,25 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
995993
State2
996994
end,
997995
State3 = close_all_handles(State1),
998-
ok = store_file_summary(FileSummaryEts, Dir),
996+
case store_file_summary(FileSummaryEts, Dir) of
997+
ok -> ok;
998+
{error, FSErr} ->
999+
rabbit_log:error("Unable to store file summary"
1000+
" for vhost message store for directory ~p~n"
1001+
"Error: ~p~n",
1002+
[Dir, FSErr])
1003+
end,
9991004
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
10001005
CurFileCacheEts, FlyingEts]],
10011006
IndexModule:terminate(IndexState),
1002-
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
1003-
{index_module, IndexModule}], Dir),
1007+
case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
1008+
{index_module, IndexModule}], Dir) of
1009+
ok -> ok;
1010+
{error, RTErr} ->
1011+
rabbit_log:error("Unable to save message store recovery terms"
1012+
" for directory ~p~nError: ~p~n",
1013+
[Dir, RTErr])
1014+
end,
10041015
State3 #msstate { index_state = undefined,
10051016
current_file_handle = undefined }.
10061017

@@ -1357,17 +1368,15 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) ->
13571368
%% msg and thus should be ignored. Note that this (correctly) returns
13581369
%% false when testing to remove the death msg itself.
13591370
should_mask_action(CRef, MsgId,
1360-
State = #msstate { dying_clients = DyingClients,
1361-
dying_client_index = DyingIndex }) ->
1362-
case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
1363-
{false, Location} ->
1371+
State = #msstate{dying_clients = DyingClients}) ->
1372+
case {maps:find(CRef, DyingClients), index_lookup(MsgId, State)} of
1373+
{error, Location} ->
13641374
{false, Location};
1365-
{true, not_found} ->
1375+
{{ok, _}, not_found} ->
13661376
{true, not_found};
1367-
{true, #msg_location { file = File, offset = Offset,
1368-
ref_count = RefCount } = Location} ->
1369-
[#dying_client { file = DeathFile, offset = DeathOffset }] =
1370-
ets:lookup(DyingIndex, CRef),
1377+
{{ok, Client}, #msg_location { file = File, offset = Offset,
1378+
ref_count = RefCount } = Location} ->
1379+
#dying_client{file = DeathFile, offset = DeathOffset} = Client,
13711380
{case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
13721381
{true, _} -> true;
13731382
{false, 0} -> false_if_increment;
@@ -1538,16 +1547,16 @@ index_delete_by_file(File, #msstate { index_module = Index,
15381547
%% shutdown and recovery
15391548
%%----------------------------------------------------------------------------
15401549

1541-
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
1550+
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir) ->
15421551
{false, IndexModule:new(Dir), []};
1543-
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
1544-
rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
1552+
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir) ->
1553+
rabbit_log:warning("~tp : rebuilding indices from scratch~n", [Dir]),
15451554
{false, IndexModule:new(Dir), []};
1546-
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
1555+
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir) ->
15471556
Fresh = fun (ErrorMsg, ErrorArgs) ->
1548-
rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
1557+
rabbit_log:warning("~tp : " ++ ErrorMsg ++ "~n"
15491558
"rebuilding indices from scratch~n",
1550-
[Server | ErrorArgs]),
1559+
[Dir | ErrorArgs]),
15511560
{false, IndexModule:new(Dir), []}
15521561
end,
15531562
case read_recovery_terms(Dir) of
@@ -1582,7 +1591,7 @@ read_recovery_terms(Dir) ->
15821591
end.
15831592

15841593
store_file_summary(Tid, Dir) ->
1585-
ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
1594+
ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
15861595
[{extended_info, [object_count]}]).
15871596

15881597
recover_file_summary(false, _Dir) ->

src/rabbit_msg_store_ets_index.erl

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,12 @@ delete_by_file(File, State) ->
7474
ok.
7575

7676
terminate(#state { table = MsgLocations, dir = Dir }) ->
77-
ok = ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
78-
[{extended_info, [object_count]}]),
77+
case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
78+
[{extended_info, [object_count]}]) of
79+
ok -> ok;
80+
{error, Err} ->
81+
rabbit_log:error("Unable to save message store index"
82+
" for directory ~p.~nError: ~p~n",
83+
[Dir, Err])
84+
end,
7985
ets:delete(MsgLocations).

0 commit comments

Comments
 (0)