Skip to content

Commit cae7dab

Browse files
committed
quic: outbound connection-level flow control
Track the peer-provided flow control window. Only send stream data when the window permits. For golang/go#58547 Change-Id: I30c054346623e389b3d1cff1de629f1bbf918635 Reviewed-on: https://go-review.googlesource.com/c/net/+/527376 Reviewed-by: Jonathan Amsterdam <[email protected]> LUCI-TryBot-Result: Go LUCI <[email protected]>
1 parent 217377b commit cae7dab

File tree

8 files changed

+432
-88
lines changed

8 files changed

+432
-88
lines changed

internal/quic/conn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ func (c *Conn) discardKeys(now time.Time, space numberSpace) {
169169

170170
// receiveTransportParameters applies transport parameters sent by the peer.
171171
func (c *Conn) receiveTransportParameters(p transportParameters) error {
172+
c.streams.outflow.setMaxData(p.initialMaxData)
172173
c.streams.localLimit[bidiStream].setMax(p.initialMaxStreamsBidi)
173174
c.streams.localLimit[uniStream].setMax(p.initialMaxStreamsUni)
174175
c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal

internal/quic/conn_flow.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,26 @@ func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool)
109109
func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
110110
c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
111111
}
112+
113+
// connOutflow tracks connection-level flow control for data sent by us to the peer.
114+
type connOutflow struct {
115+
max int64 // largest MAX_DATA received from peer
116+
used int64 // total bytes of STREAM data sent to peer
117+
}
118+
119+
// setMaxData updates the connection-level flow control limit
120+
// with the initial limit conveyed in transport parameters
121+
// or an update from a MAX_DATA frame.
122+
func (f *connOutflow) setMaxData(maxData int64) {
123+
f.max = max(f.max, maxData)
124+
}
125+
126+
// avail returns the number of connection-level flow control bytes available.
127+
func (f *connOutflow) avail() int64 {
128+
return f.max - f.used
129+
}
130+
131+
// consume records consumption of n bytes of flow.
132+
func (f *connOutflow) consume(n int64) {
133+
f.used += n
134+
}

internal/quic/conn_flow_test.go

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
package quic
88

9-
import "testing"
9+
import (
10+
"testing"
11+
)
1012

