Skip to content

Commit 05a4ce9

Browse files
HoloRinmichaelklishin
authored andcommitted
Expose some message index related functions for salvage tool
`rabbit_msg_store:scan_file_for_valid_messages/1` `rabbit_queue_index:scan_queue_segments/4` (cherry picked from commit 924df7f) Conflicts: deps/rabbit/src/rabbit_queue_index.erl
1 parent 88f90a2 commit 05a4ce9

File tree

3 files changed

+42
-24
lines changed

3 files changed

+42
-24
lines changed

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
-export([set_maximum_since_use/2, combine_files/3,
1818
delete_file/2]). %% internal
1919

20+
-export([scan_file_for_valid_messages/1]). %% salvage tool
21+
2022
-export([transform_dir/3, force_recovery/2]). %% upgrade
2123

2224
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -1392,12 +1394,15 @@ should_mask_action(CRef, MsgId,
13921394
%% file helper functions
13931395
%%----------------------------------------------------------------------------
13941396

1395-
open_file(Dir, FileName, Mode) ->
1397+
open_file(File, Mode) ->
13961398
file_handle_cache:open_with_absolute_path(
1397-
form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
1399+
File, ?BINARY_MODE ++ Mode,
13981400
[{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
13991401
{read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
14001402

1403+
open_file(Dir, FileName, Mode) ->
1404+
open_file(form_filename(Dir, FileName), Mode).
1405+
14011406
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
14021407
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
14031408

@@ -1687,18 +1692,22 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
16871692
ok = file_handle_cache:delete(TmpHdl),
16881693
ok.
16891694

1690-
scan_file_for_valid_messages(Dir, FileName) ->
1691-
case open_file(Dir, FileName, ?READ_MODE) of
1695+
scan_file_for_valid_messages(File) ->
1696+
case open_file(File, ?READ_MODE) of
16921697
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
1693-
Hdl, filelib:file_size(
1694-
form_filename(Dir, FileName)),
1698+
Hdl, filelib:file_size(File),
16951699
fun scan_fun/2, []),
16961700
ok = file_handle_cache:close(Hdl),
16971701
Valid;
16981702
{error, enoent} -> {ok, [], 0};
1699-
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
1703+
{error, Reason} -> {error, {unable_to_scan_file,
1704+
filename:basename(File),
1705+
Reason}}
17001706
end.
17011707

1708+
scan_file_for_valid_messages(Dir, FileName) ->
1709+
scan_file_for_valid_messages(form_filename(Dir, FileName)).
1710+
17021711
scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
17031712
[{MsgId, TotalSize, Offset} | Acc].
17041713

deps/rabbit/src/rabbit_queue_index.erl

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
1717

1818
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
19-
-export([scan_queue_segments/3]).
19+
-export([scan_queue_segments/3, scan_queue_segments/4]).
2020

2121
%% Migrates from global to per-vhost message stores
2222
-export([move_to_per_vhost_stores/1,
@@ -255,8 +255,9 @@
255255

256256
-spec erase(rabbit_amqqueue:name()) -> 'ok'.
257257

258-
erase(Name) ->
259-
#qistate { dir = Dir } = blank_state(Name),
258+
erase(#resource{ virtual_host = VHost } = Name) ->
259+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
260+
#qistate { dir = Dir } = blank_state(VHostDir, Name),
260261
erase_index_dir(Dir).
261262

262263
%% used during variable queue purge when there are no pending acks
@@ -278,8 +279,9 @@ reset_state(#qistate{ queue_name = Name,
278279
-spec init(rabbit_amqqueue:name(),
279280
on_sync_fun(), on_sync_fun()) -> qistate().
280281

281-
init(Name, OnSyncFun, OnSyncMsgFun) ->
282-
State = #qistate { dir = Dir } = blank_state(Name),
282+
init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
283+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
284+
State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
283285
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
284286
State#qistate{on_sync = OnSyncFun,
285287
on_sync_msg = OnSyncMsgFun}.
@@ -290,9 +292,10 @@ init(Name, OnSyncFun, OnSyncMsgFun) ->
290292
{'undefined' | non_neg_integer(),
291293
'undefined' | non_neg_integer(), qistate()}.
292294

293-
recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
294-
OnSyncFun, OnSyncMsgFun) ->
295-
State = blank_state(Name),
295+
recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
296+
ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
297+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
298+
State = blank_state(VHostDir, Name),
296299
State1 = State #qistate{on_sync = OnSyncFun,
297300
on_sync_msg = OnSyncMsgFun},
298301
CleanShutdown = Terms /= non_clean_shutdown,
@@ -549,17 +552,16 @@ erase_index_dir(Dir) ->
549552
false -> ok
550553
end.
551554

552-
blank_state(QueueName) ->
553-
Dir = queue_dir(QueueName),
555+
blank_state(VHostDir, QueueName) ->
556+
Dir = queue_dir(VHostDir, QueueName),
554557
blank_state_name_dir_funs(QueueName,
555558
Dir,
556559
fun (_) -> ok end,
557560
fun (_) -> ok end).
558561

559-
queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
562+
queue_dir(VHostDir, QueueName) ->
560563
%% Queue directory is
561564
%% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue}
562-
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
563565
QueueDir = queue_name_to_dir_name(QueueName),
564566
filename:join([VHostDir, "queues", QueueDir]).
565567

@@ -725,9 +727,13 @@ queue_index_walker_reader(QueueName, Gatherer) ->
725727
end, ok, QueueName),
726728
ok = gatherer:finish(Gatherer).
727729

728-
scan_queue_segments(Fun, Acc, QueueName) ->
730+
scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
731+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
732+
scan_queue_segments(Fun, Acc, VHostDir, QueueName).
733+
734+
scan_queue_segments(Fun, Acc, VHostDir, QueueName) ->
729735
State = #qistate { segments = Segments, dir = Dir } =
730-
recover_journal(blank_state(QueueName)),
736+
recover_journal(blank_state(VHostDir, QueueName)),
731737
Result = lists:foldr(
732738
fun (Seg, AccN) ->
733739
segment_entries_foldr(
@@ -1465,12 +1471,13 @@ drive_transform_fun(Fun, Hdl, Contents) ->
14651471
drive_transform_fun(Fun, Hdl, Contents1)
14661472
end.
14671473

1468-
move_to_per_vhost_stores(#resource{} = QueueName) ->
1474+
move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) ->
14691475
OldQueueDir = filename:join([queues_base_dir(), "queues",
14701476
queue_name_to_dir_name_legacy(QueueName)]),
1471-
NewQueueDir = queue_dir(QueueName),
1477+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
1478+
NewQueueDir = queue_dir(VHostDir, QueueName),
14721479
_ = rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
1473-
[OldQueueDir, NewQueueDir]),
1480+
[OldQueueDir, NewQueueDir]),
14741481
case rabbit_file:is_dir(OldQueueDir) of
14751482
true ->
14761483
ok = rabbit_file:ensure_dir(NewQueueDir),

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ start_queue(Config) ->
382382

383383
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
384384
LQ = ?config(queue_name, Config),
385+
Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
385386
?assertEqual({'queue.declare_ok', LQ, 0, 0},
386387
declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
387388

@@ -404,6 +405,7 @@ start_queue(Config) ->
404405
%% Check that the application and process are still up
405406
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
406407
rpc:call(Server, application, which_applications, []))),
408+
Expected = Children + 1,
407409
?assertMatch(Expected,
408410
length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))),
409411

0 commit comments

Comments
 (0)