Skip to content

Commit 101e55c

Browse files
committed
GODRIVER-2078 Add a Client stress test that simulates traffic overload and recovery.
1 parent 22266fc commit 101e55c

File tree

2 files changed

+111
-5
lines changed

2 files changed

+111
-5
lines changed

mongo/integration/client_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.mongodb.org/mongo-driver/bson"
1919
"go.mongodb.org/mongo-driver/bson/bsoncodec"
2020
"go.mongodb.org/mongo-driver/bson/bsonrw"
21+
"go.mongodb.org/mongo-driver/bson/primitive"
2122
"go.mongodb.org/mongo-driver/internal"
2223
"go.mongodb.org/mongo-driver/internal/testutil"
2324
"go.mongodb.org/mongo-driver/internal/testutil/assert"
@@ -28,6 +29,7 @@ import (
2829
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
2930
"go.mongodb.org/mongo-driver/x/mongo/driver"
3031
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
32+
"golang.org/x/sync/errgroup"
3133
)
3234

3335
var noClientOpts = mtest.NewOptions().CreateClient(false)
@@ -472,3 +474,106 @@ type proxyMessage struct {
472474
sent wiremessage.WireMessage
473475
received wiremessage.WireMessage
474476
}
477+
478+
func TestClientStress(t *testing.T) {
479+
// t.Skip("TODO: Enable with GODRIVER-2038")
480+
481+
mtOpts := mtest.NewOptions().
482+
MinServerVersion("3.6").
483+
Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.Single).
484+
CreateClient(false)
485+
mt := mtest.New(t, mtOpts)
486+
487+
mt.Run("Client recovers from traffic spike", func(mt *mtest.T) {
488+
oid := primitive.NewObjectID()
489+
doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
490+
_, err := mt.Coll.InsertOne(context.Background(), doc)
491+
assert.Nil(mt, err, "unexpected error inserting document: %v", err)
492+
493+
// findOne calls FindOne("_id": oid) on the given collection and with the given timeout. It
494+
// returns any errors.
495+
findOne := func(coll *mongo.Collection, timeout time.Duration) error {
496+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
497+
defer cancel()
498+
var res map[string]interface{}
499+
return coll.FindOne(ctx, bson.D{{Key: "_id", Value: oid}}).Decode(&res)
500+
}
501+
502+
// findOneFor calls FindOne on the given collection and with the given timeout in a loop for
503+
// the given duration and returns any errors returned by FindOne.
504+
findOneFor := func(coll *mongo.Collection, timeout time.Duration, d time.Duration) []error {
505+
errs := make([]error, 0)
506+
start := time.Now()
507+
for time.Since(start) <= d {
508+
err := findOne(coll, timeout)
509+
if err != nil {
510+
errs = append(errs, err)
511+
}
512+
}
513+
return errs
514+
}
515+
516+
// Calculate the maximum observed round-trip time by measuring the duration of some FindOne
517+
// operations and picking the max.
518+
var maxRTT time.Duration
519+
for i := 0; i < 50; i++ {
520+
start := time.Now()
521+
err := findOne(mt.Coll, 10*time.Second)
522+
assert.Nil(t, err, "unexpected error calling FindOne: %v", err)
523+
duration := time.Since(start)
524+
if duration > maxRTT {
525+
maxRTT = duration
526+
}
527+
}
528+
assert.True(mt, maxRTT > 0, "RTT must be greater than 0")
529+
530+
// Run tests with various "maxPoolSize" values, including 1-connection pools and unlimited
531+
// size pools, to test how the client handles traffic spikes using different connection pool
532+
// configurations.
533+
maxPoolSizes := []uint64{0, 1, 10, 100}
534+
for _, maxPoolSize := range maxPoolSizes {
535+
maxPoolSizeOpt := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(maxPoolSize))
536+
mt.RunOpts(fmt.Sprintf("maxPoolSize %d", maxPoolSize), maxPoolSizeOpt, func(mt *mtest.T) {
537+
doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
538+
_, err := mt.Coll.InsertOne(context.Background(), doc)
539+
assert.Nil(mt, err, "unexpected error inserting document: %v", err)
540+
541+
// Set the timeout to be 10x the maximum observed RTT. Use a minimum 10ms timeout to
542+
// prevent spurious test failures due to extremely low timeouts.
543+
timeout := maxRTT * 10
544+
if timeout < 10*time.Millisecond {
545+
timeout = 10 * time.Millisecond
546+
}
547+
t.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)
548+
549+
// Simulate normal traffic by running one FindOne loop for 1 second and assert that there
550+
// are no errors.
551+
errs := findOneFor(mt.Coll, timeout, 1*time.Second)
552+
assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
553+
554+
// Simulate an extreme traffic spike by running 1,000 FindOne loops in parallel for 10
555+
// seconds and expect at least some errors to occur.
556+
g := new(errgroup.Group)
557+
for i := 0; i < 1000; i++ {
558+
g.Go(func() error {
559+
errs := findOneFor(mt.Coll, timeout, 10*time.Second)
560+
if len(errs) == 0 {
561+
return nil
562+
}
563+
return errs[len(errs)-1]
564+
})
565+
}
566+
err = g.Wait()
567+
assert.NotNil(mt, err, "expected at least one error, got nil")
568+
569+
// Simulate normal traffic again for 1 second. Ignore any errors to allow any outstanding
570+
// connection errors to stop.
571+
_ = findOneFor(mt.Coll, timeout, 1*time.Second)
572+
573+
// Simulate normal traffic again for 1 second and assert that there are no errors.
574+
errs = findOneFor(mt.Coll, timeout, 1*time.Second)
575+
assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
576+
})
577+
}
578+
})
579+
}

