Skip to content

GODRIVER-1615 Use custom deployment for change streams #411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
return cs, cs.Err()
}

func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment {
return &changeStreamDeployment{
topologyKind: cs.client.deployment.Kind(),
server: server,
conn: connection,
}
}

func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
var server driver.Server
var conn driver.Connection
Expand All @@ -199,9 +207,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
defer conn.Close()
cs.wireVersion = conn.Description().WireVersion

cs.aggregate.Deployment(driver.SingleConnectionDeployment{
C: conn,
})
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))

if resuming {
cs.replaceOptions(ctx, cs.wireVersion)
Expand Down Expand Up @@ -252,9 +258,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
break
}

cs.aggregate.Deployment(driver.SingleConnectionDeployment{
C: conn,
})
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
cs.err = cs.aggregate.Execute(ctx)
}

Expand Down
49 changes: 49 additions & 0 deletions mongo/change_stream_deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongo

import (
"context"

"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
)

type changeStreamDeployment struct {
topologyKind description.TopologyKind
server driver.Server
conn driver.Connection
}

var _ driver.Deployment = (*changeStreamDeployment)(nil)
var _ driver.Server = (*changeStreamDeployment)(nil)
var _ driver.ErrorProcessor = (*changeStreamDeployment)(nil)

func (c *changeStreamDeployment) SelectServer(context.Context, description.ServerSelector) (driver.Server, error) {
return c, nil
}

func (c *changeStreamDeployment) SupportsRetryWrites() bool {
return false
}

func (c *changeStreamDeployment) Kind() description.TopologyKind {
return c.topologyKind
}

func (c *changeStreamDeployment) Connection(context.Context) (driver.Connection, error) {
return c.conn, nil
}

func (c *changeStreamDeployment) ProcessError(err error) {
ep, ok := c.server.(driver.ErrorProcessor)
if !ok {
return
}

ep.ProcessError(err)
}
87 changes: 87 additions & 0 deletions mongo/integration/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal/testutil/assert"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
Expand Down Expand Up @@ -499,6 +500,92 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
tryNextGetmoreError(mt, cs)
})
})

customDeploymentClientOpts := options.Client().
SetPoolMonitor(poolMonitor).
SetWriteConcern(mtest.MajorityWc).
SetReadConcern(mtest.MajorityRc).
SetRetryReads(false)
customDeploymentOpts := mtest.NewOptions().
Topologies(mtest.ReplicaSet). // Avoid complexity of sharded fail points.
MinServerVersion("4.0"). // 4.0 is needed to use replica set fail points.
ClientOptions(customDeploymentClientOpts).
CreateClient(false)
mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) {
// Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather
// than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired
// by ChangeStream when executing an aggregate.

mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) {
clearPoolChan()
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"aggregate"},
CloseConnection: true,
},
})

_, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
assert.NotNil(mt, err, "expected Watch error, got nil")
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
})
mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) {
clearPoolChan()
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"getMore"},
CloseConnection: true,
},
})

cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
assert.Nil(mt, err, "Watch error: %v", err)
defer closeStream(cs)

_, err = mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
assert.Nil(mt, err, "InsertOne error: %v", err)

assert.True(mt, cs.Next(mtest.Background), "expected Next to return true, got false (iteration error %v)",
cs.Err())
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
})
retryAggClientOpts := options.Client().SetRetryReads(true).SetPoolMonitor(poolMonitor)
retryAggMtOpts := mtest.NewOptions().ClientOptions(retryAggClientOpts)
mt.RunOpts("errors are processed for SDAM on retried aggregate", retryAggMtOpts, func(mt *mtest.T) {
clearPoolChan()

mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 2,
},
Data: mtest.FailPointData{
FailCommands: []string{"aggregate"},
CloseConnection: true,
},
})

_, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
assert.NotNil(mt, err, "expected Watch error, got nil")

var numClearedEvents int
for len(poolChan) > 0 {
curr := <-poolChan
if curr.Type == event.PoolCleared {
numClearedEvents++
}
}
assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents)
})
})
}

func closeStream(cs *mongo.ChangeStream) {
Expand Down
6 changes: 4 additions & 2 deletions x/mongo/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ func (SingleServerDeployment) SupportsRetryWrites() bool { return false }
// Kind implements the Deployment interface. It always returns description.Single.
func (SingleServerDeployment) Kind() description.TopologyKind { return description.Single }

// SingleConnectionDeployment is an implementation of Deployment that always returns the same
// Connection.
// SingleConnectionDeployment is an implementation of Deployment that always returns the same Connection. This
// implementation should only be used for connection handshakes and server heartbeats as it does not implement
// ErrorProcessor, which is necessary for application operations and wraps the connection in nopCloserConnection,
// which does not implement Compressor.
type SingleConnectionDeployment struct{ C Connection }

var _ Deployment = SingleConnectionDeployment{}
Expand Down