Skip to content

Commit 7e499db

Browse files
[netty-nio-client] Ensure in-use channels are not incorrectly closed (#2883)
* [netty-nio-client] Ensure in-use channels are not incorrectly closed IdleConnectionReaperHandler and OldConnectionReaperHandler are responsible for closing idle/old channels, but only when they are not marked as IN_USE. NettyRequestExecutor is responsible for marking a channel as IN_USE as part of its logic to prepare a channel for a request. However, in rare circumstances, it may be possible for a channel to reach its idle time in between being acquired from the pool and having the request initiated (especially when the channel is acquired from a thread outside the EventLoop). We can make this more resilient by more aggressively marking the channel as IN_USE as part of the channel acquire logic, and ensuring this is run before the channel's health is queried by HealthCheckedChannelPool (which will acquire a new channel, if needed). Previously, NettyRequestExecutor was responsible for setting IN_USE and HandlerRemovingChannelPool was responsible for clearing it. This PR introduces a simple ChannelPoolListener interface that allows implementing classes to register behavior based on channels being acquired/released (while also guaranteeing the listener callbacks are executed within the channel's EventLoop). We can leverage this interface to make the IN_USE behavior more symmetrical, by using a single ChannelPoolListener to both set/clear the flag. The ChannelPoolListener interface can additionally be used to simplify some of our other ChannelPool-wrapping classes (the ones that don't need to actually manipulate the pool control flow). So this commit also takes the opportunity to migrate the existing HandlerRemovingChannelPool to leverage this interface.
1 parent dbe8b17 commit 7e499db

File tree

14 files changed

+342
-114
lines changed

14 files changed

+342
-114
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Netty NIO HTTP Client",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Ensure in-use channels are not incorrectly closed"
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/AwaitCloseChannelPoolMap.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.net.URI;
2727
import java.net.URISyntaxException;
2828
import java.time.Duration;
29+
import java.util.Arrays;
2930
import java.util.Collection;
3031
import java.util.Map;
3132
import java.util.concurrent.CompletableFuture;
@@ -236,8 +237,13 @@ private SdkChannelPool wrapBaseChannelPool(Bootstrap bootstrap, ChannelPool chan
236237
configuration.maxConnections(),
237238
configuration);
238239

239-
// Wrap the channel pool such that we remove request-specific handlers with each request.
240-
sdkChannelPool = new HandlerRemovingChannelPool(sdkChannelPool);
240+
sdkChannelPool = new ListenerInvokingChannelPool(bootstrap.config().group(), sdkChannelPool, Arrays.asList(
241+
// Add a listener that ensures acquired channels are marked IN_USE and thus not eligible for certain idle timeouts.
242+
InUseTrackingChannelPoolListener.create(),
243+
244+
// Add a listener that removes request-specific handlers with each request.
245+
HandlerRemovingChannelPoolListener.create()
246+
));
241247

242248
// Wrap the channel pool such that an individual channel can only be released to the underlying pool once.
243249
sdkChannelPool = new ReleaseOnceChannelPool(sdkChannelPool);

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPool.java

Lines changed: 0 additions & 97 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.removeIfExists;
19+
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelHandler;
22+
import io.netty.handler.timeout.ReadTimeoutHandler;
23+
import io.netty.handler.timeout.WriteTimeoutHandler;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.http.nio.netty.internal.ListenerInvokingChannelPool.ChannelPoolListener;
26+
import software.amazon.awssdk.http.nio.netty.internal.http2.FlushOnReadHandler;
27+
import software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler;
28+
29+
/**
30+
* Removes any per-request {@link ChannelHandler} from the pipeline when releasing it to the pool.
31+
*/
32+
@SdkInternalApi
33+
public final class HandlerRemovingChannelPoolListener implements ChannelPoolListener {
34+
35+
private static final HandlerRemovingChannelPoolListener INSTANCE = new HandlerRemovingChannelPoolListener();
36+
37+
private HandlerRemovingChannelPoolListener() {
38+
}
39+
40+
public static HandlerRemovingChannelPoolListener create() {
41+
return INSTANCE;
42+
}
43+
44+
@Override
45+
public void channelReleased(Channel channel) {
46+
// Only remove per request handler if the channel is registered
47+
// or open since DefaultChannelPipeline would remove handlers if
48+
// channel is closed and unregistered
49+
if (channel.isOpen() || channel.isRegistered()) {
50+
removeIfExists(channel.pipeline(),
51+
HttpStreamsClientHandler.class,
52+
LastHttpContentHandler.class,
53+
FlushOnReadHandler.class,
54+
ResponseHandler.class,
55+
ReadTimeoutHandler.class,
56+
WriteTimeoutHandler.class);
57+
}
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.IN_USE;
19+
20+
import io.netty.channel.Channel;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
import software.amazon.awssdk.http.nio.netty.internal.ListenerInvokingChannelPool.ChannelPoolListener;
23+
24+
/**
25+
* Marks {@link Channel}s as in-use when they are leased from the pool. An in-use channel is not eligible to be closed by {@link
26+
* IdleConnectionReaperHandler} or {@link OldConnectionReaperHandler}.
27+
*/
28+
@SdkInternalApi
29+
public final class InUseTrackingChannelPoolListener implements ChannelPoolListener {
30+
31+
private static final InUseTrackingChannelPoolListener INSTANCE = new InUseTrackingChannelPoolListener();
32+
33+
private InUseTrackingChannelPoolListener() {
34+
}
35+
36+
public static InUseTrackingChannelPoolListener create() {
37+
return INSTANCE;
38+
}
39+
40+
@Override
41+
public void channelAcquired(Channel channel) {
42+
channel.attr(IN_USE).set(true);
43+
}
44+
45+
@Override
46+
public void channelReleased(Channel channel) {
47+
channel.attr(IN_USE).set(false);
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.consumeOrPropagate;
19+
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.EventLoop;
22+
import io.netty.channel.EventLoopGroup;
23+
import io.netty.channel.pool.ChannelPoolHandler;
24+
import io.netty.util.concurrent.Future;
25+
import io.netty.util.concurrent.Promise;
26+
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.function.Supplier;
29+
import software.amazon.awssdk.annotations.SdkInternalApi;
30+
import software.amazon.awssdk.http.nio.netty.internal.http2.HttpOrHttp2ChannelPool;
31+
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
32+
import software.amazon.awssdk.metrics.MetricCollector;
33+
34+
/**
35+
* A {@link SdkChannelPool} that wraps and delegates to another {@link SdkChannelPool} while invoking {@link ChannelPoolListener}s
36+
* for important events that occur.
37+
* <p>
38+
* {@link ChannelPoolListener} is similar to Netty's {@link ChannelPoolHandler} interface, but by invoking them as part of a
39+
* {@link SdkChannelPool} wrapper, we are given more control over when they are invoked. For example, {@link
40+
* HttpOrHttp2ChannelPool} may choose not to release HTTP/2 stream channels to the lowest-level pool (and instead store the
41+
* channels in its own pool), but by instrumenting listeners that sit on top of this layer, we are still given visibility into
42+
* these events occurring.
43+
*/
44+
@SdkInternalApi
45+
public final class ListenerInvokingChannelPool implements SdkChannelPool {
46+
private final SdkChannelPool delegatePool;
47+
private final Supplier<Promise<Channel>> promiseFactory;
48+
private final List<ChannelPoolListener> listeners;
49+
50+
/**
51+
* Listener which is called for various actions performed on a {@link SdkChannelPool}. All listener events are guaranteed to
52+
* be invoked as part of the {@link Channel}'s {@link EventLoop}.
53+
*/
54+
@SdkInternalApi
55+
public interface ChannelPoolListener {
56+
57+
/**
58+
* Called <b>after</b> a {@link Channel} was acquired by calling {@link SdkChannelPool#acquire()} or {@link
59+
* SdkChannelPool#acquire(Promise)}.
60+
* <p>
61+
* This method will be called by the {@link EventLoop} of the {@link Channel}.
62+
*/
63+
default void channelAcquired(Channel channel) {
64+
}
65+
66+
/**
67+
* Called <b>before</b> a {@link Channel} is released by calling {@link SdkChannelPool#release(Channel)} or {@link
68+
* SdkChannelPool#release(Channel, Promise)}.
69+
* <p>
70+
* This method will be called by the {@link EventLoop} of the {@link Channel}.
71+
*/
72+
default void channelReleased(Channel channel) {
73+
}
74+
}
75+
76+
public ListenerInvokingChannelPool(EventLoopGroup eventLoopGroup,
77+
SdkChannelPool delegatePool,
78+
List<ChannelPoolListener> listeners) {
79+
this(() -> eventLoopGroup.next().newPromise(), delegatePool, listeners);
80+
}
81+
82+
public ListenerInvokingChannelPool(Supplier<Promise<Channel>> promiseFactory,
83+
SdkChannelPool delegatePool,
84+
List<ChannelPoolListener> listeners) {
85+
this.delegatePool = delegatePool;
86+
this.promiseFactory = promiseFactory;
87+
this.listeners = listeners;
88+
}
89+
90+
@Override
91+
public Future<Channel> acquire() {
92+
return acquire(promiseFactory.get());
93+
}
94+
95+
@Override
96+
public Future<Channel> acquire(Promise<Channel> returnFuture) {
97+
delegatePool.acquire(promiseFactory.get())
98+
.addListener(consumeOrPropagate(returnFuture, channel -> {
99+
NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
100+
invokeChannelAcquired(channel);
101+
returnFuture.trySuccess(channel);
102+
}, returnFuture);
103+
}));
104+
return returnFuture;
105+
}
106+
107+
private void invokeChannelAcquired(Channel channel) {
108+
listeners.forEach(listener -> listener.channelAcquired(channel));
109+
}
110+
111+
@Override
112+
public Future<Void> release(Channel channel) {
113+
return release(channel, channel.eventLoop().newPromise());
114+
}
115+
116+
@Override
117+
public Future<Void> release(Channel channel, Promise<Void> returnFuture) {
118+
NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
119+
invokeChannelReleased(channel);
120+
delegatePool.release(channel, returnFuture);
121+
}, returnFuture);
122+
return returnFuture;
123+
}
124+
125+
private void invokeChannelReleased(Channel channel) {
126+
listeners.forEach(listener -> listener.channelReleased(channel));
127+
}
128+
129+
130+
@Override
131+
public void close() {
132+
delegatePool.close();
133+
}
134+
135+
@Override
136+
public CompletableFuture<Void> collectChannelPoolMetrics(MetricCollector metrics) {
137+
return delegatePool.collectChannelPoolMetrics(metrics);
138+
}
139+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ private void configureChannel() {
192192
channel.attr(REQUEST_CONTEXT_KEY).set(context);
193193
channel.attr(RESPONSE_COMPLETE_KEY).set(false);
194194
channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(false);
195-
channel.attr(IN_USE).set(true);
196195
channel.config().setOption(ChannelOption.AUTO_READ, false);
197196
}
198197

0 commit comments

Comments
 (0)