Skip to content

Commit 88894be

Browse files
committed
GODRIVER-1028 return correct error indexes for bulkWrite (mongodb#483)
1 parent 3774acf commit 88894be

File tree

2 files changed

+88
-13
lines changed

2 files changed

+88
-13
lines changed

mongo/bulk_write.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
type bulkWriteBatch struct {
2323
models []WriteModel
2424
canRetry bool
25+
indexes []int
2526
}
2627

2728
// bulkWrite perfoms a bulkwrite operation
@@ -52,7 +53,6 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
5253
}
5354

5455
var lastErr error
55-
var opIndex int64 // the operation index for the upsertedIDs map
5656
continueOnError := !ordered
5757
for _, batch := range batches {
5858
if len(batch.models) == 0 {
@@ -66,13 +66,10 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
6666

6767
batchRes, batchErr, err := bw.runBatch(ctx, batch)
6868

69-
bw.mergeResults(batchRes, opIndex)
69+
bw.mergeResults(batchRes)
7070

7171
bwErr.WriteConcernError = batchErr.WriteConcernError
7272
bwErr.Labels = append(bwErr.Labels, batchErr.Labels...)
73-
for i := range batchErr.WriteErrors {
74-
batchErr.WriteErrors[i].Index = batchErr.WriteErrors[i].Index + int(opIndex)
75-
}
7673

7774
bwErr.WriteErrors = append(bwErr.WriteErrors, batchErr.WriteErrors...)
7875

@@ -89,8 +86,6 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
8986
if err != nil {
9087
lastErr = err
9188
}
92-
93-
opIndex += int64(len(batch.models))
9489
}
9590

9691
bw.result.MatchedCount -= bw.result.UpsertedCount
@@ -151,16 +146,18 @@ func (bw *bulkWrite) runBatch(ctx context.Context, batch bulkWriteBatch) (BulkWr
151146
batchRes.ModifiedCount = int64(res.NModified)
152147
batchRes.UpsertedCount = int64(len(res.Upserted))
153148
for _, upsert := range res.Upserted {
154-
batchRes.UpsertedIDs[upsert.Index] = upsert.ID
149+
batchRes.UpsertedIDs[int64(batch.indexes[upsert.Index])] = upsert.ID
155150
}
156151
}
157152

158153
batchErr.WriteErrors = make([]BulkWriteError, 0, len(writeErrors))
159154
convWriteErrors := writeErrorsFromDriverWriteErrors(writeErrors)
160155
for _, we := range convWriteErrors {
156+
request := batch.models[we.Index]
157+
we.Index = batch.indexes[we.Index]
161158
batchErr.WriteErrors = append(batchErr.WriteErrors, BulkWriteError{
162159
WriteError: we,
163-
Request: batch.models[we.Index],
160+
Request: request,
164161
})
165162
}
166163
return batchRes, batchErr, nil
@@ -401,18 +398,23 @@ func createBatches(models []WriteModel, ordered bool) []bulkWriteBatch {
401398
batches[updateOneCommand].canRetry = true
402399

403400
// TODO(GODRIVER-1157): fix batching once operation retryability is fixed
404-
for _, model := range models {
401+
for i, model := range models {
405402
switch model.(type) {
406403
case *InsertOneModel:
407404
batches[insertCommand].models = append(batches[insertCommand].models, model)
405+
batches[insertCommand].indexes = append(batches[insertCommand].indexes, i)
408406
case *DeleteOneModel:
409407
batches[deleteOneCommand].models = append(batches[deleteOneCommand].models, model)
408+
batches[deleteOneCommand].indexes = append(batches[deleteOneCommand].indexes, i)
410409
case *DeleteManyModel:
411410
batches[deleteManyCommand].models = append(batches[deleteManyCommand].models, model)
411+
batches[deleteManyCommand].indexes = append(batches[deleteManyCommand].indexes, i)
412412
case *ReplaceOneModel, *UpdateOneModel:
413413
batches[updateOneCommand].models = append(batches[updateOneCommand].models, model)
414+
batches[updateOneCommand].indexes = append(batches[updateOneCommand].indexes, i)
414415
case *UpdateManyModel:
415416
batches[updateManyCommand].models = append(batches[updateManyCommand].models, model)
417+
batches[updateManyCommand].indexes = append(batches[updateManyCommand].indexes, i)
416418
}
417419
}
418420

@@ -424,7 +426,7 @@ func createOrderedBatches(models []WriteModel) []bulkWriteBatch {
424426
var prevKind writeCommandKind = -1
425427
i := -1 // batch index
426428

427-
for _, model := range models {
429+
for ind, model := range models {
428430
var createNewBatch bool
429431
var canRetry bool
430432
var newKind writeCommandKind
@@ -455,13 +457,15 @@ func createOrderedBatches(models []WriteModel) []bulkWriteBatch {
455457
batches = append(batches, bulkWriteBatch{
456458
models: []WriteModel{model},
457459
canRetry: canRetry,
460+
indexes: []int{ind},
458461
})
459462
i++
460463
} else {
461464
batches[i].models = append(batches[i].models, model)
462465
if !canRetry {
463466
batches[i].canRetry = false // don't make it true if it was already false
464467
}
468+
batches[i].indexes = append(batches[i].indexes, ind)
465469
}
466470

467471
prevKind = newKind
@@ -470,15 +474,15 @@ func createOrderedBatches(models []WriteModel) []bulkWriteBatch {
470474
return batches
471475
}
472476

473-
func (bw *bulkWrite) mergeResults(newResult BulkWriteResult, opIndex int64) {
477+
func (bw *bulkWrite) mergeResults(newResult BulkWriteResult) {
474478
bw.result.InsertedCount += newResult.InsertedCount
475479
bw.result.MatchedCount += newResult.MatchedCount
476480
bw.result.ModifiedCount += newResult.ModifiedCount
477481
bw.result.DeletedCount += newResult.DeletedCount
478482
bw.result.UpsertedCount += newResult.UpsertedCount
479483

480484
for index, upsertID := range newResult.UpsertedIDs {
481-
bw.result.UpsertedIDs[index+opIndex] = upsertID
485+
bw.result.UpsertedIDs[index] = upsertID
482486
}
483487
}
484488

mongo/integration/collection_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,6 +1067,77 @@ func TestCollection(t *testing.T) {
10671067
assert.Equal(mt, expectedModel, actualModel, "expected model %v in BulkWriteException, got %v",
10681068
expectedModel, actualModel)
10691069
})
1070+
mt.Run("unordered writeError index", func(mt *mtest.T) {
1071+
cappedOpts := bson.D{{"capped", true}, {"size", 64 * 1024}}
1072+
// Use a capped collection to get WriteErrors for delete operations
1073+
capped := mt.CreateCollection(mtest.Collection{
1074+
Name: "deleteOne_capped",
1075+
CreateOpts: cappedOpts,
1076+
}, true)
1077+
models := []mongo.WriteModel{
1078+
mongo.NewInsertOneModel().SetDocument(bson.D{{"_id", "id1"}}),
1079+
mongo.NewInsertOneModel().SetDocument(bson.D{{"_id", "id3"}}),
1080+
}
1081+
_, err := capped.BulkWrite(mtest.Background, models, options.BulkWrite())
1082+
assert.Nil(t, err, "BulkWrite error: %v", err)
1083+
1084+
// UpdateOne and ReplaceOne models are batched together, so they each appear once
1085+
models = []mongo.WriteModel{
1086+
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id0"}}),
1087+
mongo.NewDeleteManyModel().SetFilter(bson.D{{"_id", "id0"}}),
1088+
mongo.NewUpdateOneModel().SetFilter(bson.D{{"_id", "id3"}}).SetUpdate(bson.D{{"$set", bson.D{{"_id", 3.14159}}}}),
1089+
mongo.NewInsertOneModel().SetDocument(bson.D{{"_id", "id1"}}),
1090+
mongo.NewDeleteManyModel().SetFilter(bson.D{{"_id", "id0"}}),
1091+
mongo.NewUpdateManyModel().SetFilter(bson.D{{"_id", "id3"}}).SetUpdate(bson.D{{"$set", bson.D{{"_id", 3.14159}}}}),
1092+
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id0"}}),
1093+
mongo.NewInsertOneModel().SetDocument(bson.D{{"_id", "id1"}}),
1094+
mongo.NewReplaceOneModel().SetFilter(bson.D{{"_id", "id3"}}).SetReplacement(bson.D{{"_id", 3.14159}}),
1095+
mongo.NewUpdateManyModel().SetFilter(bson.D{{"_id", "id3"}}).SetUpdate(bson.D{{"$set", bson.D{{"_id", 3.14159}}}}),
1096+
}
1097+
_, err = capped.BulkWrite(mtest.Background, models, options.BulkWrite().SetOrdered(false))
1098+
bwException, ok := err.(mongo.BulkWriteException)
1099+
assert.True(mt, ok, "expected error of type %T, got %T", mongo.BulkWriteException{}, err)
1100+
1101+
assert.Equal(mt, len(bwException.WriteErrors), 10, "expected 10 writeErrors, got %v", len(bwException.WriteErrors))
1102+
for _, writeErr := range bwException.WriteErrors {
1103+
switch writeErr.Request.(type) {
1104+
case *mongo.DeleteOneModel:
1105+
assert.True(mt, writeErr.Index == 0 || writeErr.Index == 6,
1106+
"expected index 0 or 6, got %v", writeErr.Index)
1107+
case *mongo.DeleteManyModel:
1108+
assert.True(mt, writeErr.Index == 1 || writeErr.Index == 4,
1109+
"expected index 1 or 4, got %v", writeErr.Index)
1110+
case *mongo.UpdateManyModel:
1111+
assert.True(mt, writeErr.Index == 5 || writeErr.Index == 9,
1112+
"expected index 5 or 9, got %v", writeErr.Index)
1113+
case *mongo.InsertOneModel:
1114+
assert.True(mt, writeErr.Index == 3 || writeErr.Index == 7,
1115+
"expected index 3 or 7, got %v", writeErr.Index)
1116+
case *mongo.UpdateOneModel:
1117+
assert.Equal(mt, writeErr.Index, 2, "expected index 2, got %v", writeErr.Index)
1118+
case *mongo.ReplaceOneModel:
1119+
assert.Equal(mt, writeErr.Index, 8, "expected index 8, got %v", writeErr.Index)
1120+
}
1121+
1122+
}
1123+
})
1124+
mt.Run("unordered upsertID index", func(mt *mtest.T) {
1125+
id1 := "id1"
1126+
id3 := "id3"
1127+
models := []mongo.WriteModel{
1128+
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id0"}}),
1129+
mongo.NewReplaceOneModel().SetFilter(bson.D{{"_id", id1}}).SetReplacement(bson.D{{"_id", id1}}).SetUpsert(true),
1130+
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id2"}}),
1131+
mongo.NewReplaceOneModel().SetFilter(bson.D{{"_id", id3}}).SetReplacement(bson.D{{"_id", id3}}).SetUpsert(true),
1132+
mongo.NewDeleteOneModel().SetFilter(bson.D{{"_id", "id4"}}),
1133+
}
1134+
res, err := mt.Coll.BulkWrite(mtest.Background, models, options.BulkWrite().SetOrdered(false))
1135+
assert.Nil(mt, err, "bulkwrite error: %v", err)
1136+
1137+
assert.Equal(mt, len(res.UpsertedIDs), 2, "expected 2 UpsertedIDs, got %v", len(res.UpsertedIDs))
1138+
assert.Equal(mt, res.UpsertedIDs[1].(string), id1, "expected UpsertedIDs[1] to be %v, got %v", id1, res.UpsertedIDs[1])
1139+
assert.Equal(mt, res.UpsertedIDs[3].(string), id3, "expected UpsertedIDs[3] to be %v, got %v", id3, res.UpsertedIDs[3])
1140+
})
10701141
unackClientOpts := options.Client().
10711142
SetWriteConcern(writeconcern.New(writeconcern.W(0)))
10721143
unackMtOpts := mtest.NewOptions().

0 commit comments

Comments
 (0)