Skip to content

Commit 6825899

Browse files
committed
Not calling terminal methods on cancel and fixing simple methods
1 parent 135b9e2 commit 6825899

File tree

16 files changed

+245
-32
lines changed

16 files changed

+245
-32
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/docs/PaginationDocs.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import software.amazon.awssdk.codegen.model.intermediate.OperationModel;
2626
import software.amazon.awssdk.codegen.poet.PoetExtensions;
2727
import software.amazon.awssdk.codegen.utils.PaginatorUtils;
28-
import software.amazon.awssdk.core.internal.async.SequentialSubscriber;
28+
import software.amazon.awssdk.utils.async.SequentialSubscriber;
2929

3030
public class PaginationDocs {
3131

core/aws-core/src/main/java/software/amazon/awssdk/awscore/eventstream/DefaultEventStreamResponseHandlerBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.reactivestreams.Subscriber;
2323
import software.amazon.awssdk.annotations.SdkProtectedApi;
2424
import software.amazon.awssdk.core.async.SdkPublisher;
25-
import software.amazon.awssdk.core.internal.async.SequentialSubscriber;
25+
import software.amazon.awssdk.utils.async.SequentialSubscriber;
2626

2727
/**
2828
* Base class for event stream response handler builders.

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SdkPublisher.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import org.reactivestreams.Publisher;
2424
import org.reactivestreams.Subscriber;
2525
import software.amazon.awssdk.annotations.SdkPublicApi;
26-
import software.amazon.awssdk.core.internal.async.BufferingSubscriber;
27-
import software.amazon.awssdk.core.internal.async.DelegatingSubscriber;
28-
import software.amazon.awssdk.core.internal.async.FilteringSubscriber;
29-
import software.amazon.awssdk.core.internal.async.FlatteningSubscriber;
30-
import software.amazon.awssdk.core.internal.async.LimitingSubscriber;
31-
import software.amazon.awssdk.core.internal.async.SequentialSubscriber;
26+
import software.amazon.awssdk.utils.async.BufferingSubscriber;
27+
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
28+
import software.amazon.awssdk.utils.async.FilteringSubscriber;
29+
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
30+
import software.amazon.awssdk.utils.async.LimitingSubscriber;
31+
import software.amazon.awssdk.utils.async.SequentialSubscriber;
3232

3333
/**
3434
* Interface that is implemented by the Async auto-paginated responses.

core/sdk-core/src/main/java/software/amazon/awssdk/core/exception/SdkClientException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ protected SdkClientException(Builder b) {
3535
super(b);
3636
}
3737

38+
public static SdkClientException create(String message) {
39+
return SdkClientException.builder().message(message).build();
40+
}
41+
3842
public static SdkClientException create(String message, Throwable cause) {
3943
return SdkClientException.builder().message(message).cause(cause).build();
4044
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
import java.util.concurrent.CompletableFuture;
2020
import java.util.concurrent.Executor;
2121
import java.util.concurrent.RejectedExecutionException;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import org.reactivestreams.Publisher;
24+
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
2326
import org.slf4j.Logger;
2427
import org.slf4j.LoggerFactory;
2528
import software.amazon.awssdk.annotations.SdkInternalApi;
2629
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
2730
import software.amazon.awssdk.core.client.config.SdkClientOption;
31+
import software.amazon.awssdk.core.exception.SdkClientException;
2832
import software.amazon.awssdk.core.exception.SdkException;
2933
import software.amazon.awssdk.core.http.HttpResponse;
3034
import software.amazon.awssdk.core.internal.Response;
@@ -43,6 +47,8 @@
4347
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
4448
import software.amazon.awssdk.http.async.SdkHttpRequestProvider;
4549
import software.amazon.awssdk.http.async.SdkHttpResponseHandler;
50+
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
51+
import software.amazon.awssdk.utils.async.DelegatingSubscription;
4652

4753
/**
4854
* Delegate to the HTTP implementation to make an HTTP request and receive the response.
@@ -122,6 +128,7 @@ private boolean shouldSetContentLength(SdkHttpFullRequest request, SdkHttpReques
122128
private class ResponseHandler implements SdkHttpResponseHandler<Response<OutputT>> {
123129
private final SdkHttpFullRequest request;
124130
private final Completable completable;
131+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
125132

126133
private volatile SdkHttpResponse response;
127134
private volatile boolean isSuccess = false;
@@ -150,21 +157,36 @@ public void headersReceived(SdkHttpResponse response) {
150157
public void onStream(Publisher<ByteBuffer> publisher) {
151158
if (isSuccess) {
152159
// TODO handle exception as non retryable
153-
responseHandler.onStream(publisher);
160+
responseHandler.onStream(subscriber ->
161+
publisher.subscribe(new OnCancelSubscriber(subscriber, this::onCancel)));
154162
} else {
155163
errorResponseHandler.onStream(publisher);
156164
}
157165
}
158166

167+
/**
168+
* If the subscriber cancels the subscription we treat it as an error and notify the future accordingly.
169+
*/
170+
private void onCancel() {
171+
this.isCancelled.set(true);
172+
completable.completeExceptionally(SdkClientException.create("Subscriber cancelled before all events were published"));
173+
}
174+
159175
@Override
160176
public void exceptionOccurred(Throwable throwable) {
177+
if (isCancelled.get()) {
178+
return;
179+
}
161180
// Note that we don't notify the response handler here, we do that in AsyncRetryableStage where we
162181
// have more context of what's going on and can deliver exceptions more reliably.
163182
completable.completeExceptionally(throwable);
164183
}
165184

166185
@Override
167186
public Response<OutputT> complete() {
187+
if (isCancelled.get()) {
188+
return null;
189+
}
168190
try {
169191
SdkHttpFullResponse httpFullResponse = (SdkHttpFullResponse) this.response;
170192
final HttpResponse httpResponse = SdkHttpResponseAdapter.adapt(false, request, httpFullResponse);
@@ -188,6 +210,39 @@ private Response<OutputT> handleResponse(HttpResponse httpResponse) {
188210

189211
}
190212

213+
/**
214+
* Decorator around a {@link Subscriber} to notify if a cancellation occurs.
215+
*/
216+
private class OnCancelSubscriber extends DelegatingSubscriber<ByteBuffer, ByteBuffer> {
217+
218+
private final Runnable onCancel;
219+
220+
/**
221+
* @param subscriber Subscriber to delegate to.
222+
* @param onCancel Runnable to execute if a cancellation occurs.
223+
*/
224+
private OnCancelSubscriber(Subscriber<? super ByteBuffer> subscriber, Runnable onCancel) {
225+
super(subscriber);
226+
this.onCancel = onCancel;
227+
}
228+
229+
@Override
230+
public void onSubscribe(Subscription subscription) {
231+
super.onSubscribe(new DelegatingSubscription(subscription) {
232+
@Override
233+
public void cancel() {
234+
onCancel.run();
235+
super.cancel();
236+
}
237+
});
238+
}
239+
240+
@Override
241+
public void onNext(ByteBuffer byteBuffer) {
242+
subscriber.onNext(byteBuffer);
243+
}
244+
}
245+
191246
/**
192247
* An interface similar to {@link CompletableFuture} that may or may not dispatch completion of the future to an executor
193248
* service, depending on the client's configuration.

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2ResetSendingSubscription.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,24 @@
2020
import io.netty.handler.codec.http2.Http2Error;
2121
import org.reactivestreams.Subscription;
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.utils.async.DelegatingSubscription;
2324

2425
/**
2526
* Wrapper around a {@link Subscription} to send a RST_STREAM frame on cancel.
2627
*/
2728
@SdkInternalApi
28-
public class Http2ResetSendingSubscription implements Subscription {
29+
public class Http2ResetSendingSubscription extends DelegatingSubscription {
2930

3031
private final ChannelHandlerContext ctx;
31-
private final Subscription delegate;
3232

3333
public Http2ResetSendingSubscription(ChannelHandlerContext ctx, Subscription delegate) {
34+
super(delegate);
3435
this.ctx = ctx;
35-
this.delegate = delegate;
36-
}
37-
38-
@Override
39-
public void request(long l) {
40-
delegate.request(l);
4136
}
4237

4338
@Override
4439
public void cancel() {
4540
ctx.write(new DefaultHttp2ResetFrame(Http2Error.CANCEL));
46-
delegate.cancel();
41+
super.cancel();
4742
}
4843
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.kinesis;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.junit.Assert.fail;
20+
21+
import java.util.concurrent.CompletionException;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.function.Supplier;
24+
import org.junit.After;
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
import org.reactivestreams.Subscriber;
28+
import org.reactivestreams.Subscription;
29+
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
30+
import software.amazon.awssdk.core.async.SdkPublisher;
31+
import software.amazon.awssdk.regions.Region;
32+
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
33+
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
34+
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
35+
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
36+
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
37+
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
38+
39+
public class SubscribeToShardIntegrationTest {
40+
41+
private static final String STREAM_NAME = "subscribe-to-shard-integ-test-" + System.currentTimeMillis();
42+
private static final String CONSUMER_NAME = "subscribe-to-shard-consumer";
43+
private KinesisAsyncClient client;
44+
private String consumerArn;
45+
private String shardId;
46+
47+
@Before
48+
public void setup() throws InterruptedException {
49+
client = KinesisAsyncClient.builder()
50+
// TODO credentials and region (whitelisting)
51+
.credentialsProvider(ProfileCredentialsProvider.create("justin-kinesis"))
52+
.region(Region.US_EAST_2)
53+
.build();
54+
client.createStream(r -> r.streamName(STREAM_NAME)
55+
.shardCount(4)).join();
56+
waitForStreamToBeActive();
57+
String streamARN = client.describeStream(r -> r.streamName(STREAM_NAME)).join()
58+
.streamDescription()
59+
.streamARN();
60+
this.shardId = client.listShards(r -> r.streamName(STREAM_NAME))
61+
.join()
62+
.shards().get(0).shardId();
63+
this.consumerArn = client.registerStreamConsumer(r -> r.streamARN(streamARN).consumerName(CONSUMER_NAME)).join()
64+
.consumer()
65+
.consumerARN();
66+
waitForConsumerToBeActive();
67+
}
68+
69+
@After
70+
public void tearDown() {
71+
client.deleteStream(r -> r.streamName(STREAM_NAME)
72+
.enforceConsumerDeletion(true)).join();
73+
}
74+
75+
@Test
76+
public void cancelledSubscription_DoesNotCallTerminalMethods() {
77+
AtomicBoolean terminalCalled = new AtomicBoolean(false);
78+
try {
79+
client.subscribeToShard(r -> r.consumerARN(consumerArn)
80+
.shardId(shardId)
81+
.startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON)),
82+
new SubscribeToShardResponseHandler() {
83+
@Override
84+
public void responseReceived(SubscribeToShardResponse response) {
85+
86+
}
87+
88+
@Override
89+
public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
90+
publisher.limit(3).subscribe(new Subscriber<SubscribeToShardEventStream>() {
91+
@Override
92+
public void onSubscribe(Subscription subscription) {
93+
subscription.request(10);
94+
}
95+
96+
@Override
97+
public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
98+
}
99+
100+
@Override
101+
public void onError(Throwable throwable) {
102+
terminalCalled.set(true);
103+
}
104+
105+
@Override
106+
public void onComplete() {
107+
terminalCalled.set(true);
108+
}
109+
});
110+
}
111+
112+
@Override
113+
public void exceptionOccurred(Throwable throwable) {
114+
// Expected to be called
115+
}
116+
117+
@Override
118+
public void complete() {
119+
terminalCalled.set(true);
120+
}
121+
}).join();
122+
fail("Expected exception");
123+
} catch (CompletionException e) {
124+
assertThat(e.getCause()).hasMessageContaining("cancelled");
125+
assertThat(terminalCalled).as("complete or onComplete was called when it shouldn't have been")
126+
.isFalse();
127+
}
128+
}
129+
130+
private void waitForConsumerToBeActive() throws InterruptedException {
131+
waitUntilTrue(() -> ConsumerStatus.ACTIVE == client.describeStreamConsumer(r -> r.consumerARN(consumerArn))
132+
.join()
133+
.consumerDescription()
134+
.consumerStatus());
135+
}
136+
137+
private void waitForStreamToBeActive() throws InterruptedException {
138+
waitUntilTrue(() -> StreamStatus.ACTIVE == client.describeStream(r -> r.streamName(STREAM_NAME))
139+
.join()
140+
.streamDescription()
141+
.streamStatus());
142+
}
143+
144+
private void waitUntilTrue(Supplier<Boolean> state) throws InterruptedException {
145+
int attempt = 0;
146+
do {
147+
if (attempt > 10) {
148+
throw new IllegalStateException("State never transitioned");
149+
}
150+
Thread.sleep(5000);
151+
attempt++;
152+
if (state.get()) {
153+
return;
154+
}
155+
} while (true);
156+
}
157+
158+
}

services/kinesis/src/main/resources/codegen-resources/customization.config

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,5 @@
66
"protocol": "cbor"
77
},
88
"skipSmokeTests": "true",
9-
"verifiedSimpleMethods": ["deregisterStreamConsumer"]
10-
}
9+
"blacklistedSimpleMethods": ["deregisterStreamConsumer", "describeStreamConsumer", "listShards"]
1110
}

utils/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
<name>AWS Java SDK :: Utilities</name>
3030

3131
<dependencies>
32+
<dependency>
33+
<groupId>org.reactivestreams</groupId>
34+
<artifactId>reactive-streams</artifactId>
35+
</dependency>
3236
<dependency>
3337
<groupId>software.amazon.awssdk</groupId>
3438
<artifactId>annotations</artifactId>
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core.internal.async;
16+
package software.amazon.awssdk.utils.async;
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core.internal.async;
16+
package software.amazon.awssdk.utils.async;
1717

1818
import org.reactivestreams.Subscriber;
1919
import org.reactivestreams.Subscription;

0 commit comments

Comments
 (0)