Skip to content

Commit cd959e1

Browse files
author
Divjot Arora
committed
GODRIVER-1522 Ignore read preference for aggregations with output stages (#327)
1 parent cc958e8 commit cd959e1

File tree

2 files changed

+85
-2
lines changed

2 files changed

+85
-2
lines changed

mongo/collection.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -753,8 +753,24 @@ func aggregate(a aggregateParams) (*Cursor, error) {
753753
Crypt: a.client.crypt,
754754
}
755755

756-
op := operation.NewAggregate(pipelineArr).Session(sess).WriteConcern(wc).ReadConcern(rc).ReadPreference(a.readPreference).CommandMonitor(a.client.monitor).
757-
ServerSelector(selector).ClusterClock(a.client.clock).Database(a.db).Collection(a.col).Deployment(a.client.deployment).Crypt(a.client.crypt)
756+
op := operation.NewAggregate(pipelineArr).
757+
Session(sess).
758+
WriteConcern(wc).
759+
ReadConcern(rc).
760+
CommandMonitor(a.client.monitor).
761+
ServerSelector(selector).
762+
ClusterClock(a.client.clock).
763+
Database(a.db).
764+
Collection(a.col).
765+
Deployment(a.client.deployment).
766+
Crypt(a.client.crypt)
767+
if !hasOutputStage {
768+
// Only pass the user-specified read preference if the aggregation doesn't have a $out or $merge stage.
769+
// Otherwise, the read preference could be forwarded to a mongos, which would error if the aggregation were
770+
// executed against a non-primary node.
771+
op.ReadPreference(a.readPreference)
772+
}
773+
758774
if ao.AllowDiskUse != nil {
759775
op.AllowDiskUse(*ao.AllowDiskUse)
760776
}

mongo/integration/crud_prose_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright (C) MongoDB, Inc. 2017-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package integration
8+
9+
import (
10+
"bytes"
11+
"testing"
12+
13+
"go.mongodb.org/mongo-driver/bson"
14+
"go.mongodb.org/mongo-driver/internal/testutil/assert"
15+
"go.mongodb.org/mongo-driver/mongo"
16+
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
17+
"go.mongodb.org/mongo-driver/mongo/options"
18+
"go.mongodb.org/mongo-driver/mongo/readpref"
19+
)
20+
21+
func TestAggregateSecondaryPreferredReadPreference(t *testing.T) {
22+
// Use secondaryPreferred instead of secondary because sharded clusters started up by mongo-orchestration have
23+
// one-node shards, so a secondary read preference is not satisfiable.
24+
secondaryPrefClientOpts := options.Client().
25+
SetWriteConcern(mtest.MajorityWc).
26+
SetReadPreference(readpref.SecondaryPreferred()).
27+
SetReadConcern(mtest.MajorityRc)
28+
mtOpts := mtest.NewOptions().
29+
ClientOptions(secondaryPrefClientOpts).
30+
MinServerVersion("4.1.0") // Consistent with tests in aggregate-out-readConcern.json
31+
32+
mt := mtest.New(t, mtOpts)
33+
mt.Run("aggregate $out with read preference secondary", func(mt *mtest.T) {
34+
doc, err := bson.Marshal(bson.D{
35+
{"_id", 1},
36+
{"x", 11},
37+
})
38+
assert.Nil(mt, err, "Marshal error: %v", err)
39+
_, err = mt.Coll.InsertOne(mtest.Background, doc)
40+
assert.Nil(mt, err, "InsertOne error: %v", err)
41+
42+
mt.ClearEvents()
43+
outputCollName := "aggregate-read-pref-secondary-output"
44+
outStage := bson.D{
45+
{"$out", outputCollName},
46+
}
47+
cursor, err := mt.Coll.Aggregate(mtest.Background, mongo.Pipeline{outStage})
48+
assert.Nil(mt, err, "Aggregate error: %v", err)
49+
_ = cursor.Close(mtest.Background)
50+
51+
// Assert that the output collection contains the document we expect.
52+
outputColl := mt.CreateCollection(mtest.Collection{Name: outputCollName}, false)
53+
cursor, err = outputColl.Find(mtest.Background, bson.D{})
54+
assert.Nil(mt, err, "Find error: %v", err)
55+
defer cursor.Close(mtest.Background)
56+
57+
assert.True(mt, cursor.Next(mtest.Background), "expected Next to return true, got false")
58+
assert.True(mt, bytes.Equal(doc, cursor.Current), "expected document %s, got %s", bson.Raw(doc), cursor.Current)
59+
assert.False(mt, cursor.Next(mtest.Background), "unexpected document returned by Find: %s", cursor.Current)
60+
61+
// Assert that no read preference was sent to the server.
62+
evt := mt.GetStartedEvent()
63+
assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate', got '%s'", evt.CommandName)
64+
_, err = evt.Command.LookupErr("$readPreference")
65+
assert.NotNil(mt, err, "expected command %s to not contain $readPreference", evt.Command)
66+
})
67+
}

0 commit comments

Comments
 (0)