@@ -1127,9 +1127,8 @@ send_pending(#state{remote_incoming_window = Space,
1127
1127
{{value , # pending_transfer {
1128
1128
frames = Frames ,
1129
1129
queue_pid = QPid ,
1130
- outgoing_unsettled = # outgoing_unsettled {
1131
- consumer_tag = Ctag ,
1132
- queue_name = QName }} = Pending }, Buf1 }
1130
+ outgoing_unsettled = # outgoing_unsettled {queue_name = QName }
1131
+ } = Pending }, Buf1 }
1133
1132
when Space > 0 ->
1134
1133
SendFun = case rabbit_queue_type :module (QName , State0 # state .queue_states ) of
1135
1134
{ok , rabbit_classic_queue } ->
@@ -1143,28 +1142,20 @@ send_pending(#state{remote_incoming_window = Space,
1143
1142
WriterPid , Ch , Transfer , Sections )
1144
1143
end
1145
1144
end ,
1146
- % % rabbit_basic:maybe_gc_large_msg(Content, GCThreshold)
1145
+ { NumTransfersSent , Buf , State1 } =
1147
1146
case send_frames (SendFun , Frames , Space ) of
1148
1147
{all , SpaceLeft } ->
1149
- State1 = # state {outgoing_links = OutgoingLinks0 } = session_flow_control_sent_transfers (
1150
- Space - SpaceLeft , State0 ),
1151
- HandleInt = ctag_to_handle (Ctag ),
1152
- OutgoingLinks = maps :update_with (
1153
- HandleInt ,
1154
- fun (# outgoing_link {delivery_count = {credit_api_v1 , C }} = Link ) ->
1155
- Link # outgoing_link {delivery_count = {credit_api_v1 , add (C , 1 )}};
1156
- (# outgoing_link {delivery_count = credit_api_v2 } = Link ) ->
1157
- Link
1158
- end ,
1159
- OutgoingLinks0 ),
1160
- State2 = State1 # state {outgoing_links = OutgoingLinks },
1161
- State = record_outgoing_unsettled (Pending , State2 ),
1162
- send_pending (State # state {outgoing_pending = Buf1 });
1148
+ {Space - SpaceLeft ,
1149
+ Buf1 ,
1150
+ record_outgoing_unsettled (Pending , State0 )};
1163
1151
{some , Rest } ->
1164
- State = session_flow_control_sent_transfers (Space , State0 ),
1165
- Buf = queue :in_r (Pending # pending_transfer {frames = Rest }, Buf1 ),
1166
- send_pending (State # state {outgoing_pending = Buf })
1167
- end ;
1152
+ {Space ,
1153
+ queue :in_r (Pending # pending_transfer {frames = Rest }, Buf1 ),
1154
+ State0 }
1155
+ end ,
1156
+ State2 = session_flow_control_sent_transfers (NumTransfersSent , State1 ),
1157
+ State = State2 # state {outgoing_pending = Buf },
1158
+ send_pending (State );
1168
1159
{{value , # pending_transfer {}}, _ }
1169
1160
when Space =:= 0 ->
1170
1161
State0
@@ -1415,17 +1406,18 @@ handle_deliver(ConsumerTag, AckRequired,
1415
1406
Msg = {QName , QPid , MsgId , Redelivered , Mc0 },
1416
1407
State = # state {outgoing_pending = Pending ,
1417
1408
outgoing_delivery_id = DeliveryId ,
1418
- outgoing_links = OutgoingLinks ,
1409
+ outgoing_links = OutgoingLinks0 ,
1419
1410
cfg = # cfg {outgoing_max_frame_size = MaxFrameSize ,
1420
1411
conn_name = ConnName ,
1421
1412
channel_num = ChannelNum ,
1422
1413
user = # user {username = Username },
1423
1414
trace_state = Trace }}) ->
1424
1415
Handle = ctag_to_handle (ConsumerTag ),
1425
- case OutgoingLinks of
1416
+ case OutgoingLinks0 of
1426
1417
#{Handle := # outgoing_link {queue_type = QType ,
1427
1418
send_settled = SendSettled ,
1428
- max_message_size = MaxMessageSize }} ->
1419
+ max_message_size = MaxMessageSize ,
1420
+ delivery_count = DelCount } = Link0 } ->
1429
1421
Dtag = delivery_tag (MsgId , SendSettled ),
1430
1422
Transfer = # 'v1_0.transfer' {
1431
1423
handle = ? UINT (Handle ),
@@ -1451,6 +1443,13 @@ handle_deliver(ConsumerTag, AckRequired,
1451
1443
end ,
1452
1444
messages_delivered (Redelivered , QType ),
1453
1445
rabbit_trace :tap_out (Msg , ConnName , ChannelNum , Username , Trace ),
1446
+ OutgoingLinks = case DelCount of
1447
+ credit_api_v2 ->
1448
+ OutgoingLinks0 ;
1449
+ {credit_api_v1 , C } ->
1450
+ Link = Link0 # outgoing_link {delivery_count = {credit_api_v1 , add (C , 1 )}},
1451
+ maps :update (Handle , Link , OutgoingLinks0 )
1452
+ end ,
1454
1453
Del = # outgoing_unsettled {
1455
1454
msg_id = MsgId ,
1456
1455
consumer_tag = ConsumerTag ,
@@ -1465,8 +1464,9 @@ handle_deliver(ConsumerTag, AckRequired,
1465
1464
queue_pid = QPid ,
1466
1465
delivery_id = DeliveryId ,
1467
1466
outgoing_unsettled = Del },
1468
- State # state {outgoing_delivery_id = add (DeliveryId , 1 ),
1469
- outgoing_pending = queue :in (PendingTransfer , Pending )};
1467
+ State # state {outgoing_pending = queue :in (PendingTransfer , Pending ),
1468
+ outgoing_delivery_id = add (DeliveryId , 1 ),
1469
+ outgoing_links = OutgoingLinks };
1470
1470
_ ->
1471
1471
% % TODO handle missing link -- why does the queue think it's there?
1472
1472
rabbit_log :warning (
0 commit comments