Skip to content

Commit 797e160

Browse files
author
Divjot Arora
authored
GODRIVER-1934 Ensure correct CursorOptions are used (#625) (#630)
1 parent 692114e commit 797e160

File tree

9 files changed

+195
-28
lines changed

9 files changed

+195
-28
lines changed

mongo/change_stream.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,12 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
103103
}
104104

105105
cs := &ChangeStream{
106-
client: config.client,
107-
registry: config.registry,
108-
streamType: config.streamType,
109-
options: options.MergeChangeStreamOptions(opts...),
110-
selector: description.ReadPrefSelector(config.readPreference),
106+
client: config.client,
107+
registry: config.registry,
108+
streamType: config.streamType,
109+
options: options.MergeChangeStreamOptions(opts...),
110+
selector: description.ReadPrefSelector(config.readPreference),
111+
cursorOptions: config.client.createBaseCursorOptions(),
111112
}
112113

113114
cs.sess = sessionFromContext(ctx)
@@ -128,9 +129,6 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
128129
CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone).
129130
Crypt(config.crypt)
130131

131-
if config.crypt != nil {
132-
cs.cursorOptions.Crypt = config.crypt
133-
}
134132
if cs.options.Collation != nil {
135133
cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
136134
}
@@ -141,7 +139,6 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
141139
if cs.options.MaxAwaitTime != nil {
142140
cs.cursorOptions.MaxTimeMS = int64(time.Duration(*cs.options.MaxAwaitTime) / time.Millisecond)
143141
}
144-
cs.cursorOptions.CommandMonitor = cs.client.monitor
145142

