Skip to content

Commit 2c93fcf

Browse files
committed
Cross region support for CRT Client
1 parent 5d57416 commit 2c93fcf

File tree

9 files changed

+290
-5
lines changed

9 files changed

+290
-5
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@
114114
<rxjava.version>2.2.21</rxjava.version>
115115
<commons-codec.verion>1.15</commons-codec.verion>
116116
<jmh.version>1.29</jmh.version>
117-
<awscrt.version>0.21.12</awscrt.version>
117+
<awscrt.version>0.22.2</awscrt.version>
118118

119119
<!--Test dependencies -->
120120
<junit5.version>5.8.1</junit5.version>
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.crt;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static software.amazon.awssdk.services.s3.crt.S3CrtClientCopyIntegrationTest.randomBytes;
21+
import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.computeCheckSum;
22+
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
23+
24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.nio.ByteBuffer;
27+
import java.nio.file.Files;
28+
import java.util.Random;
29+
import java.util.UUID;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.ThreadLocalRandom;
34+
import org.assertj.core.api.Assertions;
35+
import org.junit.jupiter.api.AfterAll;
36+
import org.junit.jupiter.api.BeforeAll;
37+
import org.junit.jupiter.api.Test;
38+
import software.amazon.awssdk.core.ResponseBytes;
39+
import software.amazon.awssdk.core.async.AsyncRequestBody;
40+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
41+
import software.amazon.awssdk.core.sync.ResponseTransformer;
42+
import software.amazon.awssdk.crt.CrtResource;
43+
import software.amazon.awssdk.regions.Region;
44+
import software.amazon.awssdk.services.s3.S3AsyncClient;
45+
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
46+
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
47+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
48+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
49+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
50+
import software.amazon.awssdk.services.s3.model.S3Exception;
51+
import software.amazon.awssdk.testutils.RandomTempFile;
52+
import software.amazon.awssdk.testutils.service.AwsTestBase;
53+
54+
public class S3CrossRegionCrtIntegrationTest extends S3IntegrationTestBase {
55+
private static final String BUCKET = temporaryBucketName(S3CrossRegionCrtIntegrationTest.class);
56+
private static final String KEY = "key";
57+
public static final Region CROSS_REGION = Region.EU_CENTRAL_1;
58+
private static S3AsyncClient crtClient;
59+
private static File file;
60+
private static ExecutorService executorService;
61+
private static final String ORIGINAL_OBJ = "test_file.dat";
62+
private static final String COPIED_OBJ = "test_file_copy.dat";
63+
64+
private static final long OBJ_SIZE = ThreadLocalRandom.current().nextLong(8 * 1024, 16 * 1024 + 1);
65+
66+
67+
@BeforeAll
68+
public static void setup() throws Exception {
69+
S3IntegrationTestBase.setUp();
70+
S3IntegrationTestBase.createBucket(BUCKET);
71+
crtClient = S3AsyncClient.crtBuilder()
72+
.region(CROSS_REGION)
73+
.crossRegionAccessEnabled(true)
74+
.credentialsProvider(AwsTestBase.CREDENTIALS_PROVIDER_CHAIN)
75+
.build();
76+
file = new RandomTempFile(10_000);
77+
S3IntegrationTestBase.s3.putObject(PutObjectRequest.builder()
78+
.bucket(BUCKET)
79+
.key(KEY)
80+
.build(), file.toPath());
81+
executorService = Executors.newFixedThreadPool(2);
82+
}
83+
84+
@AfterAll
85+
public static void cleanup() {
86+
crtClient.close();
87+
S3IntegrationTestBase.deleteBucketAndAllContents(BUCKET);
88+
executorService.shutdown();
89+
CrtResource.waitForNoResources();
90+
}
91+
92+
@Test
93+
void crossRegionClient_getObject() throws IOException {
94+
byte[] bytes =
95+
crtClient.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBytes()).join().asByteArray();
96+
assertThat(bytes).isEqualTo(Files.readAllBytes(file.toPath()));
97+
}
98+
99+
@Test
100+
void putObjectNoSuchBucket() {
101+
assertThatThrownBy(() -> crtClient.getObject(GetObjectRequest.builder().bucket("nonExistingTestBucket" + UUID.randomUUID()).key(KEY).build(),
102+
AsyncResponseTransformer.toBytes()).get())
103+
.hasCauseInstanceOf(S3Exception.class)
104+
.satisfies(throwable -> assertThat(throwable.getCause()).satisfies(cause -> assertThat(((S3Exception) cause).statusCode()).isEqualTo(404)));
105+
}
106+
107+
@Test
108+
void copy_copiedObject_hasSameContent() {
109+
byte[] originalContent = randomBytes(OBJ_SIZE);
110+
createOriginalObject(originalContent, ORIGINAL_OBJ);
111+
copyObject(ORIGINAL_OBJ, COPIED_OBJ);
112+
validateCopiedObject(originalContent, ORIGINAL_OBJ);
113+
}
114+
115+
private void copyObject(String original, String destination) {
116+
CompletableFuture<CopyObjectResponse> future = crtClient.copyObject(c -> c
117+
.sourceBucket(BUCKET)
118+
.sourceKey(original)
119+
.destinationBucket(BUCKET)
120+
.destinationKey(destination));
121+
122+
CopyObjectResponse copyObjectResponse = future.join();
123+
assertThat(copyObjectResponse.responseMetadata().requestId()).isNotNull();
124+
assertThat(copyObjectResponse.sdkHttpResponse()).isNotNull();
125+
}
126+
127+
@Test
128+
void putObject_byteBufferBody_objectSentCorrectly() {
129+
byte[] data = new byte[16384];
130+
new Random().nextBytes(data);
131+
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
132+
133+
AsyncRequestBody body = AsyncRequestBody.fromByteBuffer(byteBuffer);
134+
135+
crtClient.putObject(r -> r.bucket(BUCKET).key(KEY), body).join();
136+
137+
ResponseBytes<GetObjectResponse> responseBytes = S3IntegrationTestBase.s3.getObject(r -> r.bucket(BUCKET).key(KEY),
138+
ResponseTransformer.toBytes());
139+
140+
byte[] expectedSum = computeCheckSum(byteBuffer);
141+
142+
Assertions.assertThat(computeCheckSum(responseBytes.asByteBuffer())).isEqualTo(expectedSum);
143+
}
144+
145+
private void validateCopiedObject(byte[] originalContent, String originalKey) {
146+
ResponseBytes<GetObjectResponse> copiedObject = s3.getObject(r -> r.bucket(BUCKET)
147+
.key(originalKey),
148+
ResponseTransformer.toBytes());
149+
assertThat(computeCheckSum(copiedObject.asByteBuffer())).isEqualTo(computeCheckSum(ByteBuffer.wrap(originalContent)));
150+
}
151+
152+
private void createOriginalObject(byte[] originalContent, String originalKey) {
153+
crtClient.putObject(r -> r.bucket(BUCKET)
154+
.key(originalKey),
155+
AsyncRequestBody.fromBytes(originalContent)).join();
156+
}
157+
158+
}

