Skip to content

Commit 66cc220

Browse files
committed
Fixed a bug in the Netty client, where a future may not always be completed.
If a service closes a connection between when a channel is acquired and handlers are attached, channel writes could disappear and the response future would never be completed. This change introduces health checks and retries for channel acquisition to fix the majority of cases without failing requests, as well as one last check after handlers are added to ensure the channel hasn't been closed since the channel pool health check. Fixes #1207.
1 parent 64a1c78 commit 66cc220

File tree

5 files changed

+397
-4
lines changed

5 files changed

+397
-4
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"type": "bugfix",
4+
"description": "Fixed a bug in asynchronous clients, where a service closing a connection between when a channel is acquired and handlers are attached could lead to response futures never being completed. Fixes [#1207](https://github.com/aws/aws-sdk-java-v2/issues/1207)."
5+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool;
6363
import software.amazon.awssdk.http.nio.netty.internal.ChannelPipelineInitializer;
6464
import software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool;
65+
import software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool;
6566
import software.amazon.awssdk.http.nio.netty.internal.HonorCloseOnReleaseChannelPool;
6667
import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
6768
import software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor;
@@ -220,6 +221,10 @@ private ChannelPool createChannelPool(Bootstrap bootstrap, ChannelPipelineInitia
220221
// Wrap the channel pool such that an individual channel can only be released to the underlying pool once.
221222
channelPool = new ReleaseOnceChannelPool(channelPool);
222223

224+
// Wrap the channel pool to guarantee all channels checked out are healthy, and all unhealthy channels checked in are
225+
// closed.
226+
channelPool = new HealthCheckedChannelPool(bootstrap.config().group(), configuration, channelPool);
227+
223228
// Wrap the channel pool such that if the Promise given to acquire(Promise) is done when the channel is acquired
224229
// from the underlying pool, the channel is closed and released.
225230
channelPool = new CancellableAcquireChannelPool(bootstrap.config().group().next(), channelPool);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import io.netty.channel.Channel;
19+
import io.netty.channel.EventLoopGroup;
20+
import io.netty.channel.pool.ChannelPool;
21+
import io.netty.util.concurrent.Future;
22+
import io.netty.util.concurrent.Promise;
23+
import io.netty.util.concurrent.ScheduledFuture;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
import software.amazon.awssdk.annotations.SdkInternalApi;
27+
28+
/**
29+
* An implementation of {@link ChannelPool} that validates the health of its connections.
30+
*
31+
* This wraps another {@code ChannelPool}, and verifies:
32+
* <ol>
33+
* <li>All connections acquired from the underlying channel pool are in the active state.</li>
34+
* <li>All connections released into the underlying pool that are not active, are closed before they are released.</li>
35+
* </ol>
36+
*
37+
* Acquisitions that fail due to an unhealthy underlying channel are retried until a healthy channel can be returned, or the
38+
* {@link NettyConfiguration#connectionAcquireTimeoutMillis()} timeout is reached.
39+
*/
40+
@SdkInternalApi
41+
public class HealthCheckedChannelPool implements ChannelPool {
42+
private final EventLoopGroup eventLoopGroup;
43+
private final int acquireTimeoutMillis;
44+
private final ChannelPool delegate;
45+
46+
public HealthCheckedChannelPool(EventLoopGroup eventLoopGroup,
47+
NettyConfiguration configuration,
48+
ChannelPool delegate) {
49+
this.eventLoopGroup = eventLoopGroup;
50+
this.acquireTimeoutMillis = configuration.connectionAcquireTimeoutMillis();
51+
this.delegate = delegate;
52+
}
53+
54+
@Override
55+
public Future<Channel> acquire() {
56+
return acquire(eventLoopGroup.next().newPromise());
57+
}
58+
59+
@Override
60+
public Future<Channel> acquire(Promise<Channel> resultFuture) {
61+
// Schedule a task to time out this acquisition, in case we can't acquire a channel fast enough.
62+
ScheduledFuture<?> timeoutFuture =
63+
eventLoopGroup.schedule(() -> timeoutAcquire(resultFuture), acquireTimeoutMillis, TimeUnit.MILLISECONDS);
64+
65+
tryAcquire(resultFuture, timeoutFuture);
66+
return resultFuture;
67+
}
68+
69+
/**
70+
* Time out the provided acquire future, if it hasn't already been completed.
71+
*/
72+
private void timeoutAcquire(Promise<Channel> resultFuture) {
73+
resultFuture.tryFailure(new TimeoutException("Acquire operation took longer than " + acquireTimeoutMillis +
74+
" milliseconds."));
75+
}
76+
77+
/**
78+
* Try to acquire a channel from the underlying pool. This will keep retrying the acquisition until the provided result
79+
* future is completed.
80+
*
81+
* @param resultFuture The future that should be completed with the acquired channel. If this is completed external to this
82+
* function, this function will stop trying to acquire a channel.
83+
* @param timeoutFuture The future for the timeout task. This future will be cancelled when a channel is acquired.
84+
*/
85+
private void tryAcquire(Promise<Channel> resultFuture, ScheduledFuture<?> timeoutFuture) {
86+
// Something else completed the future (probably a timeout). Stop trying to get a channel.
87+
if (resultFuture.isDone()) {
88+
return;
89+
}
90+
91+
Promise<Channel> delegateFuture = eventLoopGroup.next().newPromise();
92+
delegate.acquire(delegateFuture);
93+
delegateFuture.addListener(f -> ensureAcquiredChannelIsHealthy(delegateFuture, resultFuture, timeoutFuture));
94+
}
95+
96+
/**
97+
* Validate that the channel returned by the underlying channel pool is healthy. If so, complete the result future with the
98+
* channel returned by the underlying pool. If not, close the channel and try to get a different one.
99+
*
100+
* @param delegateFuture A completed promise as a result of invoking delegate.acquire().
101+
* @param resultFuture The future that should be completed with the healthy, acquired channel.
102+
* @param timeoutFuture The future for the timeout task. This future will be cancelled when a channel is acquired.
103+
*/
104+
private void ensureAcquiredChannelIsHealthy(Promise<Channel> delegateFuture,
105+
Promise<Channel> resultFuture,
106+
ScheduledFuture<?> timeoutFuture) {
107+
// If our delegate failed to connect, forward down the failure. Don't try again.
108+
if (!delegateFuture.isSuccess()) {
109+
timeoutFuture.cancel(false);
110+
resultFuture.tryFailure(delegateFuture.cause());
111+
return;
112+
}
113+
114+
// If our delegate gave us an unhealthy connection, close it and try to get a new one.
115+
Channel channel = delegateFuture.getNow();
116+
if (!isHealthy(channel)) {
117+
channel.close();
118+
delegate.release(channel);
119+
tryAcquire(resultFuture, timeoutFuture);
120+
return;
121+
}
122+
123+
// Cancel the timeout (best effort), and return back the healthy channel.
124+
timeoutFuture.cancel(false);
125+
if (!resultFuture.trySuccess(channel)) {
126+
// If we couldn't give the channel to the result future (because it failed for some other reason),
127+
// just return it to the pool.
128+
release(channel);
129+
}
130+
}
131+
132+
@Override
133+
public Future<Void> release(Channel channel) {
134+
closeIfUnhealthy(channel);
135+
return delegate.release(channel);
136+
}
137+
138+
@Override
139+
public Future<Void> release(Channel channel, Promise<Void> promise) {
140+
closeIfUnhealthy(channel);
141+
return delegate.release(channel, promise);
142+
}
143+
144+
@Override
145+
public void close() {
146+
delegate.close();
147+
}
148+
149+
/**
150+
* Close the provided channel, if it's considered unhealthy.
151+
*/
152+
private void closeIfUnhealthy(Channel channel) {
153+
if (!isHealthy(channel)) {
154+
channel.close();
155+
}
156+
}
157+
158+
/**
159+
* Determine whether the provided channel is 'healthy' enough to use.
160+
*/
161+
private boolean isHealthy(Channel channel) {
162+
return channel.isActive();
163+
}
164+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,9 @@ private void makeRequestListener(Future<Channel> channelFuture) {
130130
if (channelFuture.isSuccess()) {
131131
channel = channelFuture.getNow();
132132
configureChannel();
133-
configurePipeline();
134-
makeRequest();
133+
if (tryConfigurePipeline()) {
134+
makeRequest();
135+
}
135136
} else {
136137
handleFailure(() -> "Failed to create connection to " + endpoint(), channelFuture.cause());
137138
}
@@ -146,7 +147,7 @@ private void configureChannel() {
146147
channel.config().setOption(ChannelOption.AUTO_READ, false);
147148
}
148149

149-
private void configurePipeline() {
150+
private boolean tryConfigurePipeline() {
150151
Protocol protocol = ChannelAttributeKey.getProtocolNow(channel);
151152
ChannelPipeline pipeline = channel.pipeline();
152153
if (HTTP2.equals(protocol)) {
@@ -156,10 +157,23 @@ private void configurePipeline() {
156157
String errorMsg = "Unknown protocol: " + protocol;
157158
closeAndRelease(channel);
158159
handleFailure(() -> errorMsg, new RuntimeException(errorMsg));
159-
return;
160+
return false;
160161
}
162+
161163
pipeline.addLast(new HttpStreamsClientHandler());
162164
pipeline.addLast(ResponseHandler.getInstance());
165+
166+
// It's possible that the channel could become inactive between checking it out from the pool, and adding our response
167+
// handler (which will monitor for it going inactive from now on).
168+
// Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207
169+
if (!channel.isActive()) {
170+
String errorMessage = "Channel was closed before it could be written to.";
171+
closeAndRelease(channel);
172+
handleFailure(() -> errorMessage, new IOException(errorMessage));
173+
return false;
174+
}
175+
176+
return true;
163177
}
164178

165179
private void makeRequest() {

0 commit comments

Comments
 (0)