Skip to content

Commit 36da693

Browse files
committed
Handle undefined msg ID from queue
Makes the following tests green: ``` make -C deps/amqp10_client/ ct-system ``` Classic queues send an 'undefined' message ID to the channel when no ack is required. No ack is required when the send settle mode is settled. In this case it should be perfectly valid to always send the same (empty binary) delivery-tag from server to client.
1 parent 12cffa4 commit 36da693

File tree

3 files changed

+35
-28
lines changed

3 files changed

+35
-28
lines changed

deps/amqp10_client/test/system_SUITE.erl

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,10 @@
1414

1515
-include("src/amqp10_client.hrl").
1616

17-
-compile(export_all).
17+
-compile([export_all, nowarn_export_all]).
1818

19-
-define(UNAUTHORIZED_USER, <<"test_user_no_perm">>).
20-
21-
%% The latch constant defines how many processes are spawned in order
22-
%% to run certain functionality in parallel. It follows the standard
23-
%% countdown latch pattern.
24-
-define(LATCH, 100).
25-
26-
%% The wait constant defines how long a consumer waits before it
27-
%% unsubscribes
28-
-define(WAIT, 200).
29-
30-
%% How to long wait for a process to die after an expected failure
31-
-define(PROCESS_EXIT_TIMEOUT, 5000).
19+
suite() ->
20+
[{timetrap, {seconds, 120}}].
3221

3322
all() ->
3423
[
@@ -344,7 +333,7 @@ roundtrip(OpenConf, Body) ->
344333
<<"test1">>,
345334
settled,
346335
unsettled_state),
347-
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 60000 * 5),
336+
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 60_000 * 4),
348337
ok = amqp10_client:end_session(Session),
349338
ok = amqp10_client:close_connection(Connection),
350339
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
@@ -379,7 +368,7 @@ filtered_roundtrip(OpenConf, Body) ->
379368
settled,
380369
unsettled_state),
381370
ok = amqp10_client:send_msg(Sender, Msg1),
382-
{ok, OutMsg1} = amqp10_client:get_msg(DefaultReceiver, 60000 * 5),
371+
{ok, OutMsg1} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4),
383372
?assertEqual(<<"msg-1-tag">>, amqp10_msg:delivery_tag(OutMsg1)),
384373

385374
timer:sleep(5 * 1000),
@@ -398,10 +387,10 @@ filtered_roundtrip(OpenConf, Body) ->
398387
unsettled_state,
399388
#{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}),
400389

401-
{ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60000 * 5),
390+
{ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4),
402391
?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsg2)),
403392

404-
{ok, OutMsgFiltered} = amqp10_client:get_msg(FilteredReceiver, 60000 * 5),
393+
{ok, OutMsgFiltered} = amqp10_client:get_msg(FilteredReceiver, 60_000 * 4),
405394
?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsgFiltered)),
406395

407396
ok = amqp10_client:end_session(Session),
@@ -676,11 +665,13 @@ incoming_heartbeat(Config) ->
676665
idle_time_out => 1000, notify => self()},
677666
{ok, Connection} = amqp10_client:open_connection(CConf),
678667
receive
679-
{amqp10_event, {connection, Connection,
680-
{closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}} ->
668+
{amqp10_event,
669+
{connection, Connection0,
670+
{closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}}
671+
when Connection0 =:= Connection ->
681672
ok
682673
after 5000 ->
683-
exit(incoming_heartbeat_assert)
674+
exit(incoming_heartbeat_assert)
684675
end,
685676
demonitor(MockRef).
686677

@@ -704,25 +695,30 @@ publish_messages(Sender, Data, Num) ->
704695

705696
receive_one(Receiver) ->
706697
receive
707-
{amqp10_msg, Receiver, Msg} ->
698+
{amqp10_msg, Receiver0, Msg}
699+
when Receiver0 =:= Receiver ->
708700
amqp10_client:accept_msg(Receiver, Msg)
709701
after 2000 ->
710702
timeout
711703
end.
712704

713705
await_disposition(DeliveryTag) ->
714706
receive
715-
{amqp10_disposition, {accepted, DeliveryTag}} -> ok
707+
{amqp10_disposition, {accepted, DeliveryTag0}}
708+
when DeliveryTag0 =:= DeliveryTag -> ok
716709
after 3000 ->
717710
flush(),
718711
exit(dispostion_timeout)
719712
end.
720713

721714
await_link(Who, What, Err) ->
722715
receive
723-
{amqp10_event, {link, Who, What}} ->
716+
{amqp10_event, {link, Who0, What0}}
717+
when Who0 =:= Who andalso
718+
What0 =:= What ->
724719
ok;
725-
{amqp10_event, {link, Who, {detached, Why}}} ->
720+
{amqp10_event, {link, Who0, {detached, Why}}}
721+
when Who0 =:= Who ->
726722
exit(Why)
727723
after 5000 ->
728724
flush(),

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
9797
-type qmsg() :: {name(), pid() | {atom(), pid()}, msg_id(),
9898
boolean(), mc:state()}.
99-
-type msg_id() :: non_neg_integer().
99+
-type msg_id() :: rabbit_types:option(non_neg_integer()).
100100
-type ok_or_errors() ::
101101
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}.
102102
-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout.

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -952,11 +952,22 @@ handle_deliver(ConsumerTag, AckRequired,
952952
rabbit_log:warning("Delivery to non-existent consumer ~tp", [ConsumerTag]),
953953
State0;
954954
#outgoing_link{send_settled = SendSettled} ->
955+
%% "The delivery-tag MUST be unique amongst all deliveries that could be
956+
%% considered unsettled by either end of the link." [2.6.12]
957+
Dtag = if is_integer(MsgId) ->
958+
%% delivery-tag must be unique only per link (not per session)
959+
<<MsgId:64>>;
960+
MsgId =:= undefined ->
961+
%% Both ends of the link will always consider this message settled because
962+
%% "the sender will send all deliveries settled to the receiver" [3.8.2].
963+
%% Hence, the delivery tag does not have to be unique on this link.
964+
%% However, the spec still mandates to send a delivery tag.
965+
<<>>
966+
end,
955967
Transfer = #'v1_0.transfer'{
956968
handle = Handle,
957969
delivery_id = {uint, DeliveryId},
958-
%% delivery-tag must be unique only per link (not per session)
959-
delivery_tag = {binary, <<MsgId:64>>},
970+
delivery_tag = {binary, Dtag},
960971
%% [3.2.16]
961972
message_format = {uint, 0},
962973
settled = SendSettled,

0 commit comments

Comments
 (0)