Skip to content

Commit 217377b

Browse files
committed
quic: inbound connection-level flow control
Track the peer's connection level flow control window. Update the window with MAX_DATA frames as data is consumed by the user. Adjust shouldUpdateFlowControl so that we can use the same algorithm for both stream-level and connection-level flow control. The new algorithm is to send an update when doing so extends the peer's window by at least 1/8 of the maximum window size. For golang/go#58547 Change-Id: I2d8d82d06f0cb4b2ac25b3396c3cf4126a96e9cc Reviewed-on: https://go-review.googlesource.com/c/net/+/526716 LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent 044c308 commit 217377b

File tree

9 files changed

+386
-9
lines changed

9 files changed

+386
-9
lines changed

internal/quic/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ type Config struct {
4141
// If zero, the default value of 1MiB is used.
4242
// If negative, the limit is zero.
4343
MaxStreamWriteBufferSize int64
44+
45+
// MaxConnReadBufferSize is the maximum amount of data sent by the peer that a
46+
// connection will buffer for reading, across all streams.
47+
// If zero, the default value of 1MiB is used.
48+
// If negative, the limit is zero.
49+
MaxConnReadBufferSize int64
4450
}
4551

4652
func configDefault(v, def, limit int64) int64 {
@@ -69,3 +75,7 @@ func (c *Config) maxStreamReadBufferSize() int64 {
6975
// maxStreamWriteBufferSize returns the effective per-stream write buffer limit:
// MaxStreamWriteBufferSize with a default of 1 MiB, capped at the largest
// varint-encodable value.
func (c *Config) maxStreamWriteBufferSize() int64 {
	return configDefault(c.MaxStreamWriteBufferSize, 1<<20, maxVarint)
}
78+
79+
// maxConnReadBufferSize returns the effective connection-level read buffer limit:
// MaxConnReadBufferSize with a default of 1 MiB, capped at the largest
// varint-encodable value. This value is advertised to the peer as the
// initial_max_data transport parameter.
func (c *Config) maxConnReadBufferSize() int64 {
	return configDefault(c.MaxConnReadBufferSize, 1<<20, maxVarint)
}

internal/quic/config_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import "testing"
1010

1111
func TestConfigTransportParameters(t *testing.T) {
1212
const (
13+
wantInitialMaxData = int64(1)
1314
wantInitialMaxStreamData = int64(2)
1415
wantInitialMaxStreamsBidi = int64(3)
1516
wantInitialMaxStreamsUni = int64(4)
@@ -18,12 +19,16 @@ func TestConfigTransportParameters(t *testing.T) {
1819
c.MaxBidiRemoteStreams = wantInitialMaxStreamsBidi
1920
c.MaxUniRemoteStreams = wantInitialMaxStreamsUni
2021
c.MaxStreamReadBufferSize = wantInitialMaxStreamData
22+
c.MaxConnReadBufferSize = wantInitialMaxData
2123
})
2224
tc.handshake()
2325
if tc.sentTransportParameters == nil {
2426
t.Fatalf("conn didn't send transport parameters during handshake")
2527
}
2628
p := tc.sentTransportParameters
29+
if got, want := p.initialMaxData, wantInitialMaxData; got != want {
30+
t.Errorf("initial_max_data = %v, want %v", got, want)
31+
}
2732
if got, want := p.initialMaxStreamDataBidiLocal, wantInitialMaxStreamData; got != want {
2833
t.Errorf("initial_max_stream_data_bidi_local = %v, want %v", got, want)
2934
}

internal/quic/conn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip.
117117
maxUDPPayloadSize: maxUDPPayloadSize,
118118
maxAckDelay: maxAckDelay,
119119
disableActiveMigration: true,
120+
initialMaxData: config.maxConnReadBufferSize(),
120121
initialMaxStreamDataBidiLocal: config.maxStreamReadBufferSize(),
121122
initialMaxStreamDataBidiRemote: config.maxStreamReadBufferSize(),
122123
initialMaxStreamDataUni: config.maxStreamReadBufferSize(),

internal/quic/conn_flow.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.21
6+
7+
package quic
8+
9+
import (
10+
"sync/atomic"
11+
"time"
12+
)
13+
14+
// connInflow tracks connection-level flow control for data sent by the peer to us.
//
// There are four byte offsets of significance in the stream of data received from the peer,
// each >= the previous:
//
//   - bytes read by the user
//   - bytes received from the peer
//   - limit sent to the peer in a MAX_DATA frame
//   - potential new limit to send to the peer
//
// We maintain a flow control window, so as bytes are read by the user
// the potential limit is extended correspondingly.
//
// We keep an atomic counter of bytes read by the user and not yet applied to the
// potential limit (credit). When this count grows large enough, we update the
// new limit to send and mark that we need to send a new MAX_DATA frame.
type connInflow struct {
	sent      sentVal // set when we need to send a MAX_DATA update to the peer
	usedLimit int64   // total bytes sent by the peer; must not exceed sentLimit
	sentLimit int64   // last MAX_DATA sent to the peer
	newLimit  int64   // new MAX_DATA to send

	// credit is atomic because it is added to off the conn's loop
	// (see handleStreamBytesReadOffLoop) and drained on the loop.
	credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window
}
38+
39+
func (c *Conn) inflowInit() {
40+
// The initial MAX_DATA limit is sent as a transport parameter.
41+
c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize()
42+
c.streams.inflow.newLimit = c.streams.inflow.sentLimit
43+
}
44+
45+
// handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream.
46+
// We may extend the peer's flow control window.
47+
//
48+
// This is called indirectly by the user, via Read or CloseRead.
49+
func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
50+
if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
51+
// We should send a MAX_DATA update to the peer.
52+
// Record this on the Conn's main loop.
53+
c.sendMsg(func(now time.Time, c *Conn) {
54+
c.sendMaxDataUpdate()
55+
})
56+
}
57+
}
58+
59+
// handleStreamBytesReadOnLoop extends the peer's flow control window after
60+
// data has been discarded due to a RESET_STREAM frame.
61+
//
62+
// This is called on the conn's loop.
63+
func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
64+
if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
65+
c.sendMaxDataUpdate()
66+
}
67+
}
68+
69+
// sendMaxDataUpdate marks the connection as needing to send a MAX_DATA frame
// to the peer, and folds any accumulated read credit into the limit that
// frame will carry. Called on the conn's loop.
func (c *Conn) sendMaxDataUpdate() {
	c.streams.inflow.sent.setUnsent()
	// Apply current credit to the limit.
	// We don't strictly need to do this here
	// since appendMaxDataFrame will do so as well,
	// but this avoids redundant trips down this path
	// if the MAX_DATA frame doesn't go out right away.
	c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
}
78+
79+
// shouldUpdateFlowControl reports whether enough read credit has accumulated
// to justify sending the peer a connection-level MAX_DATA update.
// It delegates to the package-level shouldUpdateFlowControl helper, scaled by
// the configured connection read buffer size, so connection-level and
// stream-level flow control share the same update policy.
func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
	return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit)
}
82+
83+
// handleStreamBytesReceived records that the peer has sent us stream data.
84+
func (c *Conn) handleStreamBytesReceived(n int64) error {
85+
c.streams.inflow.usedLimit += n
86+
if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit {
87+
return localTransportError(errFlowControl)
88+
}
89+
return nil
90+
}
91+
92+
// appendMaxDataFrame appends a MAX_DATA frame to the current packet,
// if one is due (or if this is a PTO probe packet).
//
// It returns true if no more frames need appending,
// false if it could not fit a frame in the current packet.
func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
	if c.streams.inflow.sent.shouldSendPTO(pto) {
		// Add any unapplied credit to the new limit now,
		// so the frame carries the most up-to-date window.
		c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
		if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
			// No room left in this packet; try again in a later one.
			return false
		}
		// Remember which packet carried the update, so ack/loss handling
		// (ackOrLossMaxData) can be matched to it.
		c.streams.inflow.sent.setSent(pnum)
	}
	return true
}
107+
108+
// ackOrLossMaxData records the fate (acknowledged or lost) of the MAX_DATA
// frame carried in packet pnum, forwarding it to the sent-state tracker.
func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
	c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
}

