Skip to content

Commit ed2bb99

Browse files
author
Divjot Arora
authored
GODRIVER-1615 Use custom deployment for change streams (#411)
1 parent eb2c1c4 commit ed2bb99

File tree

4 files changed

+150
-8
lines changed

4 files changed

+150
-8
lines changed

mongo/change_stream.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,14 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
185185
return cs, cs.Err()
186186
}
187187

188+
func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment {
189+
return &changeStreamDeployment{
190+
topologyKind: cs.client.deployment.Kind(),
191+
server: server,
192+
conn: connection,
193+
}
194+
}
195+
188196
func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
189197
var server driver.Server
190198
var conn driver.Connection
@@ -199,9 +207,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
199207
defer conn.Close()
200208
cs.wireVersion = conn.Description().WireVersion
201209

202-
cs.aggregate.Deployment(driver.SingleConnectionDeployment{
203-
C: conn,
204-
})
210+
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
205211

206212
if resuming {
207213
cs.replaceOptions(ctx, cs.wireVersion)
@@ -252,9 +258,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
252258
break
253259
}
254260

255-
cs.aggregate.Deployment(driver.SingleConnectionDeployment{
256-
C: conn,
257-
})
261+
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
258262
cs.err = cs.aggregate.Execute(ctx)
259263
}
260264

mongo/change_stream_deployment.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright (C) MongoDB, Inc. 2017-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package mongo
8+
9+
import (
10+
"context"
11+
12+
"go.mongodb.org/mongo-driver/x/mongo/driver"
13+
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
14+
)
15+
16+
type changeStreamDeployment struct {
17+
topologyKind description.TopologyKind
18+
server driver.Server
19+
conn driver.Connection
20+
}
21+
22+
var _ driver.Deployment = (*changeStreamDeployment)(nil)
23+
var _ driver.Server = (*changeStreamDeployment)(nil)
24+
var _ driver.ErrorProcessor = (*changeStreamDeployment)(nil)
25+
26+
func (c *changeStreamDeployment) SelectServer(context.Context, description.ServerSelector) (driver.Server, error) {
27+
return c, nil
28+
}
29+
30+
func (c *changeStreamDeployment) SupportsRetryWrites() bool {
31+
return false
32+
}
33+
34+
func (c *changeStreamDeployment) Kind() description.TopologyKind {
35+
return c.topologyKind
36+
}
37+
38+
func (c *changeStreamDeployment) Connection(context.Context) (driver.Connection, error) {
39+
return c.conn, nil
40+
}
41+
42+
func (c *changeStreamDeployment) ProcessError(err error) {
43+
ep, ok := c.server.(driver.ErrorProcessor)
44+
if !ok {
45+
return
46+
}
47+
48+
ep.ProcessError(err)
49+
}

mongo/integration/change_stream_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"go.mongodb.org/mongo-driver/bson"
1414
"go.mongodb.org/mongo-driver/bson/primitive"
15+
"go.mongodb.org/mongo-driver/event"
1516
"go.mongodb.org/mongo-driver/internal/testutil/assert"
1617
"go.mongodb.org/mongo-driver/mongo"
1718
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
@@ -499,6 +500,92 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
499500
tryNextGetmoreError(mt, cs)
500501
})
501502
})
503+
504+
customDeploymentClientOpts := options.Client().
505+
SetPoolMonitor(poolMonitor).
506+
SetWriteConcern(mtest.MajorityWc).
507+
SetReadConcern(mtest.MajorityRc).
508+
SetRetryReads(false)
509+
customDeploymentOpts := mtest.NewOptions().
510+
Topologies(mtest.ReplicaSet). // Avoid complexity of sharded fail points.
511+
MinServerVersion("4.0"). // 4.0 is needed to use replica set fail points.
512+
ClientOptions(customDeploymentClientOpts).
513+
CreateClient(false)
514+
mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) {
515+
// Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather
516+
// than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired
517+
// by ChangeStream when executing an aggregate.
518+
519+
mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) {
520+
clearPoolChan()
521+
mt.SetFailPoint(mtest.FailPoint{
522+
ConfigureFailPoint: "failCommand",
523+
Mode: mtest.FailPointMode{
524+
Times: 1,
525+
},
526+
Data: mtest.FailPointData{
527+
FailCommands: []string{"aggregate"},
528+
CloseConnection: true,
529+
},
530+
})
531+
532+
_, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
533+
assert.NotNil(mt, err, "expected Watch error, got nil")
534+
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
535+
})
536+
mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) {
537+
clearPoolChan()
538+
mt.SetFailPoint(mtest.FailPoint{
539+
ConfigureFailPoint: "failCommand",
540+
Mode: mtest.FailPointMode{
541+
Times: 1,
542+
},
543+
Data: mtest.FailPointData{
544+
FailCommands: []string{"getMore"},
545+
CloseConnection: true,
546+
},
547+
})
548+
549+
cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
550+
assert.Nil(mt, err, "Watch error: %v", err)
551+
defer closeStream(cs)
552+
553+
_, err = mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
554+
assert.Nil(mt, err, "InsertOne error: %v", err)
555+
556+
assert.True(mt, cs.Next(mtest.Background), "expected Next to return true, got false (iteration error %v)",
557+
cs.Err())
558+
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
559+
})
560+
retryAggClientOpts := options.Client().SetRetryReads(true).SetPoolMonitor(poolMonitor)
561+
retryAggMtOpts := mtest.NewOptions().ClientOptions(retryAggClientOpts)
562+
mt.RunOpts("errors are processed for SDAM on retried aggregate", retryAggMtOpts, func(mt *mtest.T) {
563+
clearPoolChan()
564+
565+
mt.SetFailPoint(mtest.FailPoint{
566+
ConfigureFailPoint: "failCommand",
567+
Mode: mtest.FailPointMode{
568+
Times: 2,
569+
},
570+
Data: mtest.FailPointData{
571+
FailCommands: []string{"aggregate"},
572+
CloseConnection: true,
573+
},
574+
})
575+
576+
_, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
577+
assert.NotNil(mt, err, "expected Watch error, got nil")
578+
579+
var numClearedEvents int
580+
for len(poolChan) > 0 {
581+
curr := <-poolChan
582+
if curr.Type == event.PoolCleared {
583+
numClearedEvents++
584+
}
585+
}
586+
assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents)
587+
})
588+
})
502589
}
503590

504591
func closeStream(cs *mongo.ChangeStream) {

x/mongo/driver/driver.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,10 @@ func (SingleServerDeployment) SupportsRetryWrites() bool { return false }
120120
// Kind implements the Deployment interface. It always returns description.Single.
121121
func (SingleServerDeployment) Kind() description.TopologyKind { return description.Single }
122122

123-
// SingleConnectionDeployment is an implementation of Deployment that always returns the same
124-
// Connection.
123+
// SingleConnectionDeployment is an implementation of Deployment that always returns the same Connection. This
124+
// implementation should only be used for connection handshakes and server heartbeats as it does not implement
125+
// ErrorProcessor, which is necessary for application operations and wraps the connection in nopCloserConnection,
126+
// which does not implement Compressor.
125127
type SingleConnectionDeployment struct{ C Connection }
126128

127129
var _ Deployment = SingleConnectionDeployment{}

0 commit comments

Comments
 (0)