Skip to content

Add s3 express suppport for AWS CRT-based S3 Client #4723

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS CRT-based S3 client",
"contributor": "",
"description": "Add S3 express support for the AWS CRT-based S3 client."
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
<rxjava.version>2.2.21</rxjava.version>
<commons-codec.verion>1.15</commons-codec.verion>
<jmh.version>1.29</jmh.version>
<awscrt.version>0.28.0</awscrt.version>
<awscrt.version>0.28.10</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.10.0</junit5.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class S3ExpressIntegrationTest extends S3ExpressIntegrationTestBase {
private static final String AZ = "use1-az4";
private static S3Client s3;
private static S3AsyncClient s3Async;
private static S3AsyncClient s3CrtAsync;
private static String testBucket;

@BeforeAll
Expand All @@ -104,6 +107,7 @@ static void setup() {
.build();
s3Async = s3AsyncClientBuilder(TEST_REGION).overrideConfiguration(o -> o.addExecutionInterceptor(capturingInterceptor))
.build();
s3CrtAsync = s3CrtAsyncClientBuilder(TEST_REGION).build();
testBucket = getS3ExpressBucketNameForAz(AZ);
createBucketS3Express(s3, testBucket, AZ);
}
Expand All @@ -113,25 +117,31 @@ static void teardown() {
deleteBucketAndAllContents(s3, testBucket);
s3.close();
s3Async.close();
s3CrtAsync.close();
}

@BeforeEach
void reset() {
capturingInterceptor.reset();
}

@Test
public void putCopyGetDeleteAsync() {
s3Async.putObject(r -> r.bucket(testBucket).key(KEY), AsyncRequestBody.fromString(CONTENTS)).join();
s3Async.headObject(r -> r.bucket(testBucket).key(KEY)).join();
private static Stream<S3AsyncClient> asyncClients() {
return Stream.of(s3Async, s3CrtAsync);
}

@ParameterizedTest(autoCloseArguments = false)
@MethodSource("asyncClients")
public void putCopyGetDeleteAsync(S3AsyncClient s3AsyncClient) {
s3AsyncClient.putObject(r -> r.bucket(testBucket).key(KEY), AsyncRequestBody.fromString(CONTENTS)).join();
s3AsyncClient.headObject(r -> r.bucket(testBucket).key(KEY)).join();

s3.copyObject(r -> r.sourceBucket(testBucket).sourceKey(KEY).destinationBucket(testBucket).destinationKey(COPY_DESTINATION_KEY));
s3Async.headObject(r -> r.bucket(testBucket).key(COPY_DESTINATION_KEY)).join();
s3AsyncClient.headObject(r -> r.bucket(testBucket).key(COPY_DESTINATION_KEY)).join();

String result = s3Async.getObject(r -> r.bucket(testBucket).key(KEY), AsyncResponseTransformer.toBytes()).join().asUtf8String();
String result = s3AsyncClient.getObject(r -> r.bucket(testBucket).key(KEY), AsyncResponseTransformer.toBytes()).join().asUtf8String();
assertThat(result).isEqualTo(CONTENTS);

s3Async.deleteObject(r -> r.bucket(testBucket).key(KEY)).join();
s3AsyncClient.deleteObject(r -> r.bucket(testBucket).key(KEY)).join();
}

@Test
Expand All @@ -148,16 +158,17 @@ public void putCopyGetDeleteSync() {
s3.deleteObject(r -> r.bucket(testBucket).key(KEY));
}

@Test
public void uploadMultiplePartAsync() {
String uploadId = s3Async.createMultipartUpload(b -> b.bucket(testBucket).key(KEY)).join().uploadId();
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("asyncClients")
public void uploadMultiplePartAsync(S3AsyncClient s3AsyncClient) {
String uploadId = s3AsyncClient.createMultipartUpload(b -> b.bucket(testBucket).key(KEY)).join().uploadId();

UploadPartRequest uploadPartRequest = UploadPartRequest.builder().bucket(testBucket).key(KEY)
.uploadId(uploadId)
.partNumber(1)
.build();

UploadPartResponse response = s3Async.uploadPart(uploadPartRequest, AsyncRequestBody.fromString(CONTENTS)).join();
UploadPartResponse response = s3AsyncClient.uploadPart(uploadPartRequest, AsyncRequestBody.fromString(CONTENTS)).join();

List<CompletedPart> completedParts = new ArrayList<>();
completedParts.add(CompletedPart.builder().eTag(response.eTag()).partNumber(1).build());
Expand All @@ -168,7 +179,7 @@ public void uploadMultiplePartAsync() {
.uploadId(uploadId)
.multipartUpload(completedUploadParts)
.build();
CompleteMultipartUploadResponse completeMultipartUploadResponse = s3Async.completeMultipartUpload(completeRequest).join();
CompleteMultipartUploadResponse completeMultipartUploadResponse = s3AsyncClient.completeMultipartUpload(completeRequest).join();
assertThat(completeMultipartUploadResponse).isNotNull();

ResponseBytes<GetObjectResponse> objectAsBytes = s3.getObject(b -> b.bucket(testBucket).key(KEY), ResponseTransformer.toBytes());
Expand All @@ -183,7 +194,7 @@ public void s3Express_nonObjectTransferApis_Sync(SyncTestCase tc) {
}

@MethodSource("asyncTestCases")
@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
public void s3Express_nonObjectTransferApis_Async(AsyncTestCase tc) {
runAndVerify(tc);
}
Expand Down Expand Up @@ -351,30 +362,36 @@ private static List<SyncTestCase> syncTestCases() {
}

private static List<AsyncTestCase> asyncTestCases() {
return Stream.concat(asyncTestCasesPerClient(s3Async).stream(), asyncTestCasesPerClient(s3CrtAsync).stream()).collect(Collectors.toList());
}

private static List<AsyncTestCase> asyncTestCasesPerClient(S3AsyncClient s3Async) {
// getSimpleName is not "simple", but it's fine to be used for testing
String simpleName = s3Async.getClass().getSimpleName();
return Arrays.asList(
//control plane APIs
new AsyncTestCase("ListDirectoryBuckets", () -> {
new AsyncTestCase("ListDirectoryBuckets-" + simpleName, () -> {
ListDirectoryBucketsRequest request = ListDirectoryBucketsRequest.builder().build();
return s3Async.listDirectoryBuckets(request);
}, Expect.builder().build()),
new AsyncTestCase("PutBucketPolicy", () -> {
new AsyncTestCase("PutBucketPolicy-" + simpleName, () -> {
PutBucketPolicyRequest request = PutBucketPolicyRequest.builder().bucket(testBucket).policy("fake").build();
return s3Async.putBucketPolicy(request);
}, Expect.builder().error("Policies must be valid JSON").build()),
new AsyncTestCase("GetBucketPolicy", () -> {
new AsyncTestCase("GetBucketPolicy-" + simpleName, () -> {
GetBucketPolicyRequest request = GetBucketPolicyRequest.builder().bucket(testBucket).build();
return s3Async.getBucketPolicy(request);
}, Expect.builder().error("The bucket policy does not exist").build()),
new AsyncTestCase("DeleteBucketPolicy", () -> {
new AsyncTestCase("DeleteBucketPolicy-" + simpleName, () -> {
DeleteBucketPolicyRequest request = DeleteBucketPolicyRequest.builder().bucket(testBucket).build();
return s3Async.deleteBucketPolicy(request);
}, Expect.builder().build()),
//data plane APIs
new AsyncTestCase("ListObjectsV2", () -> {
new AsyncTestCase("ListObjectsV2-" + simpleName, () -> {
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(testBucket).build();
return s3Async.listObjectsV2(request);
}, Expect.builder().build()),
new AsyncTestCase("DeleteObjects", () -> {
new AsyncTestCase("DeleteObjects-" + simpleName, () -> {
DeleteObjectsRequest request = DeleteObjectsRequest.builder()
.bucket(testBucket)
.delete(Delete.builder()
Expand All @@ -385,7 +402,7 @@ private static List<AsyncTestCase> asyncTestCases() {
.build();
return s3Async.deleteObjects(request);
}, Expect.builder().build()),
new AsyncTestCase("HeadBucket", () -> {
new AsyncTestCase("HeadBucket-" + simpleName, () -> {
HeadBucketRequest request = HeadBucketRequest.builder().bucket(testBucket).build();
return s3Async.headBucket(request);
}, Expect.builder().build())
Expand Down Expand Up @@ -422,8 +439,6 @@ protected static void runAndVerify(SyncTestCase testCase) {
}
List<String> contentSha256Value = req.headers().get("x-amz-content-sha256");
assertThat(contentSha256Value).isNotNull().hasSize(1).isEqualTo(Collections.singletonList("UNSIGNED-PAYLOAD"));

req.headers().keySet().forEach(k -> System.out.println(k + " " + req.headers().get(k)));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.BucketInfo;
import software.amazon.awssdk.services.s3.model.BucketType;
import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration;
Expand Down Expand Up @@ -63,6 +64,13 @@ protected static S3AsyncClientBuilder s3AsyncClientBuilder(Region region) {

}

protected static S3CrtAsyncClientBuilder s3CrtAsyncClientBuilder(Region region) {
return S3AsyncClient.crtBuilder()
.region(region)
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN);

}

protected static void createBucketS3Express(S3Client client, String bucketName, String az) {
try {
LocationInfo location = LocationInfo.builder().name(az).type(LocationType.AVAILABILITY_ZONE).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.identity.spi.IdentityProviders;
import software.amazon.awssdk.identity.spi.ResolveIdentityRequest;
import software.amazon.awssdk.services.s3.s3express.S3ExpressAuthScheme;
import software.amazon.awssdk.services.s3.s3express.S3ExpressSessionCredentials;

/**
* An implementation of {@link S3ExpressAuthScheme} that returns a noop {@link IdentityProvider}.
*/
@SdkInternalApi
public final class CrtS3ExpressNoOpAuthScheme implements S3ExpressAuthScheme {
@Override
public String schemeId() {
return S3ExpressAuthScheme.SCHEME_ID;
}

@Override
public IdentityProvider<S3ExpressSessionCredentials> identityProvider(IdentityProviders providers) {
return NoOpIdentityProvider.INSTANCE;
}

@Override
public HttpSigner<S3ExpressSessionCredentials> signer() {
return NoOpSigner.INSTANCE;
}

private static final class NoOpIdentityProvider implements IdentityProvider<S3ExpressSessionCredentials> {
private static final NoOpIdentityProvider INSTANCE = new NoOpIdentityProvider();

@Override
public Class<S3ExpressSessionCredentials> identityType() {
return S3ExpressSessionCredentials.class;
}

@Override
public CompletableFuture<? extends S3ExpressSessionCredentials> resolveIdentity(ResolveIdentityRequest request) {
return CompletableFuture.completedFuture(null);
}
}

private static final class NoOpSigner implements HttpSigner<S3ExpressSessionCredentials> {
private static final NoOpSigner INSTANCE = new NoOpSigner();

@Override
public SignedRequest sign(SignRequest<? extends S3ExpressSessionCredentials> request) {
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<AsyncSignedRequest> signAsync(AsyncSignRequest<? extends S3ExpressSessionCredentials> request) {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@

package software.amazon.awssdk.services.s3.internal.crt;

import static software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.AUTH_SCHEMES;
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.USE_S3_EXPRESS_AUTH;
import static software.amazon.awssdk.services.s3.internal.crt.S3NativeClientConfiguration.DEFAULT_PART_SIZE_IN_BYTES;

import java.net.URI;
Expand Down Expand Up @@ -58,6 +61,7 @@
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper;
import software.amazon.awssdk.services.s3.internal.s3express.S3ExpressUtils;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand Down Expand Up @@ -129,6 +133,7 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b
.accelerate(builder.accelerate)
.forcePathStyle(builder.forcePathStyle)
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
.putAuthScheme(new CrtS3ExpressNoOpAuthScheme())
.httpClientBuilder(initializeS3CrtAsyncHttpClient(builder))
.build();
}
Expand Down Expand Up @@ -309,6 +314,8 @@ public void afterMarshalling(Context.AfterMarshalling context,
.put(SIGNING_REGION, executionAttributes.getAttribute(AwsSignerExecutionAttribute.SIGNING_REGION))
.put(S3InternalSdkHttpExecutionAttribute.OBJECT_FILE_PATH,
executionAttributes.getAttribute(OBJECT_FILE_PATH))
.put(USE_S3_EXPRESS_AUTH, S3ExpressUtils.useS3ExpressAuthScheme(executionAttributes))
.put(SIGNING_NAME, executionAttributes.getAttribute(SERVICE_SIGNING_NAME))
.build();

// For putObject and getObject, we rely on CRT to perform checksum validation
Expand Down
Loading