Skip to content

Commit 3ae2abe

Browse files
Merge pull request #12491 from rabbitmq/mergify/bp/v4.0.x/pr-12392
CQ: Fix shared store scanner missing messages (backport #12392)
2 parents 46bfbe6 + 237ec93 commit 3ae2abe

File tree

2 files changed

+108
-94
lines changed

2 files changed

+108
-94
lines changed

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 91 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
-export([compact_file/2, truncate_file/4, delete_file/2]). %% internal
1818

19-
-export([scan_file_for_valid_messages/1]). %% salvage tool
19+
-export([scan_file_for_valid_messages/1, scan_file_for_valid_messages/2]). %% salvage tool
2020

2121
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
2222
code_change/3, prioritise_call/4, prioritise_cast/3,
@@ -1472,31 +1472,28 @@ list_sorted_filenames(Dir, Ext) ->
14721472

14731473
-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB
14741474

1475-
scan_file_for_valid_messages(Dir, FileName) ->
1476-
scan_file_for_valid_messages(form_filename(Dir, FileName)).
1477-
1475+
%% Exported as a salvage tool. Not as accurate as node recovery
1476+
%% because it doesn't have the queue index.
14781477
scan_file_for_valid_messages(Path) ->
1478+
scan_file_for_valid_messages(Path, fun(Obj) -> {valid, Obj} end).
1479+
1480+
scan_file_for_valid_messages(Path, Fun) ->
14791481
case file:open(Path, [read, binary, raw]) of
14801482
{ok, Fd} ->
14811483
{ok, FileSize} = file:position(Fd, eof),
14821484
{ok, _} = file:position(Fd, bof),
1483-
Messages = scan(<<>>, Fd, 0, FileSize, #{}, []),
1485+
Messages = scan(<<>>, Fd, Fun, 0, FileSize, #{}, []),
14841486
ok = file:close(Fd),
1485-
case Messages of
1486-
[] ->
1487-
{ok, [], 0};
1488-
[{_, TotalSize, Offset}|_] ->
1489-
{ok, Messages, Offset + TotalSize}
1490-
end;
1487+
{ok, Messages};
14911488
{error, enoent} ->
1492-
{ok, [], 0};
1489+
{ok, []};
14931490
{error, Reason} ->
14941491
{error, {unable_to_scan_file,
14951492
filename:basename(Path),
14961493
Reason}}
14971494
end.
14981495

1499-
scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
1496+
scan(Buffer, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
15001497
case file:read(Fd, ?SCAN_BLOCK_SIZE) of
15011498
eof ->
15021499
Acc;
@@ -1505,12 +1502,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
15051502
<<>> -> Data0;
15061503
_ -> <<Buffer/binary, Data0/binary>>
15071504
end,
1508-
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
1505+
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
15091506
end.
15101507

15111508
%% Message might have been found.
15121509
scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
1513-
Fd, Offset, FileSize, MsgIdsFound, Acc)
1510+
Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
15141511
when Size >= 16 ->
15151512
<<MsgIdInt:128, _/bits>> = MsgIdAndMsg,
15161513
case MsgIdsFound of
@@ -1519,26 +1516,37 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
15191516
%% simply be a coincidence. Try the next byte.
15201517
#{MsgIdInt := true} ->
15211518
<<_, Rest2/bits>> = Data,
1522-
scan_data(Rest2, Fd, Offset + 1, FileSize, MsgIdsFound, Acc);
1519+
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc);
15231520
%% Data looks to be a message.
15241521
_ ->
15251522
%% Avoid sub-binary construction.
15261523
MsgId = <<MsgIdInt:128>>,
15271524
TotalSize = Size + 9,
1528-
scan_data(Rest, Fd, Offset + TotalSize, FileSize,
1529-
MsgIdsFound#{MsgIdInt => true},
1530-
[{MsgId, TotalSize, Offset}|Acc])
1525+
case Fun({MsgId, TotalSize, Offset}) of
1526+
%% Confirmed to be a message by the provided fun.
1527+
{valid, Entry} ->
1528+
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
1529+
MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]);
1530+
%% Confirmed to be a message but we don't need it anymore.
1531+
previously_valid ->
1532+
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
1533+
MsgIdsFound#{MsgIdInt => true}, Acc);
1534+
%% Not a message, try the next byte.
1535+
invalid ->
1536+
<<_, Rest2/bits>> = Data,
1537+
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc)
1538+
end
15311539
end;
15321540
%% This might be the start of a message.
1533-
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
1541+
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
15341542
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
1535-
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
1536-
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
1543+
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
1544+
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
15371545
when byte_size(Data) < 8 ->
1538-
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
1546+
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
15391547
%% This is definitely not a message. Try the next byte.
1540-
scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
1541-
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).
1548+
scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
1549+
scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc).
15421550

