|
15 | 15 |
|
16 | 16 | package software.amazon.awssdk.services.s3.internal.crossregion;
|
17 | 17 |
|
| 18 | +import java.util.Map; |
18 | 19 | import java.util.Optional;
|
19 |
| -import java.util.concurrent.CompletableFuture; |
| 20 | +import java.util.concurrent.ConcurrentHashMap; |
20 | 21 | import java.util.function.Function;
|
| 22 | +import java.util.function.Supplier; |
21 | 23 | import software.amazon.awssdk.annotations.SdkInternalApi;
|
22 | 24 | import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
|
23 |
| -import software.amazon.awssdk.endpoints.Endpoint; |
| 25 | +import software.amazon.awssdk.regions.Region; |
24 | 26 | import software.amazon.awssdk.services.s3.DelegatingS3Client;
|
25 | 27 | import software.amazon.awssdk.services.s3.S3Client;
|
26 |
| -import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams; |
27 | 28 | import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
|
| 29 | +import software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider.BucketEndpointProvider; |
| 30 | +import software.amazon.awssdk.services.s3.model.HeadBucketRequest; |
| 31 | +import software.amazon.awssdk.services.s3.model.S3Exception; |
28 | 32 | import software.amazon.awssdk.services.s3.model.S3Request;
|
29 | 33 |
|
| 34 | +/** |
| 35 | + * Decorator S3 Sync client that will fetch the region name whenever there is Redirect 301 error due to cross region bucket |
| 36 | + * access. |
| 37 | + */ |
30 | 38 | @SdkInternalApi
|
31 | 39 | public final class S3CrossRegionSyncClient extends DelegatingS3Client {
|
| 40 | + |
| 41 | + private static final String AMZ_BUCKET_REGION_HEADER = "x-amz-bucket-region"; |
| 42 | + public static final int REDIRECT_STATUS_CODE = 301; |
| 43 | + private final Map<String, Region> bucketToRegionCache = new ConcurrentHashMap<>(); |
| 44 | + |
32 | 45 | public S3CrossRegionSyncClient(S3Client s3Client) {
|
33 | 46 | super(s3Client);
|
34 | 47 | }
|
35 | 48 |
|
| 49 | + private static <T extends S3Request> Optional<String> bucketNameFromRequest(T request) { |
| 50 | + return request.getValueForField("Bucket", String.class); |
| 51 | + } |
| 52 | + |
36 | 53 | @Override
|
37 | 54 | protected <T extends S3Request, ReturnT> ReturnT invokeOperation(T request, Function<T, ReturnT> operation) {
|
38 | 55 |
|
39 |
| - Optional<String> bucket = request.getValueForField("Bucket", String.class); |
40 |
| - |
41 |
| - if (bucket.isPresent()) { |
42 |
| - try { |
43 |
| - return operation.apply(requestWithDecoratedEndpointProvider(request, bucket.get())); |
44 |
| - } catch (Exception e) { |
45 |
| - handleOperationFailure(e, bucket.get()); |
| 56 | + Optional<String> bucketRequest = bucketNameFromRequest(request); |
| 57 | + if (!bucketRequest.isPresent()) { |
| 58 | + return operation.apply(request); |
| 59 | + } |
| 60 | + String bucketName = bucketRequest.get(); |
| 61 | + try { |
| 62 | + if (bucketToRegionCache.containsKey(bucketName)) { |
| 63 | + return operation.apply(requestWithDecoratedEndpointProvider(request, regionSupplier(bucketName))); |
| 64 | + } |
| 65 | + return operation.apply(request); |
| 66 | + } catch (S3Exception exception) { |
| 67 | + if (exception.statusCode() == REDIRECT_STATUS_CODE) { |
| 68 | + updateCacheFromRedirectException(exception, bucketName); |
| 69 | + return operation.apply(requestWithDecoratedEndpointProvider(request, regionSupplier(bucketName))); |
46 | 70 | }
|
| 71 | + throw exception; |
47 | 72 | }
|
| 73 | + } |
| 74 | + |
| 75 | + private String updateCacheFromRedirectException(S3Exception exception, String bucketName) { |
| 76 | + Optional<String> regionStr = getBucketRegionFromException(exception); |
| 77 | + // If redirected, clear previous values due to region change. bucketToRegionCache.remove(bucketName); |
| 78 | + regionStr.ifPresent(region -> bucketToRegionCache.put(bucketName, Region.of(region))); |
| 79 | + return regionStr.orElse(null); |
| 80 | + } |
48 | 81 |
|
49 |
| - return operation.apply(request); |
| 82 | + private Supplier<Region> regionSupplier(String bucket) { |
| 83 | + return () -> bucketToRegionCache.computeIfAbsent(bucket, this::fetchBucketRegion); |
50 | 84 | }
|
51 | 85 |
|
52 |
| - private void handleOperationFailure(Throwable t, String bucket) { |
53 |
| - //TODO: handle failure case |
| 86 | + private Region fetchBucketRegion(String bucketName) { |
| 87 | + try { |
| 88 | + ((S3Client) delegate()).headBucket(HeadBucketRequest.builder().bucket(bucketName).build()); |
| 89 | + } catch (S3Exception exception) { |
| 90 | + if (exception.statusCode() == REDIRECT_STATUS_CODE) { |
| 91 | + return Region.of(getBucketRegionFromException(exception).orElseThrow(() -> exception)); |
| 92 | + } |
| 93 | + throw exception; |
| 94 | + } |
| 95 | + return null; |
54 | 96 | }
|
55 | 97 |
|
56 | 98 | @SuppressWarnings("unchecked")
|
57 |
| - private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, String bucket) { |
| 99 | + private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, Supplier<Region> regionSupplier) { |
58 | 100 | return (T) request.toBuilder()
|
59 |
| - .overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, bucket)) |
| 101 | + .overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, regionSupplier)) |
60 | 102 | .build();
|
61 | 103 | }
|
62 | 104 |
|
63 |
| - //TODO: optimize shared sync/async code |
64 |
| - private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, String bucket) { |
| 105 | + private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, |
| 106 | + Supplier<Region> regionSupplier) { |
65 | 107 | AwsRequestOverrideConfiguration requestOverrideConfig =
|
66 | 108 | request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());
|
67 | 109 |
|
68 | 110 | S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider)
|
69 | 111 | requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get());
|
70 | 112 |
|
71 | 113 | return requestOverrideConfig.toBuilder()
|
72 |
| - .endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, bucket)) |
| 114 | + .endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, regionSupplier)) |
73 | 115 | .build();
|
74 | 116 | }
|
75 | 117 |
|
76 |
| - static final class BucketEndpointProvider implements S3EndpointProvider { |
77 |
| - private final S3EndpointProvider delegate; |
78 |
| - private final String bucket; |
79 |
| - |
80 |
| - private BucketEndpointProvider(S3EndpointProvider delegate, String bucket) { |
81 |
| - this.delegate = delegate; |
82 |
| - this.bucket = bucket; |
83 |
| - } |
84 |
| - |
85 |
| - public static BucketEndpointProvider create(S3EndpointProvider delegate, String bucket) { |
86 |
| - return new BucketEndpointProvider(delegate, bucket); |
87 |
| - } |
88 |
| - |
89 |
| - @Override |
90 |
| - public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) { |
91 |
| - return delegate.resolveEndpoint(endpointParams); |
92 |
| - } |
| 118 | + private Optional<String> getBucketRegionFromException(S3Exception exception) { |
| 119 | + return exception.awsErrorDetails() |
| 120 | + .sdkHttpResponse() |
| 121 | + .firstMatchingHeader(AMZ_BUCKET_REGION_HEADER); |
93 | 122 | }
|
| 123 | + |
94 | 124 | }
|
0 commit comments