Skip to content

Commit 037bb3b

Browse files
ansdmergify[bot]
authored andcommitted
Fix quorum queue credit reply crash in AMQP session
Fixes #11841 PR #11307 introduced the invariant that at most one credit request between session proc and quorum queue proc can be in flight at any given time. This is not the case when rabbit_fifo_client re-sends credit requests on behalf of the session proc when the quorum queue leader changes. This commit therefore removes assertions which assumed only a single credit request to be in flight. This commit also removes field queue_flow_ctl.desired_credit since it is redundant to field client_flow_ctl.credit (cherry picked from commit ce915ae)
1 parent dd1f24a commit 037bb3b

File tree

3 files changed

+163
-75
lines changed

3 files changed

+163
-75
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 84 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,9 @@
172172
-record(queue_flow_ctl, {
173173
delivery_count :: sequence_no(),
174174
%% We cap the actual credit we grant to the sending queue.
175+
%% If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX,
176+
%% we will top up in batches to the sending queue.
175177
credit :: 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX,
176-
%% Credit as desired by the receiving client. If larger than
177-
%% LINK_CREDIT_RCV_FROM_QUEUE_MAX, we will top up in batches to the sending queue.
178-
desired_credit :: rabbit_queue_type:credit(),
179178
drain :: boolean()
180179
}).
181180

@@ -197,10 +196,18 @@
197196
%% client and for the link to the sending queue.
198197
client_flow_ctl :: #client_flow_ctl{} | credit_api_v1,
199198
queue_flow_ctl :: #queue_flow_ctl{} | credit_api_v1,
200-
%% True if we sent a credit request to the sending queue
201-
%% but haven't processed the corresponding credit reply yet.
202-
credit_req_in_flight :: boolean() | credit_api_v1,
203-
%% While credit_req_in_flight is true, we stash the
199+
%% 'true' means:
200+
%% * we haven't processed a credit reply yet since we last sent
201+
%% a credit request to the sending queue.
202+
%% * a credit request is certainly in flight
203+
%% * possibly multiple credit requests are in flight (e.g. rabbit_fifo_client
204+
%% will re-send credit requests on our behalf on quorum queue leader changes)
205+
%% 'false' means:
206+
%% * we processed a credit reply since we last sent a credit request to the sending queue
207+
%% * probably no credit request is in flight, but there might be
208+
%% (we aren't sure since we don't use correlations for credit requests)
209+
at_least_one_credit_req_in_flight :: boolean() | credit_api_v1,
210+
%% While at_least_one_credit_req_in_flight is true, we stash the
204211
%% latest credit request from the receiving client.
205212
stashed_credit_req :: none | #credit_req{} | credit_api_v1
206213
}).
@@ -1066,7 +1073,6 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
10661073
echo = false},
10671074
#queue_flow_ctl{delivery_count = ?INITIAL_DELIVERY_COUNT,
10681075
credit = 0,
1069-
desired_credit = 0,
10701076
drain = false},
10711077
false,
10721078
none};
@@ -1116,7 +1122,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
11161122
delivery_count = DeliveryCount,
11171123
client_flow_ctl = ClientFlowCtl,
11181124
queue_flow_ctl = QueueFlowCtl,
1119-
credit_req_in_flight = CreditReqInFlight,
1125+
at_least_one_credit_req_in_flight = CreditReqInFlight,
11201126
stashed_credit_req = StashedCreditReq},
11211127
OutgoingLinks = OutgoingLinks0#{HandleInt => Link},
11221128
State1 = State0#state{queue_states = QStates,
@@ -1392,16 +1398,11 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
13921398
end
13931399
end.
13941400

1395-
handle_credit_reply(Action = {credit_reply, Ctag, _DeliveryCount, _Credit, _Available, Drain},
1401+
handle_credit_reply(Action = {credit_reply, Ctag, _DeliveryCount, _Credit, _Available, _Drain},
13961402
State = #state{outgoing_links = OutgoingLinks}) ->
13971403
Handle = ctag_to_handle(Ctag),
13981404
case OutgoingLinks of
1399-
#{Handle := Link = #outgoing_link{queue_flow_ctl = QFC,
1400-
credit_req_in_flight = CreditReqInFlight}} ->
1401-
%% Assert that we expect a credit reply for this consumer.
1402-
true = CreditReqInFlight,
1403-
%% Assert that "The sender's value is always the last known value indicated by the receiver."
1404-
Drain = QFC#queue_flow_ctl.drain,
1405+
#{Handle := Link} ->
14051406
handle_credit_reply0(Action, Handle, Link, State);
14061407
_ ->
14071408
%% Ignore credit reply for a detached link.
@@ -1418,18 +1419,16 @@ handle_credit_reply0(
14181419
echo = CEcho
14191420
},
14201421
queue_flow_ctl = #queue_flow_ctl{
1421-
delivery_count = QDeliveryCount,
1422-
credit = QCredit,
1423-
desired_credit = DesiredCredit
1424-
} = QFC,
1422+
delivery_count = QDeliveryCount
1423+
} = QFC0,
14251424
stashed_credit_req = StashedCreditReq
14261425
} = Link0,
14271426
#state{outgoing_links = OutgoingLinks,
14281427
queue_states = QStates0
14291428
} = S0) ->
14301429