146143
switch cs.streamType {
147144
case ClientStream:

mongo/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,3 +926,10 @@ func (c *Client) Watch(ctx context.Context, pipeline interface{},
926926
func (c *Client) NumberSessionsInProgress() int {
927927
return c.sessionPool.CheckedOut()
928928
}
929+
930+
func (c *Client) createBaseCursorOptions() driver.CursorOptions {
931+
return driver.CursorOptions{
932+
CommandMonitor: c.monitor,
933+
Crypt: c.cryptFLE,
934+
}
935+
}

mongo/collection.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -780,10 +780,7 @@ func aggregate(a aggregateParams) (*Cursor, error) {
780780
}
781781

782782
ao := options.MergeAggregateOptions(a.opts...)
783-
cursorOpts := driver.CursorOptions{
784-
CommandMonitor: a.client.monitor,
785-
Crypt: a.client.cryptFLE,
786-
}
783+
cursorOpts := a.client.createBaseCursorOptions()
787784

788785
op := operation.NewAggregate(pipelineArr).
789786
Session(sess).
@@ -1139,10 +1136,7 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
11391136
Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE)
11401137

11411138
fo := options.MergeFindOptions(opts...)
1142-
cursorOpts := driver.CursorOptions{
1143-
CommandMonitor: coll.client.monitor,
1144-
Crypt: coll.client.cryptFLE,
1145-
}
1139+
cursorOpts := coll.client.createBaseCursorOptions()
11461140

11471141
if fo.AllowDiskUse != nil {
11481142
op.AllowDiskUse(*fo.AllowDiskUse)

mongo/database.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func (db *Database) RunCommandCursor(ctx context.Context, runCommand interface{}
227227
return nil, replaceErrors(err)
228228
}
229229

230-
bc, err := op.ResultCursor(driver.CursorOptions{})
230+
bc, err := op.ResultCursor(db.client.createBaseCursorOptions())
231231
if err != nil {
232232
closeImplicitSession(sess)
233233
return nil, replaceErrors(err)
@@ -362,10 +362,13 @@ func (db *Database) ListCollections(ctx context.Context, filter interface{}, opt
362362
Session(sess).ReadPreference(db.readPreference).CommandMonitor(db.client.monitor).
363363
ServerSelector(selector).ClusterClock(db.client.clock).
364364
Database(db.name).Deployment(db.client.deployment).Crypt(db.client.cryptFLE)
365+
366+
cursorOpts := db.client.createBaseCursorOptions()
365367
if lco.NameOnly != nil {
366368
op = op.NameOnly(*lco.NameOnly)
367369
}
368370
if lco.BatchSize != nil {
371+
cursorOpts.BatchSize = *lco.BatchSize
369372
op = op.BatchSize(*lco.BatchSize)
370373
}
371374

@@ -381,7 +384,7 @@ func (db *Database) ListCollections(ctx context.Context, filter interface{}, opt
381384
return nil, replaceErrors(err)
382385
}
383386

384-
bc, err := op.Result(driver.CursorOptions{Crypt: db.client.cryptFLE})
387+
bc, err := op.Result(cursorOpts)
385388
if err != nil {
386389
closeImplicitSession(sess)
387390
return nil, replaceErrors(err)

mongo/index_view.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (iv IndexView) List(ctx context.Context, opts ...*options.ListIndexesOption
9797
Database(iv.coll.db.name).Collection(iv.coll.name).
9898
Deployment(iv.coll.client.deployment)
9999

100-
var cursorOpts driver.CursorOptions
100+
cursorOpts := iv.coll.client.createBaseCursorOptions()
101101
lio := options.MergeListIndexesOptions(opts...)
102102
if lio.BatchSize != nil {
103103
op = op.BatchSize(*lio.BatchSize)

mongo/integration/change_stream_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,30 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
609609
// next call to cs.Next should return False since cursor is closed
610610
assert.False(mt, cs.Next(mtest.Background), "expected to return false, but returned true")
611611
})
612+
mt.Run("getMore commands are monitored", func(mt *mtest.T) {
613+
cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
614+
assert.Nil(mt, err, "Watch error: %v", err)
615+
defer closeStream(cs)
616+
617+
_, err = mt.Coll.InsertOne(mtest.Background, bson.M{"x": 1})
618+
assert.Nil(mt, err, "InsertOne error: %v", err)
619+
620+
mt.ClearEvents()
621+
assert.True(mt, cs.Next(mtest.Background), "Next returned false with error %v", cs.Err())
622+
evt := mt.GetStartedEvent()
623+
assert.Equal(mt, "getMore", evt.CommandName, "expected command 'getMore', got %q", evt.CommandName)
624+
})
625+
mt.Run("killCursors commands are monitored", func(mt *mtest.T) {
626+
cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
627+
assert.Nil(mt, err, "Watch error: %v", err)
628+
defer closeStream(cs)
629+
630+
mt.ClearEvents()
631+
err = cs.Close(mtest.Background)
632+
assert.Nil(mt, err, "Close error: %v", err)
633+
evt := mt.GetStartedEvent()
634+
assert.Equal(mt, "killCursors", evt.CommandName, "expected command 'killCursors', got %q", evt.CommandName)
635+
})
612636
}
613637

614638
func closeStream(cs *mongo.ChangeStream) {

mongo/integration/collection_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,18 @@ func TestCollection(t *testing.T) {
787787
_, ok := err.(mongo.WriteConcernError)
788788
assert.True(mt, ok, "expected error type %v, got %v", mongo.WriteConcernError{}, err)
789789
})
790+
mt.Run("getMore commands are monitored", func(mt *mtest.T) {
791+
initCollection(mt, mt.Coll)
792+
assertGetMoreCommandsAreMonitored(mt, "aggregate", func() (*mongo.Cursor, error) {
793+
return mt.Coll.Aggregate(mtest.Background, mongo.Pipeline{}, options.Aggregate().SetBatchSize(3))
794+
})
795+
})
796+
mt.Run("killCursors commands are monitored", func(mt *mtest.T) {
797+
initCollection(mt, mt.Coll)
798+
assertKillCursorsCommandsAreMonitored(mt, "aggregate", func() (*mongo.Cursor, error) {
799+
return mt.Coll.Aggregate(mtest.Background, mongo.Pipeline{}, options.Aggregate().SetBatchSize(3))
800+
})
801+
})
790802
})
791803
mt.RunOpts("count documents", noClientOpts, func(mt *mtest.T) {
792804
mt.Run("success", func(mt *mtest.T) {
@@ -1054,6 +1066,18 @@ func TestCollection(t *testing.T) {
10541066
})
10551067
}
10561068
})
1069+
mt.Run("getMore commands are monitored", func(mt *mtest.T) {
1070+
initCollection(mt, mt.Coll)
1071+
assertGetMoreCommandsAreMonitored(mt, "find", func() (*mongo.Cursor, error) {
1072+
return mt.Coll.Find(mtest.Background, bson.D{}, options.Find().SetBatchSize(3))
1073+
})
1074+
})
1075+
mt.Run("killCursors commands are monitored", func(mt *mtest.T) {
1076+
initCollection(mt, mt.Coll)
1077+
assertKillCursorsCommandsAreMonitored(mt, "find", func() (*mongo.Cursor, error) {
1078+
return mt.Coll.Find(mtest.Background, bson.D{}, options.Find().SetBatchSize(3))
1079+
})
1080+
})
10571081
})
10581082
mt.RunOpts("find one", noClientOpts, func(mt *mtest.T) {
10591083
mt.Run("limit", func(mt *mtest.T) {
@@ -1886,3 +1910,41 @@ func create16MBDocument(mt *mtest.T) bsoncore.Document {
18861910
assert.Equal(mt, targetDocSize, len(doc), "expected document length %v, got %v", targetDocSize, len(doc))
18871911
return doc
18881912
}
1913+
1914+
// This is a helper function to ensure that sending getMore commands for a cursor results in command monitoring events
1915+
// being published. The cursorFn parameter should be a function that yields a cursor which is open on the server and
1916+
// requires at least one getMore to be fully iterated.
1917+
func assertGetMoreCommandsAreMonitored(mt *mtest.T, cmdName string, cursorFn func() (*mongo.Cursor, error)) {
1918+
mt.Helper()
1919+
mt.ClearEvents()
1920+
1921+
cursor, err := cursorFn()
1922+
assert.Nil(mt, err, "error creating cursor: %v", err)
1923+
var docs []bson.D
1924+
err = cursor.All(mtest.Background, &docs)
1925+
assert.Nil(mt, err, "All error: %v", err)
1926+
1927+
// Only assert that the initial command and at least one getMore were sent. The exact number of getMore's required
1928+
// is not important.
1929+
evt := mt.GetStartedEvent()
1930+
assert.Equal(mt, cmdName, evt.CommandName, "expected command %q, got %q", cmdName, evt.CommandName)
1931+
evt = mt.GetStartedEvent()
1932+
assert.Equal(mt, "getMore", evt.CommandName, "expected command 'getMore', got %q", evt.CommandName)
1933+
}
1934+
1935+
// This is a helper function to ensure that sending killCursors commands for a cursor results in command monitoring
1936+
// events being published. The cursorFn parameter should be a function that yields a cursor which is open on the server.
1937+
func assertKillCursorsCommandsAreMonitored(mt *mtest.T, cmdName string, cursorFn func() (*mongo.Cursor, error)) {
1938+
mt.Helper()
1939+
mt.ClearEvents()
1940+
1941+
cursor, err := cursorFn()
1942+
assert.Nil(mt, err, "error creating cursor: %v", err)
1943+
err = cursor.Close(mtest.Background)
1944+
assert.Nil(mt, err, "Close error: %v", err)
1945+
1946+
evt := mt.GetStartedEvent()
1947+
assert.Equal(mt, cmdName, evt.CommandName, "expected command %q, got %q", cmdName, evt.CommandName)
1948+
evt = mt.GetStartedEvent()
1949+
assert.Equal(mt, "killCursors", evt.CommandName, "expected command 'killCursors', got %q", evt.CommandName)
1950+
}

mongo/integration/database_test.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,16 @@ func TestDatabase(t *testing.T) {
170170
})
171171

172172
mt.RunOpts("list collections", noClientOpts, func(mt *mtest.T) {
173+
createCollections := func(mt *mtest.T, numCollections int) {
174+
mt.Helper()
175+
176+
for i := 0; i < numCollections; i++ {
177+
mt.CreateCollection(mtest.Collection{
178+
Name: fmt.Sprintf("list-collections-test-%d", i),
179+
}, true)
180+
}
181+
}
182+
173183
mt.RunOpts("verify results", noClientOpts, func(mt *mtest.T) {
174184
testCases := []struct {
175185
name string
@@ -213,11 +223,7 @@ func TestDatabase(t *testing.T) {
213223
})
214224
mt.RunOpts("batch size", mtest.NewOptions().MinServerVersion("3.0"), func(mt *mtest.T) {
215225
// Create two new collections so there will be three total.
216-
for i := 0; i < 2; i++ {
217-
mt.CreateCollection(mtest.Collection{
218-
Name: fmt.Sprintf("list-collections-batchSize-%d", i),
219-
}, true)
220-
}
226+
createCollections(mt, 2)
221227

222228
mt.ClearEvents()
223229
lcOpts := options.ListCollections().SetBatchSize(2)
@@ -230,6 +236,22 @@ func TestDatabase(t *testing.T) {
230236
_, err = evt.Command.LookupErr("cursor", "batchSize")
231237
assert.Nil(mt, err, "expected command %s to contain key 'batchSize'", evt.Command)
232238
})
239+
240+
// The BatchSize option is not honored for ListCollections operations on server version 2.6 due to an
241+
// inconsistency in the legacy OP_QUERY code path (GODRIVER-1937).
242+
cmdMonitoringMtOpts := mtest.NewOptions().MinServerVersion("3.0")
243+
mt.RunOpts("getMore commands are monitored", cmdMonitoringMtOpts, func(mt *mtest.T) {
244+
createCollections(mt, 2)
245+
assertGetMoreCommandsAreMonitored(mt, "listCollections", func() (*mongo.Cursor, error) {
246+
return mt.DB.ListCollections(mtest.Background, bson.D{}, options.ListCollections().SetBatchSize(2))
247+
})
248+
})
249+
mt.RunOpts("killCursors commands are monitored", cmdMonitoringMtOpts, func(mt *mtest.T) {
250+
createCollections(mt, 2)
251+
assertKillCursorsCommandsAreMonitored(mt, "listCollections", func() (*mongo.Cursor, error) {
252+
return mt.DB.ListCollections(mtest.Background, bson.D{}, options.ListCollections().SetBatchSize(2))
253+
})
254+
})
233255
})
234256

235257
mt.RunOpts("list collection specifications", noClientOpts, func(mt *mtest.T) {
@@ -356,6 +378,29 @@ func TestDatabase(t *testing.T) {
356378
assert.Equal(mt, tc.numExpected, count, "expected document count %v, got %v", tc.numExpected, count)
357379
})
358380
}
381+
382+
// The find command does not exist on server versions below 3.2.
383+
cmdMonitoringMtOpts := mtest.NewOptions().MinServerVersion("3.2")
384+
mt.RunOpts("getMore commands are monitored", cmdMonitoringMtOpts, func(mt *mtest.T) {
385+
initCollection(mt, mt.Coll)
386+
assertGetMoreCommandsAreMonitored(mt, "find", func() (*mongo.Cursor, error) {
387+
findCmd := bson.D{
388+
{"find", mt.Coll.Name()},
389+
{"batchSize", 2},
390+
}
391+
return mt.DB.RunCommandCursor(mtest.Background, findCmd)
392+
})
393+
})
394+
mt.RunOpts("killCursors commands are monitored", cmdMonitoringMtOpts, func(mt *mtest.T) {
395+
initCollection(mt, mt.Coll)
396+
assertKillCursorsCommandsAreMonitored(mt, "find", func() (*mongo.Cursor, error) {
397+
findCmd := bson.D{
398+
{"find", mt.Coll.Name()},
399+
{"batchSize", 2},
400+
}
401+
return mt.DB.RunCommandCursor(mtest.Background, findCmd)
402+
})
403+
})
359404
})
360405

361406
mt.RunOpts("create collection", noClientOpts, func(mt *mtest.T) {

mongo/integration/index_view_test.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,44 @@ func TestIndexView(t *testing.T) {
3030
defer mt.Close()
3131

3232
mt.Run("list", func(mt *mtest.T) {
33-
verifyIndexExists(mt, mt.Coll.Indexes(), index{
34-
Key: bson.D{{"_id", int32(1)}},
35-
Name: "_id_",
33+
createIndexes := func(mt *mtest.T, numIndexes int) {
34+
mt.Helper()
35+
36+
models := make([]mongo.IndexModel, 0, numIndexes)
37+
for i, key := 0, 'a'; i < numIndexes; i, key = i+1, key+1 {
38+
models = append(models, mongo.IndexModel{
39+
Keys: bson.M{string(key): 1},
40+
})
41+
}
42+
43+
_, err := mt.Coll.Indexes().CreateMany(mtest.Background, models)
44+
assert.Nil(mt, err, "CreateMany error: %v", err)
45+
}
46+
47+
// For server versions below 3.0, we internally execute List() as a legacy OP_QUERY against the system.indexes
48+
// collection. Command monitoring upconversions translate this to a "find" command rather than "listIndexes".
49+
cmdName := "listIndexes"
50+
if mtest.CompareServerVersions(mtest.ServerVersion(), "3.0") < 0 {
51+
cmdName = "find"
52+
}
53+
54+
mt.Run("_id index is always listed", func(mt *mtest.T) {
55+
verifyIndexExists(mt, mt.Coll.Indexes(), index{
56+
Key: bson.D{{"_id", int32(1)}},
57+
Name: "_id_",
58+
})
59+
})
60+
mt.Run("getMore commands are monitored", func(mt *mtest.T) {
61+
createIndexes(mt, 2)
62+
assertGetMoreCommandsAreMonitored(mt, cmdName, func() (*mongo.Cursor, error) {
63+
return mt.Coll.Indexes().List(mtest.Background, options.ListIndexes().SetBatchSize(2))
64+
})
65+
})
66+
mt.Run("killCursors commands are monitored", func(mt *mtest.T) {
67+
createIndexes(mt, 2)
68+
assertKillCursorsCommandsAreMonitored(mt, cmdName, func() (*mongo.Cursor, error) {
69+
return mt.Coll.Indexes().List(mtest.Background, options.ListIndexes().SetBatchSize(2))
70+
})
3671
})
3772
})
3873
mt.RunOpts("create one", noClientOpts, func(mt *mtest.T) {

0 commit comments

Comments
 (0)