Skip to content

Commit f4a087a

Browse files
authored
GODRIVER-2337 Add CustomPipeline to ChangeStreamOptions (edited) (#882)
Also renames CustomOptions to Custom in ChangeStreamOptions and AggregateOptions
1 parent 526e277 commit f4a087a

File tree

6 files changed

+100
-41
lines changed

6 files changed

+100
-41
lines changed

mongo/change_stream.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,21 +69,22 @@ type ChangeStream struct {
6969
// TryNext. If continued access is required, a copy must be made.
7070
Current bson.Raw
7171

72-
aggregate *operation.Aggregate
73-
pipelineSlice []bsoncore.Document
74-
cursor changeStreamCursor
75-
cursorOptions driver.CursorOptions
76-
batch []bsoncore.Document
77-
resumeToken bson.Raw
78-
err error
79-
sess *session.Client
80-
client *Client
81-
registry *bsoncodec.Registry
82-
streamType StreamType
83-
options *options.ChangeStreamOptions
84-
selector description.ServerSelector
85-
operationTime *primitive.Timestamp
86-
wireVersion *description.VersionRange
72+
aggregate *operation.Aggregate
73+
pipelineSlice []bsoncore.Document
74+
pipelineOptions map[string]bsoncore.Value
75+
cursor changeStreamCursor
76+
cursorOptions driver.CursorOptions
77+
batch []bsoncore.Document
78+
resumeToken bson.Raw
79+
err error
80+
sess *session.Client
81+
client *Client
82+
registry *bsoncodec.Registry
83+
streamType StreamType
84+
options *options.ChangeStreamOptions
85+
selector description.ServerSelector
86+
operationTime *primitive.Timestamp
87+
wireVersion *description.VersionRange
8788
}
8889

8990
type changeStreamConfig struct {
@@ -143,11 +144,11 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
143144
if cs.options.MaxAwaitTime != nil {
144145
cs.cursorOptions.MaxTimeMS = int64(*cs.options.MaxAwaitTime / time.Millisecond)
145146
}
146-
if cs.options.CustomOptions != nil {
147+
if cs.options.Custom != nil {
147148
// Marshal all custom options before passing to the initial aggregate. Return
148149
// any errors from Marshaling.
149150
customOptions := make(map[string]bsoncore.Value)
150-
for optionName, optionValue := range cs.options.CustomOptions {
151+
for optionName, optionValue := range cs.options.Custom {
151152
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
152153
if err != nil {
153154
cs.err = err
@@ -159,6 +160,21 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
159160
}
160161
cs.aggregate.CustomOptions(customOptions)
161162
}
163+
if cs.options.CustomPipeline != nil {
164+
// Marshal all custom pipeline options before building pipeline slice. Return
165+
// any errors from Marshaling.
166+
cs.pipelineOptions = make(map[string]bsoncore.Value)
167+
for optionName, optionValue := range cs.options.CustomPipeline {
168+
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
169+
if err != nil {
170+
cs.err = err
171+
closeImplicitSession(cs.sess)
172+
return nil, cs.Err()
173+
}
174+
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
175+
cs.pipelineOptions[optionName] = optionValueBSON
176+
}
177+
}
162178

163179
switch cs.streamType {
164180
case ClientStream:
@@ -406,6 +422,11 @@ func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
406422
plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
407423
}
408424

425+
// Append custom pipeline options.
426+
for optionName, optionValue := range cs.pipelineOptions {
427+
plDoc = bsoncore.AppendValueElement(plDoc, optionName, optionValue)
428+
}
429+
409430
if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
410431
return nil
411432
}

mongo/collection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -853,11 +853,11 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
853853
}
854854
op.Let(let)
855855
}
856-
if ao.CustomOptions != nil {
856+
if ao.Custom != nil {
857857
// Marshal all custom options before passing to the aggregate operation. Return
858858
// any errors from Marshaling.
859859
customOptions := make(map[string]bsoncore.Value)
860-
for optionName, optionValue := range ao.CustomOptions {
860+
for optionName, optionValue := range ao.Custom {
861861
bsonType, bsonData, err := bson.MarshalValueWithRegistry(a.registry, optionValue)
862862
if err != nil {
863863
return nil, err

mongo/integration/change_stream_test.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -634,11 +634,11 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
634634
evt := mt.GetStartedEvent()
635635
assert.Equal(mt, "killCursors", evt.CommandName, "expected command 'killCursors', got %q", evt.CommandName)
636636
})
637-
mt.Run("CustomOptions", func(mt *mtest.T) {
637+
mt.Run("Custom", func(mt *mtest.T) {
638638
// Custom options should be a BSON map of option names to Marshalable option values.
639639
// We use "allowDiskUse" as an example.
640640
customOpts := bson.M{"allowDiskUse": true}
641-
opts := options.ChangeStream().SetCustomOptions(customOpts)
641+
opts := options.ChangeStream().SetCustom(customOpts)
642642

643643
// Create change stream with custom options set.
644644
mt.ClearEvents()
@@ -656,6 +656,28 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
656656
assert.True(mt, ok, "expected field 'allowDiskUse' to be boolean, got %v", aduVal.Type.String())
657657
assert.True(mt, adu, "expected field 'allowDiskUse' to be true, got false")
658658
})
659+
mt.RunOpts("CustomPipeline", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
660+
// Custom pipeline options should be a BSON map of option names to Marshalable option values.
661+
// We use "allChangesForCluster" as an example.
662+
customPipelineOpts := bson.M{"allChangesForCluster": false}
663+
opts := options.ChangeStream().SetCustomPipeline(customPipelineOpts)
664+
665+
// Create change stream with custom pipeline options set.
666+
mt.ClearEvents()
667+
cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
668+
assert.Nil(mt, err, "Watch error: %v", err)
669+
defer closeStream(cs)
670+
671+
// Assert that custom pipeline option is included in the $changeStream stage.
672+
evt := mt.GetStartedEvent()
673+
assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate' got, %q", evt.CommandName)
674+
675+
acfcVal, err := evt.Command.LookupErr("pipeline", "0", "$changeStream", "allChangesForCluster")
676+
assert.Nil(mt, err, "expected field 'allChangesForCluster' in $changeStream stage not found")
677+
acfc, ok := acfcVal.BooleanOK()
678+
assert.True(mt, ok, "expected field 'allChangesForCluster' to be boolean, got %v", acfcVal.Type.String())
679+
assert.False(mt, acfc, "expected field 'allChangesForCluster' to be false, got %v", acfc)
680+
})
659681
}
660682

661683
func closeStream(cs *mongo.ChangeStream) {

mongo/integration/collection_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -803,11 +803,11 @@ func TestCollection(t *testing.T) {
803803
return mt.Coll.Aggregate(context.Background(), mongo.Pipeline{}, options.Aggregate().SetBatchSize(3))
804804
})
805805
})
806-
mt.Run("CustomOptions", func(mt *mtest.T) {
806+
mt.Run("Custom", func(mt *mtest.T) {
807807
// Custom options should be a BSON map of option names to Marshalable option values.
808808
// We use "allowDiskUse" as an example.
809809
customOpts := bson.M{"allowDiskUse": true}
810-
opts := options.Aggregate().SetCustomOptions(customOpts)
810+
opts := options.Aggregate().SetCustom(customOpts)
811811

812812
// Run aggregate with custom options set.
813813
mt.ClearEvents()

mongo/options/aggregateoptions.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type AggregateOptions struct {
5858
// Custom options to be added to aggregate expression. Key-value pairs of the BSON map should correlate with desired
5959
// option names and values. Values must be Marshalable. Custom options may conflict with non-custom options, and custom
6060
// options bypass client-side validation. Prefer using non-custom options where possible.
61-
CustomOptions bson.M
61+
Custom bson.M
6262
}
6363

6464
// Aggregate creates a new AggregateOptions instance.
@@ -120,12 +120,12 @@ func (ao *AggregateOptions) SetLet(let interface{}) *AggregateOptions {
120120
return ao
121121
}
122122

123-
// SetCustomOptions sets the value for the CustomOptions field. Key-value pairs of the BSON map
124-
// should correlate with desired option names and values. Values must be Marshalable. Custom options
125-
// may conflict with non-custom options, and custom options bypass client-side validation. Prefer
126-
// using non-custom options where possible.
127-
func (ao *AggregateOptions) SetCustomOptions(co bson.M) *AggregateOptions {
128-
ao.CustomOptions = co
123+
// SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate
124+
// with desired option names and values. Values must be Marshalable. Custom options may conflict
125+
// with non-custom options, and custom options bypass client-side validation. Prefer using non-custom
126+
// options where possible.
127+
func (ao *AggregateOptions) SetCustom(c bson.M) *AggregateOptions {
128+
ao.Custom = c
129129
return ao
130130
}
131131

@@ -164,8 +164,8 @@ func MergeAggregateOptions(opts ...*AggregateOptions) *AggregateOptions {
164164
if ao.Let != nil {
165165
aggOpts.Let = ao.Let
166166
}
167-
if ao.CustomOptions != nil {
168-
aggOpts.CustomOptions = ao.CustomOptions
167+
if ao.Custom != nil {
168+
aggOpts.Custom = ao.Custom
169169
}
170170
}
171171

mongo/options/changestreamoptions.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ type ChangeStreamOptions struct {
5151
// Custom options to be added to the initial aggregate for the change stream. Key-value pairs of the BSON map should
5252
// correlate with desired option names and values. Values must be Marshalable. Custom options may conflict with
5353
// non-custom options, and custom options bypass client-side validation. Prefer using non-custom options where possible.
54-
CustomOptions bson.M
54+
Custom bson.M
55+
56+
// Custom options to be added to the $changeStream stage in the initial aggregate. Key-value pairs of the BSON map should
57+
// correlate with desired option names and values. Values must be Marshalable. Custom pipeline options bypass client-side
58+
// validation. Prefer using non-custom options where possible.
59+
CustomPipeline bson.M
5560
}
5661

5762
// ChangeStream creates a new ChangeStreamOptions instance.
@@ -103,12 +108,20 @@ func (cso *ChangeStreamOptions) SetStartAfter(sa interface{}) *ChangeStreamOptio
103108
return cso
104109
}
105110

106-
// SetCustomOptions sets the value for the CustomOptions field. Key-value pairs of the BSON map
107-
// should correlate with desired option names and values. Values must be Marshalable. Custom options
108-
// may conflict with non-custom options, and custom options bypass client-side validation. Prefer
109-
// using non-custom options where possible.
110-
func (cso *ChangeStreamOptions) SetCustomOptions(co bson.M) *ChangeStreamOptions {
111-
cso.CustomOptions = co
111+
// SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate
112+
// with desired option names and values. Values must be Marshalable. Custom options may conflict
113+
// with non-custom options, and custom options bypass client-side validation. Prefer using non-custom
114+
// options where possible.
115+
func (cso *ChangeStreamOptions) SetCustom(c bson.M) *ChangeStreamOptions {
116+
cso.Custom = c
117+
return cso
118+
}
119+
120+
// SetCustomPipeline sets the value for the CustomPipeline field. Key-value pairs of the BSON map
121+
// should correlate with desired option names and values. Values must be Marshalable. Custom pipeline
122+
// options bypass client-side validation. Prefer using non-custom options where possible.
123+
func (cso *ChangeStreamOptions) SetCustomPipeline(cp bson.M) *ChangeStreamOptions {
124+
cso.CustomPipeline = cp
112125
return cso
113126
}
114127

@@ -141,8 +154,11 @@ func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions
141154
if cso.StartAfter != nil {
142155
csOpts.StartAfter = cso.StartAfter
143156
}
144-
if cso.CustomOptions != nil {
145-
csOpts.CustomOptions = cso.CustomOptions
157+
if cso.Custom != nil {
158+
csOpts.Custom = cso.Custom
159+
}
160+
if cso.CustomPipeline != nil {
161+
csOpts.CustomPipeline = cso.CustomPipeline
146162
}
147163
}
148164

0 commit comments

Comments
 (0)