Skip to content

Commit 0fe4f7f

Browse files
author
Loïc Hoguin
committed
More reliable upgrades from old index
The old index's segment files are only deleted after we have asked the message store to sync to disk. We always look for .idx files and load them if there are any, except on clean recovery. This allows resuming an interrupted upgrade. However, when a node is shut down during an upgrade (normally or not) and then restarted, it is possible that some messages end up duplicated.
1 parent 50eae64 commit 0fe4f7f

File tree

2 files changed

+40
-28
lines changed

2 files changed

+40
-28
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -199,29 +199,30 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, IsMsgStoreClean,
199199
State}
200200
end.
201201

202-
recover_segments(State = #mqistate { dir = Dir }, Terms, IsMsgStoreClean,
202+
recover_segments(State0 = #mqistate { dir = Dir }, Terms, IsMsgStoreClean,
203203
ContainsCheckFun, OnSyncFun, OnSyncMsgFun, CountersRef) ->
204204
SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir),
205-
case SegmentFiles of
206-
%% No segments found. We try to see if there are segment files
207-
%% from the old index.
205+
State = case SegmentFiles of
206+
%% No segments found.
208207
[] ->
209-
case rabbit_file:wildcard(".*\\.idx", Dir) of
210-
%% We are recovering a dirty queue that was using the old index.
211-
[_|_] ->
212-
recover_index_v1_dirty(State, Terms, IsMsgStoreClean,
213-
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
214-
CountersRef);
215-
%% Otherwise keep default values.
216-
[] ->
217-
State
218-
end;
208+
State0;
219209
%% Count unackeds in the segments.
220210
_ ->
221211
Segments = lists:sort([
222212
list_to_integer(filename:basename(F, ?SEGMENT_EXTENSION))
223213
|| F <- SegmentFiles]),
224-
recover_segments(State, ContainsCheckFun, CountersRef, Segments)
214+
recover_segments(State0, ContainsCheckFun, CountersRef, Segments)
215+
end,
216+
%% We always try to see if there are segment files from the old index as well.
217+
case rabbit_file:wildcard(".*\\.idx", Dir) of
218+
%% We are recovering a dirty queue that was using the old index.
219+
[_|_] ->
220+
recover_index_v1_dirty(State, Terms, IsMsgStoreClean,
221+
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
222+
CountersRef);
223+
%% Otherwise keep default values.
224+
[] ->
225+
State
225226
end.
226227

227228
recover_segments(State, _, _, []) ->
@@ -384,7 +385,7 @@ recover_index_v1_loop(State0 = #mqistate{ queue_name = Name },
384385
MsgId = case MsgOrId of
385386
Msg = #basic_message{ id = MsgId0 } ->
386387
%% We must do a synchronous write to avoid overloading the message store.
387-
rabbit_msg_store:sync_write(MsgId0, Msg, MSClient),
388+
rabbit_msg_store:blocking_write(MsgId0, Msg, MSClient),
388389
MsgId0;
389390
MsgId0 ->
390391
MsgId0
@@ -395,6 +396,7 @@ recover_index_v1_loop(State0 = #mqistate{ queue_name = Name },
395396
publish(MsgId, SeqId, Props, IsPersistent, IsDelivered, infinity, State1)
396397
end, State0, Messages),
397398
State = flush(State2),
399+
rabbit_msg_store:force_sync(MSClient),
398400
%% We have written everything to disk. We can delete the old segment file
399401
%% to free up much needed space, to avoid doubling disk usage during the upgrade.
400402
rabbit_queue_index:delete_segment_file_for_seq_id(LoSeqId, V1State),
@@ -997,17 +999,17 @@ queue_index_walker_reader(#resource{ virtual_host = VHost } = Name, Gatherer) ->
997999
?DEBUG("~0p ~0p", [Name, Gatherer]),
9981000
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
9991001
Dir = queue_dir(VHostDir, Name),
1002+
SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir),
1003+
_ = [queue_index_walker_segment(filename:join(Dir, F), Gatherer) || F <- SegmentFiles],
10001004
%% When there are files belonging to the old index, we go through
1001-
%% the old index walker function. We will upgrade to the new index
1002-
%% in the recover step.
1005+
%% the old index walker function as well.
10031006
case rabbit_file:wildcard(".*\\.idx", Dir) of
10041007
[_|_] ->
10051008
rabbit_queue_index:queue_index_walker_reader(Name, Gatherer);
10061009
[] ->
1007-
SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir),
1008-
_ = [queue_index_walker_segment(filename:join(Dir, F), Gatherer) || F <- SegmentFiles],
1009-
ok = gatherer:finish(Gatherer)
1010-
end.
1010+
ok
1011+
end,
1012+
ok = gatherer:finish(Gatherer).
10111013

10121014
queue_index_walker_segment(F, Gatherer) ->
10131015
?DEBUG("~0p ~0p", [F, Gatherer]),

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
-export([scan_file_for_valid_messages/1]). %% salvage tool
2121

2222
-export([transform_dir/3, force_recovery/2]). %% upgrade
23-
-export([sync_write/3]). %% Used when upgrading to the modern index.
23+
-export([blocking_write/3, force_sync/1]). %% Used when upgrading to the modern index.
2424

2525
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
2626
code_change/3, prioritise_call/4, prioritise_cast/3,
@@ -516,17 +516,23 @@ write_flow(MsgId, Msg,
516516

517517
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
518518

519-
-spec sync_write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
519+
-spec blocking_write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
520520

521521
%% Used when upgrading to the modern index.
522-
sync_write(MsgId, Msg,
523-
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
524-
client_ref = CRef }) ->
522+
blocking_write(MsgId, Msg,
523+
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
524+
client_ref = CRef }) ->
525525
file_handle_cache_stats:update(msg_store_write),
526526
ok = client_update_flying(+1, MsgId, CState),
527527
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
528528
ok = server_call(CState, {write, CRef, MsgId, noflow}).
529529

530+
-spec force_sync(client_msstate()) -> 'ok'.
531+
532+
%% Used when upgrading to the modern index.
533+
force_sync(CState) ->
534+
ok = server_call(CState, force_sync).
535+
530536
-spec read(rabbit_types:msg_id(), client_msstate()) ->
531537
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}.
532538

@@ -884,7 +890,11 @@ handle_call({contains, MsgId}, From, State) ->
884890
%% Used when upgrading to the modern index.
885891
handle_call(Write = {write, _, _, _}, _From, State) ->
886892
{noreply, State1, _} = handle_cast(Write, State),
887-
reply(ok, State1).
893+
reply(ok, State1);
894+
895+
handle_call(force_sync, _From, State = #msstate { current_file_handle = CurHdl }) ->
896+
ok = file_handle_cache:sync(CurHdl),
897+
reply(ok, State).
888898

889899
handle_cast({client_dying, CRef},
890900
State = #msstate { dying_clients = DyingClients,

0 commit comments

Comments
 (0)