Skip to content

Commit 8539500

Browse files
authored
GODRIVER-2038 Make all connections in the background using connectTimeoutMS. (#716)
1 parent ad46855 commit 8539500

File tree

16 files changed

+1622
-1703
lines changed

16 files changed

+1622
-1703
lines changed

data/connection-monitoring-and-pooling/wait-queue-timeout.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"description": "must aggressively timeout threads enqueued longer than waitQueueTimeoutMS",
55
"poolOptions": {
66
"maxPoolSize": 1,
7-
"waitQueueTimeoutMS": 20
7+
"waitQueueTimeoutMS": 50
88
},
99
"operations": [
1010
{

data/connection-monitoring-and-pooling/wait-queue-timeout.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ style: unit
33
description: must aggressively timeout threads enqueued longer than waitQueueTimeoutMS
44
poolOptions:
55
maxPoolSize: 1
6-
waitQueueTimeoutMS: 20
6+
waitQueueTimeoutMS: 50
77
operations:
88
# Check out only possible connection
99
- name: checkOut

mongo/integration/client_test.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"go.mongodb.org/mongo-driver/bson/bsoncodec"
2020
"go.mongodb.org/mongo-driver/bson/bsonrw"
2121
"go.mongodb.org/mongo-driver/bson/primitive"
22+
"go.mongodb.org/mongo-driver/event"
2223
"go.mongodb.org/mongo-driver/internal"
2324
"go.mongodb.org/mongo-driver/internal/testutil"
2425
"go.mongodb.org/mongo-driver/internal/testutil/assert"
@@ -469,15 +470,13 @@ func TestClient(t *testing.T) {
469470
}
470471

471472
func TestClientStress(t *testing.T) {
472-
// TODO: Enable with GODRIVER-2038.
473-
t.Skip("TODO: Enable with GODRIVER-2038")
474-
475473
if testing.Short() {
476474
t.Skip("skipping integration test in short mode")
477475
}
478476

479477
mtOpts := mtest.NewOptions().CreateClient(false)
480478
mt := mtest.New(t, mtOpts)
479+
defer mt.Close()
481480

482481
// Test that a Client can recover from a massive traffic spike after the traffic spike is over.
483482
mt.Run("Client recovers from traffic spike", func(mt *mtest.T) {
@@ -505,6 +504,7 @@ func TestClientStress(t *testing.T) {
505504
if err != nil {
506505
errs = append(errs, err)
507506
}
507+
time.Sleep(10 * time.Microsecond)
508508
}
509509
return errs
510510
}
@@ -523,24 +523,42 @@ func TestClientStress(t *testing.T) {
523523
}
524524
assert.True(mt, maxRTT > 0, "RTT must be greater than 0")
525525

526-
// Run tests with various "maxPoolSize" values, including 1-connection pools and unlimited
527-
// size pools, to test how the client handles traffic spikes using different connection pool
528-
// configurations.
529-
maxPoolSizes := []uint64{0, 1, 10, 100}
526+
// Run tests with various "maxPoolSize" values, including 1-connection pools and the default
527+
// size of 100, to test how the client handles traffic spikes using different connection
528+
// pool configurations.
529+
maxPoolSizes := []uint64{1, 10, 100}
530530
for _, maxPoolSize := range maxPoolSizes {
531-
maxPoolSizeOpt := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(maxPoolSize))
531+
tpm := newTestPoolMonitor()
532+
maxPoolSizeOpt := mtest.NewOptions().ClientOptions(
533+
options.Client().
534+
SetPoolMonitor(tpm.PoolMonitor).
535+
SetMaxPoolSize(maxPoolSize))
532536
mt.RunOpts(fmt.Sprintf("maxPoolSize %d", maxPoolSize), maxPoolSizeOpt, func(mt *mtest.T) {
537+
// Print the count of connection created, connection closed, and pool clear events
538+
// collected during the test to help with debugging.
539+
defer func() {
540+
created := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.ConnectionCreated }))
541+
closed := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.ConnectionClosed }))
542+
poolCleared := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.PoolCleared }))
543+
mt.Logf("Connections created: %d, connections closed: %d, pool clears: %d", created, closed, poolCleared)
544+
}()
545+
533546
doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
534547
_, err := mt.Coll.InsertOne(context.Background(), doc)
535548
assert.Nil(mt, err, "InsertOne error: %v", err)
536549

