Skip to content

Commit 4fdb03d

Browse files
committed
Change CommandMessage.requireOpMsgResponse such that it accounts for ordered/unordered bulk writes
1 parent 56bb9cc commit 4fdb03d

File tree

3 files changed

+10
-4
lines changed

3 files changed

+10
-4
lines changed

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ private boolean requireOpMsgResponse() {
253253
if (responseExpected) {
254254
return true;
255255
} else {
256-
return payload != null && payload.hasAnotherSplit();
256+
return payload != null && payload.isOrdered() && payload.hasAnotherSplit();
257257
}
258258
}
259259

driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public final class SplittablePayload {
5757
private final WriteRequestEncoder writeRequestEncoder = new WriteRequestEncoder();
5858
private final Type payloadType;
5959
private final List<WriteRequestWithIndex> writeRequestWithIndexes;
60+
private final boolean ordered;
6061
private final Map<Integer, BsonValue> insertedIds = new HashMap<>();
6162
private int position = 0;
6263

@@ -91,9 +92,10 @@ public enum Type {
9192
* @param payloadType the payload type
9293
* @param writeRequestWithIndexes the writeRequests
9394
*/
94-
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes) {
95+
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes, final boolean ordered) {
9596
this.payloadType = notNull("batchType", payloadType);
9697
this.writeRequestWithIndexes = notNull("writeRequests", writeRequestWithIndexes);
98+
this.ordered = ordered;
9799
}
98100

99101
/**
@@ -159,13 +161,17 @@ public boolean hasAnotherSplit() {
159161
return writeRequestWithIndexes.size() > position;
160162
}
161163

164+
boolean isOrdered() {
165+
return ordered;
166+
}
167+
162168
/**
163169
* @return a new SplittablePayload containing only the values after the current position.
164170
*/
165171
public SplittablePayload getNextSplit() {
166172
isTrue("hasAnotherSplit", hasAnotherSplit());
167173
List<WriteRequestWithIndex> nextPayLoad = writeRequestWithIndexes.subList(position, writeRequestWithIndexes.size());
168-
return new SplittablePayload(payloadType, nextPayLoad);
174+
return new SplittablePayload(payloadType, nextPayLoad, ordered);
169175
}
170176

171177
/**

driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
154154

155155
this.indexMap = indexMap;
156156
this.unprocessed = unprocessedItems;
157-
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems);
157+
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered);
158158
this.operationContext = operationContext;
159159
this.comment = comment;
160160
this.variables = variables;

0 commit comments

Comments
 (0)