Skip to content

Commit 2def9ae

Browse files
committed
GODRIVER-2038 Make all connections in the background using connectTimeoutMS.
1 parent 21d5588 commit 2def9ae

File tree

12 files changed

+1471
-1699
lines changed

12 files changed

+1471
-1699
lines changed

mongo/integration/client_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package integration
88

99
import (
10+
"context"
1011
"fmt"
1112
"os"
1213
"reflect"
@@ -26,6 +27,7 @@ import (
2627
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
2728
"go.mongodb.org/mongo-driver/x/mongo/driver"
2829
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
30+
"golang.org/x/sync/errgroup"
2931
)
3032

3133
var noClientOpts = mtest.NewOptions().CreateClient(false)
@@ -463,3 +465,78 @@ type proxyMessage struct {
463465
sent wiremessage.WireMessage
464466
received wiremessage.WireMessage
465467
}
468+
469+
func TestClientStress(t *testing.T) {
470+
mtOpts := mtest.NewOptions().
471+
MinServerVersion("3.6").
472+
Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.Single). // TODO: What topologies?
473+
CreateClient(false)
474+
mt := mtest.New(t, mtOpts)
475+
476+
maxPoolOpts := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(100))
477+
mt.RunOpts("", maxPoolOpts, func(mt *mtest.T) {
478+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"key", "value"}})
479+
assert.Nil(mt, err, "unexpected error inserting document: %v", err)
480+
481+
findOneFor := func(d time.Duration) []error {
482+
timeout := time.After(d)
483+
errs := make([]error, 0)
484+
485+
for {
486+
select {
487+
case <-timeout:
488+
return errs
489+
default:
490+
}
491+
492+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
493+
defer cancel()
494+
var res map[string]interface{}
495+
err := mt.Coll.FindOne(ctx, bson.D{{"key", "value"}}).Decode(&res)
496+
if err != nil {
497+
errs = append(errs, err)
498+
}
499+
}
500+
}
501+
502+
g := new(errgroup.Group)
503+
for i := 0; i < 1000; i++ {
504+
g.Go(func() error {
505+
errs := findOneFor(20 * time.Second)
506+
507+
if len(errs) == 0 {
508+
return nil
509+
}
510+
511+
return errs[len(errs)-1]
512+
})
513+
}
514+
err = g.Wait()
515+
assert.NotNil(mt, err, "expected at least one error, got nil")
516+
517+
// findOneFor(1 * time.Second)
518+
519+
errs := findOneFor(5 * time.Second)
520+
assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
521+
})
522+
}
523+
524+
func TestClientOnce(t *testing.T) {
525+
mtOpts := mtest.NewOptions().
526+
MinServerVersion("3.6").
527+
Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.Single). // TODO: What topologies?
528+
CreateClient(false)
529+
mt := mtest.New(t, mtOpts)
530+
531+
maxPoolOpts := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(100))
532+
mt.RunOpts("", maxPoolOpts, func(mt *mtest.T) {
533+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"key", "value"}})
534+
assert.Nil(mt, err, "unexpected error inserting document: %v", err)
535+
536+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
537+
defer cancel()
538+
var res map[string]interface{}
539+
err = mt.Coll.FindOne(ctx, bson.D{{"key", "value"}}).Decode(&res)
540+
assert.Nil(mt, err, "unexpected error calling FindOne")
541+
})
542+
}

mongo/integration/mtest/mongotest.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"strings"
1313
"sync"
14+
"sync/atomic"
1415
"testing"
1516