537-
// Set the timeout to be 10x the maximum observed RTT. Use a minimum 10ms timeout to
550+
// Set the timeout to be 100x the maximum observed RTT. Use a minimum 100ms timeout to
538551
// prevent spurious test failures due to extremely low timeouts.
539-
timeout := maxRTT * 10
540-
if timeout < 10*time.Millisecond {
541-
timeout = 10 * time.Millisecond
552+
timeout := maxRTT * 100
553+
minTimeout := 100 * time.Millisecond
554+
if timeout < minTimeout {
555+
timeout = minTimeout
542556
}
543-
t.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)
557+
mt.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)
558+
559+
// Warm up the client for 1 second to allow connections to be established. Ignore
560+
// any errors.
561+
_ = findOneFor(mt.Coll, timeout, 1*time.Second)
544562

545563
// Simulate normal traffic by running one FindOne loop for 1 second and assert that there
546564
// are no errors.
@@ -560,11 +578,11 @@ func TestClientStress(t *testing.T) {
560578
})
561579
}
562580
err = g.Wait()
563-
assert.NotNil(mt, err, "expected at least one error, got nil")
581+
mt.Logf("Error from extreme traffic spike (errors are expected): %v", err)
564582

565-
// Simulate normal traffic again for 1 second. Ignore any errors to allow any outstanding
583+
// Simulate normal traffic again for 5 second. Ignore any errors to allow any outstanding
566584
// connection errors to stop.
567-
_ = findOneFor(mt.Coll, timeout, 1*time.Second)
585+
_ = findOneFor(mt.Coll, timeout, 5*time.Second)
568586

569587
// Simulate normal traffic again for 1 second and assert that there are no errors.
570588
errs = findOneFor(mt.Coll, timeout, 1*time.Second)

mongo/integration/sdam_error_handling_test.go

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -47,46 +47,7 @@ func TestSDAMErrorHandling(t *testing.T) {
4747
// blockConnection and appName.
4848
mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
4949
mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
50-
mt.Run("pool not cleared on operation-scoped network timeout", func(mt *mtest.T) {
51-
// Assert that the pool is not cleared when a connection created by an application
52-
// operation thread encounters an operation timeout during handshaking. Unlike the
53-
// non-timeout test below, we only test connections created in the foreground for
54-
// timeouts because connections created by the pool maintenance routine can't be
55-
// timed out using a context.
56-
57-
appName := "authOperationTimeoutTest"
58-
// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
59-
// speculative auth.
60-
mt.SetFailPoint(mtest.FailPoint{
61-
ConfigureFailPoint: "failCommand",
62-
Mode: mtest.FailPointMode{
63-
Times: 1,
64-
},
65-
Data: mtest.FailPointData{
66-
FailCommands: []string{"saslContinue"},
67-
BlockConnection: true,
68-
BlockTimeMS: 150,
69-
AppName: appName,
70-
},
71-
})
72-
73-
// Reset the client with the appName specified in the failpoint and the pool monitor.
74-
tpm := newTestPoolMonitor()
75-
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
76-
77-
// Use a context with a 100ms timeout so that the saslContinue delay of 150ms causes
78-
// an operation-scoped context timeout (i.e. a timeout not caused by a client timeout
79-
// like connectTimeoutMS or socketTimeoutMS).
80-
timeoutCtx, cancel := context.WithTimeout(mtest.Background, 100*time.Millisecond)
81-
defer cancel()
82-
_, err := mt.Coll.InsertOne(timeoutCtx, bson.D{{"test", 1}})
83-
assert.NotNil(mt, err, "expected InsertOne error, got nil")
84-
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
85-
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
86-
assert.False(mt, tpm.IsPoolCleared(), "expected pool not to be cleared but was cleared")
87-
})
88-
89-
mt.Run("pool cleared on non-operation-scoped network timeout", func(mt *mtest.T) {
50+
mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
9051
// Assert that the pool is cleared when a connection created by an application
9152
// operation thread encounters a timeout caused by connectTimeoutMS during
9253
// handshaking.

x/mongo/driver/topology/CMAP_spec_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ func runCMAPTest(t *testing.T, testFileName string) {
119119
s.connectionstate = connected
120120
err = s.pool.connect()
121121
testHelpers.RequireNil(t, err, "error connecting connection pool: %v", err)
122+
defer func() {
123+
_ = s.pool.disconnect(context.Background())
124+
}()
122125

123126
for _, op := range test.Operations {
124127
if tempErr := runOperation(t, op, testInfo, s, test.PoolOptions.WaitQueueTimeoutMS); tempErr != nil {

0 commit comments

Comments
 (0)