Skip to content

Commit 56a3a88

Browse files
committed
GODRIVER-2038 Make all connections in the background using connectTimeoutMS.
1 parent 6385557 commit 56a3a88

File tree

12 files changed

+1468
-1675
lines changed

12 files changed

+1468
-1675
lines changed

mongo/integration/client_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package integration
88

99
import (
10+
"context"
1011
"fmt"
1112
"os"
1213
"reflect"
@@ -17,6 +18,7 @@ import (
1718
"go.mongodb.org/mongo-driver/bson"
1819
"go.mongodb.org/mongo-driver/bson/bsoncodec"
1920
"go.mongodb.org/mongo-driver/bson/bsonrw"
21+
"go.mongodb.org/mongo-driver/bson/primitive"
2022
"go.mongodb.org/mongo-driver/internal"
2123
"go.mongodb.org/mongo-driver/internal/testutil"
2224
"go.mongodb.org/mongo-driver/internal/testutil/assert"
@@ -27,6 +29,7 @@ import (
2729
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
2830
"go.mongodb.org/mongo-driver/x/mongo/driver"
2931
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
32+
"golang.org/x/sync/errgroup"
3033
)
3134

3235
var noClientOpts = mtest.NewOptions().CreateClient(false)
@@ -464,3 +467,137 @@ type proxyMessage struct {
464467
sent wiremessage.WireMessage
465468
received wiremessage.WireMessage
466469
}
470+
471+
func TestClientStress(t *testing.T) {
472+
mtOpts := mtest.NewOptions().
473+
MinServerVersion("3.6").
474+
Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.Single). // TODO: What topologies?
475+
CreateClient(false)
476+
mt := mtest.New(t, mtOpts)
477+
478+
maxPoolOpts := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(100))
479+
mt.RunOpts("", maxPoolOpts, func(mt *mtest.T) {
480+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"key", "value"}})
481+
assert.Nil(mt, err, "unexpected error inserting document: %v", err)
482+
483+
findOneFor := func(d time.Duration) []error {
484+
timeout := time.After(d)
485+
errs := make([]error, 0)
486+
487+
for {
488+
select {
489+
case <-timeout:
490+
return errs
491+
default:
492+
}
493+
494+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
495+
defer cancel()
496+
var res map[string]interface{}
497+
err := mt.Coll.FindOne(ctx, bson.D{{"key", "value"}}).Decode(&res)
498+
if err != nil {
499+
errs = append(errs, err)
500+
}
501+
}
502+
}
503+
504+
g := new(errgroup.Group)
505+
for i := 0; i < 1000; i++ {
506+
g.Go(func() error {
507+
errs := findOneFor(5 * time.Second)
508+
509+
if len(errs) == 0 {
510+
return nil
511+
}
512+
513+
return errs[len(errs)-1]
514+
})
515+
}
516+
err = g.Wait()
517+
assert.NotNil(mt, err, "expected at least one error, got nil")
518+
519+
findOneFor(1 * time.Second)
520+
521+
errs := findOneFor(5 * time.Second)
522+
assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
523+
})
524+
}
525+
526+
func TestBlah(t *testing.T) {
527+
uri := os.Getenv("MONGODB_URI")
528+
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(uri).SetMaxPoolSize(100))
529+
if err != nil {
530+
t.Log(err)
531+
t.FailNow()
532+
}
533+
coll := client.Database("sample_mflix").Collection("movies")
534+
oid, err := primitive.ObjectIDFromHex("573a1398f29313caabce9682")
535+
if err != nil {
536+
t.Log(err)
537+
t.FailNow()
538+
}
539+
540+
findOneFor := func(d time.Duration) []error {
541+
timeout := time.After(d)
542+
errs := make([]error, 0)
543+
544+
for {
545+
select {
546+
case <-timeout:
547+
return errs
548+
default:
549+
}
550+
551+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
552+
defer cancel()
553+
var res bson.D
554+
err := coll.FindOne(
555+
ctx,
556+
bson.D{{"_id", oid}},
557+
options.FindOne().SetProjection(bson.D{{"title", 1}})).Decode(&res)
558+
if err != nil {
559+
errs = append(errs, err)
560+
}
561+
}
562+
}
563+
564+
g := new(errgroup.Group)
565+
for i := 0; i < 1000; i++ {
566+
g.Go(func() error {
567+
errs := findOneFor(5 * time.Second)
568+
569+
if len(errs) == 0 {
570+
return nil
571+
}
572+
573+
return errs[len(errs)-1]
574+
})
575+
}
576+
err = g.Wait()
577+
assert.NotNil(t, err, "expected at least one error, got nil")
578+
579+
// findOneFor(1 * time.Second)
580+
581+
errs := findOneFor(5 * time.Second)
582+
assert.True(t, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
583+
}
584+
585+
func TestClientOnce(t *testing.T) {
586+
mtOpts := mtest.NewOptions().
587+
MinServerVersion("3.6").
588+
Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.Single). // TODO: What topologies?
589+
CreateClient(false)
590+
mt := mtest.New(t, mtOpts)
591+
592+
maxPoolOpts := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(100))
593+
mt.RunOpts("", maxPoolOpts, func(mt *mtest.T) {
594+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"key", "value"}})
595+
assert.Nil(mt, err, "unexpected error inserting document: %v", err)
596+
597+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
598+
defer cancel()
599+
var res map[string]interface{}
600+
err = mt.Coll.FindOne(ctx, bson.D{{"key", "value"}}).Decode(&res)
601+
assert.Nil(mt, err, "unexpected error calling FindOne")
602+
})
603+
}

