Skip to content

Commit 924df7f

Browse files
committed
Expose some message index related functions for salvage tool
`rabbit_msg_store:scan_file_for_valid_messages/1` `rabbit_queue_index:scan_queue_segments/4`
1 parent 7d8c3a2 commit 924df7f

File tree

2 files changed

+39
-23
lines changed

2 files changed

+39
-23
lines changed

src/rabbit_msg_store.erl

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
-export([set_maximum_since_use/2, combine_files/3,
2727
delete_file/2]). %% internal
2828

29+
-export([scan_file_for_valid_messages/1]). %% salvage tool
30+
2931
-export([transform_dir/3, force_recovery/2]). %% upgrade
3032

3133
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -1401,12 +1403,15 @@ should_mask_action(CRef, MsgId,
14011403
%% file helper functions
14021404
%%----------------------------------------------------------------------------
14031405

1404-
open_file(Dir, FileName, Mode) ->
1406+
open_file(File, Mode) ->
14051407
file_handle_cache:open_with_absolute_path(
1406-
form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
1408+
File, ?BINARY_MODE ++ Mode,
14071409
[{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
14081410
{read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
14091411

1412+
open_file(Dir, FileName, Mode) ->
1413+
open_file(form_filename(Dir, FileName), Mode).
1414+
14101415
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
14111416
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
14121417

@@ -1696,18 +1701,22 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
16961701
ok = file_handle_cache:delete(TmpHdl),
16971702
ok.
16981703

1699-
scan_file_for_valid_messages(Dir, FileName) ->
1700-
case open_file(Dir, FileName, ?READ_MODE) of
1704+
scan_file_for_valid_messages(File) ->
1705+
case open_file(File, ?READ_MODE) of
17011706
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
1702-
Hdl, filelib:file_size(
1703-
form_filename(Dir, FileName)),
1707+
Hdl, filelib:file_size(File),
17041708
fun scan_fun/2, []),
17051709
ok = file_handle_cache:close(Hdl),
17061710
Valid;
17071711
{error, enoent} -> {ok, [], 0};
1708-
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
1712+
{error, Reason} -> {error, {unable_to_scan_file,
1713+
filename:basename(File),
1714+
Reason}}
17091715
end.
17101716

1717+
scan_file_for_valid_messages(Dir, FileName) ->
1718+
scan_file_for_valid_messages(form_filename(Dir, FileName)).
1719+
17111720
scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
17121721
[{MsgId, TotalSize, Offset} | Acc].
17131722

src/rabbit_queue_index.erl

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
2424

2525
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
26-
-export([scan_queue_segments/3]).
26+
-export([scan_queue_segments/3, scan_queue_segments/4]).
2727

2828
%% Migrates from global to per-vhost message stores
2929
-export([move_to_per_vhost_stores/1,
@@ -264,8 +264,9 @@
264264

265265
-spec erase(rabbit_amqqueue:name()) -> 'ok'.
266266

267-
erase(Name) ->
268-
#qistate { dir = Dir } = blank_state(Name),
267+
erase(#resource{ virtual_host = VHost } = Name) ->
268+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
269+
#qistate { dir = Dir } = blank_state(VHostDir, Name),
269270
erase_index_dir(Dir).
270271

271272
%% used during variable queue purge when there are no pending acks
@@ -287,8 +288,9 @@ reset_state(#qistate{ queue_name = Name,
287288
-spec init(rabbit_amqqueue:name(),
288289
on_sync_fun(), on_sync_fun()) -> qistate().
289290

290-
init(Name, OnSyncFun, OnSyncMsgFun) ->
291-
State = #qistate { dir = Dir } = blank_state(Name),
291+
init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
292+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
293+
State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
292294
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
293295
State#qistate{on_sync = OnSyncFun,
294296
on_sync_msg = OnSyncMsgFun}.
@@ -299,9 +301,10 @@ init(Name, OnSyncFun, OnSyncMsgFun) ->
299301
{'undefined' | non_neg_integer(),
300302
'undefined' | non_neg_integer(), qistate()}.
301303

302-
recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
303-
OnSyncFun, OnSyncMsgFun) ->
304-
State = blank_state(Name),
304+
recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
305+
ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
306+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
307+
State = blank_state(VHostDir, Name),
305308
State1 = State #qistate{on_sync = OnSyncFun,
306309
on_sync_msg = OnSyncMsgFun},
307310
CleanShutdown = Terms /= non_clean_shutdown,
@@ -558,17 +561,16 @@ erase_index_dir(Dir) ->
558561
false -> ok
559562
end.
560563

561-
blank_state(QueueName) ->
562-
Dir = queue_dir(QueueName),
564+
blank_state(VHostDir, QueueName) ->
565+
Dir = queue_dir(VHostDir, QueueName),
563566
blank_state_name_dir_funs(QueueName,
564567
Dir,
565568
fun (_) -> ok end,
566569
fun (_) -> ok end).
567570

568-
queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
571+
queue_dir(VHostDir, QueueName) ->
569572
%% Queue directory is
570573
%% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue}
571-
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
572574
QueueDir = queue_name_to_dir_name(QueueName),
573575
filename:join([VHostDir, "queues", QueueDir]).
574576

@@ -734,9 +736,13 @@ queue_index_walker_reader(QueueName, Gatherer) ->
734736
end, ok, QueueName),
735737
ok = gatherer:finish(Gatherer).
736738

737-
scan_queue_segments(Fun, Acc, QueueName) ->
739+
scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
740+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
741+
scan_queue_segments(Fun, Acc, VHostDir, QueueName).
742+
743+
scan_queue_segments(Fun, Acc, VHostDir, QueueName) ->
738744
State = #qistate { segments = Segments, dir = Dir } =
739-
recover_journal(blank_state(QueueName)),
745+
recover_journal(blank_state(VHostDir, QueueName)),
740746
Result = lists:foldr(
741747
fun (Seg, AccN) ->
742748
segment_entries_foldr(
@@ -1468,10 +1474,11 @@ drive_transform_fun(Fun, Hdl, Contents) ->
14681474
drive_transform_fun(Fun, Hdl, Contents1)
14691475
end.
14701476

1471-
move_to_per_vhost_stores(#resource{} = QueueName) ->
1477+
move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) ->
14721478
OldQueueDir = filename:join([queues_base_dir(), "queues",
14731479
queue_name_to_dir_name_legacy(QueueName)]),
1474-
NewQueueDir = queue_dir(QueueName),
1480+
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
1481+
NewQueueDir = queue_dir(VHostDir, QueueName),
14751482
rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
14761483
[OldQueueDir, NewQueueDir]),
14771484
case rabbit_file:is_dir(OldQueueDir) of

0 commit comments

Comments
 (0)