Skip to content

Commit dcb2fe0

Browse files
committed
Fix message IDs settlement order
## What? This commit fixes issues that were present only on `main` branch and were introduced by #9022. 1. Classic queues (specifically `rabbit_queue_consumers:subtract_acks/3`) expect message IDs to be (n)acked in the order as they were delivered to the channel / session proc. Hence, the `lists:usort(MsgIds0)` in `rabbit_classic_queue:settle/5` was wrong causing not all messages to be acked adding a regression to also AMQP 0.9.1. 2. The order in which the session proc requeues or rejects multiple message IDs at once is important. For example, if the client sends a DISPOSITION with first=3 and last=5, the message IDs corresponding to delivery IDs 3,4,5 must be requeued or rejected in exactly that order. For example, quorum queues use this order of message IDs in https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_fifo.erl#L226-L234 to dead letter in that order. ## How? The session proc will settle (internal) message IDs to queues in ascending (AMQP) delivery ID order, i.e. in the order messages were sent to the client and in the order messages were settled by the client. This commit chooses to keep the session's outgoing_unsettled_map map data structure. An alternative would have been to use a queue or lqueue for the outgoing_unsettled_map as done in * https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_channel.erl#L135 * https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_queue_consumers.erl#L43 Whether a queue (as done by `rabbit_channel`) or a map (as done by `rabbit_amqp_session`) performs better depends on the pattern how clients ack messages. A queue will likely perform good enough because usually the oldest delivered messages will be acked first. However, given that there can be many different consumers on an AQMP 0.9.1 channel or AMQP 1.0 session, this commit favours a map because it will likely generate less garbage and is very efficient when for example a single new message (or few new messages) gets acked while many (older) messages are still checked out by the session (but by possibly different AMQP 1.0 receivers).
1 parent 4586236 commit dcb2fe0

10 files changed

+733
-16
lines changed

deps/amqp10_common/src/serial_number.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,7 @@ compare(A, B) ->
5959
[serial_number()].
6060
usort(L) ->
6161
lists:usort(fun(A, B) ->
62-
case compare(A, B) of
63-
greater -> false;
64-
_ -> true
65-
end
62+
compare(A, B) =/= greater
6663
end, L).
6764

6865
%% Takes a list of serial numbers and returns tuples

deps/rabbit/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,10 @@ rabbitmq_integration_suite(
402402
shard_count = 6,
403403
)
404404

405+
rabbitmq_integration_suite(
406+
name = "amqpl_consumer_ack_SUITE",
407+
)
408+
405409
rabbitmq_integration_suite(
406410
name = "message_containers_deaths_v2_SUITE",
407411
size = "medium",

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2134,3 +2134,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
21342134
erlc_opts = "//:test_erlc_opts",
21352135
deps = ["//deps/amqp_client:erlang_app"],
21362136
)
2137+
erlang_bytecode(
2138+
name = "amqpl_consumer_ack_SUITE_beam_files",
2139+
testonly = True,
2140+
srcs = ["test/amqpl_consumer_ack_SUITE.erl"],
2141+
outs = ["test/amqpl_consumer_ack_SUITE.beam"],
2142+
app_name = "rabbit",
2143+
erlc_opts = "//:test_erlc_opts",
2144+
deps = ["//deps/amqp_client:erlang_app"],
2145+
)

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,9 +1307,13 @@ handle_control(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
13071307
case DispositionRangeSize =< UnsettledMapSize of
13081308
true ->
13091309
%% It is cheaper to iterate over the range of settled delivery IDs.
1310-
serial_number:foldl(fun settle_delivery_id/2, {#{}, UnsettledMap0}, First, Last);
1310+
serial_number:foldl(fun settle_delivery_id/2,
1311+
{#{}, UnsettledMap0},
1312+
First, Last);
13111313
false ->
13121314
%% It is cheaper to iterate over the outgoing unsettled map.
1315+
Iter = maps:iterator(UnsettledMap0,
1316+
fun(D1, D2) -> compare(D1, D2) =/= greater end),
13131317
{Settled0, UnsettledList} =
13141318
maps:fold(
13151319
fun (DeliveryId,
@@ -1329,14 +1333,15 @@ handle_control(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
13291333
{SettledAcc, [{DeliveryId, Unsettled} | UnsettledAcc]}
13301334
end
13311335
end,
1332-
{#{}, []}, UnsettledMap0),
1336+
{#{}, []}, Iter),
13331337
{Settled0, maps:from_list(UnsettledList)}
13341338
end,
13351339

13361340
SettleOp = settle_op_from_outcome(Outcome),
13371341
{QStates, Actions} =
13381342
maps:fold(
1339-
fun({QName, Ctag}, MsgIds, {QS0, ActionsAcc}) ->
1343+
fun({QName, Ctag}, MsgIdsRev, {QS0, ActionsAcc}) ->
1344+
MsgIds = lists:reverse(MsgIdsRev),
13401345
case rabbit_queue_type:settle(QName, SettleOp, Ctag, MsgIds, QS0) of
13411346
{ok, QS, Actions0} ->
13421347
messages_acknowledged(SettleOp, QName, QS, MsgIds),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1972,7 +1972,7 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
19721972
end.
19731973

19741974
%% Settles (acknowledges) messages at the queue replica process level.
1975-
%% This happens in the youngest-first order (ascending by delivery tag).
1975+
%% This happens in the oldest-first order (ascending by delivery tag).
19761976
settle_acks(Acks, State = #ch{queue_states = QueueStates0}) ->
19771977
{QueueStates, Actions} =
19781978
foreach_per_queue(

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,7 @@ cancel(Q, ConsumerTag, OkMsg, ActingUser, State) ->
275275
-spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(),
276276
rabbit_types:ctag(), [non_neg_integer()], state()) ->
277277
{state(), rabbit_queue_type:actions()}.
278-
settle(_QName, Op, _CTag, MsgIds0, State = #?STATE{pid = Pid}) ->
279-
%% Classic queues expect message IDs in sorted order.
280-
MsgIds = lists:usort(MsgIds0),
278+
settle(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
281279
Arg = case Op of
282280
complete ->
283281
{ack, MsgIds, self()};

deps/rabbit/src/rabbit_queue_consumers.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@
4040
%% channel record
4141
-record(cr, {ch_pid,
4242
monitor_ref,
43-
acktags,
44-
consumer_count,
43+
acktags :: ?QUEUE:?QUEUE({ack(), rabbit_types:ctag() | none}),
44+
consumer_count :: non_neg_integer(),
4545
%% Queue of {ChPid, #consumer{}} for consumers which have
4646
%% been blocked (rate/prefetch limited) for any reason
4747
blocked_consumers,

0 commit comments

Comments
 (0)