Skip to content

GODRIVER-1028 return correct error indexes for bulkWrite #483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
type bulkWriteBatch struct {
models []WriteModel
canRetry bool
indexes []int
}

// bulkWrite perfoms a bulkwrite operation
Expand Down Expand Up @@ -52,7 +53,6 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
}

var lastErr error
var opIndex int64 // the operation index for the upsertedIDs map
continueOnError := !ordered
for _, batch := range batches {
if len(batch.models) == 0 {
Expand All @@ -66,13 +66,10 @@ func (bw *bulkWrite) execute(ctx context.Context) error {

batchRes, batchErr, err := bw.runBatch(ctx, batch)

bw.mergeResults(batchRes, opIndex)
bw.mergeResults(batchRes)

bwErr.WriteConcernError = batchErr.WriteConcernError
bwErr.Labels = append(bwErr.Labels, batchErr.Labels...)
for i := range batchErr.WriteErrors {
batchErr.WriteErrors[i].Index = batchErr.WriteErrors[i].Index + int(opIndex)
}

bwErr.WriteErrors = append(bwErr.WriteErrors, batchErr.WriteErrors...)

Expand All @@ -89,8 +86,6 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
if err != nil {
lastErr = err
}

opIndex += int64(len(batch.models))
}

bw.result.MatchedCount -= bw.result.UpsertedCount
Expand Down Expand Up @@ -151,16 +146,18 @@ func (bw *bulkWrite) runBatch(ctx context.Context, batch bulkWriteBatch) (BulkWr
batchRes.ModifiedCount = int64(res.NModified)
batchRes.UpsertedCount = int64(len(res.Upserted))
for _, upsert := range res.Upserted {
batchRes.UpsertedIDs[upsert.Index] = upsert.ID
batchRes.UpsertedIDs[int64(batch.indexes[upsert.Index])] = upsert.ID
}
}

batchErr.WriteErrors = make([]BulkWriteError, 0, len(writeErrors))
convWriteErrors := writeErrorsFromDriverWriteErrors(writeErrors)
for _, we := range convWriteErrors {
request := batch.models[we.Index]
we.Index = batch.indexes[we.Index]
batchErr.WriteErrors = append(batchErr.WriteErrors, BulkWriteError{
WriteError: we,
Request: batch.models[we.Index],
Request: request,
})
}
return batchRes, batchErr, nil
Expand Down Expand Up @@ -401,18 +398,23 @@ func createBatches(models []WriteModel, ordered bool) []bulkWriteBatch {
batches[updateOneCommand].canRetry = true

// TODO(GODRIVER-1157): fix batching once operation retryability is fixed
for _, model := range models {
for i, model := range models {
switch model.(type) {
case *InsertOneModel:
batches[insertCommand].models = append(batches[insertCommand].models, model)
batches[insertCommand].indexes = append(batches[insertCommand].indexes, i)
case *DeleteOneModel:
batches[deleteOneCommand].models = append(batches[deleteOneCommand].models, model)
batches[deleteOneCommand].indexes = append(batches[deleteOneCommand].indexes, i)
case *DeleteManyModel:
batches[deleteManyCommand].models = append(batches[deleteManyCommand].models, model)
batches[deleteManyCommand].indexes = append(batches[deleteManyCommand].indexes, i)
case *ReplaceOneModel, *UpdateOneModel:
batches[updateOneCommand].models = append(batches[updateOneCommand].models, model)
batches[updateOneCommand].indexes = append(batches[updateOneCommand].indexes, i)
case *UpdateManyModel:
batches[updateManyCommand].models = append(batches[updateManyCommand].models, model)
batches[updateManyCommand].indexes = append(batches[updateManyCommand].indexes, i)
}
}

Expand All @@ -424,7 +426,7 @@ func createOrderedBatches(models []WriteModel) []bulkWriteBatch {
var prevKind writeCommandKind = -1
i := -1 // batch index

for _, model := range models {
for ind, model := range models {
var createNewBatch bool
var canRetry bool
var newKind writeCommandKind
Expand Down Expand Up @@ -455,13 +457,15 @@ func createOrderedBatches(models []WriteModel) []bulkWriteBatch {
batches = append(batches, bulkWriteBatch{
models: []WriteModel{model},
canRetry: canRetry,
indexes: []int{ind},
})
i++
} else {
batches[i].models = append(batches[i].models, model)
if !canRetry {
batches[i].canRetry = false // don't make it true if it was already false
}
batches[i].indexes = append(batches[i].indexes, ind)
}

prevKind = newKind
Expand All @@ -470,15 +474,15 @@ func createOrderedBatches(models []WriteModel) []bulkWriteBatch {
return batches
}

func (bw *bulkWrite) mergeResults(newResult BulkWriteResult, opIndex int64) {
func (bw *bulkWrite) mergeResults(newResult BulkWriteResult) {
bw.result.InsertedCount += newResult.InsertedCount
bw.result.MatchedCount += newResult.MatchedCount
bw.result.ModifiedCount += newResult.ModifiedCount
bw.result.DeletedCount += newResult.DeletedCount
bw.result.UpsertedCount += newResult.UpsertedCount

for index, upsertID := range newResult.UpsertedIDs {
bw.result.UpsertedIDs[index+opIndex] = upsertID
bw.result.UpsertedIDs[index] = upsertID
}
}

Expand Down
71 changes: 71 additions & 0 deletions mongo/integration/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,77 @@ func TestCollection(t *testing.T) {
assert.Equal(mt, expectedModel, actualModel, "expected model %v in BulkWriteException, got %v",
expectedModel, actualModel)
})
mt.Run("unordered writeError index", func(mt *mtest.T) {
cappedOpts := bson.D{{"capped", true}, {"size", 64 * 1024}}
// Use a capped collection to get WriteErrors for delete operations
capped := mt.CreateCollection(mtest.Collection{
Name: "deleteOne_capped",
CreateOpts: cappedOpts,
}, true)
models := []mongo.WriteModel{
mongo.NewInsertOneModel().SetDocument(bson.D{{"_id", "id1"}}),
mongo.NewInsertOneModel().SetDocument(bson.D{{"_id", "id3"}}),
}
_, err := capped.BulkWrite(mtest.Background, models, options.BulkWrite())
assert.Nil(t, err, "BulkWrite error: %v", err)

// UpdateOne and ReplaceOne models are batched together, so they each appear once
models = []mongo.WriteModel{
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id0"}}),
mongo.NewDeleteManyModel().SetFilter(bson.D{{"_id", "id0"}}),
mongo.NewUpdateOneModel().SetFilter(bson.D{{"_id", "id3"}}).SetUpdate(bson.D{{"$set", bson.D{{"_id", 3.14159}}}}),
mongo.NewInsertOneModel().SetDocument(bson.D{{"_id", "id1"}}),
mongo.NewDeleteManyModel().SetFilter(bson.D{{"_id", "id0"}}),
mongo.NewUpdateManyModel().SetFilter(bson.D{{"_id", "id3"}}).SetUpdate(bson.D{{"$set", bson.D{{"_id", 3.14159}}}}),
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id0"}}),
mongo.NewInsertOneModel().SetDocument(bson.D{{"_id", "id1"}}),
mongo.NewReplaceOneModel().SetFilter(bson.D{{"_id", "id3"}}).SetReplacement(bson.D{{"_id", 3.14159}}),
mongo.NewUpdateManyModel().SetFilter(bson.D{{"_id", "id3"}}).SetUpdate(bson.D{{"$set", bson.D{{"_id", 3.14159}}}}),
}
_, err = capped.BulkWrite(mtest.Background, models, options.BulkWrite().SetOrdered(false))
bwException, ok := err.(mongo.BulkWriteException)
assert.True(mt, ok, "expected error of type %T, got %T", mongo.BulkWriteException{}, err)

assert.Equal(mt, len(bwException.WriteErrors), 10, "expected 10 writeErrors, got %v", len(bwException.WriteErrors))
for _, writeErr := range bwException.WriteErrors {
switch writeErr.Request.(type) {
case *mongo.DeleteOneModel:
assert.True(mt, writeErr.Index == 0 || writeErr.Index == 6,
"expected index 0 or 6, got %v", writeErr.Index)
case *mongo.DeleteManyModel:
assert.True(mt, writeErr.Index == 1 || writeErr.Index == 4,
"expected index 1 or 4, got %v", writeErr.Index)
case *mongo.UpdateManyModel:
assert.True(mt, writeErr.Index == 5 || writeErr.Index == 9,
"expected index 5 or 9, got %v", writeErr.Index)
case *mongo.InsertOneModel:
assert.True(mt, writeErr.Index == 3 || writeErr.Index == 7,
"expected index 3 or 7, got %v", writeErr.Index)
case *mongo.UpdateOneModel:
assert.Equal(mt, writeErr.Index, 2, "expected index 2, got %v", writeErr.Index)
case *mongo.ReplaceOneModel:
assert.Equal(mt, writeErr.Index, 8, "expected index 8, got %v", writeErr.Index)
}

}
})
mt.Run("unordered upsertID index", func(mt *mtest.T) {
id1 := "id1"
id3 := "id3"
models := []mongo.WriteModel{
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id0"}}),
mongo.NewReplaceOneModel().SetFilter(bson.D{{"_id", id1}}).SetReplacement(bson.D{{"_id", id1}}).SetUpsert(true),
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id2"}}),
mongo.NewReplaceOneModel().SetFilter(bson.D{{"_id", id3}}).SetReplacement(bson.D{{"_id", id3}}).SetUpsert(true),
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id4"}}),
}
res, err := mt.Coll.BulkWrite(mtest.Background, models, options.BulkWrite().SetOrdered(false))
assert.Nil(mt, err, "bulkwrite error: %v", err)

assert.Equal(mt, len(res.UpsertedIDs), 2, "expected 2 UpsertedIDs, got %v", len(res.UpsertedIDs))
assert.Equal(mt, res.UpsertedIDs[1].(string), id1, "expected UpsertedIDs[1] to be %v, got %v", id1, res.UpsertedIDs[1])
assert.Equal(mt, res.UpsertedIDs[3].(string), id3, "expected UpsertedIDs[3] to be %v, got %v", id3, res.UpsertedIDs[3])
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also going to ask whether there are tests of individual commands being split into multiple batches. Though that was addressed in GODRIVER-1363 and appears to be tested in c814cfb#diff-1572fa8947f9463542abdc5cb3a11528R204.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that will be handled in GODRIVER-1715

unackClientOpts := options.Client().
SetWriteConcern(writeconcern.New(writeconcern.W(0)))
unackMtOpts := mtest.NewOptions().
Expand Down