Skip to content

Commit 1bdb8a8

Browse files
author
Divjot Arora
authored
GODRIVER-1699 Fix flaky tests (#468)
1 parent 037df13 commit 1bdb8a8

File tree

5 files changed

+98
-62
lines changed

5 files changed

+98
-62
lines changed

mongo/integration/crud_prose_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,19 +145,17 @@ func TestHintErrors(t *testing.T) {
145145
})
146146
}
147147

148-
func TestAggregateSecondaryPreferredReadPreference(t *testing.T) {
149-
// Use secondaryPreferred instead of secondary because sharded clusters started up by mongo-orchestration have
150-
// one-node shards, so a secondary read preference is not satisfiable.
151-
secondaryPrefClientOpts := options.Client().
148+
func TestAggregatePrimaryPreferredReadPreference(t *testing.T) {
149+
primaryPrefClientOpts := options.Client().
152150
SetWriteConcern(mtest.MajorityWc).
153-
SetReadPreference(readpref.SecondaryPreferred()).
151+
SetReadPreference(readpref.PrimaryPreferred()).
154152
SetReadConcern(mtest.MajorityRc)
155153
mtOpts := mtest.NewOptions().
156-
ClientOptions(secondaryPrefClientOpts).
154+
ClientOptions(primaryPrefClientOpts).
157155
MinServerVersion("4.1.0") // Consistent with tests in aggregate-out-readConcern.json
158156

159157
mt := mtest.New(t, mtOpts)
160-
mt.Run("aggregate $out with read preference secondary", func(mt *mtest.T) {
158+
mt.Run("aggregate $out with non-primary read ppreference", func(mt *mtest.T) {
161159
doc, err := bson.Marshal(bson.D{
162160
{"_id", 1},
163161
{"x", 11},
@@ -167,7 +165,7 @@ func TestAggregateSecondaryPreferredReadPreference(t *testing.T) {
167165
assert.Nil(mt, err, "InsertOne error: %v", err)
168166

169167
mt.ClearEvents()
170-
outputCollName := "aggregate-read-pref-secondary-output"
168+
outputCollName := "aggregate-read-pref-primary-preferred-output"
171169
outStage := bson.D{
172170
{"$out", outputCollName},
173171
}

mongo/integration/errors_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,16 @@ func TestErrors(t *testing.T) {
8080
"errors.Is failure: expected error %v to be %v", err, context.DeadlineExceeded)
8181
})
8282

83-
socketTimeoutOpts := options.Client().
84-
SetSocketTimeout(100 * time.Millisecond)
85-
socketTimeoutMtOpts := mtest.NewOptions().
86-
ClientOptions(socketTimeoutOpts)
87-
mt.RunOpts("socketTimeoutMS timeouts return network errors", socketTimeoutMtOpts, func(mt *mtest.T) {
83+
mt.Run("socketTimeoutMS timeouts return network errors", func(mt *mtest.T) {
8884
_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
8985
assert.Nil(mt, err, "InsertOne error: %v", err)
9086

87+
// Reset the test client to have a 100ms socket timeout. We do this here rather than passing it in as a
88+
// test option using mt.RunOpts because that could cause the collection creation or InsertOne to fail.
89+
resetClientOpts := options.Client().
90+
SetSocketTimeout(100 * time.Millisecond)
91+
mt.ResetClient(resetClientOpts)
92+
9193
mt.ClearEvents()
9294
filter := bson.M{
9395
"$where": "function() { sleep(1000); return false; }",

mongo/integration/sessions_test.go

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -252,43 +252,27 @@ func TestSessions(t *testing.T) {
252252
deleteID := extractSentSessionID(mt)
253253
assert.Equal(mt, findID, deleteID, "expected session ID %v, got %v", findID, deleteID)
254254
})
255-
mt.RunOpts("find and getMore use same ID", noClientOpts, func(mt *mtest.T) {
256-
testCases := []struct {
257-
name string
258-
rp *readpref.ReadPref
259-
topos []mtest.TopologyKind // if nil, all will be used
260-
}{
261-
{"primary", readpref.Primary(), nil},
262-
{"primaryPreferred", readpref.PrimaryPreferred(), nil},
263-
{"secondary", readpref.Secondary(), []mtest.TopologyKind{mtest.ReplicaSet}},
264-
{"secondaryPreferred", readpref.SecondaryPreferred(), nil},
265-
{"nearest", readpref.Nearest(), nil},
255+
mt.Run("find and getMore use same ID", func(mt *mtest.T) {
256+
var docs []interface{}
257+
for i := 0; i < 3; i++ {
258+
docs = append(docs, bson.D{{"x", i}})
266259
}
267-
for _, tc := range testCases {
268-
clientOpts := options.Client().SetReadPreference(tc.rp).SetWriteConcern(mtest.MajorityWc)
269-
mt.RunOpts(tc.name, mtest.NewOptions().ClientOptions(clientOpts).Topologies(tc.topos...), func(mt *mtest.T) {
270-
var docs []interface{}
271-
for i := 0; i < 3; i++ {
272-
docs = append(docs, bson.D{{"x", i}})
273-
}
274-
_, err := mt.Coll.InsertMany(mtest.Background, docs)
275-
assert.Nil(mt, err, "InsertMany error: %v", err)
276-
277-
// run a find that will hold onto an implicit session and record the session ID
278-
mt.ClearEvents()
279-
cursor, err := mt.Coll.Find(mtest.Background, bson.D{}, options.Find().SetBatchSize(2))
280-
assert.Nil(mt, err, "Find error: %v", err)
281-
findID := extractSentSessionID(mt)
282-
assert.NotNil(mt, findID, "expected session ID for find, got nil")
283-
284-
// iterate over all documents and record the session ID of the getMore
285-
for i := 0; i < 3; i++ {
286-
assert.True(mt, cursor.Next(mtest.Background), "Next returned false on iteration %v", i)
287-
}
288-
getMoreID := extractSentSessionID(mt)
289-
assert.Equal(mt, findID, getMoreID, "expected session ID %v, got %v", findID, getMoreID)
290-
})
260+
_, err := mt.Coll.InsertMany(mtest.Background, docs)
261+
assert.Nil(mt, err, "InsertMany error: %v", err)
262+
263+
// run a find that will hold onto an implicit session and record the session ID
264+
mt.ClearEvents()
265+
cursor, err := mt.Coll.Find(mtest.Background, bson.D{}, options.Find().SetBatchSize(2))
266+
assert.Nil(mt, err, "Find error: %v", err)
267+
findID := extractSentSessionID(mt)
268+
assert.NotNil(mt, findID, "expected session ID for find, got nil")
269+
270+
// iterate over all documents and record the session ID of the getMore
271+
for i := 0; i < 3; i++ {
272+
assert.True(mt, cursor.Next(mtest.Background), "Next returned false on iteration %v", i)
291273
}
274+
getMoreID := extractSentSessionID(mt)
275+
assert.Equal(mt, findID, getMoreID, "expected session ID %v, got %v", findID, getMoreID)
292276
})
293277

294278
mt.Run("imperative API", func(mt *mtest.T) {

x/mongo/driver/topology/cmap_prose_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,15 @@ func TestCMAPProse(t *testing.T) {
7878

7979
_, err := pool.get(context.Background())
8080
assert.NotNil(t, err, "expected get() error, got nil")
81-
assertConnectionCounts(t, pool, 1, 1)
81+
82+
// If the connection doesn't finish connecting before resourcePool gives it back, the error will be
83+
// detected by pool.get and result in a created/closed count of 1. If it does finish connecting, the
84+
// error will be detected by resourcePool, which will return nil. Then, pool will try to create a new
85+
// connection, which will also error. This process will result in a created/closed count of 2.
86+
assert.True(t, len(created) == 1 || len(created) == 2, "expected 1 or 2 opened events, got %d", len(created))
87+
assert.True(t, len(closed) == 1 || len(closed) == 2, "expected 1 or 2 closed events, got %d", len(closed))
88+
netCount := len(created) - len(closed)
89+
assert.Equal(t, 0, netCount, "expected net connection count to be 0, got %d", netCount)
8290
})
8391
t.Run("pool is empty", func(t *testing.T) {
8492
// If a new connection is created during get(), get() should report that error and publish an event.

x/mongo/driver/topology/connection_test.go

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -115,21 +115,65 @@ func TestConnection(t *testing.T) {
115115
assert.NotNil(t, err, "expected connect error %v, got nil", want)
116116
assert.Equal(t, want, got, "expected error %v, got %v", want, got)
117117
})
118-
t.Run("cancelConnectContext is nil after connect", func(t *testing.T) {
119-
conn, err := newConnection(address.Address(""))
120-
assert.Nil(t, err, "newConnection shouldn't error. got %v; want nil", err)
121-
var wg sync.WaitGroup
122-
wg.Add(1)
118+
t.Run("context is not pinned by connect", func(t *testing.T) {
119+
// connect creates a cancel-able version of the context passed to it and stores the CancelFunc on the
120+
// connection. The CancelFunc must be set to nil once the connection has been established so the driver
121+
// does not pin the memory associated with the context for the connection's lifetime.
122+
123+
t.Run("connect succeeds", func(t *testing.T) {
124+
// In the case where connect finishes successfully, it unpins the CancelFunc.
125+
126+
conn, err := newConnection(address.Address(""),
127+
WithDialer(func(Dialer) Dialer {
128+
return DialerFunc(func(context.Context, string, string) (net.Conn, error) {
129+
return &net.TCPConn{}, nil
130+
})
131+
}),
132+
WithHandshaker(func(Handshaker) Handshaker {
133+
return &testHandshaker{}
134+
}),
135+
)
136+
assert.Nil(t, err, "newConnection error: %v", err)
123137

124-
go func() {
125-
defer wg.Done()
126138
conn.connect(context.Background())
127-
assert.Nil(t, conn.cancelConnectContext, "expected nil, got context.CancelFunc")
128-
}()
129-
130-
conn.closeConnectContext()
131-
assert.Nil(t, conn.cancelConnectContext, "expected nil, got context.CancelFunc")
132-
wg.Wait()
139+
err = conn.wait()
140+
assert.Nil(t, err, "error establishing connection: %v", err)
141+
assert.Nil(t, conn.cancelConnectContext, "cancellation function was not cleared")
142+
})
143+
t.Run("connect cancelled", func(t *testing.T) {
144+
// In the case where connection establishment is cancelled, the closeConnectContext function
145+
// unpins the CancelFunc.
146+
147+
// Create a connection that will block in connect until doneChan is closed. This prevents
148+
// connect from succeeding and unpinning the CancelFunc.
149+
doneChan := make(chan struct{})
150+
conn, err := newConnection(address.Address(""),
151+
WithDialer(func(Dialer) Dialer {
152+
return DialerFunc(func(context.Context, string, string) (net.Conn, error) {
153+
<-doneChan
154+
return &net.TCPConn{}, nil
155+
})
156+
}),
157+
WithHandshaker(func(Handshaker) Handshaker {
158+
return &testHandshaker{}
159+
}),
160+
)
161+
assert.Nil(t, err, "newConnection error: %v", err)
162+
163+
// Call connect in a goroutine because it will block.
164+
var wg sync.WaitGroup
165+
wg.Add(1)
166+
go func() {
167+
defer wg.Done()
168+
conn.connect(context.Background())
169+
}()
170+
171+
// Simulate cancelling connection establishment and assert that this cleares the CancelFunc.
172+
conn.closeConnectContext()
173+
assert.Nil(t, conn.cancelConnectContext, "cancellation function was not cleared")
174+
close(doneChan)
175+
wg.Wait()
176+
})
133177
})
134178
})
135179
t.Run("writeWireMessage", func(t *testing.T) {

0 commit comments

Comments
 (0)