Skip to content

Commit 6c34260

Browse files
committed
Don't retry on SubscribeToShard and checkstyle fixes
1 parent 94e70e6 commit 6c34260

File tree

6 files changed

+69
-16
lines changed

6 files changed

+69
-16
lines changed

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,12 @@ protected ChannelPool newPool(URI key) {
148148
.option(ChannelOption.TCP_NODELAY, true)
149149
.remoteAddress(key.getHost(), key.getPort());
150150
AtomicReference<ChannelPool> channelPoolRef = new AtomicReference<>();
151-
channelPoolRef.set(
152-
new ReleaseOnceChannelPool(
153-
new HandlerRemovingChannelPool(
154-
new HttpOrHttp2ChannelPool(bootstrap,
155-
new ChannelPipelineInitializer(protocol, sslContext, maxStreams, channelPoolRef),
156-
configuration.maxConnections(),
157-
configuration))));
151+
ChannelPipelineInitializer handler =
152+
new ChannelPipelineInitializer(protocol, sslContext, maxStreams, channelPoolRef);
153+
channelPoolRef.set(new ReleaseOnceChannelPool(
154+
new HandlerRemovingChannelPool(
155+
new HttpOrHttp2ChannelPool(bootstrap, handler,
156+
configuration.maxConnections(), configuration))));
158157
return channelPoolRef.get();
159158
}
160159
};

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ReleaseOnceChannelPool.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
/*
2-
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License").
55
* You may not use this file except in compliance with the License.
66
* A copy of the License is located at
77
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
8+
* http://aws.amazon.com/apache2.0
99
*
1010
* or in the "license" file accompanying this file. This file is distributed
1111
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
@@ -24,13 +24,15 @@
2424
import io.netty.util.concurrent.Promise;
2525
import io.netty.util.concurrent.SucceededFuture;
2626
import java.util.concurrent.atomic.AtomicBoolean;
27+
import software.amazon.awssdk.annotations.SdkInternalApi;
2728
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool;
2829

2930
/**
3031
* Wrapper around a {@link ChannelPool} to protect it from having the same channel released twice. This can
3132
* cause issues in {@link FixedChannelPool} and {@link Http2MultiplexedChannelPool} which has a simple
3233
* mechanism to track leased connections.
3334
*/
35+
@SdkInternalApi
3436
public class ReleaseOnceChannelPool implements ChannelPool {
3537

3638
private static final AttributeKey<AtomicBoolean> IS_RELEASED = AttributeKey.newInstance("isReleased");
@@ -53,7 +55,7 @@ public Future<Channel> acquire(Promise<Channel> promise) {
5355

5456
private GenericFutureListener<Future<Channel>> onAcquire() {
5557
return future -> {
56-
if(future.isSuccess()) {
58+
if (future.isSuccess()) {
5759
future.getNow().attr(IS_RELEASED).set(new AtomicBoolean(false));
5860
}
5961
};

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import software.amazon.awssdk.http.SdkHttpResponse;
5252
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription;
5353
import software.amazon.awssdk.utils.FunctionalUtils.UnsafeRunnable;
54-
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
5554
import software.amazon.awssdk.utils.async.DelegatingSubscription;
5655

5756
@Sharable
@@ -211,12 +210,13 @@ private Subscription resolveSubscription(Subscription subscription) {
211210
private void onCancel() {
212211
try {
213212
isCancelled.set(true);
214-
requestContext.handler().exceptionOccurred(new RuntimeException("Cancelled subscription"));
213+
requestContext.handler().exceptionOccurred(
214+
new RuntimeException("Subscriber cancelled before all events were published"));
215215
} finally {
216216
runAndLogError("Could not release channel back to the pool",
217-
() -> closeAndRelease(channelContext));
217+
() -> closeAndRelease(channelContext));
218218
runAndLogError("Could not release channel back to the pool",
219-
() -> closeAndRelease(channelContext));
219+
() -> closeAndRelease(channelContext));
220220
}
221221
}
222222

services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void complete() {
121121
}).join();
122122
fail("Expected exception");
123123
} catch (CompletionException e) {
124-
assertThat(e.getCause()).hasMessageContaining("cancelled");
124+
assertThat(e.getCause().getCause()).hasMessageContaining("cancelled");
125125
assertThat(terminalCalled).as("complete or onComplete was called when it shouldn't have been")
126126
.isFalse();
127127
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.kinesis;
17+
18+
import software.amazon.awssdk.annotations.SdkInternalApi;
19+
import software.amazon.awssdk.awscore.retry.AwsRetryPolicy;
20+
import software.amazon.awssdk.core.retry.RetryPolicy;
21+
import software.amazon.awssdk.core.retry.conditions.AndRetryCondition;
22+
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
23+
24+
/**
25+
* Default retry policy for the Kinesis Client.
26+
*/
27+
@SdkInternalApi
28+
public class KinesisRetryPolicy {
29+
30+
/**
31+
* Default retry policy for Kinesis. Turns off retries for SubscribeToShard
32+
*/
33+
private static final RetryPolicy DEFAULT =
34+
AwsRetryPolicy.defaultRetryPolicy().toBuilder()
35+
.retryCondition(AndRetryCondition.create(
36+
c -> !(c.originalRequest() instanceof SubscribeToShardRequest),
37+
AwsRetryPolicy.defaultRetryCondition()))
38+
.build();
39+
40+
private KinesisRetryPolicy() {
41+
42+
}
43+
44+
/**
45+
* @return Default retry policy used by Kinesis
46+
*/
47+
public static RetryPolicy defaultPolicy() {
48+
return DEFAULT;
49+
}
50+
}
51+

services/kinesis/src/main/resources/codegen-resources/customization.config

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66
"protocol": "cbor"
77
},
88
"skipSmokeTests": "true",
9-
"blacklistedSimpleMethods": ["deregisterStreamConsumer", "describeStreamConsumer", "listShards"]
9+
"blacklistedSimpleMethods": ["deregisterStreamConsumer", "describeStreamConsumer", "listShards"],
10+
"customRetryPolicy" : "software.amazon.awssdk.services.kinesis.KinesisRetryPolicy"
1011
}

0 commit comments

Comments
 (0)