Skip to content

Commit 233b93e

Browse files
committed
Added implementation for Async client Decorator
1 parent 3ab5cbf commit 233b93e

File tree

10 files changed

+784
-273
lines changed

10 files changed

+784
-273
lines changed

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crossregion/S3CrossRegionAsyncClient.java

Lines changed: 63 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,63 +15,95 @@
1515

1616
package software.amazon.awssdk.services.s3.internal.crossregion;
1717

18+
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.getBucketRegionFromException;
19+
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.isS3RedirectException;
20+
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.requestWithDecoratedEndpointProvider;
21+
22+
import java.util.Map;
1823
import java.util.Optional;
1924
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ConcurrentHashMap;
2026
import java.util.function.Function;
27+
import java.util.function.Supplier;
2128
import software.amazon.awssdk.annotations.SdkInternalApi;
22-
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
29+
import software.amazon.awssdk.regions.Region;
2330
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
2431
import software.amazon.awssdk.services.s3.S3AsyncClient;
25-
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
26-
import software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider.BucketEndpointProvider;
32+
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
33+
import software.amazon.awssdk.services.s3.model.S3Exception;
2734
import software.amazon.awssdk.services.s3.model.S3Request;
35+
import software.amazon.awssdk.utils.CompletableFutureUtils;
36+
import software.amazon.awssdk.utils.StringUtils;
2837

2938
@SdkInternalApi
3039
public final class S3CrossRegionAsyncClient extends DelegatingS3AsyncClient {
40+
41+
private final Map<String, CompletableFuture<Region>> bucketToRegionCache = new ConcurrentHashMap<>();
42+
3143
public S3CrossRegionAsyncClient(S3AsyncClient s3Client) {
3244
super(s3Client);
3345
}
3446

3547
@Override
36-
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT>
37-
invokeOperation(T request, Function<T, CompletableFuture<ReturnT>> operation) {
48+
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> invokeOperation(
49+
T request, Function<T, CompletableFuture<ReturnT>> operation) {
3850

3951
Optional<String> bucket = request.getValueForField("Bucket", String.class);
4052

4153
if (!bucket.isPresent()) {
4254
return operation.apply(request);
4355
}
56+
String bucketName = bucket.get();
4457

45-
return operation.apply(requestWithDecoratedEndpointProvider(request, bucket.get()))
46-
.whenComplete((r, t) -> handleOperationFailure(t, bucket.get()));
58+
if (bucketToRegionCache.containsKey(bucketName)) {
59+
return operation.apply(requestWithDecoratedEndpointProvider(request,
60+
regionSupplier(bucketName),
61+
serviceClientConfiguration().endpointProvider().get()));
62+
}
63+
return operation.apply(request).thenApply(CompletableFuture::completedFuture)
64+
.exceptionally(exception -> {
65+
if (isS3RedirectException(exception.getCause())) {
66+
bucketToRegionCache.remove(bucketName);
67+
getBucketRegionFromException((S3Exception) exception.getCause())
68+
.ifPresent(
69+
region -> bucketToRegionCache.put(bucketName,
70+
CompletableFuture.completedFuture(Region.of(region))));
71+
return operation.apply(
72+
requestWithDecoratedEndpointProvider(request,
73+
regionSupplier(bucketName),
74+
serviceClientConfiguration().endpointProvider().get()));
75+
}
76+
return CompletableFutureUtils.failedFuture(exception);
77+
}).thenCompose(Function.identity());
4778
}
4879

49-
private void handleOperationFailure(Throwable t, String bucket) {
50-
//TODO: handle failure case
51-
}
5280

53-
//Cannot avoid unchecked cast without upstream changes to supply builder function
54-
@SuppressWarnings("unchecked")
55-
private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, String bucket) {
56-
return (T) request.toBuilder()
57-
.overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, bucket))
58-
.build();
81+
private Supplier<Region> regionSupplier(String bucket) {
82+
return () -> bucketToRegionCache.computeIfAbsent(bucket, this::regionCompletableFuture).join();
5983
}
6084

