Skip to content

Commit cc57bb9

Browse files
authored
Merge pull request #8736 from rabbitmq/mergify/bp/v3.12.x/pr-8015
Settle unroutable message with released state (backport #8015)
2 parents 4c216ff + 0131046 commit cc57bb9

File tree

9 files changed

+233
-21
lines changed

9 files changed

+233
-21
lines changed

deps/amqp_client/src/amqp_channel.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,9 @@ flush_writer(#state{driver = direct}) ->
886886
ok.
887887
amqp_msg(none) ->
888888
none;
889+
amqp_msg({DTag, Content}) ->
890+
{Props, Payload} = rabbit_basic_common:from_content(Content),
891+
{DTag, #amqp_msg{props = Props, payload = Payload}};
889892
amqp_msg(Content) ->
890893
{Props, Payload} = rabbit_basic_common:from_content(Content),
891894
#amqp_msg{props = Props, payload = Payload}.

deps/rabbit/src/rabbit_channel.erl

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,10 @@
111111
consumer_timeout,
112112
authz_context,
113113
%% defines how ofter gc will be executed
114-
writer_gc_threshold
114+
writer_gc_threshold,
115+
%% true with AMQP 1.0 to include the publishing sequence
116+
%% in the return callback, false otherwise
117+
extended_return_callback
115118
}).
116119

117120
-record(pending_ack, {
@@ -518,6 +521,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
518521
MaxMessageSize = get_max_message_size(),
519522
ConsumerTimeout = get_consumer_timeout(),
520523
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
524+
UseExtendedReturnCallback = use_extended_return_callback(AmqpParams),
521525
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
522526
State = #ch{cfg = #conf{state = starting,
523527
protocol = Protocol,
@@ -536,7 +540,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
536540
max_message_size = MaxMessageSize,
537541
consumer_timeout = ConsumerTimeout,
538542
authz_context = OptionalVariables,
539-
writer_gc_threshold = GCThreshold
543+
writer_gc_threshold = GCThreshold,
544+
extended_return_callback = UseExtendedReturnCallback
540545
},
541546
limiter = Limiter,
542547
tx = none,
@@ -1076,6 +1081,15 @@ extract_variable_map_from_amqp_params([Value]) ->
10761081
extract_variable_map_from_amqp_params(_) ->
10771082
#{}.
10781083

1084+
%% Use tuple representation of amqp_params to avoid a dependency on amqp_client.
1085+
%% Used for AMQP 1.0
1086+
use_extended_return_callback({amqp_params_direct,_,_,_,_,
1087+
{amqp_adapter_info,_,_,_,_,_,{'AMQP',"1.0"},_},
1088+
_}) ->
1089+
true;
1090+
use_extended_return_callback(_) ->
1091+
false.
1092+
10791093
check_msg_size(Content, MaxMessageSize, GCThreshold) ->
10801094
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
10811095
case Size of
@@ -1917,9 +1931,8 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
19171931
ok
19181932
end.
19191933

1920-
basic_return(#basic_message{exchange_name = ExchangeName,
1921-
routing_keys = [RoutingKey | _CcRoutes],
1922-
content = Content},
1934+
basic_return(Content, #basic_message{exchange_name = ExchangeName,
1935+
routing_keys = [RoutingKey | _CcRoutes]},
19231936
State = #ch{cfg = #conf{protocol = Protocol,
19241937
writer_pid = WriterPid}},
19251938
Reason) ->
@@ -2154,7 +2167,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21542167
mandatory = Mandatory,
21552168
confirm = Confirm,
21562169
msg_seq_no = MsgSeqNo},
2157-
RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
2170+
RoutedToQueueNames = [QName]},
2171+
State0 = #ch{cfg = #conf{extended_return_callback = ExtendedReturnCallback},
2172+
queue_states = QueueStates0}) -> %% optimisation when there is one queue
21582173
Qs0 = rabbit_amqqueue:lookup_many(RoutedToQueueNames),
21592174
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
21602175
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
@@ -2163,7 +2178,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21632178
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
21642179
%% NB: the order here is important since basic.returns must be
21652180
%% sent before confirms.
2166-
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
2181+
ok = process_routing_mandatory(ExtendedReturnCallback, Mandatory, Qs, MsgSeqNo, Message, State0),
21672182
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
21682183
%% Actions must be processed after registering confirms as actions may
21692184
%% contain rejections of publishes
@@ -2191,7 +2206,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21912206
mandatory = Mandatory,
21922207
confirm = Confirm,
21932208
msg_seq_no = MsgSeqNo},
2194-
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
2209+
RoutedToQueueNames},
2210+
State0 = #ch{cfg = #conf{extended_return_callback = ExtendedReturnCallback},
2211+
queue_states = QueueStates0}) ->
21952212
Qs0 = rabbit_amqqueue:lookup_many(RoutedToQueueNames),
21962213
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
21972214
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
@@ -2200,7 +2217,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22002217
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
22012218
%% NB: the order here is important since basic.returns must be
22022219
%% sent before confirms.
2203-
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
2220+
ok = process_routing_mandatory(ExtendedReturnCallback, Mandatory, Qs, MsgSeqNo, Message, State0),
22042221
State1 = process_routing_confirm(Confirm, QueueNames,
22052222
MsgSeqNo, XName, State0),
22062223
%% Actions must be processed after registering confirms as actions may
@@ -2222,19 +2239,32 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22222239
[rabbit_misc:rs(Resource)])
22232240
end.
22242241

