-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Refactor MixedBulkWriteOperation
a bit and change CommandMessage.isResponseExpected
such that it accounts for ordered/unordered bulk writes
#1481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
76115bd
56bb9cc
4fdb03d
b96210f
77ff590
020698b
47fe8e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,6 @@ | |
package com.mongodb.internal.operation; | ||
|
||
import com.mongodb.MongoBulkWriteException; | ||
import com.mongodb.MongoClientException; | ||
import com.mongodb.MongoInternalException; | ||
import com.mongodb.MongoNamespace; | ||
import com.mongodb.WriteConcern; | ||
|
@@ -65,6 +64,7 @@ | |
import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE; | ||
import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE; | ||
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; | ||
import static com.mongodb.internal.operation.MixedBulkWriteOperation.commandWriteConcern; | ||
import static com.mongodb.internal.operation.OperationHelper.LOGGER; | ||
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite; | ||
import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError; | ||
|
@@ -101,12 +101,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace, | |
final List<? extends WriteRequest> writeRequests, | ||
final OperationContext operationContext, | ||
@Nullable final BsonValue comment, @Nullable final BsonDocument variables) { | ||
SessionContext sessionContext = operationContext.getSessionContext(); | ||
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !sessionContext.hasActiveTransaction() | ||
&& !writeConcern.isAcknowledged()) { | ||
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session"); | ||
} | ||
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext); | ||
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, operationContext.getSessionContext()); | ||
List<WriteRequestWithIndex> writeRequestsWithIndex = new ArrayList<>(); | ||
boolean writeRequestsAreRetryable = true; | ||
for (int i = 0; i < writeRequests.size(); i++) { | ||
|
@@ -159,7 +154,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti | |
|
||
this.indexMap = indexMap; | ||
this.unprocessed = unprocessedItems; | ||
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems); | ||
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered); | ||
this.operationContext = operationContext; | ||
this.comment = comment; | ||
this.variables = variables; | ||
|
@@ -169,9 +164,8 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti | |
if (!payloadItems.isEmpty()) { | ||
command.put(getCommandName(batchType), new BsonString(namespace.getCollectionName())); | ||
command.put("ordered", new BsonBoolean(ordered)); | ||
if (!writeConcern.isServerDefault() && !sessionContext.hasActiveTransaction()) { | ||
command.put("writeConcern", writeConcern.asDocument()); | ||
} | ||
Comment on lines
-172
to
-174
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic needs to be reused in the implementation of the new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will it eventually move to some common "helper" class? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had no such plans, as I don't see any value in creating a somewhat useless class just to hold this method, such that both |
||
commandWriteConcern(writeConcern, sessionContext).ifPresent(value -> | ||
command.put("writeConcern", value.asDocument())); | ||
if (bypassDocumentValidation != null) { | ||
command.put("bypassDocumentValidation", new BsonBoolean(bypassDocumentValidation)); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
|
||
package com.mongodb.internal.operation; | ||
|
||
import com.mongodb.MongoClientException; | ||
import com.mongodb.MongoException; | ||
import com.mongodb.MongoNamespace; | ||
import com.mongodb.WriteConcern; | ||
|
@@ -191,8 +192,8 @@ public BulkWriteResult execute(final WriteBinding binding) { | |
// attach `maxWireVersion` ASAP because it is used to check whether we can retry | ||
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); | ||
SessionContext sessionContext = binding.getOperationContext().getSessionContext(); | ||
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext); | ||
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)) { | ||
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext); | ||
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)) { | ||
handleMongoWriteConcernWithResponseException(retryState, true, timeoutContext); | ||
} | ||
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern); | ||
|
@@ -201,7 +202,7 @@ public BulkWriteResult execute(final WriteBinding binding) { | |
connectionDescription, ordered, writeConcern, | ||
bypassDocumentValidation, retryWrites, writeRequests, binding.getOperationContext(), comment, variables), timeoutContext); | ||
} | ||
return executeBulkWriteBatch(retryState, binding, connection); | ||
return executeBulkWriteBatch(retryState, writeConcern, binding, connection); | ||
}) | ||
); | ||
try { | ||
|
@@ -226,8 +227,8 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall | |
// attach `maxWireVersion` ASAP because it is used to check whether we can retry | ||
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); | ||
SessionContext sessionContext = binding.getOperationContext().getSessionContext(); | ||
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext); | ||
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext) | ||
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext); | ||
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext) | ||
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback, timeoutContext)) { | ||
return; | ||
} | ||
|
@@ -245,13 +246,17 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba | |
releasingCallback.onResult(null, t); | ||
return; | ||
} | ||
executeBulkWriteBatchAsync(retryState, binding, connection, releasingCallback); | ||
executeBulkWriteBatchAsync(retryState, writeConcern, binding, connection, releasingCallback); | ||
}) | ||
).whenComplete(binding::release); | ||
retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER))); | ||
} | ||
|
||
private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection) { | ||
private BulkWriteResult executeBulkWriteBatch( | ||
final RetryState retryState, | ||
final WriteConcern effectiveWriteConcern, | ||
final WriteBinding binding, | ||
final Connection connection) { | ||
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()) | ||
.orElseThrow(Assertions::fail); | ||
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail); | ||
|
@@ -261,7 +266,7 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final | |
|
||
while (currentBatch.shouldProcessBatch()) { | ||
try { | ||
BsonDocument result = executeCommand(operationContext, connection, currentBatch); | ||
BsonDocument result = executeCommand(effectiveWriteConcern, operationContext, connection, currentBatch); | ||
if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) { | ||
MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result, | ||
connection.getDescription().getServerAddress(), "errMsg", timeoutContext); | ||
|
@@ -295,7 +300,11 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final | |
} | ||
} | ||
|
||
private void executeBulkWriteBatchAsync(final RetryState retryState, final AsyncWriteBinding binding, final AsyncConnection connection, | ||
private void executeBulkWriteBatchAsync( | ||
final RetryState retryState, | ||
final WriteConcern effectiveWriteConcern, | ||
final AsyncWriteBinding binding, | ||
final AsyncConnection connection, | ||
final SingleResultCallback<BulkWriteResult> callback) { | ||
LoopState loopState = new LoopState(); | ||
AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> { | ||
|
@@ -309,7 +318,7 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async | |
} | ||
OperationContext operationContext = binding.getOperationContext(); | ||
TimeoutContext timeoutContext = operationContext.getTimeoutContext(); | ||
executeCommandAsync(operationContext, connection, currentBatch, (result, t) -> { | ||
executeCommandAsync(effectiveWriteConcern, operationContext, connection, currentBatch, (result, t) -> { | ||
if (t == null) { | ||
if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) { | ||
MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result, | ||
|
@@ -405,31 +414,47 @@ private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetrySta | |
} | ||
|
||
@Nullable | ||
private BsonDocument executeCommand(final OperationContext operationContext, final Connection connection, final BulkWriteBatch batch) { | ||
private BsonDocument executeCommand( | ||
final WriteConcern effectiveWriteConcern, | ||
final OperationContext operationContext, | ||
final Connection connection, | ||
final BulkWriteBatch batch) { | ||
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), | ||
operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()), | ||
operationContext, shouldExpectResponse(batch, effectiveWriteConcern), | ||
batch.getPayload(), batch.getFieldNameValidator()); | ||
} | ||
|
||
private void executeCommandAsync(final OperationContext operationContext, final AsyncConnection connection, final BulkWriteBatch batch, | ||
private void executeCommandAsync( | ||
final WriteConcern effectiveWriteConcern, | ||
final OperationContext operationContext, | ||
final AsyncConnection connection, | ||
final BulkWriteBatch batch, | ||
final SingleResultCallback<BsonDocument> callback) { | ||
connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), | ||
operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()), | ||
operationContext, shouldExpectResponse(batch, effectiveWriteConcern), | ||
batch.getPayload(), batch.getFieldNameValidator(), callback); | ||
} | ||
|
||
private WriteConcern getAppliedWriteConcern(final SessionContext sessionContext) { | ||
if (sessionContext.hasActiveTransaction()) { | ||
return WriteConcern.ACKNOWLEDGED; | ||
} else { | ||
return writeConcern; | ||
private static WriteConcern validateAndGetEffectiveWriteConcern(final WriteConcern writeConcernSetting, final SessionContext sessionContext) | ||
throws MongoClientException { | ||
boolean activeTransaction = sessionContext.hasActiveTransaction(); | ||
WriteConcern effectiveWriteConcern = activeTransaction | ||
? WriteConcern.ACKNOWLEDGED | ||
: writeConcernSetting; | ||
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !activeTransaction && !effectiveWriteConcern.isAcknowledged()) { | ||
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session"); | ||
} | ||
return effectiveWriteConcern; | ||
} | ||
|
||
private boolean shouldAcknowledge(final BulkWriteBatch batch, final SessionContext sessionContext) { | ||
return ordered | ||
? batch.hasAnotherBatch() || getAppliedWriteConcern(sessionContext).isAcknowledged() | ||
: getAppliedWriteConcern(sessionContext).isAcknowledged(); | ||
static Optional<WriteConcern> commandWriteConcern(final WriteConcern effectiveWriteConcern, final SessionContext sessionContext) { | ||
return effectiveWriteConcern.isServerDefault() || sessionContext.hasActiveTransaction() | ||
? Optional.empty() | ||
: Optional.of(effectiveWriteConcern); | ||
} | ||
|
||
private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) { | ||
return effectiveWriteConcern.isAcknowledged() || (ordered && batch.hasAnotherBatch()); | ||
} | ||
Comment on lines
+456
to
458
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
private void addErrorLabelsToWriteConcern(final BsonDocument result, final Set<String> errorLabels) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This validation was moved to
MixedBulkWriteOperation.validateAndGetEffectiveWriteConcern
.