Skip to content

Commit 5c69639

Browse files
author
Daniil Fedotov
committed
Change directory name generation function for queue indexes and vhosts.
It was a mistake to relate on md5(term_to_binary(..)) to generate vhost and queue directory name, since term_to_binary format can change. Migration functions take care of renaming directories.
1 parent 35f4308 commit 5c69639

File tree

3 files changed

+70
-47
lines changed

3 files changed

+70
-47
lines changed

src/rabbit_queue_index.erl

Lines changed: 65 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
%% ---- Journal details ----
117117

118118
-define(JOURNAL_FILENAME, "journal.jif").
119+
-define(QUEUE_NAME_STUB_FILE, ".queue_name").
119120

120121
-define(PUB_PERSIST_JPREFIX, 2#00).
121122
-define(PUB_TRANS_JPREFIX, 2#01).
@@ -204,7 +205,9 @@
204205
%% optimisation
205206
pre_publish_cache,
206207
%% optimisation
207-
delivered_cache}).
208+
delivered_cache,
209+
%% queue name resource record
210+
queue_name}).
208211

209212
-record(segment, {
210213
%% segment ID (an integer)
@@ -295,7 +298,8 @@ erase(Name) ->
295298
erase_index_dir(Dir).
296299

297300
%% used during variable queue purge when there are no pending acks
298-
reset_state(#qistate{ dir = Dir,
301+
reset_state(#qistate{ queue_name = Name,
302+
dir = Dir,
299303
on_sync = OnSyncFun,
300304
on_sync_msg = OnSyncMsgFun,
301305
journal_handle = JournalHdl }) ->
@@ -304,7 +308,7 @@ reset_state(#qistate{ dir = Dir,
304308
_ -> file_handle_cache:close(JournalHdl)
305309
end,
306310
ok = erase_index_dir(Dir),
307-
blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun).
311+
blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun).
308312

309313
init(Name, OnSyncFun, OnSyncMsgFun) ->
310314
State = #qistate { dir = Dir } = blank_state(Name),
@@ -520,32 +524,6 @@ start(VHost, DurableQueueNames) ->
520524
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
521525

522526

523-
read_global_recovery_terms(DurableQueueNames) ->
524-
ok = rabbit_recovery_terms:open_global_table(),
525-
526-
DurableTerms =
527-
lists:foldl(
528-
fun(QName, RecoveryTerms) ->
529-
DirName = queue_name_to_dir_name(QName),
530-
RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
531-
{error, _} -> non_clean_shutdown;
532-
{ok, Terms} -> Terms
533-
end,
534-
[RecoveryInfo | RecoveryTerms]
535-
end, [], DurableQueueNames),
536-
537-
ok = rabbit_recovery_terms:close_global_table(),
538-
%% The backing queue interface requires that the queue recovery terms
539-
%% which come back from start/1 are in the same order as DurableQueueNames
540-
OrderedTerms = lists:reverse(DurableTerms),
541-
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
542-
543-
cleanup_global_recovery_terms() ->
544-
rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
545-
rabbit_recovery_terms:delete_global_table(),
546-
ok.
547-
548-
549527
stop(VHost) -> rabbit_recovery_terms:stop(VHost).
550528

551529
all_queue_directory_names(VHost) ->
@@ -567,10 +545,9 @@ erase_index_dir(Dir) ->
567545
end.
568546

569547
blank_state(QueueName) ->
570-
blank_state_dir(queue_dir(QueueName)).
571-
572-
blank_state_dir(Dir) ->
573-
blank_state_dir_funs(Dir,
548+
Dir = queue_dir(QueueName),
549+
blank_state_name_dir_funs(QueueName,
550+
Dir,
574551
fun (_) -> ok end,
575552
fun (_) -> ok end).
576553

@@ -581,7 +558,20 @@ queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
581558
QueueDir = queue_name_to_dir_name(QueueName),
582559
filename:join([VHostDir, "queues", QueueDir]).
583560

584-
blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
561+
queue_name_to_dir_name(#resource { kind = queue,
562+
virtual_host = VHost,
563+
name = QName }) ->
564+
<<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>),
565+
rabbit_misc:format("~.36B", [Num]).
566+
567+
queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) ->
568+
<<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)),
569+
rabbit_misc:format("~.36B", [Num]).
570+
571+
queues_base_dir() ->
572+
rabbit_mnesia:dir().
573+
574+
blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
585575
{ok, MaxJournal} =
586576
application:get_env(rabbit, queue_index_max_journal_entries),
587577
#qistate { dir = Dir,
@@ -594,7 +584,8 @@ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
594584
unconfirmed = gb_sets:new(),
595585
unconfirmed_msg = gb_sets:new(),
596586
pre_publish_cache = [],
597-
delivered_cache = [] }.
587+
delivered_cache = [],
588+
queue_name = Name }.
598589

