Skip to content

Commit da5f9f7

Browse files
neildgopherbot
authored andcommitted
quic: don't block Writes on stream-level flow control
Data written to a stream can be sent to the peer in a STREAM frame only when: - congestion control window is available - pacing does not block sending - stream-level flow control is available - connection-level flow control is available There must be a pushback mechanism to limit the amount of locally buffered stream data, but I no longer believe the stream-level flow control needs to be part of that pushback. Using connection-level flow control (not yet implemented) to block stream Write calls is problematic, because it makes it difficult to fairly divide available send capacity between multiple streams. If writes to a stream consume connection-level flow control before we commit that data to the wire, it becomes very easy for one stream to starve others. It's confusing to use stream-level flow control to block Write calls, but not connection-level flow control. This will especially produce unexpected behavior if the recipient chooses to provide unlimited stream-level quota but limited connection-level quota. Change Stream.Write to only block writes based on the configured maximum send buffer size. We may now buffer data which cannot be immediately sent, but that was the case already when transmission is blocked by congestion control. In the future, we may want to make the stream buffer sizes adaptive in response to the amount of in-flight data. Rename Config.Stream*BufferSize to MaxStream*BufferSize, to allow for possibly adding a minimum size later. For golang/go#58547 Change-Id: I528a611fefb16b323776965c5b2ab5644035ed7a Reviewed-on: https://go-review.googlesource.com/c/net/+/524958 LUCI-TryBot-Result: Go LUCI <[email protected]> Commit-Queue: Damien Neil <[email protected]> Auto-Submit: Damien Neil <[email protected]> Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent c3c6260 commit da5f9f7

File tree

10 files changed

+145
-79
lines changed

10 files changed

+145
-79
lines changed

internal/quic/config.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@ type Config struct {
3030
// If negative, the limit is zero.
3131
MaxUniRemoteStreams int64
3232

33-
// StreamReadBufferSize is the maximum amount of data sent by the peer that a
33+
// MaxStreamReadBufferSize is the maximum amount of data sent by the peer that a
3434
// stream will buffer for reading.
3535
// If zero, the default value of 1MiB is used.
3636
// If negative, the limit is zero.
37-
StreamReadBufferSize int64
37+
MaxStreamReadBufferSize int64
3838

39-
// StreamWriteBufferSize is the maximum amount of data a stream will buffer for
39+
// MaxStreamWriteBufferSize is the maximum amount of data a stream will buffer for
4040
// sending to the peer.
4141
// If zero, the default value of 1MiB is used.
4242
// If negative, the limit is zero.
43-
StreamWriteBufferSize int64
43+
MaxStreamWriteBufferSize int64
4444
}
4545

