Skip to content

Commit b5a0b17

Browse files
committed
Fix message IDs settlement order
## What? This commit fixes issues that were present only on `main` branch and were introduced by #9022. 1. Classic queues (specifically `rabbit_queue_consumers:subtract_acks/3`) expect message IDs to be (n)acked in the order as they were delivered to the channel / session proc. Hence, the `lists:usort(MsgIds0)` in `rabbit_classic_queue:settle/5` was wrong causing not all messages to be acked adding a regression to also AMQP 0.9.1. 2. The order in which the session proc requeues or rejects multiple message IDs at once is important. For example, if the client sends a DISPOSITION with first=3 and last=5, the message IDs corresponding to delivery IDs 3,4,5 must be requeued or rejected in exactly that order. For example, quorum queues use this order of message IDs in https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_fifo.erl#L226-L234 to dead letter in that order. ## How? The session proc will settle (internal queue) message IDs in ascending (AMQP) delivery ID order, i.e. in the order messages were sent to the client and in the order messages were settled by the client. This commit chooses to keep the session's outgoing_unsettled_map map data structure. An alternative would have been to use a queue or lqueue for the outgoing_unsettled_map as done in * https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_channel.erl#L135 * https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_queue_consumers.erl#L43 Whether a queue (as done by `rabbit_channel`) or a map (as done by `rabbit_amqp_session`) performs better depends on the pattern how clients ack messages. A queue will likely perform good enough because usually the oldest delivered messages will be acked first. However, given that there can be many different consumers on an AQMP 0.9.1 channel or AMQP 1.0 session, this commit favours a map because it will likely generate less garbage and is very efficient when for example a new single message (or few new messages) gets acked while many (older) messages are still checked out by the session (but by possibly different AMQP 1.0 receivers).
1 parent 34d3f94 commit b5a0b17

10 files changed

+642
-16
lines changed

deps/amqp10_common/src/serial_number.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,7 @@ compare(A, B) ->
5959
[serial_number()].
6060
usort(L) ->
6161
lists:usort(fun(A, B) ->
62-
case compare(A, B) of
63-
greater -> false;
64-
_ -> true
65-
end
62+
compare(A, B) =/= greater
6663
end, L).
6764

6865
%% Takes a list of serial numbers and returns tuples

deps/rabbit/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,10 @@ rabbitmq_integration_suite(
402402
shard_count = 6,
403403
)
404404