services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientCopyIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ private void validateCopiedObject(byte[] originalContent, String originalKey) {
166166
assertThat(computeCheckSum(copiedObject.asByteBuffer())).isEqualTo(computeCheckSum(ByteBuffer.wrap(originalContent)));
167167
}
168168

169-
private static byte[] randomBytes(long size) {
169+
public static byte[] randomBytes(long size) {
170170
byte[] bytes = new byte[Math.toIntExact(size)];
171171
ThreadLocalRandom.current().nextBytes(bytes);
172172
return bytes;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.crt;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.concurrent.CompletableFuture;
22+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
23+
import software.amazon.awssdk.core.async.SdkPublisher;
24+
import software.amazon.awssdk.http.async.SimpleSubscriber;
25+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
26+
27+
public final class TestResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, Void> {
28+
private CompletableFuture<Void> future;
29+
30+
@Override
31+
public CompletableFuture<Void> prepare() {
32+
future = new CompletableFuture<>();
33+
return future;
34+
}
35+
36+
@Override
37+
public void onResponse(GetObjectResponse response) {
38+
assertThat(response).isNotNull();
39+
}
40+
41+
@Override
42+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
43+
publisher.subscribe(new SimpleSubscriber(b -> {
44+
}) {
45+
@Override
46+
public void onComplete() {
47+
super.onComplete();
48+
future.complete(null);
49+
}
50+
});
51+
}
52+
53+
@Override
54+
public void exceptionOccurred(Throwable error) {
55+
future.completeExceptionally(error);
56+
}
57+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/S3CrtAsyncClientBuilder.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,27 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
212212
.build());
213213
}
214214

