Skip to content

Commit 4f5fc19

Browse files
author
Divjot Arora
authored
GODRIVER-1522 Ignore read preference for aggregations with output stages (#327)
1 parent 5d11dfa commit 4f5fc19

File tree

2 files changed

+70
-3
lines changed

2 files changed

+70
-3
lines changed

mongo/collection.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -735,8 +735,24 @@ func aggregate(a aggregateParams) (*Cursor, error) {
735735
Crypt: a.client.crypt,
736736
}
737737

738-
op := operation.NewAggregate(pipelineArr).Session(sess).WriteConcern(wc).ReadConcern(rc).ReadPreference(a.readPreference).CommandMonitor(a.client.monitor).
739-
ServerSelector(selector).ClusterClock(a.client.clock).Database(a.db).Collection(a.col).Deployment(a.client.deployment).Crypt(a.client.crypt)
738+
op := operation.NewAggregate(pipelineArr).
739+
Session(sess).
740+
WriteConcern(wc).
741+
ReadConcern(rc).
742+
CommandMonitor(a.client.monitor).
743+
ServerSelector(selector).
744+
ClusterClock(a.client.clock).
745+
Database(a.db).
746+
Collection(a.col).
747+
Deployment(a.client.deployment).
748+
Crypt(a.client.crypt)
749+
if !hasOutputStage {
750+
// Only pass the user-specified read preference if the aggregation doesn't have a $out or $merge stage.
751+
// Otherwise, the read preference could be forwarded to a mongos, which would error if the aggregation were
752+
// executed against a non-primary node.
753+
op.ReadPreference(a.readPreference)
754+
}
755+
740756
if ao.AllowDiskUse != nil {
741757
op.AllowDiskUse(*ao.AllowDiskUse)
742758
}

mongo/integration/crud_prose_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package integration
88

99
import (
10+
"bytes"
1011
"errors"
1112
"testing"
1213

@@ -15,10 +16,11 @@ import (
1516
"go.mongodb.org/mongo-driver/mongo"
1617
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
1718
"go.mongodb.org/mongo-driver/mongo/options"
19+
"go.mongodb.org/mongo-driver/mongo/readpref"
1820
"go.mongodb.org/mongo-driver/mongo/writeconcern"
1921
)
2022

21-
func TestCrudProse(t *testing.T) {
23+
func TestWriteErrorsWithLabels(t *testing.T) {
2224
clientOpts := options.Client().SetRetryWrites(false).SetWriteConcern(mtest.MajorityWc).
2325
SetReadConcern(mtest.MajorityRc)
2426
mtOpts := mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("4.0").Topologies(mtest.ReplicaSet).
@@ -107,6 +109,7 @@ func TestCrudProse(t *testing.T) {
107109
assert.True(mt, ok, "expected mongo.BulkWriteException, got %T", err)
108110
assert.True(mt, we.HasErrorLabel(label), "expected error to have label: %v", label)
109111
})
112+
110113
}
111114

112115
func TestHintErrors(t *testing.T) {
@@ -174,3 +177,51 @@ func TestHintWithUnacknowledgedWriteErrors(t *testing.T) {
174177
assert.Equal(mt, got, expected, "expected: %v got: %v", expected, got)
175178
})
176179
}
180+
181+
func TestAggregateSecondaryPreferredReadPreference(t *testing.T) {
182+
// Use secondaryPreferred instead of secondary because sharded clusters started up by mongo-orchestration have
183+
// one-node shards, so a secondary read preference is not satisfiable.
184+
secondaryPrefClientOpts := options.Client().
185+
SetWriteConcern(mtest.MajorityWc).
186+
SetReadPreference(readpref.SecondaryPreferred()).
187+
SetReadConcern(mtest.MajorityRc)
188+
mtOpts := mtest.NewOptions().
189+
ClientOptions(secondaryPrefClientOpts).
190+
MinServerVersion("4.1.0") // Consistent with tests in aggregate-out-readConcern.json
191+
192+
mt := mtest.New(t, mtOpts)
193+
mt.Run("aggregate $out with read preference secondary", func(mt *mtest.T) {
194+
doc, err := bson.Marshal(bson.D{
195+
{"_id", 1},
196+
{"x", 11},
197+
})
198+
assert.Nil(mt, err, "Marshal error: %v", err)
199+
_, err = mt.Coll.InsertOne(mtest.Background, doc)
200+
assert.Nil(mt, err, "InsertOne error: %v", err)
201+
202+
mt.ClearEvents()
203+
outputCollName := "aggregate-read-pref-secondary-output"
204+
outStage := bson.D{
205+
{"$out", outputCollName},
206+
}
207+
cursor, err := mt.Coll.Aggregate(mtest.Background, mongo.Pipeline{outStage})
208+
assert.Nil(mt, err, "Aggregate error: %v", err)
209+
_ = cursor.Close(mtest.Background)
210+
211+
// Assert that the output collection contains the document we expect.
212+
outputColl := mt.CreateCollection(mtest.Collection{Name: outputCollName}, false)
213+
cursor, err = outputColl.Find(mtest.Background, bson.D{})
214+
assert.Nil(mt, err, "Find error: %v", err)
215+
defer cursor.Close(mtest.Background)
216+
217+
assert.True(mt, cursor.Next(mtest.Background), "expected Next to return true, got false")
218+
assert.True(mt, bytes.Equal(doc, cursor.Current), "expected document %s, got %s", bson.Raw(doc), cursor.Current)
219+
assert.False(mt, cursor.Next(mtest.Background), "unexpected document returned by Find: %s", cursor.Current)
220+
221+
// Assert that no read preference was sent to the server.
222+
evt := mt.GetStartedEvent()
223+
assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate', got '%s'", evt.CommandName)
224+
_, err = evt.Command.LookupErr("$readPreference")
225+
assert.NotNil(mt, err, "expected command %s to not contain $readPreference", evt.Command)
226+
})
227+
}

0 commit comments

Comments
 (0)