Skip to content

Commit d400a05

Browse files
committed
GODRIVER-2078 Add a Client stress test that simulates traffic overload and recovery.
1 parent 2fa2b7c commit d400a05

File tree

2 files changed

+113
-5
lines changed

2 files changed

+113
-5
lines changed

mongo/integration/client_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.mongodb.org/mongo-driver/bson"
1919
"go.mongodb.org/mongo-driver/bson/bsoncodec"
2020
"go.mongodb.org/mongo-driver/bson/bsonrw"
21+
"go.mongodb.org/mongo-driver/bson/primitive"
2122
"go.mongodb.org/mongo-driver/internal"
2223
"go.mongodb.org/mongo-driver/internal/testutil"
2324
"go.mongodb.org/mongo-driver/internal/testutil/assert"
@@ -28,6 +29,7 @@ import (
2829
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
2930
"go.mongodb.org/mongo-driver/x/mongo/driver"
3031
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
32+
"golang.org/x/sync/errgroup"
3133
)
3234

3335
var noClientOpts = mtest.NewOptions().CreateClient(false)
@@ -472,3 +474,108 @@ type proxyMessage struct {
472474
sent wiremessage.WireMessage
473475
received wiremessage.WireMessage
474476
}
477+
478+
func TestClientStress(t *testing.T) {
479+
// TODO: Enable with GODRIVER-2038.
480+
t.Skip("TODO: Enable with GODRIVER-2038")
481+
482+
mtOpts := mtest.NewOptions().
483+
MinServerVersion("3.6").
484+
Topologies(mtest.ReplicaSet, mtest.Sharded, mtest.Single).
485+
CreateClient(false)
486+
mt := mtest.New(t, mtOpts)
487+
488+
// Test that a Client can recover from a massive traffic spikes after the traffic spike is over.
489+
mt.Run("Client recovers from traffic spike", func(mt *mtest.T) {
490+
oid := primitive.NewObjectID()
491+
doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
492+
_, err := mt.Coll.InsertOne(context.Background(), doc)
493+
assert.Nil(mt, err, "unexpected error inserting document: %v", err)
494+
495+
// findOne calls FindOne("_id": oid) on the given collection and with the given timeout. It
496+
// returns any errors.
497+
findOne := func(coll *mongo.Collection, timeout time.Duration) error {
498+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
499+
defer cancel()
500+
var res map[string]interface{}
501+
return coll.FindOne(ctx, bson.D{{Key: "_id", Value: oid}}).Decode(&res)
502+
}
503+
504+
// findOneFor calls FindOne on the given collection and with the given timeout in a loop for
505+
// the given duration and returns any errors returned by FindOne.
506+
findOneFor := func(coll *mongo.Collection, timeout time.Duration, d time.Duration) []error {
507+
errs := make([]error, 0)
508+
start := time.Now()
509+
for time.Since(start) <= d {
510+
err := findOne(coll, timeout)
511+
if err != nil {
512+
errs = append(errs, err)
513+
}
514+
}
515+
return errs
516+
}
517+
518+
// Calculate the maximum observed round-trip time by measuring the duration of some FindOne
519+
// operations and picking the max.
520+
var maxRTT time.Duration
521+
for i := 0; i < 50; i++ {
522+
start := time.Now()
523+
err := findOne(mt.Coll, 10*time.Second)
524+
assert.Nil(t, err, "unexpected error calling FindOne: %v", err)
525+
duration := time.Since(start)
526+
if duration > maxRTT {
527+
maxRTT = duration
528+
}
529+
}
530+
assert.True(mt, maxRTT > 0, "RTT must be greater than 0")
531+
532+
// Run tests with various "maxPoolSize" values, including 1-connection pools and unlimited
533+
// size pools, to test how the client handles traffic spikes using different connection pool
534+
// configurations.
535+
maxPoolSizes := []uint64{0, 1, 10, 100}
536+
for _, maxPoolSize := range maxPoolSizes {
537+
maxPoolSizeOpt := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(maxPoolSize))
538+
mt.RunOpts(fmt.Sprintf("maxPoolSize %d", maxPoolSize), maxPoolSizeOpt, func(mt *mtest.T) {
539+
doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
540+
_, err := mt.Coll.InsertOne(context.Background(), doc)
541+
assert.Nil(mt, err, "unexpected error inserting document: %v", err)
542+
543+
// Set the timeout to be 10x the maximum observed RTT. Use a minimum 10ms timeout to
544+
// prevent spurious test failures due to extremely low timeouts.
545+
timeout := maxRTT * 10
546+
if timeout < 10*time.Millisecond {
547+
timeout = 10 * time.Millisecond
548+
}
549+
t.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)
550+
551+
// Simulate normal traffic by running one FindOne loop for 1 second and assert that there
552+
// are no errors.
553+
errs := findOneFor(mt.Coll, timeout, 1*time.Second)
554+
assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
555+
556+
// Simulate an extreme traffic spike by running 1,000 FindOne loops in parallel for 10
557+
// seconds and expect at least some errors to occur.
558+
g := new(errgroup.Group)
559+
for i := 0; i < 1000; i++ {
560+
g.Go(func() error {
561+
errs := findOneFor(mt.Coll, timeout, 10*time.Second)
562+
if len(errs) == 0 {
563+
return nil
564+
}
565+
return errs[len(errs)-1]
566+
})
567+
}
568+
err = g.Wait()
569+
assert.NotNil(mt, err, "expected at least one error, got nil")
570+
571+
// Simulate normal traffic again for 1 second. Ignore any errors to allow any outstanding
572+
// connection errors to stop.
573+
_ = findOneFor(mt.Coll, timeout, 1*time.Second)
574+
575+
// Simulate normal traffic again for 1 second and assert that there are no errors.
576+
errs = findOneFor(mt.Coll, timeout, 1*time.Second)
577+
assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
578+
})
579+
}
580+
})
581+
}

