Skip to content

Commit d16d296

Browse files
Merge pull request #8821 from rabbitmq/mergify/bp/v3.12.x/pr-8799
Fix at-least-once dead lettering when the target include the source (backport #8799)
2 parents 23f10fc + e148941 commit d16d296

File tree

5 files changed

+15
-11
lines changed

5 files changed

+15
-11
lines changed

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ delivery_effects(CPid, Msgs0) ->
236236
Msgs = lists:zipwith(fun (Cmd, {Reason, MsgId}) ->
237237
{MsgId, {Reason, rabbit_fifo:get_msg(Cmd)}}
238238
end, Log, RsnIds),
239-
[{send_msg, CPid, {dlx_delivery, Msgs}, [ra_event]}]
239+
[{send_msg, CPid, {dlx_event, self(), {dlx_delivery, Msgs}}, [cast]}]
240240
end}].
241241

242242
-spec state_enter(ra_server:ra_state() | eol, rabbit_types:r('queue'), dead_letter_handler(), state()) ->
@@ -308,7 +308,7 @@ update_config(at_least_once, at_least_once, _, State) ->
308308
{State, []};
309309
Pid ->
310310
%% Notify rabbit_fifo_dlx_worker about potentially updated policies.
311-
{State, [{send_msg, Pid, lookup_topology, ra_event}]}
311+
{State, [{send_msg, Pid, {dlx_event, self(), lookup_topology}, cast}]}
312312
end;
313313
update_config(SameDLH, SameDLH, _, State) ->
314314
{State, []};

deps/rabbit/src/rabbit_fifo_dlx_client.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ process_command(Cmd, #state{leader = Leader} = State, Tries) ->
5757
process_command(Cmd, State, Tries - 1)
5858
end.
5959

60-
-spec handle_ra_event(ra:server_id(), term(), state()) ->
60+
-spec handle_ra_event(pid(), term(), state()) ->
6161
{ok, state(), actions()}.
62-
handle_ra_event(Leader, {machine, {dlx_delivery, _} = Del}, #state{leader = Leader} = State) ->
62+
handle_ra_event(Leader, {dlx_delivery, _} = Del,
63+
#state{leader = _Leader} = State) when node(Leader) == node() ->
6364
handle_delivery(Del, State);
6465
handle_ra_event(From, Evt, State) ->
6566
rabbit_log:debug("Ignoring ra event ~tp from ~tp", [Evt, From]),

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,20 +135,21 @@ handle_call(Request, From, State) ->
135135
rabbit_log:info("~ts received unhandled call from ~tp: ~tp", [?MODULE, From, Request]),
136136
{noreply, State}.
137137

138-
handle_cast({queue_event, QRef, {_From, {machine, lookup_topology}}},
139-
#state{queue_ref = QRef} = State0) ->
138+
handle_cast({dlx_event, _LeaderPid, lookup_topology},
139+
#state{queue_ref = _} = State0) ->
140140
State = lookup_topology(State0),
141141
redeliver_and_ack(State);
142-
handle_cast({queue_event, QRef, {From, Evt}},
143-
#state{queue_ref = QRef,
142+
handle_cast({dlx_event, LeaderPid, Evt},
143+
#state{queue_ref = _QRef,
144144
dlx_client_state = DlxState0} = State0) ->
145145
%% received dead-letter message from source queue
146-
{ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(From, Evt, DlxState0),
146+
{ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(LeaderPid, Evt, DlxState0),
147147
State1 = State0#state{dlx_client_state = DlxState},
148148
State = handle_queue_actions(Actions, State1),
149149
{noreply, State};
150150
handle_cast({queue_event, QRef, Evt},
151151
#state{queue_type_state = QTypeState0} = State0) ->
152+
152153
case rabbit_queue_type:handle_event(QRef, Evt, QTypeState0) of
153154
{ok, QTypeState1, Actions} ->
154155
%% received e.g. confirm from target queue

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
-module(dead_lettering_SUITE).
1010

1111
-include_lib("common_test/include/ct.hrl").
12-
-include_lib("kernel/include/file.hrl").
1312
-include_lib("amqp_client/include/amqp_client.hrl").
1413
-include_lib("eunit/include/eunit.hrl").
1514
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
@@ -1043,6 +1042,7 @@ dead_letter_headers_cycle(Config) ->
10431042
publish(Ch, QName, [P]),
10441043
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
10451044
[DTag] = consume(Ch, QName, [P]),
1045+
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
10461046
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
10471047
multiple = false,
10481048
requeue = false}),
@@ -1053,6 +1053,7 @@ dead_letter_headers_cycle(Config) ->
10531053
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
10541054
?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
10551055

1056+
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
10561057
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
10571058
multiple = false,
10581059
requeue = false}),

deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,8 @@ reject_publish(Config, QArg) when is_tuple(QArg) ->
589589
ok = publish_confirm(Ch, SourceQ),
590590
RaName = ra_name(SourceQ),
591591
eventually(?_assertMatch([{2, 2}], %% 2 messages with 1 byte each
592-
dirty_query([Server], RaName, fun rabbit_fifo:query_stat_dlx/1))),
592+
dirty_query([Server], RaName,
593+
fun rabbit_fifo:query_stat_dlx/1))),
593594
%% Now, we have 2 expired messages in the source quorum queue's discards queue.
594595
%% Now that we are over the limit we expect publishes to be rejected.
595596
?assertEqual(fail, publish_confirm(Ch, SourceQ)),

0 commit comments

Comments
 (0)