Skip to content

Commit f93baa3

Browse files
committed
amqqueue_process: adopt new is_duplicate backing queue callback
As the de-duplication plugin is the only adopter of the `is_duplicate` callback, we now use a simpler signature. When a message is deemed duplicated, we discard it and re-route it to dead letter exchange. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent c927446 commit f93baa3

File tree

1 file changed

+18
-5
lines changed

1 file changed

+18
-5
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -725,13 +725,26 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
725725
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
726726
State1 = State#q{backing_queue_state = BQS1},
727727
case IsDuplicate of
728-
true -> State1;
729-
{true, drop} -> State1;
730-
%% Drop publish and nack to publisher
731-
{true, reject} ->
728+
true ->
729+
%% Publish to DLX
730+
_ = with_dlx(
731+
DLX,
732+
fun (X) ->
733+
rabbit_global_counters:messages_dead_lettered(maxlen,
734+
rabbit_classic_queue,
735+
at_most_once, 1),
736+
QName = qname(State1),
737+
rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
738+
end,
739+
fun () ->
740+
rabbit_global_counters:messages_dead_lettered(maxlen,
741+
rabbit_classic_queue,
742+
disabled, 1)
743+
end),
744+
%% Drop publish and nack to publisher
732745
send_reject_publish(Delivery, State1);
733-
%% Enqueue and maybe drop head later
734746
false ->
747+
%% Enqueue and maybe drop head later
735748
deliver_or_enqueue(Delivery, Delivered, State1)
736749
end
737750
end.

0 commit comments

Comments
 (0)