Skip to content

Commit d99be25

Browse files
Merge pull request #8739 from rabbitmq/mergify/bp/v3.11.x/pr-8736
Settle unroutable message with released state (backport #8015) (backport #8736)
2 parents 7b75296 + 2566c73 commit d99be25

File tree

9 files changed

+233
-21
lines changed

9 files changed

+233
-21
lines changed

deps/amqp_client/src/amqp_channel.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,9 @@ flush_writer(#state{driver = direct}) ->
886886
ok.
887887
amqp_msg(none) ->
888888
none;
889+
amqp_msg({DTag, Content}) ->
890+
{Props, Payload} = rabbit_basic_common:from_content(Content),
891+
{DTag, #amqp_msg{props = Props, payload = Payload}};
889892
amqp_msg(Content) ->
890893
{Props, Payload} = rabbit_basic_common:from_content(Content),
891894
#amqp_msg{props = Props, payload = Payload}.

deps/rabbit/src/rabbit_channel.erl

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,10 @@
113113
consumer_timeout,
114114
authz_context,
115115
%% defines how ofter gc will be executed
116-
writer_gc_threshold
116+
writer_gc_threshold,
117+
%% true with AMQP 1.0 to include the publishing sequence
118+
%% in the return callback, false otherwise
119+
extended_return_callback
117120
}).
118121

119122
-record(pending_ack, {
@@ -520,6 +523,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
520523
MaxMessageSize = get_max_message_size(),
521524
ConsumerTimeout = get_consumer_timeout(),
522525
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
526+
UseExtendedReturnCallback = use_extended_return_callback(AmqpParams),
523527
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
524528
State = #ch{cfg = #conf{state = starting,
525529
protocol = Protocol,
@@ -538,7 +542,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
538542
max_message_size = MaxMessageSize,
539543
consumer_timeout = ConsumerTimeout,
540544
authz_context = OptionalVariables,
541-
writer_gc_threshold = GCThreshold
545+
writer_gc_threshold = GCThreshold,
546+
extended_return_callback = UseExtendedReturnCallback
542547
},
543548
limiter = Limiter,
544549
tx = none,
@@ -1101,6 +1106,15 @@ extract_variable_map_from_amqp_params([Value]) ->
11011106
extract_variable_map_from_amqp_params(_) ->
11021107
#{}.
11031108

1109+
%% Use tuple representation of amqp_params to avoid a dependency on amqp_client.
1110+
%% Used for AMQP 1.0
1111+
use_extended_return_callback({amqp_params_direct,_,_,_,_,
1112+
{amqp_adapter_info,_,_,_,_,_,{'AMQP',"1.0"},_},
1113+
_}) ->
1114+
true;
1115+
use_extended_return_callback(_) ->
1116+
false.
1117+
11041118
check_msg_size(Content, MaxMessageSize, GCThreshold) ->
11051119
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
11061120
case Size of
@@ -1942,9 +1956,8 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
19421956
ok
19431957
end.
19441958

1945-
basic_return(#basic_message{exchange_name = ExchangeName,
1946-
routing_keys = [RoutingKey | _CcRoutes],
1947-
content = Content},
1959+
basic_return(Content, #basic_message{exchange_name = ExchangeName,
1960+
routing_keys = [RoutingKey | _CcRoutes]},
19481961
State = #ch{cfg = #conf{protocol = Protocol,
19491962
writer_pid = WriterPid}},
19501963
Reason) ->
@@ -2179,7 +2192,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21792192
mandatory = Mandatory,
21802193
confirm = Confirm,
21812194
msg_seq_no = MsgSeqNo},
2182-
RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
2195+
RoutedToQueueNames = [QName]},
2196+
State0 = #ch{cfg = #conf{extended_return_callback = ExtendedReturnCallback},
2197+
queue_states = QueueStates0}) -> %% optimisation when there is one queue
21832198
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
21842199
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
21852200
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
@@ -2188,7 +2203,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21882203
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
21892204
%% NB: the order here is important since basic.returns must be
21902205
%% sent before confirms.
2191-
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
2206+
ok = process_routing_mandatory(ExtendedReturnCallback, Mandatory, Qs, MsgSeqNo, Message, State0),
21922207
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
21932208
%% Actions must be processed after registering confirms as actions may
21942209
%% contain rejections of publishes
@@ -2216,7 +2231,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22162231
mandatory = Mandatory,
22172232
confirm = Confirm,
22182233
msg_seq_no = MsgSeqNo},
2219-
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
2234+
RoutedToQueueNames},
2235+
State0 = #ch{cfg = #conf{extended_return_callback = ExtendedReturnCallback},
2236+
queue_states = QueueStates0}) ->
22202237
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
22212238
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
22222239
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
@@ -2225,7 +2242,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22252242
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
22262243
%% NB: the order here is important since basic.returns must be
22272244
%% sent before confirms.
2228-
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
2245+
ok = process_routing_mandatory(ExtendedReturnCallback, Mandatory, Qs, MsgSeqNo, Message, State0),
22292246
State1 = process_routing_confirm(Confirm, QueueNames,
22302247
MsgSeqNo, XName, State0),
22312248
%% Actions must be processed after registering confirms as actions may
@@ -2247,19 +2264,32 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22472264
[rabbit_misc:rs(Resource)])
22482265
end.
22492266

