-
Notifications
You must be signed in to change notification settings - Fork 3.9k
improvements for BQ:purge #300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4e40d07
5f410cd
1d9a92b
99634fc
6e6651f
b1837a8
f2028a0
09c2922
1609fbe
e6b01de
db235d5
9357489
8719045
5154303
bb515df
fe4c798
8d9206d
e0a30b4
d9c9e16
647ae31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -533,38 +533,29 @@ terminate(_Reason, State) -> | |
%% the only difference between purge and delete is that delete also | ||
%% needs to delete everything that's been delivered and not ack'd. | ||
delete_and_terminate(_Reason, State) -> | ||
%% TODO: there is no need to interact with qi at all - which we do | ||
%% as part of 'purge' and 'purge_pending_ack', other than | ||
%% deleting it. | ||
{_PurgeCount, State1} = purge(State), | ||
State2 = #vqstate { index_state = IndexState, | ||
msg_store_clients = {MSCStateP, MSCStateT} } = | ||
purge_pending_ack(false, State1), | ||
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), | ||
%% Normally when we purge messages we interact with the qi by | ||
%% issuing delivers and acks for every purged message. In this case | ||
%% we don't need to do that, so we just delete the qi. | ||
State1 = purge_and_index_reset(State), | ||
State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = | ||
purge_pending_ack_delete_and_terminate(State1), | ||
case MSCStateP of | ||
undefined -> ok; | ||
_ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) | ||
end, | ||
rabbit_msg_store:client_delete_and_terminate(MSCStateT), | ||
a(State2 #vqstate { index_state = IndexState1, | ||
msg_store_clients = undefined }). | ||
a(State2 #vqstate { msg_store_clients = undefined }). | ||
|
||
%% Erases the queue index belonging to the queue named QName. | ||
%% Crashes with badmatch unless rabbit_queue_index:erase/1 returns ok. | ||
delete_crashed(#amqqueue{name = QName}) -> | ||
ok = rabbit_queue_index:erase(QName). | ||
|
||
purge(State = #vqstate { q4 = Q4, | ||
len = Len }) -> | ||
%% TODO: when there are no pending acks, which is a common case, | ||
%% we could simply wipe the qi instead of issuing delivers and | ||
%% acks for all the messages. | ||
State1 = remove_queue_entries(Q4, State), | ||
|
||
State2 = #vqstate { q1 = Q1 } = | ||
purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }), | ||
|
||
State3 = remove_queue_entries(Q1, State2), | ||
|
||
{Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}. | ||
purge(State = #vqstate { len = Len }) -> | ||
case is_pending_ack_empty(State) of | ||
true -> | ||
{Len, purge_and_index_reset(State)}; | ||
false -> | ||
{Len, purge_when_pending_acks(State)} | ||
end. | ||
|
||
%% Purges the pending acks by calling purge_pending_ack/2 with | ||
%% KeepPersistent = false, then passes the resulting state through a/1. | ||
purge_acks(State) -> a(purge_pending_ack(false, State)). | ||
|
||
|
@@ -754,10 +745,8 @@ len(#vqstate { len = Len }) -> Len. | |
|
||
%% Returns true when len/1 reports zero for State. | ||
is_empty(State) -> 0 == len(State). | ||
|
||
depth(State = #vqstate { ram_pending_ack = RPA, | ||
disk_pending_ack = DPA, | ||
qi_pending_ack = QPA }) -> | ||
len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). | ||
depth(State) -> | ||
len(State) + count_pending_acks(State). | ||
|
||
set_ram_duration_target( | ||
DurationTarget, State = #vqstate { | ||
|
@@ -1072,7 +1061,7 @@ maybe_write_delivered(false, _SeqId, IndexState) -> | |
maybe_write_delivered(true, SeqId, IndexState) -> | ||
rabbit_queue_index:deliver([SeqId], IndexState). | ||
|
||
betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> | ||
betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) -> | ||
{Filtered, Delivers, Acks, RamReadyCount, RamBytes} = | ||
lists:foldr( | ||
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, | ||
|
@@ -1095,9 +1084,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> | |
end | ||
end | ||
end, {?QUEUE:new(), [], [], 0, 0}, List), | ||
{Filtered, RamReadyCount, RamBytes, | ||
rabbit_queue_index:ack( | ||
Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. | ||
{Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}. | ||
%% [0] We don't increase RamBytes here, even though it pertains to | ||
%% unacked messages too, since if HaveMsg then the message must have | ||
%% been stored in the QI, thus the message must have been in | ||
|
@@ -1323,25 +1310,75 @@ remove(AckRequired, MsgStatus = #msg_status { | |
State2 #vqstate {out_counter = OutCount + 1, | ||
index_state = IndexState2})}. | ||
|
||
purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) -> | ||
%%---------------------------------------------------------------------------- | ||
%% Helpers for Public API purge/1 function | ||
%%---------------------------------------------------------------------------- | ||
|
||
%% The difference between purge_when_pending_acks/1 | ||
%% and purge_and_index_reset/1 is that the former issues a deliver | ||
%% and an ack to the queue index for every message that's being | ||
%% removed, while the latter just resets the queue index state. | ||
%% | ||
%% purge/1 selects this variant when is_pending_ack_empty/1 is false. | ||
purge_when_pending_acks(State) -> | ||
State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State), | ||
a(State1). | ||
|
||
%% Purges the queues via purge1/2 using a delivers-and-acks fun that | ||
%% leaves the state untouched, then resets the queue index state with | ||
%% reset_qi_state/1 and passes the result through a/1. | ||
purge_and_index_reset(State) -> | ||
State1 = purge1(process_delivers_and_acks_fun(none), State), | ||
a(reset_qi_state(State1)). | ||
|
||
%% This function removes messages from each of {q1, q2, q3, q4}. | ||
%% | ||
%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3 | ||
%% are specially handled by purge_betas_and_deltas/2. | ||
%% | ||
%% purge_betas_and_deltas/2 loads messages from the queue index, | ||
%% filling up q3 and in some cases moving messages from q2 to q3 while | ||
%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The | ||
%% messages loaded into q3 are removed by calling | ||
%% remove_queue_entries/3 until there are no more messages to be read | ||
%% from the queue index. Messages are read in batches from the queue | ||
%% index. | ||
%% | ||
%% AfterFun is the fun (Delivers, Acks, State) -> State that is handed | ||
%% on to remove_queue_entries/3 and purge_betas_and_deltas/2. | ||
purge1(AfterFun, State = #vqstate { q4 = Q4}) -> | ||
State1 = remove_queue_entries(Q4, AfterFun, State), | ||
|
||
State2 = #vqstate {q1 = Q1} = | ||
purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}), | ||
|
||
State3 = remove_queue_entries(Q1, AfterFun, State2), | ||
|
||
a(State3#vqstate{q1 = ?QUEUE:new()}). | ||
|
||
%% Replaces the index_state field of State with the result of | ||
%% rabbit_queue_index:reset_state/1 applied to the current index state. | ||
reset_qi_state(State = #vqstate{index_state = IndexState}) -> | ||
State#vqstate{index_state = | ||
rabbit_queue_index:reset_state(IndexState)}. | ||
|
||
%% Returns true when count_pending_acks/1 reports exactly zero. | ||
is_pending_ack_empty(State) -> | ||
count_pending_acks(State) =:= 0. | ||
|
||
%% Returns the total number of entries held in the three pending-ack | ||
%% gb_trees of the state: ram_pending_ack, disk_pending_ack and | ||
%% qi_pending_ack. | ||
count_pending_acks(#vqstate { ram_pending_ack = RPA, | ||
disk_pending_ack = DPA, | ||
qi_pending_ack = QPA }) -> | ||
gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). | ||
|
||
purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) -> | ||
case ?QUEUE:is_empty(Q3) of | ||
true -> State; | ||
false -> State1 = remove_queue_entries(Q3, State), | ||
purge_betas_and_deltas(maybe_deltas_to_betas( | ||
false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State), | ||
purge_betas_and_deltas(DelsAndAcksFun, | ||
maybe_deltas_to_betas( | ||
DelsAndAcksFun, | ||
State1#vqstate{q3 = ?QUEUE:new()})) | ||
end. | ||
|
||
remove_queue_entries(Q, State = #vqstate{index_state = IndexState, | ||
msg_store_clients = MSCState}) -> | ||
remove_queue_entries(Q, DelsAndAcksFun, | ||
State = #vqstate{msg_store_clients = MSCState}) -> | ||
{MsgIdsByStore, Delivers, Acks, State1} = | ||
?QUEUE:foldl(fun remove_queue_entries1/2, | ||
{orddict:new(), [], [], State}, Q), | ||
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> | ||
msg_store_remove(MSCState, IsPersistent, MsgIds) | ||
end, ok, MsgIdsByStore), | ||
IndexState1 = rabbit_queue_index:ack( | ||
Acks, rabbit_queue_index:deliver(Delivers, IndexState)), | ||
State1#vqstate{index_state = IndexState1}. | ||
DelsAndAcksFun(Delivers, Acks, State1). | ||
|
||
remove_queue_entries1( | ||
#msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, | ||
|
@@ -1356,6 +1393,18 @@ remove_queue_entries1( | |
cons_if(IndexOnDisk, SeqId, Acks), | ||
stats({-1, 0}, {MsgStatus, none}, State)}. | ||
|
||
%% Returns a fun (Delivers, Acks, State) -> State that decides what is | ||
%% done with the delivers and acks collected while removing messages. | ||
%% | ||
%% deliver_and_ack: passes Delivers to rabbit_queue_index:deliver/2, | ||
%% then Acks to rabbit_queue_index:ack/2, and stores the resulting | ||
%% index state back into State. | ||
%% | ||
%% any other argument (callers in this section pass the atom none): | ||
%% ignores Delivers and Acks and returns State unchanged. | ||
process_delivers_and_acks_fun(deliver_and_ack) -> | ||
fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> | ||
IndexState1 = | ||
rabbit_queue_index:ack( | ||
Acks, rabbit_queue_index:deliver(Delivers, IndexState)), | ||
State #vqstate { index_state = IndexState1 } | ||
end; | ||
process_delivers_and_acks_fun(_) -> | ||
fun (_, _, State) -> | ||
State | ||
end. | ||
|
||
%%---------------------------------------------------------------------------- | ||
%% Internal gubbins for publishing | ||
%%---------------------------------------------------------------------------- | ||
|
@@ -1509,11 +1558,29 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, | |
end. | ||
|
||
purge_pending_ack(KeepPersistent, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This function got split in two: the part that collects pending acks, and the part that handles the removal of those pending acks/msgs depending on the `KeepPersistent` flag. |
||
State = #vqstate { ram_pending_ack = RPA, | ||
disk_pending_ack = DPA, | ||
qi_pending_ack = QPA, | ||
index_state = IndexState, | ||
State = #vqstate { index_state = IndexState, | ||
msg_store_clients = MSCState }) -> | ||
{IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State), | ||
case KeepPersistent of | ||
true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState), | ||
State1; | ||
false -> IndexState1 = | ||
rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), | ||
remove_msgs_by_id(MsgIdsByStore, MSCState), | ||
State1 #vqstate { index_state = IndexState1 } | ||
end. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
%% Variant of purge_pending_ack/2 used when the queue is being deleted. | ||
%% The sequence ids returned by purge_pending_ack1/1 are discarded: | ||
%% rather than acking them in the queue index, the index itself is | ||
%% deleted via rabbit_queue_index:delete_and_terminate/1. The collected | ||
%% messages are then removed with remove_msgs_by_id/2. | ||
purge_pending_ack_delete_and_terminate( | ||
State = #vqstate { index_state = IndexState, | ||
msg_store_clients = MSCState }) -> | ||
{_, MsgIdsByStore, State1} = purge_pending_ack1(State), | ||
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), | ||
remove_msgs_by_id(MsgIdsByStore, MSCState), | ||
State1 #vqstate { index_state = IndexState1 }. | ||
|
||
purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, | ||
disk_pending_ack = DPA, | ||
qi_pending_ack = QPA }) -> | ||
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, | ||
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = | ||
rabbit_misc:gb_trees_fold( | ||
|
@@ -1523,19 +1590,26 @@ purge_pending_ack(KeepPersistent, | |
State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), | ||
disk_pending_ack = gb_trees:empty(), | ||
qi_pending_ack = gb_trees:empty()}, | ||
{IndexOnDiskSeqIds, MsgIdsByStore, State1}. | ||
|
||
case KeepPersistent of | ||
true -> case orddict:find(false, MsgIdsByStore) of | ||
error -> State1; | ||
{ok, MsgIds} -> ok = msg_store_remove(MSCState, false, | ||
MsgIds), | ||
State1 | ||
end; | ||
false -> IndexState1 = | ||
rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), | ||
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds) | ||
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], | ||
State1 #vqstate { index_state = IndexState1 } | ||
%% MsgIdsByStore is an orddict with two keys: | ||
%% | ||
%% true: holds a list of Persistent Message Ids. | ||
%% false: holds a list of Transient Message Ids. | ||
%% | ||
%% When we call orddict:to_list/1 we get two sets of msg ids, where | ||
%% IsPersistent is either true for persistent messages or false for | ||
%% transient ones. The msg_store_remove/3 function takes this boolean | ||
%% flag to determine which store the messages should be removed | ||
%% from. | ||
%% | ||
%% Each msg_store_remove/3 call is asserted to return ok. | ||
remove_msgs_by_id(MsgIdsByStore, MSCState) -> | ||
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds) | ||
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)]. | ||
|
||
%% Removes only the message ids stored under the key false in | ||
%% MsgIdsByStore, by calling msg_store_remove/3 with false as the | ||
%% IsPersistent flag. Returns ok, including when the key is absent. | ||
remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> | ||
case orddict:find(false, MsgIdsByStore) of | ||
error -> ok; | ||
{ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) | ||
end. | ||
|
||
accumulate_ack_init() -> {[], orddict:new(), []}. | ||
|
@@ -1869,9 +1943,15 @@ fetch_from_q3(State = #vqstate { q1 = Q1, | |
{loaded, {MsgStatus, State2}} | ||
end. | ||
|
||
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> | ||
%% One-argument entry point: delegates to maybe_deltas_to_betas/2 with | ||
%% the deliver_and_ack fun, so delivers and acks are applied to the | ||
%% queue index. | ||
maybe_deltas_to_betas(State) -> | ||
AfterFun = process_delivers_and_acks_fun(deliver_and_ack), | ||
maybe_deltas_to_betas(AfterFun, State). | ||
|
||
maybe_deltas_to_betas(_DelsAndAcksFun, | ||
State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) -> | ||
State; | ||
maybe_deltas_to_betas(State = #vqstate { | ||
maybe_deltas_to_betas(DelsAndAcksFun, | ||
State = #vqstate { | ||
q2 = Q2, | ||
delta = Delta, | ||
q3 = Q3, | ||
|
@@ -1891,34 +1971,35 @@ maybe_deltas_to_betas(State = #vqstate { | |
DeltaSeqIdEnd]), | ||
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, | ||
IndexState), | ||
{Q3a, RamCountsInc, RamBytesInc, IndexState2} = | ||
{Q3a, RamCountsInc, RamBytesInc, State1} = | ||
betas_from_index_entries(List, TransientThreshold, | ||
RPA, DPA, QPA, IndexState1), | ||
State1 = State #vqstate { index_state = IndexState2, | ||
ram_msg_count = RamMsgCount + RamCountsInc, | ||
ram_bytes = RamBytes + RamBytesInc, | ||
disk_read_count = DiskReadCount + RamCountsInc}, | ||
RPA, DPA, QPA, DelsAndAcksFun, | ||
State #vqstate { index_state = IndexState1 }), | ||
State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, | ||
ram_bytes = RamBytes + RamBytesInc, | ||
disk_read_count = DiskReadCount + RamCountsInc }, | ||
case ?QUEUE:len(Q3a) of | ||
0 -> | ||
%% we ignored every message in the segment due to it being | ||
%% transient and below the threshold | ||
maybe_deltas_to_betas( | ||
State1 #vqstate { | ||
DelsAndAcksFun, | ||
State2 #vqstate { | ||
delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); | ||
Q3aLen -> | ||
Q3b = ?QUEUE:join(Q3, Q3a), | ||
case DeltaCount - Q3aLen of | ||
0 -> | ||
%% delta is now empty, but it wasn't before, so | ||
%% can now join q2 onto q3 | ||
State1 #vqstate { q2 = ?QUEUE:new(), | ||
State2 #vqstate { q2 = ?QUEUE:new(), | ||
delta = ?BLANK_DELTA, | ||
q3 = ?QUEUE:join(Q3b, Q2) }; | ||
N when N > 0 -> | ||
Delta1 = d(#delta { start_seq_id = DeltaSeqId1, | ||
count = N, | ||
end_seq_id = DeltaSeqIdEnd }), | ||
State1 #vqstate { delta = Delta1, | ||
State2 #vqstate { delta = Delta1, | ||
q3 = Q3b } | ||
end | ||
end. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a usable index state, since after BQ:purge/1 the index state might keep being used by future publishes and so on.