Skip to content

Commit 76115bd

Browse files
committed
Slightly refactor MixedBulkWriteOperation and BulkWriteBatch
1 parent 81402ae commit 76115bd

File tree

2 files changed

+52
-33
lines changed

2 files changed

+52
-33
lines changed

driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoBulkWriteException;
20-
import com.mongodb.MongoClientException;
2120
import com.mongodb.MongoInternalException;
2221
import com.mongodb.MongoNamespace;
2322
import com.mongodb.WriteConcern;
@@ -65,6 +64,7 @@
6564
import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE;
6665
import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE;
6766
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
67+
import static com.mongodb.internal.operation.MixedBulkWriteOperation.commandWriteConcern;
6868
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
6969
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
7070
import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError;
@@ -101,12 +101,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
101101
final List<? extends WriteRequest> writeRequests,
102102
final OperationContext operationContext,
103103
@Nullable final BsonValue comment, @Nullable final BsonDocument variables) {
104-
SessionContext sessionContext = operationContext.getSessionContext();
105-
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !sessionContext.hasActiveTransaction()
106-
&& !writeConcern.isAcknowledged()) {
107-
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
108-
}
109-
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext);
104+
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, operationContext.getSessionContext());
110105
List<WriteRequestWithIndex> writeRequestsWithIndex = new ArrayList<>();
111106
boolean writeRequestsAreRetryable = true;
112107
for (int i = 0; i < writeRequests.size(); i++) {
@@ -169,9 +164,8 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
169164
if (!payloadItems.isEmpty()) {
170165
command.put(getCommandName(batchType), new BsonString(namespace.getCollectionName()));
171166
command.put("ordered", new BsonBoolean(ordered));
172-
if (!writeConcern.isServerDefault() && !sessionContext.hasActiveTransaction()) {
173-
command.put("writeConcern", writeConcern.asDocument());
174-
}
167+
commandWriteConcern(writeConcern, sessionContext).ifPresent(value ->
168+
command.put("writeConcern", value.asDocument()));
175169
if (bypassDocumentValidation != null) {
176170
command.put("bypassDocumentValidation", new BsonBoolean(bypassDocumentValidation));
177171
}

driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.operation;
1818

19+
import com.mongodb.MongoClientException;
1920
import com.mongodb.MongoException;
2021
import com.mongodb.MongoNamespace;
2122
import com.mongodb.WriteConcern;
@@ -191,8 +192,8 @@ public BulkWriteResult execute(final WriteBinding binding) {
191192
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
192193
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
193194
SessionContext sessionContext = binding.getOperationContext().getSessionContext();
194-
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
195-
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)) {
195+
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext);
196+
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)) {
196197
handleMongoWriteConcernWithResponseException(retryState, true, timeoutContext);
197198
}
198199
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern);
@@ -201,7 +202,7 @@ public BulkWriteResult execute(final WriteBinding binding) {
201202
connectionDescription, ordered, writeConcern,
202203
bypassDocumentValidation, retryWrites, writeRequests, binding.getOperationContext(), comment, variables), timeoutContext);
203204
}
204-
return executeBulkWriteBatch(retryState, binding, connection);
205+
return executeBulkWriteBatch(retryState, writeConcern, binding, connection);
205206
})
206207
);
207208
try {
@@ -226,8 +227,8 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
226227
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
227228
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
228229
SessionContext sessionContext = binding.getOperationContext().getSessionContext();
229-
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
230-
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)
230+
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext);
231+
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)
231232
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback, timeoutContext)) {
232233
return;
233234
}
@@ -245,13 +246,17 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba
245246
releasingCallback.onResult(null, t);
246247
return;
247248
}
248-
executeBulkWriteBatchAsync(retryState, binding, connection, releasingCallback);
249+
executeBulkWriteBatchAsync(retryState, writeConcern, binding, connection, releasingCallback);
249250
})
250251
).whenComplete(binding::release);
251252
retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER)));
252253
}
253254