internal/quic/conn_flow_test.go

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.21
6+
7+
package quic
8+
9+
import "testing"
10+
11+
// TestConnInflowReturnOnRead verifies that bytes consumed by stream reads are
// returned to the peer's connection-level flow control window via MAX_DATA
// frames, and that the advertised limit grows by exactly the amount read.
func TestConnInflowReturnOnRead(t *testing.T) {
	ctx := canceledContext()
	tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
		c.MaxConnReadBufferSize = 64
	})
	// Peer sends enough data to fill the entire connection window.
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id:   s.id,
		data: make([]byte, 64),
	})
	const readSize = 8
	if n, err := s.ReadContext(ctx, make([]byte, readSize)); n != readSize || err != nil {
		t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, readSize)
	}
	tc.wantFrame("available window increases, send a MAX_DATA",
		packetType1RTT, debugFrameMaxData{
			max: 64 + readSize,
		})
	// Read everything else; the window is extended again to 2x the buffer size.
	if n, err := s.ReadContext(ctx, make([]byte, 64)); n != 64-readSize || err != nil {
		t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, 64-readSize)
	}
	tc.wantFrame("available window increases, send a MAX_DATA",
		packetType1RTT, debugFrameMaxData{
			max: 128,
		})
}
36+
37+
// TestConnInflowReturnOnClose verifies that CloseRead on a stream with unread
// buffered data returns that data's connection-level flow control to the peer.
func TestConnInflowReturnOnClose(t *testing.T) {
	tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
		c.MaxConnReadBufferSize = 64
	})
	tc.ignoreFrame(frameTypeStopSending)
	// Fill the connection window without reading any of it.
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id:   s.id,
		data: make([]byte, 64),
	})
	s.CloseRead()
	tc.wantFrame("closing stream updates connection-level flow control",
		packetType1RTT, debugFrameMaxData{
			max: 128,
		})
}
52+
53+
func TestConnInflowReturnOnReset(t *testing.T) {
54+
tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
55+
c.MaxConnReadBufferSize = 64
56+
})
57+
tc.ignoreFrame(frameTypeStopSending)
58+
tc.writeFrames(packetType1RTT, debugFrameStream{
59+
id: s.id,
60+
data: make([]byte, 32),
61+
})
62+
tc.writeFrames(packetType1RTT, debugFrameResetStream{
63+
id: s.id,
64+
finalSize: 64,
65+
})
66+
s.CloseRead()
67+
tc.wantFrame("receiving stream reseet updates connection-level flow control",
68+
packetType1RTT, debugFrameMaxData{
69+
max: 128,
70+
})
71+
}
72+
73+
// TestConnInflowStreamViolation verifies that a peer sending more stream data
// than the advertised connection-level limit (MaxConnReadBufferSize) closes
// the connection with a FLOW_CONTROL_ERROR. The running totals below track
// the highest received offset per stream, summed across streams.
func TestConnInflowStreamViolation(t *testing.T) {
	tc := newTestConn(t, serverSide, func(c *Config) {
		c.MaxConnReadBufferSize = 100
	})
	tc.handshake()
	tc.ignoreFrame(frameTypeAck)
	// Total MAX_DATA consumed: 50
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id:   newStreamID(clientSide, bidiStream, 0),
		data: make([]byte, 50),
	})
	// Total MAX_DATA consumed: 80
	// Flow control is charged by the highest received offset (20+10=30 on
	// this stream), not just the bytes carried in the frame.
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id:   newStreamID(clientSide, uniStream, 0),
		off:  20,
		data: make([]byte, 10),
	})
	// Total MAX_DATA consumed: 100
	// A FIN with no data at offset 70 extends this stream's final size from
	// 50 to 70, charging the additional 20 bytes.
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id:  newStreamID(clientSide, bidiStream, 0),
		off: 70,
		fin: true,
	})
	// This stream has already consumed quota for these bytes.
	// Total MAX_DATA consumed: 100
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id:   newStreamID(clientSide, uniStream, 0),
		data: make([]byte, 20),
	})
	tc.wantIdle("peer has consumed all MAX_DATA quota")

	// Total MAX_DATA consumed: 101
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id:   newStreamID(clientSide, bidiStream, 2),
		data: make([]byte, 1),
	})
	tc.wantFrame("peer violates MAX_DATA limit",
		packetType1RTT, debugFrameConnectionCloseTransport{
			code: errFlowControl,
		})
}
114+
115+
// TestConnInflowResetViolation verifies that a RESET_STREAM frame's final size
// counts against the connection-level flow control limit, and that exceeding
// the limit via a reset closes the connection with FLOW_CONTROL_ERROR.
func TestConnInflowResetViolation(t *testing.T) {
	tc := newTestConn(t, serverSide, func(c *Config) {
		c.MaxConnReadBufferSize = 100
	})
	tc.handshake()
	tc.ignoreFrame(frameTypeAck)
	// Consume the entire 100-byte connection window on one stream.
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id:   newStreamID(clientSide, bidiStream, 0),
		data: make([]byte, 100),
	})
	tc.wantIdle("peer has consumed all MAX_DATA quota")

	// A reset with final size 0 consumes no additional quota.
	tc.writeFrames(packetType1RTT, debugFrameResetStream{
		id:        newStreamID(clientSide, uniStream, 0),
		finalSize: 0,
	})
	tc.wantIdle("stream reset does not consume MAX_DATA quota, no error")

	// A reset with final size 1 pushes the total to 101, over the limit.
	tc.writeFrames(packetType1RTT, debugFrameResetStream{
		id:        newStreamID(clientSide, uniStream, 1),
		finalSize: 1,
	})
	tc.wantFrame("RESET_STREAM final size violates MAX_DATA limit",
		packetType1RTT, debugFrameConnectionCloseTransport{
			code: errFlowControl,
		})
}
142+
143+
// TestConnInflowMultipleStreams verifies that connection-level read credit
// accumulates across reads on multiple streams, and that a MAX_DATA update is
// sent only after enough total credit has built up to cross the threshold.
func TestConnInflowMultipleStreams(t *testing.T) {
	ctx := canceledContext()
	tc := newTestConn(t, serverSide, func(c *Config) {
		c.MaxConnReadBufferSize = 128
	})
	tc.handshake()
	tc.ignoreFrame(frameTypeAck)

	var streams []*Stream
	for _, id := range []streamID{
		newStreamID(clientSide, uniStream, 0),
		newStreamID(clientSide, uniStream, 1),
		newStreamID(clientSide, bidiStream, 0),
		newStreamID(clientSide, bidiStream, 1),
	} {
		tc.writeFrames(packetType1RTT, debugFrameStream{
			id:   id,
			data: make([]byte, 32),
		})
		s, err := tc.conn.AcceptStream(ctx)
		if err != nil {
			t.Fatalf("AcceptStream() = %v", err)
		}
		streams = append(streams, s)
		// Read 1 byte from each stream: 4 bytes of credit in total,
		// not enough to warrant a window update.
		if n, err := s.ReadContext(ctx, make([]byte, 1)); err != nil || n != 1 {
			t.Fatalf("s.Read() = %v, %v; want 1, nil", n, err)
		}
	}
	tc.wantIdle("streams have read data, but not enough to update MAX_DATA")

	// Reading the remaining 31 bytes of streams[0] crosses the threshold:
	// credit is now 31 + the 1-byte reads from streams 0-3.
	if n, err := streams[0].ReadContext(ctx, make([]byte, 32)); err != nil || n != 31 {
		t.Fatalf("s.Read() = %v, %v; want 31, nil", n, err)
	}
	tc.wantFrame("read enough data to trigger a MAX_DATA update",
		packetType1RTT, debugFrameMaxData{
			max: 128 + 32 + 1 + 1 + 1,
		})

	// CloseRead discards streams[2]'s 31 unread bytes, triggering another update.
	streams[2].CloseRead()
	tc.wantFrame("closed stream triggers another MAX_DATA update",
		packetType1RTT, debugFrameMaxData{
			max: 128 + 32 + 1 + 32 + 1,
		})
}

internal/quic/conn_loss.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ func (c *Conn) handleAckOrLoss(space numberSpace, sent *sentPacket, fate packetF
4444
case frameTypeCrypto:
4545
start, end := sent.nextRange()
4646
c.crypto[space].ackOrLoss(start, end, fate)
47+
case frameTypeMaxData:
48+
c.ackOrLossMaxData(sent.num, fate)
4749
case frameTypeResetStream,
4850
frameTypeStopSending,
4951
frameTypeMaxStreamData,

0 commit comments

Comments
 (0)