GODRIVER-2068 Use monitor.TestPoolMonitor to replace most connection pool monitors in tests. #890

Merged
70 changes: 0 additions & 70 deletions internal/testutil/config.go
@@ -24,21 +24,15 @@ import (
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"go.mongodb.org/mongo-driver/x/mongo/driver/ocsp"
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
)

var connectionString connstring.ConnString
var connectionStringOnce sync.Once
var connectionStringErr error
var liveTopology *topology.Topology
var liveSessionPool *session.Pool
var liveTopologyOnce sync.Once
var liveTopologyErr error
var monitoredTopology *topology.Topology
var monitoredSessionPool *session.Pool
var monitoredTopologyOnce sync.Once
var monitoredTopologyErr error

// AddOptionsToURI appends connection string options to a URI.
func AddOptionsToURI(uri string, opts ...string) string {
@@ -119,61 +113,6 @@ func MonitoredTopology(t *testing.T, dbName string, monitor *event.CommandMonito
return monitoredTopology
}

// GlobalMonitoredTopology gets the globally configured topology and attaches a command monitor.
func GlobalMonitoredTopology(t *testing.T, monitor *event.CommandMonitor) *topology.Topology {
cs := ConnString(t)
opts := []topology.Option{
topology.WithConnString(func(connstring.ConnString) connstring.ConnString { return cs }),
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
return append(
opts,
topology.WithConnectionOptions(func(opts ...topology.ConnectionOption) []topology.ConnectionOption {
return append(
opts,
topology.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
return monitor
}),
topology.WithOCSPCache(func(ocsp.Cache) ocsp.Cache {
return ocsp.NewCache()
}),
)
}),
)
}),
}

monitoredTopologyOnce.Do(func() {
var err error
monitoredTopology, err = topology.New(opts...)
if err != nil {
monitoredTopologyErr = err
} else {
_ = monitoredTopology.Connect()

err = operation.NewCommand(bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "dropDatabase", 1))).
Database(DBName(t)).ServerSelector(description.WriteSelector()).Deployment(monitoredTopology).Execute(context.Background())

require.NoError(t, err)

sub, err := monitoredTopology.Subscribe()
require.NoError(t, err)
monitoredSessionPool = session.NewPool(sub.Updates)
}
})

if monitoredTopologyErr != nil {
t.Fatal(monitoredTopologyErr)
}

return monitoredTopology
}

// GlobalMonitoredSessionPool returns the globally configured session pool.
// Must be called after GlobalMonitoredTopology().
func GlobalMonitoredSessionPool() *session.Pool {
return monitoredSessionPool
}

// Topology gets the globally configured topology.
func Topology(t *testing.T) *topology.Topology {
cs := ConnString(t)
@@ -205,10 +144,6 @@ func Topology(t *testing.T) *topology.Topology {
err = operation.NewCommand(bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "dropDatabase", 1))).
Database(DBName(t)).ServerSelector(description.WriteSelector()).Deployment(liveTopology).Execute(context.Background())
require.NoError(t, err)

sub, err := liveTopology.Subscribe()
require.NoError(t, err)
liveSessionPool = session.NewPool(sub.Updates)
}
})

@@ -219,11 +154,6 @@ func Topology(t *testing.T) *topology.Topology {
return liveTopology
}

// SessionPool gets the globally configured session pool. Must be called after Topology().
func SessionPool() *session.Pool {
return liveSessionPool
}

// TopologyWithConnString takes a connection string and returns a connected
// topology, or else bails out of testing.
func TopologyWithConnString(t *testing.T, cs connstring.ConnString) *topology.Topology {
78 changes: 78 additions & 0 deletions internal/testutil/monitor/monitor.go
@@ -0,0 +1,78 @@
// Copyright (C) MongoDB, Inc. 2022-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

// Package monitor provides test types that are used to monitor client state and actions via the
// various monitor types supported by the driver.
package monitor

import (
"sync"

"go.mongodb.org/mongo-driver/event"
)

// TestPoolMonitor exposes an *event.PoolMonitor and collects all events logged to that
// *event.PoolMonitor. It is safe to use from multiple concurrent goroutines.
type TestPoolMonitor struct {
*event.PoolMonitor

events []*event.PoolEvent
mu sync.RWMutex
}

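// NewTestPoolMonitor returns a TestPoolMonitor whose embedded event.PoolMonitor appends every
// pool event it receives to an internal slice.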
func NewTestPoolMonitor() *TestPoolMonitor {
tpm := &TestPoolMonitor{
events: make([]*event.PoolEvent, 0),
}
tpm.PoolMonitor = &event.PoolMonitor{
Event: func(evt *event.PoolEvent) {
tpm.mu.Lock()
defer tpm.mu.Unlock()
tpm.events = append(tpm.events, evt)
},
}
return tpm
}

// Events returns a copy of the events collected by the TestPoolMonitor. Filters can optionally be
// applied to the returned events set and are applied using AND logic (i.e. all filters must return
// true to include the event in the result).
func (tpm *TestPoolMonitor) Events(filters ...func(*event.PoolEvent) bool) []*event.PoolEvent {
tpm.mu.RLock()
defer tpm.mu.RUnlock()
filtered := make([]*event.PoolEvent, 0, len(tpm.events))

for _, evt := range tpm.events {
keep := true
for _, filter := range filters {
if !filter(evt) {
keep = false
break
}
}
if keep {
filtered = append(filtered, evt)
}
}

return filtered
}

// ClearEvents will reset the events collected by the TestPoolMonitor.
func (tpm *TestPoolMonitor) ClearEvents() {
tpm.mu.Lock()
defer tpm.mu.Unlock()
tpm.events = tpm.events[:0]
}

// IsPoolCleared returns true if there are any events of type "event.PoolCleared" in the events
// recorded by the TestPoolMonitor.
func (tpm *TestPoolMonitor) IsPoolCleared() bool {
poolClearedEvents := tpm.Events(func(evt *event.PoolEvent) bool {
return evt.Type == event.PoolCleared
})
return len(poolClearedEvents) > 0
}
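
A minimal sketch of how a test can use the new TestPoolMonitor, mirroring the pattern in the change_stream_test.go and client_test.go changes below. The mtest.New, mt.Close, and mt.Fatalf helpers are assumed from the driver's mtest and testing packages and are not introduced by this diff; this is illustrative only, not part of the change.

package integration

import (
    "testing"

    "go.mongodb.org/mongo-driver/event"
    "go.mongodb.org/mongo-driver/internal/testutil/monitor"
    "go.mongodb.org/mongo-driver/mongo/integration/mtest"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func TestPoolMonitorUsageSketch(t *testing.T) {
    mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
    defer mt.Close()

    mt.Run("pool cleared detection", func(mt *mtest.T) {
        // Attach a fresh TestPoolMonitor to a fresh client for this subtest.
        tpm := monitor.NewTestPoolMonitor()
        mt.ResetClient(options.Client().SetPoolMonitor(tpm.PoolMonitor))

        // ... run operations that are expected to clear the connection pool ...

        // Filter the recorded events down to PoolCleared events only.
        cleared := tpm.Events(func(evt *event.PoolEvent) bool {
            return evt.Type == event.PoolCleared
        })
        if !tpm.IsPoolCleared() {
            mt.Fatalf("expected pool to be cleared, got %d PoolCleared events", len(cleared))
        }
    })
}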
46 changes: 0 additions & 46 deletions internal/testutil/ops.go
@@ -9,7 +9,6 @@ package testutil // import "go.mongodb.org/mongo-driver/internal/testutil"
import (
"context"
"os"
"strings"
"testing"

"github.com/stretchr/testify/require"
@@ -19,31 +18,8 @@ import (
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
)

// AutoCreateIndexes creates an index in the test cluster.
func AutoCreateIndexes(t *testing.T, keys []string) {
elems := make([][]byte, 0, len(keys))
for _, k := range keys {
elems = append(elems, bsoncore.AppendInt32Element(nil, k, 1))
}
name := strings.Join(keys, "_")
indexes := bsoncore.BuildDocumentFromElements(nil,
bsoncore.AppendDocumentElement(nil, "key", bsoncore.BuildDocumentFromElements(nil,
elems...)),
bsoncore.AppendStringElement(nil, "name", name),
)
err := operation.NewCreateIndexes(indexes).Collection(ColName(t)).Database(DBName(t)).
Deployment(Topology(t)).ServerSelector(description.WriteSelector()).Execute(context.Background())
require.NoError(t, err)
}

// AutoDropCollection drops the collection in the test cluster.
func AutoDropCollection(t *testing.T) {
DropCollection(t, DBName(t), ColName(t))
}

// DropCollection drops the collection in the test cluster.
func DropCollection(t *testing.T, dbname, colname string) {
err := operation.NewCommand(bsoncore.BuildDocument(nil, bsoncore.AppendStringElement(nil, "drop", colname))).
@@ -66,28 +42,6 @@ func InsertDocs(t *testing.T, dbname, colname string, writeConcern *writeconcern
require.NoError(t, err)
}

// EnableMaxTimeFailPoint turns on the max time fail point in the test cluster.
func EnableMaxTimeFailPoint(t *testing.T, s *topology.Server) error {
cmd := bsoncore.BuildDocumentFromElements(nil,
bsoncore.AppendStringElement(nil, "configureFailPoint", "maxTimeAlwaysTimeOut"),
bsoncore.AppendStringElement(nil, "mode", "alwaysOn"),
)
return operation.NewCommand(cmd).
Database("admin").Deployment(driver.SingleServerDeployment{Server: s}).
Execute(context.Background())
}

// DisableMaxTimeFailPoint turns off the max time fail point in the test cluster.
func DisableMaxTimeFailPoint(t *testing.T, s *topology.Server) {
cmd := bsoncore.BuildDocumentFromElements(nil,
bsoncore.AppendStringElement(nil, "configureFailPoint", "maxTimeAlwaysTimeOut"),
bsoncore.AppendStringElement(nil, "mode", "off"),
)
_ = operation.NewCommand(cmd).
Database("admin").Deployment(driver.SingleServerDeployment{Server: s}).
Execute(context.Background())
}

// RunCommand runs an arbitrary command on a given database of the target server.
func RunCommand(t *testing.T, s driver.Server, db string, cmd bsoncore.Document) (bsoncore.Document, error) {
op := operation.NewCommand(cmd).
48 changes: 26 additions & 22 deletions mongo/integration/change_stream_test.go
@@ -15,6 +15,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
"go.mongodb.org/mongo-driver/internal/testutil/monitor"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -503,23 +504,23 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
})
})

customDeploymentClientOpts := options.Client().
SetPoolMonitor(poolMonitor).
SetWriteConcern(mtest.MajorityWc).
SetReadConcern(mtest.MajorityRc).
SetRetryReads(false)
customDeploymentOpts := mtest.NewOptions().
Topologies(mtest.ReplicaSet). // Avoid complexity of sharded fail points.
MinServerVersion("4.0"). // 4.0 is needed to use replica set fail points.
ClientOptions(customDeploymentClientOpts).
CreateClient(false)
mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) {
// Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather
// than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired
// by ChangeStream when executing an aggregate.

mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) {
clearPoolChan()
tpm := monitor.NewTestPoolMonitor()
mt.ResetClient(options.Client().
SetPoolMonitor(tpm.PoolMonitor).
SetWriteConcern(mtest.MajorityWc).
SetReadConcern(mtest.MajorityRc).
SetRetryReads(false))

mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
@@ -533,10 +534,16 @@ func TestChangeStream_ReplicaSet(t *testing.T) {

_, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
assert.NotNil(mt, err, "expected Watch error, got nil")
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
})
mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) {
clearPoolChan()
tpm := monitor.NewTestPoolMonitor()
mt.ResetClient(options.Client().
SetPoolMonitor(tpm.PoolMonitor).
SetWriteConcern(mtest.MajorityWc).
SetReadConcern(mtest.MajorityRc).
SetRetryReads(false))

mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
@@ -557,12 +564,13 @@ func TestChangeStream_ReplicaSet(t *testing.T) {

assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false (iteration error %v)",
cs.Err())
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
})
retryAggClientOpts := options.Client().SetRetryReads(true).SetPoolMonitor(poolMonitor)
retryAggMtOpts := mtest.NewOptions().ClientOptions(retryAggClientOpts)
mt.RunOpts("errors are processed for SDAM on retried aggregate", retryAggMtOpts, func(mt *mtest.T) {
clearPoolChan()
mt.Run("errors are processed for SDAM on retried aggregate", func(mt *mtest.T) {
tpm := monitor.NewTestPoolMonitor()
mt.ResetClient(options.Client().
SetPoolMonitor(tpm.PoolMonitor).
SetRetryReads(true))

mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
@@ -578,14 +586,10 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
_, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
assert.NotNil(mt, err, "expected Watch error, got nil")

var numClearedEvents int
for len(poolChan) > 0 {
curr := <-poolChan
if curr.Type == event.PoolCleared {
numClearedEvents++
}
}
assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents)
clearedEvents := tpm.Events(func(evt *event.PoolEvent) bool {
return evt.Type == event.PoolCleared
})
assert.Equal(mt, 2, len(clearedEvents), "expected two PoolCleared events, got %d", len(clearedEvents))
})
})
// Setting min server version as 4.0 since v3.6 does not send a "dropEvent"
5 changes: 3 additions & 2 deletions mongo/integration/client_test.go
@@ -24,6 +24,7 @@ import (
"go.mongodb.org/mongo-driver/internal"
"go.mongodb.org/mongo-driver/internal/testutil"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
"go.mongodb.org/mongo-driver/internal/testutil/monitor"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -558,7 +559,7 @@ func TestClient(t *testing.T) {

// Reset the client with a dialer that delays all network round trips by 300ms and set the
// heartbeat interval to 100ms to reduce the time it takes to collect RTT samples.
tpm := newTestPoolMonitor()
tpm := monitor.NewTestPoolMonitor()
mt.ResetClient(options.Client().
SetPoolMonitor(tpm.PoolMonitor).
SetDialer(newSlowConnDialer(300 * time.Millisecond)).
@@ -733,7 +734,7 @@ func TestClientStress(t *testing.T) {
// pool configurations.
maxPoolSizes := []uint64{1, 10, 100}
for _, maxPoolSize := range maxPoolSizes {
tpm := newTestPoolMonitor()
tpm := monitor.NewTestPoolMonitor()
maxPoolSizeOpt := mtest.NewOptions().ClientOptions(
options.Client().
SetPoolMonitor(tpm.PoolMonitor).