Skip to content

Commit c9fe824

Browse files
Divjot AroraRoland Fong
authored andcommitted
Add mongo/changestreamopt
GODRIVER-272 Change-Id: I8dffbf535f7756dd18980bcc3bf4f4e2d28092fc
1 parent 5d57f17 commit c9fe824

File tree

8 files changed

+596
-272
lines changed

8 files changed

+596
-272
lines changed

examples/documentation_examples/examples.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,7 +1166,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
11661166
{
11671167
// Start Example 44
11681168

1169-
projection, err := mongo.Opt.Projection(bson.NewDocument(
1169+
projection := findopt.Projection(bson.NewDocument(
11701170
bson.EC.Int32("item", 1),
11711171
bson.EC.Int32("status", 1),
11721172
))
@@ -1177,7 +1177,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
11771177
bson.NewDocument(
11781178
bson.EC.String("status", "A"),
11791179
),
1180-
findopt.Projection(projection),
1180+
projection,
11811181
)
11821182

11831183
// End Example 44
@@ -1206,7 +1206,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
12061206
{
12071207
// Start Example 45
12081208

1209-
projection, err := mongo.Opt.Projection(bson.NewDocument(
1209+
projection := findopt.Projection(bson.NewDocument(
12101210
bson.EC.Int32("item", 1),
12111211
bson.EC.Int32("status", 1),
12121212
bson.EC.Int32("_id", 0),
@@ -1218,7 +1218,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
12181218
bson.NewDocument(
12191219
bson.EC.String("status", "A"),
12201220
),
1221-
findopt.Projection(projection),
1221+
projection,
12221222
)
12231223

12241224
// End Example 45
@@ -1247,7 +1247,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
12471247
{
12481248
// Start Example 46
12491249

1250-
projection, err := mongo.Opt.Projection(bson.NewDocument(
1250+
projection := findopt.Projection(bson.NewDocument(
12511251
bson.EC.Int32("status", 0),
12521252
bson.EC.Int32("instock", 0),
12531253
))
@@ -1258,7 +1258,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
12581258
bson.NewDocument(
12591259
bson.EC.String("status", "A"),
12601260
),
1261-
findopt.Projection(projection),
1261+
projection,
12621262
)
12631263

12641264
// End Example 46
@@ -1287,7 +1287,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
12871287
{
12881288
// Start Example 47
12891289

1290-
projection, err := mongo.Opt.Projection(bson.NewDocument(
1290+
projection := findopt.Projection(bson.NewDocument(
12911291
bson.EC.Int32("item", 1),
12921292
bson.EC.Int32("status", 1),
12931293
bson.EC.Int32("size.uom", 1),
@@ -1299,7 +1299,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
12991299
bson.NewDocument(
13001300
bson.EC.String("status", "A"),
13011301
),
1302-
findopt.Projection(projection),
1302+
projection,
13031303
)
13041304

13051305
// End Example 47
@@ -1333,7 +1333,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
13331333
{
13341334
// Start Example 48
13351335

1336-
projection, err := mongo.Opt.Projection(bson.NewDocument(
1336+
projection := findopt.Projection(bson.NewDocument(
13371337
bson.EC.Int32("size.uom", 0),
13381338
))
13391339
require.NoError(t, err)
@@ -1343,7 +1343,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
13431343
bson.NewDocument(
13441344
bson.EC.String("status", "A"),
13451345
),
1346-
findopt.Projection(projection),
1346+
projection,
13471347
)
13481348

13491349
// End Example 48
@@ -1377,7 +1377,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
13771377
{
13781378
// Start Example 49
13791379

1380-
projection, err := mongo.Opt.Projection(bson.NewDocument(
1380+
projection := findopt.Projection(bson.NewDocument(
13811381
bson.EC.Int32("item", 1),
13821382
bson.EC.Int32("status", 1),
13831383
bson.EC.Int32("instock.qty", 1),
@@ -1389,7 +1389,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
13891389
bson.NewDocument(
13901390
bson.EC.String("status", "A"),
13911391
),
1392-
findopt.Projection(projection),
1392+
projection,
13931393
)
13941394

13951395
// End Example 49
@@ -1435,7 +1435,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
14351435
{
14361436
// Start Example 50
14371437

1438-
projection, err := mongo.Opt.Projection(bson.NewDocument(
1438+
projection := findopt.Projection(bson.NewDocument(
14391439
bson.EC.Int32("item", 1),
14401440
bson.EC.Int32("status", 1),
14411441
bson.EC.SubDocumentFromElements("instock",
@@ -1449,7 +1449,7 @@ func ProjectionExamples(t *testing.T, db *mongo.Database) {
14491449
bson.NewDocument(
14501450
bson.EC.String("status", "A"),
14511451
),
1452-
findopt.Projection(projection),
1452+
projection,
14531453
)
14541454

14551455
// End Example 50

mongo/change_stream.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/mongodb/mongo-go-driver/bson"
1515
"github.com/mongodb/mongo-go-driver/core/command"
1616
"github.com/mongodb/mongo-go-driver/core/option"
17+
"github.com/mongodb/mongo-go-driver/mongo/changestreamopt"
1718
)
1819

1920
// ErrMissingResumeToken indicates that a change stream notification from the server did not
@@ -33,16 +34,21 @@ const errorCodeNotMaster int32 = 10107
3334
const errorCodeCursorNotFound int32 = 43
3435

3536
func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{},
36-
opts ...option.ChangeStreamOptioner) (*changeStream, error) {
37+
opts ...changestreamopt.ChangeStream) (*changeStream, error) {
3738

3839
pipelineArr, err := transformAggregatePipeline(pipeline)
3940
if err != nil {
4041
return nil, err
4142
}
4243

44+
csOpts, err := changestreamopt.BundleChangeStream(opts...).Unbundle(true)
45+
if err != nil {
46+
return nil, err
47+
}
48+
4349
changeStreamOptions := bson.NewDocument()
4450

45-
for _, opt := range opts {
51+
for _, opt := range csOpts {
4652
err = opt.Option(changeStreamOptions)
4753
if err != nil {
4854
return nil, err
@@ -61,7 +67,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
6167

6268
cs := &changeStream{
6369
pipeline: pipelineArr,
64-
options: opts,
70+
options: csOpts,
6571
coll: coll,
6672
cursor: cursor,
6773
}
@@ -90,7 +96,7 @@ func (cs *changeStream) Next(ctx context.Context) bool {
9096
}
9197
}
9298

93-
resumeToken := Opt.ResumeAfter(cs.resumeToken)
99+
resumeToken := changestreamopt.ResumeAfter(cs.resumeToken).ConvertChangeStreamOption()
94100
found := false
95101

96102
for i, opt := range cs.options {

0 commit comments

Comments
 (0)