Skip to content

Commit c339ff7

Browse files
committed
Add request cancellation logic and tests
1 parent b3923cb commit c339ff7

File tree

6 files changed

+161
-12
lines changed

6 files changed

+161
-12
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
<rxjava.version>2.2.21</rxjava.version>
113113
<commons-codec.verion>1.10</commons-codec.verion>
114114
<jmh.version>1.29</jmh.version>
115-
<awscrt.version>0.12.8</awscrt.version>
115+
<awscrt.version>0.13.2</awscrt.version>
116116

117117
<!--Test dependencies -->
118118
<junit.version>4.13.1</junit.version>

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3CrtAsyncClient.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
package software.amazon.awssdk.transfer.s3.internal;
1717

1818
import com.amazonaws.s3.S3NativeClient;
19+
import com.amazonaws.s3.model.GetObjectOutput;
1920
import com.amazonaws.s3.model.PutObjectOutput;
2021
import java.util.concurrent.CompletableFuture;
2122
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2224
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
2325
import software.amazon.awssdk.core.async.AsyncRequestBody;
2426
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -28,6 +30,7 @@
2830
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2931
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3032
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
33+
import software.amazon.awssdk.utils.CompletableFutureUtils;
3134

3235
@SdkInternalApi
3336
public final class DefaultS3CrtAsyncClient implements S3CrtAsyncClient {
@@ -55,6 +58,13 @@ public DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
5558
configuration.maxConcurrency());
5659
}
5760

