Skip to content

Commit fab74cb

Browse files
committed
Fix link flow control in classic queues
This commit fixes java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0 followed by ./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2 Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around 8 - 10,000 messages. The bug was that in flight messages from classic queue process to session process were not taken into account when topping up credit to the classic queue process. The solution to this bug (and a much cleaner design anyway independent of this bug) is that queues should hold all link flow control state including the delivery-count. Hence, when credit API v2 is used the delivery-count will be held by the classic queue process, quorum queue process, and stream queue client instead of managing the delivery-count in the session. Also note that quorum queues will use serial number arithmetic for delivery-count in credit API v2. Furthermore, the double level crediting between (a) session process and rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was removed. Therefore, instead of managing 3 separate delivery-counts (i. session, ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used in rabbit_fifo. This simplifies a lot. This commit fixes quorum queues without bumping the machine version nor introducing new rabbit_fifo commands. Whether credit API v2 is used is solely determined at link attachment time depending on whether feature flag credit_api_v2 is enabled. Even when that feature flag will be enabled later on, this link will keep using credit API v1 until detached (or the node is shut down). Eventually, after feature flag credit_api_v2 has been enabled and a subsequent rolling upgrade, all links will use credit API v2. This approach is safe and simple. The 2 alternatives to move delivery-count from the session process to the queue processes would have been: 1. Explicit feature flag credit_api_v2 migration function * Can use a gen_server:call and only finish migration once all delivery-counts were migrated. Cons: * Extra new message format just for migration is required. * Risky as migration will fail if a target queue doesn’t reply. 2. Session always includes DeliveryCountSnd when crediting to the queue: Cons: * 2 delivery counts will be hold simulatenously in session proc and queue proc; could be solved by deleting the session proc’s delivery-count for credit-reply * What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
1 parent f54fa7f commit fab74cb

13 files changed

+673
-382
lines changed

deps/rabbit/app.bzl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def all_beam_files(name = "all_beam_files"):
2424
app_name = "rabbit",
2525
dest = "ebin",
2626
erlc_opts = "//:erlc_opts",
27-
deps = ["//deps/rabbit_common:erlang_app"],
27+
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app"],
2828
)
2929
erlang_bytecode(
3030
name = "other_beam",
@@ -286,7 +286,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
286286
app_name = "rabbit",
287287
dest = "test",
288288
erlc_opts = "//:test_erlc_opts",
289-
deps = ["//deps/rabbit_common:erlang_app"],
289+
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app"],
290290
)
291291
erlang_bytecode(
292292
name = "test_other_beam",

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 57 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,11 +1337,10 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
13371337
end;
13381338

13391339
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
1340-
Mode, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
1340+
ModeOrPrefetch, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
13411341
_From, State = #q{consumers = Consumers,
13421342
active_consumer = Holder,
13431343
single_active_consumer_on = SingleActiveConsumerOn}) ->
1344-
{PrefetchCount, _} = ParsedCreditMode = rabbit_queue_consumers:parse_credit_mode(Mode, Args),
13451344
ConsumerRegistration = case SingleActiveConsumerOn of
13461345
true ->
13471346
case ExclusiveConsume of
@@ -1350,30 +1349,27 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
13501349
false ->
13511350
Consumers1 = rabbit_queue_consumers:add(
13521351
ChPid, ConsumerTag, NoAck,
1353-
LimiterPid, LimiterActive,
1354-
ParsedCreditMode, Args,
1355-
ActingUser, Consumers),
1356-
1357-
case Holder of
1358-
none ->
1359-
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
1360-
{state, State#q{consumers = Consumers1,
1361-
has_had_consumers = true,
1362-
active_consumer = NewConsumer}};
1363-
_ ->
1364-
{state, State#q{consumers = Consumers1,
1365-
has_had_consumers = true}}
1366-
end
1352+
LimiterPid, LimiterActive, ModeOrPrefetch,
1353+
Args, ActingUser, Consumers),
1354+
case Holder of
1355+
none ->
1356+
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
1357+
{state, State#q{consumers = Consumers1,
1358+
has_had_consumers = true,
1359+
active_consumer = NewConsumer}};
1360+
_ ->
1361+
{state, State#q{consumers = Consumers1,
1362+
has_had_consumers = true}}
1363+
end
13671364
end;
13681365
false ->
13691366
case check_exclusive_access(Holder, ExclusiveConsume, State) of
13701367
in_use -> {error, reply({error, exclusive_consume_unavailable}, State)};
13711368
ok ->
13721369
Consumers1 = rabbit_queue_consumers:add(
13731370
ChPid, ConsumerTag, NoAck,
1374-
LimiterPid, LimiterActive,
1375-
ParsedCreditMode, Args,
1376-
ActingUser, Consumers),
1371+
LimiterPid, LimiterActive, ModeOrPrefetch,
1372+
Args, ActingUser, Consumers),
13771373
ExclusiveConsumer =
13781374
if ExclusiveConsume -> {ChPid, ConsumerTag};
13791375
true -> Holder
@@ -1400,7 +1396,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
14001396
{false, _} ->
14011397
{true, up}
14021398
end,
1403-
rabbit_core_metrics:consumer_created(
1399+
PrefetchCount = rabbit_queue_consumers:parse_prefetch_count(ModeOrPrefetch),
1400+
rabbit_core_metrics:consumer_created(
14041401
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
14051402
PrefetchCount, ConsumerIsActive, ActivityStatus, Args),
14061403
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
@@ -1631,52 +1628,54 @@ handle_cast(update_mirroring, State = #q{q = Q,
16311628
State1 = State#q{mirroring_policy_version = NewVersion},
16321629
noreply(update_mirroring(Policy, State1))
16331630
end;
1634-
handle_cast({credit, ChPid, CTag, Credit, Drain}, State) ->
1635-
%% Feature flag credit_api_v2 is disabled,
1636-
%% i.e. this function clause should be deleted once that feature flag becomes required.
1637-
handle_cast({credit, ChPid, CTag, Credit, Drain, true, #{}}, State);
16381631

1639-
handle_cast({credit, ChPid, CTag, Credit, Drain, Reply, LinkStateProperties},
1640-
#q{consumers = Consumers0,
1641-
q = Q,
1632+
handle_cast({credit, SessionPid, CTag, Credit, Drain},
1633+
#q{q = Q,
16421634
backing_queue = BQ,
1643-
backing_queue_state = BQS0} = State0) ->
1635+
backing_queue_state = BQS0} = State) ->
1636+
%% Credit API v1.
1637+
%% Delete this function clause when feature flag credit_api_v2 becomes required.
1638+
%% Behave like non-native AMQP 1.0: Send send_credit_reply before deliveries.
1639+
rabbit_classic_queue:send_credit_reply_credit_api_v1(
1640+
SessionPid, amqqueue:get_name(Q), BQ:len(BQS0)),
1641+
handle_cast({credit, SessionPid, CTag, credit_api_v1, Credit, Drain, false}, State);
1642+
handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain, Echo},
1643+
#q{consumers = Consumers0,
1644+
q = Q} = State0) ->
16441645
QName = amqqueue:get_name(Q),
1645-
Vsn = rabbit_classic_queue:credit_api_vsn(),
1646-
case Vsn of
1647-
v1 ->
1648-
%% Behave like non-native AMQP 1.0:
1649-
%% Send send_credit_reply before deliveries.
1650-
rabbit_classic_queue:send_credit_reply(
1651-
ChPid, QName, CTag, Credit, BQ:len(BQS0),
1652-
false, LinkStateProperties);
1653-
v2 ->
1654-
%% Send credit_reply after deliveries.
1655-
ok
1656-
end,
1657-
16581646
State = #q{backing_queue_state = PostBQS,
1659-
consumers = Consumers2}
1660-
= case rabbit_queue_consumers:set_credit(Credit, ChPid, CTag, Consumers0) of
1661-
unchanged ->
1662-
State0;
1663-
{unblocked, Consumers1} ->
1664-
State1 = State0#q{consumers = Consumers1},
1665-
run_message_queue(true, State1)
1666-
end,
1667-
1668-
case rabbit_queue_consumers:get_credit(ChPid, CTag) of
1669-
PostCred
1670-
when is_integer(PostCred) andalso Drain andalso PostCred > 0 ->
1671-
unchanged = rabbit_queue_consumers:set_credit(0, ChPid, CTag, Consumers2),
1647+
backing_queue = BQ} = case rabbit_queue_consumers:process_credit(
1648+
DeliveryCountRcv, Credit, SessionPid, CTag, Consumers0) of
1649+
unchanged ->
1650+
State0;
1651+
{unblocked, Consumers1} ->
1652+
State1 = State0#q{consumers = Consumers1},
1653+
run_message_queue(true, State1)
1654+
end,
1655+
case rabbit_queue_consumers:get_link_state(SessionPid, CTag) of
1656+
{credit_api_v1, PostCred}
1657+
when Drain andalso
1658+
is_integer(PostCred) andalso PostCred > 0 ->
1659+
%% credit API v1
1660+
rabbit_queue_consumers:drained(credit_api_v1, SessionPid, CTag),
1661+
rabbit_classic_queue:send_drained_credit_api_v1(SessionPid, QName, CTag, PostCred);
1662+
{PostDeliveryCountSnd, PostCred}
1663+
when is_integer(PostDeliveryCountSnd) andalso
1664+
Drain andalso
1665+
is_integer(PostCred) andalso PostCred > 0 ->
1666+
%% credit API v2
1667+
AdvancedDeliveryCount = serial_number:add(PostDeliveryCountSnd, PostCred),
1668+
rabbit_queue_consumers:drained(AdvancedDeliveryCount, SessionPid, CTag),
16721669
Avail = BQ:len(PostBQS),
16731670
rabbit_classic_queue:send_credit_reply(
1674-
ChPid, QName, CTag, PostCred, Avail, Drain, LinkStateProperties);
1675-
PostCred
1676-
when is_integer(PostCred) andalso Vsn =:= v2 andalso Reply ->
1671+
SessionPid, QName, CTag, AdvancedDeliveryCount, 0, Avail, Drain);
1672+
{PostDeliveryCountSnd, PostCred}
1673+
when is_integer(PostDeliveryCountSnd) andalso
1674+
Echo ->
1675+
%% credit API v2
16771676
Avail = BQ:len(PostBQS),
16781677
rabbit_classic_queue:send_credit_reply(
1679-
ChPid, QName, CTag, PostCred, Avail, Drain, LinkStateProperties);
1678+
SessionPid, QName, CTag, PostDeliveryCountSnd, PostCred, Avail, Drain);
16801679
_ ->
16811680
ok
16821681
end,

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
handle_event/3,
4242
deliver/3,
4343
settle/5,
44+
credit_v1/5,
4445
credit/7,
4546
dequeue/5,
4647
info/2,
@@ -58,7 +59,8 @@
5859
-export([confirm_to_sender/3,
5960
send_rejection/3,
6061
deliver_to_consumer/5,
61-
credit_api_vsn/0,
62+
send_credit_reply_credit_api_v1/3,
63+
send_drained_credit_api_v1/4,
6264
send_credit_reply/7]).
6365

