Skip to content

Refactor MixedBulkWriteOperation a bit and change CommandMessage.isResponseExpected such that it accounts for ordered/unordered bulk writes #1481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.mongodb.ReadPreference.primary;
import static com.mongodb.ReadPreference.primaryPreferred;
import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.connection.ClusterConnectionMode.LOAD_BALANCED;
import static com.mongodb.connection.ClusterConnectionMode.SINGLE;
Expand Down Expand Up @@ -112,6 +113,7 @@ public final class CommandMessage extends RequestMessage {
this.payloadFieldNameValidator = payloadFieldNameValidator;
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
this.serverApi = serverApi;
assertTrue(useOpMsg() || responseExpected);
}

/**
Expand Down Expand Up @@ -187,7 +189,11 @@ private String getSequenceIdentifier(final ByteBuf byteBuf) {
}

boolean isResponseExpected() {
return !useOpMsg() || requireOpMsgResponse();
if (responseExpected) {
return true;
} else {
return payload != null && payload.isOrdered() && payload.hasAnotherSplit();
}
}

MongoNamespace getNamespace() {
Expand Down Expand Up @@ -240,7 +246,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu

private int getOpMsgFlagBits() {
int flagBits = 0;
if (!requireOpMsgResponse()) {
if (!isResponseExpected()) {
flagBits = 1 << 1;
}
if (exhaustAllowed) {
Expand All @@ -249,14 +255,6 @@ private int getOpMsgFlagBits() {
return flagBits;
}

private boolean requireOpMsgResponse() {
if (responseExpected) {
return true;
} else {
return payload != null && payload.hasAnotherSplit();
}
}

private boolean isDirectConnectionToReplicaSetMember() {
return clusterConnectionMode == SINGLE
&& getSettings().getServerType() != SHARD_ROUTER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.stream.Collectors;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.connection.SplittablePayload.Type.INSERT;
Expand All @@ -57,6 +58,7 @@ public final class SplittablePayload {
private final WriteRequestEncoder writeRequestEncoder = new WriteRequestEncoder();
private final Type payloadType;
private final List<WriteRequestWithIndex> writeRequestWithIndexes;
private final boolean ordered;
private final Map<Integer, BsonValue> insertedIds = new HashMap<>();
private int position = 0;

Expand Down Expand Up @@ -91,9 +93,10 @@ public enum Type {
* @param payloadType the payload type
* @param writeRequestWithIndexes the writeRequests
*/
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes) {
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes, final boolean ordered) {
this.payloadType = notNull("batchType", payloadType);
this.writeRequestWithIndexes = notNull("writeRequests", writeRequestWithIndexes);
this.ordered = ordered;
}