215-
216-
217-
215+
/**
216+
* <p> Configures whether cross-region bucket access is enabled for clients using the configuration.
217+
* <p>The following behavior is used when this mode is enabled:
218+
* <ol>
219+
* <li>This method allows enabling or disabling cross-region bucket access for clients. When cross-region bucket
220+
* access is enabled, requests that do not act on an existing bucket (e.g., createBucket API) will be routed to the
221+
* region configured on the client</li>
222+
* <li>The first time a request is made that references an existing bucket (e.g., putObject API), a request will be
223+
* made to the client-configured region. If the bucket does not exist in this region, the service will include the
224+
* actual region in the error responses. Subsequently, the API will be called using the correct region obtained
225+
* from the error response. </li>
226+
* <li>This location may be cached in the client for subsequent requests to the same bucket.</li>
227+
* </ol>
228+
* <p>Enabling this mode has several drawbacks, as it can increase latency if the bucket's location is physically far
229+
* from the location of the request.Therefore, it is strongly advised, whenever possible, to know the location of your
230+
* buckets and create a region-specific client to access them
231+
*
232+
* @param crossRegionAccessEnabled Whether cross region bucket access should be enabled.
233+
* @return The builder object for method chaining.
234+
*/
235+
S3CrtAsyncClientBuilder crossRegionAccessEnabled(boolean crossRegionAccessEnabled);
218236

219237
@Override
220238
S3AsyncClient build();

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
1919
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
2020
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
21+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
2122
import static software.amazon.awssdk.services.s3.internal.crt.S3NativeClientConfiguration.DEFAULT_PART_SIZE_IN_BYTES;
2223

2324
import java.net.URI;
@@ -27,6 +28,7 @@
2728
import software.amazon.awssdk.annotations.SdkInternalApi;
2829
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2930
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
31+
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
3032
import software.amazon.awssdk.awscore.AwsRequest;
3133
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
3234
import software.amazon.awssdk.core.SdkRequest;
@@ -94,6 +96,7 @@ private static S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder b
9496
// Disable checksum, it is handled in CRT
9597
.serviceConfiguration(S3Configuration.builder()
9698
.checksumValidationEnabled(false)
99+
.crossRegionAccessEnabled(builder.crossRegionAccessEnabled)
97100
.build())
98101
.region(builder.region)
99102
.endpointOverride(builder.endpointOverride)
@@ -149,6 +152,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
149152

150153
private List<ExecutionInterceptor> executionInterceptors;
151154
private S3CrtRetryConfiguration retryConfiguration;
155+
private boolean crossRegionAccessEnabled;
152156

