Skip to content

Gracefully handle GOAWAY #1479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "bugfix",
"category": "Netty NIO HTTP Client",
"description": "Better handle `GOAWAY` messages from the remote endpoint."
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import javax.net.ssl.SSLParameters;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2GoAwayFrameHandler;
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2SettingsFrameHandler;

/**
Expand Down Expand Up @@ -126,8 +127,8 @@ private void configureHttp2(Channel ch, ChannelPipeline pipeline) {
.build());

pipeline.addLast(new Http2MultiplexHandler(new NoOpChannelInitializer()));

pipeline.addLast(new Http2SettingsFrameHandler(ch, clientMaxStreams, channelPoolRef));
pipeline.addLast(new Http2GoAwayFrameHandler());
}

private void configureHttp11(Channel ch, ChannelPipeline pipeline) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal.http2;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import software.amazon.awssdk.annotations.SdkInternalApi;

/**
* Exception thrown when a GOAWAY frame is sent by the service.
*/
@SdkInternalApi
class GoAwayException extends IOException {
private final String message;

GoAwayException(long errorCode, ByteBuf debugData) {
this.message = String.format("GOAWAY received. Error Code = %d, Debug Data = %s",
errorCode, debugData.toString(StandardCharsets.UTF_8));
}

@Override
public String getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal.http2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;

/**
* Handles {@link Http2GoAwayFrame}s sent on a connection. This will pass the frame along to the connection's
* {@link MultiplexedChannelRecord#goAway(Http2GoAwayFrame)} method.
*/
@SdkInternalApi
public class Http2GoAwayFrameHandler extends SimpleChannelInboundHandler<Http2GoAwayFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2GoAwayFrame frame)
{
MultiplexedChannelRecord parentConnection = ctx.channel().attr(ChannelAttributeKey.CHANNEL_POOL_RECORD).get();
if (parentConnection != null) {
parentConnection.goAway(frame);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Frame;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2ResetFrame;
import io.netty.handler.codec.http2.HttpConversionUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.BinaryUtils;

/**
* Converts {@link Http2Frame}s to {@link HttpObject}s. Ignores the majority of {@link Http2Frame}s like PING
Expand All @@ -54,8 +50,6 @@ protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws
ctx.channel().read();
} else if (frame instanceof Http2ResetFrame) {
onRstStreamRead((Http2ResetFrame) frame, ctx);
} else if (frame instanceof Http2GoAwayFrame) {
onGoAwayRead((Http2GoAwayFrame) frame, ctx);
} else {
// TODO this is related to the inbound window update bug. Revisit
ctx.channel().parent().read();
Expand All @@ -76,10 +70,6 @@ private void onDataRead(Http2DataFrame dataFrame, ChannelHandlerContext ctx) thr
}
}

private void onGoAwayRead(Http2GoAwayFrame goAwayFrame, ChannelHandlerContext ctx) throws Http2Exception {
ctx.fireExceptionCaught(new GoawayException(goAwayFrame.errorCode(), goAwayFrame.content()));
}

private void onRstStreamRead(Http2ResetFrame resetFrame, ChannelHandlerContext ctx) throws Http2Exception {
ctx.fireExceptionCaught(new Http2ResetException(resetFrame.errorCode()));
}
Expand All @@ -90,24 +80,4 @@ public static class Http2ResetException extends IOException {
super(String.format("Connection reset. Error - %s(%d)", Http2Error.valueOf(errorCode).name(), errorCode));
}
}

/**
* Exception thrown when a GOAWAY frame is sent by the service.
*/
private static class GoawayException extends IOException {

private final long errorCode;
private final byte[] debugData;

GoawayException(long errorCode, ByteBuf debugData) {
this.errorCode = errorCode;
this.debugData = BinaryUtils.copyBytesFrom(debugData.nioBuffer());
}

@Override
public String getMessage() {
return String.format("GOAWAY received. Error Code = %d, Debug Data = %s",
errorCode, new String(debugData, StandardCharsets.UTF_8));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -42,14 +44,14 @@
* streams based on the MAX_CONCURRENT_STREAMS setting for the connection.
*/
@SdkInternalApi
public final class MultiplexedChannelRecord {

public class MultiplexedChannelRecord {
private final Future<Channel> connectionFuture;
private final Map<ChannelId, Channel> childChannels;
private final Map<ChannelId, Http2StreamChannel> childChannels;
private final AtomicLong availableStreams;
private final BiConsumer<Channel, MultiplexedChannelRecord> channelReleaser;

private volatile Channel connection;
private volatile boolean goAway = false;

/**
* @param connectionFuture Future for parent socket channel.
Expand Down Expand Up @@ -80,13 +82,13 @@ public final class MultiplexedChannelRecord {
MultiplexedChannelRecord acquire(Promise<Channel> channelPromise) {
availableStreams.decrementAndGet();
if (connection != null) {
createChildChannel(channelPromise, connection);
createChildChannel(channelPromise);
} else {
connectionFuture.addListener((GenericFutureListener<Future<Channel>>) future -> {
if (future.isSuccess()) {
connection = future.getNow();
connection.attr(CHANNEL_POOL_RECORD).set(this);
createChildChannel(channelPromise, connection);
createChildChannel(channelPromise);
} else {
channelPromise.setFailure(future.cause());
channelReleaser.accept(connection, this);
Expand All @@ -97,42 +99,62 @@ MultiplexedChannelRecord acquire(Promise<Channel> channelPromise) {
}

/**
* Delivers the exception to all registered child channels.
* Handle a {@link Http2GoAwayFrame} on this connection, preventing new streams from being created on it, and closing any
* streams newer than the last-stream-id on the go-away frame.
*/
public void goAway(Http2GoAwayFrame frame) {
this.goAway = true;
GoAwayException exception = new GoAwayException(frame.errorCode(), frame.content());
childChannels.entrySet().stream()
.map(Map.Entry::getValue)
.filter(cc -> cc.stream().id() > frame.lastStreamId())
.forEach(cc -> cc.eventLoop().execute(() -> shutdownChildChannel(cc, exception)));
}

/**
* Delivers the exception to all registered child channels, and prohibits new streams being created on this connection.
*
* @param t Exception to deliver.
*/
public void shutdownChildChannels(Throwable t) {
for (Channel childChannel : childChannels.values()) {
childChannel.pipeline().fireExceptionCaught(t);
}
this.goAway = true;
doInEventLoop(connection.eventLoop(), () -> {
for (Channel childChannel : childChannels.values()) {
shutdownChildChannel(childChannel, t);
}
});
}

private void shutdownChildChannel(Channel childChannel, Throwable t) {
childChannel.pipeline().fireExceptionCaught(t);
}

/**
* Bootstraps a child stream channel from the parent socket channel. Done in parent channel event loop.
*
* @param channelPromise Promise to notify when channel is available.
* @param parentChannel Parent socket channel.
*/
private void createChildChannel(Promise<Channel> channelPromise, Channel parentChannel) {
doInEventLoop(parentChannel.eventLoop(),
() -> createChildChannel0(channelPromise, parentChannel),
channelPromise);
private void createChildChannel(Promise<Channel> channelPromise) {
doInEventLoop(connection.eventLoop(), () -> createChildChannel0(channelPromise), channelPromise);
}

private void createChildChannel0(Promise<Channel> channelPromise, Channel parentChannel) {
// Once protocol future is notified then parent pipeline is configured and ready to go
parentChannel.attr(PROTOCOL_FUTURE).get()
.whenComplete(asyncPromiseNotifyingBiConsumer(bootstrapChildChannel(parentChannel), channelPromise));
private void createChildChannel0(Promise<Channel> channelPromise) {
if (goAway) {
channelPromise.tryFailure(new IOException("No streams are available on this connection."));
} else {
// Once protocol future is notified then parent pipeline is configured and ready to go
connection.attr(PROTOCOL_FUTURE).get()
.whenComplete(asyncPromiseNotifyingBiConsumer(bootstrapChildChannel(), channelPromise));
}
}

/**
* Bootstraps the child stream channel and notifies the Promise on success or failure.
*
* @param parentChannel Parent socket channel.
* @return BiConsumer that will bootstrap the child channel.
*/
private BiConsumer<Protocol, Promise<Channel>> bootstrapChildChannel(Channel parentChannel) {
return (s, p) -> new Http2StreamChannelBootstrap(parentChannel)
private BiConsumer<Protocol, Promise<Channel>> bootstrapChildChannel() {
return (s, p) -> new Http2StreamChannelBootstrap(connection)
.open()
.addListener((GenericFutureListener<Future<Http2StreamChannel>>) future -> {
if (future.isSuccess()) {
Expand All @@ -158,7 +180,7 @@ public Future<Channel> getConnectionFuture() {
}

long availableStreams() {
return availableStreams.get();
return goAway ? 0 : availableStreams.get();
}

}
Loading