mongo/integration/mtest/mongotest.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"strings"
1313
"sync"
14+
"sync/atomic"
1415
"testing"
1516

1617
"go.mongodb.org/mongo-driver/bson"
@@ -103,7 +104,7 @@ type T struct {
103104
dataLake *bool
104105
ssl *bool
105106
collCreateOpts bson.D
106-
connsCheckedOut int // net number of connections checked out during test execution
107+
connsCheckedOut int64 // net number of connections checked out during test execution
107108
requireAPIVersion *bool
108109

109110
// options copied to sub-tests
@@ -231,7 +232,7 @@ func (t *T) RunOpts(name string, opts *Options, callback func(*T)) {
231232
// store number of sessions and connections checked out here but assert that they're equal to 0 after
232233
// cleaning up test resources to make sure resources are always cleared
233234
sessions := sub.Client.NumberSessionsInProgress()
234-
conns := sub.connsCheckedOut
235+
conns := sub.NumberConnectionsCheckedOut()
235236

236237
if sub.clientType != Mock {
237238
sub.ClearFailPoints()
@@ -369,7 +370,7 @@ func (t *T) GetProxiedMessages() []*ProxyMessage {
369370

370371
// NumberConnectionsCheckedOut returns the number of connections checked out from the test Client.
371372
func (t *T) NumberConnectionsCheckedOut() int {
372-
return t.connsCheckedOut
373+
return int(atomic.LoadInt64(&t.connsCheckedOut))
373374
}
374375

375376
// ClearEvents clears the existing command monitoring events.
@@ -594,9 +595,9 @@ func (t *T) createTestClient() {
594595

595596
switch evt.Type {
596597
case event.GetSucceeded:
597-
t.connsCheckedOut++
598+
atomic.AddInt64(&t.connsCheckedOut, 1)
598599
case event.ConnectionReturned:
599-
t.connsCheckedOut--
600+
atomic.AddInt64(&t.connsCheckedOut, -1)
600601
}
601602
},
602603
})

0 commit comments

Comments
 (0)