Skip to content

Revert the rollback of #2957 #2982

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public final class ChannelAttributeKey {
public static final AttributeKey<Http2FrameStream> HTTP2_FRAME_STREAM = NettyUtils.getOrCreateAttributeKey(
"aws.http.nio.netty.async.http2FrameStream");

public static final AttributeKey<ChannelDiagnostics> CHANNEL_DIAGNOSTICS = NettyUtils.getOrCreateAttributeKey(
"aws.http.nio.netty.async.channelDiagnostics");

/**
* {@link AttributeKey} to keep track of whether we should close the connection after this request
* has completed.
Expand All @@ -88,6 +91,15 @@ public final class ChannelAttributeKey {
static final AttributeKey<Boolean> RESPONSE_COMPLETE_KEY = NettyUtils.getOrCreateAttributeKey(
"aws.http.nio.netty.async.responseComplete");

static final AttributeKey<Integer> RESPONSE_STATUS_CODE = NettyUtils.getOrCreateAttributeKey(
"aws.http.nio.netty.async.responseStatusCode");

static final AttributeKey<Long> RESPONSE_CONTENT_LENGTH = NettyUtils.getOrCreateAttributeKey(
"aws.http.nio.netty.async.responseContentLength");

static final AttributeKey<Long> RESPONSE_DATA_READ = NettyUtils.getOrCreateAttributeKey(
"aws.http.nio.netty.async.responseDataRead");

/**
* {@link AttributeKey} to keep track of whether we have received the {@link LastHttpContent}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal;

import io.netty.channel.Channel;
import java.time.Duration;
import java.time.Instant;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.ToString;

/**
* Diagnostic information that may be useful to help with debugging during error scenarios.
*/
@SdkInternalApi
public class ChannelDiagnostics {
private final Channel channel;
private final Instant channelCreationTime;
private int requestCount = 0;

public ChannelDiagnostics(Channel channel) {
this.channel = channel;
this.channelCreationTime = Instant.now();
}

public void incrementRequestCount() {
++this.requestCount;
}

@Override
public String toString() {
return ToString.builder("ChannelDiagnostics")
.add("channel", channel)
.add("channelAge", Duration.between(channelCreationTime, Instant.now()))
.add("requestCount", requestCount)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.CHANNEL_DIAGNOSTICS;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
Expand Down Expand Up @@ -88,6 +89,7 @@ public ChannelPipelineInitializer(Protocol protocol,

@Override
public void channelCreated(Channel ch) {
ch.attr(CHANNEL_DIAGNOSTICS).set(new ChannelDiagnostics(ch));
ch.attr(PROTOCOL_FUTURE).set(new CompletableFuture<>());
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.HttpMetric.CONCURRENCY_ACQUIRE_DURATION;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.CHANNEL_DIAGNOSTICS;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTION_ID_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.IN_USE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_CONTENT_LENGTH;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ;
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.CLOSED_CHANNEL_MESSAGE;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -38,22 +40,18 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutException;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -176,9 +174,13 @@ private void makeRequestListener(Future<Channel> channelFuture) {
if (channelFuture.isSuccess()) {
channel = channelFuture.getNow();
NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
configureChannel();
if (tryConfigurePipeline()) {
try {
configureChannel();
configurePipeline();
makeRequest();
} catch (Throwable t) {
closeAndRelease(channel);
handleFailure(channel, () -> "Failed to initiate request to " + endpoint(), t);
}
});
} else {
Expand All @@ -192,10 +194,13 @@ private void configureChannel() {
channel.attr(REQUEST_CONTEXT_KEY).set(context);
channel.attr(RESPONSE_COMPLETE_KEY).set(false);
channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(false);
channel.attr(RESPONSE_CONTENT_LENGTH).set(null);
channel.attr(RESPONSE_DATA_READ).set(null);
channel.attr(CHANNEL_DIAGNOSTICS).get().incrementRequestCount();
channel.config().setOption(ChannelOption.AUTO_READ, false);
}

private boolean tryConfigurePipeline() {
private void configurePipeline() throws IOException {
Protocol protocol = ChannelAttributeKey.getProtocolNow(channel);
ChannelPipeline pipeline = channel.pipeline();

Expand All @@ -210,10 +215,7 @@ private boolean tryConfigurePipeline() {
requestAdapter = REQUEST_ADAPTER_HTTP1_1;
break;
default:
String errorMsg = "Unknown protocol: " + protocol;
closeAndRelease(channel);
handleFailure(channel, () -> errorMsg, new RuntimeException(errorMsg));
return false;
throw new IOException("Unknown protocol: " + protocol);
}

pipeline.addLast(LastHttpContentHandler.create());
Expand All @@ -227,13 +229,8 @@ private boolean tryConfigurePipeline() {
// handler (which will monitor for it going inactive from now on).
// Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207
if (!channel.isActive()) {
String errorMessage = "Channel was closed before it could be written to.";
closeAndRelease(channel);
handleFailure(channel, () -> errorMessage, new IOException(errorMessage));
return false;
throw new IOException(NettyUtils.closedChannelMessage(channel));
}

return true;
}

private void makeRequest() {
Expand Down Expand Up @@ -308,80 +305,11 @@ private URI endpoint() {

private void handleFailure(Channel channel, Supplier<String> msgSupplier, Throwable cause) {
log.debug(channel, msgSupplier, cause);
cause = decorateException(cause);
cause = NettyUtils.decorateException(channel, cause);
context.handler().onError(cause);
executeFuture.completeExceptionally(cause);
}

private Throwable decorateException(Throwable originalCause) {
if (isAcquireTimeoutException(originalCause)) {
return new Throwable(getMessageForAcquireTimeoutException(), originalCause);
} else if (isTooManyPendingAcquiresException(originalCause)) {
return new Throwable(getMessageForTooManyAcquireOperationsError(), originalCause);
} else if (originalCause instanceof ReadTimeoutException) {
return new IOException("Read timed out", originalCause);
} else if (originalCause instanceof WriteTimeoutException) {
return new IOException("Write timed out", originalCause);
} else if (originalCause instanceof ClosedChannelException) {
return new IOException(CLOSED_CHANNEL_MESSAGE, originalCause);
}

return originalCause;
}

private boolean isAcquireTimeoutException(Throwable originalCause) {
String message = originalCause.getMessage();
return originalCause instanceof TimeoutException &&
message != null &&
message.contains("Acquire operation took longer");
}

private boolean isTooManyPendingAcquiresException(Throwable originalCause) {
String message = originalCause.getMessage();
return originalCause instanceof IllegalStateException &&
message != null &&
originalCause.getMessage().contains("Too many outstanding acquire operations");
}

private String getMessageForAcquireTimeoutException() {
return "Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a "
+ "connection from the pool within the specified maximum time. This can be due to high request rate.\n"

+ "Consider taking any of the following actions to mitigate the issue: increase max connections, "
+ "increase acquire timeout, or slowing the request rate.\n"

+ "Increasing the max connections can increase client throughput (unless the network interface is already "
+ "fully utilized), but can eventually start to hit operation system limitations on the number of file "
+ "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
+ "further increase your connection count, increasing the acquire timeout gives extra time for requests to "
+ "acquire a connection before timing out. If the connections doesn't free up, the subsequent requests "
+ "will still timeout.\n"

+ "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
+ "traffic bursts cannot overload the client, being more efficient with the number of times you need to "
+ "call AWS, or by increasing the number of hosts sending requests.";
}

private String getMessageForTooManyAcquireOperationsError() {
return "Maximum pending connection acquisitions exceeded. The request rate is too high for the client to keep up.\n"

+ "Consider taking any of the following actions to mitigate the issue: increase max connections, "
+ "increase max pending acquire count, decrease pool lease timeout, or slowing the request rate.\n"

+ "Increasing the max connections can increase client throughput (unless the network interface is already "
+ "fully utilized), but can eventually start to hit operation system limitations on the number of file "
+ "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
+ "further increase your connection count, increasing the pending acquire count allows extra requests to be "
+ "buffered by the client, but can cause additional request latency and higher memory usage. If your request"
+ " latency or memory usage is already too high, decreasing the lease timeout will allow requests to fail "
+ "more quickly, reducing the number of pending connection acquisitions, but likely won't decrease the total "
+ "number of failed requests.\n"

+ "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
+ "traffic bursts cannot overload the client, being more efficient with the number of times you need to call "
+ "AWS, or by increasing the number of hosts sending requests.";
}

/**
* Close and release the channel back to the pool.
*
Expand Down
Loading