Skip to content

GODRIVER-2078 Add a Client stress test that simulates traffic overload and recovery. #732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions internal/testutil/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,6 @@ func GetDBName(cs connstring.ConnString) string {
return fmt.Sprintf("mongo-go-driver-%d", os.Getpid())
}

// Integration is a guard for integration tests. Call it first in a test
// function: when the test binary runs in short mode the calling test is
// skipped, otherwise the call is a no-op.
func Integration(t *testing.T) {
	if !testing.Short() {
		return
	}
	t.Skip("skipping integration test in short mode")
}

// compareVersions compares two version number strings (i.e. positive integers separated by
// periods). Comparisons are done to the lesser precision of the two versions. For example, 3.2 is
// considered equal to 3.2.11, whereas 3.2.0 is considered less than 3.2.11.
Expand Down
108 changes: 108 additions & 0 deletions mongo/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/bson/bsonrw"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/internal"
"go.mongodb.org/mongo-driver/internal/testutil"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
Expand All @@ -28,6 +29,7 @@ import (
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
"golang.org/x/sync/errgroup"
)

var noClientOpts = mtest.NewOptions().CreateClient(false)
Expand Down Expand Up @@ -472,3 +474,109 @@ type proxyMessage struct {
sent wiremessage.WireMessage
received wiremessage.WireMessage
}

// TestClientStress simulates a traffic overload followed by a recovery period and asserts that
// a Client returns to error-free operation once the overload is over. The scenario is repeated
// for several "maxPoolSize" settings.
//
// The test is currently skipped unconditionally (see the TODO below) and is additionally skipped
// in short mode because it needs a live deployment and runs for more than 10 seconds per pool
// size.
func TestClientStress(t *testing.T) {
	// TODO: Enable with GODRIVER-2038.
	t.Skip("TODO: Enable with GODRIVER-2038")

	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	mtOpts := mtest.NewOptions().CreateClient(false)
	mt := mtest.New(t, mtOpts)

	// Test that a Client can recover from a massive traffic spike after the traffic spike is over.
	mt.Run("Client recovers from traffic spike", func(mt *mtest.T) {
		oid := primitive.NewObjectID()
		doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
		_, err := mt.Coll.InsertOne(context.Background(), doc)
		assert.Nil(mt, err, "InsertOne error: %v", err)

		// findOne calls FindOne({"_id": oid}) on the given collection with the given timeout and
		// returns the resulting error, if any.
		findOne := func(coll *mongo.Collection, timeout time.Duration) error {
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			defer cancel()
			var res map[string]interface{}
			return coll.FindOne(ctx, bson.D{{Key: "_id", Value: oid}}).Decode(&res)
		}

		// findOneFor calls findOne on the given collection with the given per-operation timeout in
		// a loop for the duration d and returns every error that was observed.
		findOneFor := func(coll *mongo.Collection, timeout time.Duration, d time.Duration) []error {
			var errs []error
			start := time.Now()
			for time.Since(start) <= d {
				if err := findOne(coll, timeout); err != nil {
					errs = append(errs, err)
				}
			}
			return errs
		}

		// Calculate the maximum observed round-trip time by measuring the duration of some FindOne
		// operations and picking the max.
		var maxRTT time.Duration
		for i := 0; i < 50; i++ {
			start := time.Now()
			err := findOne(mt.Coll, 10*time.Second)
			// Report on mt (the running subtest), not on the parent t: a fatal assertion must be
			// raised on the test whose goroutine is executing this callback.
			assert.Nil(mt, err, "FindOne error: %v", err)
			duration := time.Since(start)
			if duration > maxRTT {
				maxRTT = duration
			}
		}
		assert.True(mt, maxRTT > 0, "RTT must be greater than 0")

		// Run tests with various "maxPoolSize" values, including 1-connection pools and unlimited
		// size pools, to test how the client handles traffic spikes using different connection pool
		// configurations.
		maxPoolSizes := []uint64{0, 1, 10, 100}
		for _, maxPoolSize := range maxPoolSizes {
			maxPoolSizeOpt := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(maxPoolSize))
			mt.RunOpts(fmt.Sprintf("maxPoolSize %d", maxPoolSize), maxPoolSizeOpt, func(mt *mtest.T) {
				// Insert the document again using this sub-test's own mt.Coll.
				doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
				_, err := mt.Coll.InsertOne(context.Background(), doc)
				assert.Nil(mt, err, "InsertOne error: %v", err)

				// Set the timeout to be 10x the maximum observed RTT. Use a minimum 10ms timeout to
				// prevent spurious test failures due to extremely low timeouts.
				timeout := maxRTT * 10
				if timeout < 10*time.Millisecond {
					timeout = 10 * time.Millisecond
				}
				// Log on the sub-test so the message is attributed to the pool size being tested.
				mt.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)

				// Simulate normal traffic by running one FindOne loop for 1 second and assert that
				// there are no errors.
				errs := findOneFor(mt.Coll, timeout, 1*time.Second)
				assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)

				// Simulate an extreme traffic spike by running 1,000 FindOne loops in parallel for 10
				// seconds and expect at least some errors to occur.
				g := new(errgroup.Group)
				for i := 0; i < 1000; i++ {
					g.Go(func() error {
						spikeErrs := findOneFor(mt.Coll, timeout, 10*time.Second)
						if len(spikeErrs) == 0 {
							return nil
						}
						// One error per goroutine is enough; report the most recent one.
						return spikeErrs[len(spikeErrs)-1]
					})
				}
				err = g.Wait()
				assert.NotNil(mt, err, "expected at least one error, got nil")

				// Simulate normal traffic again for 1 second. Ignore any errors to allow any
				// outstanding connection errors to stop.
				_ = findOneFor(mt.Coll, timeout, 1*time.Second)

				// Simulate normal traffic again for 1 second and assert that there are no errors.
				errs = findOneFor(mt.Coll, timeout, 1*time.Second)
				assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
			})
		}
	})
}
11 changes: 6 additions & 5 deletions mongo/integration/mtest/mongotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"

