Skip to content

Cross region support for CRT Client #4129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
<rxjava.version>2.2.21</rxjava.version>
<commons-codec.verion>1.15</commons-codec.verion>
<jmh.version>1.29</jmh.version>
<awscrt.version>0.21.12</awscrt.version>
<awscrt.version>0.22.2</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.8.1</junit5.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.crt;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.awssdk.services.s3.crt.S3CrtClientCopyIntegrationTest.randomBytes;
import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.computeCheckSum;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.testutils.service.AwsTestBase;

public class S3CrossRegionCrtIntegrationTest extends S3IntegrationTestBase {
public static final Region CROSS_REGION = Region.EU_CENTRAL_1;
private static final String BUCKET = temporaryBucketName(S3CrossRegionCrtIntegrationTest.class);
private static final String KEY = "key";
private static final String ORIGINAL_OBJ = "test_file.dat";
private static final String COPIED_OBJ = "test_file_copy.dat";
private static final long OBJ_SIZE = ThreadLocalRandom.current().nextLong(8 * 1024, 16 * 1024 + 1);
private static S3AsyncClient crtClient;
private static File file;
private static ExecutorService executorService;

@BeforeAll
public static void setup() throws Exception {
S3IntegrationTestBase.setUp();
S3IntegrationTestBase.createBucket(BUCKET);
crtClient = S3AsyncClient.crtBuilder()
.region(CROSS_REGION)
.crossRegionAccessEnabled(true)
.credentialsProvider(AwsTestBase.CREDENTIALS_PROVIDER_CHAIN)
.build();
file = new RandomTempFile(10_000);
S3IntegrationTestBase.s3.putObject(PutObjectRequest.builder()
.bucket(BUCKET)
.key(KEY)
.build(), file.toPath());
executorService = Executors.newFixedThreadPool(2);
}

@AfterAll
public static void cleanup() {
crtClient.close();
S3IntegrationTestBase.deleteBucketAndAllContents(BUCKET);
executorService.shutdown();
CrtResource.waitForNoResources();
}

@Test
void crossRegionClient_getObject() throws IOException {
byte[] bytes =
crtClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join().asByteArray();
assertThat(bytes).isEqualTo(Files.readAllBytes(file.toPath()));
}

@Test
void putObjectNoSuchBucket() {
assertThatThrownBy(() -> crtClient.getObject(GetObjectRequest.builder().bucket("nonExistingTestBucket" + UUID.randomUUID()).key(KEY).build(),
AsyncResponseTransformer.toBytes()).get())
.hasCauseInstanceOf(S3Exception.class)
.satisfies(throwable -> assertThat(throwable.getCause()).satisfies(cause -> assertThat(((S3Exception) cause).statusCode()).isEqualTo(404)));
}

@Test
void copy_copiedObject_hasSameContent() {
byte[] originalContent = randomBytes(OBJ_SIZE);
createOriginalObject(originalContent, ORIGINAL_OBJ);
copyObject(ORIGINAL_OBJ, COPIED_OBJ);
validateCopiedObject(originalContent, ORIGINAL_OBJ);
}

private void copyObject(String original, String destination) {
CompletableFuture<CopyObjectResponse> future = crtClient.copyObject(c -> c
.sourceBucket(BUCKET)
.sourceKey(original)
.destinationBucket(BUCKET)
.destinationKey(destination));

CopyObjectResponse copyObjectResponse = future.join();
assertThat(copyObjectResponse.responseMetadata().requestId()).isNotNull();
assertThat(copyObjectResponse.sdkHttpResponse()).isNotNull();
}

@Test
void putObject_byteBufferBody_objectSentCorrectly() {
byte[] data = new byte[16384];
new Random().nextBytes(data);
ByteBuffer byteBuffer = ByteBuffer.wrap(data);

AsyncRequestBody body = AsyncRequestBody.fromByteBuffer(byteBuffer);

crtClient.putObject(r -> r.bucket(BUCKET).key(KEY), body).join();

ResponseBytes<GetObjectResponse> responseBytes = S3IntegrationTestBase.s3.getObject(r -> r.bucket(BUCKET).key(KEY),
ResponseTransformer.toBytes());

byte[] expectedSum = computeCheckSum(byteBuffer);

assertThat(computeCheckSum(responseBytes.asByteBuffer())).isEqualTo(expectedSum);
}

private void validateCopiedObject(byte[] originalContent, String originalKey) {
ResponseBytes<GetObjectResponse> copiedObject = s3.getObject(r -> r.bucket(BUCKET)
.key(originalKey),
ResponseTransformer.toBytes());
assertThat(computeCheckSum(copiedObject.asByteBuffer())).isEqualTo(computeCheckSum(ByteBuffer.wrap(originalContent)));
}

private void createOriginalObject(byte[] originalContent, String originalKey) {
crtClient.putObject(r -> r.bucket(BUCKET)
.key(originalKey),
AsyncRequestBody.fromBytes(originalContent)).join();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private void validateCopiedObject(byte[] originalContent, String originalKey) {
assertThat(computeCheckSum(copiedObject.asByteBuffer())).isEqualTo(computeCheckSum(ByteBuffer.wrap(originalContent)));
}

private static byte[] randomBytes(long size) {
public static byte[] randomBytes(long size) {
byte[] bytes = new byte[Math.toIntExact(size)];
ThreadLocalRandom.current().nextBytes(bytes);
return bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.http.async.SimpleSubscriber;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.testutils.RandomTempFile;
Expand Down Expand Up @@ -97,35 +93,4 @@ void getObject_customResponseTransformer() {

}

private static final class TestResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, Void> {
private CompletableFuture<Void> future;

@Override
public CompletableFuture<Void> prepare() {
future = new CompletableFuture<>();
return future;
}

@Override
public void onResponse(GetObjectResponse response) {
assertThat(response).isNotNull();
}

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
publisher.subscribe(new SimpleSubscriber(b -> {
}) {
@Override
public void onComplete() {
super.onComplete();
future.complete(null);
}
});
}

@Override
public void exceptionOccurred(Throwable error) {
future.completeExceptionally(error);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.crt;

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.http.async.SimpleSubscriber;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public final class TestResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, Void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the use case for this class?

Copy link
Contributor Author

@joviegas joviegas Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove this from S3CrtGetObjectIntegrationTest

private CompletableFuture<Void> future;

@Override
public CompletableFuture<Void> prepare() {
future = new CompletableFuture<>();
return future;
}

@Override
public void onResponse(GetObjectResponse response) {
assertThat(response).isNotNull();
}

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
publisher.subscribe(new SimpleSubscriber(b -> {
}) {
@Override
public void onComplete() {
super.onComplete();
future.complete(null);
}
});
}

@Override
public void exceptionOccurred(Throwable error) {
future.completeExceptionally(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,27 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
.build());
}




/**
* <p> Configures whether cross-region bucket access is enabled for clients using the configuration.
* <p>The following behavior is used when this mode is enabled:
* <ol>
* <li>This method allows enabling or disabling cross-region bucket access for clients. When cross-region bucket
* access is enabled, requests that do not act on an existing bucket (e.g., createBucket API) will be routed to the
* region configured on the client</li>
* <li>The first time a request is made that references an existing bucket (e.g., putObject API), a request will be
* made to the client-configured region. If the bucket does not exist in this region, the service will include the
* actual region in the error responses. Subsequently, the API will be called using the correct region obtained
* from the error response. </li>
* <li>This location may be cached in the client for subsequent requests to the same bucket.</li>
* </ol>
* <p>Enabling this mode has several drawbacks, as it can increase latency if the bucket's location is physically far
* from the location of the request.Therefore, it is strongly advised, whenever possible, to know the location of your
* buckets and create a region-specific client to access them
*
* @param crossRegionAccessEnabled Whether cross region bucket access should be enabled.
* @return The builder object for method chaining.
*/
S3CrtAsyncClientBuilder crossRegionAccessEnabled(boolean crossRegionAccessEnabled);

@Override
S3AsyncClient build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
import static software.amazon.awssdk.services.s3.internal.crt.S3NativeClientConfiguration.DEFAULT_PART_SIZE_IN_BYTES;

import java.net.URI;
Expand All @@ -27,6 +28,7 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.SdkRequest;
Expand Down Expand Up @@ -94,6 +96,7 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b
// Disable checksum, it is handled in CRT
.serviceConfiguration(S3Configuration.builder()
.checksumValidationEnabled(false)
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
.build())
.region(builder.region)
.endpointOverride(builder.endpointOverride)
Expand Down Expand Up @@ -149,6 +152,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB

private List<ExecutionInterceptor> executionInterceptors;
private S3CrtRetryConfiguration retryConfiguration;
private boolean crossRegionAccessEnabled;

public AwsCredentialsProvider credentialsProvider() {
return credentialsProvider;
Expand Down Expand Up @@ -178,6 +182,10 @@ public Long readBufferSizeInBytes() {
return readBufferSizeInBytes;
}

public boolean crossRegionAccessEnabled() {
return crossRegionAccessEnabled;
}

@Override
public S3CrtAsyncClientBuilder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
this.credentialsProvider = credentialsProvider;
Expand Down Expand Up @@ -259,6 +267,12 @@ public S3CrtAsyncClientBuilder retryConfiguration(S3CrtRetryConfiguration retryC
return this;
}

@Override
public S3CrtAsyncClientBuilder crossRegionAccessEnabled(boolean crossRegionAccessEnabled) {
this.crossRegionAccessEnabled = crossRegionAccessEnabled;
return this;
}

@Override
public S3CrtAsyncClient build() {
return new DefaultS3CrtAsyncClient(this);
Expand All @@ -280,6 +294,7 @@ public void afterMarshalling(Context.AfterMarshalling context,
builder.put(OPERATION_NAME,
executionAttributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME))
.put(HTTP_CHECKSUM, executionAttributes.getAttribute(SdkInternalExecutionAttribute.HTTP_CHECKSUM))
.put(SIGNING_REGION, executionAttributes.getAttribute(AwsSignerExecutionAttribute.SIGNING_REGION))
.build();

// For putObject and getObject, we rely on CRT to perform checksum validation
Expand Down
Loading