4646
func configDefault(v, def, limit int64) int64 {
@@ -62,10 +62,10 @@ func (c *Config) maxUniRemoteStreams() int64 {
6262
return configDefault(c.MaxUniRemoteStreams, 100, maxStreamsLimit)
6363
}
6464

65-
func (c *Config) streamReadBufferSize() int64 {
66-
return configDefault(c.StreamReadBufferSize, 1<<20, maxVarint)
65+
func (c *Config) maxStreamReadBufferSize() int64 {
66+
return configDefault(c.MaxStreamReadBufferSize, 1<<20, maxVarint)
6767
}
6868

69-
func (c *Config) streamWriteBufferSize() int64 {
70-
return configDefault(c.StreamWriteBufferSize, 1<<20, maxVarint)
69+
func (c *Config) maxStreamWriteBufferSize() int64 {
70+
return configDefault(c.MaxStreamWriteBufferSize, 1<<20, maxVarint)
7171
}

internal/quic/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestConfigTransportParameters(t *testing.T) {
1717
tc := newTestConn(t, clientSide, func(c *Config) {
1818
c.MaxBidiRemoteStreams = wantInitialMaxStreamsBidi
1919
c.MaxUniRemoteStreams = wantInitialMaxStreamsUni
20-
c.StreamReadBufferSize = wantInitialMaxStreamData
20+
c.MaxStreamReadBufferSize = wantInitialMaxStreamData
2121
})
2222
tc.handshake()
2323
if tc.sentTransportParameters == nil {

internal/quic/conn.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip.
117117
maxUDPPayloadSize: maxUDPPayloadSize,
118118
maxAckDelay: maxAckDelay,
119119
disableActiveMigration: true,
120-
initialMaxStreamDataBidiLocal: config.streamReadBufferSize(),
121-
initialMaxStreamDataBidiRemote: config.streamReadBufferSize(),
122-
initialMaxStreamDataUni: config.streamReadBufferSize(),
120+
initialMaxStreamDataBidiLocal: config.maxStreamReadBufferSize(),
121+
initialMaxStreamDataBidiRemote: config.maxStreamReadBufferSize(),
122+
initialMaxStreamDataUni: config.maxStreamReadBufferSize(),
123123
initialMaxStreamsBidi: c.streams.remoteLimit[bidiStream].max,
124124
initialMaxStreamsUni: c.streams.remoteLimit[uniStream].max,
125125
activeConnIDLimit: activeConnIDLimit,

internal/quic/conn_loss_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func TestLostMaxStreamDataFrame(t *testing.T) {
297297
const maxWindowSize = 10
298298
buf := make([]byte, maxWindowSize)
299299
tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
300-
c.StreamReadBufferSize = maxWindowSize
300+
c.MaxStreamReadBufferSize = maxWindowSize
301301
})
302302

303303
// We send MAX_STREAM_DATA = 19.
@@ -339,7 +339,7 @@ func TestLostMaxStreamDataFrameAfterStreamFinReceived(t *testing.T) {
339339
const maxWindowSize = 10
340340
buf := make([]byte, maxWindowSize)
341341
tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
342-
c.StreamReadBufferSize = maxWindowSize
342+
c.MaxStreamReadBufferSize = maxWindowSize
343343
})
344344

345345
tc.writeFrames(packetType1RTT, debugFrameStream{

internal/quic/conn_streams.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,11 @@ func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, er
7676
}
7777

7878
s := newStream(c, newStreamID(c.side, styp, num))
79-
s.outmaxbuf = c.config.streamWriteBufferSize()
79+
s.outmaxbuf = c.config.maxStreamWriteBufferSize()
8080
s.outwin = c.streams.peerInitialMaxStreamDataRemote[styp]
8181
if styp == bidiStream {
82-
s.inmaxbuf = c.config.streamReadBufferSize()
83-
s.inwin = c.config.streamReadBufferSize()
82+
s.inmaxbuf = c.config.maxStreamReadBufferSize()
83+
s.inwin = c.config.maxStreamReadBufferSize()
8484
}
8585
s.inUnlock()
8686
s.outUnlock()
@@ -170,10 +170,10 @@ func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType)
170170
}
171171

172172
s = newStream(c, id)
173-
s.inmaxbuf = c.config.streamReadBufferSize()
174-
s.inwin = c.config.streamReadBufferSize()
173+
s.inmaxbuf = c.config.maxStreamReadBufferSize()
174+
s.inwin = c.config.maxStreamReadBufferSize()
175175
if id.streamType() == bidiStream {
176-
s.outmaxbuf = c.config.streamWriteBufferSize()
176+
s.outmaxbuf = c.config.maxStreamWriteBufferSize()
177177
s.outwin = c.streams.peerInitialMaxStreamDataBidiLocal
178178
}
179179
s.inUnlock()

internal/quic/conn_streams_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func TestStreamsWriteQueueFairness(t *testing.T) {
207207
p.initialMaxData = 1<<62 - 1
208208
p.initialMaxStreamDataBidiRemote = dataLen
209209
}, func(c *Config) {
210-
c.StreamWriteBufferSize = dataLen
210+
c.MaxStreamWriteBufferSize = dataLen
211211
})
212212
tc.handshake()
213213
tc.ignoreFrame(frameTypeAck)

internal/quic/crypto_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (s *cryptoStream) ackOrLoss(start, end int64, fate packetFate) {
118118
// copy the data it wants into position.
119119
func (s *cryptoStream) dataToSend(pto bool, f func(off, size int64) (sent int64)) {
120120
for {
121-
off, size := dataToSend(s.out, s.outunsent, s.outacked, pto)
121+
off, size := dataToSend(s.out.start, s.out.end, s.outunsent, s.outacked, pto)
122122
if size == 0 {
123123
return
124124
}

internal/quic/stream.go

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -202,14 +202,7 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
202202
// We exit the loop after writing all data, so on subsequent passes through
203203
// the loop we are always write blocked.
204204
if len(b) > 0 && !canWrite {
205-
// We're blocked, either by flow control or by our own buffer limit.
206-
// We either need the peer to extend our flow control window,
207-
// or ack some of our outstanding packets.
208-
if s.out.end == s.outwin {
209-
// We're blocked by flow control.
210-
// Send a STREAM_DATA_BLOCKED frame to let the peer know.
211-
s.outblocked.setUnsent()
212-
}
205+
// Our send buffer is full. Wait for the peer to ack some data.
213206
s.outUnlock()
214207
if err := s.outgate.waitAndLock(ctx, s.conn.testHooks); err != nil {
215208
return n, err
@@ -233,18 +226,24 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
233226
if len(b) == 0 {
234227
break
235228
}
236-
s.outblocked.clear()
237-
// Write limit is min(our own buffer limit, the peer-provided flow control window).
229+
// Write limit is our send buffer limit.
238230
// This is a stream offset.
239-
lim := min(s.out.start+s.outmaxbuf, s.outwin)
231+
lim := s.out.start + s.outmaxbuf
240232
// Amount to write is min(the full buffer, data up to the write limit).
241233
// This is a number of bytes.
242234
nn := min(int64(len(b)), lim-s.out.end)
243235
// Copy the data into the output buffer and mark it as unsent.
244-
s.outunsent.add(s.out.end, s.out.end+nn)
236+
if s.out.end <= s.outwin {
237+
s.outunsent.add(s.out.end, min(s.out.end+nn, s.outwin))
238+
}
245239
s.out.writeAt(b[:nn], s.out.end)
246240
b = b[nn:]
247241
n += int(nn)
242+
if s.out.end > s.outwin {
243+
// We're blocked by flow control.
244+
// Send a STREAM_DATA_BLOCKED frame to let the peer know.
245+
s.outblocked.set()
246+
}
248247
// If we have bytes left to send, we're blocked.
249248
canWrite = false
250249
}
@@ -425,8 +424,8 @@ func (s *Stream) outUnlockNoQueue() streamState {
425424
}
426425
}
427426
}
428-
lim := min(s.out.start+s.outmaxbuf, s.outwin)
429-
canWrite := lim > s.out.end || // available flow control
427+
lim := s.out.start + s.outmaxbuf
428+
canWrite := lim > s.out.end || // available send buffer
430429
s.outclosed.isSet() || // closed locally
431430
s.outreset.isSet() // reset locally
432431
defer s.outgate.unlock(canWrite)
@@ -533,7 +532,19 @@ func (s *Stream) handleStopSending(code uint64) error {
533532
func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
534533
s.outgate.lock()
535534
defer s.outUnlock()
536-
s.outwin = max(maxStreamData, s.outwin)
535+
if maxStreamData <= s.outwin {
536+
return nil
537+
}
538+
if s.out.end > s.outwin {
539+
s.outunsent.add(s.outwin, min(maxStreamData, s.out.end))
540+
}
541+
s.outwin = maxStreamData
542+
if s.out.end > s.outwin {
543+
// We've still got more data than flow control window.
544+
s.outblocked.setUnsent()
545+
} else {
546+
s.outblocked.clear()
547+
}
537548
return nil
538549
}
539550

@@ -635,7 +646,7 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b
635646
if s.outreset.isSet() {
636647
// RESET_STREAM
637648
if s.outreset.shouldSendPTO(pto) {
638-
if !w.appendResetStreamFrame(s.id, s.outresetcode, s.out.end) {
649+
if !w.appendResetStreamFrame(s.id, s.outresetcode, min(s.outwin, s.out.end)) {
639650
return false
640651
}
641652
s.outreset.setSent(pnum)
@@ -645,15 +656,15 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b
645656
}
646657
if s.outblocked.shouldSendPTO(pto) {
647658
// STREAM_DATA_BLOCKED
648-
if !w.appendStreamDataBlockedFrame(s.id, s.out.end) {
659+
if !w.appendStreamDataBlockedFrame(s.id, s.outwin) {
649660
return false
650661
}
651662
s.outblocked.setSent(pnum)
652663
s.frameOpensStream(pnum)
653664
}
654-
// STREAM
655665
for {
656-
off, size := dataToSend(s.out, s.outunsent, s.outacked, pto)
666+
// STREAM
667+
off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
657668
fin := s.outclosed.isSet() && off+size == s.out.end
658669
shouldSend := size > 0 || // have data to send
659670
s.outopened.shouldSendPTO(pto) || // should open the stream
@@ -691,7 +702,7 @@ func (s *Stream) frameOpensStream(pnum packetNumber) {
691702
}
692703

693704
// dataToSend returns the next range of data to send in a STREAM or CRYPTO_STREAM.
694-
func dataToSend(out pipe, outunsent, outacked rangeset[int64], pto bool) (start, size int64) {
705+
func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) {
695706
switch {
696707
case pto:
697708
// On PTO, resend unacked data that fits in the probe packet.
@@ -702,14 +713,14 @@ func dataToSend(out pipe, outunsent, outacked rangeset[int64], pto bool) (start,
702713
// This may miss unacked data starting after that acked byte,
703714
// but avoids resending data the peer has acked.
704715
for _, r := range outacked {
705-
if r.start > out.start {
706-
return out.start, r.start - out.start
716+
if r.start > start {
717+
return start, r.start - start
707718
}
708719
}
709-
return out.start, out.end - out.start
720+
return start, end - start
710721
case outunsent.numRanges() > 0:
711722
return outunsent.min(), outunsent[0].size()
712723
default:
713-
return out.end, 0
724+
return end, 0
714725
}
715726
}

0 commit comments

Comments
 (0)