153157
public AwsCredentialsProvider credentialsProvider() {
154158
return credentialsProvider;
@@ -178,6 +182,10 @@ public Long readBufferSizeInBytes() {
178182
return readBufferSizeInBytes;
179183
}
180184

185+
public boolean crossRegionAccessEnabled() {
186+
return crossRegionAccessEnabled;
187+
}
188+
181189
@Override
182190
public S3CrtAsyncClientBuilder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
183191
this.credentialsProvider = credentialsProvider;
@@ -259,6 +267,12 @@ public S3CrtAsyncClientBuilder retryConfiguration(S3CrtRetryConfiguration retryC
259267
return this;
260268
}
261269

270+
@Override
271+
public S3CrtAsyncClientBuilder crossRegionAccessEnabled(boolean crossRegionAccessEnabled) {
272+
this.crossRegionAccessEnabled = crossRegionAccessEnabled;
273+
return this;
274+
}
275+
262276
@Override
263277
public S3CrtAsyncClient build() {
264278
return new DefaultS3CrtAsyncClient(this);
@@ -280,6 +294,7 @@ public void afterMarshalling(Context.AfterMarshalling context,
280294
builder.put(OPERATION_NAME,
281295
executionAttributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME))
282296
.put(HTTP_CHECKSUM, executionAttributes.getAttribute(SdkInternalExecutionAttribute.HTTP_CHECKSUM))
297+
.put(SIGNING_REGION, executionAttributes.getAttribute(AwsSignerExecutionAttribute.SIGNING_REGION))
283298
.build();
284299

285300
// For putObject and getObject, we rely on CRT to perform checksum validation

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
2121
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
2222
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
23+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
2324
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
2425

2526
import java.net.URI;
@@ -31,6 +32,7 @@
3132
import software.amazon.awssdk.annotations.SdkInternalApi;
3233
import software.amazon.awssdk.annotations.SdkTestInternalApi;
3334
import software.amazon.awssdk.core.interceptor.trait.HttpChecksum;
35+
import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig;
3436
import software.amazon.awssdk.crt.http.HttpHeader;
3537
import software.amazon.awssdk.crt.http.HttpRequest;
3638
import software.amazon.awssdk.crt.s3.ChecksumConfig;
@@ -43,6 +45,7 @@
4345
import software.amazon.awssdk.http.SdkHttpRequest;
4446
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
4547
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
48+
import software.amazon.awssdk.regions.Region;
4649
import software.amazon.awssdk.utils.AttributeMap;
4750
import software.amazon.awssdk.utils.Logger;
4851
import software.amazon.awssdk.utils.NumericUtils;
@@ -117,6 +120,7 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
117120

118121
HttpChecksum httpChecksum = asyncRequest.httpExecutionAttributes().getAttribute(HTTP_CHECKSUM);
119122
ResumeToken resumeToken = asyncRequest.httpExecutionAttributes().getAttribute(CRT_PAUSE_RESUME_TOKEN);
123+
Region signingRegion = asyncRequest.httpExecutionAttributes().getAttribute(SIGNING_REGION);
120124

121125
ChecksumConfig checksumConfig =
122126
checksumConfig(httpChecksum, requestType, s3NativeClientConfiguration.checksumValidationEnabled());
@@ -130,6 +134,13 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
130134
.withResponseHandler(responseHandler)
131135
.withResumeToken(resumeToken);
132136

137+
// Create a new SigningConfig object only if the signing region has changed from the previously configured region.
138+
if (signingRegion != null && !s3ClientOptions.getRegion().equals(signingRegion)) {
139+
requestOptions.withSigningConfig(
140+
AwsSigningConfig.getDefaultS3SigningConfig(signingRegion.id(),
141+
s3ClientOptions.getCredentialsProvider()));
142+
}
143+
133144
S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions);
134145
S3MetaRequestPauseObservable observable =
135146
asyncRequest.httpExecutionAttributes().getAttribute(METAREQUEST_PAUSE_OBSERVABLE);
@@ -144,6 +155,7 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
144155
return executeFuture;
145156
}
146157

158+
147159
private static URI getEndpoint(URI uri) {
148160
return invokeSafely(() -> new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), null, null, null));
149161
}

0 commit comments

Comments
 (0)