1431-
%% Assert that flow control state between us and the queue is in sync.
1432-
QCredit = Credit,
1430+
%% Assertion: Our (receiver) delivery-count should be always
1431+
%% in sync with the delivery-count of the sending queue.
14331432
QDeliveryCount = DeliveryCount,
14341433

14351434
case StashedCreditReq of
@@ -1439,24 +1438,32 @@ handle_credit_reply0(
14391438
S = pop_credit_req(Handle, Ctag, Link0, S0),
14401439
echo(CEcho, Handle, CDeliveryCount, CCredit, Available, S),
14411440
S;
1442-
none when QCredit =:= 0 andalso
1443-
DesiredCredit > 0 ->
1441+
none when Credit =:= 0 andalso
1442+
CCredit > 0 ->
14441443
QName = Link0#outgoing_link.queue_name,
14451444
%% Provide queue next batch of credits.
1446-
CappedCredit = cap_credit(DesiredCredit),
1445+
CappedCredit = cap_credit(CCredit),
14471446
{ok, QStates, Actions} =
14481447
rabbit_queue_type:credit(
14491448
QName, Ctag, DeliveryCount, CappedCredit, false, QStates0),
14501449
Link = Link0#outgoing_link{
1451-
queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}
1452-
},
1450+
queue_flow_ctl = QFC0#queue_flow_ctl{credit = CappedCredit},
1451+
at_least_one_credit_req_in_flight = true},
14531452
S = S0#state{queue_states = QStates,
14541453
outgoing_links = OutgoingLinks#{Handle := Link}},
14551454
handle_queue_actions(Actions, S);
14561455
none ->
1457-
Link = Link0#outgoing_link{credit_req_in_flight = false},
1456+
%% Although we (the receiver) usually determine link credit, we set here
1457+
%% our link credit to what the queue says our link credit is (which is safer
1458+
%% in case credit requests got applied out of order in quorum queues).
1459+
%% This should be fine given that we asserted earlier that our delivery-count is
1460+
%% in sync with the delivery-count of the sending queue.
1461+
QFC = QFC0#queue_flow_ctl{credit = Credit},
1462+
Link = Link0#outgoing_link{
1463+
queue_flow_ctl = QFC,
1464+
at_least_one_credit_req_in_flight = false},
14581465
S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}},
1459-
echo(CEcho, Handle, CDeliveryCount, DesiredCredit, Available, S),
1466+
echo(CEcho, Handle, CDeliveryCount, CCredit, Available, S),
14601467
S
14611468
end;
14621469
handle_credit_reply0(
@@ -1465,10 +1472,11 @@ handle_credit_reply0(
14651472
Link0 = #outgoing_link{
14661473
queue_name = QName,
14671474
client_flow_ctl = #client_flow_ctl{
1468-
delivery_count = CDeliveryCount0 } = CFC,
1475+
delivery_count = CDeliveryCount0,
1476+
credit = CCredit
1477+
} = CFC,
14691478
queue_flow_ctl = #queue_flow_ctl{
1470-
delivery_count = QDeliveryCount0,
1471-
desired_credit = DesiredCredit
1479+
delivery_count = QDeliveryCount0
14721480
} = QFC,
14731481
stashed_credit_req = StashedCreditReq},
14741482
S0 = #state{cfg = #cfg{writer_pid = Writer,
@@ -1480,31 +1488,38 @@ handle_credit_reply0(
14801488
0 = Credit,
14811489

14821490
case DeliveryCount =:= QDeliveryCount0 andalso
1483-
DesiredCredit > 0 of
1491+
CCredit > 0 of
14841492
true ->
14851493
%% We're in drain mode. The queue did not advance its delivery-count which means
1486-
%% it might still have messages available for us. We also desire more messages.
1494+
%% it might still have messages available for us. The client also desires more messages.
14871495
%% Therefore, we do the next round of credit top-up. We prioritise finishing
14881496
%% the current drain credit top-up rounds over a stashed credit request because
14891497
%% this is easier to reason about and the queue will reply promptly meaning
14901498
%% the stashed request will be processed soon enough.
1491-
CappedCredit = cap_credit(DesiredCredit),
1492-
Link = Link0#outgoing_link{queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}},
1493-
1494-
{ok, QStates, Actions} =
1495-
rabbit_queue_type:credit(
1496-
QName, Ctag, DeliveryCount, CappedCredit, true, QStates0),
1499+
CappedCredit = cap_credit(CCredit),
1500+
{ok, QStates, Actions} = rabbit_queue_type:credit(
1501+
QName, Ctag, DeliveryCount,
1502+
CappedCredit, true, QStates0),
1503+
Link = Link0#outgoing_link{
1504+
queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit},
1505+
at_least_one_credit_req_in_flight = true},
14971506
S = S0#state{queue_states = QStates,
14981507
outgoing_links = OutgoingLinks#{Handle := Link}},
14991508
handle_queue_actions(Actions, S);
15001509
false ->
1510+
case compare(DeliveryCount, QDeliveryCount0) of
1511+
equal -> ok;
1512+
greater -> ok; %% the sending queue advanced its delivery-count
1513+
less -> error({unexpected_delivery_count, DeliveryCount, QDeliveryCount0})
1514+
end,
1515+
15011516
%% We're in drain mode.
15021517
%% The queue either advanced its delivery-count which means it has
1503-
%% no more messages available for us, or we do not desire more messages.
1518+
%% no more messages available for us, or the client does not desire more messages.
15041519
%% Therefore, we're done with draining and we "the sender will (after sending
15051520
%% all available messages) advance the delivery-count as much as possible,
15061521
%% consuming all link-credit, and send the flow state to the receiver."
1507-
CDeliveryCount = add(CDeliveryCount0, DesiredCredit),
1522+
CDeliveryCount = add(CDeliveryCount0, CCredit),
15081523
Flow0 = #'v1_0.flow'{handle = ?UINT(Handle),
15091524
delivery_count = ?UINT(CDeliveryCount),
15101525
link_credit = ?UINT(0),
@@ -1519,9 +1534,8 @@ handle_credit_reply0(
15191534
queue_flow_ctl = QFC#queue_flow_ctl{
15201535
delivery_count = DeliveryCount,
15211536
credit = 0,
1522-
desired_credit = 0,
15231537
drain = false},
1524-
credit_req_in_flight = false
1538+
at_least_one_credit_req_in_flight = false
15251539
},
15261540
S = S0#state{outgoing_links = OutgoingLinks#{Handle := Link}},
15271541
case StashedCreditReq of
@@ -1553,19 +1567,17 @@ pop_credit_req(
15531567
LinkCreditSnd = amqp10_util:link_credit_snd(
15541568
DeliveryCountRcv, LinkCreditRcv, CDeliveryCount),
15551569
CappedCredit = cap_credit(LinkCreditSnd),
1556-
{ok, QStates, Actions} =
1557-
rabbit_queue_type:credit(
1558-
QName, Ctag, QDeliveryCount, CappedCredit, Drain, QStates0),
1570+
{ok, QStates, Actions} = rabbit_queue_type:credit(
1571+
QName, Ctag, QDeliveryCount,
1572+
CappedCredit, Drain, QStates0),
15591573
Link = Link0#outgoing_link{
15601574
client_flow_ctl = CFC#client_flow_ctl{
15611575
credit = LinkCreditSnd,
15621576
echo = Echo},
15631577
queue_flow_ctl = QFC#queue_flow_ctl{
15641578
credit = CappedCredit,
1565-
desired_credit = LinkCreditSnd,
1566-
drain = Drain
1567-
},
1568-
credit_req_in_flight = true,
1579+
drain = Drain},
1580+
at_least_one_credit_req_in_flight = true,
15691581
stashed_credit_req = none
15701582
},
15711583
S = S0#state{queue_states = QStates,
@@ -1685,19 +1697,20 @@ sent_pending_delivery(
16851697
credit_api_version = CreditApiVsn,
16861698
client_flow_ctl = CFC0,
16871699
queue_flow_ctl = QFC0,
1688-
credit_req_in_flight = CreditReqInFlight0
1700+
at_least_one_credit_req_in_flight = CreditReqInFlight0
16891701
} = Link0 = maps:get(Handle, OutgoingLinks0),
16901702

