@@ -313,7 +313,9 @@ init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncM
313
313
'undefined' | non_neg_integer (), qistate ()}.
314
314
315
315
recover (# resource { virtual_host = VHost } = Name , Terms , MsgStoreRecovered ,
316
- ContainsCheckFun , OnSyncFun , OnSyncMsgFun , Context ) ->
316
+ ContainsCheckFun , OnSyncFun , OnSyncMsgFun ,
317
+ % % We only allow using this module when converting to v2.
318
+ convert ) ->
317
319
#{segment_entry_count := SegmentEntryCount } = rabbit_vhost :read_config (VHost ),
318
320
put (segment_entry_count , SegmentEntryCount ),
319
321
VHostDir = rabbit_vhost :msg_store_dir_path (VHost ),
@@ -323,10 +325,10 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
323
325
CleanShutdown = Terms /= non_clean_shutdown ,
324
326
case CleanShutdown andalso MsgStoreRecovered of
325
327
true -> case proplists :get_value (segments , Terms , non_clean_shutdown ) of
326
- non_clean_shutdown -> init_dirty (false , ContainsCheckFun , State1 , Context );
328
+ non_clean_shutdown -> init_dirty (false , ContainsCheckFun , State1 );
327
329
RecoveredCounts -> init_clean (RecoveredCounts , State1 )
328
330
end ;
329
- false -> init_dirty (CleanShutdown , ContainsCheckFun , State1 , Context )
331
+ false -> init_dirty (CleanShutdown , ContainsCheckFun , State1 )
330
332
end .
331
333
332
334
-spec terminate (rabbit_types :vhost (), [any ()], qistate ()) -> qistate ().
@@ -644,7 +646,7 @@ init_clean(RecoveredCounts, State) ->
644
646
-define (RECOVER_BYTES , 2 ).
645
647
-define (RECOVER_COUNTER_SIZE , 2 ).
646
648
647
- init_dirty (CleanShutdown , ContainsCheckFun , State , Context ) ->
649
+ init_dirty (CleanShutdown , ContainsCheckFun , State ) ->
648
650
% % Recover the journal completely. This will also load segments
649
651
% % which have entries in the journal and remove duplicates. The
650
652
% % counts will correctly reflect the combination of the segment
@@ -679,84 +681,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
679
681
% % recovery fails with a crash.
680
682
State2 = flush_journal (State1 # qistate { segments = Segments1 ,
681
683
dirty_count = DirtyCount }),
682
- case Context of
683
- convert ->
684
- {Count , Bytes , State2 };
685
- main ->
686
- % % We try to see if there are segment files from the v2 index.
687
- case rabbit_file :wildcard (" .*\\ .qi" , Dir ) of
688
- % % We are recovering a dirty queue that was using the v2 index or in
689
- % % the process of converting from v2 to v1.
690
- [_ |_ ] ->
691
- # resource {virtual_host = VHost , name = QName } = State2 # qistate .queue_name ,
692
- rabbit_log :info (" Queue ~ts in vhost ~ts recovered ~b total messages before resuming convert" ,
693
- [QName , VHost , Count ]),
694
- CountersRef = counters :new (? RECOVER_COUNTER_SIZE , []),
695
- State3 = recover_index_v2_dirty (State2 , ContainsCheckFun , CountersRef ),
696
- {Count + counters :get (CountersRef , ? RECOVER_COUNT ),
697
- Bytes + counters :get (CountersRef , ? RECOVER_BYTES ),
698
- State3 };
699
- % % Otherwise keep default values.
700
- [] ->
701
- {Count , Bytes , State2 }
702
- end
703
- end .
704
-
705
- recover_index_v2_dirty (State0 = # qistate { queue_name = Name ,
706
- on_sync = OnSyncFun ,
707
- on_sync_msg = OnSyncMsgFun },
708
- ContainsCheckFun , CountersRef ) ->
709
- # resource {virtual_host = VHost , name = QName } = Name ,
710
- rabbit_log :info (" Converting queue ~ts in vhost ~ts from v2 to v1 after unclean shutdown" , [QName , VHost ]),
711
- % % We cannot use the counts/bytes because some messages may be in both
712
- % % the v1 and v2 indexes after a crash.
713
- {_ , _ , V2State } = rabbit_classic_queue_index_v2 :recover (Name , non_clean_shutdown , true ,
714
- ContainsCheckFun , OnSyncFun , OnSyncMsgFun ,
715
- convert ),
716
- State = recover_index_v2_common (State0 , V2State , CountersRef ),
717
- rabbit_log :info (" Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1" ,
718
- [QName , VHost , counters :get (CountersRef , ? RECOVER_COUNT )]),
719
- State .
720
-
721
- % % At this point all messages are persistent because transient messages
722
- % % were dropped during the v2 index recovery.
723
- recover_index_v2_common (State0 = # qistate { queue_name = Name , dir = Dir },
724
- V2State , CountersRef ) ->
725
- % % Use a temporary per-queue store state to read embedded messages.
726
- StoreState0 = rabbit_classic_queue_store_v2 :init (Name ),
727
- % % Go through the v2 index and publish messages to v1 index.
728
- {LoSeqId , HiSeqId , _ } = rabbit_classic_queue_index_v2 :bounds (V2State ),
729
- % % When resuming after a crash we need to double check the messages that are both
730
- % % in the v1 and v2 index (effectively the messages below the upper bound of the
731
- % % v1 index that are about to be written to it).
732
- {_ , V1HiSeqId , _ } = bounds (State0 ),
733
- SkipFun = fun
734
- (SeqId , FunState0 ) when SeqId < V1HiSeqId ->
735
- case read (SeqId , SeqId + 1 , FunState0 ) of
736
- % % Message already exists, skip.
737
- {[_ ], FunState } ->
738
- {skip , FunState };
739
- % % Message doesn't exist, write.
740
- {[], FunState } ->
741
- {write , FunState }
742
- end ;
743
- % % Message is out of bounds of the v1 index.
744
- (_ , FunState ) ->
745
- {write , FunState }
746
- end ,
747
- % % We use a common function also used with conversion on policy change.
748
- {State1 , _StoreState } = rabbit_variable_queue :convert_from_v2_to_v1_loop (Name , State0 , V2State , StoreState0 ,
749
- {CountersRef , ? RECOVER_COUNT , ? RECOVER_BYTES },
750
- LoSeqId , HiSeqId , SkipFun ),
751
- % % Delete any remaining v2 index files.
752
- OldFiles = rabbit_file :wildcard (" .*\\ .qi" , Dir )
753
- ++ rabbit_file :wildcard (" .*\\ .qs" , Dir ),
754
- _ = [rabbit_file :delete (filename :join (Dir , F )) || F <- OldFiles ],
755
- % % Ensure that everything in the v1 index is written to disk.
756
- State = flush (State1 ),
757
- % % Clean up all the garbage that we have surely been creating.
758
- garbage_collect (),
759
- State .
684
+ {Count , Bytes , State2 }.
760
685
761
686
terminate (State = # qistate { journal_handle = JournalHdl ,
762
687
segments = Segments }) ->
0 commit comments