mongo/integration/mtest/mongotest.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"strings"
1313
"sync"
14+
"sync/atomic"
1415
"testing"
1516

1617
"go.mongodb.org/mongo-driver/bson"
@@ -103,7 +104,7 @@ type T struct {
103104
dataLake *bool
104105
ssl *bool
105106
collCreateOpts bson.D
106-
connsCheckedOut int // net number of connections checked out during test execution
107+
connsCheckedOut int64 // net number of connections checked out during test execution
107108
requireAPIVersion *bool
108109

109110
// options copied to sub-tests
@@ -231,7 +232,7 @@ func (t *T) RunOpts(name string, opts *Options, callback func(*T)) {
231232
// store number of sessions and connections checked out here but assert that they're equal to 0 after
232233
// cleaning up test resources to make sure resources are always cleared
233234
sessions := sub.Client.NumberSessionsInProgress()
234-
conns := sub.connsCheckedOut
235+
conns := sub.NumberConnectionsCheckedOut()
235236

236237
if sub.clientType != Mock {
237238
sub.ClearFailPoints()
@@ -369,7 +370,7 @@ func (t *T) GetProxiedMessages() []*ProxyMessage {
369370

370371
// NumberConnectionsCheckedOut returns the number of connections checked out from the test Client.
371372
func (t *T) NumberConnectionsCheckedOut() int {
372-
return t.connsCheckedOut
373+
return int(atomic.LoadInt64(&t.connsCheckedOut))
373374
}
374375

375376
// ClearEvents clears the existing command monitoring events.
@@ -594,9 +595,9 @@ func (t *T) createTestClient() {
594595

595596
switch evt.Type {
596597
case event.GetSucceeded:
597-
t.connsCheckedOut++
598+
atomic.AddInt64(&t.connsCheckedOut, 1)
598599
case event.ConnectionReturned:
599-
t.connsCheckedOut--
600+
atomic.AddInt64(&t.connsCheckedOut, -1)
600601
}
601602
},
602603
})

0 commit comments

Comments
 (0)