Skip to content

Commit 6a979b6

Browse files
committed
amqqueue_process: adopt new is_duplicate backing queue callback
As the de-duplication plugin is the only adopter of the `is_duplicate` callback, we now use a simpler signature. When a message is deemed duplicated, we discard it and re-route it to dead letter exchange. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent c927446 commit 6a979b6

File tree

1 file changed

+19
-8
lines changed

1 file changed

+19
-8
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -722,16 +722,27 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
722722
%% Drop publish and nack to publisher
723723
send_reject_publish(Delivery, State);
724724
_ ->
725-
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
726-
State1 = State#q{backing_queue_state = BQS1},
727-
case IsDuplicate of
728-
true -> State1;
729-
{true, drop} -> State1;
730-
%% Drop publish and nack to publisher
731-
{true, reject} ->
725+
case BQ:is_duplicate(Message, BQS) of
726+
{true, State1} ->
727+
%% Publish to DLX
728+
_ = with_dlx(
729+
DLX,
730+
fun (X) ->
731+
rabbit_global_counters:messages_dead_lettered(maxlen,
732+
rabbit_classic_queue,
733+
at_most_once, 1),
734+
QName = qname(State1),
735+
rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
736+
end,
737+
fun () ->
738+
rabbit_global_counters:messages_dead_lettered(maxlen,
739+
rabbit_classic_queue,
740+
disabled, 1)
741+
end),
742+
%% Drop publish and nack to publisher
732743
send_reject_publish(Delivery, State1);
733744
%% Enqueue and maybe drop head later
734-
false ->
745+
{false, State1} ->
735746
deliver_or_enqueue(Delivery, Delivered, State1)
736747
end
737748
end.

0 commit comments

Comments
 (0)