Skip to content

Commit c195560

Browse files
committed
Respond to review feedback and add comments to clarify confusion points.
1 parent 9bee5cb commit c195560

File tree

5 files changed

+411
-213
lines changed

5 files changed

+411
-213
lines changed

x/mongo/driver/topology/cmap_prose_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ func TestCMAPProse(t *testing.T) {
221221
var dialer DialerFunc = func(context.Context, string, string) (net.Conn, error) {
222222
return &testNetConn{}, nil
223223
}
224-
pool, _ := createTestPool(t, getConfig(), WithDialer(func(Dialer) Dialer { return dialer }))
224+
pool, disconnect := createTestPool(t, getConfig(), WithDialer(func(Dialer) Dialer { return dialer }))
225+
defer disconnect()
225226

226227
conns := checkoutConnections(t, pool, numConns)
227228
assertConnectionCounts(t, pool, numConns, 0)
@@ -294,14 +295,12 @@ func TestCMAPProse(t *testing.T) {
294295
func createTestPool(t *testing.T, cfg poolConfig, opts ...ConnectionOption) (*pool, func()) {
295296
t.Helper()
296297

297-
pool, err := newPool(cfg, opts...)
298-
assert.Nil(t, err, "newPool error: %v", err)
299-
err = pool.connect()
298+
pool := newPool(cfg, opts...)
299+
err := pool.connect()
300300
assert.Nil(t, err, "connect error: %v", err)
301301

302302
disconnect := func() {
303-
err := pool.disconnect(context.Background())
304-
assert.Nil(t, err, "unexpected pool.disconnect() error: %v", err)
303+
_ = pool.disconnect(context.Background())
305304
}
306305
return pool, disconnect
307306
}

x/mongo/driver/topology/connection_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -840,12 +840,10 @@ func TestConnection(t *testing.T) {
840840
t.Helper()
841841

842842
addr := bootstrapConnections(t, numConns, func(nc net.Conn) {})
843-
pool, err := newPool(poolConfig{
843+
pool := newPool(poolConfig{
844844
Address: address.Address(addr.String()),
845845
})
846-
assert.Nil(t, err, "newPool error: %v", err)
847-
848-
err = pool.connect()
846+
err := pool.connect()
849847
assert.Nil(t, err, "pool.connect() error: %v", err)
850848

851849
conns := make([]*Connection, 0, numConns)

x/mongo/driver/topology/pool.go

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type pool struct {
6262
maxConnecting uint64
6363
monitor *event.PoolMonitor
6464

65-
opts []ConnectionOption
65+
connOpts []ConnectionOption
6666
generation *poolGenerationMap
6767

6868
maintainInterval time.Duration // maintainInterval is the maintain() loop interval.
@@ -94,12 +94,10 @@ func connectionPerished(conn *connection) (string, bool) {
9494
return "", false
9595
}
9696

97-
// newPool creates a new pool. It will use the
98-
// provided options when creating connections.
99-
func newPool(config poolConfig, connOpts ...ConnectionOption) (*pool, error) {
100-
opts := connOpts
97+
// newPool creates a new pool. It will use the provided options when creating connections.
98+
func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
10199
if config.MaxIdleTime != time.Duration(0) {
102-
opts = append(opts, WithIdleTimeout(func(_ time.Duration) time.Duration { return config.MaxIdleTime }))
100+
connOpts = append(connOpts, WithIdleTimeout(func(_ time.Duration) time.Duration { return config.MaxIdleTime }))
103101
}
104102

105103
pool := &pool{
@@ -108,29 +106,30 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) (*pool, error) {
108106
maxSize: config.MaxPoolSize,
109107
maxConnecting: 2,
110108
monitor: config.PoolMonitor,
111-
opts: opts,
109+
connOpts: connOpts,
112110
generation: newPoolGenerationMap(),
113111
connected: disconnected,
114112
maintainInterval: 10 * time.Second,
115113
connsCond: sync.NewCond(&sync.Mutex{}),
116114
conns: make(map[uint64]*connection, config.MaxPoolSize),
117115
idleConns: make([]*connection, 0, config.MaxPoolSize),
118116
}
119-
pool.opts = append(pool.opts, withGenerationNumberFn(func(_ generationNumberFn) generationNumberFn { return pool.getGenerationForNewConnection }))
117+
pool.connOpts = append(pool.connOpts, withGenerationNumberFn(func(_ generationNumberFn) generationNumberFn { return pool.getGenerationForNewConnection }))
120118

121119
if pool.monitor != nil {
122120
pool.monitor.Event(&event.PoolEvent{
123121
Type: event.PoolCreated,
124122
PoolOptions: &event.MonitorPoolOptions{
125-
MaxPoolSize: config.MaxPoolSize,
126-
MinPoolSize: config.MinPoolSize,
123+
MaxPoolSize: config.MaxPoolSize,
124+
MinPoolSize: config.MinPoolSize,
125+
// TODO: We don't use this value, should we publish it?
127126
WaitQueueTimeoutMS: uint64(config.MaxIdleTime) / uint64(time.Millisecond),
128127
},
129128
Address: pool.address.String(),
130129
})
131130
}
132131

133-
return pool, nil
132+
return pool
134133
}
135134

136135
// stale checks if a given connection's generation is below the generation of the pool
@@ -176,7 +175,10 @@ func (p *pool) disconnect(ctx context.Context) error {
176175
}
177176

178177
// Call cancelBackground() to exit the maintain() background goroutine and broadcast to the
179-
// connsCond to wake up all createConnections() goroutines.
178+
// connsCond to wake up all createConnections() goroutines. We must hold the connsCond lock here
179+
// because we're changing the condition by cancelling the "background goroutine" Context, even
180+
// tho cancelling the Context is also synchronized by a lock. Otherwise, we run into an
181+
// intermittent bug that prevents the createConnections() goroutines from exiting.
180182
p.connsCond.L.Lock()
181183
p.cancelBackground()
182184
p.connsCond.L.Unlock()
@@ -256,7 +258,6 @@ func (p *pool) disconnect(ctx context.Context) error {
256258
w.tryDeliver(nil, ErrPoolDisconnected)
257259
}
258260

259-
// Mark the pool state as disconnected.
260261
atomic.StoreInt64(&p.connected, disconnected)
261262

262263
if p.monitor != nil {
@@ -290,6 +291,7 @@ func (p *pool) unpinConnectionFromTransaction() {
290291
// checkOut checks out a connection from the pool. If an idle connection is not available, the
291292
// checkOut enters a queue waiting for either the next idle or new connection. If the pool is
292293
// disconnected, checkOut returns an error.
294+
// Based partially on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1324
293295
func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
294296
if atomic.LoadInt64(&p.connected) != connected {
295297
if p.monitor != nil {
@@ -299,21 +301,27 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
299301
Reason: event.ReasonPoolClosed,
300302
})
301303
}
302-
err = ErrPoolDisconnected
303-
return
304+
return nil, ErrPoolDisconnected
304305
}
305306

306307
if ctx == nil {
307308
ctx = context.Background()
308309
}
309310

311+
// Create a wantConn, which we will use to request an existing idle or new connection. Always
312+
// cancel the wantConn if checkOut() returned an error to make sure any delivered connections
313+
// are returned to the pool (e.g. if a connection was delivered immediately after the Context
314+
// timed out).
310315
w := newWantConn()
311316
defer func() {
312317
if err != nil {
313318
w.cancel(p, err)
314319
}
315320
}()
316321

322+
// Get in the queue for an idle connection. If queueForIdleConn returns true, it was able to
323+
// immediately deliver an idle connection to the wantConn, so we can return the connection or
324+
// error from the wantConn without waiting for "ready".
317325
if delivered := p.queueForIdleConn(w); delivered {
318326
if w.err != nil {
319327
if p.monitor != nil {
@@ -323,8 +331,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
323331
Reason: event.ReasonConnectionErrored,
324332
})
325333
}
326-
err = w.err
327-
return
334+
return nil, w.err
328335
}
329336

330337
if p.monitor != nil {
@@ -334,12 +341,14 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
334341
ConnectionID: w.conn.poolID,
335342
})
336343
}
337-
conn = w.conn
338-
return
344+
return w.conn, nil
339345
}
340346

347+
// If we didn't get an immediately available idle connection, also get in the queue for a new
348+
// connection while we're waiting for an idle connection.
341349
p.queueForNewConn(w)
342350

351+
// Wait for either the wantConn to be ready or for the Context to time out.
343352
select {
344353
case <-w.ready:
345354
if w.err != nil {
@@ -350,8 +359,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
350359
Reason: event.ReasonConnectionErrored,
351360
})
352361
}
353-
err = w.err
354-
return
362+
return nil, w.err
355363
}
356364

