172
172
-record (queue_flow_ctl , {
173
173
delivery_count :: sequence_no (),
174
174
% % We cap the actual credit we grant to the sending queue.
175
+ % % If client_flow_ctl.credit is larger than LINK_CREDIT_RCV_FROM_QUEUE_MAX,
176
+ % % we will top up in batches to the sending queue.
175
177
credit :: 0 ..? LINK_CREDIT_RCV_FROM_QUEUE_MAX ,
176
- % % Credit as desired by the receiving client. If larger than
177
- % % LINK_CREDIT_RCV_FROM_QUEUE_MAX, we will top up in batches to the sending queue.
178
- desired_credit :: rabbit_queue_type :credit (),
179
178
drain :: boolean ()
180
179
}).
181
180
197
196
% % client and for the link to the sending queue.
198
197
client_flow_ctl :: # client_flow_ctl {} | credit_api_v1 ,
199
198
queue_flow_ctl :: # queue_flow_ctl {} | credit_api_v1 ,
200
- % % True if we sent a credit request to the sending queue
201
- % % but haven't processed the corresponding credit reply yet.
202
- credit_req_in_flight :: boolean () | credit_api_v1 ,
203
- % % While credit_req_in_flight is true, we stash the
199
+ % % 'true' means:
200
+ % % * we haven't processed a credit reply yet since we last sent
201
+ % % a credit request to the sending queue.
202
+ % % * a credit request is certainly in flight
203
+ % % * possibly multiple credit requests are in flight (e.g. rabbit_fifo_client
204
+ % % will re-send credit requests on our behalf on quorum queue leader changes)
205
+ % % 'false' means:
206
+ % % * we processed a credit reply since we last sent a credit request to the sending queue
207
+ % % * probably no credit request is in flight, but there might be
208
+ % % (we aren't sure since we don't use correlations for credit requests)
209
+ at_least_one_credit_req_in_flight :: boolean () | credit_api_v1 ,
210
+ % % While at_least_one_credit_req_in_flight is true, we stash the
204
211
% % latest credit request from the receiving client.
205
212
stashed_credit_req :: none | # credit_req {} | credit_api_v1
206
213
}).
@@ -1066,7 +1073,6 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
1066
1073
echo = false },
1067
1074
# queue_flow_ctl {delivery_count = ? INITIAL_DELIVERY_COUNT ,
1068
1075
credit = 0 ,
1069
- desired_credit = 0 ,
1070
1076
drain = false },
1071
1077
false ,
1072
1078
none };
@@ -1116,7 +1122,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
1116
1122
delivery_count = DeliveryCount ,
1117
1123
client_flow_ctl = ClientFlowCtl ,
1118
1124
queue_flow_ctl = QueueFlowCtl ,
1119
- credit_req_in_flight = CreditReqInFlight ,
1125
+ at_least_one_credit_req_in_flight = CreditReqInFlight ,
1120
1126
stashed_credit_req = StashedCreditReq },
1121
1127
OutgoingLinks = OutgoingLinks0 #{HandleInt => Link },
1122
1128
State1 = State0 # state {queue_states = QStates ,
@@ -1392,16 +1398,11 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
1392
1398
end
1393
1399
end .
1394
1400
1395
- handle_credit_reply (Action = {credit_reply , Ctag , _DeliveryCount , _Credit , _Available , Drain },
1401
+ handle_credit_reply (Action = {credit_reply , Ctag , _DeliveryCount , _Credit , _Available , _Drain },
1396
1402
State = # state {outgoing_links = OutgoingLinks }) ->
1397
1403
Handle = ctag_to_handle (Ctag ),
1398
1404
case OutgoingLinks of
1399
- #{Handle := Link = # outgoing_link {queue_flow_ctl = QFC ,
1400
- credit_req_in_flight = CreditReqInFlight }} ->
1401
- % % Assert that we expect a credit reply for this consumer.
1402
- true = CreditReqInFlight ,
1403
- % % Assert that "The sender's value is always the last known value indicated by the receiver."
1404
- Drain = QFC # queue_flow_ctl .drain ,
1405
+ #{Handle := Link } ->
1405
1406
handle_credit_reply0 (Action , Handle , Link , State );
1406
1407
_ ->
1407
1408
% % Ignore credit reply for a detached link.
@@ -1418,18 +1419,16 @@ handle_credit_reply0(
1418
1419
echo = CEcho
1419
1420
},
1420
1421
queue_flow_ctl = # queue_flow_ctl {
1421
- delivery_count = QDeliveryCount ,
1422
- credit = QCredit ,
1423
- desired_credit = DesiredCredit
1424
- } = QFC ,
1422
+ delivery_count = QDeliveryCount
1423
+ } = QFC0 ,
1425
1424
stashed_credit_req = StashedCreditReq
1426
1425
} = Link0 ,
1427
1426
# state {outgoing_links = OutgoingLinks ,
1428
1427
queue_states = QStates0
1429
1428
} = S0 ) ->
1430
1429
1431
- % % Assert that flow control state between us and the queue is in sync.
1432
- QCredit = Credit ,
1430
+ % % Assertion: Our (receiver) delivery-count should be always
1431
+ % % in sync with the delivery-count of the sending queue.
1433
1432
QDeliveryCount = DeliveryCount ,
1434
1433
1435
1434
case StashedCreditReq of
@@ -1439,24 +1438,32 @@ handle_credit_reply0(
1439
1438
S = pop_credit_req (Handle , Ctag , Link0 , S0 ),
1440
1439
echo (CEcho , Handle , CDeliveryCount , CCredit , Available , S ),
1441
1440
S ;
1442
- none when QCredit =:= 0 andalso
1443
- DesiredCredit > 0 ->
1441
+ none when Credit =:= 0 andalso
1442
+ CCredit > 0 ->
1444
1443
QName = Link0 # outgoing_link .queue_name ,
1445
1444
% % Provide queue next batch of credits.
1446
- CappedCredit = cap_credit (DesiredCredit ),
1445
+ CappedCredit = cap_credit (CCredit ),
1447
1446
{ok , QStates , Actions } =
1448
1447
rabbit_queue_type :credit (
1449
1448
QName , Ctag , DeliveryCount , CappedCredit , false , QStates0 ),
1450
1449
Link = Link0 # outgoing_link {
1451
- queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit }
1452
- },
1450
+ queue_flow_ctl = QFC0 # queue_flow_ctl {credit = CappedCredit },
1451
+ at_least_one_credit_req_in_flight = true },
1453
1452
S = S0 # state {queue_states = QStates ,
1454
1453
outgoing_links = OutgoingLinks #{Handle := Link }},
1455
1454
handle_queue_actions (Actions , S );
1456
1455
none ->
1457
- Link = Link0 # outgoing_link {credit_req_in_flight = false },
1456
+ % % Although we (the receiver) usually determine link credit, we set here
1457
+ % % our link credit to what the queue says our link credit is (which is safer
1458
+ % % in case credit requests got applied out of order in quorum queues).
1459
+ % % This should be fine given that we asserted earlier that our delivery-count is
1460
+ % % in sync with the delivery-count of the sending queue.
1461
+ QFC = QFC0 # queue_flow_ctl {credit = Credit },
1462
+ Link = Link0 # outgoing_link {
1463
+ queue_flow_ctl = QFC ,
1464
+ at_least_one_credit_req_in_flight = false },
1458
1465
S = S0 # state {outgoing_links = OutgoingLinks #{Handle := Link }},
1459
- echo (CEcho , Handle , CDeliveryCount , DesiredCredit , Available , S ),
1466
+ echo (CEcho , Handle , CDeliveryCount , CCredit , Available , S ),
1460
1467
S
1461
1468
end ;
1462
1469
handle_credit_reply0 (
@@ -1465,10 +1472,11 @@ handle_credit_reply0(
1465
1472
Link0 = # outgoing_link {
1466
1473
queue_name = QName ,
1467
1474
client_flow_ctl = # client_flow_ctl {
1468
- delivery_count = CDeliveryCount0 } = CFC ,
1475
+ delivery_count = CDeliveryCount0 ,
1476
+ credit = CCredit
1477
+ } = CFC ,
1469
1478
queue_flow_ctl = # queue_flow_ctl {
1470
- delivery_count = QDeliveryCount0 ,
1471
- desired_credit = DesiredCredit
1479
+ delivery_count = QDeliveryCount0
1472
1480
} = QFC ,
1473
1481
stashed_credit_req = StashedCreditReq },
1474
1482
S0 = # state {cfg = # cfg {writer_pid = Writer ,
@@ -1480,31 +1488,38 @@ handle_credit_reply0(
1480
1488
0 = Credit ,
1481
1489
1482
1490
case DeliveryCount =:= QDeliveryCount0 andalso
1483
- DesiredCredit > 0 of
1491
+ CCredit > 0 of
1484
1492
true ->
1485
1493
% % We're in drain mode. The queue did not advance its delivery-count which means
1486
- % % it might still have messages available for us. We also desire more messages.
1494
+ % % it might still have messages available for us. The client also desires more messages.
1487
1495
% % Therefore, we do the next round of credit top-up. We prioritise finishing
1488
1496
% % the current drain credit top-up rounds over a stashed credit request because
1489
1497
% % this is easier to reason about and the queue will reply promptly meaning
1490
1498
% % the stashed request will be processed soon enough.
1491
- CappedCredit = cap_credit (DesiredCredit ),
1492
- Link = Link0 # outgoing_link {queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit }},
1493
-
1494
- {ok , QStates , Actions } =
1495
- rabbit_queue_type :credit (
1496
- QName , Ctag , DeliveryCount , CappedCredit , true , QStates0 ),
1499
+ CappedCredit = cap_credit (CCredit ),
1500
+ {ok , QStates , Actions } = rabbit_queue_type :credit (
1501
+ QName , Ctag , DeliveryCount ,
1502
+ CappedCredit , true , QStates0 ),
1503
+ Link = Link0 # outgoing_link {
1504
+ queue_flow_ctl = QFC # queue_flow_ctl {credit = CappedCredit },
1505
+ at_least_one_credit_req_in_flight = true },
1497
1506
S = S0 # state {queue_states = QStates ,
1498
1507
outgoing_links = OutgoingLinks #{Handle := Link }},
1499
1508
handle_queue_actions (Actions , S );
1500
1509
false ->
1510
+ case compare (DeliveryCount , QDeliveryCount0 ) of
1511
+ equal -> ok ;
1512
+ greater -> ok ; % % the sending queue advanced its delivery-count
1513
+ less -> error ({unexpected_delivery_count , DeliveryCount , QDeliveryCount0 })
1514
+ end ,
1515
+
1501
1516
% % We're in drain mode.
1502
1517
% % The queue either advanced its delivery-count which means it has
1503
- % % no more messages available for us, or we do not desire more messages.
1518
+ % % no more messages available for us, or the client does not desire more messages.
1504
1519
% % Therefore, we're done with draining and we "the sender will (after sending
1505
1520
% % all available messages) advance the delivery-count as much as possible,
1506
1521
% % consuming all link-credit, and send the flow state to the receiver."
1507
- CDeliveryCount = add (CDeliveryCount0 , DesiredCredit ),
1522
+ CDeliveryCount = add (CDeliveryCount0 , CCredit ),
1508
1523
Flow0 = # 'v1_0.flow' {handle = ? UINT (Handle ),
1509
1524
delivery_count = ? UINT (CDeliveryCount ),
1510
1525
link_credit = ? UINT (0 ),
@@ -1519,9 +1534,8 @@ handle_credit_reply0(
1519
1534
queue_flow_ctl = QFC # queue_flow_ctl {
1520
1535
delivery_count = DeliveryCount ,
1521
1536
credit = 0 ,
1522
- desired_credit = 0 ,
1523
1537
drain = false },
1524
- credit_req_in_flight = false
1538
+ at_least_one_credit_req_in_flight = false
1525
1539
},
1526
1540
S = S0 # state {outgoing_links = OutgoingLinks #{Handle := Link }},
1527
1541
case StashedCreditReq of
@@ -1553,19 +1567,17 @@ pop_credit_req(
1553
1567
LinkCreditSnd = amqp10_util :link_credit_snd (
1554
1568
DeliveryCountRcv , LinkCreditRcv , CDeliveryCount ),
1555
1569
CappedCredit = cap_credit (LinkCreditSnd ),
1556
- {ok , QStates , Actions } =
1557
- rabbit_queue_type : credit (
1558
- QName , Ctag , QDeliveryCount , CappedCredit , Drain , QStates0 ),
1570
+ {ok , QStates , Actions } = rabbit_queue_type : credit (
1571
+ QName , Ctag , QDeliveryCount ,
1572
+ CappedCredit , Drain , QStates0 ),
1559
1573
Link = Link0 # outgoing_link {
1560
1574
client_flow_ctl = CFC # client_flow_ctl {
1561
1575
credit = LinkCreditSnd ,
1562
1576
echo = Echo },
1563
1577
queue_flow_ctl = QFC # queue_flow_ctl {
1564
1578
credit = CappedCredit ,
1565
- desired_credit = LinkCreditSnd ,
1566
- drain = Drain
1567
- },
1568
- credit_req_in_flight = true ,
1579
+ drain = Drain },
1580
+ at_least_one_credit_req_in_flight = true ,
1569
1581
stashed_credit_req = none
1570
1582
},
1571
1583
S = S0 # state {queue_states = QStates ,
@@ -1685,19 +1697,20 @@ sent_pending_delivery(
1685
1697
credit_api_version = CreditApiVsn ,
1686
1698
client_flow_ctl = CFC0 ,
1687
1699
queue_flow_ctl = QFC0 ,
1688
- credit_req_in_flight = CreditReqInFlight0
1700
+ at_least_one_credit_req_in_flight = CreditReqInFlight0
1689
1701
} = Link0 = maps :get (Handle , OutgoingLinks0 ),
1690
1702
1691
1703
S = case CreditApiVsn of
1704
+ 1 ->
1705
+ S0 ;
1692
1706
2 ->
1693
1707
# client_flow_ctl {
1694
1708
delivery_count = CDeliveryCount0 ,
1695
1709
credit = CCredit0
1696
1710
} = CFC0 ,
1697
1711
# queue_flow_ctl {
1698
1712
delivery_count = QDeliveryCount0 ,
1699
- credit = QCredit0 ,
1700
- desired_credit = DesiredCredit0
1713
+ credit = QCredit0
1701
1714
} = QFC0 ,
1702
1715
1703
1716
CDeliveryCount = add (CDeliveryCount0 , 1 ),
@@ -1715,17 +1728,16 @@ sent_pending_delivery(
1715
1728
1716
1729
QDeliveryCount = add (QDeliveryCount0 , 1 ),
1717
1730
QCredit1 = max (0 , QCredit0 - 1 ),
1718
- DesiredCredit = max (0 , DesiredCredit0 - 1 ),
1719
1731
1720
1732
{QCredit , CreditReqInFlight , QStates , Actions } =
1721
1733
case QCredit1 =:= 0 andalso
1722
- DesiredCredit > 0 andalso
1734
+ CCredit > 0 andalso
1723
1735
not CreditReqInFlight0 of
1724
1736
true ->
1725
1737
% % assertion
1726
1738
none = Link0 # outgoing_link .stashed_credit_req ,
1727
1739
% % Provide queue next batch of credits.
1728
- CappedCredit = cap_credit (DesiredCredit ),
1740
+ CappedCredit = cap_credit (CCredit ),
1729
1741
{ok , QStates1 , Actions0 } =
1730
1742
rabbit_queue_type :credit (
1731
1743
QName , Ctag , QDeliveryCount , CappedCredit ,
@@ -1740,17 +1752,15 @@ sent_pending_delivery(
1740
1752
credit = CCredit },
1741
1753
QFC = QFC0 # queue_flow_ctl {
1742
1754
delivery_count = QDeliveryCount ,
1743
- credit = QCredit ,
1744
- desired_credit = DesiredCredit },
1745
- Link = Link0 # outgoing_link { client_flow_ctl = CFC ,
1746
- queue_flow_ctl = QFC ,
1747
- credit_req_in_flight = CreditReqInFlight },
1755
+ credit = QCredit } ,
1756
+ Link = Link0 # outgoing_link {
1757
+ client_flow_ctl = CFC ,
1758
+ queue_flow_ctl = QFC ,
1759
+ at_least_one_credit_req_in_flight = CreditReqInFlight },
1748
1760
OutgoingLinks = OutgoingLinks0 #{Handle := Link },
1749
1761
S1 = S0 # state {outgoing_links = OutgoingLinks ,
1750
1762
queue_states = QStates },
1751
- handle_queue_actions (Actions , S1 );
1752
- 1 ->
1753
- S0
1763
+ handle_queue_actions (Actions , S1 )
1754
1764
end ,
1755
1765
record_outgoing_unsettled (Pending , S ).
1756
1766
@@ -2677,7 +2687,7 @@ handle_outgoing_link_flow_control(
2677
2687
credit_api_version = CreditApiVsn ,
2678
2688
client_flow_ctl = CFC ,
2679
2689
queue_flow_ctl = QFC ,
2680
- credit_req_in_flight = CreditReqInFlight
2690
+ at_least_one_credit_req_in_flight = CreditReqInFlight
2681
2691
} = Link0 ,
2682
2692
# 'v1_0.flow' {handle = ? UINT (HandleInt ),
2683
2693
delivery_count = MaybeDeliveryCountRcv ,
@@ -2695,26 +2705,26 @@ handle_outgoing_link_flow_control(
2695
2705
2 ->
2696
2706
case CreditReqInFlight of
2697
2707
false ->
2698
- DesiredCredit = amqp10_util :link_credit_snd (
2708
+ LinkCreditSnd = amqp10_util :link_credit_snd (
2699
2709
DeliveryCountRcv ,
2700
2710
LinkCreditRcv ,
2701
2711
CFC # client_flow_ctl .delivery_count ),
2702
- CappedCredit = cap_credit (DesiredCredit ),
2712
+ CappedCredit = cap_credit (LinkCreditSnd ),
2703
2713
Link = Link0 # outgoing_link {
2704
- credit_req_in_flight = true ,
2705
2714
client_flow_ctl = CFC # client_flow_ctl {
2706
- credit = DesiredCredit ,
2715
+ credit = LinkCreditSnd ,
2707
2716
echo = Echo },
2708
2717
queue_flow_ctl = QFC # queue_flow_ctl {
2709
2718
credit = CappedCredit ,
2710
- desired_credit = DesiredCredit ,
2711
- drain = Drain } },
2719
+ drain = Drain } ,
2720
+ at_least_one_credit_req_in_flight = true },
2712
2721
{ok , QStates , Actions } = rabbit_queue_type :credit (
2713
2722
QName , Ctag ,
2714
2723
QFC # queue_flow_ctl .delivery_count ,
2715
2724
CappedCredit , Drain , QStates0 ),
2716
- State = State0 # state {queue_states = QStates ,
2717
- outgoing_links = OutgoingLinks #{HandleInt := Link }},
2725
+ State = State0 # state {
2726
+ queue_states = QStates ,
2727
+ outgoing_links = OutgoingLinks #{HandleInt := Link }},
2718
2728
handle_queue_actions (Actions , State );
2719
2729
true ->
2720
2730
% % A credit request is currently in-flight. Let's first process its reply
0 commit comments