Skip to content

Commit 6164a6b

Browse files
Revert "GODRIVER-2337 Add CustomPipeline to ChangeStreamOptions (#880)"
This reverts commit 6cb4f98.
1 parent 6cb4f98 commit 6164a6b

File tree

6 files changed

+41
-100
lines changed

6 files changed

+41
-100
lines changed

mongo/change_stream.go

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,21 @@ type ChangeStream struct {
6969
// TryNext. If continued access is required, a copy must be made.
7070
Current bson.Raw
7171

72-
aggregate *operation.Aggregate
73-
pipelineSlice []bsoncore.Document
74-
pipelineOptions map[string]bsoncore.Value
75-
cursor changeStreamCursor
76-
cursorOptions driver.CursorOptions
77-
batch []bsoncore.Document
78-
resumeToken bson.Raw
79-
err error
80-
sess *session.Client
81-
client *Client
82-
registry *bsoncodec.Registry
83-
streamType StreamType
84-
options *options.ChangeStreamOptions
85-
selector description.ServerSelector
86-
operationTime *primitive.Timestamp
87-
wireVersion *description.VersionRange
72+
aggregate *operation.Aggregate
73+
pipelineSlice []bsoncore.Document
74+
cursor changeStreamCursor
75+
cursorOptions driver.CursorOptions
76+
batch []bsoncore.Document
77+
resumeToken bson.Raw
78+
err error
79+
sess *session.Client
80+
client *Client
81+
registry *bsoncodec.Registry
82+
streamType StreamType
83+
options *options.ChangeStreamOptions
84+
selector description.ServerSelector
85+
operationTime *primitive.Timestamp
86+
wireVersion *description.VersionRange
8887
}
8988

