Skip to content

Commit c814cfb

Browse files
author
iwysiu
authored
GODRIVER-1363 fix indexes for writeErrors with batches and filter ret… (#362)
1 parent a0461a4 commit c814cfb

File tree

3 files changed

+97
-1
lines changed

3 files changed

+97
-1
lines changed

mongo/collection.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,25 @@ func (coll *Collection) insert(ctx context.Context, documents []interface{},
293293
}
294294
op = op.Retry(retry)
295295

296-
return result, op.Execute(ctx)
296+
err = op.Execute(ctx)
297+
wce, ok := err.(driver.WriteCommandError)
298+
if !ok {
299+
return result, err
300+
}
301+
302+
// remove the ids that had writeErrors from result
303+
for i, we := range wce.WriteErrors {
304+
// i indexes have been removed before the current error, so the index is we.Index-i
305+
idIndex := int(we.Index) - i
306+
// if the insert is ordered, nothing after the error was inserted
307+
if imo.Ordered == nil || *imo.Ordered {
308+
result = result[:idIndex]
309+
break
310+
}
311+
result = append(result[:idIndex], result[idIndex+1:]...)
312+
}
313+
314+
return result, err
297315
}
298316

299317
// InsertOne executes an insert command to insert a single document into the collection.
@@ -367,6 +385,7 @@ func (coll *Collection) InsertMany(ctx context.Context, documents []interface{},
367385
nil,
368386
})
369387
}
388+
370389
return imResult, BulkWriteException{
371390
WriteErrors: bwErrors,
372391
WriteConcernError: writeException.WriteConcernError,

mongo/integration/collection_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,74 @@ func TestCollection(t *testing.T) {
164164
})
165165
}
166166
})
167+
mt.Run("return only inserted ids", func(mt *mtest.T) {
168+
id := int32(11)
169+
docs := []interface{}{
170+
bson.D{{"_id", id}},
171+
bson.D{{"_id", id}},
172+
bson.D{{"x", 6}},
173+
bson.D{{"_id", id}},
174+
}
175+
176+
testCases := []struct {
177+
name string
178+
ordered bool
179+
numInserted int
180+
numErrors int
181+
}{
182+
{"unordered", false, 2, 2},
183+
{"ordered", true, 1, 1},
184+
}
185+
for _, tc := range testCases {
186+
mt.Run(tc.name, func(mt *mtest.T) {
187+
res, err := mt.Coll.InsertMany(mtest.Background, docs, options.InsertMany().SetOrdered(tc.ordered))
188+
189+
assert.Equal(mt, tc.numInserted, len(res.InsertedIDs), "expected %v inserted IDs, got %v", tc.numInserted, len(res.InsertedIDs))
190+
assert.Equal(mt, id, res.InsertedIDs[0], "expected inserted ID %v, got %v", id, res.InsertedIDs[0])
191+
if tc.numInserted > 1 {
192+
assert.NotNil(mt, res.InsertedIDs[1], "expected ID but got nil")
193+
}
194+
195+
we, ok := err.(mongo.BulkWriteException)
196+
assert.True(mt, ok, "expected error type %T, got %T", mongo.BulkWriteException{}, err)
197+
numErrors := len(we.WriteErrors)
198+
assert.Equal(mt, tc.numErrors, numErrors, "expected %v write errors, got %v", tc.numErrors, numErrors)
199+
gotCode := we.WriteErrors[0].Code
200+
assert.Equal(mt, errorDuplicateKey, gotCode, "expected error code %v, got %v", errorDuplicateKey, gotCode)
201+
})
202+
}
203+
})
204+
mt.Run("writeError index", func(mt *mtest.T) {
205+
// TODO(GODRIVER-425): remove this as part a larger project to
206+
// refactor integration and other longrunning tasks.
207+
if os.Getenv("EVR_TASK_ID") == "" {
208+
mt.Skip("skipping long running integration test outside of evergreen")
209+
}
210+
211+
// force multiple batches
212+
numDocs := 700000
213+
var docs []interface{}
214+
for i := 0; i < numDocs; i++ {
215+
d := bson.D{
216+
{"a", int32(i)},
217+
{"b", int32(i * 2)},
218+
{"c", int32(i * 3)},
219+
}
220+
docs = append(docs, d)
221+
}
222+
repeated := bson.D{{"_id", int32(11)}}
223+
docs = append(docs, repeated, repeated)
224+
225+
_, err := mt.Coll.InsertMany(context.Background(), docs)
226+
assert.NotNil(mt, err, "expected InsertMany error, got nil")
227+
228+
we, ok := err.(mongo.BulkWriteException)
229+
assert.True(mt, ok, "expected error type %T, got %T", mongo.BulkWriteException{}, err)
230+
numErrors := len(we.WriteErrors)
231+
assert.Equal(mt, 1, numErrors, "expected 1 write error, got %v", numErrors)
232+
gotIndex := we.WriteErrors[0].Index
233+
assert.Equal(mt, numDocs+1, gotIndex, "expected index %v, got %v", numDocs+1, gotIndex)
234+
})
167235
wcCollOpts := options.Collection().SetWriteConcern(impossibleWc)
168236
wcTestOpts := mtest.NewOptions().CollectionOptions(wcCollOpts).Topologies(mtest.ReplicaSet)
169237
mt.RunOpts("write concern error", wcTestOpts, func(mt *mtest.T) {

x/mongo/driver/operation.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
305305
}
306306
batching := op.Batches.Valid()
307307
retryEnabled := op.RetryMode != nil && op.RetryMode.Enabled()
308+
currIndex := 0
308309
for {
309310
if batching {
310311
targetBatchSize := desc.MaxDocumentSize
@@ -430,6 +431,13 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
430431
}
431432
continue
432433
}
434+
435+
if batching && len(tt.WriteErrors) > 0 && currIndex > 0 {
436+
for i := range tt.WriteErrors {
437+
tt.WriteErrors[i].Index += int64(currIndex)
438+
}
439+
}
440+
433441
// If batching is enabled and either ordered is the default (which is true) or
434442
// explicitly set to true and we have write errors, return the errors.
435443
if batching && (op.Batches.Ordered == nil || *op.Batches.Ordered == true) && len(tt.WriteErrors) > 0 {
@@ -526,6 +534,7 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
526534
retries = 1
527535
}
528536
}
537+
currIndex += len(op.Batches.Current)
529538
op.Batches.ClearBatch()
530539
continue
531540
}

0 commit comments

Comments
 (0)