6466
-spec is_enabled() -> boolean().
@@ -237,20 +239,17 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
237239
channel_pid := ChPid,
238240
limiter_pid := LimiterPid,
239241
limiter_active := LimiterActive,
240-
mode := Mode0,
242+
mode := Mode,
241243
consumer_tag := ConsumerTag,
242244
exclusive_consume := ExclusiveConsume,
243245
args := Args0,
244246
ok_msg := OkMsg,
245247
acting_user := ActingUser} = Spec,
246-
{Mode, Args} = case credit_api_vsn() of
247-
v2 -> {Mode0, Args0};
248-
v1 -> consumer_spec_v2_to_v1(Mode0, Args0)
249-
end,
248+
{ModeOrPrefetch, Args} = consume_backwards_compat(Mode, Args0),
250249
case delegate:invoke(QPid,
251250
{gen_server2, call,
252251
[{basic_consume, NoAck, ChPid, LimiterPid,
253-
LimiterActive, Mode, ConsumerTag,
252+
LimiterActive, ModeOrPrefetch, ConsumerTag,
254253
ExclusiveConsume, Args, OkMsg, ActingUser},
255254
infinity]}) of
256255
ok ->
@@ -261,9 +260,18 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
261260
Err
262261
end.
263262

264-
consumer_spec_v2_to_v1({simple_prefetch, PrefetchCount}, Args) ->
265-
{PrefetchCount, Args};
266-
consumer_spec_v2_to_v1(credited, Args) ->
263+
%% Delete this function when feature flag credit_api_v2 becomes required.
264+
consume_backwards_compat({simple_prefetch, PrefetchCount} = Mode, Args) ->
265+
case rabbit_feature_flags:is_enabled(credit_api_v2) of
266+
true -> {Mode, Args};
267+
false -> {PrefetchCount, Args}
268+
end;
269+
consume_backwards_compat({credited, InitialDeliveryCount} = Mode, Args)
270+
when is_integer(InitialDeliveryCount) ->
271+
%% credit API v2
272+
{Mode, Args};
273+
consume_backwards_compat({credited, credit_api_v1}, Args) ->
274+
%% credit API v1
267275
{_PrefetchCount = 0,
268276
[{<<"x-credit">>, table, [{<<"credit">>, long, 0},
269277
{<<"drain">>, bool, false}]} | Args]}.
@@ -293,15 +301,13 @@ settle(_QName, Op, _CTag, MsgIds, State) ->
293301
[{reject, Op == requeue, MsgIds, ChPid}]}),
294302
{State, []}.
295303

