Skip to content

Commit 47caaff

Browse files
committed
quic: send and receive UDP datagrams
Add the Listener type, which manages a UDP socket. For golang/go#58547 Change-Id: Ia23a8b726ef46f8f84c9e052aa4dfc10eab034d6 Reviewed-on: https://go-review.googlesource.com/c/net/+/527758 Reviewed-by: Jonathan Amsterdam <[email protected]> LUCI-TryBot-Result: Go LUCI <[email protected]>
1 parent 02eb0f3 commit 47caaff

File tree

8 files changed

+532
-91
lines changed

8 files changed

+532
-91
lines changed

internal/quic/conn.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ import (
2020
// Multiple goroutines may invoke methods on a Conn simultaneously.
2121
type Conn struct {
2222
side connSide
23-
listener connListener
23+
listener *Listener
2424
config *Config
2525
testHooks connTestHooks
2626
peerAddr netip.AddrPort
2727

2828
msgc chan any
2929
donec chan struct{} // closed when conn loop exits
30+
readyc chan struct{} // closed when TLS handshake completes
3031
exited bool // set to make the conn loop exit immediately
3132

3233
w packetWriter
@@ -61,28 +62,24 @@ type Conn struct {
6162
testSendPing sentVal
6263
}
6364

64-
// The connListener is the Conn's Listener.
65-
// Defined as an interface so we can swap it out in tests.
66-
type connListener interface {
67-
sendDatagram(p []byte, addr netip.AddrPort) error
68-
}
69-
7065
// connTestHooks override conn behavior in tests.
7166
type connTestHooks interface {
7267
nextMessage(msgc chan any, nextTimeout time.Time) (now time.Time, message any)
7368
handleTLSEvent(tls.QUICEvent)
7469
newConnID(seq int64) ([]byte, error)
7570
waitUntil(ctx context.Context, until func() bool) error
71+
timeNow() time.Time
7672
}
7773

78-
func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip.AddrPort, config *Config, l connListener, hooks connTestHooks) (*Conn, error) {
74+
func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip.AddrPort, config *Config, l *Listener, hooks connTestHooks) (*Conn, error) {
7975
c := &Conn{
8076
side: side,
8177
listener: l,
8278
config: config,
8379
peerAddr: peerAddr,
8480
msgc: make(chan any, 1),
8581
donec: make(chan struct{}),
82+
readyc: make(chan struct{}),
8683
testHooks: hooks,
8784
maxIdleTimeout: defaultMaxIdleTimeout,
8885
idleTimeout: now.Add(defaultMaxIdleTimeout),
@@ -94,12 +91,12 @@ func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip.
9491
c.msgc = make(chan any, 1)
9592

9693
if c.side == clientSide {
97-
if err := c.connIDState.initClient(c.newConnIDFunc()); err != nil {
94+
if err := c.connIDState.initClient(c); err != nil {
9895
return nil, err
9996
}
10097
initialConnID, _ = c.connIDState.dstConnID()
10198
} else {
102-
if err := c.connIDState.initServer(c.newConnIDFunc(), initialConnID); err != nil {
99+
if err := c.connIDState.initServer(c, initialConnID); err != nil {
103100
return nil, err
104101
}
105102
}
@@ -134,6 +131,14 @@ func (c *Conn) String() string {
134131
return fmt.Sprintf("quic.Conn(%v,->%v)", c.side, c.peerAddr)
135132
}
136133

134+
func (c *Conn) Close() error {
135+
// TODO: Implement shutdown for real.
136+
c.runOnLoop(func(now time.Time, c *Conn) {
137+
c.exited = true
138+
})
139+
return nil
140+
}
141+
137142
// confirmHandshake is called when the handshake is confirmed.
138143
// https://www.rfc-editor.org/rfc/rfc9001#section-4.1.2
139144
func (c *Conn) confirmHandshake(now time.Time) {
@@ -147,6 +152,7 @@ func (c *Conn) confirmHandshake(now time.Time) {
147152
if c.side == serverSide {
148153
// When the server confirms the handshake, it sends a HANDSHAKE_DONE.
149154
c.handshakeConfirmed.setUnsent()
155+
c.listener.serverConnEstablished(c)
150156
} else {
151157
// The client never sends a HANDSHAKE_DONE, so we set handshakeConfirmed
152158
// to the received state, indicating that the handshake is confirmed and we
@@ -177,7 +183,7 @@ func (c *Conn) receiveTransportParameters(p transportParameters) error {
177183
c.streams.peerInitialMaxStreamDataRemote[uniStream] = p.initialMaxStreamDataUni
178184
c.peerAckDelayExponent = p.ackDelayExponent
179185
c.loss.setMaxAckDelay(p.maxAckDelay)
180-
if err := c.connIDState.setPeerActiveConnIDLimit(p.activeConnIDLimit, c.newConnIDFunc()); err != nil {
186+
if err := c.connIDState.setPeerActiveConnIDLimit(c, p.activeConnIDLimit); err != nil {
181187
return err
182188
}
183189
if p.preferredAddrConnID != nil {
@@ -211,6 +217,7 @@ type (
211217
func (c *Conn) loop(now time.Time) {
212218
defer close(c.donec)
213219
defer c.tls.Close()
220+
defer c.listener.connDrained(c)
214221

215222
// The connection timer sends a message to the connection loop on expiry.
216223
// We need to give it an expiry when creating it, so set the initial timeout to
@@ -371,10 +378,3 @@ func firstTime(a, b time.Time) time.Time {
371378
return b
372379
}
373380
}
374-
375-
func (c *Conn) newConnIDFunc() newConnIDFunc {
376-
if c.testHooks != nil {
377-
return c.testHooks.newConnID
378-
}
379-
return newRandomConnID
380-
}

internal/quic/conn_id.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ type connID struct {
5555
send sentVal
5656
}
5757

58-
func (s *connIDState) initClient(newID newConnIDFunc) error {
58+
func (s *connIDState) initClient(c *Conn) error {
5959
// Client chooses its initial connection ID, and sends it
6060
// in the Source Connection ID field of the first Initial packet.
61-
locid, err := newID(0)
61+
locid, err := c.newConnID(0)
6262
if err != nil {
6363
return err
6464
}
@@ -70,18 +70,20 @@ func (s *connIDState) initClient(newID newConnIDFunc) error {
7070

7171
// Client chooses an initial, transient connection ID for the server,
7272
// and sends it in the Destination Connection ID field of the first Initial packet.
73-
remid, err := newID(-1)
73+
remid, err := c.newConnID(-1)
7474
if err != nil {
7575
return err
7676
}
7777
s.remote = append(s.remote, connID{
7878
seq: -1,
7979
cid: remid,
8080
})
81+
const retired = false
82+
c.listener.connIDsChanged(c, retired, s.local[:])
8183
return nil
8284
}
8385

84-
func (s *connIDState) initServer(newID newConnIDFunc, dstConnID []byte) error {
86+
func (s *connIDState) initServer(c *Conn, dstConnID []byte) error {
8587
// Client-chosen, transient connection ID received in the first Initial packet.
8688
// The server will not use this as the Source Connection ID of packets it sends,
8789
// but remembers it because it may receive packets sent to this destination.
@@ -92,7 +94,7 @@ func (s *connIDState) initServer(newID newConnIDFunc, dstConnID []byte) error {
9294

9395
// Server chooses a connection ID, and sends it in the Source Connection ID of
9496
// the response to the clent.
95-
locid, err := newID(0)
97+
locid, err := c.newConnID(0)
9698
if err != nil {
9799
return err
98100
}
@@ -101,6 +103,8 @@ func (s *connIDState) initServer(newID newConnIDFunc, dstConnID []byte) error {
101103
cid: locid,
102104
})
103105
s.nextLocalSeq = 1
106+
const retired = false
107+
c.listener.connIDsChanged(c, retired, s.local[:])
104108
return nil
105109
}
106110

@@ -125,20 +129,21 @@ func (s *connIDState) dstConnID() (cid []byte, ok bool) {
125129

126130
// setPeerActiveConnIDLimit sets the active_connection_id_limit
127131
// transport parameter received from the peer.
128-
func (s *connIDState) setPeerActiveConnIDLimit(lim int64, newID newConnIDFunc) error {
132+
func (s *connIDState) setPeerActiveConnIDLimit(c *Conn, lim int64) error {
129133
s.peerActiveConnIDLimit = lim
130-
return s.issueLocalIDs(newID)
134+
return s.issueLocalIDs(c)
131135
}
132136

133-
func (s *connIDState) issueLocalIDs(newID newConnIDFunc) error {
137+
func (s *connIDState) issueLocalIDs(c *Conn) error {
134138
toIssue := min(int(s.peerActiveConnIDLimit), maxPeerActiveConnIDLimit)
135139
for i := range s.local {
136140
if s.local[i].seq != -1 && !s.local[i].retired {
137141
toIssue--
138142
}
139143
}
144+
prev := len(s.local)
140145
for toIssue > 0 {
141-
cid, err := newID(s.nextLocalSeq)
146+
cid, err := c.newConnID(s.nextLocalSeq)
142147
if err != nil {
143148
return err
144149
}
@@ -151,14 +156,16 @@ func (s *connIDState) issueLocalIDs(newID newConnIDFunc) error {
151156
s.needSend = true
152157
toIssue--
153158
}
159+
const retired = false
160+
c.listener.connIDsChanged(c, retired, s.local[prev:])
154161
return nil
155162
}
156163

157164
// handlePacket updates the connection ID state during the handshake
158165
// (Initial and Handshake packets).
159-
func (s *connIDState) handlePacket(side connSide, ptype packetType, srcConnID []byte) {
166+
func (s *connIDState) handlePacket(c *Conn, ptype packetType, srcConnID []byte) {
160167
switch {
161-
case ptype == packetTypeInitial && side == clientSide:
168+
case ptype == packetTypeInitial && c.side == clientSide:
162169
if len(s.remote) == 1 && s.remote[0].seq == -1 {
163170
// We're a client connection processing the first Initial packet
164171
// from the server. Replace the transient remote connection ID
@@ -168,7 +175,7 @@ func (s *connIDState) handlePacket(side connSide, ptype packetType, srcConnID []
168175
cid: cloneBytes(srcConnID),
169176
}
170177
}
171-
case ptype == packetTypeInitial && side == serverSide:
178+
case ptype == packetTypeInitial && c.side == serverSide:
172179
if len(s.remote) == 0 {
173180
// We're a server connection processing the first Initial packet
174181
// from the client. Set the client's connection ID.
@@ -177,11 +184,13 @@ func (s *connIDState) handlePacket(side connSide, ptype packetType, srcConnID []
177184
cid: cloneBytes(srcConnID),
178185
})
179186
}
180-
case ptype == packetTypeHandshake && side == serverSide:
187+
case ptype == packetTypeHandshake && c.side == serverSide:
181188
if len(s.local) > 0 && s.local[0].seq == -1 {
182189
// We're a server connection processing the first Handshake packet from
183190
// the client. Discard the transient, client-chosen connection ID used
184191
// for Initial packets; the client will never send it again.
192+
const retired = true
193+
c.listener.connIDsChanged(c, retired, s.local[0:1])
185194
s.local = append(s.local[:0], s.local[1:]...)
186195
}
187196
}
@@ -263,17 +272,19 @@ func (s *connIDState) retireRemote(rcid *connID) {
263272
s.needSend = true
264273
}
265274

266-
func (s *connIDState) handleRetireConnID(seq int64, newID newConnIDFunc) error {
275+
func (s *connIDState) handleRetireConnID(c *Conn, seq int64) error {
267276
if seq >= s.nextLocalSeq {
268277
return localTransportError(errProtocolViolation)
269278
}
270279
for i := range s.local {
271280
if s.local[i].seq == seq {
281+
const retired = true
282+
c.listener.connIDsChanged(c, retired, s.local[i:i+1])
272283
s.local = append(s.local[:i], s.local[i+1:]...)
273284
break
274285
}
275286
}
276-
s.issueLocalIDs(newID)
287+
s.issueLocalIDs(c)
277288
return nil
278289
}
279290

@@ -355,7 +366,12 @@ func cloneBytes(b []byte) []byte {
355366
return n
356367
}
357368

358-
type newConnIDFunc func(seq int64) ([]byte, error)
369+
func (c *Conn) newConnID(seq int64) ([]byte, error) {
370+
if c.testHooks != nil {
371+
return c.testHooks.newConnID(seq)
372+
}
373+
return newRandomConnID(seq)
374+
}
359375

360376
func newRandomConnID(_ int64) ([]byte, error) {
361377
// It is not necessary for connection IDs to be cryptographically secure,

0 commit comments

Comments
 (0)