Skip to content

Commit af68fc8

Browse files
authored
Merge pull request #1765 from rabbitmq/remove-dead-letter-process
Remove dead letter process
2 parents 69bd841 + 0dee589 commit af68fc8

File tree

5 files changed

+14
-160
lines changed

5 files changed

+14
-160
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -954,13 +954,11 @@ dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) ->
954954

955955
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
956956
backing_queue_state = BQS,
957-
backing_queue = BQ,
958-
q = #amqqueue{ name = Resource } }) ->
959-
#resource{virtual_host = VHost} = Resource,
957+
backing_queue = BQ}) ->
960958
QName = qname(State),
961959
{Res, Acks1, BQS1} =
962960
Fun(fun (Msg, AckTag, Acks) ->
963-
rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, [{Reason, Msg}]),
961+
rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
964962
[AckTag | Acks]
965963
end, [], BQS),
966964
{_Guids, BQS2} = BQ:ack(Acks1, BQS1),

src/rabbit_dead_letter.erl

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
-module(rabbit_dead_letter).
1818

19-
-export([publish/6]).
19+
-export([publish/5]).
2020

2121
-include("rabbit.hrl").
2222
-include("rabbit_framing.hrl").
@@ -26,20 +26,17 @@
2626
-type reason() :: 'expired' | 'rejected' | 'maxlen'.
2727

2828
-spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(),
29-
'undefined' | binary(), rabbit_amqqueue:name(),
30-
#{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
29+
'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
3130

3231
%%----------------------------------------------------------------------------
3332

34-
publish(Msg, Reason, X, RK, QName, QueueStates0) ->
33+
publish(Msg, Reason, X, RK, QName) ->
3534
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
3635
Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
3736
{Queues, Cycles} = detect_cycles(Reason, DLMsg,
3837
rabbit_exchange:route(X, Delivery)),
3938
lists:foreach(fun log_cycle_once/1, Cycles),
40-
{_, _, QueueStates} = rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues),
41-
Delivery, QueueStates0),
42-
QueueStates.
39+
rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery).
4340

4441
make_msg(Msg = #basic_message{content = Content,
4542
exchange_name = Exchange,

src/rabbit_quorum_queue.erl

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
-export([credit/4]).
2424
-export([purge/1]).
2525
-export([stateless_deliver/2, deliver/3]).
26-
-export([dead_letter_publish/5]).
26+
-export([dead_letter_publish/4]).
2727
-export([queue_name/1]).
2828
-export([cluster_state/1, status/2]).
2929
-export([cancel_consumer_handler/3, cancel_consumer/3]).
@@ -523,11 +523,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
523523
end.
524524

525525
%%----------------------------------------------------------------------------
526-
dlx_mfa(#amqqueue{name = Resource} = Q) ->
527-
#resource{virtual_host = VHost} = Resource,
526+
dlx_mfa(Q) ->
528527
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q),
529528
DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q),
530-
{?MODULE, dead_letter_publish, [VHost, DLX, DLXRKey, Q#amqqueue.name]}.
529+
{?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
531530

532531
init_dlx(undefined, _Q) ->
533532
undefined;
@@ -545,10 +544,12 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
545544
{PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal)
546545
end.
547546

548-
dead_letter_publish(_, undefined, _, _, _) ->
547+
dead_letter_publish(undefined, _, _, _) ->
549548
ok;
550-
dead_letter_publish(VHost, X, RK, QName, ReasonMsgs) ->
551-
rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, ReasonMsgs).
549+
dead_letter_publish(X, RK, QName, ReasonMsgs) ->
550+
{ok, Exchange} = rabbit_exchange:lookup(X),
551+
[rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName)
552+
|| {Reason, Msg} <- ReasonMsgs].
552553

553554
%% TODO escape hack
554555
qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) ->

src/rabbit_variable_queue.erl

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -488,12 +488,10 @@ start(VHost, DurableQueues) ->
488488
Ref = proplists:get_value(persistent_ref, Terms),
489489
Ref =/= undefined
490490
end],
491-
start_dead_letter_process(VHost),
492491
start_msg_store(VHost, ClientRefs, StartFunState),
493492
{ok, AllTerms}.
494493

495494
stop(VHost) ->
496-
ok = rabbit_vhost_dead_letter:stop(VHost),
497495
ok = stop_msg_store(VHost),
498496
ok = rabbit_queue_index:stop(VHost).
499497

@@ -517,14 +515,6 @@ do_start_msg_store(VHost, Type, Refs, StartFunState) ->
517515
exit({error, Error})
518516
end.
519517

520-
start_dead_letter_process(VHost) ->
521-
case rabbit_vhost_dead_letter:start(VHost) of
522-
{ok, _} ->
523-
rabbit_log:info("Started dead letter process for vhost '~s'~n", [VHost]);
524-
Err ->
525-
exit(Err)
526-
end.
527-
528518
abbreviated_type(?TRANSIENT_MSG_STORE) -> transient;
529519
abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.
530520

src/rabbit_vhost_dead_letter.erl

Lines changed: 0 additions & 132 deletions
This file was deleted.

0 commit comments

Comments
 (0)