Skip to content

Commit f6fe8da

Browse files
authored
Validate that the response content-length header matches what the service actually returns. (#2957)
Also update the error messages reported when the service closes the connection, and add tests to protect these error messages against regression.
1 parent 2d1ea9c commit f6fe8da

File tree

13 files changed

+805
-132
lines changed

13 files changed

+805
-132
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Netty NIO HTTP Client",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Verify that the content-length header matches the content returned by the service."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ public final class ChannelAttributeKey {
7070
public static final AttributeKey<Http2FrameStream> HTTP2_FRAME_STREAM = NettyUtils.getOrCreateAttributeKey(
7171
"aws.http.nio.netty.async.http2FrameStream");
7272

73+
public static final AttributeKey<ChannelDiagnostics> CHANNEL_DIAGNOSTICS = NettyUtils.getOrCreateAttributeKey(
74+
"aws.http.nio.netty.async.channelDiagnostics");
75+
7376
/**
7477
* {@link AttributeKey} to keep track of whether we should close the connection after this request
7578
* has completed.
@@ -88,6 +91,12 @@ public final class ChannelAttributeKey {
8891
static final AttributeKey<Boolean> RESPONSE_COMPLETE_KEY = NettyUtils.getOrCreateAttributeKey(
8992
"aws.http.nio.netty.async.responseComplete");
9093

94+
static final AttributeKey<Long> RESPONSE_CONTENT_LENGTH = NettyUtils.getOrCreateAttributeKey(
95+
"aws.http.nio.netty.async.responseContentLength");
96+
97+
static final AttributeKey<Long> RESPONSE_DATA_READ = NettyUtils.getOrCreateAttributeKey(
98+
"aws.http.nio.netty.async.responseDataRead");
99+
91100
/**
92101
* {@link AttributeKey} to keep track of whether we have received the {@link LastHttpContent}.
93102
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import io.netty.channel.Channel;
19+
import java.time.Duration;
20+
import java.time.Instant;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
import software.amazon.awssdk.utils.ToString;
23+
24+
/**
25+
* Diagnostic information that may be useful to help with debugging during error scenarios.
26+
*/
27+
@SdkInternalApi
28+
public class ChannelDiagnostics {
29+
// The channel these diagnostics describe; included verbatim in toString().
private final Channel channel;
30+
// Timestamp taken when this object is constructed; toString() uses it to derive the reported "channelAge".
private final Instant channelCreationTime;
31+
// Number of times incrementRequestCount() has been called.
// NOTE(review): plain int with no synchronization -- presumably only mutated from the channel's event loop; confirm.
private int requestCount = 0;
32+
33+
/**
 * Creates diagnostics for the given channel and records the current time as the channel's creation time.
 *
 * @param channel the channel to report on
 */
public ChannelDiagnostics(Channel channel) {
34+
this.channel = channel;
35+
// The "creation time" is the moment this diagnostics object is built, not a value read from the channel itself.
this.channelCreationTime = Instant.now();
36+
}
37+
38+
/**
 * Increments the request counter by one.
 */
public void incrementRequestCount() {
39+
++this.requestCount;
40+
}
41+
42+
/**
 * Renders the channel, its age and the request count. The age is computed at call time as the duration between
 * the recorded creation time and now, so repeated calls yield different values.
 */
@Override
43+
public String toString() {
44+
return ToString.builder("ChannelDiagnostics")
45+
.add("channel", channel)
46+
.add("channelAge", Duration.between(channelCreationTime, Instant.now()))
47+
.add("requestCount", requestCount)
48+
.build();
49+
}
50+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

18+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.CHANNEL_DIAGNOSTICS;
1819
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION;
1920
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE;
2021
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
@@ -88,6 +89,7 @@ public ChannelPipelineInitializer(Protocol protocol,
8889

8990
@Override
9091
public void channelCreated(Channel ch) {
92+
ch.attr(CHANNEL_DIAGNOSTICS).set(new ChannelDiagnostics(ch));
9193
ch.attr(PROTOCOL_FUTURE).set(new CompletableFuture<>());
9294
ChannelPipeline pipeline = ch.pipeline();
9395
if (sslCtx != null) {

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 16 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

1818
import static software.amazon.awssdk.http.HttpMetric.CONCURRENCY_ACQUIRE_DURATION;
19+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.CHANNEL_DIAGNOSTICS;
1920
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
2021
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTION_ID_KEY;
2122
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.IN_USE;
2223
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
2324
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
2425
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
2526
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
27+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_CONTENT_LENGTH;
28+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ;
2629
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
27-
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.CLOSED_CHANNEL_MESSAGE;
2830

2931
import io.netty.buffer.ByteBuf;
3032
import io.netty.buffer.Unpooled;
@@ -38,22 +40,18 @@
3840
import io.netty.handler.codec.http.HttpMethod;
3941
import io.netty.handler.codec.http.HttpRequest;
4042
import io.netty.handler.codec.http.HttpVersion;
41-
import io.netty.handler.timeout.ReadTimeoutException;
4243
import io.netty.handler.timeout.ReadTimeoutHandler;
43-
import io.netty.handler.timeout.WriteTimeoutException;
4444
import io.netty.handler.timeout.WriteTimeoutHandler;
4545
import io.netty.util.concurrent.Future;
4646
import io.netty.util.concurrent.GenericFutureListener;
4747
import io.netty.util.concurrent.Promise;
4848
import java.io.IOException;
4949
import java.net.URI;
5050
import java.nio.ByteBuffer;
51-
import java.nio.channels.ClosedChannelException;
5251
import java.time.Duration;
5352
import java.util.Optional;
5453
import java.util.concurrent.CompletableFuture;
5554
import java.util.concurrent.TimeUnit;
56-
import java.util.concurrent.TimeoutException;
5755
import java.util.concurrent.atomic.AtomicLong;
5856
import java.util.function.Supplier;
5957
import org.reactivestreams.Publisher;
@@ -176,9 +174,13 @@ private void makeRequestListener(Future<Channel> channelFuture) {
176174
if (channelFuture.isSuccess()) {
177175
channel = channelFuture.getNow();
178176
NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
179-
configureChannel();
180-
if (tryConfigurePipeline()) {
177+
try {
178+
configureChannel();
179+
configurePipeline();
181180
makeRequest();
181+
} catch (Throwable t) {
182+
closeAndRelease(channel);
183+
handleFailure(channel, () -> "Failed to initiate request to " + endpoint(), t);
182184
}
183185
});
184186
} else {
@@ -192,10 +194,13 @@ private void configureChannel() {
192194
channel.attr(REQUEST_CONTEXT_KEY).set(context);
193195
channel.attr(RESPONSE_COMPLETE_KEY).set(false);
194196
channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(false);
197+
channel.attr(RESPONSE_CONTENT_LENGTH).set(null);
198+
channel.attr(RESPONSE_DATA_READ).set(null);
199+
channel.attr(CHANNEL_DIAGNOSTICS).get().incrementRequestCount();
195200
channel.config().setOption(ChannelOption.AUTO_READ, false);
196201
}
197202

198-
private boolean tryConfigurePipeline() {
203+
private void configurePipeline() throws IOException {
199204
Protocol protocol = ChannelAttributeKey.getProtocolNow(channel);
200205
ChannelPipeline pipeline = channel.pipeline();
201206

@@ -210,10 +215,7 @@ private boolean tryConfigurePipeline() {
210215
requestAdapter = REQUEST_ADAPTER_HTTP1_1;
211216
break;
212217
default:
213-
String errorMsg = "Unknown protocol: " + protocol;
214-
closeAndRelease(channel);
215-
handleFailure(channel, () -> errorMsg, new RuntimeException(errorMsg));
216-
return false;
218+
throw new IOException("Unknown protocol: " + protocol);
217219
}
218220

219221
pipeline.addLast(LastHttpContentHandler.create());
@@ -227,13 +229,8 @@ private boolean tryConfigurePipeline() {
227229
// handler (which will monitor for it going inactive from now on).
228230
// Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207
229231
if (!channel.isActive()) {
230-
String errorMessage = "Channel was closed before it could be written to.";
231-
closeAndRelease(channel);
232-
handleFailure(channel, () -> errorMessage, new IOException(errorMessage));
233-
return false;
232+
throw new IOException(NettyUtils.closedChannelMessage(channel));
234233
}
235-
236-
return true;
237234
}
238235

239236
private void makeRequest() {
@@ -308,80 +305,11 @@ private URI endpoint() {
308305

309306
private void handleFailure(Channel channel, Supplier<String> msgSupplier, Throwable cause) {
310307
log.debug(channel, msgSupplier, cause);
311-
cause = decorateException(cause);
308+
cause = NettyUtils.decorateException(channel, cause);
312309
context.handler().onError(cause);
313310
executeFuture.completeExceptionally(cause);
314311
}
315312

316-
private Throwable decorateException(Throwable originalCause) {
317-
if (isAcquireTimeoutException(originalCause)) {
318-
return new Throwable(getMessageForAcquireTimeoutException(), originalCause);
319-
} else if (isTooManyPendingAcquiresException(originalCause)) {
320-
return new Throwable(getMessageForTooManyAcquireOperationsError(), originalCause);
321-
} else if (originalCause instanceof ReadTimeoutException) {
322-
return new IOException("Read timed out", originalCause);
323-
} else if (originalCause instanceof WriteTimeoutException) {
324-
return new IOException("Write timed out", originalCause);
325-
} else if (originalCause instanceof ClosedChannelException) {
326-
return new IOException(CLOSED_CHANNEL_MESSAGE, originalCause);
327-
}
328-
329-
return originalCause;
330-
}
331-
332-
private boolean isAcquireTimeoutException(Throwable originalCause) {
333-
String message = originalCause.getMessage();
334-
return originalCause instanceof TimeoutException &&
335-
message != null &&
336-
message.contains("Acquire operation took longer");
337-
}
338-
339-
private boolean isTooManyPendingAcquiresException(Throwable originalCause) {
340-
String message = originalCause.getMessage();
341-
return originalCause instanceof IllegalStateException &&
342-
message != null &&
343-
originalCause.getMessage().contains("Too many outstanding acquire operations");
344-
}
345-
346-
private String getMessageForAcquireTimeoutException() {
347-
return "Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a "
348-
+ "connection from the pool within the specified maximum time. This can be due to high request rate.\n"
349-
350-
+ "Consider taking any of the following actions to mitigate the issue: increase max connections, "
351-
+ "increase acquire timeout, or slowing the request rate.\n"
352-
353-
+ "Increasing the max connections can increase client throughput (unless the network interface is already "
354-
+ "fully utilized), but can eventually start to hit operation system limitations on the number of file "
355-
+ "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
356-
+ "further increase your connection count, increasing the acquire timeout gives extra time for requests to "
357-
+ "acquire a connection before timing out. If the connections doesn't free up, the subsequent requests "
358-
+ "will still timeout.\n"
359-
360-
+ "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
361-
+ "traffic bursts cannot overload the client, being more efficient with the number of times you need to "
362-
+ "call AWS, or by increasing the number of hosts sending requests.";
363-
}
364-
365-
private String getMessageForTooManyAcquireOperationsError() {
366-
return "Maximum pending connection acquisitions exceeded. The request rate is too high for the client to keep up.\n"
367-
368-
+ "Consider taking any of the following actions to mitigate the issue: increase max connections, "
369-
+ "increase max pending acquire count, decrease pool lease timeout, or slowing the request rate.\n"
370-
371-
+ "Increasing the max connections can increase client throughput (unless the network interface is already "
372-
+ "fully utilized), but can eventually start to hit operation system limitations on the number of file "
373-
+ "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
374-
+ "further increase your connection count, increasing the pending acquire count allows extra requests to be "
375-
+ "buffered by the client, but can cause additional request latency and higher memory usage. If your request"
376-
+ " latency or memory usage is already too high, decreasing the lease timeout will allow requests to fail "
377-
+ "more quickly, reducing the number of pending connection acquisitions, but likely won't decrease the total "
378-
+ "number of failed requests.\n"
379-
380-
+ "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
381-
+ "traffic bursts cannot overload the client, being more efficient with the number of times you need to call "
382-
+ "AWS, or by increasing the number of hosts sending requests.";
383-
}
384-
385313
/**
386314
* Close and release the channel back to the pool.
387315
*

0 commit comments

Comments
 (0)