Skip to content

GODRIVER-2038 Make all connections in the background using connectTimeoutMS. #716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "must aggressively timeout threads enqueued longer than waitQueueTimeoutMS",
"poolOptions": {
"maxPoolSize": 1,
"waitQueueTimeoutMS": 20
"waitQueueTimeoutMS": 50
},
"operations": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ style: unit
description: must aggressively timeout threads enqueued longer than waitQueueTimeoutMS
poolOptions:
maxPoolSize: 1
waitQueueTimeoutMS: 20
waitQueueTimeoutMS: 50
operations:
# Check out only possible connection
- name: checkOut
Expand Down
50 changes: 34 additions & 16 deletions mongo/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/bson/bsonrw"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal"
"go.mongodb.org/mongo-driver/internal/testutil"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
Expand Down Expand Up @@ -469,15 +470,13 @@ func TestClient(t *testing.T) {
}

func TestClientStress(t *testing.T) {
// TODO: Enable with GODRIVER-2038.
t.Skip("TODO: Enable with GODRIVER-2038")

if testing.Short() {
t.Skip("skipping integration test in short mode")
}

mtOpts := mtest.NewOptions().CreateClient(false)
mt := mtest.New(t, mtOpts)
defer mt.Close()

// Test that a Client can recover from a massive traffic spike after the traffic spike is over.
mt.Run("Client recovers from traffic spike", func(mt *mtest.T) {
Expand Down Expand Up @@ -505,6 +504,7 @@ func TestClientStress(t *testing.T) {
if err != nil {
errs = append(errs, err)
}
time.Sleep(10 * time.Microsecond)
}
return errs
}
Expand All @@ -523,24 +523,42 @@ func TestClientStress(t *testing.T) {
}
assert.True(mt, maxRTT > 0, "RTT must be greater than 0")

// Run tests with various "maxPoolSize" values, including 1-connection pools and unlimited
// size pools, to test how the client handles traffic spikes using different connection pool
// configurations.
maxPoolSizes := []uint64{0, 1, 10, 100}
// Run tests with various "maxPoolSize" values, including 1-connection pools and the default
// size of 100, to test how the client handles traffic spikes using different connection
// pool configurations.
maxPoolSizes := []uint64{1, 10, 100}
for _, maxPoolSize := range maxPoolSizes {
maxPoolSizeOpt := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(maxPoolSize))
tpm := newTestPoolMonitor()
maxPoolSizeOpt := mtest.NewOptions().ClientOptions(
options.Client().
SetPoolMonitor(tpm.PoolMonitor).
SetMaxPoolSize(maxPoolSize))
mt.RunOpts(fmt.Sprintf("maxPoolSize %d", maxPoolSize), maxPoolSizeOpt, func(mt *mtest.T) {
// Print the count of connection created, connection closed, and pool clear events
// collected during the test to help with debugging.
defer func() {
created := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.ConnectionCreated }))
closed := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.ConnectionClosed }))
poolCleared := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.PoolCleared }))
mt.Logf("Connections created: %d, connections closed: %d, pool clears: %d", created, closed, poolCleared)
}()

doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
_, err := mt.Coll.InsertOne(context.Background(), doc)
assert.Nil(mt, err, "InsertOne error: %v", err)

// Set the timeout to be 10x the maximum observed RTT. Use a minimum 10ms timeout to
// Set the timeout to be 100x the maximum observed RTT. Use a minimum 100ms timeout to
// prevent spurious test failures due to extremely low timeouts.
timeout := maxRTT * 10
if timeout < 10*time.Millisecond {
timeout = 10 * time.Millisecond
timeout := maxRTT * 100
minTimeout := 100 * time.Millisecond
if timeout < minTimeout {
timeout = minTimeout
}
t.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)
mt.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)

// Warm up the client for 1 second to allow connections to be established. Ignore
// any errors.
_ = findOneFor(mt.Coll, timeout, 1*time.Second)

// Simulate normal traffic by running one FindOne loop for 1 second and assert that there
// are no errors.
Expand All @@ -560,11 +578,11 @@ func TestClientStress(t *testing.T) {
})
}
err = g.Wait()
assert.NotNil(mt, err, "expected at least one error, got nil")
mt.Logf("Error from extreme traffic spike (errors are expected): %v", err)

// Simulate normal traffic again for 1 second. Ignore any errors to allow any outstanding
// Simulate normal traffic again for 5 second. Ignore any errors to allow any outstanding
// connection errors to stop.
_ = findOneFor(mt.Coll, timeout, 1*time.Second)
_ = findOneFor(mt.Coll, timeout, 5*time.Second)

// Simulate normal traffic again for 1 second and assert that there are no errors.
errs = findOneFor(mt.Coll, timeout, 1*time.Second)
Expand Down
41 changes: 1 addition & 40 deletions mongo/integration/sdam_error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,46 +47,7 @@ func TestSDAMErrorHandling(t *testing.T) {
// blockConnection and appName.
mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
mt.Run("pool not cleared on operation-scoped network timeout", func(mt *mtest.T) {
// Assert that the pool is not cleared when a connection created by an application
// operation thread encounters an operation timeout during handshaking. Unlike the
// non-timeout test below, we only test connections created in the foreground for
// timeouts because connections created by the pool maintenance routine can't be
// timed out using a context.

appName := "authOperationTimeoutTest"
// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
// speculative auth.
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"saslContinue"},
BlockConnection: true,
BlockTimeMS: 150,
AppName: appName,
},
})

// Reset the client with the appName specified in the failpoint and the pool monitor.
tpm := newTestPoolMonitor()
mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

// Use a context with a 100ms timeout so that the saslContinue delay of 150ms causes
// an operation-scoped context timeout (i.e. a timeout not caused by a client timeout
// like connectTimeoutMS or socketTimeoutMS).
timeoutCtx, cancel := context.WithTimeout(mtest.Background, 100*time.Millisecond)
defer cancel()
_, err := mt.Coll.InsertOne(timeoutCtx, bson.D{{"test", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")
assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
assert.False(mt, tpm.IsPoolCleared(), "expected pool not to be cleared but was cleared")
})

mt.Run("pool cleared on non-operation-scoped network timeout", func(mt *mtest.T) {
mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
// Assert that the pool is cleared when a connection created by an application
// operation thread encounters a timeout caused by connectTimeoutMS during
// handshaking.
Expand Down
3 changes: 3 additions & 0 deletions x/mongo/driver/topology/CMAP_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ func runCMAPTest(t *testing.T, testFileName string) {
s.connectionstate = connected
err = s.pool.connect()
testHelpers.RequireNil(t, err, "error connecting connection pool: %v", err)
defer func() {
_ = s.pool.disconnect(context.Background())
}()

for _, op := range test.Operations {
if tempErr := runOperation(t, op, testInfo, s, test.PoolOptions.WaitQueueTimeoutMS); tempErr != nil {
Expand Down
Loading