Skip to content

Commit 184cf7d

Browse files
committed
GODRIVER-2038 Make all connections in the background using connectTimeoutMS.
1 parent 33fac98 commit 184cf7d

File tree

15 files changed

+1180
-1685
lines changed

15 files changed

+1180
-1685
lines changed

data/connection-monitoring-and-pooling/wait-queue-timeout.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"description": "must aggressively timeout threads enqueued longer than waitQueueTimeoutMS",
55
"poolOptions": {
66
"maxPoolSize": 1,
7-
"waitQueueTimeoutMS": 20
7+
"waitQueueTimeoutMS": 50
88
},
99
"operations": [
1010
{

data/connection-monitoring-and-pooling/wait-queue-timeout.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ style: unit
33
description: must aggressively timeout threads enqueued longer than waitQueueTimeoutMS
44
poolOptions:
55
maxPoolSize: 1
6-
waitQueueTimeoutMS: 20
6+
waitQueueTimeoutMS: 50
77
operations:
88
# Check out only possible connection
99
- name: checkOut

mongo/integration/sdam_error_handling_test.go

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -47,46 +47,7 @@ func TestSDAMErrorHandling(t *testing.T) {
4747
// blockConnection and appName.
4848
mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
4949
mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
50-
mt.Run("pool not cleared on operation-scoped network timeout", func(mt *mtest.T) {
51-
// Assert that the pool is not cleared when a connection created by an application
52-
// operation thread encounters an operation timeout during handshaking. Unlike the
53-
// non-timeout test below, we only test connections created in the foreground for
54-
// timeouts because connections created by the pool maintenance routine can't be
55-
// timed out using a context.
56-
57-
appName := "authOperationTimeoutTest"
58-
// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
59-
// speculative auth.
60-
mt.SetFailPoint(mtest.FailPoint{
61-
ConfigureFailPoint: "failCommand",
62-
Mode: mtest.FailPointMode{
63-
Times: 1,
64-
},
65-
Data: mtest.FailPointData{
66-
FailCommands: []string{"saslContinue"},
67-
BlockConnection: true,
68-
BlockTimeMS: 150,
69-
AppName: appName,
70-
},
71-
})
72-
73-
// Reset the client with the appName specified in the failpoint and the pool monitor.
74-
tpm := newTestPoolMonitor()
75-
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
76-
77-
// Use a context with a 100ms timeout so that the saslContinue delay of 150ms causes
78-
// an operation-scoped context timeout (i.e. a timeout not caused by a client timeout
79-
// like connectTimeoutMS or socketTimeoutMS).
80-
timeoutCtx, cancel := context.WithTimeout(mtest.Background, 100*time.Millisecond)
81-
defer cancel()
82-
_, err := mt.Coll.InsertOne(timeoutCtx, bson.D{{"test", 1}})
83-
assert.NotNil(mt, err, "expected InsertOne error, got nil")
84-
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
85-
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
86-
assert.False(mt, tpm.IsPoolCleared(), "expected pool not to be cleared but was cleared")
87-
})
88-
89-
mt.Run("pool cleared on non-operation-scoped network timeout", func(mt *mtest.T) {
50+
mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
9051
// Assert that the pool is cleared when a connection created by an application
9152
// operation thread encounters a timeout caused by connectTimeoutMS during
9253
// handshaking.

x/mongo/driver/topology/CMAP_spec_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ func runCMAPTest(t *testing.T, testFileName string) {
119119
s.connectionstate = connected
120120
err = s.pool.connect()
121121
testHelpers.RequireNil(t, err, "error connecting connection pool: %v", err)
122+
defer func() {
123+
_ = s.pool.disconnect(context.Background())
124+
}()
122125

123126
for _, op := range test.Operations {
124127
if tempErr := runOperation(t, op, testInfo, s, test.PoolOptions.WaitQueueTimeoutMS); tempErr != nil {

x/mongo/driver/topology/cmap_prose_test.go

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ func TestCMAPProse(t *testing.T) {
5252
assert.Equal(t, numClosed, len(closed), "expected %d closed events, got %d", numClosed, len(closed))
5353

5454
netCount := numCreated - numClosed
55-
assert.Equal(t, netCount, len(p.opened), "expected %d connections in opened map, got %d", netCount,
56-
len(p.opened))
55+
assert.Equal(t, netCount, p.totalConnectionCount(), "expected %d connections in opened map, got %d", netCount,
56+
p.totalConnectionCount())
5757
}
5858

59-
t.Run("get", func(t *testing.T) {
59+
t.Run("checkOut", func(t *testing.T) {
6060
t.Run("errored connection exists in pool", func(t *testing.T) {
61-
// If a connection is created as part of minPoolSize maintenance and errors while connecting, get()
61+
// If a connection is created as part of minPoolSize maintenance and errors while connecting, checkOut()
6262
// should report that error and publish an event.
6363
clearEvents()
6464

@@ -76,20 +76,19 @@ func TestCMAPProse(t *testing.T) {
7676
}
7777
pool := createTestPool(t, cfg, connOpts...)
7878

79-
_, err := pool.get(context.Background())
80-
assert.NotNil(t, err, "expected get() error, got nil")
79+
_, err := pool.checkOut(context.Background())
80+
assert.NotNil(t, err, "expected checkOut() error, got nil")
8181

82-
// If the connection doesn't finish connecting before resourcePool gives it back, the error will be
83-
// detected by pool.get and result in a created/closed count of 1. If it does finish connecting, the
84-
// error will be detected by resourcePool, which will return nil. Then, pool will try to create a new
85-
// connection, which will also error. This process will result in a created/closed count of 2.
8682
assert.True(t, len(created) == 1 || len(created) == 2, "expected 1 or 2 opened events, got %d", len(created))
8783
assert.True(t, len(closed) == 1 || len(closed) == 2, "expected 1 or 2 closed events, got %d", len(closed))
84+
85+
_ = pool.disconnect(context.Background())
8886
netCount := len(created) - len(closed)
8987
assert.Equal(t, 0, netCount, "expected net connection count to be 0, got %d", netCount)
9088
})
9189
t.Run("pool is empty", func(t *testing.T) {
92-
// If a new connection is created during get(), get() should report that error and publish an event.
90+
// If a checkOut() has to create a new connection and that connection encounters an
91+
// error while connecting, checkOut() should return that error and publish an event.
9392
clearEvents()
9493

9594
var dialer DialerFunc = func(context.Context, string, string) (net.Conn, error) {
@@ -103,13 +102,16 @@ func TestCMAPProse(t *testing.T) {
103102
}),
104103
}
105104
pool := createTestPool(t, getConfig(), connOpts...)
105+
defer func() {
106+
_ = pool.disconnect(context.Background())
107+
}()
106108

107-
_, err := pool.get(context.Background())
108-
assert.NotNil(t, err, "expected get() error, got nil")
109+
_, err := pool.checkOut(context.Background())
110+
assert.NotNil(t, err, "expected checkOut() error, got nil")
109111
assertConnectionCounts(t, pool, 1, 1)
110112
})
111113
})
112-
t.Run("put", func(t *testing.T) {
114+
t.Run("checkIn", func(t *testing.T) {
113115
t.Run("errored connection", func(t *testing.T) {
114116
// If the connection being returned to the pool encountered a network error, it should be removed from
115117
// the pool and an event should be published.
@@ -124,16 +126,19 @@ func TestCMAPProse(t *testing.T) {
124126
WithDialer(func(Dialer) Dialer { return dialer }),
125127
}
126128
pool := createTestPool(t, getConfig(), connOpts...)
129+
defer func() {
130+
_ = pool.disconnect(context.Background())
131+
}()
127132

128-
conn, err := pool.get(context.Background())
129-
assert.Nil(t, err, "get error: %v", err)
133+
conn, err := pool.checkOut(context.Background())
134+
assert.Nil(t, err, "checkOut() error: %v", err)
130135

131136
// Force a network error by writing to the connection.
132137
err = conn.writeWireMessage(context.Background(), nil)
133138
assert.NotNil(t, err, "expected writeWireMessage error, got nil")
134139

135-
err = pool.put(conn)
136-
assert.Nil(t, err, "put error: %v", err)
140+
err = pool.checkIn(conn)
141+
assert.Nil(t, err, "checkIn() error: %v", err)
137142

138143
assertConnectionCounts(t, pool, 1, 1)
139144
evt := <-closed
@@ -157,16 +162,19 @@ func TestCMAPProse(t *testing.T) {
157162
WithIdleTimeout(func(time.Duration) time.Duration { return 1 * time.Second }),
158163
}
159164
pool := createTestPool(t, getConfig(), connOpts...)
165+
defer func() {
166+
_ = pool.disconnect(context.Background())
167+
}()
160168

161-
conn, err := pool.get(context.Background())
162-
assert.Nil(t, err, "get error: %v", err)
169+
conn, err := pool.checkOut(context.Background())
170+
assert.Nil(t, err, "checkOut() error: %v", err)
163171

164172
// Set the idleDeadline to a time in the past to simulate expiration.
165173
pastTime := time.Now().Add(-10 * time.Second)
166174
conn.idleDeadline.Store(pastTime)
167175

168-
err = pool.put(conn)
169-
assert.Nil(t, err, "put error: %v", err)
176+
err = pool.checkIn(conn)
177+
assert.Nil(t, err, "checkIn() error: %v", err)
170178

171179
assertConnectionCounts(t, pool, 1, 1)
172180
evt := <-closed
@@ -189,10 +197,10 @@ func TestCMAPProse(t *testing.T) {
189197
conns := checkoutConnections(t, pool, numConns)
190198
assertConnectionCounts(t, pool, numConns, 0)
191199

192-
// Return all connections to the pool and assert that none were closed by put().
200+
// Return all connections to the pool and assert that none were closed by checkIn().
193201
for i, c := range conns {
194-
err := pool.put(c)
195-
assert.Nil(t, err, "put error at index %d: %v", i, err)
202+
err := pool.checkIn(c)
203+
assert.Nil(t, err, "checkIn() error at index %d: %v", i, err)
196204
}
197205
assertConnectionCounts(t, pool, numConns, 0)
198206

@@ -223,8 +231,8 @@ func TestCMAPProse(t *testing.T) {
223231

224232
// Only return 2 of the connection.
225233
for i := 0; i < 2; i++ {
226-
err := pool.put(conns[i])
227-
assert.Nil(t, err, "put error at index %d: %v", i, err)
234+
err := pool.checkIn(conns[i])
235+
assert.Nil(t, err, "checkIn() error at index %d: %v", i, err)
228236
}
229237
conns = conns[2:]
230238
assertConnectionCounts(t, pool, numConns, 0)
@@ -237,8 +245,8 @@ func TestCMAPProse(t *testing.T) {
237245
// Return the remaining connections and assert that the closed event count does not increase because
238246
// these connections have already been closed.
239247
for i, c := range conns {
240-
err := pool.put(c)
241-
assert.Nil(t, err, "put error at index %d: %v", i, err)
248+
err := pool.checkIn(c)
249+
assert.Nil(t, err, "checkIn() error at index %d: %v", i, err)
242250
}
243251
assertConnectionCounts(t, pool, numConns, numConns)
244252

@@ -269,8 +277,8 @@ func checkoutConnections(t *testing.T, p *pool, numConns int) []*connection {
269277
conns := make([]*connection, 0, numConns)
270278

271279
for i := 0; i < numConns; i++ {
272-
conn, err := p.get(context.Background())
273-
assert.Nil(t, err, "get error at index %d: %v", i, err)
280+
conn, err := p.checkOut(context.Background())
281+
assert.Nil(t, err, "checkOut() error at index %d: %v", i, err)
274282
conns = append(conns, conn)
275283
}
276284

x/mongo/driver/topology/connection.go

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"sync/atomic"
1919
"time"
2020

21-
"go.mongodb.org/mongo-driver/event"
2221
"go.mongodb.org/mongo-driver/internal"
2322
"go.mongodb.org/mongo-driver/mongo/address"
2423
"go.mongodb.org/mongo-driver/mongo/description"
@@ -46,7 +45,6 @@ type connection struct {
4645
idleDeadline atomic.Value // Stores a time.Time
4746
readTimeout time.Duration
4847
writeTimeout time.Duration
49-
descMu sync.RWMutex // Guards desc. TODO: Remove with or after GODRIVER-2038.
5048
desc description.Server
5149
helloRTT time.Duration
5250
compressor wiremessage.CompressorID
@@ -64,11 +62,9 @@ type connection struct {
6462
cancellationListener cancellationListener
6563

6664
// pool related fields
67-
pool *pool
68-
poolID uint64
69-
generation uint64
70-
expireReason string
71-
poolMonitor *event.PoolMonitor
65+
pool *pool
66+
poolID uint64
67+
generation uint64
7268
}
7369

7470
// newConnection handles the creation of a connection. It does not connect the connection.
@@ -90,7 +86,6 @@ func newConnection(addr address.Address, opts ...ConnectionOption) (*connection,
9086
config: cfg,
9187
connectContextMade: make(chan struct{}),
9288
cancellationListener: internal.NewCancellationListener(),
93-
poolMonitor: cfg.poolMonitor,
9489
}
9590
// Connections to non-load balanced deployments should eagerly set the generation numbers so errors encountered
9691
// at any point during connection establishment can be processed without the connection being considered stale.
@@ -212,13 +207,6 @@ func (c *connection) connect(ctx context.Context) {
212207
// running hello and authentication is handled by a handshaker on the configuration instance.
213208
handshaker := c.config.handshaker
214209
if handshaker == nil {
215-
if c.poolMonitor != nil {
216-
c.poolMonitor.Event(&event.PoolEvent{
217-
Type: event.ConnectionReady,
218-
Address: c.addr.String(),
219-
ConnectionID: c.poolID,
220-
})
221-
}
222210
return
223211
}
224212

@@ -229,9 +217,7 @@ func (c *connection) connect(ctx context.Context) {
229217
if err == nil {
230218
// We only need to retain the Description field as the connection's description. The authentication-related
231219
// fields in handshakeInfo are tracked by the handshaker if necessary.
232-
c.descMu.Lock()
233220
c.desc = handshakeInfo.Description
234-
c.descMu.Unlock()
235221
c.helloRTT = time.Since(handshakeStartTime)
236222

237223
// If the application has indicated that the cluster is load balanced, ensure the server has included serviceId
@@ -287,13 +273,6 @@ func (c *connection) connect(ctx context.Context) {
287273
}
288274
}
289275
}
290-
if c.poolMonitor != nil {
291-
c.poolMonitor.Event(&event.PoolEvent{
292-
Type: event.ConnectionReady,
293-
Address: c.addr.String(),
294-
ConnectionID: c.poolID,
295-
})
296-
}
297276
}
298277

299278
func (c *connection) wait() error {
@@ -707,7 +686,7 @@ func (c *Connection) Expire() error {
707686
}
708687

709688
func (c *Connection) cleanupReferences() error {
710-
err := c.pool.put(c.connection)
689+
err := c.pool.checkIn(c.connection)
711690
if c.cleanupPoolFn != nil {
712691
c.cleanupPoolFn()
713692
c.cleanupPoolFn = nil

x/mongo/driver/topology/connection_options.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ type connectionConfig struct {
4646
handshaker Handshaker
4747
idleTimeout time.Duration
4848
cmdMonitor *event.CommandMonitor
49-
poolMonitor *event.PoolMonitor
5049
readTimeout time.Duration
5150
writeTimeout time.Duration
5251
tlsConfig *tls.Config
@@ -173,14 +172,6 @@ func WithMonitor(fn func(*event.CommandMonitor) *event.CommandMonitor) Connectio
173172
}
174173
}
175174

176-
// withPoolMonitor configures a event for connection monitoring.
177-
func withPoolMonitor(fn func(*event.PoolMonitor) *event.PoolMonitor) ConnectionOption {
178-
return func(c *connectionConfig) error {
179-
c.poolMonitor = fn(c.poolMonitor)
180-
return nil
181-
}
182-
}
183-
184175
// WithZlibLevel sets the zLib compression level.
185176
func WithZlibLevel(fn func(*int) *int) ConnectionOption {
186177
return func(c *connectionConfig) error {

0 commit comments

Comments
 (0)