Skip to content

Commit 2306005

Browse files
authored
Add s3 express suppport for AWS CRT-based S3 Client (#4723)
* Integrate with CRT S3 express support * Add integ tests * Fix test
1 parent 0340939 commit 2306005

File tree

10 files changed

+327
-105
lines changed

10 files changed

+327
-105
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS CRT-based S3 client",
4+
"contributor": "",
5+
"description": "Add S3 express support for the AWS CRT-based S3 client."
6+
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@
116116
<rxjava.version>2.2.21</rxjava.version>
117117
<commons-codec.verion>1.15</commons-codec.verion>
118118
<jmh.version>1.29</jmh.version>
119-
<awscrt.version>0.28.0</awscrt.version>
119+
<awscrt.version>0.28.10</awscrt.version>
120120

121121
<!--Test dependencies -->
122122
<junit5.version>5.10.0</junit5.version>

services/s3/src/it/java/software/amazon/awssdk/services/s3/s3express/S3ExpressIntegrationTest.java

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.concurrent.CompletableFuture;
3737
import java.util.concurrent.ExecutionException;
3838
import java.util.function.Supplier;
39+
import java.util.stream.Collectors;
40+
import java.util.stream.Stream;
3941
import org.junit.jupiter.api.AfterAll;
4042
import org.junit.jupiter.api.BeforeAll;
4143
import org.junit.jupiter.api.BeforeEach;
@@ -96,6 +98,7 @@ public class S3ExpressIntegrationTest extends S3ExpressIntegrationTestBase {
9698
private static final String AZ = "use1-az4";
9799
private static S3Client s3;
98100
private static S3AsyncClient s3Async;
101+
private static S3AsyncClient s3CrtAsync;
99102
private static String testBucket;
100103

101104
@BeforeAll
@@ -104,6 +107,7 @@ static void setup() {
104107
.build();
105108
s3Async = s3AsyncClientBuilder(TEST_REGION).overrideConfiguration(o -> o.addExecutionInterceptor(capturingInterceptor))
106109
.build();
110+
s3CrtAsync = s3CrtAsyncClientBuilder(TEST_REGION).build();
107111
testBucket = getS3ExpressBucketNameForAz(AZ);
108112
createBucketS3Express(s3, testBucket, AZ);
109113
}
@@ -113,25 +117,31 @@ static void teardown() {
113117
deleteBucketAndAllContents(s3, testBucket);
114118
s3.close();
115119
s3Async.close();
120+
s3CrtAsync.close();
116121
}
117122

118123
@BeforeEach
119124
void reset() {
120125
capturingInterceptor.reset();
121126
}
122127

123-
@Test
124-
public void putCopyGetDeleteAsync() {
125-
s3Async.putObject(r -> r.bucket(testBucket).key(KEY), AsyncRequestBody.fromString(CONTENTS)).join();
126-
s3Async.headObject(r -> r.bucket(testBucket).key(KEY)).join();
128+
private static Stream<S3AsyncClient> asyncClients() {
129+
return Stream.of(s3Async, s3CrtAsync);
130+
}
131+
132+
@ParameterizedTest(autoCloseArguments = false)
133+
@MethodSource("asyncClients")
134+
public void putCopyGetDeleteAsync(S3AsyncClient s3AsyncClient) {
135+
s3AsyncClient.putObject(r -> r.bucket(testBucket).key(KEY), AsyncRequestBody.fromString(CONTENTS)).join();
136+
s3AsyncClient.headObject(r -> r.bucket(testBucket).key(KEY)).join();
127137

128138
s3.copyObject(r -> r.sourceBucket(testBucket).sourceKey(KEY).destinationBucket(testBucket).destinationKey(COPY_DESTINATION_KEY));
129-
s3Async.headObject(r -> r.bucket(testBucket).key(COPY_DESTINATION_KEY)).join();
139+
s3AsyncClient.headObject(r -> r.bucket(testBucket).key(COPY_DESTINATION_KEY)).join();
130140

131-
String result = s3Async.getObject(r -> r.bucket(testBucket).key(KEY), AsyncResponseTransformer.toBytes()).join().asUtf8String();
141+
String result = s3AsyncClient.getObject(r -> r.bucket(testBucket).key(KEY), AsyncResponseTransformer.toBytes()).join().asUtf8String();
132142
assertThat(result).isEqualTo(CONTENTS);
133143

134-
s3Async.deleteObject(r -> r.bucket(testBucket).key(KEY)).join();
144+
s3AsyncClient.deleteObject(r -> r.bucket(testBucket).key(KEY)).join();
135145
}
136146

137147
@Test
@@ -148,16 +158,17 @@ public void putCopyGetDeleteSync() {
148158
s3.deleteObject(r -> r.bucket(testBucket).key(KEY));
149159
}
150160

151-
@Test
152-
public void uploadMultiplePartAsync() {
153-
String uploadId = s3Async.createMultipartUpload(b -> b.bucket(testBucket).key(KEY)).join().uploadId();
161+
@ParameterizedTest(autoCloseArguments = false)
162+
@MethodSource("asyncClients")
163+
public void uploadMultiplePartAsync(S3AsyncClient s3AsyncClient) {
164+
String uploadId = s3AsyncClient.createMultipartUpload(b -> b.bucket(testBucket).key(KEY)).join().uploadId();
154165

155166
UploadPartRequest uploadPartRequest = UploadPartRequest.builder().bucket(testBucket).key(KEY)
156167
.uploadId(uploadId)
157168
.partNumber(1)
158169
.build();
159170

160-
UploadPartResponse response = s3Async.uploadPart(uploadPartRequest, AsyncRequestBody.fromString(CONTENTS)).join();
171+
UploadPartResponse response = s3AsyncClient.uploadPart(uploadPartRequest, AsyncRequestBody.fromString(CONTENTS)).join();
161172

162173
List<CompletedPart> completedParts = new ArrayList<>();
163174
completedParts.add(CompletedPart.builder().eTag(response.eTag()).partNumber(1).build());
@@ -168,7 +179,7 @@ public void uploadMultiplePartAsync() {
168179
.uploadId(uploadId)
169180
.multipartUpload(completedUploadParts)
170181
.build();
171-
CompleteMultipartUploadResponse completeMultipartUploadResponse = s3Async.completeMultipartUpload(completeRequest).join();
182+
CompleteMultipartUploadResponse completeMultipartUploadResponse = s3AsyncClient.completeMultipartUpload(completeRequest).join();
172183
assertThat(completeMultipartUploadResponse).isNotNull();
173184

174185
ResponseBytes<GetObjectResponse> objectAsBytes = s3.getObject(b -> b.bucket(testBucket).key(KEY), ResponseTransformer.toBytes());
@@ -183,7 +194,7 @@ public void s3Express_nonObjectTransferApis_Sync(SyncTestCase tc) {
183194
}
184195

185196
@MethodSource("asyncTestCases")
186-
@ParameterizedTest
197+
@ParameterizedTest(autoCloseArguments = false)
187198
public void s3Express_nonObjectTransferApis_Async(AsyncTestCase tc) {
188199
runAndVerify(tc);
189200
}
@@ -351,30 +362,36 @@ private static List<SyncTestCase> syncTestCases() {
351362
}
352363

353364
private static List<AsyncTestCase> asyncTestCases() {
365+
return Stream.concat(asyncTestCasesPerClient(s3Async).stream(), asyncTestCasesPerClient(s3CrtAsync).stream()).collect(Collectors.toList());
366+
}
367+
368+
private static List<AsyncTestCase> asyncTestCasesPerClient(S3AsyncClient s3Async) {
369+
// getSimpleName is not "simple", but it's fine to be used for testing
370+
String simpleName = s3Async.getClass().getSimpleName();
354371
return Arrays.asList(
355372
//control plane APIs
356-
new AsyncTestCase("ListDirectoryBuckets", () -> {
373+
new AsyncTestCase("ListDirectoryBuckets-" + simpleName, () -> {
357374
ListDirectoryBucketsRequest request = ListDirectoryBucketsRequest.builder().build();
358375
return s3Async.listDirectoryBuckets(request);
359376
}, Expect.builder().build()),
360-
new AsyncTestCase("PutBucketPolicy", () -> {
377+
new AsyncTestCase("PutBucketPolicy-" + simpleName, () -> {
361378
PutBucketPolicyRequest request = PutBucketPolicyRequest.builder().bucket(testBucket).policy("fake").build();
362379
return s3Async.putBucketPolicy(request);
363380
}, Expect.builder().error("Policies must be valid JSON").build()),
364-
new AsyncTestCase("GetBucketPolicy", () -> {
381+
new AsyncTestCase("GetBucketPolicy-" + simpleName, () -> {
365382
GetBucketPolicyRequest request = GetBucketPolicyRequest.builder().bucket(testBucket).build();
366383
return s3Async.getBucketPolicy(request);
367384
}, Expect.builder().error("The bucket policy does not exist").build()),
368-
new AsyncTestCase("DeleteBucketPolicy", () -> {
385+
new AsyncTestCase("DeleteBucketPolicy-" + simpleName, () -> {
369386
DeleteBucketPolicyRequest request = DeleteBucketPolicyRequest.builder().bucket(testBucket).build();
370387
return s3Async.deleteBucketPolicy(request);
371388
}, Expect.builder().build()),
372389
//data plane APIs
373-
new AsyncTestCase("ListObjectsV2", () -> {
390+
new AsyncTestCase("ListObjectsV2-" + simpleName, () -> {
374391
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(testBucket).build();
375392
return s3Async.listObjectsV2(request);
376393
}, Expect.builder().build()),
377-
new AsyncTestCase("DeleteObjects", () -> {
394+
new AsyncTestCase("DeleteObjects-" + simpleName, () -> {
378395
DeleteObjectsRequest request = DeleteObjectsRequest.builder()
379396
.bucket(testBucket)
380397
.delete(Delete.builder()
@@ -385,7 +402,7 @@ private static List<AsyncTestCase> asyncTestCases() {
385402
.build();
386403
return s3Async.deleteObjects(request);
387404
}, Expect.builder().build()),
388-
new AsyncTestCase("HeadBucket", () -> {
405+
new AsyncTestCase("HeadBucket-" + simpleName, () -> {
389406
HeadBucketRequest request = HeadBucketRequest.builder().bucket(testBucket).build();
390407
return s3Async.headBucket(request);
391408
}, Expect.builder().build())
@@ -422,8 +439,6 @@ protected static void runAndVerify(SyncTestCase testCase) {
422439
}
423440
List<String> contentSha256Value = req.headers().get("x-amz-content-sha256");
424441
assertThat(contentSha256Value).isNotNull().hasSize(1).isEqualTo(Collections.singletonList("UNSIGNED-PAYLOAD"));
425-
426-
req.headers().keySet().forEach(k -> System.out.println(k + " " + req.headers().get(k)));
427442
});
428443
}
429444

services/s3/src/it/java/software/amazon/awssdk/services/s3/s3express/S3ExpressIntegrationTestBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
2525
import software.amazon.awssdk.services.s3.S3Client;
2626
import software.amazon.awssdk.services.s3.S3ClientBuilder;
27+
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
2728
import software.amazon.awssdk.services.s3.model.BucketInfo;
2829
import software.amazon.awssdk.services.s3.model.BucketType;
2930
import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration;
@@ -63,6 +64,13 @@ protected static S3AsyncClientBuilder s3AsyncClientBuilder(Region region) {
6364

6465
}
6566

67+
protected static S3CrtAsyncClientBuilder s3CrtAsyncClientBuilder(Region region) {
68+
return S3AsyncClient.crtBuilder()
69+
.region(region)
70+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN);
71+
72+
}
73+
6674
protected static void createBucketS3Express(S3Client client, String bucketName, String az) {
6775
try {
6876
LocationInfo location = LocationInfo.builder().name(az).type(LocationType.AVAILABILITY_ZONE).build();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.crt;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import software.amazon.awssdk.annotations.SdkInternalApi;
20+
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
21+
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
22+
import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
23+
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
24+
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
25+
import software.amazon.awssdk.identity.spi.IdentityProvider;
26+
import software.amazon.awssdk.identity.spi.IdentityProviders;
27+
import software.amazon.awssdk.identity.spi.ResolveIdentityRequest;
28+
import software.amazon.awssdk.services.s3.s3express.S3ExpressAuthScheme;
29+
import software.amazon.awssdk.services.s3.s3express.S3ExpressSessionCredentials;
30+
31+
/**
32+
* An implementation of {@link S3ExpressAuthScheme} that returns a noop {@link IdentityProvider}.
33+
*/
34+
@SdkInternalApi
35+
public final class CrtS3ExpressNoOpAuthScheme implements S3ExpressAuthScheme {
36+
@Override
37+
public String schemeId() {
38+
return S3ExpressAuthScheme.SCHEME_ID;
39+
}
40+
41+
@Override
42+
public IdentityProvider<S3ExpressSessionCredentials> identityProvider(IdentityProviders providers) {
43+
return NoOpIdentityProvider.INSTANCE;
44+
}
45+
46+
@Override
47+
public HttpSigner<S3ExpressSessionCredentials> signer() {
48+
return NoOpSigner.INSTANCE;
49+
}
50+
51+
private static final class NoOpIdentityProvider implements IdentityProvider<S3ExpressSessionCredentials> {
52+
private static final NoOpIdentityProvider INSTANCE = new NoOpIdentityProvider();
53+
54+
@Override
55+
public Class<S3ExpressSessionCredentials> identityType() {
56+
return S3ExpressSessionCredentials.class;
57+
}
58+
59+
@Override
60+
public CompletableFuture<? extends S3ExpressSessionCredentials> resolveIdentity(ResolveIdentityRequest request) {
61+
return CompletableFuture.completedFuture(null);
62+
}
63+
}
64+
65+
private static final class NoOpSigner implements HttpSigner<S3ExpressSessionCredentials> {
66+
private static final NoOpSigner INSTANCE = new NoOpSigner();
67+
68+
@Override
69+
public SignedRequest sign(SignRequest<? extends S3ExpressSessionCredentials> request) {
70+
throw new UnsupportedOperationException();
71+
}
72+
73+
@Override
74+
public CompletableFuture<AsyncSignedRequest> signAsync(AsyncSignRequest<? extends S3ExpressSessionCredentials> request) {
75+
throw new UnsupportedOperationException();
76+
}
77+
}
78+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515

1616
package software.amazon.awssdk.services.s3.internal.crt;
1717

18+
import static software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME;
1819
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.AUTH_SCHEMES;
1920
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
2021
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
2122
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
23+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_NAME;
2224
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
25+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.USE_S3_EXPRESS_AUTH;
2326
import static software.amazon.awssdk.services.s3.internal.crt.S3NativeClientConfiguration.DEFAULT_PART_SIZE_IN_BYTES;
2427

2528
import java.net.URI;
@@ -58,6 +61,7 @@
5861
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
5962
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
6063
import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper;
64+
import software.amazon.awssdk.services.s3.internal.s3express.S3ExpressUtils;
6165
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
6266
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
6367
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -129,6 +133,7 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b
129133
.accelerate(builder.accelerate)
130134
.forcePathStyle(builder.forcePathStyle)
131135
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
136+
.putAuthScheme(new CrtS3ExpressNoOpAuthScheme())
132137
.httpClientBuilder(initializeS3CrtAsyncHttpClient(builder))
133138
.build();
134139
}
@@ -309,6 +314,8 @@ public void afterMarshalling(Context.AfterMarshalling context,
309314
.put(SIGNING_REGION, executionAttributes.getAttribute(AwsSignerExecutionAttribute.SIGNING_REGION))
310315
.put(S3InternalSdkHttpExecutionAttribute.OBJECT_FILE_PATH,
311316
executionAttributes.getAttribute(OBJECT_FILE_PATH))
317+
.put(USE_S3_EXPRESS_AUTH, S3ExpressUtils.useS3ExpressAuthScheme(executionAttributes))
318+
.put(SIGNING_NAME, executionAttributes.getAttribute(SERVICE_SIGNING_NAME))
312319
.build();
313320

314321
// For putObject and getObject, we rely on CRT to perform checksum validation

0 commit comments

Comments
 (0)