16911703
S = case CreditApiVsn of
1704+
1 ->
1705+
S0;
16921706
2 ->
16931707
#client_flow_ctl{
16941708
delivery_count = CDeliveryCount0,
16951709
credit = CCredit0
16961710
} = CFC0,
16971711
#queue_flow_ctl{
16981712
delivery_count = QDeliveryCount0,
1699-
credit = QCredit0,
1700-
desired_credit = DesiredCredit0
1713+
credit = QCredit0
17011714
} = QFC0,
17021715

17031716
CDeliveryCount = add(CDeliveryCount0, 1),
@@ -1715,17 +1728,16 @@ sent_pending_delivery(
17151728

17161729
QDeliveryCount = add(QDeliveryCount0, 1),
17171730
QCredit1 = max(0, QCredit0 - 1),
1718-
DesiredCredit = max(0, DesiredCredit0 - 1),
17191731

17201732
{QCredit, CreditReqInFlight, QStates, Actions} =
17211733
case QCredit1 =:= 0 andalso
1722-
DesiredCredit > 0 andalso
1734+
CCredit > 0 andalso
17231735
not CreditReqInFlight0 of
17241736
true ->
17251737
%% assertion
17261738
none = Link0#outgoing_link.stashed_credit_req,
17271739
%% Provide queue next batch of credits.
1728-
CappedCredit = cap_credit(DesiredCredit),
1740+
CappedCredit = cap_credit(CCredit),
17291741
{ok, QStates1, Actions0} =
17301742
rabbit_queue_type:credit(
17311743
QName, Ctag, QDeliveryCount, CappedCredit,
@@ -1740,17 +1752,15 @@ sent_pending_delivery(
17401752
credit = CCredit},
17411753
QFC = QFC0#queue_flow_ctl{
17421754
delivery_count = QDeliveryCount,
1743-
credit = QCredit,
1744-
desired_credit = DesiredCredit},
1745-
Link = Link0#outgoing_link{client_flow_ctl = CFC,
1746-
queue_flow_ctl = QFC,
1747-
credit_req_in_flight = CreditReqInFlight},
1755+
credit = QCredit},
1756+
Link = Link0#outgoing_link{
1757+
client_flow_ctl = CFC,
1758+
queue_flow_ctl = QFC,
1759+
at_least_one_credit_req_in_flight = CreditReqInFlight},
17481760
OutgoingLinks = OutgoingLinks0#{Handle := Link},
17491761
S1 = S0#state{outgoing_links = OutgoingLinks,
17501762
queue_states = QStates},
1751-
handle_queue_actions(Actions, S1);
1752-
1 ->
1753-
S0
1763+
handle_queue_actions(Actions, S1)
17541764
end,
17551765
record_outgoing_unsettled(Pending, S).
17561766