599590
init_clean(RecoveredCounts, State) ->
600591
%% Load the journal. Since this is a clean recovery this (almost)
@@ -690,13 +681,6 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
690681
add_to_journal(RelSeq, del, Segment)),
691682
DirtyCount + 2}.
692683

693-
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
694-
<<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)),
695-
rabbit_misc:format("~.36B", [Num]).
696-
697-
queues_base_dir() ->
698-
rabbit_mnesia:dir().
699-
700684
%%----------------------------------------------------------------------------
701685
%% msg store startup delta function
702686
%%----------------------------------------------------------------------------
@@ -890,9 +874,11 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
890874
end.
891875

892876
get_journal_handle(State = #qistate { journal_handle = undefined,
893-
dir = Dir }) ->
877+
dir = Dir,
878+
queue_name = Name }) ->
894879
Path = filename:join(Dir, ?JOURNAL_FILENAME),
895880
ok = rabbit_file:ensure_dir(Path),
881+
ok = ensure_queue_name_stub_file(Dir, Name),
896882
{ok, Hdl} = file_handle_cache:open_with_absolute_path(
897883
Path, ?WRITE_MODE, [{write_buffer, infinity}]),
898884
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -1413,7 +1399,8 @@ store_msg_segment(_) ->
14131399

14141400

14151401

1416-
1402+
%%----------------------------------------------------------------------------
1403+
%% Migration functions
14171404
%%----------------------------------------------------------------------------
14181405

14191406
foreach_queue_index(Funs) ->
@@ -1467,18 +1454,50 @@ drive_transform_fun(Fun, Hdl, Contents) ->
14671454

14681455
move_to_per_vhost_stores(#resource{} = QueueName) ->
14691456
OldQueueDir = filename:join([queues_base_dir(), "queues",
1470-
queue_name_to_dir_name(QueueName)]),
1457+
queue_name_to_dir_name_legacy(QueueName)]),
14711458
NewQueueDir = queue_dir(QueueName),
14721459
case rabbit_file:is_dir(OldQueueDir) of
14731460
true ->
14741461
ok = rabbit_file:ensure_dir(NewQueueDir),
1475-
ok = rabbit_file:rename(OldQueueDir, NewQueueDir);
1462+
ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
1463+
ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
14761464
false ->
14771465
rabbit_log:info("Queue index directory not found for queue ~p~n",
14781466
[QueueName])
14791467
end,
14801468
ok.
14811469

1470+
ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) ->
1471+
QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
1472+
file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
1473+
"QUEUE: ", QName/binary, "\n">>).
1474+
1475+
read_global_recovery_terms(DurableQueueNames) ->
1476+
ok = rabbit_recovery_terms:open_global_table(),
1477+
1478+
DurableTerms =
1479+
lists:foldl(
1480+
fun(QName, RecoveryTerms) ->
1481+
DirName = queue_name_to_dir_name_legacy(QName),
1482+
RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
1483+
{error, _} -> non_clean_shutdown;
1484+
{ok, Terms} -> Terms
1485+
end,
1486+
[RecoveryInfo | RecoveryTerms]
1487+
end, [], DurableQueueNames),
1488+
1489+
ok = rabbit_recovery_terms:close_global_table(),
1490+
%% The backing queue interface requires that the queue recovery terms
1491+
%% which come back from start/1 are in the same order as DurableQueueNames
1492+
OrderedTerms = lists:reverse(DurableTerms),
1493+
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
1494+
1495+
cleanup_global_recovery_terms() ->
1496+
rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
1497+
rabbit_recovery_terms:delete_global_table(),
1498+
ok.
1499+
1500+
14821501
update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
14831502
Key = queue_name_to_dir_name(QueueName),
14841503
rabbit_recovery_terms:store(VHost, Key, Term).

src/rabbit_vhost.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ set_limits(VHost = #vhost{}, Limits) ->
213213

214214

215215
dir(Vhost) ->
216-
<<Num:128>> = erlang:md5(term_to_binary(Vhost)),
216+
<<Num:128>> = erlang:md5(Vhost),
217217
rabbit_misc:format("~.36B", [Num]).
218218

219219
msg_store_dir_path(VHost) ->

test/clustering_management_SUITE.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,10 @@ change_cluster_node_type(Config) ->
402402
assert_cluster_status({[Rabbit, Hare], [Hare], [Hare]},
403403
[Rabbit, Hare]),
404404
change_cluster_node_type(Rabbit, disc),
405+
406+
rabbit_control_helper:command(cluster_status, Rabbit, []),
407+
rabbit_control_helper:command(cluster_status, Hare, []),
408+
405409
assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
406410
[Rabbit, Hare]),
407411
change_cluster_node_type(Rabbit, ram),

0 commit comments

Comments
 (0)