/**
Expand All @@ -117,7 +120,7 @@ public String getPayloadName() {
}

boolean hasPayload() {
return writeRequestWithIndexes.size() > 0;
return !writeRequestWithIndexes.isEmpty();
}

public int size() {
Expand All @@ -137,10 +140,6 @@ public List<BsonDocument> getPayload() {
.collect(Collectors.toList());
}

public List<WriteRequestWithIndex> getWriteRequestWithIndexes() {
return writeRequestWithIndexes;
}

/**
* @return the current position in the payload
*/
Expand All @@ -160,16 +159,22 @@ public void setPosition(final int position) {
* @return true if there are more values after the current position
*/
public boolean hasAnotherSplit() {
// this method must be not called before this payload having been encoded
assertTrue(position > 0);
return writeRequestWithIndexes.size() > position;
}

boolean isOrdered() {
return ordered;
}

/**
* @return a new SplittablePayload containing only the values after the current position.
*/
public SplittablePayload getNextSplit() {
isTrue("hasAnotherSplit", hasAnotherSplit());
List<WriteRequestWithIndex> nextPayLoad = writeRequestWithIndexes.subList(position, writeRequestWithIndexes.size());
return new SplittablePayload(payloadType, nextPayLoad);
return new SplittablePayload(payloadType, nextPayLoad, ordered);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.mongodb.internal.operation;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoClientException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoNamespace;
import com.mongodb.WriteConcern;
Expand Down Expand Up @@ -65,6 +64,7 @@
import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE;
import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
import static com.mongodb.internal.operation.MixedBulkWriteOperation.commandWriteConcern;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError;
Expand Down Expand Up @@ -101,12 +101,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
final List<? extends WriteRequest> writeRequests,
final OperationContext operationContext,
@Nullable final BsonValue comment, @Nullable final BsonDocument variables) {
SessionContext sessionContext = operationContext.getSessionContext();
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !sessionContext.hasActiveTransaction()
&& !writeConcern.isAcknowledged()) {
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
}
Comment on lines -105 to -108
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This validation was moved to MixedBulkWriteOperation.validateAndGetEffectiveWriteConcern.

boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext);
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, operationContext.getSessionContext());
List<WriteRequestWithIndex> writeRequestsWithIndex = new ArrayList<>();
boolean writeRequestsAreRetryable = true;
for (int i = 0; i < writeRequests.size(); i++) {
Expand Down Expand Up @@ -159,7 +154,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti

this.indexMap = indexMap;
this.unprocessed = unprocessedItems;
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems);
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered);
this.operationContext = operationContext;
this.comment = comment;
this.variables = variables;
Expand All @@ -169,9 +164,8 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
if (!payloadItems.isEmpty()) {
command.put(getCommandName(batchType), new BsonString(namespace.getCollectionName()));
command.put("ordered", new BsonBoolean(ordered));
if (!writeConcern.isServerDefault() && !sessionContext.hasActiveTransaction()) {
command.put("writeConcern", writeConcern.asDocument());
}
Comment on lines -172 to -174
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic needs to be reused in the implementation of the new bulkWrite command, so I extracted it into MixedBulkWriteOperation.commandWriteConcern.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it eventually move to some common "helper" class?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had no such plans, as I don't see any value in creating a somewhat useless class just to hold this method, such that both MixedBulkWriteOperation and ClientBulkWriteOperation could call commandWriteConcern on that class. Calling commandWriteConcern on MixedBulkWriteOperation seems to be not worse in any way, especially given that when it is statically imported, one can't even notice immediately see where the method comes from.

commandWriteConcern(writeConcern, sessionContext).ifPresent(value ->
command.put("writeConcern", value.asDocument()));
if (bypassDocumentValidation != null) {
command.put("bypassDocumentValidation", new BsonBoolean(bypassDocumentValidation));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.operation;

import com.mongodb.MongoClientException;
import com.mongodb.MongoException;
import com.mongodb.MongoNamespace;
import com.mongodb.WriteConcern;
Expand Down Expand Up @@ -191,8 +192,8 @@ public BulkWriteResult execute(final WriteBinding binding) {
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
SessionContext sessionContext = binding.getOperationContext().getSessionContext();
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)) {
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext);
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)) {
handleMongoWriteConcernWithResponseException(retryState, true, timeoutContext);
}
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern);
Expand All @@ -201,7 +202,7 @@ public BulkWriteResult execute(final WriteBinding binding) {
connectionDescription, ordered, writeConcern,
bypassDocumentValidation, retryWrites, writeRequests, binding.getOperationContext(), comment, variables), timeoutContext);
}
return executeBulkWriteBatch(retryState, binding, connection);
return executeBulkWriteBatch(retryState, writeConcern, binding, connection);
})
);
try {
Expand All @@ -226,8 +227,8 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
SessionContext sessionContext = binding.getOperationContext().getSessionContext();
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext);
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback, timeoutContext)) {
return;
}
Expand All @@ -245,13 +246,17 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba
releasingCallback.onResult(null, t);
return;
}
executeBulkWriteBatchAsync(retryState, binding, connection, releasingCallback);
executeBulkWriteBatchAsync(retryState, writeConcern, binding, connection, releasingCallback);
})
).whenComplete(binding::release);
retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER)));
}