9089
type changeStreamConfig struct {
@@ -144,11 +143,11 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
144143
if cs.options.MaxAwaitTime != nil {
145144
cs.cursorOptions.MaxTimeMS = int64(*cs.options.MaxAwaitTime / time.Millisecond)
146145
}
147-
if cs.options.Custom != nil {
146+
if cs.options.CustomOptions != nil {
148147
// Marshal all custom options before passing to the initial aggregate. Return
149148
// any errors from Marshaling.
150149
customOptions := make(map[string]bsoncore.Value)
151-
for optionName, optionValue := range cs.options.Custom {
150+
for optionName, optionValue := range cs.options.CustomOptions {
152151
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
153152
if err != nil {
154153
cs.err = err
@@ -160,21 +159,6 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
160159
}
161160
cs.aggregate.CustomOptions(customOptions)
162161
}
163-
if cs.options.CustomPipeline != nil {
164-
// Marshal all custom pipeline options before building pipeline slice. Return
165-
// any errors from Marshaling.
166-
cs.pipelineOptions = make(map[string]bsoncore.Value)
167-
for optionName, optionValue := range cs.options.CustomPipeline {
168-
bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
169-
if err != nil {
170-
cs.err = err
171-
closeImplicitSession(cs.sess)
172-
return nil, cs.Err()
173-
}
174-
optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
175-
cs.pipelineOptions[optionName] = optionValueBSON
176-
}
177-
}
178162

179163
switch cs.streamType {
180164
case ClientStream:
@@ -422,11 +406,6 @@ func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
422406
plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
423407
}
424408

425-
// Append custom pipeline options.
426-
for optionName, optionValue := range cs.pipelineOptions {
427-
plDoc = bsoncore.AppendValueElement(plDoc, optionName, optionValue)
428-
}
429-
430409
if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
431410
return nil
432411
}

mongo/collection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -852,11 +852,11 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
852852
}
853853
op.Let(let)
854854
}
855-
if ao.Custom != nil {
855+
if ao.CustomOptions != nil {
856856
// Marshal all custom options before passing to the aggregate operation. Return
857857
// any errors from Marshaling.
858858
customOptions := make(map[string]bsoncore.Value)
859-
for optionName, optionValue := range ao.Custom {
859+
for optionName, optionValue := range ao.CustomOptions {
860860
bsonType, bsonData, err := bson.MarshalValueWithRegistry(a.registry, optionValue)
861861
if err != nil {
862862
return nil, err

mongo/integration/change_stream_test.go

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -634,11 +634,11 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
634634
evt := mt.GetStartedEvent()
635635
assert.Equal(mt, "killCursors", evt.CommandName, "expected command 'killCursors', got %q", evt.CommandName)
636636
})
637-
mt.Run("Custom", func(mt *mtest.T) {
637+
mt.Run("CustomOptions", func(mt *mtest.T) {
638638
// Custom options should be a BSON map of option names to Marshalable option values.
639639
// We use "allowDiskUse" as an example.
640640
customOpts := bson.M{"allowDiskUse": true}
641-
opts := options.ChangeStream().SetCustom(customOpts)
641+
opts := options.ChangeStream().SetCustomOptions(customOpts)
642642

643643
// Create change stream with custom options set.
644644
mt.ClearEvents()
@@ -656,28 +656,6 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
656656
assert.True(mt, ok, "expected field 'allowDiskUse' to be boolean, got %v", aduVal.Type.String())
657657
assert.True(mt, adu, "expected field 'allowDiskUse' to be true, got false")
658658
})
659-
mt.RunOpts("CustomPipeline", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
660-
// Custom pipeline options should be a BSON map of option names to Marshalable option values.
661-
// We use "allChangesForCluster" as an example.
662-
customPipelineOpts := bson.M{"allChangesForCluster": false}
663-
opts := options.ChangeStream().SetCustomPipeline(customPipelineOpts)
664-
665-
// Create change stream with custom pipeline options set.
666-
mt.ClearEvents()
667-
cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
668-
assert.Nil(mt, err, "Watch error: %v", err)
669-
defer closeStream(cs)
670-
671-
// Assert that custom pipeline option is included in the $changeStream stage.
672-
evt := mt.GetStartedEvent()
673-
assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate' got, %q", evt.CommandName)
674-
675-
acfcVal, err := evt.Command.LookupErr("pipeline", "0", "$changeStream", "allChangesForCluster")
676-
assert.Nil(mt, err, "expected field 'allChangesForCluster' in $changeStream stage not found")
677-
acfc, ok := acfcVal.BooleanOK()
678-
assert.True(mt, ok, "expected field 'allChangesForCluster' to be boolean, got %v", acfcVal.Type.String())
679-
assert.False(mt, acfc, "expected field 'allChangesForCluster' to be false, got %v", acfc)
680-
})
681659
}
682660

683661
func closeStream(cs *mongo.ChangeStream) {

mongo/integration/collection_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -803,11 +803,11 @@ func TestCollection(t *testing.T) {
803803
return mt.Coll.Aggregate(context.Background(), mongo.Pipeline{}, options.Aggregate().SetBatchSize(3))
804804
})
805805
})
806-
mt.Run("Custom", func(mt *mtest.T) {
806+
mt.Run("CustomOptions", func(mt *mtest.T) {
807807
// Custom options should be a BSON map of option names to Marshalable option values.
808808
// We use "allowDiskUse" as an example.
809809
customOpts := bson.M{"allowDiskUse": true}
810-
opts := options.Aggregate().SetCustom(customOpts)
810+
opts := options.Aggregate().SetCustomOptions(customOpts)
811811

812812
// Run aggregate with custom options set.
813813
mt.ClearEvents()

mongo/options/aggregateoptions.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type AggregateOptions struct {
5858
// Custom options to be added to aggregate expression. Key-value pairs of the BSON map should correlate with desired
5959
// option names and values. Values must be Marshalable. Custom options may conflict with non-custom options, and custom
6060
// options bypass client-side validation. Prefer using non-custom options where possible.
61-
Custom bson.M
61+
CustomOptions bson.M
6262
}
6363

6464
// Aggregate creates a new AggregateOptions instance.
@@ -120,12 +120,12 @@ func (ao *AggregateOptions) SetLet(let interface{}) *AggregateOptions {
120120
return ao
121121
}
122122

123-
// SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate
124-
// with desired option names and values. Values must be Marshalable. Custom options may conflict
125-
// with non-custom options, and custom options bypass client-side validation. Prefer using non-custom
126-
// options where possible.
127-
func (ao *AggregateOptions) SetCustom(c bson.M) *AggregateOptions {
128-
ao.Custom = c
123+
// SetCustomOptions sets the value for the CustomOptions field. Key-value pairs of the BSON map
124+
// should correlate with desired option names and values. Values must be Marshalable. Custom options
125+
// may conflict with non-custom options, and custom options bypass client-side validation. Prefer
126+
// using non-custom options where possible.
127+
func (ao *AggregateOptions) SetCustomOptions(co bson.M) *AggregateOptions {
128+
ao.CustomOptions = co
129129
return ao
130130
}
131131

@@ -164,8 +164,8 @@ func MergeAggregateOptions(opts ...*AggregateOptions) *AggregateOptions {
164164
if ao.Let != nil {
165165
aggOpts.Let = ao.Let
166166
}
167-
if ao.Custom != nil {
168-
aggOpts.Custom = ao.Custom
167+
if ao.CustomOptions != nil {
168+
aggOpts.CustomOptions = ao.CustomOptions
169169
}
170170
}
171171

mongo/options/changestreamoptions.go

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,7 @@ type ChangeStreamOptions struct {
5151
// Custom options to be added to the initial aggregate for the change stream. Key-value pairs of the BSON map should
5252
// correlate with desired option names and values. Values must be Marshalable. Custom options may conflict with
5353
// non-custom options, and custom options bypass client-side validation. Prefer using non-custom options where possible.
54-
Custom bson.M
55-
56-
// Custom options to be added to the $changeStream stage in the initial aggregate. Key-value pairs of the BSON map should
57-
// correlate with desired option names and values. Values must be Marshalable. Custom pipeline options bypass client-side
58-
// validation. Prefer using non-custom options where possible.
59-
CustomPipeline bson.M
54+
CustomOptions bson.M
6055
}
6156

6257
// ChangeStream creates a new ChangeStreamOptions instance.
@@ -108,20 +103,12 @@ func (cso *ChangeStreamOptions) SetStartAfter(sa interface{}) *ChangeStreamOptio
108103
return cso
109104
}
110105

111-
// SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate
112-
// with desired option names and values. Values must be Marshalable. Custom options may conflict
113-
// with non-custom options, and custom options bypass client-side validation. Prefer using non-custom
114-
// options where possible.
115-
func (cso *ChangeStreamOptions) SetCustom(c bson.M) *ChangeStreamOptions {
116-
cso.Custom = c
117-
return cso
118-
}
119-
120-
// SetCustomPipeline sets the value for the CustomPipeline field. Key-value pairs of the BSON map
121-
// should correlate with desired option names and values. Values must be Marshalable. Custom pipeline
122-
// options bypass client-side validation. Prefer using non-custom options where possible.
123-
func (cso *ChangeStreamOptions) SetCustomPipeline(cp bson.M) *ChangeStreamOptions {
124-
cso.CustomPipeline = cp
106+
// SetCustomOptions sets the value for the CustomOptions field. Key-value pairs of the BSON map
107+
// should correlate with desired option names and values. Values must be Marshalable. Custom options
108+
// may conflict with non-custom options, and custom options bypass client-side validation. Prefer
109+
// using non-custom options where possible.
110+
func (cso *ChangeStreamOptions) SetCustomOptions(co bson.M) *ChangeStreamOptions {
111+
cso.CustomOptions = co
125112
return cso
126113
}
127114

@@ -154,11 +141,8 @@ func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions
154141
if cso.StartAfter != nil {
155142
csOpts.StartAfter = cso.StartAfter
156143
}
157-
if cso.Custom != nil {
158-
csOpts.Custom = cso.Custom
159-
}
160-
if cso.CustomPipeline != nil {
161-
csOpts.CustomPipeline = cso.CustomPipeline
144+
if cso.CustomOptions != nil {
145+
csOpts.CustomOptions = cso.CustomOptions
162146
}
163147
}
164148

0 commit comments

Comments
 (0)