Skip to content

GODRIVER-2337 Add CustomPipeline to ChangeStreamOptions #880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,22 @@ type ChangeStream struct {
// TryNext. If continued access is required, a copy must be made.
Current bson.Raw

aggregate *operation.Aggregate
pipelineSlice []bsoncore.Document
cursor changeStreamCursor
cursorOptions driver.CursorOptions
batch []bsoncore.Document
resumeToken bson.Raw
err error
sess *session.Client
client *Client
registry *bsoncodec.Registry
streamType StreamType
options *options.ChangeStreamOptions
selector description.ServerSelector
operationTime *primitive.Timestamp
wireVersion *description.VersionRange
aggregate *operation.Aggregate
pipelineSlice []bsoncore.Document
pipelineOptions map[string]bsoncore.Value
cursor changeStreamCursor
cursorOptions driver.CursorOptions
batch []bsoncore.Document
resumeToken bson.Raw
err error
sess *session.Client
client *Client
registry *bsoncodec.Registry
streamType StreamType
options *options.ChangeStreamOptions
selector description.ServerSelector
operationTime *primitive.Timestamp
wireVersion *description.VersionRange
}

type changeStreamConfig struct {
Expand Down Expand Up @@ -143,11 +144,11 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
if cs.options.MaxAwaitTime != nil {
cs.cursorOptions.MaxTimeMS = int64(*cs.options.MaxAwaitTime / time.Millisecond)
}
if cs.options.CustomOptions != nil {
if cs.options.Custom != nil {
// Marshal all custom options before passing to the initial aggregate. Return
// any errors from Marshaling.
customOptions := make(map[string]bsoncore.Value)
for optionName, optionValue := range cs.options.CustomOptions {
for optionName, optionValue := range cs.options.Custom {
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
if err != nil {
cs.err = err
Expand All @@ -159,6 +160,21 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
}
cs.aggregate.CustomOptions(customOptions)
}
if cs.options.CustomPipeline != nil {
// Marshal all custom pipeline options before building pipeline slice. Return
// any errors from Marshaling.
cs.pipelineOptions = make(map[string]bsoncore.Value)
for optionName, optionValue := range cs.options.CustomPipeline {
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
if err != nil {
cs.err = err
closeImplicitSession(cs.sess)
return nil, cs.Err()
}
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
cs.pipelineOptions[optionName] = optionValueBSON
}
}

switch cs.streamType {
case ClientStream:
Expand Down Expand Up @@ -406,6 +422,11 @@ func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
}

// Append custom pipeline options.
for optionName, optionValue := range cs.pipelineOptions {
plDoc = bsoncore.AppendValueElement(plDoc, optionName, optionValue)
}

if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,11 +852,11 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
}
op.Let(let)
}
if ao.CustomOptions != nil {
if ao.Custom != nil {
// Marshal all custom options before passing to the aggregate operation. Return
// any errors from Marshaling.
customOptions := make(map[string]bsoncore.Value)
for optionName, optionValue := range ao.CustomOptions {
for optionName, optionValue := range ao.Custom {
bsonType, bsonData, err := bson.MarshalValueWithRegistry(a.registry, optionValue)
if err != nil {
return nil, err
Expand Down
26 changes: 24 additions & 2 deletions mongo/integration/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,11 +634,11 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
evt := mt.GetStartedEvent()
assert.Equal(mt, "killCursors", evt.CommandName, "expected command 'killCursors', got %q", evt.CommandName)
})
mt.Run("CustomOptions", func(mt *mtest.T) {
mt.Run("Custom", func(mt *mtest.T) {
// Custom options should be a BSON map of option names to Marshalable option values.
// We use "allowDiskUse" as an example.
customOpts := bson.M{"allowDiskUse": true}
opts := options.ChangeStream().SetCustomOptions(customOpts)
opts := options.ChangeStream().SetCustom(customOpts)

// Create change stream with custom options set.
mt.ClearEvents()
Expand All @@ -656,6 +656,28 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
assert.True(mt, ok, "expected field 'allowDiskUse' to be boolean, got %v", aduVal.Type.String())
assert.True(mt, adu, "expected field 'allowDiskUse' to be true, got false")
})
mt.RunOpts("CustomPipeline", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
// Custom pipeline options should be a BSON map of option names to Marshalable option values.
// We use "allChangesForCluster" as an example.
customPipelineOpts := bson.M{"allChangesForCluster": false}
opts := options.ChangeStream().SetCustomPipeline(customPipelineOpts)

// Create change stream with custom pipeline options set.
mt.ClearEvents()
cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
assert.Nil(mt, err, "Watch error: %v", err)
defer closeStream(cs)

// Assert that custom pipeline option is included in the $changeStream stage.
evt := mt.GetStartedEvent()
assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate' got, %q", evt.CommandName)

acfcVal, err := evt.Command.LookupErr("pipeline", "0", "$changeStream", "allChangesForCluster")
assert.Nil(mt, err, "expected field 'allChangesForCluster' in $changeStream stage not found")
acfc, ok := acfcVal.BooleanOK()
assert.True(mt, ok, "expected field 'allChangesForCluster' to be boolean, got %v", acfcVal.Type.String())
assert.False(mt, acfc, "expected field 'allChangesForCluster' to be false, got %v", acfc)
})
}

func closeStream(cs *mongo.ChangeStream) {
Expand Down
4 changes: 2 additions & 2 deletions mongo/integration/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,11 +803,11 @@ func TestCollection(t *testing.T) {
return mt.Coll.Aggregate(context.Background(), mongo.Pipeline{}, options.Aggregate().SetBatchSize(3))
})
})
mt.Run("CustomOptions", func(mt *mtest.T) {
mt.Run("Custom", func(mt *mtest.T) {
// Custom options should be a BSON map of option names to Marshalable option values.
// We use "allowDiskUse" as an example.
customOpts := bson.M{"allowDiskUse": true}
opts := options.Aggregate().SetCustomOptions(customOpts)
opts := options.Aggregate().SetCustom(customOpts)

// Run aggregate with custom options set.
mt.ClearEvents()
Expand Down
18 changes: 9 additions & 9 deletions mongo/options/aggregateoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type AggregateOptions struct {
// Custom options to be added to aggregate expression. Key-value pairs of the BSON map should correlate with desired
// option names and values. Values must be Marshalable. Custom options may conflict with non-custom options, and custom
// options bypass client-side validation. Prefer using non-custom options where possible.
CustomOptions bson.M
Custom bson.M
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also changed CustomOptions to Custom in AggregateOptions for consistency.

}

// Aggregate creates a new AggregateOptions instance.
Expand Down Expand Up @@ -120,12 +120,12 @@ func (ao *AggregateOptions) SetLet(let interface{}) *AggregateOptions {
return ao
}

// SetCustomOptions sets the value for the CustomOptions field. Key-value pairs of the BSON map
// should correlate with desired option names and values. Values must be Marshalable. Custom options
// may conflict with non-custom options, and custom options bypass client-side validation. Prefer
// using non-custom options where possible.
func (ao *AggregateOptions) SetCustomOptions(co bson.M) *AggregateOptions {
ao.CustomOptions = co
// SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate
// with desired option names and values. Values must be Marshalable. Custom options may conflict
// with non-custom options, and custom options bypass client-side validation. Prefer using non-custom
// options where possible.
func (ao *AggregateOptions) SetCustom(c bson.M) *AggregateOptions {
ao.Custom = c
return ao
}

Expand Down Expand Up @@ -164,8 +164,8 @@ func MergeAggregateOptions(opts ...*AggregateOptions) *AggregateOptions {
if ao.Let != nil {
aggOpts.Let = ao.Let
}
if ao.CustomOptions != nil {
aggOpts.CustomOptions = ao.CustomOptions
if ao.Custom != nil {
aggOpts.Custom = ao.Custom
}
}

Expand Down
34 changes: 25 additions & 9 deletions mongo/options/changestreamoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ type ChangeStreamOptions struct {
// Custom options to be added to the initial aggregate for the change stream. Key-value pairs of the BSON map should
// correlate with desired option names and values. Values must be Marshalable. Custom options may conflict with
// non-custom options, and custom options bypass client-side validation. Prefer using non-custom options where possible.
CustomOptions bson.M
Custom bson.M

// Custom options to be added to the $changeStream stage in the initial aggregate. Key-value pairs of the BSON map should
// correlate with desired option names and values. Values must be Marshalable. Custom pipeline options bypass client-side
// validation. Prefer using non-custom options where possible.
CustomPipeline bson.M
}

// ChangeStream creates a new ChangeStreamOptions instance.
Expand Down Expand Up @@ -103,12 +108,20 @@ func (cso *ChangeStreamOptions) SetStartAfter(sa interface{}) *ChangeStreamOptio
return cso
}

// SetCustomOptions sets the value for the CustomOptions field. Key-value pairs of the BSON map
// should correlate with desired option names and values. Values must be Marshalable. Custom options
// may conflict with non-custom options, and custom options bypass client-side validation. Prefer
// using non-custom options where possible.
func (cso *ChangeStreamOptions) SetCustomOptions(co bson.M) *ChangeStreamOptions {
cso.CustomOptions = co
// SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate
// with desired option names and values. Values must be Marshalable. Custom options may conflict
// with non-custom options, and custom options bypass client-side validation. Prefer using non-custom
// options where possible.
func (cso *ChangeStreamOptions) SetCustom(c bson.M) *ChangeStreamOptions {
cso.Custom = c
return cso
}

// SetCustomPipeline sets the value for the CustomPipeline field. Key-value pairs of the BSON map
// should correlate with desired option names and values. Values must be Marshalable. Custom pipeline
// options bypass client-side validation. Prefer using non-custom options where possible.
func (cso *ChangeStreamOptions) SetCustomPipeline(cp bson.M) *ChangeStreamOptions {
cso.CustomPipeline = cp
return cso
}

Expand Down Expand Up @@ -141,8 +154,11 @@ func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions
if cso.StartAfter != nil {
csOpts.StartAfter = cso.StartAfter
}
if cso.CustomOptions != nil {
csOpts.CustomOptions = cso.CustomOptions
if cso.Custom != nil {
csOpts.Custom = cso.Custom
}
if cso.CustomPipeline != nil {
csOpts.CustomPipeline = cso.CustomPipeline
}
}

Expand Down