61-
//TODO: optimize shared sync/async code
62-
private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, String bucket) {
63-
AwsRequestOverrideConfiguration requestOverrideConfig =
64-
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());
65-
66-
S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider)
67-
requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get());
68-
69-
// TODO : separate PR to provide supplier for Async client
70-
return requestOverrideConfig.toBuilder()
71-
.endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, null))
72-
.build();
85+
private CompletableFuture<Region> regionCompletableFuture(String bucketName) {
86+
StringBuilder stringBuilder = new StringBuilder();
87+
return CompletableFuture.supplyAsync(
88+
() -> ((S3AsyncClient) delegate()).headBucket(HeadBucketRequest.builder()
89+
.bucket(bucketName)
90+
.build())
91+
.exceptionally(exception -> {
92+
if (isS3RedirectException(exception)) {
93+
getBucketRegionFromException(
94+
(S3Exception) exception).ifPresent(
95+
stringBuilder::append);
96+
} else {
97+
CompletableFutureUtils.failedFuture(exception);
98+
}
99+
return null;
100+
}))
101+
.thenApplyAsync(h -> {
102+
h.join();
103+
if (h != null && StringUtils.isNotBlank(stringBuilder.toString())) {
104+
return Region.of(stringBuilder.toString());
105+
}
106+
return null;
107+
});
73108
}
74-
75-
//TODO: add cross region logic
76-
77109
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crossregion/S3CrossRegionSyncClient.java

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,19 @@
1515

1616
package software.amazon.awssdk.services.s3.internal.crossregion;
1717

18+
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.REDIRECT_STATUS_CODE;
19+
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.getBucketRegionFromException;
20+
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.requestWithDecoratedEndpointProvider;
21+
1822
import java.util.Map;
1923
import java.util.Optional;
2024
import java.util.concurrent.ConcurrentHashMap;
2125
import java.util.function.Function;
2226
import java.util.function.Supplier;
2327
import software.amazon.awssdk.annotations.SdkInternalApi;
24-
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2528
import software.amazon.awssdk.regions.Region;
2629
import software.amazon.awssdk.services.s3.DelegatingS3Client;
2730
import software.amazon.awssdk.services.s3.S3Client;
28-
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
29-
import software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider.BucketEndpointProvider;
3031
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
3132
import software.amazon.awssdk.services.s3.model.S3Exception;
3233
import software.amazon.awssdk.services.s3.model.S3Request;
@@ -38,8 +39,6 @@
3839
@SdkInternalApi
3940
public final class S3CrossRegionSyncClient extends DelegatingS3Client {
4041

41-
private static final String AMZ_BUCKET_REGION_HEADER = "x-amz-bucket-region";
42-
public static final int REDIRECT_STATUS_CODE = 301;
4342
private final Map<String, Region> bucketToRegionCache = new ConcurrentHashMap<>();
4443

4544
public S3CrossRegionSyncClient(S3Client s3Client) {
@@ -60,23 +59,28 @@ protected <T extends S3Request, ReturnT> ReturnT invokeOperation(T request, Func
6059
String bucketName = bucketRequest.get();
6160
try {
6261
if (bucketToRegionCache.containsKey(bucketName)) {
63-
return operation.apply(requestWithDecoratedEndpointProvider(request, regionSupplier(bucketName)));
62+
return operation.apply(
63+
requestWithDecoratedEndpointProvider(request,
64+
() -> bucketToRegionCache.get(bucketName),
65+
serviceClientConfiguration().endpointProvider().get()));
6466
}
6567
return operation.apply(request);
6668
} catch (S3Exception exception) {
6769
if (exception.statusCode() == REDIRECT_STATUS_CODE) {
6870
updateCacheFromRedirectException(exception, bucketName);
69-
return operation.apply(requestWithDecoratedEndpointProvider(request, regionSupplier(bucketName)));
71+
return operation.apply(
72+
requestWithDecoratedEndpointProvider(request, regionSupplier(bucketName),
73+
serviceClientConfiguration().endpointProvider().get()));
7074
}
7175
throw exception;
7276
}
7377
}
7478

75-
private String updateCacheFromRedirectException(S3Exception exception, String bucketName) {
79+
private void updateCacheFromRedirectException(S3Exception exception, String bucketName) {
7680
Optional<String> regionStr = getBucketRegionFromException(exception);
77-
// If redirected, clear previous values due to region change. bucketToRegionCache.remove(bucketName);
81+
// If redirected, clear previous values due to region change.
82+
bucketToRegionCache.remove(bucketName);
7883
regionStr.ifPresent(region -> bucketToRegionCache.put(bucketName, Region.of(region)));
79-
return regionStr.orElse(null);
8084
}
8185

8286
private Supplier<Region> regionSupplier(String bucket) {
@@ -95,30 +99,5 @@ private Region fetchBucketRegion(String bucketName) {
9599
return null;
96100
}
97101

98-
@SuppressWarnings("unchecked")
99-
private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, Supplier<Region> regionSupplier) {
100-
return (T) request.toBuilder()
101-
.overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, regionSupplier))
102-
.build();
103-
}
104-
105-
private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request,
106-
Supplier<Region> regionSupplier) {
107-
AwsRequestOverrideConfiguration requestOverrideConfig =
108-
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());
109-
110-
S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider)
111-
requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get());
112-
113-
return requestOverrideConfig.toBuilder()
114-
.endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, regionSupplier))
115-
.build();
116-
}
117-
118-
private Optional<String> getBucketRegionFromException(S3Exception exception) {
119-
return exception.awsErrorDetails()
120-
.sdkHttpResponse()
121-
.firstMatchingHeader(AMZ_BUCKET_REGION_HEADER);
122-
}
123102