"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -103,7 +104,7 @@ type T struct {
dataLake *bool
ssl *bool
collCreateOpts bson.D
connsCheckedOut int // net number of connections checked out during test execution
connsCheckedOut int64 // net number of connections checked out during test execution
requireAPIVersion *bool

// options copied to sub-tests
Expand Down Expand Up @@ -231,7 +232,7 @@ func (t *T) RunOpts(name string, opts *Options, callback func(*T)) {
// store number of sessions and connections checked out here but assert that they're equal to 0 after
// cleaning up test resources to make sure resources are always cleared
sessions := sub.Client.NumberSessionsInProgress()
conns := sub.connsCheckedOut
conns := sub.NumberConnectionsCheckedOut()

if sub.clientType != Mock {
sub.ClearFailPoints()
Expand Down Expand Up @@ -369,7 +370,7 @@ func (t *T) GetProxiedMessages() []*ProxyMessage {

// NumberConnectionsCheckedOut returns the number of connections checked out from the test Client.
// The value is read from the connsCheckedOut counter, which is incremented on GetSucceeded events
// and decremented on ConnectionReturned events, so it is a net count.
func (t *T) NumberConnectionsCheckedOut() int {
return t.connsCheckedOut
return int(atomic.LoadInt64(&t.connsCheckedOut))
}

// ClearEvents clears the existing command monitoring events.
Expand Down Expand Up @@ -594,9 +595,9 @@ func (t *T) createTestClient() {

switch evt.Type {
case event.GetSucceeded:
t.connsCheckedOut++
atomic.AddInt64(&t.connsCheckedOut, 1)
case event.ConnectionReturned:
t.connsCheckedOut--
atomic.AddInt64(&t.connsCheckedOut, -1)
}
},
})
Expand Down