private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection) {
private BulkWriteResult executeBulkWriteBatch(
final RetryState retryState,
final WriteConcern effectiveWriteConcern,
final WriteBinding binding,
final Connection connection) {
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
.orElseThrow(Assertions::fail);
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
Expand All @@ -261,7 +266,7 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final

while (currentBatch.shouldProcessBatch()) {
try {
BsonDocument result = executeCommand(operationContext, connection, currentBatch);
BsonDocument result = executeCommand(effectiveWriteConcern, operationContext, connection, currentBatch);
if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) {
MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result,
connection.getDescription().getServerAddress(), "errMsg", timeoutContext);
Expand Down Expand Up @@ -295,7 +300,11 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
}
}

private void executeBulkWriteBatchAsync(final RetryState retryState, final AsyncWriteBinding binding, final AsyncConnection connection,
private void executeBulkWriteBatchAsync(
final RetryState retryState,
final WriteConcern effectiveWriteConcern,
final AsyncWriteBinding binding,
final AsyncConnection connection,
final SingleResultCallback<BulkWriteResult> callback) {
LoopState loopState = new LoopState();
AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> {
Expand All @@ -309,7 +318,7 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async
}
OperationContext operationContext = binding.getOperationContext();
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
executeCommandAsync(operationContext, connection, currentBatch, (result, t) -> {
executeCommandAsync(effectiveWriteConcern, operationContext, connection, currentBatch, (result, t) -> {
if (t == null) {
if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) {
MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result,
Expand Down Expand Up @@ -405,31 +414,47 @@ private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetrySta
}

@Nullable
private BsonDocument executeCommand(final OperationContext operationContext, final Connection connection, final BulkWriteBatch batch) {
private BsonDocument executeCommand(
final WriteConcern effectiveWriteConcern,
final OperationContext operationContext,
final Connection connection,
final BulkWriteBatch batch) {
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()),
operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
batch.getPayload(), batch.getFieldNameValidator());
}

private void executeCommandAsync(final OperationContext operationContext, final AsyncConnection connection, final BulkWriteBatch batch,
private void executeCommandAsync(
final WriteConcern effectiveWriteConcern,
final OperationContext operationContext,
final AsyncConnection connection,
final BulkWriteBatch batch,
final SingleResultCallback<BsonDocument> callback) {
connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()),
operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
batch.getPayload(), batch.getFieldNameValidator(), callback);
}

private WriteConcern getAppliedWriteConcern(final SessionContext sessionContext) {
if (sessionContext.hasActiveTransaction()) {
return WriteConcern.ACKNOWLEDGED;
} else {
return writeConcern;
private static WriteConcern validateAndGetEffectiveWriteConcern(final WriteConcern writeConcernSetting, final SessionContext sessionContext)
throws MongoClientException {
boolean activeTransaction = sessionContext.hasActiveTransaction();
WriteConcern effectiveWriteConcern = activeTransaction
? WriteConcern.ACKNOWLEDGED
: writeConcernSetting;
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !activeTransaction && !effectiveWriteConcern.isAcknowledged()) {
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
}
return effectiveWriteConcern;
}

private boolean shouldAcknowledge(final BulkWriteBatch batch, final SessionContext sessionContext) {
return ordered
? batch.hasAnotherBatch() || getAppliedWriteConcern(sessionContext).isAcknowledged()
: getAppliedWriteConcern(sessionContext).isAcknowledged();
static Optional<WriteConcern> commandWriteConcern(final WriteConcern effectiveWriteConcern, final SessionContext sessionContext) {
return effectiveWriteConcern.isServerDefault() || sessionContext.hasActiveTransaction()
? Optional.empty()
: Optional.of(effectiveWriteConcern);
}

private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) {
return effectiveWriteConcern.isAcknowledged() || (ordered && batch.hasAnotherBatch());
}
Comment on lines +456 to 458
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldExpectResponse replaces the shouldAcknowledge method. The new method has simpler body and named in a way that avoids erroneous allusions to an unacknowledged (w: 0) write concern.


private void addErrorLabelsToWriteConcern(final BsonDocument result, final Set<String> errorLabels) {
Expand Down
Loading