1617
"go.mongodb.org/mongo-driver/bson"
@@ -101,7 +102,7 @@ type T struct {
101102
dataLake *bool
102103
ssl *bool
103104
collCreateOpts bson.D
104-
connsCheckedOut int // net number of connections checked out during test execution
105+
connsCheckedOut int32 // net number of connections checked out during test execution
105106
requireAPIVersion *bool
106107

107108
// options copied to sub-tests
@@ -366,7 +367,7 @@ func (t *T) GetProxiedMessages() []*ProxyMessage {
366367
}
367368

368369
// NumberConnectionsCheckedOut returns the number of connections checked out from the test Client.
369-
func (t *T) NumberConnectionsCheckedOut() int {
370+
func (t *T) NumberConnectionsCheckedOut() int32 {
370371
return t.connsCheckedOut
371372
}
372373

@@ -592,9 +593,9 @@ func (t *T) createTestClient() {
592593

593594
switch evt.Type {
594595
case event.GetSucceeded:
595-
t.connsCheckedOut++
596+
atomic.AddInt32(&t.connsCheckedOut, 1)
596597
case event.ConnectionReturned:
597-
t.connsCheckedOut--
598+
atomic.AddInt32(&t.connsCheckedOut, -1)
598599
}
599600
},
600601
})

x/mongo/driver/topology/cmap_prose_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func TestCMAPProse(t *testing.T) {
5252
assert.Equal(t, numClosed, len(closed), "expected %d closed events, got %d", numClosed, len(closed))
5353

5454
netCount := numCreated - numClosed
55-
assert.Equal(t, netCount, len(p.opened), "expected %d connections in opened map, got %d", netCount,
56-
len(p.opened))
55+
assert.Equal(t, netCount, len(p.conns), "expected %d connections in opened map, got %d", netCount,
56+
len(p.conns))
5757
}
5858

5959
t.Run("get", func(t *testing.T) {
@@ -76,7 +76,7 @@ func TestCMAPProse(t *testing.T) {
7676
}
7777
pool := createTestPool(t, cfg, connOpts...)
7878

79-
_, err := pool.get(context.Background())
79+
_, err := pool.checkOut(context.Background())
8080
assert.NotNil(t, err, "expected get() error, got nil")
8181

8282
// If the connection doesn't finish connecting before resourcePool gives it back, the error will be
@@ -104,7 +104,7 @@ func TestCMAPProse(t *testing.T) {
104104
}
105105
pool := createTestPool(t, getConfig(), connOpts...)
106106

107-
_, err := pool.get(context.Background())
107+
_, err := pool.checkOut(context.Background())
108108
assert.NotNil(t, err, "expected get() error, got nil")
109109
assertConnectionCounts(t, pool, 1, 1)
110110
})
@@ -125,14 +125,14 @@ func TestCMAPProse(t *testing.T) {
125125
}
126126
pool := createTestPool(t, getConfig(), connOpts...)
127127

128-
conn, err := pool.get(context.Background())
128+
conn, err := pool.checkOut(context.Background())
129129
assert.Nil(t, err, "get error: %v", err)
130130

131131
// Force a network error by writing to the connection.
132132
err = conn.writeWireMessage(context.Background(), nil)
133133
assert.NotNil(t, err, "expected writeWireMessage error, got nil")
134134

135-
err = pool.put(conn)
135+
err = pool.checkIn(conn)
136136
assert.Nil(t, err, "put error: %v", err)
137137

138138
assertConnectionCounts(t, pool, 1, 1)
@@ -158,14 +158,14 @@ func TestCMAPProse(t *testing.T) {
158158
}
159159
pool := createTestPool(t, getConfig(), connOpts...)
160160

161-
conn, err := pool.get(context.Background())
161+
conn, err := pool.checkOut(context.Background())
162162
assert.Nil(t, err, "get error: %v", err)
163163

164164
// Set the idleDeadline to a time in the past to simulate expiration.
165165
pastTime := time.Now().Add(-10 * time.Second)
166166
conn.idleDeadline.Store(pastTime)
167167

168-
err = pool.put(conn)
168+
err = pool.checkIn(conn)
169169
assert.Nil(t, err, "put error: %v", err)
170170

