Skip to content

Await h2 connection releases before closing pool #950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "Netty NIO HTTP Client",
"type": "bugfix",
"description": "Fix a bug where it's possible for an HTTP2 channel pool to be closed while some channels are still being released causing them to be left open and leaked."
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.ArrayList;
import java.util.Collection;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool;

/**
Expand All @@ -48,6 +51,7 @@ public class Http2MultiplexedChannelPool implements ChannelPool {
private final ChannelPool connectionPool;
private final long maxConcurrencyPerConnection;
private final ArrayList<MultiplexedChannelRecord> connections;
private boolean closed = false;

/**
* @param connectionPool Connection pool for parent channels (i.e. the socket channel).
Expand All @@ -65,6 +69,17 @@ public class Http2MultiplexedChannelPool implements ChannelPool {
this.connections = new ArrayList<>();
}

@SdkTestInternalApi
Http2MultiplexedChannelPool(ChannelPool connectionPool,
EventLoop eventLoop,
long maxConcurrencyPerConnection,
Collection<MultiplexedChannelRecord> connections) {
this.connectionPool = connectionPool;
this.eventLoop = eventLoop;
this.maxConcurrencyPerConnection = maxConcurrencyPerConnection;
this.connections = new ArrayList<>(connections);
}

@Override
public Future<Channel> acquire() {
return acquire(new DefaultPromise<>(eventLoop));
Expand All @@ -77,6 +92,10 @@ public Future<Channel> acquire(Promise<Channel> promise) {
}

private Future<Channel> acquire0(Promise<Channel> promise) {
if (closed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future improvement, we can make this a decorator to reuse this functionality for HTTP/1.1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

return promise.setFailure(new IllegalStateException("Channel pool is closed!"));
}

for (MultiplexedChannelRecord connection : connections) {
if (connection.availableStreams() > 0) {
connection.acquire(promise);
Expand Down Expand Up @@ -150,7 +169,27 @@ private void releaseParentChannel(Channel parentChannel) {

@Override
public void close() {
doInEventLoop(eventLoop, connectionPool::close);
try {
setClosedFlag().await();
for (MultiplexedChannelRecord c : connections) {
Future<Channel> f = c.getConnectionFuture();
f.await();
if (f.isSuccess()) {
connectionPool.release(f.getNow()).await();
}
}
connectionPool.close();
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}

private Promise<Void> setClosedFlag() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an advantage to doing it this way versus using an AtomicBoolean or something? Just to let the acquires submitted before close called to get a chance to acquire?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wanted to make sure acquire and close serialized so it's not possible to close while in the middle of an acquire.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, that makes sense.

Promise<Void> closedFuture = eventLoop.newPromise();
doInEventLoop(eventLoop, () -> {
closed = true;
closedFuture.setSuccess(null);
});
return closedFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.http.Protocol;

/**
Expand Down Expand Up @@ -64,6 +66,18 @@ public final class MultiplexedChannelRecord {
this.channelReleaser = channelReleaser;
}

@SdkTestInternalApi
MultiplexedChannelRecord(Future<Channel> connectionFuture,
Channel connection,
long maxConcurrencyPerConnection,
BiConsumer<Channel, MultiplexedChannelRecord> channelReleaser) {
this.connectionFuture = connectionFuture;
this.childChannels = new ConcurrentHashMap<>(saturatedCast(maxConcurrencyPerConnection));
this.availableStreams = new AtomicLong(maxConcurrencyPerConnection);
this.channelReleaser = channelReleaser;
this.connection = connection;
}

MultiplexedChannelRecord acquire(Promise<Channel> channelPromise) {
availableStreams.decrementAndGet();
if (connection != null) {
Expand Down Expand Up @@ -140,6 +154,10 @@ void release(Channel channel) {
childChannels.remove(channel.id());
}

public Future<Channel> getConnectionFuture() {
return connectionFuture;
}

long availableStreams() {
return availableStreams.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal.http2;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;


/**
* Tests for {@link Http2MultiplexedChannelPool}.
*/
@RunWith(MockitoJUnitRunner.class)
public class Http2MultiplexedChannelPoolTest {
private static EventLoopGroup loopGroup;

@BeforeClass
public static void setup() {
loopGroup = new NioEventLoopGroup(4);
}

@AfterClass
public static void teardown() {
loopGroup.shutdownGracefully().awaitUninterruptibly();
}

@Test
public void closeWaitsForConnectionToBeReleasedBeforeClosingConnectionPool() throws InterruptedException {
SocketChannel channel = new NioSocketChannel();
try {
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
channelPromise.setSuccess(channel);

ChannelPool connectionPool = Mockito.mock(ChannelPool.class);
Promise<Void> releasePromise = Mockito.spy(new DefaultPromise<>(loopGroup.next()));
Mockito.doCallRealMethod().when(releasePromise).await();
releasePromise.setSuccess(null);
Mockito.when(connectionPool.release(Mockito.eq(channel))).thenReturn(releasePromise);

MultiplexedChannelRecord record = new MultiplexedChannelRecord(channelPromise,
channel,
8,
(ch, rec) -> {});
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.singletonList(record));

h2Pool.close();

InOrder inOrder = Mockito.inOrder(connectionPool, releasePromise);
inOrder.verify(releasePromise).await();
inOrder.verify(connectionPool).close();
} finally {
channel.close().awaitUninterruptibly();
}
}

@Test
public void acquireAfterCloseFails() throws InterruptedException {
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);

Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.emptyList());

h2Pool.close();

assertThat(h2Pool.acquire().await().isSuccess()).isFalse();
}

@Test
public void releaseAfterCloseFails() throws InterruptedException {
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);

Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.emptyList());

h2Pool.close();

assertThat(h2Pool.release(Mockito.mock(Channel.class)).await().isSuccess()).isFalse();
}
}