Skip to content

Commit 7888799

Browse files
authored
Merge pull request #12391 from rabbitmq/anon-term-errors
Comply with §2.2.2 of Anonymous Terminus extension
2 parents cf0a4e8 + 6863ae1 commit 7888799

File tree

5 files changed

+126
-45
lines changed

5 files changed

+126
-45
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -961,7 +961,8 @@ rcv_settle_mode(_) -> undefined.
961961
% TODO: work out if we can assume accepted
962962
translate_delivery_state(undefined) -> undefined;
963963
translate_delivery_state(#'v1_0.accepted'{}) -> accepted;
964-
translate_delivery_state(#'v1_0.rejected'{}) -> rejected;
964+
translate_delivery_state(#'v1_0.rejected'{error = undefined}) -> rejected;
965+
translate_delivery_state(#'v1_0.rejected'{error = Error}) -> {rejected, Error};
965966
translate_delivery_state(#'v1_0.modified'{}) -> modified;
966967
translate_delivery_state(#'v1_0.released'{}) -> released;
967968
translate_delivery_state(#'v1_0.received'{}) -> received;

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2344,6 +2344,7 @@ incoming_link_transfer(
23442344
PayloadSize = iolist_size(PayloadBin),
23452345
validate_message_size(PayloadSize, MaxMessageSize),
23462346
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
2347+
messages_received(Settled),
23472348

23482349
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
23492350
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
@@ -2352,7 +2353,6 @@ incoming_link_transfer(
23522353
check_user_id(Mc2, User),
23532354
TopicPermCache = check_write_permitted_on_topic(
23542355
X, User, RoutingKey, TopicPermCache0),
2355-
messages_received(Settled),
23562356
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
23572357
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
23582358
Opts = #{correlation => {HandleInt, DeliveryId}},
@@ -2388,9 +2388,34 @@ incoming_link_transfer(
23882388
[DeliveryTag, DeliveryId, Reason])
23892389
end;
23902390
{error, #'v1_0.error'{} = Err} ->
2391-
Disposition = released(DeliveryId),
2392-
Detach = detach(HandleInt, Link0, Err),
2393-
{error, [Disposition, Detach]}
2391+
Disposition = case Settled of
2392+
true -> [];
2393+
false -> [released(DeliveryId)]
2394+
end,
2395+
Detach = [detach(HandleInt, Link0, Err)],
2396+
{error, Disposition ++ Detach};
2397+
{error, anonymous_terminus, #'v1_0.error'{} = Err} ->
2398+
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
2399+
case Settled of
2400+
true ->
2401+
Info = {map, [{{symbol, <<"delivery-tag">>}, DeliveryTag}]},
2402+
Err1 = Err#'v1_0.error'{info = Info},
2403+
Detach = detach(HandleInt, Link0, Err1),
2404+
{error, [Detach]};
2405+
false ->
2406+
Disposition = rejected(DeliveryId, Err),
2407+
DeliveryCount = add(DeliveryCount0, 1),
2408+
Credit1 = Credit0 - 1,
2409+
{Credit, Reply0} = maybe_grant_link_credit(
2410+
Credit1, MaxLinkCredit,
2411+
DeliveryCount, map_size(U0), Handle),
2412+
Reply = [Disposition | Reply0],
2413+
Link = Link0#incoming_link{
2414+
delivery_count = DeliveryCount,
2415+
credit = Credit,
2416+
multi_transfer_msg = undefined},
2417+
{ok, Reply, Link, State0}
2418+
end
23942419
end.
23952420

23962421
lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) ->
@@ -2414,16 +2439,16 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
24142439
check_internal_exchange(X),
24152440
lookup_routing_key(X, RKey, Mc, PermCache);
24162441
{error, not_found} ->
2417-
{error, error_not_found(XName)}
2442+
{error, anonymous_terminus, error_not_found(XName)}
24182443
end;
24192444
{error, bad_address} ->
2420-
{error,
2445+
{error, anonymous_terminus,
24212446
#'v1_0.error'{
24222447
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
24232448
description = {utf8, <<"bad 'to' address string: ", String/binary>>}}}
24242449
end;
24252450
undefined ->
2426-
{error,
2451+
{error, anonymous_terminus,
24272452
#'v1_0.error'{
24282453
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
24292454
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}
@@ -2467,6 +2492,12 @@ released(DeliveryId) ->
24672492
settled = true,
24682493
state = #'v1_0.released'{}}.
24692494

2495+
rejected(DeliveryId, Error) ->
2496+
#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
2497+
first = ?UINT(DeliveryId),
2498+
settled = true,
2499+
state = #'v1_0.rejected'{error = Error}}.
2500+
24702501
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
24712502
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
24722503
true ->

deps/rabbit/test/amqp_address_SUITE.erl

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ common_tests() ->
5454
target_per_message_queue,
5555
target_per_message_unset_to_address,
5656
target_per_message_bad_to_address,
57-
target_per_message_exchange_absent,
57+
target_per_message_exchange_absent_settled,
58+
target_per_message_exchange_absent_unsettled,
5859
target_bad_address,
5960
source_bad_address
6061
].
@@ -393,16 +394,15 @@ target_per_message_unset_to_address(Config) ->
393394
%% Send message with 'to' unset.
394395
DTag = <<1>>,
395396
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<0>>)),
396-
ok = wait_for_settled(released, DTag),
397-
receive {amqp10_event,
398-
{link, Sender,
399-
{detached,
400-
#'v1_0.error'{
401-
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
402-
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}}} -> ok
403-
after 5000 -> ct:fail("server did not close our outgoing link")
397+
ExpectedError = #'v1_0.error'{
398+
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
399+
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}},
400+
ok = wait_for_settled({rejected, ExpectedError}, DTag),
401+
402+
ok = amqp10_client:detach_link(Sender),
403+
receive {amqp10_event, {link, Sender, {detached, normal}}} -> ok
404+
after 5000 -> ct:fail({missing_event, ?LINE})
404405
end,
405-
406406
ok = amqp10_client:end_session(Session),
407407
ok = amqp10_client:close_connection(Connection).
408408

@@ -449,34 +449,32 @@ bad_v2_addresses() ->
449449

450450
%% Test v2 target address 'null' with an invalid 'to' addresses.
451451
target_per_message_bad_to_address(Config) ->
452-
lists:foreach(fun(Addr) ->
453-
ok = target_per_message_bad_to_address0(Addr, Config)
454-
end, bad_v2_addresses()).
455-
456-
target_per_message_bad_to_address0(Address, Config) ->
457452
OpnConf = connection_config(Config),
458453
{ok, Connection} = amqp10_client:open_connection(OpnConf),
459454
{ok, Session} = amqp10_client:begin_session_sync(Connection),
460455
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
461456
ok = wait_for_credit(Sender),
462457

463-
DTag = <<255>>,
464-
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag, <<0>>)),
465-
ok = amqp10_client:send_msg(Sender, Msg),
466-
ok = wait_for_settled(released, DTag),
467-
receive {amqp10_event,
468-
{link, Sender,
469-
{detached,
470-
#'v1_0.error'{
471-
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
472-
description = {utf8, <<"bad 'to' address", _Rest/binary>>}}}}} -> ok
473-
after 5000 -> ct:fail("server did not close our outgoing link")
474-
end,
458+
lists:foreach(
459+
fun(Addr) ->
460+
DTag = <<"some delivery tag">>,
461+
Msg = amqp10_msg:set_properties(#{to => Addr}, amqp10_msg:new(DTag, <<0>>, false)),
462+
ok = amqp10_client:send_msg(Sender, Msg),
463+
receive
464+
{amqp10_disposition, {{rejected, Error}, DTag}} ->
465+
?assertMatch(#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
466+
description = {utf8, <<"bad 'to' address", _Rest/binary>>}},
467+
Error)
468+
after 5000 ->
469+
flush(missing_disposition),
470+
ct:fail(missing_disposition)
471+
end
472+
end, bad_v2_addresses()),
475473

476474
ok = amqp10_client:end_session(Session),
477475
ok = amqp10_client:close_connection(Connection).
478476

479-
target_per_message_exchange_absent(Config) ->
477+
target_per_message_exchange_absent_settled(Config) ->
480478
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
481479
XName = <<"🎈"/utf8>>,
482480
Address = rabbitmq_amqp_address:exchange(XName),
@@ -492,20 +490,59 @@ target_per_message_exchange_absent(Config) ->
492490
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),
493491

494492
DTag2 = <<2>>,
495-
Msg2 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag2, <<"m2">>)),
493+
Msg2 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag2, <<"m2">>, true)),
496494
ok = amqp10_client:send_msg(Sender, Msg2),
497-
ok = wait_for_settled(released, DTag2),
495+
496+
%% "the routing node MUST detach the link over which the message was sent with an error.
497+
%% [...] Additionally the info field of error MUST contain an entry with symbolic key delivery-tag
498+
%% and binary value of the delivery-tag of the message which caused the failure."
499+
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
498500
receive {amqp10_event, {link, Sender, {detached, Error}}} ->
499501
?assertEqual(
500502
#'v1_0.error'{
501503
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
502-
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>}},
504+
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>},
505+
info = {map, [{{symbol, <<"delivery-tag">>}, {binary, DTag2}}]}
506+
},
503507
Error)
504508
after 5000 -> ct:fail("server did not close our outgoing link")
505509
end,
506510