mongo/integration/mtest/mongotest.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"strings"
1313
"sync"
14+
"sync/atomic"
1415
"testing"
1516

1617
"go.mongodb.org/mongo-driver/bson"
@@ -103,7 +104,7 @@ type T struct {
103104
dataLake *bool
104105
ssl *bool
105106
collCreateOpts bson.D
106-
connsCheckedOut int // net number of connections checked out during test execution
107+
connsCheckedOut int32 // net number of connections checked out during test execution
107108
requireAPIVersion *bool
108109

109110
// options copied to sub-tests
@@ -368,7 +369,7 @@ func (t *T) GetProxiedMessages() []*ProxyMessage {
368369
}
369370

370371
// NumberConnectionsCheckedOut returns the number of connections checked out from the test Client.
371-
func (t *T) NumberConnectionsCheckedOut() int {
372+
func (t *T) NumberConnectionsCheckedOut() int32 {
372373
return t.connsCheckedOut
373374
}
374375

@@ -594,9 +595,9 @@ func (t *T) createTestClient() {
594595

595596
switch evt.Type {
596597
case event.GetSucceeded:
597-
t.connsCheckedOut++
598+
atomic.AddInt32(&t.connsCheckedOut, 1)
598599
case event.ConnectionReturned:
599-
t.connsCheckedOut--
600+
atomic.AddInt32(&t.connsCheckedOut, -1)
600601
}
601602
},
602603
})

x/mongo/driver/topology/cmap_prose_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func TestCMAPProse(t *testing.T) {
5252
assert.Equal(t, numClosed, len(closed), "expected %d closed events, got %d", numClosed, len(closed))
5353

5454
netCount := numCreated - numClosed
55-
assert.Equal(t, netCount, len(p.opened), "expected %d connections in opened map, got %d", netCount,
56-
len(p.opened))
55+
assert.Equal(t, netCount, p.totalConnectionCount(), "expected %d connections in opened map, got %d", netCount,
56+
p.totalConnectionCount())
5757
}
5858

5959
t.Run("get", func(t *testing.T) {
@@ -76,7 +76,7 @@ func TestCMAPProse(t *testing.T) {
7676
}
7777
pool := createTestPool(t, cfg, connOpts...)
7878

79-
_, err := pool.get(context.Background())
79+
_, err := pool.checkOut(context.Background())
8080
assert.NotNil(t, err, "expected get() error, got nil")
8181

8282
// If the connection doesn't finish connecting before resourcePool gives it back, the error will be
@@ -104,7 +104,7 @@ func TestCMAPProse(t *testing.T) {
104104
}
105105
pool := createTestPool(t, getConfig(), connOpts...)
106106

107-
_, err := pool.get(context.Background())
107+
_, err := pool.checkOut(context.Background())
108108
assert.NotNil(t, err, "expected get() error, got nil")
109109
assertConnectionCounts(t, pool, 1, 1)
110110
})
@@ -125,14 +125,14 @@ func TestCMAPProse(t *testing.T) {
125125
}
126126
pool := createTestPool(t, getConfig(), connOpts...)
127127

128-
conn, err := pool.get(context.Background())
128+
conn, err := pool.checkOut(context.Background())
129129
assert.Nil(t, err, "get error: %v", err)
130130

131131
// Force a network error by writing to the connection.
132132
err = conn.writeWireMessage(context.Background(), nil)
133133
assert.NotNil(t, err, "expected writeWireMessage error, got nil")
134134

135-
err = pool.put(conn)
135+
err = pool.checkIn(conn)
136136
assert.Nil(t, err, "put error: %v", err)
137137

138138
assertConnectionCounts(t, pool, 1, 1)
@@ -158,14 +158,14 @@ func TestCMAPProse(t *testing.T) {
158158
}
159159
pool := createTestPool(t, getConfig(), connOpts...)
160160

161-
conn, err := pool.get(context.Background())
161+
conn, err := pool.checkOut(context.Background())
162162
assert.Nil(t, err, "get error: %v", err)
163163

164164
// Set the idleDeadline to a time in the past to simulate expiration.
165165
pastTime := time.Now().Add(-10 * time.Second)
166166
conn.idleDeadline.Store(pastTime)
167167

168-
err = pool.put(conn)
168+
err = pool.checkIn(conn)
169169
assert.Nil(t, err, "put error: %v", err)
170170