405+
rabbitmq_integration_suite(
406+
name = "amqpl_consumer_ack_SUITE",
407+
)
408+
405409
rabbitmq_integration_suite(
406410
name = "message_containers_deaths_v2_SUITE",
407411
size = "medium",

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2134,3 +2134,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
21342134
erlc_opts = "//:test_erlc_opts",
21352135
deps = ["//deps/amqp_client:erlang_app"],
21362136
)
2137+
erlang_bytecode(
2138+
name = "amqpl_consumer_ack_SUITE_beam_files",
2139+
testonly = True,
2140+
srcs = ["test/amqpl_consumer_ack_SUITE.erl"],
2141+
outs = ["test/amqpl_consumer_ack_SUITE.beam"],
2142+
app_name = "rabbit",
2143+
erlc_opts = "//:test_erlc_opts",
2144+
deps = ["//deps/amqp_client:erlang_app"],
2145+
)

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,9 +1307,13 @@ handle_control(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
13071307
case DispositionRangeSize =< UnsettledMapSize of
13081308
true ->
13091309
%% It is cheaper to iterate over the range of settled delivery IDs.
1310-
serial_number:foldl(fun settle_delivery_id/2, {#{}, UnsettledMap0}, First, Last);
1310+
serial_number:foldl(fun settle_delivery_id/2,
1311+
{#{}, UnsettledMap0},
1312+
First, Last);
13111313
false ->
13121314
%% It is cheaper to iterate over the outgoing unsettled map.
1315+
Iter = maps:iterator(UnsettledMap0,
1316+
fun(A, B) -> compare(A, B) =/= greater end),
13131317
{Settled0, UnsettledList} =
13141318
maps:fold(
13151319
fun (DeliveryId,
@@ -1329,14 +1333,15 @@ handle_control(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
13291333
{SettledAcc, [{DeliveryId, Unsettled} | UnsettledAcc]}
13301334
end
13311335
end,
1332-
{#{}, []}, UnsettledMap0),
1336+
{#{}, []}, Iter),
13331337
{Settled0, maps:from_list(UnsettledList)}
13341338
end,
13351339

13361340
SettleOp = settle_op_from_outcome(Outcome),
13371341
{QStates, Actions} =
13381342
maps:fold(
1339-
fun({QName, Ctag}, MsgIds, {QS0, ActionsAcc}) ->
1343+
fun({QName, Ctag}, MsgIdsRev, {QS0, ActionsAcc}) ->
1344+
MsgIds = lists:reverse(MsgIdsRev),
13401345
case rabbit_queue_type:settle(QName, SettleOp, Ctag, MsgIds, QS0) of
13411346
{ok, QS, Actions0} ->
13421347
messages_acknowledged(SettleOp, QName, QS, MsgIds),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1972,7 +1972,7 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
19721972
end.
19731973

19741974
%% Settles (acknowledges) messages at the queue replica process level.
1975-
%% This happens in the youngest-first order (ascending by delivery tag).
1975+
%% This happens in the oldest-first order (ascending by delivery tag).
19761976
settle_acks(Acks, State = #ch{queue_states = QueueStates0}) ->
19771977
{QueueStates, Actions} =
19781978
foreach_per_queue(

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,7 @@ cancel(Q, ConsumerTag, OkMsg, ActingUser, State) ->
275275
-spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(),
276276
rabbit_types:ctag(), [non_neg_integer()], state()) ->
277277
{state(), rabbit_queue_type:actions()}.
278-
settle(_QName, Op, _CTag, MsgIds0, State = #?STATE{pid = Pid}) ->
279-
%% Classic queues expect message IDs in sorted order.
280-
MsgIds = lists:usort(MsgIds0),
278+
settle(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
281279
Arg = case Op of
282280
complete ->
283281
{ack, MsgIds, self()};

deps/rabbit/src/rabbit_queue_consumers.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@
4040
%% channel record
4141
-record(cr, {ch_pid,
4242
monitor_ref,
43-
acktags,
44-
consumer_count,
43+
acktags :: ?QUEUE:?QUEUE({ack(), rabbit_types:ctag() | none}),
44+
consumer_count :: non_neg_integer(),
4545
%% Queue of {ChPid, #consumer{}} for consumers which have
4646
%% been blocked (rate/prefetch limited) for any reason
4747
blocked_consumers,

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 225 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ groups() ->
108108
classic_priority_queue,
109109
dead_letter_headers_exchange,
110110
dead_letter_reject,
111+
dead_letter_reject_message_order_classic_queue,
112+
dead_letter_reject_message_order_quorum_queue,
113+
accept_multiple_message_order_classic_queue,
114+
accept_multiple_message_order_quorum_queue,
115+
release_multiple_message_order_classic_queue,
116+
release_multiple_message_order_quorum_queue,
111117
immutable_bare_message,
112118
receive_many_made_available_over_time_classic_queue,
113119
receive_many_made_available_over_time_quorum_queue,
@@ -158,7 +164,10 @@ groups() ->
158164
init_per_suite(Config) ->
159165
{ok, _} = application:ensure_all_started(amqp10_client),
160166
rabbit_ct_helpers:log_environment(),
161-
Config.
167+
rabbit_ct_helpers:merge_app_env(
168+
Config, {rabbit, [{quorum_tick_interval, 1000},
169+
{stream_tick_interval, 1000}
170+
]}).
162171

163172
end_per_suite(Config) ->
164173
Config.
@@ -3747,6 +3756,97 @@ dead_letter_reject(Config) ->
37473756
ok = end_session_sync(Session),
37483757
ok = amqp10_client:close_connection(Connection).
37493758

3759+
dead_letter_reject_message_order_classic_queue(Config) ->
3760+
dead_letter_reject_message_order(<<"classic">>, Config).
3761+
3762+
dead_letter_reject_message_order_quorum_queue(Config) ->
3763+
dead_letter_reject_message_order(<<"quorum">>, Config).
3764+
3765+
dead_letter_reject_message_order(QType, Config) ->
3766+
{Connection, Session, LinkPair} = init(Config),
3767+
QName1 = <<"q1">>,
3768+
QName2 = <<"q2">>,
3769+
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(
3770+
LinkPair,
3771+
QName1,
3772+
#{arguments => #{<<"x-queue-type">> => {utf8, QType},
3773+
<<"x-dead-letter-exchange">> => {utf8, <<>>},
3774+
<<"x-dead-letter-routing-key">> => {utf8, QName2}
3775+
}}),
3776+
%% We don't care about the target dead letter queue type.
3777+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}),
3778+
3779+
{ok, Sender} = amqp10_client:attach_sender_link(
3780+
Session, <<"sender">>, <<"/queue/", QName1/binary>>, unsettled),
3781+
wait_for_credit(Sender),
3782+
{ok, Receiver1} = amqp10_client:attach_receiver_link(
3783+
Session, <<"receiver 1">>, <<"/queue/", QName1/binary>>, unsettled),
3784+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
3785+
Session, <<"receiver 2">>, <<"/queue/", QName2/binary>>, unsettled),
3786+
3787+
[begin
3788+
Bin = integer_to_binary(N),
3789+
Msg = amqp10_msg:new(Bin, Bin, true),
3790+
ok = amqp10_client:send_msg(Sender, Msg)
3791+
end || N <- lists:seq(1, 5)],
3792+
3793+
{ok, Msg1} = amqp10_client:get_msg(Receiver1),
3794+
{ok, Msg2} = amqp10_client:get_msg(Receiver1),
3795+
{ok, _Msg3} = amqp10_client:get_msg(Receiver1),
3796+
{ok, Msg4} = amqp10_client:get_msg(Receiver1),
3797+
{ok, Msg5} = amqp10_client:get_msg(Receiver1),
3798+
assert_messages(QName1, 5, 5, Config),
3799+
3800+
%% Reject messages in the following order: 2, 3, 4, 1, 5
3801+
ok = amqp10_client_session:disposition(
3802+
Receiver1,
3803+
amqp10_msg:delivery_id(Msg2),
3804+
amqp10_msg:delivery_id(Msg4),
3805+
true,
3806+
rejected),
3807+
ok = amqp10_client_session:disposition(
3808+
Receiver1,
3809+
amqp10_msg:delivery_id(Msg1),
3810+
amqp10_msg:delivery_id(Msg5),
3811+
true,
3812+
rejected),
3813+
3814+
assert_messages(QName1, 0, 0, Config),
3815+
%% All 5 messages should be in the dead letter queue.
3816+
assert_messages(QName2, 5, 0, Config),
3817+
3818+
{ok, MsgDead2} = amqp10_client:get_msg(Receiver2),
3819+
{ok, MsgDead3} = amqp10_client:get_msg(Receiver2),
3820+
{ok, MsgDead4} = amqp10_client:get_msg(Receiver2),
3821+
{ok, MsgDead1} = amqp10_client:get_msg(Receiver2),
3822+
{ok, MsgDead5} = amqp10_client:get_msg(Receiver2),
3823+
assert_messages(QName2, 5, 5, Config),
3824+
3825+
%% Messages should be dead lettered in the order we rejected.
3826+
?assertEqual(<<"2">>, amqp10_msg:body_bin(MsgDead2)),
3827+
?assertEqual(<<"3">>, amqp10_msg:body_bin(MsgDead3)),
3828+
?assertEqual(<<"4">>, amqp10_msg:body_bin(MsgDead4)),
3829+
?assertEqual(<<"1">>, amqp10_msg:body_bin(MsgDead1)),
3830+
?assertEqual(<<"5">>, amqp10_msg:body_bin(MsgDead5)),
3831+
3832+
%% Accept all messages in the dead letter queue.
3833+
ok = amqp10_client_session:disposition(
3834+
Receiver2,
3835+
amqp10_msg:delivery_id(MsgDead2),
3836+
amqp10_msg:delivery_id(MsgDead5),
3837+
true,
3838+
accepted),
3839+
assert_messages(QName2, 0, 0, Config),
3840+
3841+
ok = amqp10_client:detach_link(Receiver1),
3842+
ok = amqp10_client:detach_link(Receiver2),
3843+
ok = amqp10_client:detach_link(Sender),
3844+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1),
3845+
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2),
3846+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
3847+
ok = end_session_sync(Session),
3848+
ok = amqp10_client:close_connection(Connection).
3849+
37503850
%% Dead letter from a quorum queue into a stream.
37513851
dead_letter_into_stream(Config) ->
37523852
{Connection0, Session0, LinkPair0} = init(0, Config),
@@ -3820,6 +3920,124 @@ dead_letter_into_stream(Config) ->
38203920
ok = amqp10_client:close_connection(Connection0),
38213921
ok = amqp10_client:close_connection(Connection1).
38223922

3923+
accept_multiple_message_order_classic_queue(Config) ->
3924+
accept_multiple_message_order(<<"classic">>, Config).
3925+
3926+
accept_multiple_message_order_quorum_queue(Config) ->
3927+
accept_multiple_message_order(<<"quorum">>, Config).
3928+
3929+
accept_multiple_message_order(QType, Config) ->
3930+
QName = atom_to_binary(?FUNCTION_NAME),
3931+
Address = <<"/queue/", QName/binary>>,
3932+
3933+
{Connection, Session, LinkPair} = init(Config),
3934+
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
3935+
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
3936+
3937+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, settled),
3938+
ok = wait_for_credit(Sender),
3939+
[begin
3940+
Bin = integer_to_binary(N),
3941+
Msg = amqp10_msg:new(Bin, Bin, true),
3942+
ok = amqp10_client:send_msg(Sender, Msg)
3943+
end || N <- lists:seq(1, 5)],
3944+
ok = amqp10_client:detach_link(Sender),
3945+
assert_messages(QName, 5, 0, Config),
3946+
3947+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, QName, unsettled),
3948+
{ok, Msg1} = amqp10_client:get_msg(Receiver),
3949+
{ok, Msg2} = amqp10_client:get_msg(Receiver),
3950+
{ok, _Msg3} = amqp10_client:get_msg(Receiver),
3951+
{ok, Msg4} = amqp10_client:get_msg(Receiver),
3952+
{ok, Msg5} = amqp10_client:get_msg(Receiver),
3953+
assert_messages(QName, 5, 5, Config),
3954+
3955+
%% Accept messages out of order.
3956+
ok = amqp10_client_session:disposition(
3957+
Receiver,
3958+
amqp10_msg:delivery_id(Msg2),
3959+
amqp10_msg:delivery_id(Msg4),
3960+
true,
3961+
accepted),
3962+
assert_messages(QName, 2, 2, Config),
3963+
3964+
ok = amqp10_client:accept_msg(Receiver, Msg5),
3965+
assert_messages(QName, 1, 1, Config),
3966+
3967+
ok = amqp10_client:accept_msg(Receiver, Msg1),
3968+
assert_messages(QName, 0, 0, Config),
3969+
3970+
ok = amqp10_client:detach_link(Receiver),
3971+
?assertMatch({ok, #{message_count := 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
3972+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
3973+
ok = end_session_sync(Session),
3974+
ok = amqp10_client:close_connection(Connection).
3975+
3976+
release_multiple_message_order_classic_queue(Config) ->
3977+
release_multiple_message_order(<<"classic">>, Config).
3978+
3979+
release_multiple_message_order_quorum_queue(Config) ->
3980+
release_multiple_message_order(<<"quorum">>, Config).
3981+
3982+
release_multiple_message_order(QType, Config) ->
3983+
QName = atom_to_binary(?FUNCTION_NAME),
3984+
Address = <<"/queue/", QName/binary>>,
3985+
3986+
{Connection, Session, LinkPair} = init(Config),
3987+
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
3988+
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
3989+
3990+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, settled),
3991+
ok = wait_for_credit(Sender),
3992+
[begin
3993+
Bin = integer_to_binary(N),
3994+
Msg = amqp10_msg:new(Bin, Bin, true),
3995+
ok = amqp10_client:send_msg(Sender, Msg)
3996+
end || N <- lists:seq(1, 4)],
3997+
ok = amqp10_client:detach_link(Sender),
3998+
assert_messages(QName, 4, 0, Config),
3999+
4000+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, QName, unsettled),
4001+
{ok, Msg1} = amqp10_client:get_msg(Receiver),
4002+
{ok, Msg2} = amqp10_client:get_msg(Receiver),
4003+
{ok, Msg3} = amqp10_client:get_msg(Receiver),
4004+
{ok, Msg4} = amqp10_client:get_msg(Receiver),
4005+
assert_messages(QName, 4, 4, Config),
4006+
4007+
%% Release messages out of order.
4008+
ok = amqp10_client_session:disposition(
4009+
Receiver,
4010+
amqp10_msg:delivery_id(Msg2),
4011+
amqp10_msg:delivery_id(Msg3),
4012+
true,
4013+
released),
4014+
%% Both messages should be requeued and redelivered.
4015+
assert_messages(QName, 4, 2, Config),
4016+
4017+
{ok, Msg2b} = amqp10_client:get_msg(Receiver),
4018+
{ok, Msg3b} = amqp10_client:get_msg(Receiver),
4019+
assert_messages(QName, 4, 4, Config),
4020+
?assertEqual([<<"2">>], amqp10_msg:body(Msg2b)),
4021+
?assertEqual([<<"3">>], amqp10_msg:body(Msg3b)),
4022+
4023+
ok = amqp10_client_session:disposition(
4024+
Receiver,
4025+
amqp10_msg:delivery_id(Msg4),
4026+
amqp10_msg:delivery_id(Msg3b),
4027+
true,
4028+
accepted),
4029+
assert_messages(QName, 1, 1, Config),
4030+
4031+
ok = amqp10_client:accept_msg(Receiver, Msg1),
4032+
assert_messages(QName, 0, 0, Config),
4033+
4034+
ok = amqp10_client:detach_link(Receiver),
4035+
?assertMatch({ok, #{message_count := 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
4036+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
4037+
ok = end_session_sync(Session),
4038+
ok = amqp10_client:close_connection(Connection).
4039+
4040+
38234041
%% This test asserts the following §3.2 requirement:
38244042
%% "The bare message is immutable within the AMQP network. That is, none of the sections can be
38254043
%% changed by any node acting as an AMQP intermediary. If a section of the bare message is
@@ -4359,6 +4577,7 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
43594577
ok = end_session_sync(Session),
43604578
ok = amqp10_client:close_connection(Connection).
43614579

4580+
43624581
%% internal
43634582
%%
43644583

@@ -4592,13 +4811,16 @@ send_messages_with_group_id(Sender, N, GroupId) ->
45924811
end || I <- lists:seq(1, N)].
45934812

45944813
assert_messages(QNameBin, NumTotalMsgs, NumUnackedMsgs, Config) ->
4814+
assert_messages(QNameBin, NumTotalMsgs, NumUnackedMsgs, Config, 0).
4815+
4816+
assert_messages(QNameBin, NumTotalMsgs, NumUnackedMsgs, Config, Node) ->
45954817
Vhost = ?config(rmq_vhost, Config),
45964818
eventually(
45974819
?_assertEqual(
45984820
lists:sort([{messages, NumTotalMsgs}, {messages_unacknowledged, NumUnackedMsgs}]),
45994821
begin
4600-
{ok, Q} = rpc(Config, rabbit_amqqueue, lookup, [QNameBin, Vhost]),
4601-
Infos = rpc(Config, rabbit_amqqueue, info, [Q, [messages, messages_unacknowledged]]),
4822+
{ok, Q} = rpc(Config, Node, rabbit_amqqueue, lookup, [QNameBin, Vhost]),
4823+
Infos = rpc(Config, Node, rabbit_amqqueue, info, [Q, [messages, messages_unacknowledged]]),
46024824
lists:sort(Infos)
46034825
end
46044826
), 500, 5).

0 commit comments

Comments
 (0)