Skip to content

Commit 3d60e91

Browse files
authored
GODRIVER-2068 Use monitor.TestPoolMonitor to replace most connection pool monitors in tests. (#890)
1 parent 5970415 commit 3d60e91

File tree

11 files changed

+150
-288
lines changed

11 files changed

+150
-288
lines changed

internal/testutil/config.go

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,15 @@ import (
2424
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
2525
"go.mongodb.org/mongo-driver/x/mongo/driver/ocsp"
2626
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
27-
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
2827
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
2928
)
3029

3130
var connectionString connstring.ConnString
3231
var connectionStringOnce sync.Once
3332
var connectionStringErr error
3433
var liveTopology *topology.Topology
35-
var liveSessionPool *session.Pool
3634
var liveTopologyOnce sync.Once
3735
var liveTopologyErr error
38-
var monitoredTopology *topology.Topology
39-
var monitoredSessionPool *session.Pool
40-
var monitoredTopologyOnce sync.Once
41-
var monitoredTopologyErr error
4236

4337
// AddOptionsToURI appends connection string options to a URI.
4438
func AddOptionsToURI(uri string, opts ...string) string {
@@ -119,61 +113,6 @@ func MonitoredTopology(t *testing.T, dbName string, monitor *event.CommandMonito
119113
return monitoredTopology
120114
}
121115

122-
// GlobalMonitoredTopology gets the globally configured topology and attaches a command monitor.
123-
func GlobalMonitoredTopology(t *testing.T, monitor *event.CommandMonitor) *topology.Topology {
124-
cs := ConnString(t)
125-
opts := []topology.Option{
126-
topology.WithConnString(func(connstring.ConnString) connstring.ConnString { return cs }),
127-
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
128-
return append(
129-
opts,
130-
topology.WithConnectionOptions(func(opts ...topology.ConnectionOption) []topology.ConnectionOption {
131-
return append(
132-
opts,
133-
topology.WithMonitor(func(*event.CommandMonitor) *event.CommandMonitor {
134-
return monitor
135-
}),
136-
topology.WithOCSPCache(func(ocsp.Cache) ocsp.Cache {
137-
return ocsp.NewCache()
138-
}),
139-
)
140-
}),
141-
)
142-
}),
143-
}
144-
145-
monitoredTopologyOnce.Do(func() {
146-
var err error
147-
monitoredTopology, err = topology.New(opts...)
148-
if err != nil {
149-
monitoredTopologyErr = err
150-
} else {
151-
_ = monitoredTopology.Connect()
152-
153-
err = operation.NewCommand(bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "dropDatabase", 1))).
154-
Database(DBName(t)).ServerSelector(description.WriteSelector()).Deployment(monitoredTopology).Execute(context.Background())
155-
156-
require.NoError(t, err)
157-
158-
sub, err := monitoredTopology.Subscribe()
159-
require.NoError(t, err)
160-
monitoredSessionPool = session.NewPool(sub.Updates)
161-
}
162-
})
163-
164-
if monitoredTopologyErr != nil {
165-
t.Fatal(monitoredTopologyErr)
166-
}
167-
168-
return monitoredTopology
169-
}
170-
171-
// GlobalMonitoredSessionPool returns the globally configured session pool.
172-
// Must be called after GlobalMonitoredTopology()
173-
func GlobalMonitoredSessionPool() *session.Pool {
174-
return monitoredSessionPool
175-
}
176-
177116
// Topology gets the globally configured topology.
178117
func Topology(t *testing.T) *topology.Topology {
179118
cs := ConnString(t)
@@ -205,10 +144,6 @@ func Topology(t *testing.T) *topology.Topology {
205144
err = operation.NewCommand(bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "dropDatabase", 1))).
206145
Database(DBName(t)).ServerSelector(description.WriteSelector()).Deployment(liveTopology).Execute(context.Background())
207146
require.NoError(t, err)
208-
209-
sub, err := liveTopology.Subscribe()
210-
require.NoError(t, err)
211-
liveSessionPool = session.NewPool(sub.Updates)
212147
}
213148
})
214149

@@ -219,11 +154,6 @@ func Topology(t *testing.T) *topology.Topology {
219154
return liveTopology
220155
}
221156