254-
private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection) {
255+
private BulkWriteResult executeBulkWriteBatch(
256+
final RetryState retryState,
257+
final WriteConcern effectiveWriteConcern,
258+
final WriteBinding binding,
259+
final Connection connection) {
255260
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
256261
.orElseThrow(Assertions::fail);
257262
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
@@ -261,7 +266,7 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
261266

262267
while (currentBatch.shouldProcessBatch()) {
263268
try {
264-
BsonDocument result = executeCommand(operationContext, connection, currentBatch);
269+
BsonDocument result = executeCommand(effectiveWriteConcern, operationContext, connection, currentBatch);
265270
if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) {
266271
MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result,
267272
connection.getDescription().getServerAddress(), "errMsg", timeoutContext);
@@ -295,7 +300,11 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
295300
}
296301
}
297302

298-
private void executeBulkWriteBatchAsync(final RetryState retryState, final AsyncWriteBinding binding, final AsyncConnection connection,
303+
private void executeBulkWriteBatchAsync(
304+
final RetryState retryState,
305+
final WriteConcern effectiveWriteConcern,
306+
final AsyncWriteBinding binding,
307+
final AsyncConnection connection,
299308
final SingleResultCallback<BulkWriteResult> callback) {
300309
LoopState loopState = new LoopState();
301310
AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> {
@@ -309,7 +318,7 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async
309318
}
310319
OperationContext operationContext = binding.getOperationContext();
311320
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
312-
executeCommandAsync(operationContext, connection, currentBatch, (result, t) -> {
321+
executeCommandAsync(effectiveWriteConcern, operationContext, connection, currentBatch, (result, t) -> {
313322
if (t == null) {
314323
if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) {
315324
MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result,
@@ -405,31 +414,47 @@ private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetrySta
405414
}
406415

407416
@Nullable
408-
private BsonDocument executeCommand(final OperationContext operationContext, final Connection connection, final BulkWriteBatch batch) {
417+
private BsonDocument executeCommand(
418+
final WriteConcern effectiveWriteConcern,
419+
final OperationContext operationContext,
420+
final Connection connection,
421+
final BulkWriteBatch batch) {
409422
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
410-
operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()),
423+
operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
411424
batch.getPayload(), batch.getFieldNameValidator());
412425
}
413426

414-
private void executeCommandAsync(final OperationContext operationContext, final AsyncConnection connection, final BulkWriteBatch batch,
427+
private void executeCommandAsync(
428+
final WriteConcern effectiveWriteConcern,
429+
final OperationContext operationContext,
430+
final AsyncConnection connection,
431+
final BulkWriteBatch batch,
415432
final SingleResultCallback<BsonDocument> callback) {
416433
connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
417-
operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()),
434+
operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
418435
batch.getPayload(), batch.getFieldNameValidator(), callback);
419436
}
420437

421-
private WriteConcern getAppliedWriteConcern(final SessionContext sessionContext) {
422-
if (sessionContext.hasActiveTransaction()) {
423-
return WriteConcern.ACKNOWLEDGED;
424-
} else {
425-
return writeConcern;
438+
private static WriteConcern validateAndGetEffectiveWriteConcern(final WriteConcern writeConcernSetting, final SessionContext sessionContext)
439+
throws MongoClientException {
440+
boolean activeTransaction = sessionContext.hasActiveTransaction();
441+
WriteConcern effectiveWriteConcern = activeTransaction
442+
? WriteConcern.ACKNOWLEDGED
443+
: writeConcernSetting;
444+
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !activeTransaction && !effectiveWriteConcern.isAcknowledged()) {
445+
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
426446
}
447+
return effectiveWriteConcern;
427448
}
428449

429-
private boolean shouldAcknowledge(final BulkWriteBatch batch, final SessionContext sessionContext) {
430-
return ordered
431-
? batch.hasAnotherBatch() || getAppliedWriteConcern(sessionContext).isAcknowledged()
432-
: getAppliedWriteConcern(sessionContext).isAcknowledged();
450+
static Optional<WriteConcern> commandWriteConcern(final WriteConcern effectiveWriteConcern, final SessionContext sessionContext) {
451+
return effectiveWriteConcern.isServerDefault() || sessionContext.hasActiveTransaction()
452+
? Optional.empty()
453+
: Optional.of(effectiveWriteConcern);
454+
}
455+
456+
private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) {
457+
return effectiveWriteConcern.isAcknowledged() || (ordered && batch.hasAnotherBatch());
433458
}
434459

435460
private void addErrorLabelsToWriteConcern(final BsonDocument result, final Set<String> errorLabels) {

0 commit comments

Comments
 (0)