Skip to content

improvements for BQ:purge #300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Sep 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-module(rabbit_queue_index).

-export([erase/1, init/3, recover/6,
-export([erase/1, init/3, reset_state/1, recover/6,
terminate/2, delete_and_terminate/1,
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
Expand Down Expand Up @@ -220,6 +220,7 @@
-type(shutdown_terms() :: [term()] | 'non_clean_shutdown').

-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok').
-spec(reset_state/1 :: (qistate()) -> qistate()).
-spec(init/3 :: (rabbit_amqqueue:name(),
on_sync_fun(), on_sync_fun()) -> qistate()).
-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
Expand Down Expand Up @@ -257,10 +258,19 @@

erase(Name) ->
#qistate { dir = Dir } = blank_state(Name),
case rabbit_file:is_dir(Dir) of
true -> rabbit_file:recursive_delete([Dir]);
false -> ok
end.
erase_index_dir(Dir).

%% used during variable queue purge when there are no pending acks
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a usable index state, since after BQ:purge/1 the index state might keep being used by future publishes and so on.

%% Resets the queue index to a blank but still usable state.
%%
%% Steps, in order:
%%   1. close the journal handle, if one is open;
%%   2. delete the index directory (a no-op when it does not exist);
%%   3. build a fresh #qistate{} for the same directory, carrying over
%%      the on_sync / on_sync_msg callbacks of the state passed in.
%%
%% The handle is closed *before* the directory is removed so that we
%% never try to delete a file we still hold open: removing an open
%% file can fail on some platforms (e.g. Windows), which would make
%% the `ok = erase_index_dir(Dir)' match below crash.
%%
%% Crashes with badmatch if closing the handle or erasing the
%% directory returns anything other than `ok'.
reset_state(#qistate{ dir            = Dir,
                      on_sync        = OnSyncFun,
                      on_sync_msg    = OnSyncMsgFun,
                      journal_handle = JournalHdl }) ->
    ok = case JournalHdl of
             undefined -> ok;
             _         -> file_handle_cache:close(JournalHdl)
         end,
    ok = erase_index_dir(Dir),
    blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun).

init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
Expand Down Expand Up @@ -431,20 +441,31 @@ all_queue_directory_names(Dir) ->
%% startup and shutdown
%%----------------------------------------------------------------------------

%% Recursively deletes Dir when it exists as a directory; otherwise
%% does nothing and returns `ok'.
erase_index_dir(Dir) ->
    erase_index_dir1(rabbit_file:is_dir(Dir), Dir).

%% Dispatches on the result of the directory check.
erase_index_dir1(true, Dir)   -> rabbit_file:recursive_delete([Dir]);
erase_index_dir1(false, _Dir) -> ok.

%% Builds a blank index state for QueueName: the index directory is
%% the queue's directory name joined onto the queues directory.
blank_state(QueueName) ->
    QueuesDir = queues_dir(),
    DirName   = queue_name_to_dir_name(QueueName),
    blank_state_dir(filename:join(QueuesDir, DirName)).

%% Builds a blank index state for Dir whose two sync callbacks ignore
%% their argument and return `ok'.
blank_state_dir(Dir) ->
    NoOpFun = fun (_) -> ok end,
    blank_state_dir_funs(Dir, NoOpFun, NoOpFun).

blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
segments = segments_new(),
journal_handle = undefined,
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
on_sync_msg = fun (_) -> ok end,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun,
unconfirmed = gb_sets:new(),
unconfirmed_msg = gb_sets:new() }.

Expand Down
213 changes: 147 additions & 66 deletions src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -533,38 +533,29 @@ terminate(_Reason, State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'purge_pending_ack', other than
%% deleting it.
{_PurgeCount, State1} = purge(State),
State2 = #vqstate { index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(false, State1),
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
%% Normally when we purge messages we interact with the qi by
%% issuing delivers and acks for every purged message. In this case
%% we don't need to do that, so we just delete the qi.
State1 = purge_and_index_reset(State),
State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack_delete_and_terminate(State1),
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
end,
rabbit_msg_store:client_delete_and_terminate(MSCStateT),
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
a(State2 #vqstate { msg_store_clients = undefined }).

%% Erases the queue index belonging to the given #amqqueue{} record.
%% Crashes with badmatch unless rabbit_queue_index:erase/1 returns `ok'.
delete_crashed(Q = #amqqueue{}) ->
    QName = Q#amqqueue.name,
    ok = rabbit_queue_index:erase(QName).

purge(State = #vqstate { q4 = Q4,
len = Len }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
State1 = remove_queue_entries(Q4, State),

State2 = #vqstate { q1 = Q1 } =
purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }),

State3 = remove_queue_entries(Q1, State2),

{Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}.
purge(State = #vqstate { len = Len }) ->
case is_pending_ack_empty(State) of
true ->
{Len, purge_and_index_reset(State)};
false ->
{Len, purge_when_pending_acks(State)}
end.

%% Purges every pending ack (KeepPersistent = false) and runs the
%% resulting state through a/1.
purge_acks(State) ->
    Purged = purge_pending_ack(false, State),
    a(Purged).

Expand Down Expand Up @@ -754,10 +745,8 @@ len(#vqstate { len = Len }) -> Len.

%% True when the queue length reported by len/1 is zero.
is_empty(State) ->
    len(State) == 0.

depth(State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA }) ->
len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
depth(State) ->
len(State) + count_pending_acks(State).

set_ram_duration_target(
DurationTarget, State = #vqstate {
Expand Down Expand Up @@ -1072,7 +1061,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).

betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) ->
{Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
Expand All @@ -1095,9 +1084,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
end
end
end, {?QUEUE:new(), [], [], 0, 0}, List),
{Filtered, RamReadyCount, RamBytes,
rabbit_queue_index:ack(
Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
{Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}.
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
Expand Down Expand Up @@ -1323,25 +1310,75 @@ remove(AckRequired, MsgStatus = #msg_status {
State2 #vqstate {out_counter = OutCount + 1,
index_state = IndexState2})}.

purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) ->
%%----------------------------------------------------------------------------
%% Helpers for Public API purge/1 function
%%----------------------------------------------------------------------------

%% The difference between purge_when_pending_acks/1
%% and purge_and_index_reset/1 is that the first one issues a deliver
%% and an ack to the queue index for every message that's being
%% removed, while the latter just resets the queue index state.
%% Purges all queued messages, forwarding the collected delivers and
%% acks to the queue index (the `deliver_and_ack' callback).
purge_when_pending_acks(State) ->
    DelsAndAcksFun = process_delivers_and_acks_fun(deliver_and_ack),
    a(purge1(DelsAndAcksFun, State)).

%% Purges all queued messages without touching the queue index for
%% each of them (the `none' callback leaves the state unchanged), then
%% resets the queue index state as a whole.
purge_and_index_reset(State) ->
    DelsAndAcksFun = process_delivers_and_acks_fun(none),
    Purged = purge1(DelsAndAcksFun, State),
    a(reset_qi_state(Purged)).

%% Removes the messages held in each of {q1, q2, q3, q4}.
%%
%% q4 and q1 are emptied directly with remove_queue_entries/3, while
%% q2 and q3 are handled by purge_betas_and_deltas/2.
%%
%% purge_betas_and_deltas/2 loads messages from the queue index,
%% filling up q3 and in some cases moving messages from q2 to q3 while
%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The
%% messages loaded into q3 are removed by calling
%% remove_queue_entries/3 until there are no more messages to be read
%% from the queue index. Messages are read from the queue index in
%% batches.
%%
%% AfterFun is handed to every removal step; it receives the delivers
%% and acks collected by that step together with the state.
purge1(AfterFun, State = #vqstate { q4 = Q4 }) ->
    %% 1. drop everything in q4 and replace it with an empty queue
    Q4Removed = remove_queue_entries(Q4, AfterFun, State),
    Q4Cleared = Q4Removed#vqstate{q4 = ?QUEUE:new()},

    %% 2. drain q3 (and whatever gets loaded into it)
    BetasDeltasPurged = purge_betas_and_deltas(AfterFun, Q4Cleared),

    %% 3. drop everything in q1, read from the state produced by step 2
    #vqstate { q1 = Q1 } = BetasDeltasPurged,
    Q1Removed = remove_queue_entries(Q1, AfterFun, BetasDeltasPurged),

    a(Q1Removed#vqstate{q1 = ?QUEUE:new()}).

%% Replaces the queue's index state with the result of
%% rabbit_queue_index:reset_state/1 applied to the current one.
reset_qi_state(State = #vqstate{index_state = IndexState}) ->
    FreshIndexState = rabbit_queue_index:reset_state(IndexState),
    State#vqstate{index_state = FreshIndexState}.

%% True when count_pending_acks/1 reports no pending acks at all.
is_pending_ack_empty(State) ->
    0 =:= count_pending_acks(State).

%% Total number of entries across the three pending-ack trees
%% (ram_pending_ack, disk_pending_ack and qi_pending_ack).
count_pending_acks(#vqstate { ram_pending_ack  = RPA,
                              disk_pending_ack = DPA,
                              qi_pending_ack   = QPA }) ->
    lists:sum([gb_trees:size(Tree) || Tree <- [RPA, DPA, QPA]]).

purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) ->
case ?QUEUE:is_empty(Q3) of
true -> State;
false -> State1 = remove_queue_entries(Q3, State),
purge_betas_and_deltas(maybe_deltas_to_betas(
false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State),
purge_betas_and_deltas(DelsAndAcksFun,
maybe_deltas_to_betas(
DelsAndAcksFun,
State1#vqstate{q3 = ?QUEUE:new()}))
end.

remove_queue_entries(Q, State = #vqstate{index_state = IndexState,
msg_store_clients = MSCState}) ->
remove_queue_entries(Q, DelsAndAcksFun,
State = #vqstate{msg_store_clients = MSCState}) ->
{MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
{orddict:new(), [], [], State}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
IndexState1 = rabbit_queue_index:ack(
Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
State1#vqstate{index_state = IndexState1}.
DelsAndAcksFun(Delivers, Acks, State1).

remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
Expand All @@ -1356,6 +1393,18 @@ remove_queue_entries1(
cons_if(IndexOnDisk, SeqId, Acks),
stats({-1, 0}, {MsgStatus, none}, State)}.

%% Returns the three-argument callback (Delivers, Acks, State) -> State
%% that is applied after queue entries have been removed.
%%
%% deliver_and_ack: records Delivers and then Acks in the queue index
%%                  and stores the updated index state in State.
%% anything else:   returns State untouched.
process_delivers_and_acks_fun(deliver_and_ack) ->
    fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
            Delivered = rabbit_queue_index:deliver(Delivers, IndexState),
            Acked     = rabbit_queue_index:ack(Acks, Delivered),
            State #vqstate { index_state = Acked }
    end;
process_delivers_and_acks_fun(_) ->
    fun (_Delivers, _Acks, State) -> State end.

%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -1509,11 +1558,29 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
end.

purge_pending_ack(KeepPersistent,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function got split in 2. The part that collects pending acks, and the part that handles the removal of those pending acks/msgs depending on the KeepPersistent flag.

State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA,
index_state = IndexState,
State = #vqstate { index_state = IndexState,
msg_store_clients = MSCState }) ->
{IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State),
case KeepPersistent of
true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
State1;
false -> IndexState1 =
rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
remove_msgs_by_id(MsgIdsByStore, MSCState),
State1 #vqstate { index_state = IndexState1 }
end.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

purge_pending_ack_delete_and_terminate/1 should work like the old purge_pending_ack/2 when KeepPersistent = false, but instead of acking/delivering messages to the index, we just call rabbit_queue_index:delete_and_terminate/1

%% Collects the pending acks via purge_pending_ack1/1, deletes and
%% terminates the queue index instead of acking individual sequence
%% ids, then removes the collected messages from the message stores.
%% The sequence-id list returned by purge_pending_ack1/1 is ignored
%% here. Returns the state with the terminated index state installed.
purge_pending_ack_delete_and_terminate(
  State = #vqstate { index_state       = IndexState,
                     msg_store_clients = MSCState }) ->
    {_IndexOnDiskSeqIds, MsgIdsByStore, Purged} = purge_pending_ack1(State),
    TerminatedIndex = rabbit_queue_index:delete_and_terminate(IndexState),
    remove_msgs_by_id(MsgIdsByStore, MSCState),
    Purged #vqstate { index_state = TerminatedIndex }.

purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA }) ->
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
Expand All @@ -1523,19 +1590,26 @@ purge_pending_ack(KeepPersistent,
State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
disk_pending_ack = gb_trees:empty(),
qi_pending_ack = gb_trees:empty()},
{IndexOnDiskSeqIds, MsgIdsByStore, State1}.

case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
{ok, MsgIds} -> ok = msg_store_remove(MSCState, false,
MsgIds),
State1
end;
false -> IndexState1 =
rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
%% MsgIdsByStore is an orddict with up to two keys:
%%
%% true:  the list of persistent message ids.
%% false: the list of transient message ids.
%%
%% orddict:to_list/1 turns it into {IsPersistent, MsgIds} pairs. For
%% each pair the boolean is passed on to msg_store_remove/3, which
%% uses it to pick the store the messages are removed from. Every
%% removal must return `ok'; the list of those results is returned.
remove_msgs_by_id(MsgIdsByStore, MSCState) ->
    IdsPerStore = orddict:to_list(MsgIdsByStore),
    [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
     || {IsPersistent, MsgIds} <- IdsPerStore].

%% Removes only the message ids stored under the `false' key of
%% MsgIdsByStore; ids under any other key are left alone. Returns `ok'
%% whether or not the `false' key is present.
remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
    case orddict:is_key(false, MsgIdsByStore) of
        false -> ok;
        true  -> MsgIds = orddict:fetch(false, MsgIdsByStore),
                 ok = msg_store_remove(MSCState, false, MsgIds)
    end.

%% Initial accumulator for folding over pending acks: an empty list,
%% an empty orddict and another empty list.
%% NOTE(review): the three slots presumably line up with the
%% {IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds} tuple destructured by
%% the callers of the fold - confirm against accumulate_ack/2.
accumulate_ack_init() ->
    EmptyByStore = orddict:new(),
    {[], EmptyByStore, []}.
Expand Down Expand Up @@ -1869,9 +1943,15 @@ fetch_from_q3(State = #vqstate { q1 = Q1,
{loaded, {MsgStatus, State2}}
end.

maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
%% One-argument form: delegates to maybe_deltas_to_betas/2 with the
%% `deliver_and_ack' callback, so delivers and acks produced while
%% loading are recorded in the queue index.
maybe_deltas_to_betas(State) ->
    maybe_deltas_to_betas(process_delivers_and_acks_fun(deliver_and_ack),
                          State).

maybe_deltas_to_betas(_DelsAndAcksFun,
State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
maybe_deltas_to_betas(State = #vqstate {
maybe_deltas_to_betas(DelsAndAcksFun,
State = #vqstate {
q2 = Q2,
delta = Delta,
q3 = Q3,
Expand All @@ -1891,34 +1971,35 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{Q3a, RamCountsInc, RamBytesInc, IndexState2} =
{Q3a, RamCountsInc, RamBytesInc, State1} =
betas_from_index_entries(List, TransientThreshold,
RPA, DPA, QPA, IndexState1),
State1 = State #vqstate { index_state = IndexState2,
ram_msg_count = RamMsgCount + RamCountsInc,
ram_bytes = RamBytes + RamBytesInc,
disk_read_count = DiskReadCount + RamCountsInc},
RPA, DPA, QPA, DelsAndAcksFun,
State #vqstate { index_state = IndexState1 }),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
ram_bytes = RamBytes + RamBytesInc,
disk_read_count = DiskReadCount + RamCountsInc },
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
maybe_deltas_to_betas(
State1 #vqstate {
DelsAndAcksFun,
State2 #vqstate {
delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
Q3aLen ->
Q3b = ?QUEUE:join(Q3, Q3a),
case DeltaCount - Q3aLen of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
State1 #vqstate { q2 = ?QUEUE:new(),
State2 #vqstate { q2 = ?QUEUE:new(),
delta = ?BLANK_DELTA,
q3 = ?QUEUE:join(Q3b, Q2) };
N when N > 0 ->
Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
count = N,
end_seq_id = DeltaSeqIdEnd }),
State1 #vqstate { delta = Delta1,
State2 #vqstate { delta = Delta1,
q3 = Q3b }
end
end.
Expand Down