|
16 | 16 |
|
17 | 17 | -module(rabbit_queue_index).
|
18 | 18 |
|
19 |
| --export([erase/1, init/3, recover/6, |
| 19 | +-export([erase/1, init/3, reset_state/1, recover/6, |
20 | 20 | terminate/2, delete_and_terminate/1,
|
21 | 21 | pre_publish/7, flush_pre_publish_cache/2,
|
22 | 22 | publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
|
|
224 | 224 | -type(shutdown_terms() :: [term()] | 'non_clean_shutdown').
|
225 | 225 |
|
226 | 226 | -spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok').
|
| 227 | +-spec(reset_state/1 :: (qistate()) -> qistate()). |
227 | 228 | -spec(init/3 :: (rabbit_amqqueue:name(),
|
228 | 229 | on_sync_fun(), on_sync_fun()) -> qistate()).
|
229 | 230 | -spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
|
|
261 | 262 |
|
262 | 263 | erase(Name) ->
|
263 | 264 | #qistate { dir = Dir } = blank_state(Name),
|
264 |
| - case rabbit_file:is_dir(Dir) of |
265 |
| - true -> rabbit_file:recursive_delete([Dir]); |
266 |
| - false -> ok |
267 |
| - end. |
| 265 | + erase_index_dir(Dir). |
| 266 | + |
| 267 | +%% used during variable queue purge when there are no pending acks |
| 268 | +reset_state(#qistate{ dir = Dir, |
| 269 | + on_sync = OnSyncFun, |
| 270 | + on_sync_msg = OnSyncMsgFun, |
| 271 | + journal_handle = JournalHdl }) -> |
| 272 | + ok = erase_index_dir(Dir), |
| 273 | + ok = case JournalHdl of |
| 274 | + undefined -> ok; |
| 275 | + _ -> file_handle_cache:close(JournalHdl) |
| 276 | + end, |
| 277 | + blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun). |
268 | 278 |
|
269 | 279 | init(Name, OnSyncFun, OnSyncMsgFun) ->
|
270 | 280 | State = #qistate { dir = Dir } = blank_state(Name),
|
@@ -507,20 +517,31 @@ all_queue_directory_names(Dir) ->
|
507 | 517 | %% startup and shutdown
|
508 | 518 | %%----------------------------------------------------------------------------
|
509 | 519 |
|
| 520 | +erase_index_dir(Dir) -> |
| 521 | + case rabbit_file:is_dir(Dir) of |
| 522 | + true -> rabbit_file:recursive_delete([Dir]); |
| 523 | + false -> ok |
| 524 | + end. |
| 525 | + |
510 | 526 | blank_state(QueueName) ->
|
511 | 527 | blank_state_dir(
|
512 | 528 | filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
|
513 | 529 |
|
514 | 530 | blank_state_dir(Dir) ->
|
| 531 | + blank_state_dir_funs(Dir, |
| 532 | + fun (_) -> ok end, |
| 533 | + fun (_) -> ok end). |
| 534 | + |
| 535 | +blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> |
515 | 536 | {ok, MaxJournal} =
|
516 | 537 | application:get_env(rabbit, queue_index_max_journal_entries),
|
517 | 538 | #qistate { dir = Dir,
|
518 | 539 | segments = segments_new(),
|
519 | 540 | journal_handle = undefined,
|
520 | 541 | dirty_count = 0,
|
521 | 542 | max_journal_entries = MaxJournal,
|
522 |
| - on_sync = fun (_) -> ok end, |
523 |
| - on_sync_msg = fun (_) -> ok end, |
| 543 | + on_sync = OnSyncFun, |
| 544 | + on_sync_msg = OnSyncMsgFun, |
524 | 545 | unconfirmed = gb_sets:new(),
|
525 | 546 | unconfirmed_msg = gb_sets:new(),
|
526 | 547 | pre_publish_cache = [],
|
|
0 commit comments