Skip to content

Commit ddd0f79

Browse files
committed
Merge 2.0.0-kinesis-event-streaming with master
1 parent c97f8d1 commit ddd0f79

File tree

8 files changed

+60
-20
lines changed

8 files changed

+60
-20
lines changed

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-async-client-class.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -798,4 +798,3 @@ private HttpResponseHandler<AwsServiceException> createErrorResponseHandler(Base
798798
return protocolFactory.createErrorResponseHandler(new JsonErrorResponseMetadata());
799799
}
800800
}
801-

core/aws-core/src/main/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static java.util.Collections.singletonList;
1919
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZN_REQUEST_ID_HEADER;
20+
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZ_ID_2_HEADER;
2021
import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;
2122

2223
import java.io.ByteArrayInputStream;
@@ -158,6 +159,13 @@ public class EventStreamAsyncResponseTransformer<ResponseT, EventT>
158159

159160
private volatile CompletableFuture<Void> transformFuture;
160161

162+
/**
163+
* Extended Request Id for the streaming request. The value is populated when the initial response is received from the
164+
* service. As request id is not sent in event messages (including exceptions), this can be returned by the SDK along with
165+
* received exception details.
166+
*/
167+
private String extendedRequestId = null;
168+
161169
@Deprecated
162170
@ReviewBeforeRelease("Remove this on full GA of 2.0.0")
163171
public EventStreamAsyncResponseTransformer(
@@ -199,6 +207,9 @@ public void onResponse(SdkResponse response) {
199207
this.requestId = response.sdkHttpResponse()
200208
.firstMatchingHeader(X_AMZN_REQUEST_ID_HEADER)
201209
.orElse(null);
210+
this.extendedRequestId = response.sdkHttpResponse()
211+
.firstMatchingHeader(X_AMZ_ID_2_HEADER)
212+
.orElse(null);
202213
}
203214
}
204215

core/aws-core/src/main/java/software/amazon/awssdk/awscore/internal/client/config/AwsClientOptionValidation.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,5 @@ private static void validateClientOptions(SdkClientConfiguration c) {
4646
c.option(AwsClientOption.SERVICE_SIGNING_NAME));
4747
require("overrideConfiguration.advancedOption[ENABLE_DEFAULT_REGION_DETECTION]",
4848
c.option(AwsAdvancedClientOption.ENABLE_DEFAULT_REGION_DETECTION));
49-
5049
}
5150
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/http/HttpResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public interface HttpResponseHandler<T> {
3333

3434
String X_AMZN_REQUEST_ID_HEADER = "x-amzn-RequestId";
3535

36+
String X_AMZ_ID_2_HEADER = "x-amz-id-2";
37+
3638
/**
3739
* Accepts an HTTP response object, and returns an object of type T.
3840
* Individual implementations may choose to handle the response however they

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public final class NettyNioAsyncHttpClient implements SdkAsyncHttpClient {
7777
NettyNioAsyncHttpClient(DefaultBuilder builder, AttributeMap serviceDefaultsMap) {
7878
this.configuration = new NettyConfiguration(serviceDefaultsMap);
7979
this.protocol = serviceDefaultsMap.get(SdkHttpConfigurationOption.PROTOCOL);
80-
this.maxStreams = 200;
80+
this.maxStreams = builder.maxHttp2Streams == null ? Integer.MAX_VALUE : builder.maxHttp2Streams;
8181
this.sdkEventLoopGroup = eventLoopGroup(builder);
8282
this.pools = createChannelPoolMap();
8383
this.sdkChannelOptions = channelOptions(builder);
@@ -277,6 +277,7 @@ public interface Builder extends SdkAsyncHttpClient.Builder<NettyNioAsyncHttpCli
277277
Builder protocol(Protocol protocol);
278278

279279
/**
280+
<<<<<<< HEAD
280281
* Add new socket channel option which will be used to create Netty Http client. This allows custom configuration
281282
* for Netty.
282283
* @param channelOption {@link ChannelOption} to set
@@ -285,6 +286,18 @@ public interface Builder extends SdkAsyncHttpClient.Builder<NettyNioAsyncHttpCli
285286
* @see SdkEventLoopGroup.Builder
286287
*/
287288
Builder putChannelOption(ChannelOption channelOption, Object value);
289+
290+
/**
291+
* Sets the max number of concurrent streams for an HTTP/2 connection. This setting is only respected when the HTTP/2
292+
* protocol is used.
293+
*
294+
* <p>Note that this cannot exceed the value of the MAX_CONCURRENT_STREAMS setting returned by the service. If it
295+
* does the service setting is used instead.</p>
296+
*
297+
* @param maxHttp2Streams Max concurrent HTTP/2 streams per connection.
298+
* @return This builder for method chaining.
299+
*/
300+
Builder maxHttp2Streams(Integer maxHttp2Streams);
288301
}
289302

