Skip to content

Commit 7fcbbee

Browse files
committed
PR fixes
1 parent 14b9fee commit 7fcbbee

File tree

8 files changed

+104
-36
lines changed

8 files changed

+104
-36
lines changed

driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.mongodb.annotations.Immutable;
2121
import com.mongodb.lang.Nullable;
2222

23-
import java.nio.channels.AsynchronousChannelGroup;
2423
import java.util.concurrent.ExecutorService;
2524

2625
import static com.mongodb.assertions.Assertions.notNull;
@@ -54,13 +53,17 @@ private Builder() {
5453
}
5554

5655
/**
57-
* Sets the executor service. This executor service will not be shut
58-
* down by the driver code, and must be shut down by application code.
56+
* The executor service, intended to be used exclusively by the mongo
57+
* client. Closing the mongo client will result in orderly shutdown
58+
* of the executor service.
59+
*
60+
* <p>When TLS is not enabled, see
61+
* {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)}
62+
* for additional requirements for the executor service.
5963
*
6064
* @param executorService the executor service
6165
* @return this
6266
* @see #getExecutorService()
63-
* @see AsynchronousChannelGroup#withThreadPool(ExecutorService)
6467
*/
6568
public Builder executorService(final ExecutorService executorService) {
6669
this.executorService = notNull("executorService", executorService);
@@ -89,8 +92,8 @@ public ExecutorService getExecutorService() {
8992

9093
@Override
9194
public String toString() {
92-
return "AsyncTransportSettings{" +
93-
"executorService=" + executorService +
94-
'}';
95+
return "AsyncTransportSettings{"
96+
+ "executorService=" + executorService
97+
+ '}';
9598
}
9699
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal;
18+
19+
import com.mongodb.internal.function.CheckedSupplier;
20+
21+
/**
22+
* <p>This class is not part of the public API and may be removed or changed at any time</p>
23+
*/
24+
public class ValueOrExceptionContainer<T> {
25+
private final T value;
26+
private final Exception exception;
27+
28+
public ValueOrExceptionContainer(final CheckedSupplier<T, Exception> supplier) {
29+
T value = null;
30+
Exception exception = null;
31+
try {
32+
value = supplier.get();
33+
} catch (Exception e) {
34+
exception = e;
35+
}
36+
this.value = value;
37+
this.exception = exception;
38+
}
39+
40+
public T get() throws Exception {
41+
if (isCompletedExceptionally()) {
42+
throw exception;
43+
}
44+
return value;
45+
}
46+
47+
public boolean isCompletedExceptionally() {
48+
return exception != null;
49+
}
50+
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.mongodb.ServerAddress;
2222
import com.mongodb.connection.AsyncCompletionHandler;
2323
import com.mongodb.connection.SocketSettings;
24+
import com.mongodb.internal.ValueOrExceptionContainer;
2425
import com.mongodb.lang.Nullable;
2526
import com.mongodb.spi.dns.InetAddressResolver;
2627

@@ -33,7 +34,6 @@
3334
import java.nio.channels.CompletionHandler;
3435
import java.util.LinkedList;
3536
import java.util.Queue;
36-
import java.util.concurrent.ExecutorService;
3737
import java.util.concurrent.Future;
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.atomic.AtomicReference;
@@ -49,7 +49,7 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt
4949
private final InetAddressResolver inetAddressResolver;
5050
private final SocketSettings settings;
5151
@Nullable
52-
private final ExecutorService executorService;
52+
private final ValueOrExceptionContainer<AsynchronousChannelGroup> group;
5353

5454
public AsynchronousSocketChannelStream(
5555
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
@@ -60,12 +60,12 @@ public AsynchronousSocketChannelStream(
6060
public AsynchronousSocketChannelStream(
6161
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
6262
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider,
63-
@Nullable final ExecutorService executorService) {
63+
@Nullable final ValueOrExceptionContainer<AsynchronousChannelGroup> group) {
6464
super(serverAddress, settings, bufferProvider);
6565
this.serverAddress = serverAddress;
6666
this.inetAddressResolver = inetAddressResolver;
6767
this.settings = settings;
68-
this.executorService = executorService;
68+
this.group = group;
6969
}
7070

7171
@Override
@@ -91,9 +91,8 @@ private void initializeSocketChannel(final AsyncCompletionHandler<Void> handler,
9191

9292
try {
9393
AsynchronousSocketChannel attemptConnectionChannel;
94-
if (executorService != null) {
95-
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
96-
attemptConnectionChannel = AsynchronousSocketChannel.open(group);
94+
if (group != null) {
95+
attemptConnectionChannel = AsynchronousSocketChannel.open(group.get());
9796
} else {
9897
attemptConnectionChannel = AsynchronousSocketChannel.open();
9998
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
import com.mongodb.ServerAddress;
2020
import com.mongodb.connection.SocketSettings;
2121
import com.mongodb.connection.SslSettings;
22+
import com.mongodb.internal.ValueOrExceptionContainer;
2223
import com.mongodb.lang.Nullable;
2324
import com.mongodb.spi.dns.InetAddressResolver;
2425

25-
import java.util.concurrent.ExecutorService;
26+
import java.nio.channels.AsynchronousChannelGroup;
2627

2728
import static com.mongodb.assertions.Assertions.assertFalse;
2829
import static com.mongodb.assertions.Assertions.notNull;
@@ -35,7 +36,7 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
3536
private final SocketSettings settings;
3637
private final InetAddressResolver inetAddressResolver;
3738
@Nullable
38-
private final ExecutorService executorService;
39+
private final ValueOrExceptionContainer<AsynchronousChannelGroup> group;
3940

4041
/**
4142
* Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}.
@@ -51,17 +52,17 @@ public AsynchronousSocketChannelStreamFactory(
5152

5253
AsynchronousSocketChannelStreamFactory(
5354
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
54-
final SslSettings sslSettings, @Nullable final ExecutorService executorService) {
55+
final SslSettings sslSettings, @Nullable final ValueOrExceptionContainer<AsynchronousChannelGroup> group) {
5556
assertFalse(sslSettings.isEnabled());
5657
this.inetAddressResolver = inetAddressResolver;
5758
this.settings = notNull("settings", settings);
58-
this.executorService = executorService;
59+
this.group = group;
5960
}
6061

6162
@Override
6263
public Stream create(final ServerAddress serverAddress) {
6364
return new AsynchronousSocketChannelStream(
64-
serverAddress, inetAddressResolver, settings, bufferProvider, executorService);
65+
serverAddress, inetAddressResolver, settings, bufferProvider, group);
6566
}
6667

6768
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818

1919
import com.mongodb.connection.SocketSettings;
2020
import com.mongodb.connection.SslSettings;
21+
import com.mongodb.internal.ValueOrExceptionContainer;
2122
import com.mongodb.lang.Nullable;
2223
import com.mongodb.spi.dns.InetAddressResolver;
2324

24-
import java.util.concurrent.ExecutorService;
25+
import java.nio.channels.AsynchronousChannelGroup;
2526

2627
/**
2728
* A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams.
@@ -31,26 +32,33 @@
3132
public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory {
3233
private final InetAddressResolver inetAddressResolver;
3334
@Nullable
34-
private final ExecutorService executorService;
35+
private final ValueOrExceptionContainer<AsynchronousChannelGroup> group;
3536

3637
public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
3738
this(inetAddressResolver, null);
3839
}
3940

4041
AsynchronousSocketChannelStreamFactoryFactory(
4142
final InetAddressResolver inetAddressResolver,
42-
@Nullable final ExecutorService executorService) {
43+
@Nullable final ValueOrExceptionContainer<AsynchronousChannelGroup> group) {
4344
this.inetAddressResolver = inetAddressResolver;
44-
this.executorService = executorService;
45+
this.group = group;
4546
}
4647

4748
@Override
4849
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
4950
return new AsynchronousSocketChannelStreamFactory(
50-
inetAddressResolver, socketSettings, sslSettings, executorService);
51+
inetAddressResolver, socketSettings, sslSettings, group);
5152
}
5253

5354
@Override
5455
public void close() {
56+
if (group != null && !group.isCompletedExceptionally()) {
57+
try {
58+
group.get().shutdown();
59+
} catch (Exception e) {
60+
// will not occur, since it was not completed exceptionally
61+
}
62+
}
5563
}
5664
}

driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import com.mongodb.connection.NettyTransportSettings;
2323
import com.mongodb.connection.SocketSettings;
2424
import com.mongodb.connection.TransportSettings;
25+
import com.mongodb.internal.ValueOrExceptionContainer;
2526
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
2627
import com.mongodb.spi.dns.InetAddressResolver;
2728

29+
import java.nio.channels.AsynchronousChannelGroup;
2830
import java.util.concurrent.ExecutorService;
2931

3032
/**
@@ -57,7 +59,9 @@ public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClien
5759
if (settings.getSslSettings().isEnabled()) {
5860
return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService);
5961
} else {
60-
return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, executorService);
62+
ValueOrExceptionContainer<AsynchronousChannelGroup> group = new ValueOrExceptionContainer<>(
63+
() -> AsynchronousChannelGroup.withThreadPool(executorService));
64+
return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group);
6165
}
6266
} else if (transportSettings instanceof NettyTransportSettings) {
6367
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings);

driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ public class AsynchronousTlsChannelGroup {
7373
private static final int queueLengthMultiplier = 32;
7474

7575
private static final AtomicInteger globalGroupCount = new AtomicInteger();
76-
private final boolean executorIsExternal;
7776

7877
class RegisteredSocket {
7978

@@ -210,11 +209,9 @@ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorServi
210209
}
211210
timeoutExecutor.setRemoveOnCancelPolicy(true);
212211
if (executorService != null) {
213-
this.executorIsExternal = true;
214212
this.executor = executorService;
215213
} else {
216214
int nThreads = Runtime.getRuntime().availableProcessors();
217-
this.executorIsExternal = false;
218215
this.executor = new ThreadPoolExecutor(
219216
nThreads,
220217
nThreads,
@@ -424,9 +421,7 @@ private void loop() {
424421
} catch (Throwable e) {
425422
LOGGER.error("error in selector loop", e);
426423
} finally {
427-
if (!executorIsExternal) {
428-
executor.shutdown();
429-
}
424+
executor.shutdown();
430425
// use shutdownNow to stop delayed tasks
431426
timeoutExecutor.shutdownNow();
432427
try {

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@
2222
import com.mongodb.connection.TransportSettings;
2323
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
2424
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.ValueSource;
2527

2628
import java.util.concurrent.ExecutorService;
2729
import java.util.concurrent.Executors;
2830

2931
import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
3032
import static org.mockito.ArgumentMatchers.any;
3133
import static org.mockito.Mockito.atLeastOnce;
32-
import static org.mockito.Mockito.never;
3334
import static org.mockito.Mockito.spy;
35+
import static org.mockito.Mockito.times;
3436
import static org.mockito.Mockito.verify;
3537

3638
class AsyncTransportSettingsTest {
@@ -51,20 +53,26 @@ void testAsyncTransportSettings() {
5153
verify(executorService, atLeastOnce()).execute(any());
5254
}
5355

54-
@Test
55-
void testExternalExecutorNotShutDown() {
56+
@ParameterizedTest
57+
@ValueSource(booleans = {true, false})
58+
void testExternalExecutorWasShutDown(final boolean tlsEnabled) {
5659
ExecutorService executorService = spy(Executors.newFixedThreadPool(5));
5760
AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder()
5861
.executorService(executorService)
5962
.build();
6063
MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder()
61-
.applyToSslSettings(builder -> builder.enabled(true))
64+
.applyToSslSettings(builder -> builder.enabled(tlsEnabled))
6265
.transportSettings(asyncTransportSettings)
6366
.build();
6467

6568
try (MongoClient ignored = new SyncMongoClient(MongoClients.create(mongoClientSettings))) {
6669
// ignored
6770
}
68-
verify(executorService, never()).shutdown();
71+
try {
72+
Thread.sleep(100);
73+
} catch (InterruptedException e) {
74+
throw new RuntimeException(e);
75+
}
76+
verify(executorService, times(1)).shutdown();
6977
}
7078
}

0 commit comments

Comments
 (0)