15431551
%%----------------------------------------------------------------------------
15441552
%% Ets index
@@ -1742,47 +1750,39 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
17421750

17431751
build_index_worker(Gatherer, #msstate { index_ets = IndexEts, dir = Dir },
17441752
File, Files) ->
1745-
FileName = filenum_to_name(File),
1753+
Path = form_filename(Dir, filenum_to_name(File)),
17461754
rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
1747-
[form_filename(Dir, FileName), length(Files)]),
1755+
[Path, length(Files)]),
17481756
%% The scan function already dealt with duplicate messages
1749-
%% within the file. We then get messages in reverse order.
1750-
{ok, Messages, FileSize} =
1751-
scan_file_for_valid_messages(Dir, FileName),
1752-
%% Valid messages are in file order so the last message is
1753-
%% the last message from the list.
1754-
{ValidMessages, ValidTotalSize} =
1755-
lists:foldl(
1756-
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
1757-
%% Fan-out may result in the same message data in multiple
1758-
%% files so we have to guard against it.
1759-
case index_lookup(IndexEts, MsgId) of
1760-
#msg_location { file = undefined } = StoreEntry ->
1761-
ok = index_update(IndexEts, StoreEntry #msg_location {
1762-
file = File, offset = Offset,
1763-
total_size = TotalSize }),
1764-
{[Obj | VMAcc], VTSAcc + TotalSize};
1765-
_ ->
1766-
{VMAcc, VTSAcc}
1767-
end
1768-
end, {[], 0}, Messages),
1769-
FileSize1 =
1770-
case Files of
1771-
%% if it's the last file, we'll truncate to remove any
1772-
%% rubbish above the last valid message. This affects the
1773-
%% file size.
1774-
[] -> case ValidMessages of
1775-
[] -> 0;
1776-
_ -> {_MsgId, TotalSize, Offset} =
1777-
lists:last(ValidMessages),
1778-
Offset + TotalSize
1779-
end;
1780-
[_|_] -> FileSize
1781-
end,
1757+
%% within the file, and only returns valid messages (we do
1758+
%% the index lookup in the fun). But we get messages in reverse order.
1759+
{ok, Messages} = scan_file_for_valid_messages(Path,
1760+
fun (Obj = {MsgId, TotalSize, Offset}) ->
1761+
%% Fan-out may result in the same message data in multiple
1762+
%% files so we have to guard against it.
1763+
case index_lookup(IndexEts, MsgId) of
1764+
#msg_location { file = undefined } = StoreEntry ->
1765+
ok = index_update(IndexEts, StoreEntry #msg_location {
1766+
file = File, offset = Offset,
1767+
total_size = TotalSize }),
1768+
{valid, Obj};
1769+
_ ->
1770+
invalid
1771+
end
1772+
end),
1773+
ValidTotalSize = lists:foldl(fun({_, TotalSize, _}, Acc) -> Acc + TotalSize end, 0, Messages),
1774+
%% Any file may have rubbish at the end of it that we will want truncated.
1775+
%% Note that the last message in the file is the first in the list.
1776+
FileSize = case Messages of
1777+
[] ->
1778+
0;
1779+
[{_, TotalSize, Offset}|_] ->
1780+
Offset + TotalSize
1781+
end,
17821782
ok = gatherer:in(Gatherer, #file_summary {
17831783
file = File,
17841784
valid_total_size = ValidTotalSize,
1785-
file_size = FileSize1,
1785+
file_size = FileSize,
17861786
locked = false }),
17871787
ok = gatherer:finish(Gatherer).
17881788

@@ -1933,7 +1933,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts,
19331933
%% Load the messages. It's possible to get 0 messages here;
19341934
%% that's OK. That means we have little to do as the file is
19351935
%% about to be deleted.
1936-
{Messages, _} = scan_and_vacuum_message_file(File, State),
1936+
Messages = scan_and_vacuum_message_file(File, State),
19371937
%% Blank holes. We must do this first otherwise the file is left
19381938
%% with data that may confuse the code (for example data that looks
19391939
%% like a message, isn't a message, but spans over a real message).
@@ -2087,7 +2087,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20872087
_ ->
20882088
[#file_summary{ valid_total_size = 0,
20892089
file_size = FileSize }] = ets:lookup(FileSummaryEts, File),
2090-
{[], 0} = scan_and_vacuum_message_file(File, State),
2090+
[] = scan_and_vacuum_message_file(File, State),
20912091
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
20922092
true = ets:delete(FileSummaryEts, File),
20932093
rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),
@@ -2096,28 +2096,31 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
20962096

