
Commit 18acc01

Loïc Hoguin (lhoguin) authored and committed
CQ: Make dirty recovery of shared store more efficient
This only applies to v2 because modifying this part of the v1 code is seen as too risky considering v1 will soon get removed.
1 parent 1723798 · commit 18acc01

2 files changed (+39, -39 lines)

deps/rabbit/src/rabbit_classic_queue_index_v2.erl (13 additions & 7 deletions)

@@ -1125,8 +1125,11 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
         empty ->
             ok = gatherer:stop(Gatherer),
             finished;
+        %% From v1 index walker. @todo Remove when no longer possible to convert from v1.
         {value, {MsgId, Count}} ->
-            {MsgId, Count, {next, Gatherer}}
+            {MsgId, Count, {next, Gatherer}};
+        {value, MsgIds} ->
+            {MsgIds, {next, Gatherer}}
     end.
 
 queue_index_walker_reader(#resource{ virtual_host = VHost } = Name, Gatherer) ->
@@ -1153,27 +1156,30 @@ queue_index_walker_segment(F, Gatherer) ->
         {ok, <<?MAGIC:32,?VERSION:8,
                FromSeqId:64/unsigned,ToSeqId:64/unsigned,
                _/bits>>} ->
-            queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId);
+            queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId, []);
         _ ->
             %% Invalid segment file. Skip.
             ok
     end,
     ok = file:close(Fd).
 
-queue_index_walker_segment(_, _, N, N) ->
+queue_index_walker_segment(_, Gatherer, N, N, Acc) ->
     %% We reached the end of the segment file.
+    gatherer:sync_in(Gatherer, Acc),
     ok;
-queue_index_walker_segment(Fd, Gatherer, N, Total) ->
+queue_index_walker_segment(Fd, Gatherer, N, Total, Acc) ->
     case file:read(Fd, ?ENTRY_SIZE) of
         %% We found a non-ack persistent entry. Gather it.
         {ok, <<1,_:7,1:1,_,1,Id:16/binary,_/bits>>} ->
-            gatherer:sync_in(Gatherer, {Id, 1}),
-            queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
+            queue_index_walker_segment(Fd, Gatherer, N + 1, Total, [Id|Acc]);
         %% We found an ack, a transient entry or a non-entry. Skip it.
         {ok, _} ->
-            queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
+            queue_index_walker_segment(Fd, Gatherer, N + 1, Total, Acc);
         %% We reached the end of a partial segment file.
+        eof when Acc =:= [] ->
+            ok;
         eof ->
+            gatherer:sync_in(Gatherer, Acc),
             ok
     end.
 
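Note on the hunks above: the segment walker now accumulates message IDs in Acc and hands the whole batch to gatherer:sync_in/2 once per segment, instead of calling it once per persistent entry, which is where the dirty-recovery speed-up comes from. Below is a minimal, illustrative Erlang sketch of that accumulate-then-flush shape; the module name walker_batch_sketch, the walk_entries/2 function and the pre-parsed entry list are invented for the example (the real code reads ?ENTRY_SIZE chunks from the segment file and flushes into RabbitMQ's gatherer process).

-module(walker_batch_sketch).
-export([walk_entries/2]).

%% Entries stand in for parsed segment reads: either a persistent
%% message id that should be gathered, or something to skip (acks,
%% transient entries, non-entries).
-spec walk_entries([{persistent, binary()} | skip],
                   fun(([binary()]) -> ok)) -> ok.
walk_entries(Entries, FlushFun) ->
    walk_entries(Entries, FlushFun, []).

walk_entries([], _FlushFun, []) ->
    %% Nothing gathered in this segment: nothing to flush.
    ok;
walk_entries([], FlushFun, Acc) ->
    %% One flush for the whole segment instead of one per message.
    ok = FlushFun(Acc);
walk_entries([{persistent, Id}|Rest], FlushFun, Acc) ->
    walk_entries(Rest, FlushFun, [Id|Acc]);
walk_entries([skip|Rest], FlushFun, Acc) ->
    walk_entries(Rest, FlushFun, Acc).

For example, walker_batch_sketch:walk_entries([{persistent, <<0:128>>}, skip, {persistent, <<1:128>>}], fun(Ids) -> io:format("~p~n", [Ids]), ok end). prints both ids from a single flush, mirroring the single gatherer:sync_in(Gatherer, Acc) call per segment.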

deps/rabbit/src/rabbit_msg_store.erl (26 additions & 32 deletions)

@@ -26,21 +26,15 @@
 
 -include_lib("rabbit_common/include/rabbit_msg_store.hrl").
 
-%% We flush to disk when the write buffer gets above the max size,
-%% or at an interval to make sure we don't keep the data in memory
-%% too long. Confirms are sent after the data is flushed to disk.
--define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB.
--define(SYNC_INTERVAL, 200). %% Milliseconds.
+%% We flush to disk at an interval to make sure we don't keep
+%% the data in memory too long. Confirms are sent after the
+%% data is flushed to disk.
+-define(SYNC_INTERVAL, 200). %% Milliseconds.
 
 -define(CLEAN_FILENAME, "clean.dot").
 -define(FILE_SUMMARY_FILENAME, "file_summary.ets").
 
--define(BINARY_MODE, [raw, binary]).
--define(READ_MODE, [read]).
--define(WRITE_MODE, [write]).
-
 -define(FILE_EXTENSION, ".rdq").
--define(FILE_EXTENSION_TMP, ".rdt").
 
 %% We keep track of flying messages for writes and removes. The idea is that
 %% when a remove comes in before we could process the write, we skip the
@@ -1575,24 +1569,22 @@ count_msg_refs(Gen, Seed, State) ->
     case Gen(Seed) of
         finished ->
             ok;
+        %% @todo This is currently required by tests but can't happen otherwise?
         {_MsgId, 0, Next} ->
             count_msg_refs(Gen, Next, State);
-        {MsgId, Delta, Next} ->
-            ok = case index_lookup(MsgId, State) of
-                     not_found ->
-                         index_insert(#msg_location { msg_id = MsgId,
-                                                      file = undefined,
-                                                      ref_count = Delta },
-                                      State);
-                     #msg_location { ref_count = RefCount } = StoreEntry ->
-                         NewRefCount = RefCount + Delta,
-                         case NewRefCount of
-                             0 -> index_delete(MsgId, State);
-                             _ -> index_update(StoreEntry #msg_location {
-                                                 ref_count = NewRefCount },
-                                               State)
-                         end
-                 end,
+        %% This clause is kept for v1 compatibility purposes.
+        %% It can be removed once we no longer support converting from v1 data.
+        {MsgId, 1, Next} ->
+            %% @todo Remove index_module and avoid this element/2.
+            _ = ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
+                #msg_location{msg_id=MsgId, file=undefined, ref_count=1}),
+            count_msg_refs(Gen, Next, State);
+        {MsgIds, Next} ->
+            lists:foreach(fun(MsgId) ->
+                %% @todo Remove index_module and avoid this element/2.
+                ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
+                    #msg_location{msg_id=MsgId, file=undefined, ref_count=1})
+            end, MsgIds),
             count_msg_refs(Gen, Next, State)
     end.
 
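An aside before the remaining hunks of this file: the rewritten count_msg_refs/3 above relies on ets:update_counter/4, where a bare +1 targets the counter field directly after the key (keypos + 1, here the reference count) and the fourth argument supplies a default object when the message ID is not yet in the table, so the old index_lookup/index_insert/index_update branching collapses into one call per reference. A small standalone sketch of that pattern follows; the module, record and table names are placeholders rather than RabbitMQ's #msg_location and index state, and the default's counter starts at 0 here so each reference adds exactly one.

-module(ref_count_sketch).
-export([count_refs/1]).

%% The counter field sits directly after the key field, so a bare +1
%% in ets:update_counter targets it (position = keypos + 1).
-record(loc, {msg_id, ref_count = 0}).

count_refs(MsgIds) ->
    Tab = ets:new(loc_index, [set, {keypos, #loc.msg_id}]),
    lists:foreach(
      fun(MsgId) ->
              %% Missing ids are inserted from the default object and then
              %% incremented; ids already present just get their counter bumped.
              ets:update_counter(Tab, MsgId, +1,
                                 #loc{msg_id = MsgId, ref_count = 0})
      end, MsgIds),
    Refs = [{Id, N} || #loc{msg_id = Id, ref_count = N} <- ets:tab2list(Tab)],
    ets:delete(Tab),
    lists:sort(Refs).

For example, ref_count_sketch:count_refs([<<"m1">>, <<"m2">>, <<"m1">>]) returns [{<<"m1">>,2},{<<"m2">>,1}].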

@@ -1621,15 +1613,17 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
     FileName = filenum_to_name(File),
     rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
                      [form_filename(Dir, FileName), length(Files)]),
-    {ok, Messages0, FileSize} =
+    %% The scan function already dealt with duplicate messages
+    %% within the file. We then get messages in reverse order.
+    {ok, Messages, FileSize} =
         scan_file_for_valid_messages(Dir, FileName),
-    %% The scan gives us messages end-of-file first so we reverse the list
-    %% in case a compaction had occurred before shutdown to not have to repeat it.
-    Messages = lists:reverse(Messages0),
+    %% Valid messages are in file order so the last message is
+    %% the last message from the list.
     {ValidMessages, ValidTotalSize} =
         lists:foldl(
             fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
-                %% We only keep the first message in the file. Duplicates (due to compaction) get ignored.
+                %% Fan-out may result in the same message data in multiple
+                %% files so we have to guard against it.
                 case index_lookup(MsgId, State) of
                     #msg_location { file = undefined } = StoreEntry ->
                         ok = index_update(StoreEntry #msg_location {
@@ -1649,7 +1643,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
               [] -> case ValidMessages of
                         [] -> 0;
                         _  -> {_MsgId, TotalSize, Offset} =
-                                  hd(ValidMessages),
+                                  lists:last(ValidMessages),
                               Offset + TotalSize
                     end;
               [_|_] -> FileSize