Skip to content

GODRIVER-1522 Ignore read preference for aggregations with output stages #327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,24 @@ func aggregate(a aggregateParams) (*Cursor, error) {
Crypt: a.client.crypt,
}

op := operation.NewAggregate(pipelineArr).Session(sess).WriteConcern(wc).ReadConcern(rc).ReadPreference(a.readPreference).CommandMonitor(a.client.monitor).
ServerSelector(selector).ClusterClock(a.client.clock).Database(a.db).Collection(a.col).Deployment(a.client.deployment).Crypt(a.client.crypt)
op := operation.NewAggregate(pipelineArr).
Session(sess).
WriteConcern(wc).
ReadConcern(rc).
CommandMonitor(a.client.monitor).
ServerSelector(selector).
ClusterClock(a.client.clock).
Database(a.db).
Collection(a.col).
Deployment(a.client.deployment).
Crypt(a.client.crypt)
if !hasOutputStage {
// Only pass the user-specified read preference if the aggregation doesn't have a $out or $merge stage.
// Otherwise, the read preference could be forwarded to a mongos, which would error if the aggregation were
// executed against a non-primary node.
op.ReadPreference(a.readPreference)
}

if ao.AllowDiskUse != nil {
op.AllowDiskUse(*ao.AllowDiskUse)
}
Expand Down
53 changes: 52 additions & 1 deletion mongo/integration/crud_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"bytes"
"errors"
"testing"

Expand All @@ -15,10 +16,11 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
)

func TestCrudProse(t *testing.T) {
func TestWriteErrorsWithLabels(t *testing.T) {
clientOpts := options.Client().SetRetryWrites(false).SetWriteConcern(mtest.MajorityWc).
SetReadConcern(mtest.MajorityRc)
mtOpts := mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("4.0").Topologies(mtest.ReplicaSet).
Expand Down Expand Up @@ -107,6 +109,7 @@ func TestCrudProse(t *testing.T) {
assert.True(mt, ok, "expected mongo.BulkWriteException, got %T", err)
assert.True(mt, we.HasErrorLabel(label), "expected error to have label: %v", label)
})

}

func TestHintErrors(t *testing.T) {
Expand Down Expand Up @@ -174,3 +177,51 @@ func TestHintWithUnacknowledgedWriteErrors(t *testing.T) {
assert.Equal(mt, got, expected, "expected: %v got: %v", expected, got)
})
}

func TestAggregateSecondaryPreferredReadPreference(t *testing.T) {
// Use secondaryPreferred instead of secondary because sharded clusters started up by mongo-orchestration have
// one-node shards, so a secondary read preference is not satisfiable.
secondaryPrefClientOpts := options.Client().
SetWriteConcern(mtest.MajorityWc).
SetReadPreference(readpref.SecondaryPreferred()).
SetReadConcern(mtest.MajorityRc)
mtOpts := mtest.NewOptions().
ClientOptions(secondaryPrefClientOpts).
MinServerVersion("4.1.0") // Consistent with tests in aggregate-out-readConcern.json

mt := mtest.New(t, mtOpts)
mt.Run("aggregate $out with read preference secondary", func(mt *mtest.T) {
doc, err := bson.Marshal(bson.D{
{"_id", 1},
{"x", 11},
})
assert.Nil(mt, err, "Marshal error: %v", err)
_, err = mt.Coll.InsertOne(mtest.Background, doc)
assert.Nil(mt, err, "InsertOne error: %v", err)

mt.ClearEvents()
outputCollName := "aggregate-read-pref-secondary-output"
outStage := bson.D{
{"$out", outputCollName},
}
cursor, err := mt.Coll.Aggregate(mtest.Background, mongo.Pipeline{outStage})
assert.Nil(mt, err, "Aggregate error: %v", err)
_ = cursor.Close(mtest.Background)

// Assert that the output collection contains the document we expect.
outputColl := mt.CreateCollection(mtest.Collection{Name: outputCollName}, false)
cursor, err = outputColl.Find(mtest.Background, bson.D{})
assert.Nil(mt, err, "Find error: %v", err)
defer cursor.Close(mtest.Background)

assert.True(mt, cursor.Next(mtest.Background), "expected Next to return true, got false")
assert.True(mt, bytes.Equal(doc, cursor.Current), "expected document %s, got %s", bson.Raw(doc), cursor.Current)
assert.False(mt, cursor.Next(mtest.Background), "unexpected document returned by Find: %s", cursor.Current)

// Assert that no read preference was sent to the server.
evt := mt.GetStartedEvent()
assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate', got '%s'", evt.CommandName)
_, err = evt.Command.LookupErr("$readPreference")
assert.NotNil(mt, err, "expected command %s to not contain $readPreference", evt.Command)
})
}