507511
ok = cleanup(Init).
508512

513+
target_per_message_exchange_absent_unsettled(Config) ->
514+
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
515+
XName = <<"🎈"/utf8>>,
516+
Address = rabbitmq_amqp_address:exchange(XName),
517+
ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}),
518+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
519+
ok = wait_for_credit(Sender),
520+
521+
DTag1 = <<"my tag">>,
522+
Msg1 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag1, <<"hey">>)),
523+
ok = amqp10_client:send_msg(Sender, Msg1),
524+
ok = wait_for_settled(released, DTag1),
525+
526+
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),
527+
528+
%% "If the source of the link supports the rejected outcome, and the message has not
529+
%% already been settled by the sender, then the routing node MUST reject the message.
530+
%% In this case the error field of rejected MUST contain the error which would have been communicated
531+
%% in the detach which would have be sent if a link to the same address had been attempted."
532+
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
533+
%% We test here multiple rejections implicilty checking that link flow control works correctly.
534+
ExpectedError = #'v1_0.error'{
535+
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
536+
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>}},
537+
[begin
538+
DTag = Body = integer_to_binary(N),
539+
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag, Body, false)),
540+
ok = amqp10_client:send_msg(Sender, Msg),
541+
ok = wait_for_settled({rejected, ExpectedError}, DTag)
542+
end || N <- lists:seq(1, 300)],
543+
544+
ok = cleanup(Init).
545+
509546
target_bad_address(Config) ->
510547
%% bad v1 and bad v2 target address
511548
TargetAddr = <<"/qqq/🎈"/utf8>>,

