Skip to content

Commit 44ca0f4

Browse files
committed
Await h2 connection releases before closing pool
We must wait for the connection release to be completed (it's an asynchronous operation) before closing the pool, otherwise it could be left dangling and lead to a socket leak.
1 parent 911496d commit 44ca0f4

File tree

4 files changed

+170
-1
lines changed

4 files changed

+170
-1
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "Netty NIO HTTP Client",
3+
"type": "bugfix",
4+
"description": "Fix a bug where it's possible for an HTTP2 channel pool to be closed while some channels are still being released causing them to be left open and leaked."
5+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import io.netty.util.concurrent.Future;
2727
import io.netty.util.concurrent.Promise;
2828
import java.util.ArrayList;
29+
import java.util.Collection;
30+
2931
import software.amazon.awssdk.annotations.SdkInternalApi;
32+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
3033
import software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool;
3134

3235
/**
@@ -48,6 +51,7 @@ public class Http2MultiplexedChannelPool implements ChannelPool {
4851
private final ChannelPool connectionPool;
4952
private final long maxConcurrencyPerConnection;
5053
private final ArrayList<MultiplexedChannelRecord> connections;
54+
private boolean closed = false;
5155

5256
/**
5357
* @param connectionPool Connection pool for parent channels (i.e. the socket channel).
@@ -65,6 +69,17 @@ public class Http2MultiplexedChannelPool implements ChannelPool {
6569
this.connections = new ArrayList<>();
6670
}
6771

72+
@SdkTestInternalApi
73+
Http2MultiplexedChannelPool(ChannelPool connectionPool,
74+
EventLoop eventLoop,
75+
long maxConcurrencyPerConnection,
76+
Collection<MultiplexedChannelRecord> connections) {
77+
this.connectionPool = connectionPool;
78+
this.eventLoop = eventLoop;
79+
this.maxConcurrencyPerConnection = maxConcurrencyPerConnection;
80+
this.connections = new ArrayList<>(connections);
81+
}
82+
6883
@Override
6984
public Future<Channel> acquire() {
7085
return acquire(new DefaultPromise<>(eventLoop));
@@ -77,6 +92,10 @@ public Future<Channel> acquire(Promise<Channel> promise) {
7792
}
7893

7994
private Future<Channel> acquire0(Promise<Channel> promise) {
95+
if (closed) {
96+
return promise.setFailure(new IllegalStateException("Channel pool is closed!"));
97+
}
98+
8099
for (MultiplexedChannelRecord connection : connections) {
81100
if (connection.availableStreams() > 0) {
82101
connection.acquire(promise);
@@ -150,7 +169,27 @@ private void releaseParentChannel(Channel parentChannel) {
150169

151170
@Override
152171
public void close() {
153-
doInEventLoop(eventLoop, connectionPool::close);
172+
try {
173+
setClosedFlag().await();
174+
for (MultiplexedChannelRecord c : connections) {
175+
Future<Channel> f = c.getConnectionFuture();
176+
f.await();
177+
if (f.isSuccess()) {
178+
connectionPool.release(f.getNow()).await();
179+
}
180+
}
181+
connectionPool.close();
182+
} catch (InterruptedException ie) {
183+
throw new RuntimeException(ie);
184+
}
154185
}
155186

187+
private Promise<Void> setClosedFlag() {
188+
Promise<Void> closedFuture = eventLoop.newPromise();
189+
doInEventLoop(eventLoop, () -> {
190+
closed = true;
191+
closedFuture.setSuccess(null);
192+
});
193+
return closedFuture;
194+
}
156195
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/MultiplexedChannelRecord.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import java.util.concurrent.ConcurrentHashMap;
3434
import java.util.concurrent.atomic.AtomicLong;
3535
import java.util.function.BiConsumer;
36+
3637
import software.amazon.awssdk.annotations.SdkInternalApi;
38+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
3739
import software.amazon.awssdk.http.Protocol;
3840

3941
/**
@@ -64,6 +66,18 @@ public final class MultiplexedChannelRecord {
6466
this.channelReleaser = channelReleaser;
6567
}
6668

69+
@SdkTestInternalApi
70+
MultiplexedChannelRecord(Future<Channel> connectionFuture,
71+
Channel connection,
72+
long maxConcurrencyPerConnection,
73+
BiConsumer<Channel, MultiplexedChannelRecord> channelReleaser) {
74+
this.connectionFuture = connectionFuture;
75+
this.childChannels = new ConcurrentHashMap<>(saturatedCast(maxConcurrencyPerConnection));
76+
this.availableStreams = new AtomicLong(maxConcurrencyPerConnection);
77+
this.channelReleaser = channelReleaser;
78+
this.connection = connection;
79+
}
80+
6781
MultiplexedChannelRecord acquire(Promise<Channel> channelPromise) {
6882
availableStreams.decrementAndGet();
6983
if (connection != null) {
@@ -140,6 +154,10 @@ void release(Channel channel) {
140154
childChannels.remove(channel.id());
141155
}
142156

157+
public Future<Channel> getConnectionFuture() {
158+
return connectionFuture;
159+
}
160+
143161
long availableStreams() {
144162
return availableStreams.get();
145163
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal.http2;
17+
18+
import io.netty.channel.Channel;
19+
import io.netty.channel.EventLoopGroup;
20+
import io.netty.channel.nio.NioEventLoopGroup;
21+
import io.netty.channel.pool.ChannelPool;
22+
import io.netty.channel.socket.SocketChannel;
23+
import io.netty.channel.socket.nio.NioSocketChannel;
24+
import io.netty.util.concurrent.DefaultPromise;
25+
import io.netty.util.concurrent.Promise;
26+
import org.junit.AfterClass;
27+
import org.junit.BeforeClass;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.mockito.InOrder;
31+
import org.mockito.Mockito;
32+
import org.mockito.runners.MockitoJUnitRunner;
33+
34+
import java.util.Collections;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
39+
/**
40+
* Tests for {@link Http2MultiplexedChannelPool}.
41+
*/
42+
@RunWith(MockitoJUnitRunner.class)
43+
public class Http2MultiplexedChannelPoolTest {
44+
private static EventLoopGroup loopGroup;
45+
46+
@BeforeClass
47+
public static void setup() {
48+
loopGroup = new NioEventLoopGroup(4);
49+
}
50+
51+
@AfterClass
52+
public static void teardown() {
53+
loopGroup.shutdownGracefully().awaitUninterruptibly();
54+
}
55+
56+
@Test
57+
public void closeWaitsForConnectionToBeReleasedBeforeClosingConnectionPool() throws InterruptedException {
58+
SocketChannel channel = new NioSocketChannel();
59+
try {
60+
loopGroup.register(channel).awaitUninterruptibly();
61+
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
62+
channelPromise.setSuccess(channel);
63+
64+
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);
65+
Promise<Void> releasePromise = Mockito.spy(new DefaultPromise<>(loopGroup.next()));
66+
Mockito.doCallRealMethod().when(releasePromise).await();
67+
releasePromise.setSuccess(null);
68+
Mockito.when(connectionPool.release(Mockito.eq(channel))).thenReturn(releasePromise);
69+
70+
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channelPromise,
71+
channel,
72+
8,
73+
(ch, rec) -> {});
74+
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.singletonList(record));
75+
76+
h2Pool.close();
77+
78+
InOrder inOrder = Mockito.inOrder(connectionPool, releasePromise);
79+
inOrder.verify(releasePromise).await();
80+
inOrder.verify(connectionPool).close();
81+
} finally {
82+
channel.close().awaitUninterruptibly();
83+
}
84+
}
85+
86+
@Test
87+
public void acquireAfterCloseFails() throws InterruptedException {
88+
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);
89+
90+
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.emptyList());
91+
92+
h2Pool.close();
93+
94+
assertThat(h2Pool.acquire().await().isSuccess()).isFalse();
95+
}
96+
97+
@Test
98+
public void releaseAfterCloseFails() throws InterruptedException {
99+
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);
100+
101+
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.emptyList());
102+
103+
h2Pool.close();
104+
105+
assertThat(h2Pool.release(Mockito.mock(Channel.class)).await().isSuccess()).isFalse();
106+
}
107+
}

0 commit comments

Comments
 (0)