Skip to content

Merge 2.0.0-kinesis-event-streaming branch with master #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -798,4 +798,3 @@ private HttpResponseHandler<AwsServiceException> createErrorResponseHandler(Base
return protocolFactory.createErrorResponseHandler(new JsonErrorResponseMetadata());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public abstract class AbstractAws4Signer<T extends Aws4SignerParams, U extends A
private static final int SIGNER_CACHE_MAX_SIZE = 300;
private static final FifoCache<SignerKey> SIGNER_CACHE =
new FifoCache<>(SIGNER_CACHE_MAX_SIZE);
private static final List<String> LIST_OF_HEADERS_TO_IGNORE_IN_LOWER_CASE = Arrays.asList("connection", "x-amzn-trace-id");
private static final List<String> LIST_OF_HEADERS_TO_IGNORE_IN_LOWER_CASE =
Arrays.asList("connection", "x-amzn-trace-id", "user-agent");

protected SdkHttpFullRequest.Builder doSign(SdkHttpFullRequest request,
Aws4SignerRequestParams requestParams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Collections.singletonList;
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZN_REQUEST_ID_HEADER;
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZ_ID_2_HEADER;
import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -158,6 +159,13 @@ public class EventStreamAsyncResponseTransformer<ResponseT, EventT>

private volatile CompletableFuture<Void> transformFuture;

/**
* Extended Request Id for the streaming request. The value is populated when the initial response is received from the
* service. As request id is not sent in event messages (including exceptions), this can be returned by the SDK along with
* received exception details.
*/
private String extendedRequestId = null;

@Deprecated
@ReviewBeforeRelease("Remove this on full GA of 2.0.0")
public EventStreamAsyncResponseTransformer(
Expand Down Expand Up @@ -199,6 +207,9 @@ public void onResponse(SdkResponse response) {
this.requestId = response.sdkHttpResponse()
.firstMatchingHeader(X_AMZN_REQUEST_ID_HEADER)
.orElse(null);
this.extendedRequestId = response.sdkHttpResponse()
.firstMatchingHeader(X_AMZ_ID_2_HEADER)
.orElse(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@ private static void validateClientOptions(SdkClientConfiguration c) {
c.option(AwsClientOption.SERVICE_SIGNING_NAME));
require("overrideConfiguration.advancedOption[ENABLE_DEFAULT_REGION_DETECTION]",
c.option(AwsAdvancedClientOption.ENABLE_DEFAULT_REGION_DETECTION));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public interface HttpResponseHandler<T> {

String X_AMZN_REQUEST_ID_HEADER = "x-amzn-RequestId";

String X_AMZ_ID_2_HEADER = "x-amz-id-2";

/**
* Accepts an HTTP response object, and returns an object of type T.
* Individual implementations may choose to handle the response however they
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public final class NettyNioAsyncHttpClient implements SdkAsyncHttpClient {
NettyNioAsyncHttpClient(DefaultBuilder builder, AttributeMap serviceDefaultsMap) {
this.configuration = new NettyConfiguration(serviceDefaultsMap);
this.protocol = serviceDefaultsMap.get(SdkHttpConfigurationOption.PROTOCOL);
this.maxStreams = 200;
this.maxStreams = builder.maxHttp2Streams == null ? Integer.MAX_VALUE : builder.maxHttp2Streams;
this.sdkEventLoopGroup = eventLoopGroup(builder);
this.pools = createChannelPoolMap();
this.sdkChannelOptions = channelOptions(builder);
Expand Down Expand Up @@ -285,6 +285,18 @@ public interface Builder extends SdkAsyncHttpClient.Builder<NettyNioAsyncHttpCli
* @see SdkEventLoopGroup.Builder
*/
Builder putChannelOption(ChannelOption channelOption, Object value);

/**
* Sets the max number of concurrent streams for an HTTP/2 connection. This setting is only respected when the HTTP/2
* protocol is used.
*
* <p>Note that this cannot exceed the value of the MAX_CONCURRENT_STREAMS setting returned by the service. If it
* does the service setting is used instead.</p>
*
* @param maxHttp2Streams Max concurrent HTTP/2 streams per connection.
* @return This builder for method chaining.
*/
Builder maxHttp2Streams(Integer maxHttp2Streams);
}

/**
Expand All @@ -299,6 +311,7 @@ private static final class DefaultBuilder implements Builder {

private SdkEventLoopGroup eventLoopGroup;
private SdkEventLoopGroup.Builder eventLoopGroupBuilder;
private Integer maxHttp2Streams;

private DefaultBuilder() {
}
Expand Down Expand Up @@ -438,6 +451,16 @@ public Builder putChannelOption(ChannelOption channelOption, Object value) {
return this;
}

@Override
public Builder maxHttp2Streams(Integer maxHttp2Streams) {
this.maxHttp2Streams = maxHttp2Streams;
return this;
}

public void setMaxHttp2Streams(Integer maxHttp2Streams) {
maxHttp2Streams(maxHttp2Streams);
}

@Override
public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
return new NettyNioAsyncHttpClient(this, standardOptions.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ChannelPipelineInitializer(Protocol protocol,
}

@Override
public void channelCreated(Channel ch) throws Exception {
public void channelCreated(Channel ch) {
ch.attr(PROTOCOL_FUTURE).set(new CompletableFuture<>());
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
Expand Down Expand Up @@ -89,7 +89,7 @@ private void configureHttp2(Channel ch, ChannelPipeline pipeline) {

pipeline.addLast(new SimpleChannelInboundHandler<Http2SettingsFrame>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) {
Long serverMaxStreams = Optional.ofNullable(msg.settings().maxConcurrentStreams()).orElse(Long.MAX_VALUE);
ch.attr(MAX_CONCURRENT_STREAMS).set(Math.min(clientMaxStreams, serverMaxStreams));
ch.attr(PROTOCOL_FUTURE).get().complete(Protocol.HTTP2);
Expand All @@ -103,7 +103,7 @@ public void channelUnregistered(ChannelHandlerContext ctx) {
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
channelError(cause, ch);
}
});
Expand Down Expand Up @@ -136,7 +136,7 @@ private void configureHttp11(Channel ch, ChannelPipeline pipeline) {

private static class NoOpChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
protected void initChannel(Channel ch) {
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.AttributeKey;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -182,7 +181,7 @@ private static class PublisherAdapter implements Publisher<ByteBuffer> {
private final ChannelHandlerContext channelContext;
private final RequestContext requestContext;
private final CompletableFuture<Void> executeFuture;
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
private final AtomicBoolean isDone = new AtomicBoolean(false);

private PublisherAdapter(StreamedHttpResponse response, ChannelHandlerContext channelContext,
RequestContext requestContext, CompletableFuture<Void> executeFuture) {
Expand Down Expand Up @@ -211,8 +210,10 @@ private Subscription resolveSubscription(Subscription subscription) {
}

private void onCancel() {
if (!isDone.compareAndSet(false, true)) {
return;
}
try {
isCancelled.set(true);
SdkCancellationException e = new SdkCancellationException(
"Subscriber cancelled before all events were published");
requestContext.handler().onError(e);
Expand All @@ -225,6 +226,10 @@ private void onCancel() {

@Override
public void onNext(HttpContent httpContent) {
// isDone may be true if the subscriber cancelled
if (isDone.get()) {
return;
}
// Needed to prevent use-after-free bug if the subscriber's onNext is asynchronous
ByteBuffer b = copyToByteBuffer(httpContent.content());
httpContent.release();
Expand All @@ -234,7 +239,7 @@ public void onNext(HttpContent httpContent) {

@Override
public void onError(Throwable t) {
if (isCancelled.get()) {
if (!isDone.compareAndSet(false, true)) {
return;
}
try {
Expand All @@ -254,7 +259,7 @@ public void onError(Throwable t) {
public void onComplete() {
// For HTTP/2 it's possible to get an onComplete after we cancel due to the channel becoming
// inactive. We guard against that here and just ignore the signal (see HandlerPublisher)
if (isCancelled.get()) {
if (!isDone.compareAndSet(false, true)) {
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
Expand All @@ -56,22 +55,23 @@ public class SubscribeToShardIntegrationTest {

private String streamName;
private static final String CONSUMER_NAME = "subscribe-to-shard-consumer";
private KinesisAsyncClient client;
private String consumerArn;
private String shardId;
private static KinesisAsyncClient client;
private static String consumerArn;
private static String shardId;

@Before
public void setup() throws InterruptedException {
streamName = "subscribe-to-shard-integ-test-" + System.currentTimeMillis();

client = KinesisAsyncClient.builder()
.region(Region.EU_CENTRAL_1)
.build();
client.createStream(r -> r.streamName(streamName)
.shardCount(1)).join();
waitForStreamToBeActive();
String streamARN = client.describeStream(r -> r.streamName(streamName)).join()
.streamDescription()
.streamARN();

this.shardId = client.listShards(r -> r.streamName(streamName))
.join()
.shards().get(0).shardId();
Expand Down Expand Up @@ -174,7 +174,7 @@ public void complete() {
}
}

private void waitForConsumerToBeActive() throws InterruptedException {
private static void waitForConsumerToBeActive() throws InterruptedException {
waitUntilTrue(() -> ConsumerStatus.ACTIVE == client.describeStreamConsumer(r -> r.consumerARN(consumerArn))
.join()
.consumerDescription()
Expand All @@ -188,7 +188,7 @@ private void waitForStreamToBeActive() throws InterruptedException {
.streamStatus());
}

private void waitUntilTrue(Supplier<Boolean> state) throws InterruptedException {
private static void waitUntilTrue(Supplier<Boolean> state) throws InterruptedException {
int attempt = 0;
do {
if (attempt > 10) {
Expand Down Expand Up @@ -229,4 +229,4 @@ private void verifyHttpMetadata(SubscribeToShardResponse response) {
assertThat(sdkHttpResponse.isSuccessful()).isTrue();
assertThat(sdkHttpResponse.headers()).isNotEmpty();
}
}
}