Skip to content

Commit 7ed5e70

Browse files
authored
Handle Socket IO interruptibility (#1189)
JAVA-4646
1 parent 481e793 commit 7ed5e70

File tree

5 files changed

+214
-18
lines changed

5 files changed

+214
-18
lines changed

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -422,11 +422,15 @@ void doMaintenance() {
422422
RuntimeException actualException = e instanceof MongoOpenConnectionInternalException
423423
? (RuntimeException) e.getCause()
424424
: e;
425-
sdamProvider.optional().ifPresent(sdam -> {
426-
if (!silentlyComplete.test(actualException)) {
427-
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(actualException, sdam.context(newConnection)));
428-
}
429-
});
425+
try {
426+
sdamProvider.optional().ifPresent(sdam -> {
427+
if (!silentlyComplete.test(actualException)) {
428+
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(actualException, sdam.context(newConnection)));
429+
}
430+
});
431+
} catch (Exception suppressed) {
432+
actualException.addSuppressed(suppressed);
433+
}
430434
throw actualException;
431435
}
432436
});

driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,13 @@ public Connection getConnection(final OperationContext operationContext) {
9595
return OperationCountTrackingConnection.decorate(this,
9696
connectionFactory.create(connectionPool.get(operationContext), new DefaultServerProtocolExecutor(), clusterConnectionMode));
9797
} catch (Throwable e) {
98-
operationEnd();
99-
if (e instanceof MongoException) {
100-
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(e, exceptionContext));
98+
try {
99+
operationEnd();
100+
if (e instanceof MongoException) {
101+
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(e, exceptionContext));
102+
}
103+
} catch (Exception suppressed) {
104+
e.addSuppressed(suppressed);
101105
}
102106
throw e;
103107
}
@@ -117,6 +121,8 @@ public void getConnectionAsync(final OperationContext operationContext, final Si
117121
try {
118122
operationEnd();
119123
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(t, exceptionContext));
124+
} catch (Exception suppressed) {
125+
t.addSuppressed(suppressed);
120126
} finally {
121127
callback.onResult(null, t);
122128
}
@@ -202,7 +208,11 @@ public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection
202208
protocol.sessionContext(new ClusterClockAdvancingSessionContext(sessionContext, clusterClock));
203209
return protocol.execute(connection);
204210
} catch (MongoException e) {
205-
sdam.handleExceptionAfterHandshake(SdamIssue.specific(e, sdam.context(connection)));
211+
try {
212+
sdam.handleExceptionAfterHandshake(SdamIssue.specific(e, sdam.context(connection)));
213+
} catch (Exception suppressed) {
214+
e.addSuppressed(suppressed);
215+
}
206216
if (e instanceof MongoWriteConcernWithResponseException) {
207217
return (T) ((MongoWriteConcernWithResponseException) e).getResponse();
208218
} else {
@@ -223,6 +233,8 @@ public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalCo
223233
if (t != null) {
224234
try {
225235
sdam.handleExceptionAfterHandshake(SdamIssue.specific(t, sdam.context(connection)));
236+
} catch (Exception suppressed) {
237+
t.addSuppressed(suppressed);
226238
} finally {
227239
if (t instanceof MongoWriteConcernWithResponseException) {
228240
callback.onResult((T) ((MongoWriteConcernWithResponseException) t).getResponse(), null);

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060

6161
import java.io.IOException;
6262
import java.io.InterruptedIOException;
63+
import java.net.SocketException;
6364
import java.net.SocketTimeoutException;
6465
import java.nio.channels.ClosedByInterruptException;
6566
import java.util.Collections;
@@ -705,10 +706,12 @@ private void updateSessionContext(final SessionContext sessionContext, final Res
705706
private MongoException translateWriteException(final Throwable e) {
706707
if (e instanceof MongoException) {
707708
return (MongoException) e;
709+
}
710+
MongoInterruptedException interruptedException = translateInterruptedExceptions(e, "Interrupted while sending message");
711+
if (interruptedException != null) {
712+
return interruptedException;
708713
} else if (e instanceof IOException) {
709714
return new MongoSocketWriteException("Exception sending message", getServerAddress(), e);
710-
} else if (e instanceof InterruptedException) {
711-
return new MongoInternalException("Thread interrupted exception", e);
712715
} else {
713716
return new MongoInternalException("Unexpected exception", e);
714717
}
@@ -717,23 +720,52 @@ private MongoException translateWriteException(final Throwable e) {
717720
private MongoException translateReadException(final Throwable e) {
718721
if (e instanceof MongoException) {
719722
return (MongoException) e;
723+
}
724+
MongoInterruptedException interruptedException = translateInterruptedExceptions(e, "Interrupted while receiving message");
725+
if (interruptedException != null) {
726+
return interruptedException;
720727
} else if (e instanceof SocketTimeoutException) {
721728
return new MongoSocketReadTimeoutException("Timeout while receiving message", getServerAddress(), e);
722-
} else if (e instanceof InterruptedIOException) {
723-
return new MongoInterruptedException("Interrupted while receiving message", (InterruptedIOException) e);
724-
} else if (e instanceof ClosedByInterruptException) {
725-
return new MongoInterruptedException("Interrupted while receiving message", (ClosedByInterruptException) e);
726729
} else if (e instanceof IOException) {
727730
return new MongoSocketReadException("Exception receiving message", getServerAddress(), e);
728731
} else if (e instanceof RuntimeException) {
729732
return new MongoInternalException("Unexpected runtime exception", e);
730-
} else if (e instanceof InterruptedException) {
731-
return new MongoInternalException("Interrupted exception", e);
732733
} else {
733734
return new MongoInternalException("Unexpected exception", e);
734735
}
735736
}
736737

738+
/**
739+
* @return {@code null} iff {@code e} does not communicate an interrupt.
740+
*/
741+
@Nullable
742+
private static MongoInterruptedException translateInterruptedExceptions(final Throwable e, final String message) {
743+
if (e instanceof InterruptedException) {
744+
// The interrupted status is cleared before throwing `InterruptedException`,
745+
// we are not propagating `InterruptedException`, and we do not own the current thread,
746+
// which means we must reinstate the interrupted status.
747+
Thread.currentThread().interrupt();
748+
return new MongoInterruptedException(message, (InterruptedException) e);
749+
} else if (
750+
// `InterruptedIOException` is weirdly documented, and almost seems to be a relic abandoned by the Java SE APIs:
751+
// - `SocketTimeoutException` is `InterruptedIOException`,
752+
// but it is not related to the Java SE interrupt mechanism. As a side note, it does not happen when writing.
753+
// - Java SE methods, where IO may indeed be interrupted via the Java SE interrupt mechanism,
754+
// use different exceptions, like `ClosedByInterruptException` or even `SocketException`.
755+
(e instanceof InterruptedIOException && !(e instanceof SocketTimeoutException))
756+
// see `java.nio.channels.InterruptibleChannel` and `java.net.Socket.getOutputStream`/`getInputStream`
757+
|| e instanceof ClosedByInterruptException
758+
// see `java.net.Socket.getOutputStream`/`getInputStream`
759+
|| (e instanceof SocketException && Thread.currentThread().isInterrupted())) {
760+
// The interrupted status is not cleared before throwing `ClosedByInterruptException`/`SocketException`,
761+
// so we do not need to reinstate it.
762+
// `InterruptedIOException` does not specify how it behaves with regard to the interrupted status, so we do nothing.
763+
return new MongoInterruptedException(message, (Exception) e);
764+
} else {
765+
return null;
766+
}
767+
}
768+
737769
private ResponseBuffers receiveResponseBuffers(final int additionalTimeout) throws IOException {
738770
ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH, additionalTimeout);
739771
MessageHeader messageHeader;

driver-core/src/main/com/mongodb/internal/connection/SocketStream.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,10 @@ public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOEx
138138
try {
139139
return read(numBytes);
140140
} finally {
141-
socket.setSoTimeout(curTimeout);
141+
if (!socket.isClosed()) {
142+
// `socket` may be closed if the current thread is virtual, and it is interrupted while reading
143+
socket.setSoTimeout(curTimeout);
144+
}
142145
}
143146
}
144147

driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.mongodb.internal.connection
1818

1919
import com.mongodb.MongoCommandException
2020
import com.mongodb.MongoInternalException
21+
import com.mongodb.MongoInterruptedException
2122
import com.mongodb.MongoNamespace
2223
import com.mongodb.MongoSocketClosedException
2324
import com.mongodb.MongoSocketException
@@ -54,6 +55,7 @@ import org.bson.codecs.configuration.CodecConfigurationException
5455
import spock.lang.Specification
5556

5657
import java.nio.ByteBuffer
58+
import java.nio.channels.ClosedByInterruptException
5759
import java.util.concurrent.CountDownLatch
5860
import java.util.concurrent.ExecutorService
5961
import java.util.concurrent.Executors
@@ -312,6 +314,149 @@ class InternalStreamConnectionSpecification extends Specification {
312314
connection.isClosed()
313315
}
314316

317+
def 'should throw MongoInterruptedException and leave the interrupt status set when Stream.write throws InterruptedIOException'() {
318+
given:
319+
stream.write(_) >> { throw new InterruptedIOException() }
320+
def connection = getOpenedConnection()
321+
Thread.currentThread().interrupt()
322+
323+
when:
324+
connection.sendMessage([new ByteBufNIO(ByteBuffer.allocate(1))], 1)
325+
326+
then:
327+
Thread.interrupted()
328+
thrown(MongoInterruptedException)
329+
connection.isClosed()
330+
}
331+
332+
def 'should throw MongoInterruptedException and leave the interrupt status unset when Stream.write throws InterruptedIOException'() {
333+
given:
334+
stream.write(_) >> { throw new InterruptedIOException() }
335+
def connection = getOpenedConnection()
336+
337+
when:
338+
connection.sendMessage([new ByteBufNIO(ByteBuffer.allocate(1))], 1)
339+
340+
then:
341+
!Thread.interrupted()
342+
thrown(MongoInterruptedException)
343+
connection.isClosed()
344+
}
345+
346+
def 'should throw MongoInterruptedException and leave the interrupt status set when Stream.write throws ClosedByInterruptException'() {
347+
given:
348+
stream.write(_) >> { throw new ClosedByInterruptException() }
349+
def connection = getOpenedConnection()
350+
Thread.currentThread().interrupt()
351+
352+
when:
353+
connection.sendMessage([new ByteBufNIO(ByteBuffer.allocate(1))], 1)
354+
355+
then:
356+
Thread.interrupted()
357+
thrown(MongoInterruptedException)
358+
connection.isClosed()
359+
}
360+
361+
def 'should throw MongoInterruptedException when Stream.write throws SocketException and the thread is interrupted'() {
362+
given:
363+
stream.write(_) >> { throw new SocketException() }
364+
def connection = getOpenedConnection()
365+
Thread.currentThread().interrupt()
366+
367+
when:
368+
connection.sendMessage([new ByteBufNIO(ByteBuffer.allocate(1))], 1)
369+
370+
then:
371+
Thread.interrupted()
372+
thrown(MongoInterruptedException)
373+
connection.isClosed()
374+
}
375+
376+
def 'should throw MongoSocketWriteException when Stream.write throws SocketException and the thread is not interrupted'() {
377+
given:
378+
stream.write(_) >> { throw new SocketException() }
379+
def connection = getOpenedConnection()
380+
381+
when:
382+
connection.sendMessage([new ByteBufNIO(ByteBuffer.allocate(1))], 1)
383+
384+
then:
385+
thrown(MongoSocketWriteException)
386+
connection.isClosed()
387+
}
388+
389+
def 'should throw MongoInterruptedException and leave the interrupt status set when Stream.read throws InterruptedIOException'() {
390+
given:
391+
stream.read(_, _) >> { throw new InterruptedIOException() }
392+
def connection = getOpenedConnection()
393+
Thread.currentThread().interrupt()
394+
395+
when:
396+
connection.receiveMessage(1)
397+
398+
then:
399+
Thread.interrupted()
400+
thrown(MongoInterruptedException)
401+
connection.isClosed()
402+
}
403+
404+
def 'should throw MongoInterruptedException and leave the interrupt status unset when Stream.read throws InterruptedIOException'() {
405+
given:
406+
stream.read(_, _) >> { throw new InterruptedIOException() }
407+
def connection = getOpenedConnection()
408+
409+
when:
410+
connection.receiveMessage(1)
411+
412+
then:
413+
!Thread.interrupted()
414+
thrown(MongoInterruptedException)
415+
connection.isClosed()
416+
}
417+
418+
def 'should throw MongoInterruptedException and leave the interrupt status set when Stream.read throws ClosedByInterruptException'() {
419+
given:
420+
stream.read(_, _) >> { throw new ClosedByInterruptException() }
421+
def connection = getOpenedConnection()
422+
Thread.currentThread().interrupt()
423+
424+
when:
425+
connection.receiveMessage(1)
426+
427+
then:
428+
Thread.interrupted()
429+
thrown(MongoInterruptedException)
430+
connection.isClosed()
431+
}
432+
433+
def 'should throw MongoInterruptedException when Stream.read throws SocketException and the thread is interrupted'() {
434+
given:
435+
stream.read(_, _) >> { throw new SocketException() }
436+
def connection = getOpenedConnection()
437+
Thread.currentThread().interrupt()
438+
439+
when:
440+
connection.receiveMessage(1)
441+
442+
then:
443+
Thread.interrupted()
444+
thrown(MongoInterruptedException)
445+
connection.isClosed()
446+
}
447+
448+
def 'should throw MongoSocketReadException when Stream.read throws SocketException and the thread is not interrupted'() {
449+
given:
450+
stream.read(_, _) >> { throw new SocketException() }
451+
def connection = getOpenedConnection()
452+
453+
when:
454+
connection.receiveMessage(1)
455+
456+
then:
457+
thrown(MongoSocketReadException)
458+
connection.isClosed()
459+
}
315460

316461
def 'should close the stream when reading the message header throws an exception asynchronously'() {
317462
given:

0 commit comments

Comments
 (0)