@@ -302,25 +302,35 @@ recover_segment(State, ContainsCheckFun, CountersRef, Fd,
302
302
Unacked - 1 , LocBytes0 )
303
303
end .
304
304
305
- recover_index_v1_clean (State = # mqistate { queue_name = Name }, Terms , IsMsgStoreClean ,
305
+ recover_index_v1_clean (State0 = # mqistate { queue_name = Name }, Terms , IsMsgStoreClean ,
306
306
ContainsCheckFun , OnSyncFun , OnSyncMsgFun ) ->
307
+ # resource {virtual_host = VHost , name = QName } = Name ,
308
+ logger :info (" Converting clean queue ~s on vhost ~s to the new index format" , [QName , VHost ]),
307
309
{_ , _ , V1State } = rabbit_queue_index :recover (Name , Terms , IsMsgStoreClean ,
308
310
ContainsCheckFun , OnSyncFun , OnSyncMsgFun ),
309
311
% % We will ignore the counter results because on clean shutdown
310
312
% % we do not need to calculate the values again. This lets us
311
313
% % share code with dirty recovery.
312
314
DummyCountersRef = counters :new (2 , []),
313
- recover_index_v1_common (State , Terms , V1State , DummyCountersRef ).
315
+ State = recover_index_v1_common (State0 , Terms , V1State , DummyCountersRef ),
316
+ logger :info (" Queue ~s on vhost ~s converted ~b total messages to the new index format" ,
317
+ [QName , VHost , counters :get (DummyCountersRef , ? RECOVER_COUNT )]),
318
+ State .
314
319
315
- recover_index_v1_dirty (State = # mqistate { queue_name = Name }, Terms , IsMsgStoreClean ,
320
+ recover_index_v1_dirty (State0 = # mqistate { queue_name = Name }, Terms , IsMsgStoreClean ,
316
321
ContainsCheckFun , OnSyncFun , OnSyncMsgFun ,
317
322
CountersRef ) ->
323
+ # resource {virtual_host = VHost , name = QName } = Name ,
324
+ logger :info (" Converting dirty queue ~s on vhost ~s to the new index format" , [QName , VHost ]),
318
325
% % We ignore the count and bytes returned here because we cannot trust
319
326
% % rabbit_queue_index: it has a bug that may lead to more bytes being
320
327
% % returned than it really has.
321
328
{_ , _ , V1State } = rabbit_queue_index :recover (Name , Terms , IsMsgStoreClean ,
322
329
ContainsCheckFun , OnSyncFun , OnSyncMsgFun ),
323
- recover_index_v1_common (State , Terms , V1State , CountersRef ).
330
+ State = recover_index_v1_common (State0 , Terms , V1State , CountersRef ),
331
+ logger :info (" Queue ~s on vhost ~s converted ~b total messages to the new index format" ,
332
+ [QName , VHost , counters :get (CountersRef , ? RECOVER_COUNT )]),
333
+ State .
324
334
325
335
recover_index_v1_common (State0 = # mqistate { queue_name = # resource { virtual_host = VHost },
326
336
dir = Dir }, Terms , V1State , CountersRef ) ->
@@ -359,16 +369,22 @@ recover_index_v1_common(State0 = #mqistate{ queue_name = #resource{ virtual_host
359
369
360
370
recover_index_v1_loop (State , _ , _ , _ , HiSeqId , HiSeqId ) ->
361
371
State ;
362
- recover_index_v1_loop (State0 , MSClient , V1State0 , CountersRef , LoSeqId , HiSeqId ) ->
372
+ recover_index_v1_loop (State0 = # mqistate { queue_name = Name },
373
+ MSClient , V1State0 , CountersRef , LoSeqId , HiSeqId ) ->
363
374
UpSeqId = lists :min ([rabbit_queue_index :next_segment_boundary (LoSeqId ),
364
375
HiSeqId ]),
365
376
{Messages , V1State } = rabbit_queue_index :read (LoSeqId , UpSeqId , V1State0 ),
366
- counters :add (CountersRef , ? RECOVER_COUNT , length (Messages )),
367
- State = lists :foldl (fun ({MsgOrId , SeqId , Props , IsPersistent , IsDelivered }, State1 ) ->
377
+ % % We do a garbage collect immediately after the old index read
378
+ % % and ack because they may have created a lot of garbage.
379
+ garbage_collect (),
380
+ MessagesCount = length (Messages ),
381
+ counters :add (CountersRef , ? RECOVER_COUNT , MessagesCount ),
382
+ State2 = lists :foldl (fun ({MsgOrId , SeqId , Props , IsPersistent , IsDelivered }, State1 ) ->
368
383
% % We must move embedded messages to the message store.
369
384
MsgId = case MsgOrId of
370
385
Msg = # basic_message { id = MsgId0 } ->
371
- rabbit_msg_store :write (MsgId0 , Msg , MSClient ),
386
+ % % We must do a synchronous write to avoid overloading the message store.
387
+ rabbit_msg_store :sync_write (MsgId0 , Msg , MSClient ),
372
388
MsgId0 ;
373
389
MsgId0 ->
374
390
MsgId0
@@ -378,6 +394,15 @@ recover_index_v1_loop(State0, MSClient, V1State0, CountersRef, LoSeqId, HiSeqId)
378
394
counters :add (CountersRef , ? RECOVER_BYTES , Props # message_properties .size ),
379
395
publish (MsgId , SeqId , Props , IsPersistent , IsDelivered , infinity , State1 )
380
396
end , State0 , Messages ),
397
+ State = flush (State2 ),
398
+ % % We have written everything to disk. We can delete the old segment file
399
+ % % to free up much needed space, to avoid doubling disk usage during the upgrade.
400
+ rabbit_queue_index :delete_segment_file_for_seq_id (LoSeqId , V1State ),
401
+ % % Log some progress to keep the user aware of what's going on, as moving
402
+ % % embedded messages can take quite some time.
403
+ # resource {virtual_host = VHost , name = QName } = Name ,
404
+ logger :info (" Queue ~s on vhost ~s converted ~b more messages to the new index format" ,
405
+ [QName , VHost , MessagesCount ]),
381
406
recover_index_v1_loop (State , MSClient , V1State , CountersRef , UpSeqId , HiSeqId ).
382
407
383
408
-spec terminate (rabbit_types :vhost (), [any ()], State ) -> State when State :: mqistate ().
0 commit comments