1113
func TestConnInflowReturnOnRead(t *testing.T) {
1214
ctx := canceledContext()
@@ -184,3 +186,149 @@ func TestConnInflowMultipleStreams(t *testing.T) {
184186
max: 128 + 32 + 1 + 32 + 1,
185187
})
186188
}
189+
190+
func TestConnOutflowBlocked(t *testing.T) {
191+
tc, s := newTestConnAndLocalStream(t, clientSide, uniStream,
192+
permissiveTransportParameters,
193+
func(p *transportParameters) {
194+
p.initialMaxData = 10
195+
})
196+
tc.ignoreFrame(frameTypeAck)
197+
198+
data := makeTestData(32)
199+
n, err := s.Write(data)
200+
if n != len(data) || err != nil {
201+
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
202+
}
203+
204+
tc.wantFrame("stream writes data up to MAX_DATA limit",
205+
packetType1RTT, debugFrameStream{
206+
id: s.id,
207+
data: data[:10],
208+
})
209+
tc.wantIdle("stream is blocked by MAX_DATA limit")
210+
211+
tc.writeFrames(packetType1RTT, debugFrameMaxData{
212+
max: 20,
213+
})
214+
tc.wantFrame("stream writes data up to new MAX_DATA limit",
215+
packetType1RTT, debugFrameStream{
216+
id: s.id,
217+
off: 10,
218+
data: data[10:20],
219+
})
220+
tc.wantIdle("stream is blocked by new MAX_DATA limit")
221+
222+
tc.writeFrames(packetType1RTT, debugFrameMaxData{
223+
max: 100,
224+
})
225+
tc.wantFrame("stream writes remaining data",
226+
packetType1RTT, debugFrameStream{
227+
id: s.id,
228+
off: 20,
229+
data: data[20:],
230+
})
231+
}
232+
233+
func TestConnOutflowMaxDataDecreases(t *testing.T) {
234+
tc, s := newTestConnAndLocalStream(t, clientSide, uniStream,
235+
permissiveTransportParameters,
236+
func(p *transportParameters) {
237+
p.initialMaxData = 10
238+
})
239+
tc.ignoreFrame(frameTypeAck)
240+
241+
// Decrease in MAX_DATA is ignored.
242+
tc.writeFrames(packetType1RTT, debugFrameMaxData{
243+
max: 5,
244+
})
245+
246+
data := makeTestData(32)
247+
n, err := s.Write(data)
248+
if n != len(data) || err != nil {
249+
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
250+
}
251+
252+
tc.wantFrame("stream writes data up to MAX_DATA limit",
253+
packetType1RTT, debugFrameStream{
254+
id: s.id,
255+
data: data[:10],
256+
})
257+
}
258+
259+
func TestConnOutflowMaxDataRoundRobin(t *testing.T) {
260+
ctx := canceledContext()
261+
tc := newTestConn(t, clientSide, permissiveTransportParameters,
262+
func(p *transportParameters) {
263+
p.initialMaxData = 0
264+
})
265+
tc.handshake()
266+
tc.ignoreFrame(frameTypeAck)
267+
268+
s1, err := tc.conn.newLocalStream(ctx, uniStream)
269+
if err != nil {
270+
t.Fatalf("conn.newLocalStream(%v) = %v", uniStream, err)
271+
}
272+
s2, err := tc.conn.newLocalStream(ctx, uniStream)
273+
if err != nil {
274+
t.Fatalf("conn.newLocalStream(%v) = %v", uniStream, err)
275+
}
276+
277+
s1.Write(make([]byte, 10))
278+
s2.Write(make([]byte, 10))
279+
280+
tc.writeFrames(packetType1RTT, debugFrameMaxData{
281+
max: 1,
282+
})
283+
tc.wantFrame("stream 1 writes data up to MAX_DATA limit",
284+
packetType1RTT, debugFrameStream{
285+
id: s1.id,
286+
data: []byte{0},
287+
})
288+
289+
tc.writeFrames(packetType1RTT, debugFrameMaxData{
290+
max: 2,
291+
})
292+
tc.wantFrame("stream 2 writes data up to MAX_DATA limit",
293+
packetType1RTT, debugFrameStream{
294+
id: s2.id,
295+
data: []byte{0},
296+
})
297+
298+
tc.writeFrames(packetType1RTT, debugFrameMaxData{
299+
max: 3,
300+
})
301+
tc.wantFrame("stream 1 writes data up to MAX_DATA limit",
302+
packetType1RTT, debugFrameStream{
303+
id: s1.id,
304+
off: 1,
305+
data: []byte{0},
306+
})
307+
}
308+
309+
func TestConnOutflowMetaAndData(t *testing.T) {
310+
tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream,
311+
permissiveTransportParameters,
312+
func(p *transportParameters) {
313+
p.initialMaxData = 0
314+
})
315+
tc.ignoreFrame(frameTypeAck)
316+
317+
data := makeTestData(32)
318+
s.Write(data)
319+
320+
s.CloseRead()
321+
tc.wantFrame("CloseRead sends a STOP_SENDING, not flow controlled",
322+
packetType1RTT, debugFrameStopSending{
323+
id: s.id,
324+
})
325+
326+
tc.writeFrames(packetType1RTT, debugFrameMaxData{
327+
max: 100,
328+
})
329+
tc.wantFrame("unblocked MAX_DATA",
330+
packetType1RTT, debugFrameStream{
331+
id: s.id,
332+
data: data,
333+
})
334+
}

internal/quic/conn_recv.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (c *Conn) handleFrames(now time.Time, ptype packetType, space numberSpace,
186186
if !frameOK(c, ptype, __01) {
187187
return
188188
}
189-
_, n = consumeMaxDataFrame(payload)
189+
n = c.handleMaxDataFrame(now, payload)
190190
case frameTypeMaxStreamData:
191191
if !frameOK(c, ptype, __01) {
192192
return
@@ -280,6 +280,15 @@ func (c *Conn) handleAckFrame(now time.Time, space numberSpace, payload []byte)
280280
return n
281281
}
282282

283+
func (c *Conn) handleMaxDataFrame(now time.Time, payload []byte) int {
284+
maxData, n := consumeMaxDataFrame(payload)
285+
if n < 0 {
286+
return -1
287+
}
288+
c.streams.outflow.setMaxData(maxData)
289+
return n
290+
}
291+
283292
func (c *Conn) handleMaxStreamDataFrame(now time.Time, payload []byte) int {
284293
id, maxStreamData, n := consumeMaxStreamDataFrame(payload)
285294
if n < 0 {

0 commit comments

Comments
 (0)