34
34
-export ([add_vhost_msg_store /1 ]).
35
35
36
36
% % exported for testing only
37
- -export ([start_msg_store /2 , stop_msg_store /0 , init /6 ]).
37
+ -export ([start_msg_store /3 , stop_msg_store /0 , init /6 ]).
38
38
39
39
-export ([move_messages_to_vhost_store /0 ]).
40
40
-export ([stop_vhost_msg_store /1 ]).
41
41
-include_lib (" stdlib/include/qlc.hrl" ).
42
42
43
43
-define (QUEUE_MIGRATION_BATCH_SIZE , 100 ).
44
+ -define (MSG_STORE_MODULE_FILE , " msg_store_module" ).
44
45
45
46
% %----------------------------------------------------------------------------
46
47
% % Messages, and their position in the queue, can be in memory or on
@@ -499,15 +500,36 @@ start(DurableQueues) ->
499
500
end ,
500
501
{DurableQueues , #{}},
501
502
AllTerms ),
502
- start_msg_store (VhostRefs , StartFunState ),
503
+ start_msg_store (VhostRefs , StartFunState , DurableQueues ),
503
504
{ok , AllTerms }.
504
505
505
506
stop () ->
506
507
ok = stop_msg_store (),
507
508
ok = rabbit_queue_index :stop ().
508
509
509
- start_msg_store (Refs , StartFunState ) when is_map (Refs ); Refs == undefined ->
510
+ start_msg_store (Refs , StartFunState , DurableQueues ) when is_map (Refs );
511
+ Refs == undefined ->
510
512
MsgStoreModule = application :get_env (rabbit , msg_store_module , rabbit_msg_store ),
513
+ case rabbit_file :read_term_file (msg_store_module_file ()) of
514
+ % % The same message store module
515
+ {ok , [MsgStoreModule ]} -> ok ;
516
+ % % Fresh message store
517
+ {error , enoent } -> ok ;
518
+ {ok , [OldModule ]} ->
519
+ case DurableQueues of
520
+ % % There is no data in the old message store.
521
+ % % So it's safe to start with the new one
522
+ [] -> ok ;
523
+ _ ->
524
+ error ({msg_store_module_mismatch ,
525
+ MsgStoreModule ,
526
+ OldModule })
527
+ end ;
528
+ Other ->
529
+ error (Other )
530
+ end ,
531
+ rabbit_file :read_term_file (msg_store_module_file (),
532
+ [MsgStoreModule ]),
511
533
rabbit_log :info (" Using ~p to provide message store" , [MsgStoreModule ]),
512
534
ok = rabbit_sup :start_child (? TRANSIENT_MSG_STORE_SUP , rabbit_msg_store_vhost_sup ,
513
535
[? TRANSIENT_MSG_STORE_SUP ,
@@ -527,8 +549,14 @@ start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined ->
527
549
add_vhost_msg_store (Vhost )
528
550
end ,
529
551
lists :sort (VHosts )),
552
+ % % When message store is started, we can save a module
553
+ rabbit_file :write_term_file (msg_store_module_file (),
554
+ [MsgStoreModule ]),
530
555
ok .
531
556
557
+ msg_store_module_file () ->
558
+ filename :join (rabbit_mnesia :dir (), ? MSG_STORE_MODULE_FILE ).
559
+
532
560
add_vhost_msg_store (VHost ) ->
533
561
rabbit_log :info (" Starting message stores for vhost '~s '~n " , [VHost ]),
534
562
rabbit_msg_store_vhost_sup :add_vhost (? TRANSIENT_MSG_STORE_SUP , VHost ),
@@ -2827,6 +2855,8 @@ move_messages_to_vhost_store() ->
2827
2855
2828
2856
ok = rabbit_queue_index :stop (),
2829
2857
ok = rabbit_sup :stop_child (NewStoreSup ),
2858
+ % % Specify old message store module.
2859
+ rabbit_file :write_term_file (msg_store_module_file (), [rabbit_msg_store ]),
2830
2860
ok .
2831
2861
2832
2862
in_batches (Size , MFA , List , MessageStart , MessageEnd ) ->
0 commit comments