222-
// SessionPool gets the globally configured session pool. Must be called after Topology().
223-
func SessionPool() *session.Pool {
224-
return liveSessionPool
225-
}
226-
227157
// TopologyWithConnString takes a connection string and returns a connected
228158
// topology, or else bails out of testing
229159
func TopologyWithConnString(t *testing.T, cs connstring.ConnString) *topology.Topology {

internal/testutil/monitor/monitor.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright (C) MongoDB, Inc. 2022-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
// Package monitor provides test types that are used to monitor client state and actions via the
8+
// various monitor types supported by the driver.
9+
package monitor
10+
11+
import (
12+
"sync"
13+
14+
"go.mongodb.org/mongo-driver/event"
15+
)
16+
17+
// TestPoolMonitor exposes an *event.TestPoolMonitor and collects all events logged to that
18+
// *event.TestPoolMonitor. It is safe to use from multiple concurrent goroutines.
19+
type TestPoolMonitor struct {
20+
*event.PoolMonitor
21+
22+
events []*event.PoolEvent
23+
mu sync.RWMutex
24+
}
25+
26+
func NewTestPoolMonitor() *TestPoolMonitor {
27+
tpm := &TestPoolMonitor{
28+
events: make([]*event.PoolEvent, 0),
29+
}
30+
tpm.PoolMonitor = &event.PoolMonitor{
31+
Event: func(evt *event.PoolEvent) {
32+
tpm.mu.Lock()
33+
defer tpm.mu.Unlock()
34+
tpm.events = append(tpm.events, evt)
35+
},
36+
}
37+
return tpm
38+
}
39+
40+
// Events returns a copy of the events collected by the testPoolMonitor. Filters can optionally be
41+
// applied to the returned events set and are applied using AND logic (i.e. all filters must return
42+
// true to include the event in the result).
43+
func (tpm *TestPoolMonitor) Events(filters ...func(*event.PoolEvent) bool) []*event.PoolEvent {
44+
filtered := make([]*event.PoolEvent, 0, len(tpm.events))
45+
tpm.mu.RLock()
46+
defer tpm.mu.RUnlock()
47+
48+
for _, evt := range tpm.events {
49+
keep := true
50+
for _, filter := range filters {
51+
if !filter(evt) {
52+
keep = false
53+
break
54+
}
55+
}
56+
if keep {
57+
filtered = append(filtered, evt)
58+
}
59+
}
60+
61+
return filtered
62+
}
63+
64+
// ClearEvents will reset the events collected by the testPoolMonitor.
65+
func (tpm *TestPoolMonitor) ClearEvents() {
66+
tpm.mu.Lock()
67+
defer tpm.mu.Unlock()
68+
tpm.events = tpm.events[:0]
69+
}
70+
71+
// IsPoolCleared returns true if there are any events of type "event.PoolCleared" in the events
72+
// recorded by the testPoolMonitor.
73+
func (tpm *TestPoolMonitor) IsPoolCleared() bool {
74+
poolClearedEvents := tpm.Events(func(evt *event.PoolEvent) bool {
75+
return evt.Type == event.PoolCleared
76+
})
77+
return len(poolClearedEvents) > 0
78+
}

internal/testutil/ops.go

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package testutil // import "go.mongodb.org/mongo-driver/internal/testutil"
99
import (
1010
"context"
1111
"os"
12-
"strings"
1312
"testing"
1413

1514
"github.com/stretchr/testify/require"
@@ -19,31 +18,8 @@ import (
1918
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
2019
"go.mongodb.org/mongo-driver/x/mongo/driver"
2120
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
22-
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
2321
)
2422

25-
// AutoCreateIndexes creates an index in the test cluster.
26-
func AutoCreateIndexes(t *testing.T, keys []string) {
27-
elems := make([][]byte, 0, len(keys))
28-
for _, k := range keys {
29-
elems = append(elems, bsoncore.AppendInt32Element(nil, k, 1))
30-
}
31-
name := strings.Join(keys, "_")
32-
indexes := bsoncore.BuildDocumentFromElements(nil,
33-
bsoncore.AppendDocumentElement(nil, "key", bsoncore.BuildDocumentFromElements(nil,
34-
elems...)),
35-
bsoncore.AppendStringElement(nil, "name", name),
36-
)
37-
err := operation.NewCreateIndexes(indexes).Collection(ColName(t)).Database(DBName(t)).
38-
Deployment(Topology(t)).ServerSelector(description.WriteSelector()).Execute(context.Background())
39-
require.NoError(t, err)
40-
}
41-
42-
// AutoDropCollection drops the collection in the test cluster.
43-
func AutoDropCollection(t *testing.T) {
44-
DropCollection(t, DBName(t), ColName(t))
45-
}
46-
4723
// DropCollection drops the collection in the test cluster.
4824
func DropCollection(t *testing.T, dbname, colname string) {
4925
err := operation.NewCommand(bsoncore.BuildDocument(nil, bsoncore.AppendStringElement(nil, "drop", colname))).
@@ -66,28 +42,6 @@ func InsertDocs(t *testing.T, dbname, colname string, writeConcern *writeconcern
6642
require.NoError(t, err)
6743
}
6844

69-
// EnableMaxTimeFailPoint turns on the max time fail point in the test cluster.
70-
func EnableMaxTimeFailPoint(t *testing.T, s *topology.Server) error {
71-
cmd := bsoncore.BuildDocumentFromElements(nil,
72-
bsoncore.AppendStringElement(nil, "configureFailPoint", "maxTimeAlwaysTimeOut"),
73-
bsoncore.AppendStringElement(nil, "mode", "alwaysOn"),
74-
)
75-
return operation.NewCommand(cmd).
76-
Database("admin").Deployment(driver.SingleServerDeployment{Server: s}).
77-
Execute(context.Background())
78-
}
79-
80-
// DisableMaxTimeFailPoint turns off the max time fail point in the test cluster.
81-
func DisableMaxTimeFailPoint(t *testing.T, s *topology.Server) {
82-
cmd := bsoncore.BuildDocumentFromElements(nil,
83-
bsoncore.AppendStringElement(nil, "configureFailPoint", "maxTimeAlwaysTimeOut"),
84-
bsoncore.AppendStringElement(nil, "mode", "off"),
85-
)
86-
_ = operation.NewCommand(cmd).
87-
Database("admin").Deployment(driver.SingleServerDeployment{Server: s}).
88-
Execute(context.Background())
89-
}
90-
9145
// RunCommand runs an arbitrary command on a given database of target server
9246
func RunCommand(t *testing.T, s driver.Server, db string, cmd bsoncore.Document) (bsoncore.Document, error) {
9347
op := operation.NewCommand(cmd).

mongo/integration/change_stream_test.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.mongodb.org/mongo-driver/bson/primitive"
1616
"go.mongodb.org/mongo-driver/event"
1717
"go.mongodb.org/mongo-driver/internal/testutil/assert"
18+
"go.mongodb.org/mongo-driver/internal/testutil/monitor"
1819
"go.mongodb.org/mongo-driver/mongo"
1920
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
2021
"go.mongodb.org/mongo-driver/mongo/options"
@@ -503,23 +504,23 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
503504
})
504505
})
505506

