-
Notifications
You must be signed in to change notification settings - Fork 916
S3CrossRegion Sync and Async Clients Redirect implementation #4089
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
3ab5cbf
233b93e
ec30955
02db0d9
72c74fa
9cc4bdf
95f389d
a712820
d1096c3
d086ece
12b080a
f99d959
d13ce08
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,80 +15,95 @@ | |
|
||
package software.amazon.awssdk.services.s3.internal.crossregion; | ||
|
||
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.getBucketRegionFromException; | ||
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.isS3RedirectException; | ||
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.requestWithDecoratedEndpointProvider; | ||
|
||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; | ||
import software.amazon.awssdk.endpoints.Endpoint; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient; | ||
import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider; | ||
import software.amazon.awssdk.services.s3.model.HeadBucketRequest; | ||
import software.amazon.awssdk.services.s3.model.S3Exception; | ||
import software.amazon.awssdk.services.s3.model.S3Request; | ||
import software.amazon.awssdk.utils.CompletableFutureUtils; | ||
import software.amazon.awssdk.utils.StringUtils; | ||
|
||
@SdkInternalApi | ||
public final class S3CrossRegionAsyncClient extends DelegatingS3AsyncClient { | ||
|
||
private final Map<String, CompletableFuture<Region>> bucketToRegionCache = new ConcurrentHashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we use LruCache to prevent this map from growing unbounded? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LruCache cache doenot provide API to modify the Cache it just caches the content and provides get. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can update LruCache to support this? @cenedhryn WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems most use cases for Lru can be solved with a LinkedHashMap. (It may make sense to rewrite our LruCache on that model, but that's a different question) For instance:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks , Will make the capacity as 300 to remain consistent with V1 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like we cannot implement using ConcurrentHashMap since it internally just does a putAlll and doesnot actually use LinkedHashMap in above |
||
|
||
public S3CrossRegionAsyncClient(S3AsyncClient s3Client) { | ||
super(s3Client); | ||
} | ||
|
||
@Override | ||
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> | ||
invokeOperation(T request, Function<T, CompletableFuture<ReturnT>> operation) { | ||
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> invokeOperation( | ||
T request, Function<T, CompletableFuture<ReturnT>> operation) { | ||
|
||
Optional<String> bucket = request.getValueForField("Bucket", String.class); | ||
|
||
if (!bucket.isPresent()) { | ||
return operation.apply(request); | ||
} | ||
String bucketName = bucket.get(); | ||
|
||
return operation.apply(requestWithDecoratedEndpointProvider(request, bucket.get())) | ||
.whenComplete((r, t) -> handleOperationFailure(t, bucket.get())); | ||
if (bucketToRegionCache.containsKey(bucketName)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the sync case, the try-catch contains both the call with cache hit and the cache miss, so the catch covers both cases. In this async instance, we will not catch a potential redirect as a result of calling with a cached region and we won't clear the cache? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line number 66 clears the cache. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant the code within the if-statement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch !! This can happen if the bucket is changed, thanks for catching this bug.
|
||
return operation.apply(requestWithDecoratedEndpointProvider(request, | ||
regionSupplier(bucketName), | ||
serviceClientConfiguration().endpointProvider().get())); | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return operation.apply(request).thenApply(CompletableFuture::completedFuture) | ||
.exceptionally(exception -> { | ||
if (isS3RedirectException(exception.getCause())) { | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bucketToRegionCache.remove(bucketName); | ||
getBucketRegionFromException((S3Exception) exception.getCause()) | ||
.ifPresent( | ||
region -> bucketToRegionCache.put(bucketName, | ||
CompletableFuture.completedFuture(Region.of(region)))); | ||
return operation.apply( | ||
requestWithDecoratedEndpointProvider(request, | ||
regionSupplier(bucketName), | ||
serviceClientConfiguration().endpointProvider().get())); | ||
} | ||
return CompletableFutureUtils.failedFuture(exception); | ||
}).thenCompose(Function.identity()); | ||
} | ||
|
||
private void handleOperationFailure(Throwable t, String bucket) { | ||
//TODO: handle failure case | ||
} | ||
|
||
//Cannot avoid unchecked cast without upstream changes to supply builder function | ||
@SuppressWarnings("unchecked") | ||
private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, String bucket) { | ||
return (T) request.toBuilder() | ||
.overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, bucket)) | ||
.build(); | ||
private Supplier<Region> regionSupplier(String bucket) { | ||
return () -> bucketToRegionCache.computeIfAbsent(bucket, this::regionCompletableFuture).join(); | ||
} | ||
|
||
//TODO: optimize shared sync/async code | ||
private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, String bucket) { | ||
AwsRequestOverrideConfiguration requestOverrideConfig = | ||
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build()); | ||
|
||
S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider) | ||
requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get()); | ||
|
||
return requestOverrideConfig.toBuilder() | ||
.endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, bucket)) | ||
.build(); | ||
} | ||
|
||
//TODO: add cross region logic | ||
static final class BucketEndpointProvider implements S3EndpointProvider { | ||
private final S3EndpointProvider delegate; | ||
private final String bucket; | ||
|
||
private BucketEndpointProvider(S3EndpointProvider delegate, String bucket) { | ||
this.delegate = delegate; | ||
this.bucket = bucket; | ||
} | ||
|
||
public static BucketEndpointProvider create(S3EndpointProvider delegate, String bucket) { | ||
return new BucketEndpointProvider(delegate, bucket); | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) { | ||
return delegate.resolveEndpoint(endpointParams); | ||
} | ||
private CompletableFuture<Region> regionCompletableFuture(String bucketName) { | ||
StringBuilder stringBuilder = new StringBuilder(); | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return CompletableFuture.supplyAsync( | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
() -> ((S3AsyncClient) delegate()).headBucket(HeadBucketRequest.builder() | ||
.bucket(bucketName) | ||
.build()) | ||
.exceptionally(exception -> { | ||
if (isS3RedirectException(exception)) { | ||
getBucketRegionFromException( | ||
(S3Exception) exception).ifPresent( | ||
stringBuilder::append); | ||
} else { | ||
CompletableFutureUtils.failedFuture(exception); | ||
} | ||
return null; | ||
})) | ||
.thenApplyAsync(headResponse -> { | ||
headResponse.join(); | ||
if (headResponse != null && StringUtils.isNotBlank(stringBuilder.toString())) { | ||
return Region.of(stringBuilder.toString()); | ||
} | ||
return null; | ||
}); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.function.Supplier; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.endpoints.Endpoint; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider; | ||
|
||
/** | ||
* Decorator S3EndpointProvider which updates the region with the one that is supplied during its instantiation. | ||
*/ | ||
@SdkInternalApi | ||
public class BucketEndpointProvider implements S3EndpointProvider { | ||
private final S3EndpointProvider delegateEndPointProvider; | ||
private final Supplier<Region> regionSupplier; | ||
|
||
private BucketEndpointProvider(S3EndpointProvider delegateEndPointProvider, Supplier<Region> regionSupplier) { | ||
this.delegateEndPointProvider = delegateEndPointProvider; | ||
this.regionSupplier = regionSupplier; | ||
} | ||
|
||
public static BucketEndpointProvider create(S3EndpointProvider delegateEndPointProvider, Supplier<Region> regionSupplier) { | ||
return new BucketEndpointProvider(delegateEndPointProvider, regionSupplier); | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) { | ||
Region crossRegion = regionSupplier.get(); | ||
return delegateEndPointProvider.resolveEndpoint( | ||
crossRegion != null ? endpointParams.copy(c -> c.region(crossRegion)) : endpointParams); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.services.s3.internal.crossregion.utils; | ||
|
||
|
||
import java.util.Optional; | ||
import java.util.function.Supplier; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; | ||
import software.amazon.awssdk.endpoints.EndpointProvider; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider; | ||
import software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider.BucketEndpointProvider; | ||
import software.amazon.awssdk.services.s3.model.S3Exception; | ||
import software.amazon.awssdk.services.s3.model.S3Request; | ||
|
||
@SdkInternalApi | ||
public final class CrossRegionUtils { | ||
public static final int REDIRECT_STATUS_CODE = 301; | ||
public static final String AMZ_BUCKET_REGION_HEADER = "x-amz-bucket-region"; | ||
|
||
private CrossRegionUtils() { | ||
} | ||
|
||
public static Optional<String> getBucketRegionFromException(S3Exception exception) { | ||
return exception.awsErrorDetails() | ||
.sdkHttpResponse() | ||
.firstMatchingHeader(AMZ_BUCKET_REGION_HEADER); | ||
} | ||
|
||
public static boolean isS3RedirectException(Throwable exception) { | ||
return exception instanceof S3Exception && ((S3Exception) exception).statusCode() == REDIRECT_STATUS_CODE; | ||
} | ||
|
||
|
||
public static <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, Supplier<Region> regionSupplier, | ||
EndpointProvider clientEndpointProvider) { | ||
AwsRequestOverrideConfiguration requestOverrideConfig = | ||
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build()); | ||
|
||
S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider) requestOverrideConfig.endpointProvider() | ||
.orElse(clientEndpointProvider); | ||
return (T) request.toBuilder() | ||
.overrideConfiguration( | ||
requestOverrideConfig.toBuilder() | ||
.endpointProvider( | ||
BucketEndpointProvider.create(delegateEndpointProvider, regionSupplier)) | ||
.build()) | ||
.build(); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.