20972097
scan_and_vacuum_message_file(File, #gc_state{ index_ets = IndexEts, dir = Dir }) ->
20982098
%% Messages here will be end-of-file at start-of-list
2099-
{ok, Messages, _FileSize} =
2100-
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
2101-
%% foldl will reverse so will end up with msgs in ascending offset order
2102-
lists:foldl(
2103-
fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
2104-
case index_lookup(IndexEts, MsgId) of
2105-
#msg_location { file = File, total_size = TotalSize,
2106-
offset = Offset, ref_count = 0 } = Entry ->
2107-
index_delete_object(IndexEts, Entry),
2108-
Acc;
2109-
#msg_location { file = File, total_size = TotalSize,
2110-
offset = Offset } = Entry ->
2111-
{[ Entry | List ], TotalSize + Size};
2112-
%% Fan-out may remove the entry but also write a new
2113-
%% entry in a different file when it needs to write
2114-
%% a message and the existing reference is in a file
2115-
%% that's about to be deleted. So we explicitly accept
2116-
%% these cases and ignore this message.
2117-
#msg_location { file = OtherFile, total_size = TotalSize }
2118-
when File =/= OtherFile ->
2119-
Acc;
2120-
not_found ->
2121-
Acc
2122-
end
2123-
end, {[], 0}, Messages).
2099+
Path = form_filename(Dir, filenum_to_name(File)),
2100+
{ok, Messages} = scan_file_for_valid_messages(Path,
2101+
fun ({MsgId, TotalSize, Offset}) ->
2102+
case index_lookup(IndexEts, MsgId) of
2103+
#msg_location { file = File, total_size = TotalSize,
2104+
offset = Offset, ref_count = 0 } = Entry ->
2105+
index_delete_object(IndexEts, Entry),
2106+
%% The message was valid, but since we have now deleted
2107+
%% it due to having no ref_count, it becomes invalid.
2108+
%% We still want to let the scan function skip though.
2109+
previously_valid;
2110+
#msg_location { file = File, total_size = TotalSize,
2111+
offset = Offset } = Entry ->
2112+
{valid, Entry};
2113+
%% Fan-out may remove the entry but also write a new
2114+
%% entry in a different file when it needs to write
2115+
%% a message and the existing reference is in a file
2116+
%% that's about to be deleted. So we explicitly accept
2117+
%% these cases and ignore this message.
2118+
#msg_location { file = OtherFile, total_size = TotalSize }
2119+
when File =/= OtherFile ->
2120+
invalid;
2121+
not_found ->
2122+
invalid
2123+
end
2124+
end),
2125+
%% @todo Do we really need to reverse messages?
2126+
lists:reverse(Messages).

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,22 @@ msg_store_file_scan1(Config) ->
629629
%% Messages with no content.
630630
ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]),
631631
ok = Scan([{msg, gen_id(), <<>>}]),
632+
%% Tricky messages.
633+
%%
634+
%% These only get properly detected when the index is populated.
635+
%% In this test case we simulate the index with a fun.
636+
TrickyScan = fun (Blocks, Expected, Fun) ->
637+
Path = gen_msg_file(Config, Blocks),
638+
Result = rabbit_msg_store:scan_file_for_valid_messages(Path, Fun),
639+
case Result of
640+
Expected -> ok;
641+
_ -> {expected, Expected, got, Result}
642+
end
643+
end,
644+
ok = TrickyScan(
645+
[{bin, <<0, 0:48, 17, 17, "idididididididid", 255, 0:4352/unit:8, 255>>}],
646+
{ok, [{<<"idididididididid">>, 4378, 1}]},
647+
fun(Obj = {<<"idididididididid">>, 4378, 1}) -> {valid, Obj}; (_) -> invalid end),
632648
%% All good!!
633649
passed.
634650

@@ -661,12 +677,7 @@ gen_msg_file(Config, Blocks) ->
661677

662678
gen_result(Blocks) ->
663679
Messages = gen_result(Blocks, 0, []),
664-
case Messages of
665-
[] ->
666-
{ok, [], 0};
667-
[{_, TotalSize, Offset}|_] ->
668-
{ok, Messages, Offset + TotalSize}
669-
end.
680+
{ok, Messages}.
670681

671682
gen_result([], _, Acc) ->
672683
Acc;

0 commit comments

Comments
 (0)