Skip to content

RFC rabbit_amqqueue_process: improve message duplicates handling #1774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 34 additions & 29 deletions src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.

maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
Delivered,
State = #q{overflow = Overflow,
backing_queue = BQ,
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
_ ->
%% Enqueue and maybe drop head later
deliver_or_enqueue(Delivery, Delivered, State)
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State1 = State#q{backing_queue_state = BQS1},
case IsDuplicate of
true -> State1;
{true, drop} -> State1;
%% Drop publish and nack to publisher
{true, reject} ->
send_reject_publish(Delivery, Delivered, State1);
%% Enqueue and maybe drop head later
false ->
deliver_or_enqueue(Delivery, Delivered, State1)
end
end.

deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
Delivered,
State = #q{backing_queue = BQ}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State2 = State1#q{backing_queue_state = BQS1},
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
State2) of
true ->
case attempt_delivery(Delivery, Props, Delivered, State1) of
{delivered, State2} ->
State2;
{delivered, State3} ->
State3;
%% The next one is an optimisation
{undelivered, State3 = #q{ttl = 0, dlx = undefined,
backing_queue_state = BQS2,
{undelivered, State2 = #q{ttl = 0, dlx = undefined,
backing_queue_state = BQS,
msg_id_to_channel = MTC}} ->
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->

BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
QLen = BQ:len(BQS4),
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
{undelivered, State2 = #q{backing_queue_state = BQS}} ->

BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS),
{Dropped, State3 = #q{backing_queue_state = BQS2}} =
maybe_drop_head(State2#q{backing_queue_state = BQS1}),
QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
Expand All @@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
{false, false, _} -> State4;
{true, true, undefined} -> State4;
{_, _, _} -> drop_expired_msgs(State4)
{false, false, _} -> State3;
{true, true, undefined} -> State3;
{_, _, _} -> drop_expired_msgs(State3)
end
end.

Expand Down Expand Up @@ -1610,7 +1619,3 @@ update_ha_mode(State) ->
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
State.




6 changes: 3 additions & 3 deletions src/rabbit_mirror_queue_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
{true, State #state { seen_status = maps:remove(MsgId, SS) }};
{{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
Expand All @@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
{true, State #state { seen_status = maps:remove(MsgId, SS),
confirmed = [MsgId | Confirmed] }}
{{true, drop}, State #state { seen_status = maps:remove(MsgId, SS),
confirmed = [MsgId | Confirmed] }}
end.

set_queue_mode(Mode, State = #state { gm = GM,
Expand Down