Skip to content

Commit 9b42b93

Browse files
author
Daniil Fedotov
committed
Check previous msg_store_module on startup
1 parent 1cdfeb2 commit 9b42b93

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

src/rabbit_variable_queue.erl

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@
3434
-export([add_vhost_msg_store/1]).
3535

3636
%% exported for testing only
37-
-export([start_msg_store/2, stop_msg_store/0, init/6]).
37+
-export([start_msg_store/3, stop_msg_store/0, init/6]).
3838

3939
-export([move_messages_to_vhost_store/0]).
4040
-export([stop_vhost_msg_store/1]).
4141
-include_lib("stdlib/include/qlc.hrl").
4242

4343
-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
44+
-define(MSG_STORE_MODULE_FILE, "msg_store_module").
4445

4546
%%----------------------------------------------------------------------------
4647
%% Messages, and their position in the queue, can be in memory or on
@@ -499,15 +500,36 @@ start(DurableQueues) ->
499500
end,
500501
{DurableQueues, #{}},
501502
AllTerms),
502-
start_msg_store(VhostRefs, StartFunState),
503+
IsEmpty = DurableQueues == [],
504+
start_msg_store(VhostRefs, StartFunState, IsEmpty),
503505
{ok, AllTerms}.
504506

505507
stop() ->
506508
ok = stop_msg_store(),
507509
ok = rabbit_queue_index:stop().
508510

509-
start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined ->
511+
start_msg_store(Refs, StartFunState, IsEmpty) when is_map(Refs); Refs == undefined ->
510512
MsgStoreModule = application:get_env(rabbit, msg_store_module, rabbit_msg_store),
513+
case rabbit_file:read_term_file(msg_store_module_file()) of
514+
%% The same message store module
515+
{ok, [MsgStoreModule]} -> ok;
516+
%% Fresh message store
517+
{error, enoent} -> ok;
518+
{ok, [OldModule]} ->
519+
case IsEmpty of
520+
%% There is no data in the old message store.
521+
%% So it's safe to start with the new one
522+
true -> ok;
523+
false ->
524+
error({msg_store_module_mismatch,
525+
MsgStoreModule,
526+
OldModule})
527+
end;
528+
Other ->
529+
error(Other)
530+
end,
531+
rabbit_file:read_term_file(msg_store_module_file(),
532+
[MsgStoreModule]),
511533
rabbit_log:info("Using ~p to provide message store", [MsgStoreModule]),
512534
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
513535
[?TRANSIENT_MSG_STORE_SUP,
@@ -527,8 +549,14 @@ start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined ->
527549
add_vhost_msg_store(Vhost)
528550
end,
529551
lists:sort(VHosts)),
552+
%% When message store is started, we can save a module
553+
rabbit_file:write_term_file(msg_store_module_file(),
554+
[MsgStoreModule]),
530555
ok.
531556

557+
msg_store_module_file() ->
558+
filename:join(rabbit_mnesia:dir(), ?MSG_STORE_MODULE_FILE).
559+
532560
add_vhost_msg_store(VHost) ->
533561
rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
534562
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
@@ -2827,6 +2855,8 @@ move_messages_to_vhost_store() ->
28272855

28282856
ok = rabbit_queue_index:stop(),
28292857
ok = rabbit_sup:stop_child(NewStoreSup),
2858+
%% Specify old message store module.
2859+
rabbit_file:write_term_file(msg_store_module_file(), [rabbit_msg_store]),
28302860
ok.
28312861

28322862
in_batches(Size, MFA, List, MessageStart, MessageEnd) ->

test/backing_queue_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ msg_store1(_Config) ->
261261
{MsgId, 1, MsgIdsTail};
262262
([MsgId|MsgIdsTail]) ->
263263
{MsgId, 0, MsgIdsTail}
264-
end, MsgIds2ndHalf}),
264+
end, MsgIds2ndHalf}, true),
265265
MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
266266
%% check we have the right msgs left
267267
lists:foldl(
@@ -332,7 +332,7 @@ msg_store1(_Config) ->
332332
restart_msg_store_empty() ->
333333
ok = rabbit_variable_queue:stop_msg_store(),
334334
ok = rabbit_variable_queue:start_msg_store(
335-
undefined, {fun (ok) -> finished end, ok}).
335+
undefined, {fun (ok) -> finished end, ok}, true).
336336

337337
msg_id_bin(X) ->
338338
erlang:md5(term_to_binary(X)).

0 commit comments

Comments
 (0)