61+
@SdkTestInternalApi
62+
DefaultS3CrtAsyncClient(S3NativeClientConfiguration configuration,
63+
S3NativeClient nativeClient) {
64+
this.configuration = configuration;
65+
this.s3NativeClient = nativeClient;
66+
}
67+
5868
@Override
5969
public <ReturnT> CompletableFuture<ReturnT> getObject(
6070
GetObjectRequest getObjectRequest, AsyncResponseTransformer<GetObjectResponse, ReturnT> asyncResponseTransformer) {
@@ -65,7 +75,8 @@ public <ReturnT> CompletableFuture<ReturnT> getObject(
6575

6676
CompletableFuture<ReturnT> adapterFuture = adapter.transformerFuture();
6777

68-
s3NativeClient.getObject(crtGetObjectRequest, adapter);
78+
CompletableFuture<GetObjectOutput> crtFuture = s3NativeClient.getObject(crtGetObjectRequest, adapter);
79+
CompletableFutureUtils.forwardExceptionTo(future, crtFuture);
6980

7081
adapterFuture.whenComplete((r, t) -> {
7182
if (t == null) {
@@ -76,7 +87,7 @@ public <ReturnT> CompletableFuture<ReturnT> getObject(
7687
// TODO: Offload to future completion thread
7788
});
7889

79-
return future;
90+
return CompletableFutureUtils.forwardExceptionTo(future, adapterFuture);
8091
}
8192

8293
@Override
@@ -94,9 +105,12 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObject
94105
requestDataSupplier);
95106

96107
CompletableFuture<SdkHttpResponse> httpResponseFuture = requestDataSupplier.sdkHttpResponseFuture();
97-
return httpResponseFuture.thenCombine(putObjectOutputCompletableFuture,
98-
(header, putObjectOutput) ->
99-
S3CrtPojoConversion.fromCrtPutObjectOutput(putObjectOutput, header));
108+
CompletableFuture<PutObjectResponse> executeFuture =
109+
httpResponseFuture.thenCombine(putObjectOutputCompletableFuture,
110+
(header, putObjectOutput) -> S3CrtPojoConversion.fromCrtPutObjectOutput(
111+
putObjectOutput, header));
112+
113+
return CompletableFutureUtils.forwardExceptionTo(executeFuture, putObjectOutputCompletableFuture);
100114
}
101115

102116
@Override

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2424
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
2525
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
26+
import software.amazon.awssdk.transfer.s3.CompletedDownload;
27+
import software.amazon.awssdk.transfer.s3.CompletedUpload;
2628
import software.amazon.awssdk.transfer.s3.Download;
2729
import software.amazon.awssdk.transfer.s3.DownloadRequest;
2830
import software.amazon.awssdk.transfer.s3.S3ClientConfiguration;
2931
import software.amazon.awssdk.transfer.s3.S3TransferManager;
3032
import software.amazon.awssdk.transfer.s3.Upload;
3133
import software.amazon.awssdk.transfer.s3.UploadRequest;
34+
import software.amazon.awssdk.utils.CompletableFutureUtils;
3235

3336
@SdkInternalApi
3437
public final class DefaultS3TransferManager implements S3TransferManager {
@@ -57,17 +60,21 @@ public Upload upload(UploadRequest uploadRequest) {
5760

5861
CompletableFuture<PutObjectResponse> putObjFuture = s3CrtAsyncClient.putObject(putObjectRequest, requestBody);
5962

60-
return new DefaultUpload(putObjFuture.thenApply(r -> DefaultCompletedUpload.builder()
61-
.response(r)
62-
.build()));
63+
CompletableFuture<CompletedUpload> future = putObjFuture.thenApply(r -> DefaultCompletedUpload.builder()
64+
.response(r)
65+
.build());
66+
return new DefaultUpload(CompletableFutureUtils.forwardExceptionTo(future, putObjFuture));
6367
}
6468

6569
@Override
6670
public Download download(DownloadRequest downloadRequest) {
67-
CompletableFuture<GetObjectResponse> future =
71+
CompletableFuture<GetObjectResponse> getObjectFuture =
6872
s3CrtAsyncClient.getObject(downloadRequest.getObjectRequest(),
6973
AsyncResponseTransformer.toFile(downloadRequest.destination()));
70-
return new DefaultDownload(future.thenApply(r -> DefaultCompletedDownload.builder().response(r).build()));
74+
CompletableFuture<CompletedDownload> future =
75+
getObjectFuture.thenApply(r -> DefaultCompletedDownload.builder().response(r).build());
76+
77+
return new DefaultDownload(CompletableFutureUtils.forwardExceptionTo(future, getObjectFuture));
7178
}
7279

7380
@Override

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/S3NativeClientConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* Internal client configuration resolver
3030
*/
3131
@SdkInternalApi
32-
public final class S3NativeClientConfiguration implements SdkAutoCloseable {
32+
public class S3NativeClientConfiguration implements SdkAutoCloseable {
3333
private static final long DEFAULT_PART_SIZE_IN_BYTES = 8L * SizeConstant.MB;
3434
private static final long DEFAULT_TARGET_THROUGHPUT_IN_GBPS = 5;
3535
private final String signingRegion;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3.internal;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.Matchers.any;
20+
import static org.mockito.Mockito.verify;
21+
import static org.mockito.Mockito.when;
22+
23+
import com.amazonaws.s3.RequestDataSupplier;
24+
import com.amazonaws.s3.ResponseDataConsumer;
25+
import com.amazonaws.s3.S3NativeClient;
26+
import com.amazonaws.s3.model.GetObjectOutput;
27+
import com.amazonaws.s3.model.GetObjectRequest;
28+
import com.amazonaws.s3.model.PutObjectOutput;
29+
import com.amazonaws.s3.model.PutObjectRequest;
30+
import java.util.concurrent.CompletableFuture;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
import org.junit.runner.RunWith;
34+
import org.mockito.Mock;
35+
import org.mockito.runners.MockitoJUnitRunner;
36+
import software.amazon.awssdk.core.ResponseBytes;
37+
import software.amazon.awssdk.core.async.AsyncRequestBody;
38+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
39+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
40+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
41+
42+
@RunWith(MockitoJUnitRunner.class)
43+
public class DefaultS3CrtAsyncClientTest {
44+
@Mock
45+
private S3NativeClient mockS3NativeClient;
46+
47+
@Mock
48+
private S3NativeClientConfiguration mockConfiguration;
49+
50+
private S3CrtAsyncClient s3CrtAsyncClient;
51+
52+
@Before
53+
public void methodSetup() {
54+
s3CrtAsyncClient = new DefaultS3CrtAsyncClient(mockConfiguration,
55+
mockS3NativeClient);
56+
}
57+
58+
@Test
59+
public void getObject_cancels_shouldForwardCancellation() {
60+
CompletableFuture<GetObjectOutput> crtFuture = new CompletableFuture<>();
61+
when(mockS3NativeClient.getObject(any(GetObjectRequest.class),
62+
any(ResponseDataConsumer.class)))
63+
.thenReturn(crtFuture);
64+
65+
CompletableFuture<ResponseBytes<GetObjectResponse>> future =
66+
s3CrtAsyncClient.getObject(b -> b.bucket("bucket").key("key"),
67+
AsyncResponseTransformer.toBytes());
68+
69+
future.cancel(true);
70+
assertThat(crtFuture).isCancelled();
71+
}
72+
73+
@Test
74+
public void putObject_cancels_shouldForwardCancellation() {
75+
CompletableFuture<PutObjectOutput> crtFuture = new CompletableFuture<>();
76+
when(mockS3NativeClient.putObject(any(PutObjectRequest.class),
77+
any(RequestDataSupplier.class)))
78+
.thenReturn(crtFuture);
79+
80+
CompletableFuture<PutObjectResponse> future =
81+
s3CrtAsyncClient.putObject(b -> b.bucket("bucket").key("key"),
82+
AsyncRequestBody.empty());
83+
84+
future.cancel(true);
85+
assertThat(crtFuture).isCancelled();
86+
}
87+
88+
@Test
89+
public void closeS3Client_shouldCloseUnderlyingResources() {
90+
s3CrtAsyncClient.close();
91+
verify(mockS3NativeClient).close();
92+
verify(mockConfiguration).close();
93+
}
94+
}

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ public void upload_returnsResponse() {
7373
assertThat(completedUpload.response()).isEqualTo(response);
7474
}
7575

76+
@Test
77+
public void upload_cancel_shouldForwardCancellation() {
78+
CompletableFuture<PutObjectResponse> s3CrtFuture = new CompletableFuture<>();
79+
when(mockS3Crt.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)))
80+
.thenReturn(s3CrtFuture);
81+
82+
CompletableFuture<CompletedUpload> future = tm.upload(UploadRequest.builder()
83+
.putObjectRequest(r -> r.bucket("bucket")
84+
.key("key"))
85+
.source(Paths.get("."))
86+
.build())
87+
.completionFuture();
88+
89+
future.cancel(true);
90+
assertThat(s3CrtFuture).isCancelled();
91+
}
92+
7693
@Test
7794
public void download_returnsResponse() {
7895
GetObjectResponse response = GetObjectResponse.builder().build();
@@ -89,4 +106,21 @@ public void download_returnsResponse() {
89106
assertThat(completedDownload.response()).isEqualTo(response);
90107
}
91108

109+
@Test
110+
public void download_cancel_shouldForwardCancellation() {
111+
CompletableFuture<GetObjectResponse> s3CrtFuture = new CompletableFuture<>();
112+
when(mockS3Crt.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class)))
113+
.thenReturn(s3CrtFuture);
114+
115+
CompletableFuture<CompletedDownload> future = tm.download(DownloadRequest.builder()
116+
.getObjectRequest(r -> r.bucket("bucket")
117+
.key("key"))
118+
.destination(Paths.get("."))
119+
.build())
120+
.completionFuture();
121+
future.cancel(true);
122+
assertThat(s3CrtFuture).isCancelled();
123+
}
124+
125+
92126
}

0 commit comments

Comments
 (0)