Skip to content

Commit d6e3799

Browse files
authored
Add throwTranslatedWriteException, refactoring, async helper (#1379)
JAVA-5379
1 parent 2412cbd commit d6e3799

File tree

5 files changed

+441
-328
lines changed

5 files changed

+441
-328
lines changed

driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,42 @@ default AsyncRunnable thenRun(final AsyncRunnable runnable) {
178178
};
179179
}
180180

181+
/**
182+
* The error check checks if the exception is an instance of the provided class.
183+
* @see #thenRunTryCatchAsyncBlocks(AsyncRunnable, java.util.function.Predicate, AsyncFunction)
184+
*/
185+
default <T extends Throwable> AsyncRunnable thenRunTryCatchAsyncBlocks(
186+
final AsyncRunnable runnable,
187+
final Class<T> exceptionClass,
188+
final AsyncFunction<Throwable, Void> errorFunction) {
189+
return thenRunTryCatchAsyncBlocks(runnable, e -> exceptionClass.isInstance(e), errorFunction);
190+
}
191+
192+
/**
193+
* Convenience method corresponding to a try-catch block in sync code.
194+
* This MUST be used to properly handle cases where there is code above
195+
* the block, whose errors must not be caught by an ensuing
196+
* {@link #onErrorIf(java.util.function.Predicate, AsyncFunction)}.
197+
*
198+
* @param runnable corresponds to the contents of the try block
199+
* @param errorCheck for matching on an error (or, a more complex condition)
200+
* @param errorFunction corresponds to the contents of the catch block
201+
* @return the composition of this runnable, a runnable that runs the
202+
* provided runnable, followed by (composed with) the error function, which
203+
* is conditional on there being an exception meeting the error check.
204+
*/
205+
default AsyncRunnable thenRunTryCatchAsyncBlocks(
206+
final AsyncRunnable runnable,
207+
final Predicate<Throwable> errorCheck,
208+
final AsyncFunction<Throwable, Void> errorFunction) {
209+
return this.thenRun(c -> {
210+
beginAsync()
211+
.thenRun(runnable)
212+
.onErrorIf(errorCheck, errorFunction)
213+
.finish(c);
214+
});
215+
}
216+
181217
/**
182218
* @param condition the condition to check
183219
* @param runnable The async runnable to run after this runnable,

driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ static BsonDocument executeCommandWithoutCheckingForFailure(final String databas
6161
static void executeCommandAsync(final String database, final BsonDocument command, final ClusterConnectionMode clusterConnectionMode,
6262
@Nullable final ServerApi serverApi, final InternalConnection internalConnection,
6363
final SingleResultCallback<BsonDocument> callback) {
64-
internalConnection.sendAndReceiveAsync(getCommandMessage(database, command, internalConnection, clusterConnectionMode, serverApi),
64+
internalConnection.sendAndReceiveAsync(
65+
getCommandMessage(database, command, internalConnection, clusterConnectionMode, serverApi),
6566
new BsonDocumentCodec(),
6667
NoOpSessionContext.INSTANCE, IgnorableRequestContext.INSTANCE, new OperationContext(), (result, t) -> {
6768
if (t != null) {

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -633,19 +633,34 @@ private <T> T getCommandResult(final Decoder<T> decoder, final ResponseBuffers r
633633
@Override
634634
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
635635
notNull("stream is open", stream);
636-
637636
if (isClosed()) {
638637
throw new MongoSocketClosedException("Cannot write to a closed stream", getServerAddress());
639638
}
640-
641639
try {
642640
stream.write(byteBuffers);
643641
} catch (Exception e) {
644642
close();
645-
throw translateWriteException(e);
643+
throwTranslatedWriteException(e);
646644
}
647645
}
648646

647+
@Override
648+
public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequestId,
649+
final SingleResultCallback<Void> callback) {
650+
beginAsync().thenRun((c) -> {
651+
notNull("stream is open", stream);
652+
if (isClosed()) {
653+
throw new MongoSocketClosedException("Cannot write to a closed stream", getServerAddress());
654+
}
655+
c.complete(c);
656+
}).thenRunTryCatchAsyncBlocks(c -> {
657+
stream.writeAsync(byteBuffers, c.asHandler());
658+
}, Exception.class, (e, c) -> {
659+
close();
660+
throwTranslatedWriteException(e);
661+
}).finish(errorHandlingCallback(callback, LOGGER));
662+
}
663+
649664
@Override
650665
public ResponseBuffers receiveMessage(final int responseTo) {
651666
assertNotNull(stream);
@@ -665,39 +680,6 @@ private ResponseBuffers receiveMessageWithAdditionalTimeout(final int additional
665680
}
666681
}
667682

668-
@Override
669-
public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequestId,
670-
final SingleResultCallback<Void> callback) {
671-
assertNotNull(stream);
672-
673-
if (isClosed()) {
674-
callback.onResult(null, new MongoSocketClosedException("Can not read from a closed socket", getServerAddress()));
675-
return;
676-
}
677-
678-
writeAsync(byteBuffers, errorHandlingCallback(callback, LOGGER));
679-
}
680-
681-
private void writeAsync(final List<ByteBuf> byteBuffers, final SingleResultCallback<Void> callback) {
682-
try {
683-
stream.writeAsync(byteBuffers, new AsyncCompletionHandler<Void>() {
684-
@Override
685-
public void completed(@Nullable final Void v) {
686-
callback.onResult(null, null);
687-
}
688-
689-
@Override
690-
public void failed(final Throwable t) {
691-
close();
692-
callback.onResult(null, translateWriteException(t));
693-
}
694-
});
695-
} catch (Throwable t) {
696-
close();
697-
callback.onResult(null, t);
698-
}
699-
}
700-
701683
@Override
702684
public void receiveMessageAsync(final int responseTo, final SingleResultCallback<ResponseBuffers> callback) {
703685
assertNotNull(stream);
@@ -762,6 +744,10 @@ private void updateSessionContext(final SessionContext sessionContext, final Res
762744
}
763745
}
764746

747+
private void throwTranslatedWriteException(final Throwable e) {
748+
throw translateWriteException(e);
749+
}
750+
765751
private MongoException translateWriteException(final Throwable e) {
766752
if (e instanceof MongoException) {
767753
return (MongoException) e;

0 commit comments

Comments
 (0)