deps/rabbit/test/amqp_auth_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ target_per_message_internal_exchange(Config) ->
530530
ExpectedErr = error_unauthorized(
531531
<<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>),
532532
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
533-
after 5000 -> flush(aaa),
533+
after 5000 -> flush(missing_event),
534534
ct:fail({missing_event, ?LINE})
535535
end,
536536
ok = close_connection_sync(Conn1),

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ groups() ->
6262
server_closes_link_classic_queue,
6363
server_closes_link_quorum_queue,
6464
server_closes_link_stream,
65-
server_closes_link_exchange,
65+
server_closes_link_exchange_settled,
66+
server_closes_link_exchange_unsettled,
6667
link_target_classic_queue_deleted,
6768
link_target_quorum_queue_deleted,
6869
target_queues_deleted_accepted,
@@ -1513,7 +1514,13 @@ server_closes_link(QType, Config) ->
15131514
ok = end_session_sync(Session),
15141515
ok = amqp10_client:close_connection(Connection).
15151516

1516-
server_closes_link_exchange(Config) ->
1517+
server_closes_link_exchange_settled(Config) ->
1518+
server_closes_link_exchange(true, Config).
1519+
1520+
server_closes_link_exchange_unsettled(Config) ->
1521+
server_closes_link_exchange(false, Config).
1522+
1523+
server_closes_link_exchange(Settled, Config) ->
15171524
XName = atom_to_binary(?FUNCTION_NAME),
15181525
QName = <<"my queue">>,
15191526
RoutingKey = <<"my routing key">>,
@@ -1543,8 +1550,13 @@ server_closes_link_exchange(Config) ->
15431550
%% When we publish the next message, we expect:
15441551
%% 1. that the message is released because the exchange doesn't exist anymore, and
15451552
DTag2 = <<255>>,
1546-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
1547-
ok = wait_for_settlement(DTag2, released),
1553+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, Settled)),
1554+
case Settled of
1555+
true ->
1556+
ok;
1557+
false ->
1558+
ok = wait_for_settlement(DTag2, released)
1559+
end,
15481560
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
15491561
receive {amqp10_event,
15501562
{link, Sender,
@@ -5992,7 +6004,7 @@ assert_messages(QNameBin, NumTotalMsgs, NumUnackedMsgs, Config, Node) ->
59926004
Infos = rpc(Config, Node, rabbit_amqqueue, info, [Q, [messages, messages_unacknowledged]]),
59936005
lists:sort(Infos)
59946006
end
5995-
), 500, 5).
6007+
), 500, 10).
59966008

59976009
serial_number_increment(S) ->
59986010
case S + 1 of

0 commit comments

Comments
 (0)