Commit dbb3f1c

Enable ClientStressTest and fix pool test.

Parent: ea4f951
2 files changed: +107 −93 lines

mongo/integration/client_test.go

Lines changed: 35 additions & 17 deletions
@@ -19,6 +19,7 @@ import (
 	"go.mongodb.org/mongo-driver/bson/bsoncodec"
 	"go.mongodb.org/mongo-driver/bson/bsonrw"
 	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/event"
 	"go.mongodb.org/mongo-driver/internal"
 	"go.mongodb.org/mongo-driver/internal/testutil"
 	"go.mongodb.org/mongo-driver/internal/testutil/assert"
@@ -469,15 +470,13 @@ func TestClient(t *testing.T) {
 }
 
 func TestClientStress(t *testing.T) {
-	// TODO: Enable with GODRIVER-2038.
-	t.Skip("TODO: Enable with GODRIVER-2038")
-
 	if testing.Short() {
 		t.Skip("skipping integration test in short mode")
 	}
 
 	mtOpts := mtest.NewOptions().CreateClient(false)
 	mt := mtest.New(t, mtOpts)
+	defer mt.Close()
 
 	// Test that a Client can recover from a massive traffic spike after the traffic spike is over.
 	mt.Run("Client recovers from traffic spike", func(mt *mtest.T) {
@@ -505,6 +504,7 @@ func TestClientStress(t *testing.T) {
 				if err != nil {
 					errs = append(errs, err)
 				}
+				time.Sleep(10 * time.Microsecond)
 			}
 			return errs
 		}
@@ -523,34 +523,52 @@ func TestClientStress(t *testing.T) {
 		}
 		assert.True(mt, maxRTT > 0, "RTT must be greater than 0")
 
-		// Run tests with various "maxPoolSize" values, including 1-connection pools and unlimited
-		// size pools, to test how the client handles traffic spikes using different connection pool
-		// configurations.
+		// Run tests with various "maxPoolSize" values, including 1-connection pools and the default
+		// size of 100, to test how the client handles traffic spikes using different connection
+		// pool configurations.
 		maxPoolSizes := []uint64{0, 1, 10, 100}
 		for _, maxPoolSize := range maxPoolSizes {
-			maxPoolSizeOpt := mtest.NewOptions().ClientOptions(options.Client().SetMaxPoolSize(maxPoolSize))
+			tpm := newTestPoolMonitor()
+			maxPoolSizeOpt := mtest.NewOptions().ClientOptions(
+				options.Client().
+					SetPoolMonitor(tpm.PoolMonitor).
+					SetMaxPoolSize(maxPoolSize))
 			mt.RunOpts(fmt.Sprintf("maxPoolSize %d", maxPoolSize), maxPoolSizeOpt, func(mt *mtest.T) {
+				// Print the count of connection created, connection closed, and pool clear events
+				// collected during the test to help with debugging.
+				defer func() {
+					created := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.ConnectionCreated }))
+					closed := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.ConnectionClosed }))
+					poolCleared := len(tpm.Events(func(e *event.PoolEvent) bool { return e.Type == event.PoolCleared }))
+					mt.Logf("Connections created: %d, connections closed: %d, pool clears: %d", created, closed, poolCleared)
+				}()
+
 				doc := bson.D{{Key: "_id", Value: oid}, {Key: "key", Value: "value"}}
 				_, err := mt.Coll.InsertOne(context.Background(), doc)
 				assert.Nil(mt, err, "InsertOne error: %v", err)
 
-				// Set the timeout to be 10x the maximum observed RTT. Use a minimum 10ms timeout to
+				// Set the timeout to be 100x the maximum observed RTT. Use a minimum 100ms timeout to
 				// prevent spurious test failures due to extremely low timeouts.
-				timeout := maxRTT * 10
-				if timeout < 10*time.Millisecond {
-					timeout = 10 * time.Millisecond
+				timeout := maxRTT * 100
+				minTimeout := 100 * time.Millisecond
+				if timeout < minTimeout {
+					timeout = minTimeout
 				}
-				t.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)
+				mt.Logf("Max RTT %v; using a timeout of %v", maxRTT, timeout)
+
+				// Warm up the client for 1 second to allow connections to be established. Ignore
+				// any errors.
+				_ = findOneFor(mt.Coll, timeout, 1*time.Second)
 
 				// Simulate normal traffic by running one FindOne loop for 1 second and assert that there
 				// are no errors.
 				errs := findOneFor(mt.Coll, timeout, 1*time.Second)
 				assert.True(mt, len(errs) == 0, "expected no errors, but got %d (%v)", len(errs), errs)
 
-				// Simulate an extreme traffic spike by running 1,000 FindOne loops in parallel for 10
+				// Simulate an extreme traffic spike by running 2,000 FindOne loops in parallel for 10
 				// seconds and expect at least some errors to occur.
 				g := new(errgroup.Group)
-				for i := 0; i < 1000; i++ {
+				for i := 0; i < 2000; i++ {
 					g.Go(func() error {
 						errs := findOneFor(mt.Coll, timeout, 10*time.Second)
 						if len(errs) == 0 {
@@ -560,11 +578,11 @@ func TestClientStress(t *testing.T) {
 					})
 				}
 				err = g.Wait()
-				assert.NotNil(mt, err, "expected at least one error, got nil")
+				mt.Logf("Error from extreme traffic spike (errors are expected): %v", err)
 
-				// Simulate normal traffic again for 1 second. Ignore any errors to allow any outstanding
+				// Simulate normal traffic again for 5 seconds. Ignore any errors to allow any outstanding
 				// connection errors to stop.
-				_ = findOneFor(mt.Coll, timeout, 1*time.Second)
+				_ = findOneFor(mt.Coll, timeout, 5*time.Second)
 
 				// Simulate normal traffic again for 1 second and assert that there are no errors.
 				errs = findOneFor(mt.Coll, timeout, 1*time.Second)
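The warm-up and traffic loops above all call a findOneFor helper that the diff references but never fully shows; only its error-append and the newly added 10µs sleep appear in the hunks. A minimal sketch of what that helper plausibly looks like, assuming the file's existing imports (context, time, bson, mongo) and an arbitrary empty filter:

// findOneFor repeatedly runs FindOne on coll until duration d elapses, applying
// a separate timeout to each call, and returns all errors encountered.
func findOneFor(coll *mongo.Collection, timeout time.Duration, d time.Duration) []error {
	errs := make([]error, 0)
	start := time.Now()
	for time.Since(start) <= d {
		// Bound each FindOne with the caller-provided timeout so an overloaded
		// server surfaces as an error instead of stalling the loop.
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		res := coll.FindOne(ctx, bson.D{})
		cancel()
		if err := res.Err(); err != nil {
			errs = append(errs, err)
		}
		// The 10µs sleep added by this commit yields the scheduler between
		// operations so the loop isn't a pure busy-wait.
		time.Sleep(10 * time.Microsecond)
	}
	return errs
}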

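Similarly, tpm comes from a newTestPoolMonitor helper defined elsewhere in the integration package. Given how it is used above (a PoolMonitor passed to SetPoolMonitor, plus predicate-based filtering via Events), a plausible shape is sketched below; everything beyond the names visible in the diff is an assumption:

// testPoolMonitor records every pool event it observes so tests can count or
// filter them afterwards. Assumes imports: sync and mongo-driver's event package.
type testPoolMonitor struct {
	PoolMonitor *event.PoolMonitor

	mu     sync.Mutex
	events []*event.PoolEvent
}

func newTestPoolMonitor() *testPoolMonitor {
	tpm := &testPoolMonitor{}
	tpm.PoolMonitor = &event.PoolMonitor{
		// The driver invokes Event for every pool event; record them all.
		Event: func(evt *event.PoolEvent) {
			tpm.mu.Lock()
			defer tpm.mu.Unlock()
			tpm.events = append(tpm.events, evt)
		},
	}
	return tpm
}

// Events returns the recorded events that satisfy the given filter.
func (tpm *testPoolMonitor) Events(filter func(*event.PoolEvent) bool) []*event.PoolEvent {
	tpm.mu.Lock()
	defer tpm.mu.Unlock()
	var matched []*event.PoolEvent
	for _, evt := range tpm.events {
		if filter(evt) {
			matched = append(matched, evt)
		}
	}
	return matched
}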
x/mongo/driver/topology/pool_test.go

Lines changed: 72 additions & 76 deletions
@@ -371,82 +371,6 @@ func TestPool(t *testing.T) {
 	t.Run("checkOut", func(t *testing.T) {
 		t.Parallel()
 
-		t.Run("checkOut on a full pool should return a new connection as soon as the pool isn't full", func(t *testing.T) {
-			t.Parallel()
-
-			cleanup := make(chan struct{})
-			defer close(cleanup)
-			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
-				<-cleanup
-				_ = nc.Close()
-			})
-
-			d := newdialer(&net.Dialer{})
-			p := newPool(
-				poolConfig{
-					Address:     address.Address(addr.String()),
-					MaxPoolSize: 2,
-				},
-				WithDialer(func(Dialer) Dialer { return d }),
-			)
-			err := p.connect()
-			noerr(t, err)
-
-			// Check out two connections (MaxPoolSize) so that subsequent checkOut() calls should
-			// block until a connection is checked back in or removed from the pool.
-			c, err := p.checkOut(context.Background())
-			noerr(t, err)
-			_, err = p.checkOut(context.Background())
-			noerr(t, err)
-			assert.Equalf(t, 2, d.lenopened(), "should have opened 2 connection")
-			assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connection")
-			assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connection")
-
-			// Run a checkOut() with timeout and expect it to time out because the pool is at
-			// MaxPoolSize and no connections are checked in or removed from the pool.
-			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
-			defer cancel()
-			_, err = p.checkOut(ctx)
-			assert.Equalf(t, context.DeadlineExceeded, err.(WaitQueueTimeoutError).Wrapped, "TODO")
-
-			// Start a goroutine that calls checkOut() without a timeout after recording the start
-			// time of the checkOut().
-			var start time.Time
-			waiting := make(chan struct{})
-			var wg sync.WaitGroup
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-
-				close(waiting)
-				start = time.Now()
-				_, err := p.checkOut(context.Background())
-				noerr(t, err)
-			}()
-
-			// Close a connection, wait for the checkOut() goroutine to signal it is waiting for a
-			// checkOut(), then check in the closed connection. Expect that the checkOut() goroutine
-			// exits within 5ms of checking in the closed connection.
-			c.close()
-			<-waiting
-			err = p.checkIn(c)
-			noerr(t, err)
-			wg.Wait()
-			assert.WithinDurationf(
-				t,
-				start,
-				time.Now(),
-				5*time.Millisecond,
-				"expected checkOut to complete within 5ms of checking in a closed connection")
-
-			assert.Equalf(t, 1, d.lenclosed(), "should have closed 1 connection")
-			assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connection")
-			assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connection")
-			assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connection")
-
-			err = p.disconnect(context.Background())
-			noerr(t, err)
-		})
 		t.Run("return error when attempting to create new connection", func(t *testing.T) {
 			t.Parallel()
 
@@ -686,6 +610,78 @@ func TestPool(t *testing.T) {
 			noerr(t, err)
 			wg.Wait()
 		})
+		// Test that checkOut() on a full connection pool creates and returns a new connection
+		// immediately as soon as the pool is no longer full.
+		t.Run("should return a new connection as soon as the pool isn't full", func(t *testing.T) {
+			t.Parallel()
+
+			cleanup := make(chan struct{})
+			defer close(cleanup)
+			addr := bootstrapConnections(t, 3, func(nc net.Conn) {
+				<-cleanup
+				_ = nc.Close()
+			})
+
+			d := newdialer(&net.Dialer{})
+			p := newPool(
+				poolConfig{
+					Address:     address.Address(addr.String()),
+					MaxPoolSize: 2,
+				},
+				WithDialer(func(Dialer) Dialer { return d }),
+			)
+			err := p.connect()
+			noerr(t, err)
+
+			// Check out two connections (MaxPoolSize) so that subsequent checkOut() calls should
+			// block until a connection is checked back in or removed from the pool.
+			c, err := p.checkOut(context.Background())
+			noerr(t, err)
+			_, err = p.checkOut(context.Background())
+			noerr(t, err)
+			assert.Equalf(t, 2, d.lenopened(), "should have opened 2 connections")
+			assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connections")
+			assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connections")
+
+			// Run a checkOut() with timeout and expect it to time out because the pool is at
+			// MaxPoolSize and no connections are checked in or removed from the pool.
+			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+			defer cancel()
+			_, err = p.checkOut(ctx)
+			assert.Equalf(
+				t,
+				context.DeadlineExceeded,
+				err.(WaitQueueTimeoutError).Wrapped,
+				"expected wrapped error to be context.DeadlineExceeded")
+
+			// Start a goroutine that closes one of the checked-out connections and checks it in.
+			// Expect that the checked-in connection is closed, allowing the blocked checkOut() to
+			// complete. Assert that the time between checking in the closed connection and when
+			// the checkOut() completes is within 50ms.
+			var start time.Time
+			go func() {
+				c.close()
+				start = time.Now()
+				err := p.checkIn(c)
+				noerr(t, err)
+			}()
+			_, err = p.checkOut(context.Background())
+			noerr(t, err)
+			assert.WithinDurationf(
+				t,
+				time.Now(),
+				start,
+				50*time.Millisecond,
+				"expected checkOut to complete within 50ms of checking in a closed connection")
+
+			assert.Equalf(t, 1, d.lenclosed(), "should have closed 1 connection")
+			assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
+			assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connections")
+			assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connections")
+
+			err = p.disconnect(context.Background())
+			noerr(t, err)
+		})
 	})
 	t.Run("checkIn", func(t *testing.T) {
 		t.Parallel()
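Both suites use the standard Go test harness: TestClientStress skips itself under -short, and the pool tests run in parallel. Assuming a reachable MongoDB deployment configured for the integration suite, plausible local invocations are:

go test -run TestClientStress ./mongo/integration
go test -run TestPool/checkOut ./x/mongo/driver/topology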