Skip to content

Commit ffe57e8

Browse files
committed
GODRIVER-2038 Make all connections in the background using connectTimeoutMS.
1 parent e142bb3 commit ffe57e8

File tree

14 files changed

+1467
-1651
lines changed

14 files changed

+1467
-1651
lines changed

data/connection-monitoring-and-pooling/wait-queue-timeout.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"description": "must aggressively timeout threads enqueued longer than waitQueueTimeoutMS",
55
"poolOptions": {
66
"maxPoolSize": 1,
7-
"waitQueueTimeoutMS": 20
7+
"waitQueueTimeoutMS": 50
88
},
99
"operations": [
1010
{

data/connection-monitoring-and-pooling/wait-queue-timeout.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ style: unit
33
description: must aggressively timeout threads enqueued longer than waitQueueTimeoutMS
44
poolOptions:
55
maxPoolSize: 1
6-
waitQueueTimeoutMS: 20
6+
waitQueueTimeoutMS: 50
77
operations:
88
# Check out only possible connection
99
- name: checkOut

mongo/integration/client_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,3 +464,137 @@ type proxyMessage struct {
464464
sent wiremessage.WireMessage
465465
received wiremessage.WireMessage
466466
}
467+
468+
// func TestClientStress(t *testing.T) {
469+
// mtOpts := mtest.NewOptions().
470+
// MinServerVersion("3.6").
471+
// Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.Single). // TODO: What topologies?
472+
// CreateClient(false)
473+
// mt := mtest.New(t, mtOpts)
474+
475+
// maxPoolOpts := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(100))
476+
// mt.RunOpts("", maxPoolOpts, func(mt *mtest.T) {
477+
// _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"key", "value"}})
478+
// assert.Nil(mt, err, "unexpected error inserting document: %v", err)
479+
480+
// findOneFor := func(d time.Duration) []error {
481+
// timeout := time.After(d)
482+
// errs := make([]error, 0)
483+
484+
// for {
485+
// select {
486+
// case <-timeout:
487+
// return errs
488+
// default:
489+
// }
490+
491+
// ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
492+
// defer cancel()
493+
// var res map[string]interface{}
494+
// err := mt.Coll.FindOne(ctx, bson.D{{"key", "value"}}).Decode(&res)
495+
// if err != nil {
496+
// errs = append(errs, err)
497+
// }
498+
// }
499+
// }
500+
501+
// g := new(errgroup.Group)
502+
// for i := 0; i < 1000; i++ {
503+
// g.Go(func() error {
504+
// errs := findOneFor(5 * time.Second)
505+
506+
// if len(errs) == 0 {
507+
// return nil
508+
// }
509+
510+
// return errs[len(errs)-1]
511+
// })
512+
// }
513+
// err = g.Wait()
514+
// assert.NotNil(mt, err, "expected at least one error, got nil")
515+
516+
// findOneFor(1 * time.Second)
517+
518+
// errs := findOneFor(5 * time.Second)
519+
// assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
520+
// })
521+
// }
522+
523+
// func TestBlah(t *testing.T) {
524+
// uri := os.Getenv("MONGODB_URI")
525+
// client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(uri).SetMaxPoolSize(100))
526+
// if err != nil {
527+
// t.Log(err)
528+
// t.FailNow()
529+
// }
530+
// coll := client.Database("sample_mflix").Collection("movies")
531+
// oid, err := primitive.ObjectIDFromHex("573a1398f29313caabce9682")
532+
// if err != nil {
533+
// t.Log(err)
534+
// t.FailNow()
535+
// }
536+
537+
// findOneFor := func(d time.Duration) []error {
538+
// timeout := time.After(d)
539+
// errs := make([]error, 0)
540+
541+
// for {
542+
// select {
543+
// case <-timeout:
544+
// return errs
545+
// default:
546+
// }
547+
548+
// ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
549+
// defer cancel()
550+
// var res bson.D
551+
// err := coll.FindOne(
552+
// ctx,
553+
// bson.D{{"_id", oid}},
554+
// options.FindOne().SetProjection(bson.D{{"title", 1}})).Decode(&res)
555+
// if err != nil {
556+
// errs = append(errs, err)
557+
// }
558+
// }
559+
// }
560+
561+
// g := new(errgroup.Group)
562+
// for i := 0; i < 1000; i++ {
563+
// g.Go(func() error {
564+
// errs := findOneFor(5 * time.Second)
565+
566+
// if len(errs) == 0 {
567+
// return nil
568+
// }
569+
570+
// return errs[len(errs)-1]
571+
// })
572+
// }
573+
// err = g.Wait()
574+
// assert.NotNil(t, err, "expected at least one error, got nil")
575+
576+
// // findOneFor(1 * time.Second)
577+
578+
// errs := findOneFor(5 * time.Second)
579+
// assert.True(t, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
580+
// }
581+
582+
// func TestClientOnce(t *testing.T) {
583+
// mtOpts := mtest.NewOptions().
584+
// MinServerVersion("3.6").
585+
// Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.Single). // TODO: What topologies?
586+
// CreateClient(false)
587+
// mt := mtest.New(t, mtOpts)
588+
589+
// maxPoolOpts := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(100))
590+
// mt.RunOpts("", maxPoolOpts, func(mt *mtest.T) {
591+
// _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"key", "value"}})
592+
// assert.Nil(mt, err, "unexpected error inserting document: %v", err)
593+
594+
// ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
595+
// defer cancel()
596+
// var res map[string]interface{}
597+
// err = mt.Coll.FindOne(ctx, bson.D{{"key", "value"}}).Decode(&res)
598+
// assert.Nil(mt, err, "unexpected error calling FindOne")
599+
// })
600+
// }

mongo/integration/mtest/mongotest.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"strings"
1313
"sync"
14+
"sync/atomic"
1415
"testing"
1516

1617
"go.mongodb.org/mongo-driver/bson"
@@ -103,7 +104,7 @@ type T struct {
103104
dataLake *bool
104105
ssl *bool
105106
collCreateOpts bson.D
106-
connsCheckedOut int // net number of connections checked out during test execution
107+
connsCheckedOut int64 // net number of connections checked out during test execution
107108
requireAPIVersion *bool
108109

109110
// options copied to sub-tests
@@ -231,7 +232,7 @@ func (t *T) RunOpts(name string, opts *Options, callback func(*T)) {
231232
// store number of sessions and connections checked out here but assert that they're equal to 0 after
232233
// cleaning up test resources to make sure resources are always cleared
233234
sessions := sub.Client.NumberSessionsInProgress()
234-
conns := sub.connsCheckedOut
235+
conns := sub.NumberConnectionsCheckedOut()
235236

236237
if sub.clientType != Mock {
237238
sub.ClearFailPoints()
@@ -369,7 +370,7 @@ func (t *T) GetProxiedMessages() []*ProxyMessage {
369370

370371
// NumberConnectionsCheckedOut returns the number of connections checked out from the test Client.
371372
func (t *T) NumberConnectionsCheckedOut() int {
372-
return t.connsCheckedOut
373+
return int(atomic.LoadInt64(&t.connsCheckedOut))
373374
}
374375

375376
// ClearEvents clears the existing command monitoring events.
@@ -594,9 +595,9 @@ func (t *T) createTestClient() {
594595

595596
switch evt.Type {
596597
case event.GetSucceeded:
597-
t.connsCheckedOut++
598+
atomic.AddInt64(&t.connsCheckedOut, 1)
598599
case event.ConnectionReturned:
599-
t.connsCheckedOut--
600+
atomic.AddInt64(&t.connsCheckedOut, -1)
600601
}
601602
},
602603
})

x/mongo/driver/topology/cmap_prose_test.go

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ func TestCMAPProse(t *testing.T) {
5252
assert.Equal(t, numClosed, len(closed), "expected %d closed events, got %d", numClosed, len(closed))
5353

5454
netCount := numCreated - numClosed
55-
assert.Equal(t, netCount, len(p.opened), "expected %d connections in opened map, got %d", netCount,
56-
len(p.opened))
55+
assert.Equal(t, netCount, p.totalConnectionCount(), "expected %d connections in opened map, got %d", netCount,
56+
p.totalConnectionCount())
5757
}
5858

59-
t.Run("get", func(t *testing.T) {
59+
t.Run("checkOut", func(t *testing.T) {
6060
t.Run("errored connection exists in pool", func(t *testing.T) {
61-
// If a connection is created as part of minPoolSize maintenance and errors while connecting, get()
61+
// If a connection is created as part of minPoolSize maintenance and errors while connecting, checkOut()
6262
// should report that error and publish an event.
6363
clearEvents()
6464

@@ -76,20 +76,19 @@ func TestCMAPProse(t *testing.T) {
7676
}
7777
pool := createTestPool(t, cfg, connOpts...)
7878

79-
_, err := pool.get(context.Background())
80-
assert.NotNil(t, err, "expected get() error, got nil")
79+
_, err := pool.checkOut(context.Background())
80+
assert.NotNil(t, err, "expected checkOut() error, got nil")
8181

82-
// If the connection doesn't finish connecting before resourcePool gives it back, the error will be
83-
// detected by pool.get and result in a created/closed count of 1. If it does finish connecting, the
84-
// error will be detected by resourcePool, which will return nil. Then, pool will try to create a new
85-
// connection, which will also error. This process will result in a created/closed count of 2.
8682
assert.True(t, len(created) == 1 || len(created) == 2, "expected 1 or 2 opened events, got %d", len(created))
8783
assert.True(t, len(closed) == 1 || len(closed) == 2, "expected 1 or 2 closed events, got %d", len(closed))
84+
85+
_ = pool.disconnect(context.Background())
8886
netCount := len(created) - len(closed)
8987
assert.Equal(t, 0, netCount, "expected net connection count to be 0, got %d", netCount)
9088
})
9189
t.Run("pool is empty", func(t *testing.T) {
92-
// If a new connection is created during get(), get() should report that error and publish an event.
90+
// If a checkOut() has to create a new connection and that connection encounters an
91+
// error while connecting, checkOut() should return that error and publish an event.
9392
clearEvents()
9493

9594
var dialer DialerFunc = func(context.Context, string, string) (net.Conn, error) {
@@ -103,13 +102,16 @@ func TestCMAPProse(t *testing.T) {
103102
}),
104103
}
105104
pool := createTestPool(t, getConfig(), connOpts...)
105+
defer func() {
106+
_ = pool.disconnect(context.Background())
107+
}()
106108

107-
_, err := pool.get(context.Background())
108-
assert.NotNil(t, err, "expected get() error, got nil")
109+
_, err := pool.checkOut(context.Background())
110+
assert.NotNil(t, err, "expected checkOut() error, got nil")
109111
assertConnectionCounts(t, pool, 1, 1)
110112
})
111113
})
112-
t.Run("put", func(t *testing.T) {
114+
t.Run("checkIn", func(t *testing.T) {
113115
t.Run("errored connection", func(t *testing.T) {
114116
// If the connection being returned to the pool encountered a network error, it should be removed from
115117
// the pool and an event should be published.
@@ -124,16 +126,19 @@ func TestCMAPProse(t *testing.T) {
124126
WithDialer(func(Dialer) Dialer { return dialer }),
125127
}
126128
pool := createTestPool(t, getConfig(), connOpts...)
129+
defer func() {
130+
_ = pool.disconnect(context.Background())
131+
}()
127132

128-
conn, err := pool.get(context.Background())
129-
assert.Nil(t, err, "get error: %v", err)
133+
conn, err := pool.checkOut(context.Background())
134+
assert.Nil(t, err, "checkOut() error: %v", err)
130135

131136
// Force a network error by writing to the connection.
132137
err = conn.writeWireMessage(context.Background(), nil)
133138
assert.NotNil(t, err, "expected writeWireMessage error, got nil")
134139

135-
err = pool.put(conn)
136-
assert.Nil(t, err, "put error: %v", err)
140+
err = pool.checkIn(conn)
141+
assert.Nil(t, err, "checkIn() error: %v", err)
137142

138143
assertConnectionCounts(t, pool, 1, 1)
139144
evt := <-closed
@@ -157,16 +162,19 @@ func TestCMAPProse(t *testing.T) {
157162
WithIdleTimeout(func(time.Duration) time.Duration { return 1 * time.Second }),
158163
}
159164
pool := createTestPool(t, getConfig(), connOpts...)
165+
defer func() {
166+
_ = pool.disconnect(context.Background())
167+
}()
160168

161-
conn, err := pool.get(context.Background())
162-
assert.Nil(t, err, "get error: %v", err)
169+
conn, err := pool.checkOut(context.Background())
170+
assert.Nil(t, err, "checkOut() error: %v", err)
163171

164172
// Set the idleDeadline to a time in the past to simulate expiration.
165173
pastTime := time.Now().Add(-10 * time.Second)
166174
conn.idleDeadline.Store(pastTime)
167175

168-
err = pool.put(conn)
169-
assert.Nil(t, err, "put error: %v", err)
176+
err = pool.checkIn(conn)
177+
assert.Nil(t, err, "checkIn() error: %v", err)
170178

171179
assertConnectionCounts(t, pool, 1, 1)
172180
evt := <-closed
@@ -189,10 +197,10 @@ func TestCMAPProse(t *testing.T) {
189197
conns := checkoutConnections(t, pool, numConns)
190198
assertConnectionCounts(t, pool, numConns, 0)
191199

192-
// Return all connections to the pool and assert that none were closed by put().
200+
// Return all connections to the pool and assert that none were closed by checkIn().
193201
for i, c := range conns {
194-
err := pool.put(c)
195-
assert.Nil(t, err, "put error at index %d: %v", i, err)
202+
err := pool.checkIn(c)
203+
assert.Nil(t, err, "checkIn() error at index %d: %v", i, err)
196204
}
197205
assertConnectionCounts(t, pool, numConns, 0)
198206

@@ -223,8 +231,8 @@ func TestCMAPProse(t *testing.T) {
223231

224232
// Only return 2 of the connection.
225233
for i := 0; i < 2; i++ {
226-
err := pool.put(conns[i])
227-
assert.Nil(t, err, "put error at index %d: %v", i, err)
234+
err := pool.checkIn(conns[i])
235+
assert.Nil(t, err, "checkIn() error at index %d: %v", i, err)
228236
}
229237
conns = conns[2:]
230238
assertConnectionCounts(t, pool, numConns, 0)
@@ -237,8 +245,8 @@ func TestCMAPProse(t *testing.T) {
237245
// Return the remaining connections and assert that the closed event count does not increase because
238246
// these connections have already been closed.
239247
for i, c := range conns {
240-
err := pool.put(c)
241-
assert.Nil(t, err, "put error at index %d: %v", i, err)
248+
err := pool.checkIn(c)
249+
assert.Nil(t, err, "checkIn() error at index %d: %v", i, err)
242250
}
243251
assertConnectionCounts(t, pool, numConns, numConns)
244252

@@ -269,8 +277,8 @@ func checkoutConnections(t *testing.T, p *pool, numConns int) []*connection {
269277
conns := make([]*connection, 0, numConns)
270278

271279
for i := 0; i < numConns; i++ {
272-
conn, err := p.get(context.Background())
273-
assert.Nil(t, err, "get error at index %d: %v", i, err)
280+
conn, err := p.checkOut(context.Background())
281+
assert.Nil(t, err, "checkOut() error at index %d: %v", i, err)
274282
conns = append(conns, conn)
275283
}
276284

0 commit comments

Comments
 (0)