506-
customDeploymentClientOpts := options.Client().
507-
SetPoolMonitor(poolMonitor).
508-
SetWriteConcern(mtest.MajorityWc).
509-
SetReadConcern(mtest.MajorityRc).
510-
SetRetryReads(false)
511507
customDeploymentOpts := mtest.NewOptions().
512508
Topologies(mtest.ReplicaSet). // Avoid complexity of sharded fail points.
513509
MinServerVersion("4.0"). // 4.0 is needed to use replica set fail points.
514-
ClientOptions(customDeploymentClientOpts).
515510
CreateClient(false)
516511
mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) {
517512
// Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather
518513
// than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired
519514
// by ChangeStream when executing an aggregate.
520515

521516
mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) {
522-
clearPoolChan()
517+
tpm := monitor.NewTestPoolMonitor()
518+
mt.ResetClient(options.Client().
519+
SetPoolMonitor(tpm.PoolMonitor).
520+
SetWriteConcern(mtest.MajorityWc).
521+
SetReadConcern(mtest.MajorityRc).
522+
SetRetryReads(false))
523+
523524
mt.SetFailPoint(mtest.FailPoint{
524525
ConfigureFailPoint: "failCommand",
525526
Mode: mtest.FailPointMode{
@@ -533,10 +534,16 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
533534

534535
_, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
535536
assert.NotNil(mt, err, "expected Watch error, got nil")
536-
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
537+
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
537538
})
538539
mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) {
539-
clearPoolChan()
540+
tpm := monitor.NewTestPoolMonitor()
541+
mt.ResetClient(options.Client().
542+
SetPoolMonitor(tpm.PoolMonitor).
543+
SetWriteConcern(mtest.MajorityWc).
544+
SetReadConcern(mtest.MajorityRc).
545+
SetRetryReads(false))
546+
540547
mt.SetFailPoint(mtest.FailPoint{
541548
ConfigureFailPoint: "failCommand",
542549
Mode: mtest.FailPointMode{
@@ -557,12 +564,13 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
557564

558565
assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false (iteration error %v)",
559566
cs.Err())
560-
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
567+
assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
561568
})
562-
retryAggClientOpts := options.Client().SetRetryReads(true).SetPoolMonitor(poolMonitor)
563-
retryAggMtOpts := mtest.NewOptions().ClientOptions(retryAggClientOpts)
564-
mt.RunOpts("errors are processed for SDAM on retried aggregate", retryAggMtOpts, func(mt *mtest.T) {
565-
clearPoolChan()
569+
mt.Run("errors are processed for SDAM on retried aggregate", func(mt *mtest.T) {
570+
tpm := monitor.NewTestPoolMonitor()
571+
mt.ResetClient(options.Client().
572+
SetPoolMonitor(tpm.PoolMonitor).
573+
SetRetryReads(true))
566574

567575
mt.SetFailPoint(mtest.FailPoint{
568576
ConfigureFailPoint: "failCommand",
@@ -578,14 +586,10 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
578586
_, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
579587
assert.NotNil(mt, err, "expected Watch error, got nil")
580588

581-
var numClearedEvents int
582-
for len(poolChan) > 0 {
583-
curr := <-poolChan
584-
if curr.Type == event.PoolCleared {
585-
numClearedEvents++
586-
}
587-
}
588-
assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents)
589+
clearedEvents := tpm.Events(func(evt *event.PoolEvent) bool {
590+
return evt.Type == event.PoolCleared
591+
})
592+
assert.Equal(mt, 2, len(clearedEvents), "expected two PoolCleared events, got %d", len(clearedEvents))
589593
})
590594
})
591595
// Setting min server version as 4.0 since v3.6 does not send a "dropEvent"

mongo/integration/client_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.mongodb.org/mongo-driver/internal"
2525
"go.mongodb.org/mongo-driver/internal/testutil"
2626
"go.mongodb.org/mongo-driver/internal/testutil/assert"
27+
"go.mongodb.org/mongo-driver/internal/testutil/monitor"
2728
"go.mongodb.org/mongo-driver/mongo"
2829
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
2930
"go.mongodb.org/mongo-driver/mongo/options"
@@ -558,7 +559,7 @@ func TestClient(t *testing.T) {
558559

559560
// Reset the client with a dialer that delays all network round trips by 300ms and set the
560561
// heartbeat interval to 100ms to reduce the time it takes to collect RTT samples.
561-
tpm := newTestPoolMonitor()
562+
tpm := monitor.NewTestPoolMonitor()
562563
mt.ResetClient(options.Client().
563564
SetPoolMonitor(tpm.PoolMonitor).
564565
SetDialer(newSlowConnDialer(300 * time.Millisecond)).
@@ -733,7 +734,7 @@ func TestClientStress(t *testing.T) {
733734
// pool configurations.
734735
maxPoolSizes := []uint64{1, 10, 100}
735736
for _, maxPoolSize := range maxPoolSizes {
736-
tpm := newTestPoolMonitor()
737+
tpm := monitor.NewTestPoolMonitor()
737738
maxPoolSizeOpt := mtest.NewOptions().ClientOptions(
738739
options.Client().
739740
SetPoolMonitor(tpm.PoolMonitor).

0 commit comments

Comments
 (0)