Skip to content

[netty-nio-client] Ensure in-use channels are not incorrectly closed #2883

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-nettynioclient-841857c.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Netty NIO HTTP Client",
"contributor": "",
"type": "bugfix",
"description": "Ensure in-use channels are not incorrectly closed"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -236,8 +237,13 @@ private SdkChannelPool wrapBaseChannelPool(Bootstrap bootstrap, ChannelPool chan
configuration.maxConnections(),
configuration);

// Wrap the channel pool such that we remove request-specific handlers with each request.
sdkChannelPool = new HandlerRemovingChannelPool(sdkChannelPool);
sdkChannelPool = new ListenerInvokingChannelPool(bootstrap.config().group(), sdkChannelPool, Arrays.asList(
// Add a listener that ensures acquired channels are marked IN_USE and thus not eligible for certain idle timeouts.
InUseTrackingChannelPoolListener.create(),

// Add a listener that removes request-specific handlers with each request.
HandlerRemovingChannelPoolListener.create()
));

// Wrap the channel pool such that an individual channel can only be released to the underlying pool once.
sdkChannelPool = new ReleaseOnceChannelPool(sdkChannelPool);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.removeIfExists;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.nio.netty.internal.ListenerInvokingChannelPool.ChannelPoolListener;
import software.amazon.awssdk.http.nio.netty.internal.http2.FlushOnReadHandler;
import software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler;

/**
* Removes any per-request {@link ChannelHandler} from the pipeline when releasing it to the pool.
*/
@SdkInternalApi
public final class HandlerRemovingChannelPoolListener implements ChannelPoolListener {

private static final HandlerRemovingChannelPoolListener INSTANCE = new HandlerRemovingChannelPoolListener();

private HandlerRemovingChannelPoolListener() {
}

public static HandlerRemovingChannelPoolListener create() {
return INSTANCE;
}

@Override
public void channelReleased(Channel channel) {
// Only remove per request handler if the channel is registered
// or open since DefaultChannelPipeline would remove handlers if
// channel is closed and unregistered
if (channel.isOpen() || channel.isRegistered()) {
removeIfExists(channel.pipeline(),
HttpStreamsClientHandler.class,
LastHttpContentHandler.class,
FlushOnReadHandler.class,
ResponseHandler.class,
ReadTimeoutHandler.class,
WriteTimeoutHandler.class);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.IN_USE;

import io.netty.channel.Channel;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.nio.netty.internal.ListenerInvokingChannelPool.ChannelPoolListener;

/**
* Marks {@link Channel}s as in-use when they are leased from the pool. An in-use channel is not eligible to be closed by {@link
* IdleConnectionReaperHandler} or {@link OldConnectionReaperHandler}.
*/
@SdkInternalApi
public final class InUseTrackingChannelPoolListener implements ChannelPoolListener {

private static final InUseTrackingChannelPoolListener INSTANCE = new InUseTrackingChannelPoolListener();

private InUseTrackingChannelPoolListener() {
}

public static InUseTrackingChannelPoolListener create() {
return INSTANCE;
}

@Override
public void channelAcquired(Channel channel) {
channel.attr(IN_USE).set(true);
}

@Override
public void channelReleased(Channel channel) {
channel.attr(IN_USE).set(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.consumeOrPropagate;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.nio.netty.internal.http2.HttpOrHttp2ChannelPool;
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
import software.amazon.awssdk.metrics.MetricCollector;

/**
* A {@link SdkChannelPool} that wraps and delegates to another {@link SdkChannelPool} while invoking {@link ChannelPoolListener}s
* for important events that occur.
* <p>
* {@link ChannelPoolListener} is similar to Netty's {@link ChannelPoolHandler} interface, but by invoking them as part of a
* {@link SdkChannelPool} wrapper, we are given more control over when they are invoked. For example, {@link
* HttpOrHttp2ChannelPool} may choose not to release HTTP/2 stream channels to the lowest-level pool (and instead store the
* channels in its own pool), but by instrumenting listeners that sit on top of this layer, we are still given visibility into
* these events occurring.
*/
@SdkInternalApi
public final class ListenerInvokingChannelPool implements SdkChannelPool {
private final SdkChannelPool delegatePool;
private final Supplier<Promise<Channel>> promiseFactory;
private final List<ChannelPoolListener> listeners;

/**
* Listener which is called for various actions performed on a {@link SdkChannelPool}. All listener events are guaranteed to
* be invoked as part of the {@link Channel}'s {@link EventLoop}.
*/
@SdkInternalApi
public interface ChannelPoolListener {

/**
* Called <b>after</b> a {@link Channel} was acquired by calling {@link SdkChannelPool#acquire()} or {@link
* SdkChannelPool#acquire(Promise)}.
* <p>
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
default void channelAcquired(Channel channel) {
}

/**
* Called <b>before</b> a {@link Channel} is released by calling {@link SdkChannelPool#release(Channel)} or {@link
* SdkChannelPool#release(Channel, Promise)}.
* <p>
* This method will be called by the {@link EventLoop} of the {@link Channel}.
*/
default void channelReleased(Channel channel) {
}
}

public ListenerInvokingChannelPool(EventLoopGroup eventLoopGroup,
SdkChannelPool delegatePool,
List<ChannelPoolListener> listeners) {
this(() -> eventLoopGroup.next().newPromise(), delegatePool, listeners);
}

public ListenerInvokingChannelPool(Supplier<Promise<Channel>> promiseFactory,
SdkChannelPool delegatePool,
List<ChannelPoolListener> listeners) {
this.delegatePool = delegatePool;
this.promiseFactory = promiseFactory;
this.listeners = listeners;
}

@Override
public Future<Channel> acquire() {
return acquire(promiseFactory.get());
}

@Override
public Future<Channel> acquire(Promise<Channel> returnFuture) {
delegatePool.acquire(promiseFactory.get())
.addListener(consumeOrPropagate(returnFuture, channel -> {
NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
invokeChannelAcquired(channel);
returnFuture.trySuccess(channel);
}, returnFuture);
}));
return returnFuture;
}

private void invokeChannelAcquired(Channel channel) {
listeners.forEach(listener -> listener.channelAcquired(channel));
}

@Override
public Future<Void> release(Channel channel) {
return release(channel, channel.eventLoop().newPromise());
}

@Override
public Future<Void> release(Channel channel, Promise<Void> returnFuture) {
NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
invokeChannelReleased(channel);
delegatePool.release(channel, returnFuture);
}, returnFuture);
return returnFuture;
}

private void invokeChannelReleased(Channel channel) {
listeners.forEach(listener -> listener.channelReleased(channel));
}


@Override
public void close() {
delegatePool.close();
}

@Override
public CompletableFuture<Void> collectChannelPoolMetrics(MetricCollector metrics) {
return delegatePool.collectChannelPoolMetrics(metrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ private void configureChannel() {
channel.attr(REQUEST_CONTEXT_KEY).set(context);
channel.attr(RESPONSE_COMPLETE_KEY).set(false);
channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(false);
channel.attr(IN_USE).set(true);
channel.config().setOption(ChannelOption.AUTO_READ, false);
}

Expand Down
Loading