Skip to content

Commit 31c5859

Browse files
committed
Close created ChannelPools in close method
1 parent 5ae402b commit 31c5859

File tree

4 files changed

+58
-5
lines changed

4 files changed

+58
-5
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "Netty NIO Async HTTP Client",
3+
"type": "bugfix",
4+
"description": "Close created `ChannelPool`s in `close()` method."
5+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;
2323
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.WRITE_TIMEOUT;
2424
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
25+
import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;
2526

2627
import io.netty.bootstrap.Bootstrap;
2728
import io.netty.channel.ChannelOption;
2829
import io.netty.channel.EventLoopGroup;
2930
import io.netty.channel.pool.ChannelPool;
30-
import io.netty.channel.pool.ChannelPoolMap;
3131
import io.netty.handler.codec.http2.Http2SecurityUtil;
3232
import io.netty.handler.ssl.SslContext;
3333
import io.netty.handler.ssl.SslContextBuilder;
@@ -39,6 +39,9 @@
3939
import java.util.concurrent.atomic.AtomicReference;
4040
import javax.net.ssl.SSLException;
4141
import javax.net.ssl.TrustManagerFactory;
42+
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
4245
import software.amazon.awssdk.annotations.SdkPublicApi;
4346
import software.amazon.awssdk.http.Protocol;
4447
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
@@ -67,8 +70,10 @@
6770
*/
6871
@SdkPublicApi
6972
public final class NettyNioAsyncHttpClient implements SdkAsyncHttpClient {
73+
private static final Logger log = LoggerFactory.getLogger(NettyNioAsyncHttpClient.class);
74+
7075
private final SdkEventLoopGroup sdkEventLoopGroup;
71-
private final ChannelPoolMap<URI, ChannelPool> pools;
76+
private final SdkChannelPoolMap<URI, ChannelPool> pools;
7277
private final SdkChannelOptions sdkChannelOptions;
7378
private final NettyConfiguration configuration;
7479
private final long maxStreams;
@@ -134,7 +139,7 @@ private TrustManagerFactory getTrustManager() {
134139
return configuration.trustAllCertificates() ? InsecureTrustManagerFactory.INSTANCE : null;
135140
}
136141

137-
private ChannelPoolMap<URI, ChannelPool> createChannelPoolMap() {
142+
private SdkChannelPoolMap<URI, ChannelPool> createChannelPoolMap() {
138143
return new SdkChannelPoolMap<URI, ChannelPool>() {
139144
@Override
140145
protected ChannelPool newPool(URI key) {
@@ -166,7 +171,8 @@ private SdkEventLoopGroup nonManagedEventLoopGroup(SdkEventLoopGroup eventLoopGr
166171

167172
@Override
168173
public void close() {
169-
sdkEventLoopGroup.eventLoopGroup().shutdownGracefully();
174+
pools.forEach(e -> runAndLogError(log, "Unable to close channel pool for " + e.getKey(), e.getValue()::close));
175+
runAndLogError(log, "Unable to shutdown event loop", sdkEventLoopGroup.eventLoopGroup()::shutdownGracefully);
170176
}
171177

172178
/**

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/SdkEventLoopGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public final class SdkEventLoopGroup {
5252
private final EventLoopGroup eventLoopGroup;
5353
private final ChannelFactory<? extends Channel> channelFactory;
5454

55-
private SdkEventLoopGroup(EventLoopGroup eventLoopGroup, ChannelFactory<? extends Channel> channelFactory) {
55+
SdkEventLoopGroup(EventLoopGroup eventLoopGroup, ChannelFactory<? extends Channel> channelFactory) {
5656
Validate.paramNotNull(eventLoopGroup, "eventLoopGroup");
5757
Validate.paramNotNull(channelFactory, "channelFactory");
5858
this.eventLoopGroup = eventLoopGroup;

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClientWireMockTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener;
4545
import com.github.tomakehurst.wiremock.junit.WireMockRule;
4646
import io.netty.channel.ChannelFactory;
47+
import io.netty.channel.ChannelFuture;
4748
import io.netty.channel.EventLoopGroup;
4849
import io.netty.channel.nio.NioEventLoopGroup;
4950
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -284,6 +285,47 @@ public void requestContentOnlyEqualToContentLengthHeaderFromProvider() throws In
284285
assertThat(wiremockTrafficListener.requests.toString()).endsWith(content);
285286
}
286287

288+
@Test
289+
public void closeMethodClosesOpenedChannels() throws InterruptedException, TimeoutException, ExecutionException {
290+
String body = randomAlphabetic(10);
291+
URI uri = URI.create("https://localhost:" + mockServer.httpsPort());
292+
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withHeader("Some-Header", "With Value").withBody(body)));
293+
294+
SdkHttpFullRequest request = createRequest(uri, "/", body, SdkHttpMethod.POST, Collections.emptyMap());
295+
RecordingResponseHandler recorder = new RecordingResponseHandler();
296+
297+
CompletableFuture<Boolean> channelClosedFuture = new CompletableFuture<>();
298+
ChannelFactory<NioSocketChannel> channelFactory = new ChannelFactory<NioSocketChannel>() {
299+
@Override
300+
public NioSocketChannel newChannel() {
301+
return new NioSocketChannel() {
302+
@Override
303+
public ChannelFuture close() {
304+
ChannelFuture cf = super.close();
305+
channelClosedFuture.complete(true);
306+
return cf;
307+
}
308+
};
309+
}
310+
};
311+
312+
SdkAsyncHttpClient customClient = NettyNioAsyncHttpClient.builder()
313+
.eventLoopGroup(new SdkEventLoopGroup(new NioEventLoopGroup(1), channelFactory))
314+
.buildWithDefaults(mapWithTrustAllCerts());
315+
316+
try {
317+
customClient.execute(AsyncExecuteRequest.builder()
318+
.request(request)
319+
.requestContentPublisher(createProvider(body))
320+
.responseHandler(recorder).build())
321+
.join();
322+
} finally {
323+
customClient.close();
324+
}
325+
326+
assertThat(channelClosedFuture.get(5, TimeUnit.SECONDS)).isTrue();
327+
}
328+
287329
private void assertCanReceiveBasicRequest(URI uri, String body) throws Exception {
288330
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withHeader("Some-Header", "With Value").withBody(body)));
289331

0 commit comments

Comments
 (0)