2250-
process_routing_mandatory(_Mandatory = true,
2267+
process_routing_mandatory(_ExtendedReturnCallback = false,
2268+
_Mandatory = true,
2269+
_RoutedToQs = [],
2270+
_MsgSeqNo,
2271+
#basic_message{content = Content} = Msg, State) ->
2272+
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
2273+
ok = basic_return(Content, Msg, State, no_route),
2274+
ok;
2275+
process_routing_mandatory(_ExtendedReturnCallback = true,
2276+
_Mandatory = true,
22512277
_RoutedToQs = [],
2252-
Msg, State) ->
2278+
MsgSeqNo,
2279+
#basic_message{content = Content} = Msg, State) ->
22532280
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
2254-
ok = basic_return(Msg, State, no_route),
2281+
%% providing the publishing sequence for AMQP 1.0
2282+
ok = basic_return({MsgSeqNo, Content}, Msg, State, no_route),
22552283
ok;
2256-
process_routing_mandatory(_Mandatory = false,
2284+
process_routing_mandatory(_ExtendedReturnCallback,
2285+
_Mandatory = false,
22572286
_RoutedToQs = [],
2287+
_MsgSeqNo,
22582288
#basic_message{exchange_name = ExchangeName}, State) ->
22592289
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
22602290
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
22612291
ok;
2262-
process_routing_mandatory(_, _, _, _) ->
2292+
process_routing_mandatory(_, _, _, _, _, _) ->
22632293
ok.
22642294

22652295
process_routing_confirm(false, _, _, _, State) ->

deps/rabbit_common/src/rabbit_writer.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@
105105

106106
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
107107
-spec send_command
108-
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) ->
108+
(pid(), rabbit_framing:amqp_method_record(),
109+
rabbit_types:content() |
110+
{integer(), rabbit_types:content()} %% publishing sequence for AMQP 1.0 return callback
111+
) ->
109112
'ok'.
110113
-spec send_command_sync(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
111114
-spec send_command_sync

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
%% Just make these constant for the time being.
1818
-define(INCOMING_CREDIT, 65536).
1919

20-
-record(incoming_link, {name, exchange, routing_key,
20+
-record(incoming_link, {name, exchange, routing_key, mandatory,
2121
delivery_id = undefined,
2222
delivery_count = 0,
2323
send_settle_mode = undefined,
@@ -53,6 +53,7 @@ attach(#'v1_0.attach'{name = Name,
5353
SndSettleMode == ?V_1_0_SENDER_SETTLE_MODE_MIXED ->
5454
amqp_channel:register_confirm_handler(BCh, self()),
5555
rabbit_amqp1_0_channel:call(BCh, #'confirm.select'{}),
56+
amqp_channel:register_return_handler(BCh, self()),
5657
true
5758
end,
5859
Flow = #'v1_0.flow'{ handle = Handle,
@@ -69,7 +70,8 @@ attach(#'v1_0.attach'{name = Name,
6970
initial_delivery_count = undefined, % must be, I am the receiver
7071
role = ?RECV_ROLE}, %% server is receiver
7172
IncomingLink1 =
72-
IncomingLink#incoming_link{recv_settle_mode = RcvSettleMode},
73+
IncomingLink#incoming_link{recv_settle_mode = RcvSettleMode,
74+
mandatory = Confirm},
7375
{ok, [Attach, Flow], IncomingLink1, Confirm};
7476
{error, Reason} ->
7577
%% TODO proper link establishment protocol here?
@@ -142,7 +144,8 @@ transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
142144
end,
143145
rabbit_amqp1_0_channel:cast_flow(
144146
BCh, #'basic.publish'{exchange = X,
145-
routing_key = RKey}, Msg),
147+
routing_key = RKey,
148+
mandatory = true}, Msg),
146149
{SendFlow, CreditUsed1} = case CreditUsed - 1 of
147150
C when C =< 0 ->
148151
{true, ?INCOMING_CREDIT div 2};
@@ -206,6 +209,7 @@ ensure_target(Target = #'v1_0.target'{address = Address,
206209
dest, DCh, Dest, DeclareParams,
207210
RouteState)
208211
end),
212+
maybe_ensure_queue(Dest, DCh),
209213
{XName, RK} = rabbit_routing_util:parse_routing(Dest),
210214
{ok, Target, Link#incoming_link{
211215
route_state = RouteState1,
@@ -222,6 +226,20 @@ ensure_target(Target = #'v1_0.target'{address = Address,
222226
{error, {address_not_utf8_string, Address}}
223227
end.
224228

229+
maybe_ensure_queue({amqqueue, Q}, Ch) ->
230+
try
231+
rabbit_amqp1_0_channel:convert_error(
232+
fun () ->
233+
Method = #'queue.declare'{queue = list_to_binary(Q),
234+
passive = true},
235+
amqp_channel:call(Ch, Method)
236+
end)
237+
catch exit:#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED} ->
238+
ok
239+
end;
240+
maybe_ensure_queue(_, _) ->
241+
ok.
242+
225243
incoming_flow(#incoming_link{ delivery_count = Count }, Handle) ->
226244
#'v1_0.flow'{handle = Handle,
227245
delivery_count = {uint, Count},

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
incr_incoming_id/1, next_delivery_id/1, transfers_left/1,
1515
record_transfers/2, bump_outgoing_window/1,
1616
record_outgoing/4, settle/3, flow_fields/2, channel/1,
17-
flow/2, ack/2, validate_attach/1]).
17+
flow/2, ack/2, return/2, validate_attach/1]).
1818

1919
-import(rabbit_amqp1_0_util, [protocol_error/3,
2020
serial_add/2, serial_diff/2, serial_compare/2]).
@@ -396,3 +396,25 @@ acknowledgement(DeliveryIds, Disposition) ->
396396
last = {uint, lists:last(DeliveryIds)},
397397
settled = true,
398398
state = #'v1_0.accepted'{} }.
399+
400+
return(DTag, Session = #session{incoming_unsettled_map = Unsettled}) ->
401+
{DeliveryId,
402+
Unsettled1} = case gb_trees:lookup(DTag, Unsettled) of
403+
{value, #incoming_delivery{ delivery_id = Id }} ->
404+
{Id, gb_trees:delete(DTag, Unsettled)};
405+
none ->
406+
{undefined, Unsettled}
407+
end,
408+
Disposition = case DeliveryId of
409+
undefined -> undefined;
410+
_ -> release(DeliveryId,
411+
#'v1_0.disposition'{role = ?RECV_ROLE})
412+
end,
413+
{Disposition,
414+
Session#session{incoming_unsettled_map = Unsettled1}}.
415+
416+
release(DeliveryId, Disposition) ->
417+
Disposition#'v1_0.disposition'{ first = {uint, DeliveryId},
418+
last = {uint, DeliveryId},
419+
settled = true,
420+
state = #'v1_0.released'{} }.

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,24 @@ handle_info(#'basic.ack'{} = Ack, State = #state{writer_pid = WriterPid,
130130
F <- rabbit_amqp1_0_session:flow_fields(Reply, Session)],
131131
{noreply, state(Session1, State)};
132132

133+
handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
134+
session = Session}) ->
135+
{Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
136+
case Reply of
137+
undefined ->
138+
ok;
139+
_ ->
140+
rabbit_amqp1_0_writer:send_command(
141+
WriterPid,
142+
rabbit_amqp1_0_session:flow_fields(Reply, Session)
143+
)
144+
end,
145+
{noreply, state(Session1, State)};
146+
147+
handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
148+
rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
149+
{noreply, state(Session, State)};
150+
133151
handle_info({bump_credit, Msg}, State) ->
134152
credit_flow:handle_bump_msg(Msg),
135153
{noreply, State};

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ groups() ->
2424
[
2525
{tests, [], [
2626
reliable_send_receive_with_outcomes,
27+
publishing_to_non_existing_queue_should_settle_with_released,
28+
open_link_to_non_existing_destination_should_end_session,
2729
roundtrip_classic_queue_with_drain,
2830
roundtrip_quorum_queue_with_drain,
2931
roundtrip_stream_queue_with_drain,
@@ -163,6 +165,68 @@ reliable_send_receive(Config, Outcome) ->
163165

164166
ok.
165167

168+
publishing_to_non_existing_queue_should_settle_with_released(Config) ->
169+
Container = atom_to_binary(?FUNCTION_NAME, utf8),
170+
Suffix = <<"foo">>,
171+
%% does not exist
172+
QName = <<Container/binary, Suffix/binary>>,
173+
Host = ?config(rmq_hostname, Config),
174+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
175+
Address = <<"/exchange/amq.direct/", QName/binary>>,
176+
177+
OpnConf = #{address => Host,
178+
port => Port,
179+
container_id => Container,
180+
sasl => {plain, <<"guest">>, <<"guest">>}},
181+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
182+
{ok, Session} = amqp10_client:begin_session(Connection),
183+
SenderLinkName = <<"test-sender">>,
184+
{ok, Sender} = amqp10_client:attach_sender_link(Session,
185+
SenderLinkName,
186+
Address),
187+
ok = wait_for_credit(Sender),
188+
DTag1 = <<"dtag-1">>,
189+
%% create an unsettled message,
190+
%% link will be in "mixed" mode by default
191+
Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
192+
ok = amqp10_client:send_msg(Sender, Msg1),
193+
ok = wait_for_settlement(DTag1, released),
194+
195+
ok = amqp10_client:detach_link(Sender),
196+
ok = amqp10_client:close_connection(Connection),
197+
flush("post sender close"),
198+
ok.
199+
200+
open_link_to_non_existing_destination_should_end_session(Config) ->
201+
Container = atom_to_list(?FUNCTION_NAME),
202+
Name = Container ++ "foo",
203+
Addresses = [
204+
"/exchange/" ++ Name ++ "/bar",
205+
"/amq/queue/" ++ Name
206+
],
207+
Host = ?config(rmq_hostname, Config),
208+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
209+
OpnConf = #{address => Host,
210+
port => Port,
211+
container_id => list_to_binary(Container),
212+
sasl => {plain, <<"guest">>, <<"guest">>}},
213+
214+
[begin
215+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
216+
{ok, Session} = amqp10_client:begin_session(Connection),
217+
SenderLinkName = <<"test-sender">>,
218+
ct:pal("Address ~p", [Address]),
219+
{ok, _} = amqp10_client:attach_sender_link(Session,
220+
SenderLinkName,
221+
list_to_binary(Address)),
222+
223+
wait_for_session_end(Session),
224+
ok = amqp10_client:close_connection(Connection),
225+
flush("post sender close")
226+
227+
end || Address <- Addresses],
228+
ok.
229+
166230
roundtrip_classic_queue_with_drain(Config) ->
167231
QName = atom_to_binary(?FUNCTION_NAME, utf8),
168232
roundtrip_queue_with_drain(Config, <<"classic">>, QName).
@@ -382,14 +446,27 @@ wait_for_credit(Sender) ->
382446
ct:fail(credited_timeout)
383447
end.
384448

449+
wait_for_session_end(Session) ->
450+
receive
451+
{amqp10_event, {session, Session, {ended, _}}} ->
452+
flush(?FUNCTION_NAME),
453+
ok
454+
after 5000 ->
455+
flush("wait_for_session_end timed out"),
456+
ct:fail(settled_timeout)
457+
end.
458+
385459
wait_for_settlement(Tag) ->
460+
wait_for_settlement(Tag, accepted).
461+
462+
wait_for_settlement(Tag, State) ->
386463
receive
387-
{amqp10_disposition, {accepted, Tag}} ->
464+
{amqp10_disposition, {State, Tag}} ->
388465
flush(?FUNCTION_NAME),
389466
ok
390467
after 5000 ->
391468
flush("wait_for_settlement timed out"),
392-
ct:fail(credited_timeout)
469+
ct:fail(settled_timeout)
393470
end.
394471

395472
wait_for_accepts(0) -> ok;

0 commit comments

Comments
 (0)