357365
if p.monitor != nil {
@@ -361,8 +369,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
361369
ConnectionID: w.conn.poolID,
362370
})
363371
}
364-
conn = w.conn
365-
return
372+
return w.conn, nil
366373
case <-ctx.Done():
367374
if p.monitor != nil {
368375
p.monitor.Event(&event.PoolEvent{
@@ -371,14 +378,13 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
371378
Reason: event.ReasonTimedOut,
372379
})
373380
}
374-
err = WaitQueueTimeoutError{
381+
return nil, WaitQueueTimeoutError{
375382
Wrapped: ctx.Err(),
376383
PinnedCursorConnections: atomic.LoadUint64(&p.pinnedCursorConnections),
377384
PinnedTransactionConnections: atomic.LoadUint64(&p.pinnedTransactionConnections),
378385
maxPoolSize: p.maxSize,
379386
totalConnectionCount: p.totalConnectionCount(),
380387
}
381-
return
382388
}
383389
}
384390

@@ -407,6 +413,10 @@ func (p *pool) getGenerationForNewConnection(serviceID *primitive.ObjectID) uint
407413

408414
// removeConnection removes a connection from the pool and emits a "ConnectionClosed" event.
409415
func (p *pool) removeConnection(conn *connection, reason string) error {
416+
if conn == nil {
417+
return nil
418+
}
419+
410420
if conn.pool != p {
411421
return ErrWrongPool
412422
}
@@ -614,7 +624,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
614624
return nil, nil, false
615625
}
616626

617-
conn, err := newConnection(p.address, p.opts...)
627+
conn, err := newConnection(p.address, p.connOpts...)
618628
if err != nil {
619629
w.tryDeliver(nil, err)
620630
return nil, nil, false
@@ -779,7 +789,7 @@ func compact(arr []*connection) []*connection {
779789
// The conn may be gotten by creating a new connection or by finding an idle connection, or a
780790
// cancellation may make the conn no longer wanted. These three options are racing against each
781791
// other and use wantConn to coordinate and agree about the winning outcome.
782-
// Based on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1174-1240;bpv=0;bpt=1
792+
// Based on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1174-1240
783793
type wantConn struct {
784794
ready chan struct{}
785795

@@ -841,7 +851,7 @@ func (w *wantConn) cancel(p *pool, err error) {
841851
}
842852

843853
// A wantConnQueue is a queue of wantConns.
844-
// Based on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1242-1306;bpv=0;bpt=1
854+
// Based on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1242-1306
845855
type wantConnQueue struct {
846856
// This is a queue, not a deque.
847857
// It is split into two stages - head[headPos:] and tail.

0 commit comments

Comments
 (0)