@@ -2677,7 +2687,7 @@ handle_outgoing_link_flow_control(
26772687
credit_api_version = CreditApiVsn,
26782688
client_flow_ctl = CFC,
26792689
queue_flow_ctl = QFC,
2680-
credit_req_in_flight = CreditReqInFlight
2690+
at_least_one_credit_req_in_flight = CreditReqInFlight
26812691
} = Link0,
26822692
#'v1_0.flow'{handle = ?UINT(HandleInt),
26832693
delivery_count = MaybeDeliveryCountRcv,
@@ -2695,26 +2705,26 @@ handle_outgoing_link_flow_control(
26952705
2 ->
26962706
case CreditReqInFlight of
26972707
false ->
2698-
DesiredCredit = amqp10_util:link_credit_snd(
2708+
LinkCreditSnd = amqp10_util:link_credit_snd(
26992709
DeliveryCountRcv,
27002710
LinkCreditRcv,
27012711
CFC#client_flow_ctl.delivery_count),
2702-
CappedCredit = cap_credit(DesiredCredit),
2712+
CappedCredit = cap_credit(LinkCreditSnd),
27032713
Link = Link0#outgoing_link{
2704-
credit_req_in_flight = true,
27052714
client_flow_ctl = CFC#client_flow_ctl{
2706-
credit = DesiredCredit,
2715+
credit = LinkCreditSnd,
27072716
echo = Echo},
27082717
queue_flow_ctl = QFC#queue_flow_ctl{
27092718
credit = CappedCredit,
2710-
desired_credit = DesiredCredit,
2711-
drain = Drain}},
2719+
drain = Drain},
2720+
at_least_one_credit_req_in_flight = true},
27122721
{ok, QStates, Actions} = rabbit_queue_type:credit(
27132722
QName, Ctag,
27142723
QFC#queue_flow_ctl.delivery_count,
27152724
CappedCredit, Drain, QStates0),
2716-
State = State0#state{queue_states = QStates,
2717-
outgoing_links = OutgoingLinks#{HandleInt := Link}},
2725+
State = State0#state{
2726+
queue_states = QStates,
2727+
outgoing_links = OutgoingLinks#{HandleInt := Link}},
27182728
handle_queue_actions(Actions, State);
27192729
true ->
27202730
%% A credit request is currently in-flight. Let's first process its reply

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ seq_applied({Seq, Response},
666666
when Response /= not_enqueued ->
667667
{[Corr | Corrs], Actions, State#state{pending = Pending}};
668668
_ ->
669-
{Corrs, Actions, State#state{}}
669+
{Corrs, Actions, State}
670670
end;
671671
seq_applied(_Seq, Acc) ->
672672
Acc.

0 commit comments

Comments
 (0)