Skip to content

Commit db888df

Browse files
Merge pull request #1774 from noxdafox/master
RFC rabbit_amqqueue_process: improve message duplicates handling
2 parents 3abfe38 + 11002ab commit db888df

File tree

2 files changed

+37
-32
lines changed

2 files changed

+37
-32
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
658658
State#q{consumers = Consumers})}
659659
end.
660660

661-
maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
661+
maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
662+
Delivered,
663+
State = #q{overflow = Overflow,
664+
backing_queue = BQ,
665+
backing_queue_state = BQS}) ->
662666
send_mandatory(Delivery), %% must do this before confirms
663667
case {will_overflow(Delivery, State), Overflow} of
664668
{true, 'reject-publish'} ->
665669
%% Drop publish and nack to publisher
666670
send_reject_publish(Delivery, Delivered, State);
667671
_ ->
668-
%% Enqueue and maybe drop head later
669-
deliver_or_enqueue(Delivery, Delivered, State)
672+
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
673+
State1 = State#q{backing_queue_state = BQS1},
674+
case IsDuplicate of
675+
true -> State1;
676+
{true, drop} -> State1;
677+
%% Drop publish and nack to publisher
678+
{true, reject} ->
679+
send_reject_publish(Delivery, Delivered, State1);
680+
%% Enqueue and maybe drop head later
681+
false ->
682+
deliver_or_enqueue(Delivery, Delivered, State1)
683+
end
670684
end.
671685

672686
deliver_or_enqueue(Delivery = #delivery{message = Message,
673687
sender = SenderPid,
674688
flow = Flow},
675-
Delivered, State = #q{backing_queue = BQ,
676-
backing_queue_state = BQS}) ->
689+
Delivered,
690+
State = #q{backing_queue = BQ}) ->
677691
{Confirm, State1} = send_or_record_confirm(Delivery, State),
678692
Props = message_properties(Message, Confirm, State1),
679-
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
680-
State2 = State1#q{backing_queue_state = BQS1},
681-
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
682-
State2) of
683-
true ->
693+
case attempt_delivery(Delivery, Props, Delivered, State1) of
694+
{delivered, State2} ->
684695
State2;
685-
{delivered, State3} ->
686-
State3;
687696
%% The next one is an optimisation
688-
{undelivered, State3 = #q{ttl = 0, dlx = undefined,
689-
backing_queue_state = BQS2,
697+
{undelivered, State2 = #q{ttl = 0, dlx = undefined,
698+
backing_queue_state = BQS,
690699
msg_id_to_channel = MTC}} ->
691-
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
692-
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
693-
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
694-
695-
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
696-
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
697-
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
698-
QLen = BQ:len(BQS4),
700+
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
701+
State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
702+
{undelivered, State2 = #q{backing_queue_state = BQS}} ->
703+
704+
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS),
705+
{Dropped, State3 = #q{backing_queue_state = BQS2}} =
706+
maybe_drop_head(State2#q{backing_queue_state = BQS1}),
707+
QLen = BQ:len(BQS2),
699708
%% optimisation: it would be perfectly safe to always
700709
%% invoke drop_expired_msgs here, but that is expensive so
701710
%% we only do that if a new message that might have an
@@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
704713
%% has no expiry and becomes the head of the queue then
705714
%% the call is unnecessary.
706715
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
707-
{false, false, _} -> State4;
708-
{true, true, undefined} -> State4;
709-
{_, _, _} -> drop_expired_msgs(State4)
716+
{false, false, _} -> State3;
717+
{true, true, undefined} -> State3;
718+
{_, _, _} -> drop_expired_msgs(State3)
710719
end
711720
end.
712721

@@ -1610,7 +1619,3 @@ update_ha_mode(State) ->
16101619
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
16111620
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
16121621
State.
1613-
1614-
1615-
1616-

src/rabbit_mirror_queue_master.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
462462
%% immediately after calling is_duplicate). The msg is
463463
%% invalid. We will not see this again, nor will we be
464464
%% further involved in confirming this message, so erase.
465-
{true, State #state { seen_status = maps:remove(MsgId, SS) }};
465+
{{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }};
466466
{ok, Disposition}
467467
when Disposition =:= confirmed
468468
%% It got published when we were a slave via gm, and
@@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId },
477477
%% Message was discarded while we were a slave. Confirm now.
478478
%% As above, amqqueue_process will have the entry for the
479479
%% msg_id_to_channel mapping.
480-
{true, State #state { seen_status = maps:remove(MsgId, SS),
481-
confirmed = [MsgId | Confirmed] }}
480+
{{true, drop}, State #state { seen_status = maps:remove(MsgId, SS),
481+
confirmed = [MsgId | Confirmed] }}
482482
end.
483483

484484
set_queue_mode(Mode, State = #state { gm = GM,

0 commit comments

Comments
 (0)