Skip to content

Commit 22d6e74

Browse files
Merge pull request #12921 from rabbitmq/rabbitmq-server-12913
By @noxdafox: Improve rabbit_backing_queue:is_duplicate behaviour (#12913), take 2
2 parents 9d8ae14 + 8d7535e commit 22d6e74

File tree

2 files changed

+20
-8
lines changed

2 files changed

+20
-8
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -725,13 +725,26 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
725725
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
726726
State1 = State#q{backing_queue_state = BQS1},
727727
case IsDuplicate of
728-
true -> State1;
729-
{true, drop} -> State1;
730-
%% Drop publish and nack to publisher
731-
{true, reject} ->
728+
true ->
729+
%% Publish to DLX
730+
_ = with_dlx(
731+
DLX,
732+
fun (X) ->
733+
rabbit_global_counters:messages_dead_lettered(maxlen,
734+
rabbit_classic_queue,
735+
at_most_once, 1),
736+
QName = qname(State1),
737+
rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
738+
end,
739+
fun () ->
740+
rabbit_global_counters:messages_dead_lettered(maxlen,
741+
rabbit_classic_queue,
742+
disabled, 1)
743+
end),
744+
%% Drop publish and nack to publisher
732745
send_reject_publish(Delivery, State1);
733-
%% Enqueue and maybe drop head later
734746
false ->
747+
%% Enqueue and maybe drop head later
735748
deliver_or_enqueue(Delivery, Delivered, State1)
736749
end
737750
end.

deps/rabbit/src/rabbit_backing_queue.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,8 @@
220220

221221
%% Called prior to a publish or publish_delivered call. Allows the BQ
222222
%% to signal that it's already seen this message, (e.g. it was published
223-
%% or discarded previously) specifying whether to drop the message or reject it.
224-
-callback is_duplicate(mc:state(), state())
225-
-> {{true, drop} | {true, reject} | boolean(), state()}.
223+
%% or discarded previously).
224+
-callback is_duplicate(mc:state(), state()) -> {boolean(), state()}.
226225

227226
-callback set_queue_mode(queue_mode(), state()) -> state().
228227

0 commit comments

Comments
 (0)