171171
assertConnectionCounts(t, pool, 1, 1)
@@ -191,7 +191,7 @@ func TestCMAPProse(t *testing.T) {
191191

192192
// Return all connections to the pool and assert that none were closed by put().
193193
for i, c := range conns {
194-
err := pool.put(c)
194+
err := pool.checkIn(c)
195195
assert.Nil(t, err, "put error at index %d: %v", i, err)
196196
}
197197
assertConnectionCounts(t, pool, numConns, 0)
@@ -223,7 +223,7 @@ func TestCMAPProse(t *testing.T) {
223223

224224
// Only return 2 of the connection.
225225
for i := 0; i < 2; i++ {
226-
err := pool.put(conns[i])
226+
err := pool.checkIn(conns[i])
227227
assert.Nil(t, err, "put error at index %d: %v", i, err)
228228
}
229229
conns = conns[2:]
@@ -237,7 +237,7 @@ func TestCMAPProse(t *testing.T) {
237237
// Return the remaining connections and assert that the closed event count does not increase because
238238
// these connections have already been closed.
239239
for i, c := range conns {
240-
err := pool.put(c)
240+
err := pool.checkIn(c)
241241
assert.Nil(t, err, "put error at index %d: %v", i, err)
242242
}
243243
assertConnectionCounts(t, pool, numConns, numConns)
@@ -269,7 +269,7 @@ func checkoutConnections(t *testing.T, p *pool, numConns int) []*connection {
269269
conns := make([]*connection, 0, numConns)
270270

271271
for i := 0; i < numConns; i++ {
272-
conn, err := p.get(context.Background())
272+
conn, err := p.checkOut(context.Background())
273273
assert.Nil(t, err, "get error at index %d: %v", i, err)
274274
conns = append(conns, conn)
275275
}

x/mongo/driver/topology/connection.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"sync/atomic"
1919
"time"
2020

21-
"go.mongodb.org/mongo-driver/event"
2221
"go.mongodb.org/mongo-driver/internal"
2322
"go.mongodb.org/mongo-driver/mongo/address"
2423
"go.mongodb.org/mongo-driver/mongo/description"
@@ -63,11 +62,9 @@ type connection struct {
6362
cancellationListener cancellationListener
6463

6564
// pool related fields
66-
pool *pool
67-
poolID uint64
68-
generation uint64
69-
expireReason string
70-
poolMonitor *event.PoolMonitor
65+
pool *pool
66+
poolID uint64
67+
generation uint64
7168
}
7269

7370
// newConnection handles the creation of a connection. It does not connect the connection.
@@ -89,7 +86,6 @@ func newConnection(addr address.Address, opts ...ConnectionOption) (*connection,
8986
config: cfg,
9087
connectContextMade: make(chan struct{}),
9188
cancellationListener: internal.NewCancellationListener(),
92-
poolMonitor: cfg.poolMonitor,
9389
}
9490
// Connections to non-load balanced deployments should eagerly set the generation numbers so errors encountered
9591
// at any point during connection establishment can be processed without the connection being considered stale.
@@ -211,13 +207,6 @@ func (c *connection) connect(ctx context.Context) {
211207
// running hello and authentication is handled by a handshaker on the configuration instance.
212208
handshaker := c.config.handshaker
213209
if handshaker == nil {
214-
if c.poolMonitor != nil {
215-
c.poolMonitor.Event(&event.PoolEvent{
216-
Type: event.ConnectionReady,
217-
Address: c.addr.String(),
218-
ConnectionID: c.poolID,
219-
})
220-
}
221210
return
222211
}
223212

@@ -284,13 +273,6 @@ func (c *connection) connect(ctx context.Context) {
284273
}
285274
}
286275
}
287-
if c.poolMonitor != nil {
288-
c.poolMonitor.Event(&event.PoolEvent{
289-
Type: event.ConnectionReady,
290-
Address: c.addr.String(),
291-
ConnectionID: c.poolID,
292-
})
293-
}
294276
}
295277

296278
func (c *connection) wait() error {
@@ -704,7 +686,7 @@ func (c *Connection) Expire() error {
704686
}
705687

706688
func (c *Connection) cleanupReferences() error {
707-
err := c.pool.put(c.connection)
689+
err := c.pool.checkIn(c.connection)
708690
if c.cleanupPoolFn != nil {
709691
c.cleanupPoolFn()
710692
c.cleanupPoolFn = nil

x/mongo/driver/topology/connection_options.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ type connectionConfig struct {
4646
handshaker Handshaker
4747
idleTimeout time.Duration
4848
cmdMonitor *event.CommandMonitor
49-
poolMonitor *event.PoolMonitor
5049
readTimeout time.Duration
5150
writeTimeout time.Duration
5251
tlsConfig *tls.Config
@@ -173,14 +172,6 @@ func WithMonitor(fn func(*event.CommandMonitor) *event.CommandMonitor) Connectio
173172
}
174173
}
175174

176-
// withPoolMonitor configures a event for connection monitoring.
177-
func withPoolMonitor(fn func(*event.PoolMonitor) *event.PoolMonitor) ConnectionOption {
178-
return func(c *connectionConfig) error {
179-
c.poolMonitor = fn(c.poolMonitor)
180-
return nil
181-
}
182-
}
183-
184175
// WithZlibLevel sets the zLib compression level.
185176
func WithZlibLevel(fn func(*int) *int) ConnectionOption {
186177
return func(c *connectionConfig) error {

0 commit comments

Comments
 (0)