2225-
process_routing_mandatory(_Mandatory = true,
2242+
process_routing_mandatory(_ExtendedReturnCallback = false,
2243+
_Mandatory = true,
2244+
_RoutedToQs = [],
2245+
_MsgSeqNo,
2246+
#basic_message{content = Content} = Msg, State) ->
2247+
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
2248+
ok = basic_return(Content, Msg, State, no_route),
2249+
ok;
2250+
process_routing_mandatory(_ExtendedReturnCallback = true,
2251+
_Mandatory = true,
22262252
_RoutedToQs = [],
2227-
Msg, State) ->
2253+
MsgSeqNo,
2254+
#basic_message{content = Content} = Msg, State) ->
22282255
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
2229-
ok = basic_return(Msg, State, no_route),
2256+
%% providing the publishing sequence for AMQP 1.0
2257+
ok = basic_return({MsgSeqNo, Content}, Msg, State, no_route),
22302258
ok;
2231-
process_routing_mandatory(_Mandatory = false,
2259+
process_routing_mandatory(_ExtendedReturnCallback,
2260+
_Mandatory = false,
22322261
_RoutedToQs = [],
2262+
_MsgSeqNo,
22332263
#basic_message{exchange_name = ExchangeName}, State) ->
22342264
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
22352265
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
22362266
ok;
2237-
process_routing_mandatory(_, _, _, _) ->
2267+
process_routing_mandatory(_, _, _, _, _, _) ->
22382268
ok.
22392269

22402270
process_routing_confirm(false, _, _, _, State) ->

deps/rabbit_common/src/rabbit_writer.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@
105105

106106
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
107107
-spec send_command
108-
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) ->
108+
(pid(), rabbit_framing:amqp_method_record(),
109+
rabbit_types:content() |
110+
{integer(), rabbit_types:content()} %% publishing sequence for AMQP 1.0 return callback
111+
) ->
109112
'ok'.
110113
-spec send_command_sync(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
111114
-spec send_command_sync

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_incoming_link.erl

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
%% Just make these constant for the time being.
1818
-define(INCOMING_CREDIT, 65536).
1919

20-
-record(incoming_link, {name, exchange, routing_key,
20+
-record(incoming_link, {name, exchange, routing_key, mandatory,
2121
delivery_id = undefined,
2222
delivery_count = 0,
2323
send_settle_mode = undefined,
@@ -53,6 +53,7 @@ attach(#'v1_0.attach'{name = Name,
5353
SndSettleMode == ?V_1_0_SENDER_SETTLE_MODE_MIXED ->
5454
amqp_channel:register_confirm_handler(BCh, self()),
5555
rabbit_amqp1_0_channel:call(BCh, #'confirm.select'{}),
56+
amqp_channel:register_return_handler(BCh, self()),
5657
true
5758
end,
5859
Flow = #'v1_0.flow'{ handle = Handle,
@@ -69,7 +70,8 @@ attach(#'v1_0.attach'{name = Name,
6970
initial_delivery_count = undefined, % must be, I am the receiver
7071
role = ?RECV_ROLE}, %% server is receiver
7172
IncomingLink1 =
72-
IncomingLink#incoming_link{recv_settle_mode = RcvSettleMode},
73+
IncomingLink#incoming_link{recv_settle_mode = RcvSettleMode,
74+
mandatory = Confirm},
7375
{ok, [Attach, Flow], IncomingLink1, Confirm};
7476
{error, Reason} ->
7577
%% TODO proper link establishment protocol here?
@@ -142,7 +144,8 @@ transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
142144
end,
143145
rabbit_amqp1_0_channel:cast_flow(
144146
BCh, #'basic.publish'{exchange = X,
145-
routing_key = RKey}, Msg),
147+
routing_key = RKey,
148+
mandatory = true}, Msg),
146149
{SendFlow, CreditUsed1} = case CreditUsed - 1 of
147150
C when C =< 0 ->
148151
{true, ?INCOMING_CREDIT div 2};
@@ -206,6 +209,7 @@ ensure_target(Target = #'v1_0.target'{address = Address,
206209
dest, DCh, Dest, DeclareParams,
207210
RouteState)
208211
end),
212+
maybe_ensure_queue(Dest, DCh),
209213
{XName, RK} = rabbit_routing_util:parse_routing(Dest),
210214
{ok, Target, Link#incoming_link{
211215
route_state = RouteState1,
@@ -222,6 +226,20 @@ ensure_target(Target = #'v1_0.target'{address = Address,
222226
{error, {address_not_utf8_string, Address}}
223227
end.
224228

229+
maybe_ensure_queue({amqqueue, Q}, Ch) ->
230+
try
231+
rabbit_amqp1_0_channel:convert_error(
232+
fun () ->
233+
Method = #'queue.declare'{queue = list_to_binary(Q),
234+
passive = true},
235+
amqp_channel:call(Ch, Method)
236+
end)
237+
catch exit:#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED} ->
238+
ok
239+
end;
240+
maybe_ensure_queue(_, _) ->
241+
ok.
242+
225243
incoming_flow(#incoming_link{ delivery_count = Count }, Handle) ->
226244
#'v1_0.flow'{handle = Handle,
227245
delivery_count = {uint, Count},

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
incr_incoming_id/1, next_delivery_id/1, transfers_left/1,
1515
record_transfers/2, bump_outgoing_window/1,
1616
record_outgoing/4, settle/3, flow_fields/2, channel/1,
17-
flow/2, ack/2, validate_attach/1]).
17+
flow/2, ack/2, return/2, validate_attach/1]).
1818

1919
-import(rabbit_amqp1_0_util, [protocol_error/3,
2020
serial_add/2, serial_diff/2, serial_compare/2]).
@@ -396,3 +396,25 @@ acknowledgement(DeliveryIds, Disposition) ->
396396
last = {uint, lists:last(DeliveryIds)},
397397
settled = true,
398398
state = #'v1_0.accepted'{} }.
399+
400+
return(DTag, Session = #session{incoming_unsettled_map = Unsettled}) ->
401+
{DeliveryId,
402+
Unsettled1} = case gb_trees:lookup(DTag, Unsettled) of
403+
{value, #incoming_delivery{ delivery_id = Id }} ->
404+
{Id, gb_trees:delete(DTag, Unsettled)};
405+
none ->
406+
{undefined, Unsettled}
407+
end,
408+
Disposition = case DeliveryId of
409+
undefined -> undefined;
410+
_ -> release(DeliveryId,
411+
#'v1_0.disposition'{role = ?RECV_ROLE})
412+
end,
413+
{Disposition,
414+
Session#session{incoming_unsettled_map = Unsettled1}}.
415+
416+
release(DeliveryId, Disposition) ->
417+
Disposition#'v1_0.disposition'{ first = {uint, DeliveryId},
418+
last = {uint, DeliveryId},
419+
settled = true,
420+
state = #'v1_0.released'{} }.

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,24 @@ handle_info(#'basic.ack'{} = Ack, State = #state{writer_pid = WriterPid,
130130
F <- rabbit_amqp1_0_session:flow_fields(Reply, Session)],
131131
{noreply, state(Session1, State)};
132132

133+
handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
134+
session = Session}) ->
135+
{Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
136+
case Reply of
137+
undefined ->
138+
ok;
139+
_ ->
140+
rabbit_amqp1_0_writer:send_command(
141+
WriterPid,
142+
rabbit_amqp1_0_session:flow_fields(Reply, Session)
143+
)
144+
end,
145+
{noreply, state(Session1, State)};
146+
147+
handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
148+
rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
149+
{noreply, state(Session, State)};
150+
133151
handle_info({bump_credit, Msg}, State) ->
134152
credit_flow:handle_bump_msg(Msg),
135153
{noreply, State};

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ groups() ->
2424
[
2525
{tests, [], [
2626
reliable_send_receive_with_outcomes,
27+
publishing_to_non_existing_queue_should_settle_with_released,
28+
open_link_to_non_existing_destination_should_end_session,
2729
roundtrip_classic_queue_with_drain,
2830
roundtrip_quorum_queue_with_drain,
2931
roundtrip_stream_queue_with_drain,
@@ -151,6 +153,68 @@ reliable_send_receive(Config, Outcome) ->
151153

152154
ok.
153155

156+
publishing_to_non_existing_queue_should_settle_with_released(Config) ->
157+
Container = atom_to_binary(?FUNCTION_NAME, utf8),
158+
Suffix = <<"foo">>,
159+
%% does not exist
160+
QName = <<Container/binary, Suffix/binary>>,
161+
Host = ?config(rmq_hostname, Config),
162+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
163+
Address = <<"/exchange/amq.direct/", QName/binary>>,
164+
165+
OpnConf = #{address => Host,
166+
port => Port,
167+
container_id => Container,
168+
sasl => {plain, <<"guest">>, <<"guest">>}},
169+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
170+
{ok, Session} = amqp10_client:begin_session(Connection),
171+
SenderLinkName = <<"test-sender">>,
172+
{ok, Sender} = amqp10_client:attach_sender_link(Session,
173+
SenderLinkName,
174+
Address),
175+
ok = wait_for_credit(Sender),
176+
DTag1 = <<"dtag-1">>,
177+
%% create an unsettled message,
178+
%% link will be in "mixed" mode by default
179+
Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
180+
ok = amqp10_client:send_msg(Sender, Msg1),
181+
ok = wait_for_settlement(DTag1, released),
182+
183+
ok = amqp10_client:detach_link(Sender),
184+
ok = amqp10_client:close_connection(Connection),
185+
flush("post sender close"),
186+
ok.
187+
188+
open_link_to_non_existing_destination_should_end_session(Config) ->
189+
Container = atom_to_list(?FUNCTION_NAME),
190+
Name = Container ++ "foo",
191+
Addresses = [
192+
"/exchange/" ++ Name ++ "/bar",
193+
"/amq/queue/" ++ Name
194+
],
195+
Host = ?config(rmq_hostname, Config),
196+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
197+
OpnConf = #{address => Host,
198+
port => Port,
199+
container_id => list_to_binary(Container),
200+
sasl => {plain, <<"guest">>, <<"guest">>}},
201+
202+
[begin
203+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
204+
{ok, Session} = amqp10_client:begin_session(Connection),
205+
SenderLinkName = <<"test-sender">>,
206+
ct:pal("Address ~p", [Address]),
207+
{ok, _} = amqp10_client:attach_sender_link(Session,
208+
SenderLinkName,
209+
list_to_binary(Address)),
210+
211+
wait_for_session_end(Session),
212+
ok = amqp10_client:close_connection(Connection),
213+
flush("post sender close")
214+
215+
end || Address <- Addresses],
216+
ok.
217+
154218
roundtrip_classic_queue_with_drain(Config) ->
155219
QName = atom_to_binary(?FUNCTION_NAME, utf8),
156220
roundtrip_queue_with_drain(Config, <<"classic">>, QName).
@@ -370,14 +434,27 @@ wait_for_credit(Sender) ->
370434
ct:fail(credited_timeout)
371435
end.
372436

437+
wait_for_session_end(Session) ->
438+
receive
439+
{amqp10_event, {session, Session, {ended, _}}} ->
440+
flush(?FUNCTION_NAME),
441+
ok
442+
after 5000 ->
443+
flush("wait_for_session_end timed out"),
444+
ct:fail(settled_timeout)
445+
end.
446+
373447
wait_for_settlement(Tag) ->
448+
wait_for_settlement(Tag, accepted).
449+
450+
wait_for_settlement(Tag, State) ->
374451
receive
375-
{amqp10_disposition, {accepted, Tag}} ->
452+
{amqp10_disposition, {State, Tag}} ->
376453
flush(?FUNCTION_NAME),
377454
ok
378455
after 5000 ->
379456
flush("wait_for_settlement timed out"),
380-
ct:fail(credited_timeout)
457+
ct:fail(settled_timeout)
381458
end.
382459

383460
wait_for_accepts(0) -> ok;

0 commit comments

Comments
 (0)