296-
credit(_QName, CTag, Credit, Drain, Reply, Properties, #?STATE{pid = QPid} = State) ->
297-
ChPid = self(),
298-
Request = case credit_api_vsn() of
299-
v2 ->
300-
%% TODO use a map for more flexiblity in the future?
301-
{credit, ChPid, CTag, Credit, Drain, Reply, Properties};
302-
v1 ->
303-
{credit, ChPid, CTag, Credit, Drain}
304-
end,
304+
credit_v1(_QName, Ctag, LinkCreditSnd, Drain, #?STATE{pid = QPid} = State) ->
305+
Request = {credit, self(), Ctag, LinkCreditSnd, Drain},
306+
delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}),
307+
{State, []}.
308+
309+
credit(_QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo, #?STATE{pid = QPid} = State) ->
310+
Request = {credit, self(), Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo},
305311
delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}),
306312
{State, []}.
307313

@@ -374,8 +380,7 @@ handle_event(_QName, Action, State)
374380
handle_event(_QName, {send_drained, {Ctag, Credit}}, State) ->
375381
%% This function clause should be deleted when feature flag
376382
%% credit_api_v2 becomes required.
377-
Action = {credit_reply, Ctag, Credit, _Available = 0,
378-
_Drain = true, _Properties = #{}},
383+
Action = {credit_reply_v1, Ctag, Credit, _Available = 0, _Drain = true},
379384
{ok, State, [Action]}.
380385

381386
settlement_action(_Type, _QRef, [], Acc) ->
@@ -642,27 +647,19 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) ->
642647
Evt = {deliver, CTag, AckRequired, [Message]},
643648
send_queue_event(Pid, QName, Evt).
644649