124103
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crossregion/endpointprovider/BucketEndpointProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public static BucketEndpointProvider create(S3EndpointProvider delegateEndPointP
4444
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) {
4545
Region crossRegion = regionSupplier.get();
4646
return delegateEndPointProvider.resolveEndpoint(
47-
crossRegion != null ? endpointParams.toBuilder().region(crossRegion).build() : endpointParams);
47+
crossRegion != null ? endpointParams.copy(c -> c.region(crossRegion)) : endpointParams);
4848
}
4949
}
5050

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.crossregion.utils;
17+
18+
19+
import java.util.Optional;
20+
import java.util.function.Supplier;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
23+
import software.amazon.awssdk.endpoints.EndpointProvider;
24+
import software.amazon.awssdk.regions.Region;
25+
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
26+
import software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider.BucketEndpointProvider;
27+
import software.amazon.awssdk.services.s3.model.S3Exception;
28+
import software.amazon.awssdk.services.s3.model.S3Request;
29+
30+
@SdkInternalApi
31+
public final class CrossRegionUtils {
32+
public static final int REDIRECT_STATUS_CODE = 301;
33+
private static final String AMZ_BUCKET_REGION_HEADER = "x-amz-bucket-region";
34+
35+
private CrossRegionUtils() {
36+
}
37+
38+
public static Optional<String> getBucketRegionFromException(S3Exception exception) {
39+
return exception.awsErrorDetails()
40+
.sdkHttpResponse()
41+
.firstMatchingHeader(AMZ_BUCKET_REGION_HEADER);
42+
}
43+
44+
public static boolean isS3RedirectException(Throwable exception) {
45+
return exception instanceof S3Exception && ((S3Exception) exception).statusCode() == REDIRECT_STATUS_CODE;
46+
}
47+
48+
49+
public static <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, Supplier<Region> regionSupplier,
50+
EndpointProvider clientEndpointProvider) {
51+
AwsRequestOverrideConfiguration requestOverrideConfig =
52+
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());
53+
54+
S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider) requestOverrideConfig.endpointProvider()
55+
.orElse(clientEndpointProvider);
56+
return (T) request.toBuilder()
57+
.overrideConfiguration(
58+
requestOverrideConfig.toBuilder()
59+
.endpointProvider(
60+
BucketEndpointProvider.create(delegateEndpointProvider, regionSupplier))
61+
.build())
62+
.build();
63+
}
64+
}

0 commit comments

Comments
 (0)