-
Notifications
You must be signed in to change notification settings - Fork 915
Await h2 connection releases before closing pool #950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"category": "Netty NIO HTTP Client", | ||
"type": "bugfix", | ||
"description": "Fix a bug where it's possible for an HTTP2 channel pool to be closed while some channels are still being released causing them to be left open and leaked." | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,10 @@ | |
import io.netty.util.concurrent.Future; | ||
import io.netty.util.concurrent.Promise; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
|
||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.annotations.SdkTestInternalApi; | ||
import software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool; | ||
|
||
/** | ||
|
@@ -48,6 +51,7 @@ public class Http2MultiplexedChannelPool implements ChannelPool { | |
private final ChannelPool connectionPool; | ||
private final long maxConcurrencyPerConnection; | ||
private final ArrayList<MultiplexedChannelRecord> connections; | ||
private boolean closed = false; | ||
|
||
/** | ||
* @param connectionPool Connection pool for parent channels (i.e. the socket channel). | ||
|
@@ -65,6 +69,17 @@ public class Http2MultiplexedChannelPool implements ChannelPool { | |
this.connections = new ArrayList<>(); | ||
} | ||
|
||
@SdkTestInternalApi | ||
Http2MultiplexedChannelPool(ChannelPool connectionPool, | ||
EventLoop eventLoop, | ||
long maxConcurrencyPerConnection, | ||
Collection<MultiplexedChannelRecord> connections) { | ||
this.connectionPool = connectionPool; | ||
this.eventLoop = eventLoop; | ||
this.maxConcurrencyPerConnection = maxConcurrencyPerConnection; | ||
this.connections = new ArrayList<>(connections); | ||
} | ||
|
||
@Override | ||
public Future<Channel> acquire() { | ||
return acquire(new DefaultPromise<>(eventLoop)); | ||
|
@@ -77,6 +92,10 @@ public Future<Channel> acquire(Promise<Channel> promise) { | |
} | ||
|
||
private Future<Channel> acquire0(Promise<Channel> promise) { | ||
if (closed) { | ||
return promise.setFailure(new IllegalStateException("Channel pool is closed!")); | ||
} | ||
|
||
for (MultiplexedChannelRecord connection : connections) { | ||
if (connection.availableStreams() > 0) { | ||
connection.acquire(promise); | ||
|
@@ -150,7 +169,27 @@ private void releaseParentChannel(Channel parentChannel) { | |
|
||
@Override | ||
public void close() { | ||
doInEventLoop(eventLoop, connectionPool::close); | ||
try { | ||
setClosedFlag().await(); | ||
for (MultiplexedChannelRecord c : connections) { | ||
Future<Channel> f = c.getConnectionFuture(); | ||
f.await(); | ||
if (f.isSuccess()) { | ||
connectionPool.release(f.getNow()).await(); | ||
} | ||
} | ||
connectionPool.close(); | ||
} catch (InterruptedException ie) { | ||
throw new RuntimeException(ie); | ||
} | ||
} | ||
|
||
private Promise<Void> setClosedFlag() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there an advantage to doing it this way versus using an AtomicBoolean or something? Just to let the acquires submitted before close called to get a chance to acquire? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I wanted to make sure acquire and close serialized so it's not possible to close while in the middle of an acquire. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1, that makes sense. |
||
Promise<Void> closedFuture = eventLoop.newPromise(); | ||
doInEventLoop(eventLoop, () -> { | ||
closed = true; | ||
closedFuture.setSuccess(null); | ||
}); | ||
return closedFuture; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.http.nio.netty.internal.http2; | ||
|
||
import io.netty.channel.Channel; | ||
import io.netty.channel.EventLoopGroup; | ||
import io.netty.channel.nio.NioEventLoopGroup; | ||
import io.netty.channel.pool.ChannelPool; | ||
import io.netty.channel.socket.SocketChannel; | ||
import io.netty.channel.socket.nio.NioSocketChannel; | ||
import io.netty.util.concurrent.DefaultPromise; | ||
import io.netty.util.concurrent.Promise; | ||
import org.junit.AfterClass; | ||
import org.junit.BeforeClass; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.mockito.InOrder; | ||
import org.mockito.Mockito; | ||
import org.mockito.runners.MockitoJUnitRunner; | ||
|
||
import java.util.Collections; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
|
||
/** | ||
* Tests for {@link Http2MultiplexedChannelPool}. | ||
*/ | ||
@RunWith(MockitoJUnitRunner.class) | ||
public class Http2MultiplexedChannelPoolTest { | ||
private static EventLoopGroup loopGroup; | ||
|
||
@BeforeClass | ||
public static void setup() { | ||
loopGroup = new NioEventLoopGroup(4); | ||
} | ||
|
||
@AfterClass | ||
public static void teardown() { | ||
loopGroup.shutdownGracefully().awaitUninterruptibly(); | ||
} | ||
|
||
@Test | ||
public void closeWaitsForConnectionToBeReleasedBeforeClosingConnectionPool() throws InterruptedException { | ||
SocketChannel channel = new NioSocketChannel(); | ||
try { | ||
loopGroup.register(channel).awaitUninterruptibly(); | ||
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next()); | ||
channelPromise.setSuccess(channel); | ||
|
||
ChannelPool connectionPool = Mockito.mock(ChannelPool.class); | ||
Promise<Void> releasePromise = Mockito.spy(new DefaultPromise<>(loopGroup.next())); | ||
Mockito.doCallRealMethod().when(releasePromise).await(); | ||
releasePromise.setSuccess(null); | ||
Mockito.when(connectionPool.release(Mockito.eq(channel))).thenReturn(releasePromise); | ||
|
||
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channelPromise, | ||
channel, | ||
8, | ||
(ch, rec) -> {}); | ||
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.singletonList(record)); | ||
|
||
h2Pool.close(); | ||
|
||
InOrder inOrder = Mockito.inOrder(connectionPool, releasePromise); | ||
inOrder.verify(releasePromise).await(); | ||
inOrder.verify(connectionPool).close(); | ||
} finally { | ||
channel.close().awaitUninterruptibly(); | ||
} | ||
} | ||
|
||
@Test | ||
public void acquireAfterCloseFails() throws InterruptedException { | ||
ChannelPool connectionPool = Mockito.mock(ChannelPool.class); | ||
|
||
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.emptyList()); | ||
|
||
h2Pool.close(); | ||
|
||
assertThat(h2Pool.acquire().await().isSuccess()).isFalse(); | ||
} | ||
|
||
@Test | ||
public void releaseAfterCloseFails() throws InterruptedException { | ||
ChannelPool connectionPool = Mockito.mock(ChannelPool.class); | ||
|
||
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.emptyList()); | ||
|
||
h2Pool.close(); | ||
|
||
assertThat(h2Pool.release(Mockito.mock(Channel.class)).await().isSuccess()).isFalse(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Future improvement, we can make this a decorator to reuse this functionality for HTTP/1.1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good