116
116
% % ---- Journal details ----
117
117
118
118
-define (JOURNAL_FILENAME , " journal.jif" ).
119
+ -define (QUEUE_NAME_STUB_FILE , " .queue_name" ).
119
120
120
121
-define (PUB_PERSIST_JPREFIX , 2#00 ).
121
122
-define (PUB_TRANS_JPREFIX , 2#01 ).
204
205
% % optimisation
205
206
pre_publish_cache ,
206
207
% % optimisation
207
- delivered_cache }).
208
+ delivered_cache ,
209
+ % % queue name resource record
210
+ queue_name }).
208
211
209
212
-record (segment , {
210
213
% % segment ID (an integer)
@@ -295,7 +298,8 @@ erase(Name) ->
295
298
erase_index_dir (Dir ).
296
299
297
300
% % used during variable queue purge when there are no pending acks
298
- reset_state (# qistate { dir = Dir ,
301
+ reset_state (# qistate { queue_name = Name ,
302
+ dir = Dir ,
299
303
on_sync = OnSyncFun ,
300
304
on_sync_msg = OnSyncMsgFun ,
301
305
journal_handle = JournalHdl }) ->
@@ -304,7 +308,7 @@ reset_state(#qistate{ dir = Dir,
304
308
_ -> file_handle_cache :close (JournalHdl )
305
309
end ,
306
310
ok = erase_index_dir (Dir ),
307
- blank_state_dir_funs ( Dir , OnSyncFun , OnSyncMsgFun ).
311
+ blank_state_name_dir_funs ( Name , Dir , OnSyncFun , OnSyncMsgFun ).
308
312
309
313
init (Name , OnSyncFun , OnSyncMsgFun ) ->
310
314
State = # qistate { dir = Dir } = blank_state (Name ),
@@ -520,32 +524,6 @@ start(VHost, DurableQueueNames) ->
520
524
{OrderedTerms , {fun queue_index_walker /1 , {start , DurableQueueNames }}}.
521
525
522
526
523
- read_global_recovery_terms (DurableQueueNames ) ->
524
- ok = rabbit_recovery_terms :open_global_table (),
525
-
526
- DurableTerms =
527
- lists :foldl (
528
- fun (QName , RecoveryTerms ) ->
529
- DirName = queue_name_to_dir_name (QName ),
530
- RecoveryInfo = case rabbit_recovery_terms :read_global (DirName ) of
531
- {error , _ } -> non_clean_shutdown ;
532
- {ok , Terms } -> Terms
533
- end ,
534
- [RecoveryInfo | RecoveryTerms ]
535
- end , [], DurableQueueNames ),
536
-
537
- ok = rabbit_recovery_terms :close_global_table (),
538
- % % The backing queue interface requires that the queue recovery terms
539
- % % which come back from start/1 are in the same order as DurableQueueNames
540
- OrderedTerms = lists :reverse (DurableTerms ),
541
- {OrderedTerms , {fun queue_index_walker /1 , {start , DurableQueueNames }}}.
542
-
543
- cleanup_global_recovery_terms () ->
544
- rabbit_file :recursive_delete ([filename :join ([queues_base_dir (), " queues" ])]),
545
- rabbit_recovery_terms :delete_global_table (),
546
- ok .
547
-
548
-
549
527
stop (VHost ) -> rabbit_recovery_terms :stop (VHost ).
550
528
551
529
all_queue_directory_names (VHost ) ->
@@ -567,10 +545,9 @@ erase_index_dir(Dir) ->
567
545
end .
568
546
569
547
blank_state (QueueName ) ->
570
- blank_state_dir (queue_dir (QueueName )).
571
-
572
- blank_state_dir (Dir ) ->
573
- blank_state_dir_funs (Dir ,
548
+ Dir = queue_dir (QueueName ),
549
+ blank_state_name_dir_funs (QueueName ,
550
+ Dir ,
574
551
fun (_ ) -> ok end ,
575
552
fun (_ ) -> ok end ).
576
553
@@ -581,7 +558,20 @@ queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
581
558
QueueDir = queue_name_to_dir_name (QueueName ),
582
559
filename :join ([VHostDir , " queues" , QueueDir ]).
583
560
584
- blank_state_dir_funs (Dir , OnSyncFun , OnSyncMsgFun ) ->
561
+ queue_name_to_dir_name (# resource { kind = queue ,
562
+ virtual_host = VHost ,
563
+ name = QName }) ->
564
+ <<Num :128 >> = erlang :md5 (<<" queue" , VHost /binary , QName /binary >>),
565
+ rabbit_misc :format (" ~.36B " , [Num ]).
566
+
567
+ queue_name_to_dir_name_legacy (Name = # resource { kind = queue }) ->
568
+ <<Num :128 >> = erlang :md5 (term_to_binary_compat :queue_name_to_binary (Name )),
569
+ rabbit_misc :format (" ~.36B " , [Num ]).
570
+
571
+ queues_base_dir () ->
572
+ rabbit_mnesia :dir ().
573
+
574
+ blank_state_name_dir_funs (Name , Dir , OnSyncFun , OnSyncMsgFun ) ->
585
575
{ok , MaxJournal } =
586
576
application :get_env (rabbit , queue_index_max_journal_entries ),
587
577
# qistate { dir = Dir ,
@@ -594,7 +584,8 @@ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
594
584
unconfirmed = gb_sets :new (),
595
585
unconfirmed_msg = gb_sets :new (),
596
586
pre_publish_cache = [],
597
- delivered_cache = [] }.
587
+ delivered_cache = [],
588
+ queue_name = Name }.
598
589
599
590
init_clean (RecoveredCounts , State ) ->
600
591
% % Load the journal. Since this is a clean recovery this (almost)
@@ -690,13 +681,6 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
690
681
add_to_journal (RelSeq , del , Segment )),
691
682
DirtyCount + 2 }.
692
683
693
- queue_name_to_dir_name (Name = # resource { kind = queue }) ->
694
- <<Num :128 >> = erlang :md5 (term_to_binary_compat :term_to_binary_1 (Name )),
695
- rabbit_misc :format (" ~.36B " , [Num ]).
696
-
697
- queues_base_dir () ->
698
- rabbit_mnesia :dir ().
699
-
700
684
% %----------------------------------------------------------------------------
701
685
% % msg store startup delta function
702
686
% %----------------------------------------------------------------------------
@@ -890,9 +874,11 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
890
874
end .
891
875
892
876
get_journal_handle (State = # qistate { journal_handle = undefined ,
893
- dir = Dir }) ->
877
+ dir = Dir ,
878
+ queue_name = Name }) ->
894
879
Path = filename :join (Dir , ? JOURNAL_FILENAME ),
895
880
ok = rabbit_file :ensure_dir (Path ),
881
+ ok = ensure_queue_name_stub_file (Dir , Name ),
896
882
{ok , Hdl } = file_handle_cache :open_with_absolute_path (
897
883
Path , ? WRITE_MODE , [{write_buffer , infinity }]),
898
884
{Hdl , State # qistate { journal_handle = Hdl }};
@@ -1413,7 +1399,8 @@ store_msg_segment(_) ->
1413
1399
1414
1400
1415
1401
1416
-
1402
+ % %----------------------------------------------------------------------------
1403
+ % % Migration functions
1417
1404
% %----------------------------------------------------------------------------
1418
1405
1419
1406
foreach_queue_index (Funs ) ->
@@ -1467,18 +1454,50 @@ drive_transform_fun(Fun, Hdl, Contents) ->
1467
1454
1468
1455
move_to_per_vhost_stores (# resource {} = QueueName ) ->
1469
1456
OldQueueDir = filename :join ([queues_base_dir (), " queues" ,
1470
- queue_name_to_dir_name (QueueName )]),
1457
+ queue_name_to_dir_name_legacy (QueueName )]),
1471
1458
NewQueueDir = queue_dir (QueueName ),
1472
1459
case rabbit_file :is_dir (OldQueueDir ) of
1473
1460
true ->
1474
1461
ok = rabbit_file :ensure_dir (NewQueueDir ),
1475
- ok = rabbit_file :rename (OldQueueDir , NewQueueDir );
1462
+ ok = rabbit_file :rename (OldQueueDir , NewQueueDir ),
1463
+ ok = ensure_queue_name_stub_file (NewQueueDir , QueueName );
1476
1464
false ->
1477
1465
rabbit_log :info (" Queue index directory not found for queue ~p~n " ,
1478
1466
[QueueName ])
1479
1467
end ,
1480
1468
ok .
1481
1469
1470
+ ensure_queue_name_stub_file (Dir , # resource {virtual_host = VHost , name = QName }) ->
1471
+ QueueNameFile = filename :join (Dir , ? QUEUE_NAME_STUB_FILE ),
1472
+ file :write_file (QueueNameFile , <<" VHOST: " , VHost /binary , " \n " ,
1473
+ " QUEUE: " , QName /binary , " \n " >>).
1474
+
1475
+ read_global_recovery_terms (DurableQueueNames ) ->
1476
+ ok = rabbit_recovery_terms :open_global_table (),
1477
+
1478
+ DurableTerms =
1479
+ lists :foldl (
1480
+ fun (QName , RecoveryTerms ) ->
1481
+ DirName = queue_name_to_dir_name_legacy (QName ),
1482
+ RecoveryInfo = case rabbit_recovery_terms :read_global (DirName ) of
1483
+ {error , _ } -> non_clean_shutdown ;
1484
+ {ok , Terms } -> Terms
1485
+ end ,
1486
+ [RecoveryInfo | RecoveryTerms ]
1487
+ end , [], DurableQueueNames ),
1488
+
1489
+ ok = rabbit_recovery_terms :close_global_table (),
1490
+ % % The backing queue interface requires that the queue recovery terms
1491
+ % % which come back from start/1 are in the same order as DurableQueueNames
1492
+ OrderedTerms = lists :reverse (DurableTerms ),
1493
+ {OrderedTerms , {fun queue_index_walker /1 , {start , DurableQueueNames }}}.
1494
+
1495
+ cleanup_global_recovery_terms () ->
1496
+ rabbit_file :recursive_delete ([filename :join ([queues_base_dir (), " queues" ])]),
1497
+ rabbit_recovery_terms :delete_global_table (),
1498
+ ok .
1499
+
1500
+
1482
1501
update_recovery_term (# resource {virtual_host = VHost } = QueueName , Term ) ->
1483
1502
Key = queue_name_to_dir_name (QueueName ),
1484
1503
rabbit_recovery_terms :store (VHost , Key , Term ).
0 commit comments