Skip to content

Commit 4039f39

Browse files
authored
Cross region support for CRT Client (#4129)
* Cross region support for CRT Client * removing common class * handled review comments
1 parent 5d57416 commit 4039f39

File tree

10 files changed

+289
-40
lines changed

10 files changed

+289
-40
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@
114114
<rxjava.version>2.2.21</rxjava.version>
115115
<commons-codec.verion>1.15</commons-codec.verion>
116116
<jmh.version>1.29</jmh.version>
117-
<awscrt.version>0.21.12</awscrt.version>
117+
<awscrt.version>0.22.2</awscrt.version>
118118

119119
<!--Test dependencies -->
120120
<junit5.version>5.8.1</junit5.version>
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.crt;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static software.amazon.awssdk.services.s3.crt.S3CrtClientCopyIntegrationTest.randomBytes;
21+
import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.computeCheckSum;
22+
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
23+
24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.nio.ByteBuffer;
27+
import java.nio.file.Files;
28+
import java.util.Random;
29+
import java.util.UUID;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.ThreadLocalRandom;
34+
import org.assertj.core.api.Assertions;
35+
import org.junit.jupiter.api.AfterAll;
36+
import org.junit.jupiter.api.BeforeAll;
37+
import org.junit.jupiter.api.Test;
38+
import software.amazon.awssdk.core.ResponseBytes;
39+
import software.amazon.awssdk.core.async.AsyncRequestBody;
40+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
41+
import software.amazon.awssdk.core.sync.ResponseTransformer;
42+
import software.amazon.awssdk.crt.CrtResource;
43+
import software.amazon.awssdk.regions.Region;
44+
import software.amazon.awssdk.services.s3.S3AsyncClient;
45+
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
46+
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
47+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
48+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
49+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
50+
import software.amazon.awssdk.services.s3.model.S3Exception;
51+
import software.amazon.awssdk.testutils.RandomTempFile;
52+
import software.amazon.awssdk.testutils.service.AwsTestBase;
53+
54+
public class S3CrossRegionCrtIntegrationTest extends S3IntegrationTestBase {
55+
public static final Region CROSS_REGION = Region.EU_CENTRAL_1;
56+
private static final String BUCKET = temporaryBucketName(S3CrossRegionCrtIntegrationTest.class);
57+
private static final String KEY = "key";
58+
private static final String ORIGINAL_OBJ = "test_file.dat";
59+
private static final String COPIED_OBJ = "test_file_copy.dat";
60+
private static final long OBJ_SIZE = ThreadLocalRandom.current().nextLong(8 * 1024, 16 * 1024 + 1);
61+
private static S3AsyncClient crtClient;
62+
private static File file;
63+
private static ExecutorService executorService;
64+
65+
@BeforeAll
66+
public static void setup() throws Exception {
67+
S3IntegrationTestBase.setUp();
68+
S3IntegrationTestBase.createBucket(BUCKET);
69+
crtClient = S3AsyncClient.crtBuilder()
70+
.region(CROSS_REGION)
71+
.crossRegionAccessEnabled(true)
72+
.credentialsProvider(AwsTestBase.CREDENTIALS_PROVIDER_CHAIN)
73+
.build();
74+
file = new RandomTempFile(10_000);
75+
S3IntegrationTestBase.s3.putObject(PutObjectRequest.builder()
76+
.bucket(BUCKET)
77+
.key(KEY)
78+
.build(), file.toPath());
79+
executorService = Executors.newFixedThreadPool(2);
80+
}
81+
82+
@AfterAll
83+
public static void cleanup() {
84+
crtClient.close();
85+
S3IntegrationTestBase.deleteBucketAndAllContents(BUCKET);
86+
executorService.shutdown();
87+
CrtResource.waitForNoResources();
88+
}
89+
90+
@Test
91+
void crossRegionClient_getObject() throws IOException {
92+
byte[] bytes =
93+
crtClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join().asByteArray();
94+
assertThat(bytes).isEqualTo(Files.readAllBytes(file.toPath()));
95+
}
96+
97+
@Test
98+
void putObjectNoSuchBucket() {
99+
assertThatThrownBy(() -> crtClient.getObject(GetObjectRequest.builder().bucket("nonExistingTestBucket" + UUID.randomUUID()).key(KEY).build(),
100+
AsyncResponseTransformer.toBytes()).get())
101+
.hasCauseInstanceOf(S3Exception.class)
102+
.satisfies(throwable -> assertThat(throwable.getCause()).satisfies(cause -> assertThat(((S3Exception) cause).statusCode()).isEqualTo(404)));
103+
}
104+
105+
@Test
106+
void copy_copiedObject_hasSameContent() {
107+
byte[] originalContent = randomBytes(OBJ_SIZE);
108+
createOriginalObject(originalContent, ORIGINAL_OBJ);
109+
copyObject(ORIGINAL_OBJ, COPIED_OBJ);
110+
validateCopiedObject(originalContent, ORIGINAL_OBJ);
111+
}
112+
113+
private void copyObject(String original, String destination) {
114+
CompletableFuture<CopyObjectResponse> future = crtClient.copyObject(c -> c
115+
.sourceBucket(BUCKET)
116+
.sourceKey(original)
117+
.destinationBucket(BUCKET)
118+
.destinationKey(destination));
119+
120+
CopyObjectResponse copyObjectResponse = future.join();
121+
assertThat(copyObjectResponse.responseMetadata().requestId()).isNotNull();
122+
assertThat(copyObjectResponse.sdkHttpResponse()).isNotNull();
123+
}
124+
125+
@Test
126+
void putObject_byteBufferBody_objectSentCorrectly() {
127+
byte[] data = new byte[16384];
128+
new Random().nextBytes(data);
129+
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
130+
131+
AsyncRequestBody body = AsyncRequestBody.fromByteBuffer(byteBuffer);
132+
133+
crtClient.putObject(r -> r.bucket(BUCKET).key(KEY), body).join();
134+
135+
ResponseBytes<GetObjectResponse> responseBytes = S3IntegrationTestBase.s3.getObject(r -> r.bucket(BUCKET).key(KEY),
136+
ResponseTransformer.toBytes());
137+
138+
byte[] expectedSum = computeCheckSum(byteBuffer);
139+
140+
assertThat(computeCheckSum(responseBytes.asByteBuffer())).isEqualTo(expectedSum);
141+
}
142+
143+
private void validateCopiedObject(byte[] originalContent, String originalKey) {
144+
ResponseBytes<GetObjectResponse> copiedObject = s3.getObject(r -> r.bucket(BUCKET)
145+
.key(originalKey),
146+
ResponseTransformer.toBytes());
147+
assertThat(computeCheckSum(copiedObject.asByteBuffer())).isEqualTo(computeCheckSum(ByteBuffer.wrap(originalContent)));
148+
}
149+
150+
private void createOriginalObject(byte[] originalContent, String originalKey) {
151+
crtClient.putObject(r -> r.bucket(BUCKET)
152+
.key(originalKey),
153+
AsyncRequestBody.fromBytes(originalContent)).join();
154+
}
155+
156+
}

services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientCopyIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ private void validateCopiedObject(byte[] originalContent, String originalKey) {
166166
assertThat(computeCheckSum(copiedObject.asByteBuffer())).isEqualTo(computeCheckSum(ByteBuffer.wrap(originalContent)));
167167
}
168168

169-
private static byte[] randomBytes(long size) {
169+
public static byte[] randomBytes(long size) {
170170
byte[] bytes = new byte[Math.toIntExact(size)];
171171
ThreadLocalRandom.current().nextBytes(bytes);
172172
return bytes;

services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtGetObjectIntegrationTest.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,15 @@
2323
import java.nio.ByteBuffer;
2424
import java.nio.file.Files;
2525
import java.nio.file.Path;
26-
import java.util.concurrent.CompletableFuture;
2726
import java.util.concurrent.ExecutorService;
2827
import java.util.concurrent.Executors;
2928
import org.junit.jupiter.api.AfterAll;
3029
import org.junit.jupiter.api.BeforeAll;
3130
import org.junit.jupiter.api.Test;
3231
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
33-
import software.amazon.awssdk.core.async.SdkPublisher;
3432
import software.amazon.awssdk.crt.CrtResource;
35-
import software.amazon.awssdk.http.async.SimpleSubscriber;
3633
import software.amazon.awssdk.services.s3.S3AsyncClient;
3734
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
38-
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
3935
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
4036
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
4137
import software.amazon.awssdk.testutils.RandomTempFile;
@@ -97,35 +93,4 @@ void getObject_customResponseTransformer() {
9793

9894
}
9995

100-
private static final class TestResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, Void> {
101-
private CompletableFuture<Void> future;
102-
103-
@Override
104-
public CompletableFuture<Void> prepare() {
105-
future = new CompletableFuture<>();
106-
return future;
107-
}
108-
109-
@Override
110-
public void onResponse(GetObjectResponse response) {
111-
assertThat(response).isNotNull();
112-
}
113-
114-
@Override
115-
public void onStream(SdkPublisher<ByteBuffer> publisher) {
116-
publisher.subscribe(new SimpleSubscriber(b -> {
117-
}) {
118-
@Override
119-
public void onComplete() {
120-
super.onComplete();
121-
future.complete(null);
122-
}
123-
});
124-
}
125-
126-
@Override
127-
public void exceptionOccurred(Throwable error) {
128-
future.completeExceptionally(error);
129-
}
130-
}
13196
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.crt;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.concurrent.CompletableFuture;
22+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
23+
import software.amazon.awssdk.core.async.SdkPublisher;
24+
import software.amazon.awssdk.http.async.SimpleSubscriber;
25+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
26+
27+
public final class TestResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, Void> {
28+
private CompletableFuture<Void> future;
29+
30+
@Override
31+
public CompletableFuture<Void> prepare() {
32+
future = new CompletableFuture<>();
33+
return future;
34+
}
35+
36+
@Override
37+
public void onResponse(GetObjectResponse response) {
38+
assertThat(response).isNotNull();
39+
}
40+
41+
@Override
42+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
43+
publisher.subscribe(new SimpleSubscriber(b -> {
44+
}) {
45+
@Override
46+
public void onComplete() {
47+
super.onComplete();
48+
future.complete(null);
49+
}
50+
});
51+
}
52+
53+
@Override
54+
public void exceptionOccurred(Throwable error) {
55+
future.completeExceptionally(error);
56+
}
57+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,27 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
212212
.build());
213213
}
214214

215-
216-
217-
215+
/**
216+
* <p> Configures whether cross-region bucket access is enabled for clients using the configuration.
217+
* <p>The following behavior is used when this mode is enabled:
218+
* <ol>
219+
* <li>This method allows enabling or disabling cross-region bucket access for clients. When cross-region bucket
220+
* access is enabled, requests that do not act on an existing bucket (e.g., createBucket API) will be routed to the
221+
* region configured on the client</li>
222+
* <li>The first time a request is made that references an existing bucket (e.g., putObject API), a request will be
223+
* made to the client-configured region. If the bucket does not exist in this region, the service will include the
224+
* actual region in the error responses. Subsequently, the API will be called using the correct region obtained
225+
* from the error response. </li>
226+
* <li>This location may be cached in the client for subsequent requests to the same bucket.</li>
227+
* </ol>
228+
* <p>Enabling this mode has several drawbacks, as it can increase latency if the bucket's location is physically far
229+
* from the location of the request.Therefore, it is strongly advised, whenever possible, to know the location of your
230+
* buckets and create a region-specific client to access them
231+
*
232+
* @param crossRegionAccessEnabled Whether cross region bucket access should be enabled.
233+
* @return The builder object for method chaining.
234+
*/
235+
S3CrtAsyncClientBuilder crossRegionAccessEnabled(boolean crossRegionAccessEnabled);
218236

219237
@Override
220238
S3AsyncClient build();

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
1919
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
2020
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
21+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
2122
import static software.amazon.awssdk.services.s3.internal.crt.S3NativeClientConfiguration.DEFAULT_PART_SIZE_IN_BYTES;
2223

2324
import java.net.URI;
@@ -27,6 +28,7 @@
2728
import software.amazon.awssdk.annotations.SdkInternalApi;
2829
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2930
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
31+
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
3032
import software.amazon.awssdk.awscore.AwsRequest;
3133
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
3234
import software.amazon.awssdk.core.SdkRequest;
@@ -94,6 +96,7 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b
9496
// Disable checksum, it is handled in CRT
9597
.serviceConfiguration(S3Configuration.builder()
9698
.checksumValidationEnabled(false)
99+
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
97100
.build())
98101
.region(builder.region)
99102
.endpointOverride(builder.endpointOverride)
@@ -149,6 +152,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
149152

150153
private List<ExecutionInterceptor> executionInterceptors;
151154
private S3CrtRetryConfiguration retryConfiguration;
155+
private boolean crossRegionAccessEnabled;
152156

153157
public AwsCredentialsProvider credentialsProvider() {
154158
return credentialsProvider;
@@ -178,6 +182,10 @@ public Long readBufferSizeInBytes() {
178182
return readBufferSizeInBytes;
179183
}
180184

185+
public boolean crossRegionAccessEnabled() {
186+
return crossRegionAccessEnabled;
187+
}
188+
181189
@Override
182190
public S3CrtAsyncClientBuilder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
183191
this.credentialsProvider = credentialsProvider;
@@ -259,6 +267,12 @@ public S3CrtAsyncClientBuilder retryConfiguration(S3CrtRetryConfiguration retryC
259267
return this;
260268
}
261269

270+
@Override
271+
public S3CrtAsyncClientBuilder crossRegionAccessEnabled(boolean crossRegionAccessEnabled) {
272+
this.crossRegionAccessEnabled = crossRegionAccessEnabled;
273+
return this;
274+
}
275+
262276
@Override
263277
public S3CrtAsyncClient build() {
264278
return new DefaultS3CrtAsyncClient(this);
@@ -280,6 +294,7 @@ public void afterMarshalling(Context.AfterMarshalling context,
280294
builder.put(OPERATION_NAME,
281295
executionAttributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME))
282296
.put(HTTP_CHECKSUM, executionAttributes.getAttribute(SdkInternalExecutionAttribute.HTTP_CHECKSUM))
297+
.put(SIGNING_REGION, executionAttributes.getAttribute(AwsSignerExecutionAttribute.SIGNING_REGION))
283298
.build();
284299

285300
// For putObject and getObject, we rely on CRT to perform checksum validation

0 commit comments

Comments
 (0)