Skip to content

Commit 0cc04a1

Browse files
committed
rabbit_amqqueue_process: handle is_duplicate instructions
The `is_duplicate` callback can signal the amqqueue process that the message is a duplicate and suggests whether to just ignore it or to reject it. This enables the `rabbit_backing_queue` implementations to notify a publisher that the message is deemed a duplicate. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent aad01b4 commit 0cc04a1

File tree

1 file changed

+34
-29
lines changed

1 file changed

+34
-29
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
658658
State#q{consumers = Consumers})}
659659
end.
660660

661-
maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
661+
maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
662+
Delivered,
663+
State = #q{overflow = Overflow,
664+
backing_queue = BQ,
665+
backing_queue_state = BQS}) ->
662666
send_mandatory(Delivery), %% must do this before confirms
663667
case {will_overflow(Delivery, State), Overflow} of
664668
{true, 'reject-publish'} ->
665669
%% Drop publish and nack to publisher
666670
send_reject_publish(Delivery, Delivered, State);
667671
_ ->
668-
%% Enqueue and maybe drop head later
669-
deliver_or_enqueue(Delivery, Delivered, State)
672+
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
673+
State1 = State#q{backing_queue_state = BQS1},
674+
case IsDuplicate of
675+
true -> State1;
676+
{true, drop} -> State1;
677+
%% Drop publish and nack to publisher
678+
{true, reject} ->
679+
send_reject_publish(Delivery, Delivered, State1);
680+
%% Enqueue and maybe drop head later
681+
false ->
682+
deliver_or_enqueue(Delivery, Delivered, State1)
683+
end
670684
end.
671685

672686
deliver_or_enqueue(Delivery = #delivery{message = Message,
673687
sender = SenderPid,
674688
flow = Flow},
675-
Delivered, State = #q{backing_queue = BQ,
676-
backing_queue_state = BQS}) ->
689+
Delivered,
690+
State = #q{backing_queue = BQ}) ->
677691
{Confirm, State1} = send_or_record_confirm(Delivery, State),
678692
Props = message_properties(Message, Confirm, State1),
679-
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
680-
State2 = State1#q{backing_queue_state = BQS1},
681-
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
682-
State2) of
683-
true ->
693+
case attempt_delivery(Delivery, Props, Delivered, State1) of
694+
{delivered, State2} ->
684695
State2;
685-
{delivered, State3} ->
686-
State3;
687696
%% The next one is an optimisation
688-
{undelivered, State3 = #q{ttl = 0, dlx = undefined,
689-
backing_queue_state = BQS2,
697+
{undelivered, State2 = #q{ttl = 0, dlx = undefined,
698+
backing_queue_state = BQS,
690699
msg_id_to_channel = MTC}} ->
691-
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
692-
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
693-
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
694-
695-
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
696-
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
697-
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
698-
QLen = BQ:len(BQS4),
700+
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
701+
State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
702+
{undelivered, State2 = #q{backing_queue_state = BQS}} ->
703+
704+
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS),
705+
{Dropped, State3 = #q{backing_queue_state = BQS2}} =
706+
maybe_drop_head(State2#q{backing_queue_state = BQS1}),
707+
QLen = BQ:len(BQS2),
699708
%% optimisation: it would be perfectly safe to always
700709
%% invoke drop_expired_msgs here, but that is expensive so
701710
%% we only do that if a new message that might have an
@@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
704713
%% has no expiry and becomes the head of the queue then
705714
%% the call is unnecessary.
706715
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
707-
{false, false, _} -> State4;
708-
{true, true, undefined} -> State4;
709-
{_, _, _} -> drop_expired_msgs(State4)
716+
{false, false, _} -> State3;
717+
{true, true, undefined} -> State3;
718+
{_, _, _} -> drop_expired_msgs(State3)
710719
end
711720
end.
712721

@@ -1630,7 +1639,3 @@ update_ha_mode(State) ->
16301639
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
16311640
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
16321641
State.
1633-
1634-
1635-
1636-

0 commit comments

Comments
 (0)