645-
send_credit_reply(Pid, QName, Ctag, Credit, Available, Drain, Properties) ->
646-
case credit_api_vsn() of
647-
v2 ->
648-
Evt = {credit_reply, Ctag, Credit, Available, Drain, Properties},
649-
send_queue_event(Pid, QName, Evt);
650-
v1 ->
651-
case Drain of
652-
true ->
653-
Evt = {send_drained, {Ctag, Credit}},
654-
send_queue_event(Pid, QName, Evt);
655-
false ->
656-
Evt = {send_credit_reply, Available},
657-
send_queue_event(Pid, QName, Evt)
658-
end
659-
end.
650+
%% Delete this function when feature flag credit_api_v2 becomes required.
651+
send_credit_reply_credit_api_v1(Pid, QName, Available) ->
652+
Evt = {send_credit_reply, Available},
653+
send_queue_event(Pid, QName, Evt).
654+
655+
%% Delete this function when feature flag credit_api_v2 becomes required.
656+
send_drained_credit_api_v1(Pid, QName, Ctag, Credit) ->
657+
Evt = {send_drained, {Ctag, Credit}},
658+
send_queue_event(Pid, QName, Evt).
659+
660+
send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
661+
Evt = {credit_reply, Ctag, DeliveryCount, Credit, Available, Drain},
662+
send_queue_event(Pid, QName, Evt).
660663

661664
send_queue_event(Pid, QName, Event) ->
662665
gen_server:cast(Pid, {queue_event, QName, Event}).
663-
664-
credit_api_vsn() ->
665-
case rabbit_feature_flags:is_enabled(credit_api_v2) of
666-
true -> v2;
667-
false -> v1
668-
end.

0 commit comments

Comments
 (0)