Skip to content

Commit 012750a

Browse files
author
Divjot Arora
committed
GODRIVER-1615 Use custom deployment for change streams (#411)
1 parent 66f247d commit 012750a

File tree

4 files changed

+150
-8
lines changed

4 files changed

+150
-8
lines changed

mongo/change_stream.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in
162162
return cs, cs.Err()
163163
}
164164

165+
func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment {
166+
return &changeStreamDeployment{
167+
topologyKind: cs.client.deployment.Kind(),
168+
server: server,
169+
conn: connection,
170+
}
171+
}
172+
165173
func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
166174
var server driver.Server
167175
var conn driver.Connection
@@ -176,9 +184,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
176184

177185
defer conn.Close()
178186

179-
cs.aggregate.Deployment(driver.SingleConnectionDeployment{
180-
C: conn,
181-
})
187+
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
182188

183189
if resuming {
184190
cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version
@@ -230,9 +236,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
230236
break
231237
}
232238

233-
cs.aggregate.Deployment(driver.SingleConnectionDeployment{
234-
C: conn,
235-
})
239+
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
236240
cs.err = cs.aggregate.Execute(ctx)
237241
}
238242

mongo/change_stream_deployment.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright (C) MongoDB, Inc. 2017-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package mongo
8+
9+
import (
10+
"context"
11+
12+
"go.mongodb.org/mongo-driver/x/mongo/driver"
13+
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
14+
)
15+
16+
type changeStreamDeployment struct {
17+
topologyKind description.TopologyKind
18+
server driver.Server
19+
conn driver.Connection
20+
}
21+
22+
var _ driver.Deployment = (*changeStreamDeployment)(nil)
23+
var _ driver.Server = (*changeStreamDeployment)(nil)
24+
var _ driver.ErrorProcessor = (*changeStreamDeployment)(nil)
25+
26+
func (c *changeStreamDeployment) SelectServer(context.Context, description.ServerSelector) (driver.Server, error) {
27+
return c, nil
28+
}
29+
30+
func (c *changeStreamDeployment) SupportsRetryWrites() bool {
31+
return false
32+
}
33+
34+
func (c *changeStreamDeployment) Kind() description.TopologyKind {
35+
return c.topologyKind
36+
}
37+
38+
func (c *changeStreamDeployment) Connection(context.Context) (driver.Connection, error) {
39+
return c.conn, nil
40+
}
41+
42+
func (c *changeStreamDeployment) ProcessError(err error) {
43+
ep, ok := c.server.(driver.ErrorProcessor)
44+
if !ok {
45+
return
46+
}
47+
48+
ep.ProcessError(err)
49+
}

mongo/integration/change_stream_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"go.mongodb.org/mongo-driver/bson"
1414
"go.mongodb.org/mongo-driver/bson/primitive"
15+
"go.mongodb.org/mongo-driver/event"
1516
"go.mongodb.org/mongo-driver/internal/testutil/assert"
1617
"go.mongodb.org/mongo-driver/mongo"
1718
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
@@ -515,6 +516,92 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
515516
tryNextGetmoreError(mt, cs)
516517
})
517518
})
519+
520+
customDeploymentClientOpts := options.Client().
521+
SetPoolMonitor(poolMonitor).
522+
SetWriteConcern(mtest.MajorityWc).
523+
SetReadConcern(mtest.MajorityRc).
524+
SetRetryReads(false)
525+
customDeploymentOpts := mtest.NewOptions().
526+
Topologies(mtest.ReplicaSet). // Avoid complexity of sharded fail points.
527+
MinServerVersion("4.0"). // 4.0 is needed to use replica set fail points.
528+
ClientOptions(customDeploymentClientOpts).
529+
CreateClient(false)
530+
mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) {
531+
// Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather
532+
// than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired
533+
// by ChangeStream when executing an aggregate.
534+
535+
mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) {
536+
clearPoolChan()
537+
mt.SetFailPoint(mtest.FailPoint{
538+
ConfigureFailPoint: "failCommand",
539+
Mode: mtest.FailPointMode{
540+
Times: 1,
541+
},
542+
Data: mtest.FailPointData{
543+
FailCommands: []string{"aggregate"},
544+
CloseConnection: true,
545+
},
546+
})
547+
548+
_, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
549+
assert.NotNil(mt, err, "expected Watch error, got nil")
550+
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
551+
})
552+
mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) {
553+
clearPoolChan()
554+
mt.SetFailPoint(mtest.FailPoint{
555+
ConfigureFailPoint: "failCommand",
556+
Mode: mtest.FailPointMode{
557+
Times: 1,
558+
},
559+
Data: mtest.FailPointData{
560+
FailCommands: []string{"getMore"},
561+
CloseConnection: true,
562+
},
563+
})
564+
565+
cs, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
566+
assert.Nil(mt, err, "Watch error: %v", err)
567+
defer closeStream(cs)
568+
569+
_, err = mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
570+
assert.Nil(mt, err, "InsertOne error: %v", err)
571+
572+
assert.True(mt, cs.Next(mtest.Background), "expected Next to return true, got false (iteration error %v)",
573+
cs.Err())
574+
assert.True(mt, isPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
575+
})
576+
retryAggClientOpts := options.Client().SetRetryReads(true).SetPoolMonitor(poolMonitor)
577+
retryAggMtOpts := mtest.NewOptions().ClientOptions(retryAggClientOpts)
578+
mt.RunOpts("errors are processed for SDAM on retried aggregate", retryAggMtOpts, func(mt *mtest.T) {
579+
clearPoolChan()
580+
581+
mt.SetFailPoint(mtest.FailPoint{
582+
ConfigureFailPoint: "failCommand",
583+
Mode: mtest.FailPointMode{
584+
Times: 2,
585+
},
586+
Data: mtest.FailPointData{
587+
FailCommands: []string{"aggregate"},
588+
CloseConnection: true,
589+
},
590+
})
591+
592+
_, err := mt.Coll.Watch(mtest.Background, mongo.Pipeline{})
593+
assert.NotNil(mt, err, "expected Watch error, got nil")
594+
595+
var numClearedEvents int
596+
for len(poolChan) > 0 {
597+
curr := <-poolChan
598+
if curr.Type == event.PoolCleared {
599+
numClearedEvents++
600+
}
601+
}
602+
assert.Equal(mt, 2, numClearedEvents, "expected two PoolCleared events, got %d", numClearedEvents)
603+
})
604+
})
518605
}
519606

520607
func closeStream(cs *mongo.ChangeStream) {

x/mongo/driver/driver.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ func (SingleServerDeployment) SupportsRetryWrites() bool { return false }
105105
// Kind implements the Deployment interface. It always returns description.Single.
106106
func (SingleServerDeployment) Kind() description.TopologyKind { return description.Single }
107107

108-
// SingleConnectionDeployment is an implementation of Deployment that always returns the same
109-
// Connection.
108+
// SingleConnectionDeployment is an implementation of Deployment that always returns the same Connection. This
109+
// implementation should only be used for connection handshakes and server heartbeats as it does not implement
110+
// ErrorProcessor, which is necessary for application operations and wraps the connection in nopCloserConnection,
111+
// which does not implement Compressor.
110112
type SingleConnectionDeployment struct{ C Connection }
111113

112114
var _ Deployment = SingleConnectionDeployment{}

0 commit comments

Comments
 (0)