290303
/**
@@ -299,6 +312,7 @@ private static final class DefaultBuilder implements Builder {
299312

300313
private SdkEventLoopGroup eventLoopGroup;
301314
private SdkEventLoopGroup.Builder eventLoopGroupBuilder;
315+
private Integer maxHttp2Streams;
302316

303317
private DefaultBuilder() {
304318
}
@@ -438,6 +452,16 @@ public Builder putChannelOption(ChannelOption channelOption, Object value) {
438452
return this;
439453
}
440454

455+
@Override
456+
public Builder maxHttp2Streams(Integer maxHttp2Streams) {
457+
this.maxHttp2Streams = maxHttp2Streams;
458+
return this;
459+
}
460+
461+
public void setMaxHttp2Streams(Integer maxHttp2Streams) {
462+
maxHttp2Streams(maxHttp2Streams);
463+
}
464+
441465
@Override
442466
public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
443467
return new NettyNioAsyncHttpClient(this, standardOptions.build()

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public ChannelPipelineInitializer(Protocol protocol,
6161
}
6262

6363
@Override
64-
public void channelCreated(Channel ch) throws Exception {
64+
public void channelCreated(Channel ch) {
6565
ch.attr(PROTOCOL_FUTURE).set(new CompletableFuture<>());
6666
ChannelPipeline pipeline = ch.pipeline();
6767
if (sslCtx != null) {
@@ -89,7 +89,7 @@ private void configureHttp2(Channel ch, ChannelPipeline pipeline) {
8989

9090
pipeline.addLast(new SimpleChannelInboundHandler<Http2SettingsFrame>() {
9191
@Override
92-
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) throws Exception {
92+
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) {
9393
Long serverMaxStreams = Optional.ofNullable(msg.settings().maxConcurrentStreams()).orElse(Long.MAX_VALUE);
9494
ch.attr(MAX_CONCURRENT_STREAMS).set(Math.min(clientMaxStreams, serverMaxStreams));
9595
ch.attr(PROTOCOL_FUTURE).get().complete(Protocol.HTTP2);
@@ -103,7 +103,7 @@ public void channelUnregistered(ChannelHandlerContext ctx) {
103103
}
104104

105105
@Override
106-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
106+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
107107
channelError(cause, ch);
108108
}
109109
});
@@ -136,7 +136,7 @@ private void configureHttp11(Channel ch, ChannelPipeline pipeline) {
136136

137137
private static class NoOpChannelInitializer extends ChannelInitializer<Channel> {
138138
@Override
139-
protected void initChannel(Channel ch) throws Exception {
139+
protected void initChannel(Channel ch) {
140140
}
141141
}
142142

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import io.netty.handler.codec.http.HttpUtil;
3636
import io.netty.handler.codec.http.LastHttpContent;
3737
import io.netty.util.AttributeKey;
38-
3938
import java.io.IOException;
4039
import java.nio.ByteBuffer;
4140
import java.util.List;
@@ -182,7 +181,7 @@ private static class PublisherAdapter implements Publisher<ByteBuffer> {
182181
private final ChannelHandlerContext channelContext;
183182
private final RequestContext requestContext;
184183
private final CompletableFuture<Void> executeFuture;
185-
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
184+
private final AtomicBoolean isDone = new AtomicBoolean(false);
186185

187186
private PublisherAdapter(StreamedHttpResponse response, ChannelHandlerContext channelContext,
188187
RequestContext requestContext, CompletableFuture<Void> executeFuture) {
@@ -211,8 +210,10 @@ private Subscription resolveSubscription(Subscription subscription) {
211210
}
212211

213212
private void onCancel() {
213+
if (!isDone.compareAndSet(false, true)) {
214+
return;
215+
}
214216
try {
215-
isCancelled.set(true);
216217
SdkCancellationException e = new SdkCancellationException(
217218
"Subscriber cancelled before all events were published");
218219
requestContext.handler().onError(e);
@@ -225,6 +226,10 @@ private void onCancel() {
225226

226227
@Override
227228
public void onNext(HttpContent httpContent) {
229+
// isDone may be true if the subscriber cancelled
230+
if (isDone.get()) {
231+
return;
232+
}
228233
// Needed to prevent use-after-free bug if the subscriber's onNext is asynchronous
229234
ByteBuffer b = copyToByteBuffer(httpContent.content());
230235
httpContent.release();
@@ -234,7 +239,7 @@ public void onNext(HttpContent httpContent) {
234239

235240
@Override
236241
public void onError(Throwable t) {
237-
if (isCancelled.get()) {
242+
if (!isDone.compareAndSet(false, true)) {
238243
return;
239244
}
240245
try {
@@ -254,7 +259,7 @@ public void onError(Throwable t) {
254259
public void onComplete() {
255260
// For HTTP/2 it's possible to get an onComplete after we cancel due to the channel becoming
256261
// inactive. We guard against that here and just ignore the signal (see HandlerPublisher)
257-
if (isCancelled.get()) {
262+
if (!isDone.compareAndSet(false, true)) {
258263
return;
259264
}
260265
try {

services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import software.amazon.awssdk.core.async.SdkPublisher;
4242
import software.amazon.awssdk.http.SdkCancellationException;
4343
import software.amazon.awssdk.http.SdkHttpResponse;
44-
import software.amazon.awssdk.regions.Region;
4544
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
4645
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
4746
import software.amazon.awssdk.services.kinesis.model.Record;
@@ -56,22 +55,23 @@ public class SubscribeToShardIntegrationTest {
5655

5756
private String streamName;
5857
private static final String CONSUMER_NAME = "subscribe-to-shard-consumer";
59-
private KinesisAsyncClient client;
60-
private String consumerArn;
61-
private String shardId;
58+
private static KinesisAsyncClient client;
59+
private static String consumerArn;
60+
private static String shardId;
6261

6362
@Before
6463
public void setup() throws InterruptedException {
6564
streamName = "subscribe-to-shard-integ-test-" + System.currentTimeMillis();
65+
6666
client = KinesisAsyncClient.builder()
67-
.region(Region.EU_CENTRAL_1)
6867
.build();
6968
client.createStream(r -> r.streamName(streamName)
7069
.shardCount(1)).join();
7170
waitForStreamToBeActive();
7271
String streamARN = client.describeStream(r -> r.streamName(streamName)).join()
7372
.streamDescription()
7473
.streamARN();
74+
7575
this.shardId = client.listShards(r -> r.streamName(streamName))
7676
.join()
7777
.shards().get(0).shardId();
@@ -174,7 +174,7 @@ public void complete() {
174174
}
175175
}
176176

177-
private void waitForConsumerToBeActive() throws InterruptedException {
177+
private static void waitForConsumerToBeActive() throws InterruptedException {
178178
waitUntilTrue(() -> ConsumerStatus.ACTIVE == client.describeStreamConsumer(r -> r.consumerARN(consumerArn))
179179
.join()
180180
.consumerDescription()
@@ -188,7 +188,7 @@ private void waitForStreamToBeActive() throws InterruptedException {
188188
.streamStatus());
189189
}
190190

191-
private void waitUntilTrue(Supplier<Boolean> state) throws InterruptedException {
191+
private static void waitUntilTrue(Supplier<Boolean> state) throws InterruptedException {
192192
int attempt = 0;
193193
do {
194194
if (attempt > 10) {
@@ -229,4 +229,4 @@ private void verifyHttpMetadata(SubscribeToShardResponse response) {
229229
assertThat(sdkHttpResponse.isSuccessful()).isTrue();
230230
assertThat(sdkHttpResponse.headers()).isNotEmpty();
231231
}
232-
}
232+
}

0 commit comments

Comments
 (0)