171171
assertConnectionCounts(t, pool, 1, 1)
@@ -191,7 +191,7 @@ func TestCMAPProse(t *testing.T) {
191191

192192
// Return all connections to the pool and assert that none were closed by put().
193193
for i, c := range conns {
194-
err := pool.put(c)
194+
err := pool.checkIn(c)
195195
assert.Nil(t, err, "put error at index %d: %v", i, err)
196196
}
197197
assertConnectionCounts(t, pool, numConns, 0)
@@ -223,7 +223,7 @@ func TestCMAPProse(t *testing.T) {
223223

224224
// Only return 2 of the connection.
225225
for i := 0; i < 2; i++ {
226-
err := pool.put(conns[i])
226+
err := pool.checkIn(conns[i])
227227
assert.Nil(t, err, "put error at index %d: %v", i, err)
228228
}
229229
conns = conns[2:]
@@ -237,7 +237,7 @@ func TestCMAPProse(t *testing.T) {
237237
// Return the remaining connections and assert that the closed event count does not increase because
238238
// these connections have already been closed.
239239
for i, c := range conns {
240-
err := pool.put(c)
240+
err := pool.checkIn(c)
241241
assert.Nil(t, err, "put error at index %d: %v", i, err)
242242
}
243243
assertConnectionCounts(t, pool, numConns, numConns)
@@ -269,7 +269,7 @@ func checkoutConnections(t *testing.T, p *pool, numConns int) []*connection {
269269
conns := make([]*connection, 0, numConns)
270270

271271
for i := 0; i < numConns; i++ {
272-
conn, err := p.get(context.Background())
272+
conn, err := p.checkOut(context.Background())
273273
assert.Nil(t, err, "get error at index %d: %v", i, err)
274274
conns = append(conns, conn)
275275
}

x/mongo/driver/topology/connection.go

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"sync/atomic"
1919
"time"
2020

21-
"go.mongodb.org/mongo-driver/event"
2221
"go.mongodb.org/mongo-driver/internal"
2322
"go.mongodb.org/mongo-driver/mongo/address"
2423
"go.mongodb.org/mongo-driver/mongo/description"
@@ -63,11 +62,9 @@ type connection struct {
6362
cancellationListener cancellationListener
6463

6564
// pool related fields
66-
pool *pool
67-
poolID uint64
68-
generation uint64
69-
expireReason string
70-
poolMonitor *event.PoolMonitor
65+
pool *pool
66+
poolID uint64
67+
generation uint64
7168
}
7269

7370
// newConnection handles the creation of a connection. It does not connect the connection.
@@ -89,7 +86,6 @@ func newConnection(addr address.Address, opts ...ConnectionOption) (*connection,
8986
config: cfg,
9087
connectContextMade: make(chan struct{}),
9188
cancellationListener: internal.NewCancellationListener(),
92-
poolMonitor: cfg.poolMonitor,
9389
}
9490
// Connections to non-load balanced deployments should eagerly set the generation numbers so errors encountered
9591
// at any point during connection establishment can be processed without the connection being considered stale.
@@ -101,15 +97,15 @@ func newConnection(addr address.Address, opts ...ConnectionOption) (*connection,
10197
return c, nil
10298
}
10399

104-
func (c *connection) processInitializationError(opCtx context.Context, err error) {
100+
func (c *connection) processInitializationError(err error) {
105101
atomic.StoreInt32(&c.connected, disconnected)
106102
if c.nc != nil {
107103
_ = c.nc.Close()
108104
}
109105

110106
c.connectErr = ConnectionError{Wrapped: err, init: true}
111107
if c.config.errorHandlingCallback != nil {
112-
c.config.errorHandlingCallback(opCtx, c.connectErr, c.generation, c.desc.ServiceID)
108+
c.config.errorHandlingCallback(c.connectErr, c.generation, c.desc.ServiceID)
113109
}
114110
}
115111

@@ -184,7 +180,7 @@ func (c *connection) connect(ctx context.Context) {
184180
var tempNc net.Conn
185181
tempNc, err = c.config.dialer.DialContext(dialCtx, c.addr.Network(), c.addr.String())
186182
if err != nil {
187-
c.processInitializationError(ctx, err)
183+
c.processInitializationError(err)
188184
return
189185
}
190186
c.nc = tempNc
@@ -200,7 +196,7 @@ func (c *connection) connect(ctx context.Context) {
200196
}
201197
tlsNc, err := configureTLS(dialCtx, c.config.tlsConnectionSource, c.nc, c.addr, tlsConfig, ocspOpts)
202198
if err != nil {
203-
c.processInitializationError(ctx, err)
199+
c.processInitializationError(err)
204200
return
205201
}
206202
c.nc = tlsNc
@@ -211,13 +207,6 @@ func (c *connection) connect(ctx context.Context) {
211207
// running isMaster and authentication is handled by a handshaker on the configuration instance.
212208
handshaker := c.config.handshaker
213209
if handshaker == nil {
214-
if c.poolMonitor != nil {
215-
c.poolMonitor.Event(&event.PoolEvent{
216-
Type: event.ConnectionReady,
217-
Address: c.addr.String(),
218-
ConnectionID: c.poolID,
219-
})
220-
}
221210
return
222211
}
223212

@@ -252,7 +241,7 @@ func (c *connection) connect(ctx context.Context) {
252241

253242
// We have a failed handshake here
254243
if err != nil {
255-
c.processInitializationError(ctx, err)
244+
c.processInitializationError(err)
256245
return
257246
}
258247

@@ -284,13 +273,6 @@ func (c *connection) connect(ctx context.Context) {
284273
}
285274
}
286275
}
287-
if c.poolMonitor != nil {
288-
c.poolMonitor.Event(&event.PoolEvent{
289-
Type: event.ConnectionReady,
290-
Address: c.addr.String(),
291-
ConnectionID: c.poolID,
292-
})
293-
}
294276
}
295277

296278
func (c *connection) wait() error {
@@ -704,7 +686,7 @@ func (c *Connection) Expire() error {
704686
}
705687

706688
func (c *Connection) cleanupReferences() error {
707-
err := c.pool.put(c.connection)
689+
err := c.pool.checkIn(c.connection)
708690
if c.cleanupPoolFn != nil {
709691
c.cleanupPoolFn()
710692
c.cleanupPoolFn = nil

x/mongo/driver/topology/connection_options.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ type connectionConfig struct {
4646
handshaker Handshaker
4747
idleTimeout time.Duration
4848
cmdMonitor *event.CommandMonitor
49-
poolMonitor *event.PoolMonitor
5049
readTimeout time.Duration
5150
writeTimeout time.Duration
5251
tlsConfig *tls.Config
@@ -55,7 +54,7 @@ type connectionConfig struct {
5554
zstdLevel *int
5655
ocspCache ocsp.Cache
5756
disableOCSPEndpointCheck bool
58-
errorHandlingCallback func(opCtx context.Context, err error, startGenNum uint64, svcID *primitive.ObjectID)
57+
errorHandlingCallback func(err error, startGenNum uint64, svcID *primitive.ObjectID)
5958
tlsConnectionSource tlsConnectionSource
6059
loadBalanced bool
6160
getGenerationFn generationNumberFn
@@ -92,7 +91,7 @@ func withTLSConnectionSource(fn func(tlsConnectionSource) tlsConnectionSource) C
9291
}
9392
}
9493

95-
func withErrorHandlingCallback(fn func(opCtx context.Context, err error, startGenNum uint64, svcID *primitive.ObjectID)) ConnectionOption {
94+
func withErrorHandlingCallback(fn func(err error, startGenNum uint64, svcID *primitive.ObjectID)) ConnectionOption {
9695
return func(c *connectionConfig) error {
9796
c.errorHandlingCallback = fn
9897
return nil
@@ -173,14 +172,6 @@ func WithMonitor(fn func(*event.CommandMonitor) *event.CommandMonitor) Connectio
173172
}
174173
}
175174

176-
// withPoolMonitor configures a event for connection monitoring.
177-
func withPoolMonitor(fn func(*event.PoolMonitor) *event.PoolMonitor) ConnectionOption {
178-
return func(c *connectionConfig) error {
179-
c.poolMonitor = fn(c.poolMonitor)
180-
return nil
181-
}
182-
}
183-
184175
// WithZlibLevel sets the zLib compression level.
185176
func WithZlibLevel(fn func(*int) *int) ConnectionOption {
186177
return func(c *connectionConfig) error {

0 commit comments

Comments
 (0)