Skip to content

Commit ce7b1c2

Browse files
committed
Wait for the topology to be ready in server selection tests.
1 parent 34cbc49 commit ce7b1c2

File tree

2 files changed

+36
-19
lines changed

2 files changed

+36
-19
lines changed

mongo/integration/server_selection_prose_test.go

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010
"context"
1111
"sync"
1212
"testing"
13-
"time"
1413

1514
"github.com/stretchr/testify/assert"
1615
"github.com/stretchr/testify/require"
1716
"go.mongodb.org/mongo-driver/bson"
1817
"go.mongodb.org/mongo-driver/event"
18+
"go.mongodb.org/mongo-driver/mongo/description"
1919
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
2020
"go.mongodb.org/mongo-driver/mongo/options"
2121
)
@@ -59,17 +59,29 @@ func TestServerSelectionProse(t *testing.T) {
5959
mt.ClearFailPoints()
6060
}()
6161

62-
// Reset the client with exactly 2 mongos hosts.
62+
// Reset the client with exactly 2 mongos hosts. Use a ServerMonitor to wait for both mongos
63+
// host descriptions to move from kind "Unknown" to kind "Mongos".
64+
topologyEvents := make(chan *event.TopologyDescriptionChangedEvent, 10)
6365
tpm := newTestPoolMonitor()
6466
mt.ResetClient(options.Client().
6567
SetHosts(hosts[:2]).
6668
SetPoolMonitor(tpm.PoolMonitor).
67-
SetAppName("loadBalancingTest"))
69+
SetAppName("loadBalancingTest").
70+
SetServerMonitor(&event.ServerMonitor{
71+
TopologyDescriptionChanged: func(evt *event.TopologyDescriptionChangedEvent) {
72+
topologyEvents <- evt
73+
},
74+
}))
75+
for evt := range topologyEvents {
76+
servers := evt.NewDescription.Servers
77+
if len(servers) == 2 && servers[0].Kind == description.Mongos && servers[1].Kind == description.Mongos {
78+
break
79+
}
80+
}
6881

69-
// Start 25 goroutines that each run 10 findOne operations. Run more operations than the
70-
// prose test specifies to get more samples and reduce intermittent test failures.
82+
// Start 10 goroutines that each run 10 findOne operations.
7183
var wg sync.WaitGroup
72-
for i := 0; i < 25; i++ {
84+
for i := 0; i < 10; i++ {
7385
wg.Add(1)
7486
go func() {
7587
defer wg.Done()
@@ -112,28 +124,33 @@ func TestServerSelectionProse(t *testing.T) {
112124
hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
113125
require.GreaterOrEqualf(mt, len(hosts), 2, "test cluster must have at least 2 mongos hosts")
114126

115-
// Reset the client with exactly 2 mongos hosts.
127+
// Reset the client with exactly 2 mongos hosts. Use a ServerMonitor to wait for both mongos
128+
// host descriptions to move from kind "Unknown" to kind "Mongos".
129+
topologyEvents := make(chan *event.TopologyDescriptionChangedEvent, 10)
116130
tpm := newTestPoolMonitor()
117131
mt.ResetClient(options.Client().
118132
SetHosts(hosts[:2]).
119133
SetPoolMonitor(tpm.PoolMonitor).
120-
SetHeartbeatInterval(500 * time.Millisecond))
121-
122-
// Sleep for 1s to allow server state discovery and at least 1 heartbeat to complete. We
123-
// need both servers to be selectable when we start running the test or the distribution of
124-
// selected servers will be skewed. Unfortunately there's not currently another signal we
125-
// can wait on.
126-
time.Sleep(1 * time.Second)
134+
SetServerMonitor(&event.ServerMonitor{
135+
TopologyDescriptionChanged: func(evt *event.TopologyDescriptionChangedEvent) {
136+
topologyEvents <- evt
137+
},
138+
}))
139+
for evt := range topologyEvents {
140+
servers := evt.NewDescription.Servers
141+
if len(servers) == 2 && servers[0].Kind == description.Mongos && servers[1].Kind == description.Mongos {
142+
break
143+
}
144+
}
127145

128-
// Start 25 goroutines that each run 200 findOne operations. Run more operations than the
129-
// prose test specifies to get more samples and reduce intermittent test failures.
146+
// Start 10 goroutines that each run 10 findOne operations.
130147
var wg sync.WaitGroup
131-
for i := 0; i < 25; i++ {
148+
for i := 0; i < 10; i++ {
132149
wg.Add(1)
133150
go func() {
134151
defer wg.Done()
135152

136-
for i := 0; i < 200; i++ {
153+
for i := 0; i < 20; i++ {
137154
res := mt.Coll.FindOne(context.Background(), bson.D{})
138155
assert.NoError(mt, res.Err(), "FindOne() error")
139156
}

x/mongo/driver/topology/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ func (s *Server) Connection(ctx context.Context) (driver.Connection, error) {
265265

266266
// Increment the operation count before calling checkOut to make sure that all connection
267267
// requests are included in the operation count, including those in the wait queue. If we got an
268-
// error insted of a connection, immediately decrement the operation count.
268+
// error instead of a connection, immediately decrement the operation count.
269269
atomic.AddInt64(&s.operationCount, 1)
270270
conn, err := s.pool.checkOut(ctx)
271271
if err != nil {

0 commit comments

Comments
 (0)