-
Notifications
You must be signed in to change notification settings - Fork 917
S3CrossRegion Sync and Async Clients Redirect implementation #4089
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
3ab5cbf
233b93e
ec30955
02db0d9
72c74fa
9cc4bdf
95f389d
a712820
d1096c3
d086ece
12b080a
f99d959
d13ce08
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,80 +15,131 @@ | |
|
||
package software.amazon.awssdk.services.s3.internal.crossregion; | ||
|
||
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.getBucketRegionFromException; | ||
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.isS3RedirectException; | ||
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.requestWithDecoratedEndpointProvider; | ||
|
||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.function.Function; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; | ||
import software.amazon.awssdk.endpoints.Endpoint; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient; | ||
import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider; | ||
import software.amazon.awssdk.services.s3.model.S3Exception; | ||
import software.amazon.awssdk.services.s3.model.S3Request; | ||
import software.amazon.awssdk.utils.CompletableFutureUtils; | ||
|
||
@SdkInternalApi | ||
public final class S3CrossRegionAsyncClient extends DelegatingS3AsyncClient { | ||
|
||
private final Map<String, Region> bucketToRegionCache = new ConcurrentHashMap<>(); | ||
|
||
public S3CrossRegionAsyncClient(S3AsyncClient s3Client) { | ||
super(s3Client); | ||
} | ||
|
||
@Override | ||
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> | ||
invokeOperation(T request, Function<T, CompletableFuture<ReturnT>> operation) { | ||
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> invokeOperation( | ||
T request, Function<T, CompletableFuture<ReturnT>> operation) { | ||
|
||
Optional<String> bucket = request.getValueForField("Bucket", String.class); | ||
|
||
if (!bucket.isPresent()) { | ||
return operation.apply(request); | ||
} | ||
String bucketName = bucket.get(); | ||
|
||
return operation.apply(requestWithDecoratedEndpointProvider(request, bucket.get())) | ||
.whenComplete((r, t) -> handleOperationFailure(t, bucket.get())); | ||
} | ||
if (bucketToRegionCache.containsKey(bucketName)) { | ||
return operation.apply(requestWithDecoratedEndpointProvider(request, | ||
() -> bucketToRegionCache.get(bucketName), | ||
serviceClientConfiguration().endpointProvider().get())); | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
private void handleOperationFailure(Throwable t, String bucket) { | ||
//TODO: handle failure case | ||
CompletableFuture<ReturnT> returnFuture = new CompletableFuture<>(); | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
operation.apply(request) | ||
.whenComplete((r, t) -> { | ||
if (t != null) { | ||
if (isS3RedirectException(t)) { | ||
bucketToRegionCache.remove(bucketName); | ||
requestWithCrossRegion(request, operation, bucketName, returnFuture, t); | ||
return; | ||
} | ||
returnFuture.completeExceptionally(t); | ||
return; | ||
} | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
returnFuture.complete(r); | ||
}); | ||
return returnFuture; | ||
} | ||
|
||
//Cannot avoid unchecked cast without upstream changes to supply builder function | ||
@SuppressWarnings("unchecked") | ||
private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, String bucket) { | ||
return (T) request.toBuilder() | ||
.overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, bucket)) | ||
.build(); | ||
private <T extends S3Request, ReturnT> void requestWithCrossRegion(T request, | ||
Function<T, CompletableFuture<ReturnT>> operation, | ||
String bucketName, | ||
CompletableFuture<ReturnT> returnFuture, | ||
Throwable t) { | ||
|
||
Optional<String> bucketRegionFromException = getBucketRegionFromException((S3Exception) t.getCause()); | ||
if (bucketRegionFromException.isPresent()) { | ||
sendRequestWithRightRegion(request, operation, bucketName, returnFuture, | ||
bucketRegionFromException); | ||
} else { | ||
fetchRegionAndSendRequest(request, operation, bucketName, returnFuture); | ||
} | ||
} | ||
|
||
//TODO: optimize shared sync/async code | ||
private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, String bucket) { | ||
AwsRequestOverrideConfiguration requestOverrideConfig = | ||
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build()); | ||
|
||
S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider) | ||
requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get()); | ||
|
||
return requestOverrideConfig.toBuilder() | ||
.endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, bucket)) | ||
.build(); | ||
private <T extends S3Request, ReturnT> void fetchRegionAndSendRequest(T request, | ||
Function<T, CompletableFuture<ReturnT>> operation, | ||
String bucketName, | ||
CompletableFuture<ReturnT> returnFuture) { | ||
|
||
// // TODO: will fix the casts with separate PR | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you also add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done ...Was wondering if there is a tool/scan which suggest this automatically ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I usually notice because IntelliJ gets angry and makes lines yellow. |
||
((S3AsyncClient) delegate()).headBucket(b -> b.bucket(bucketName)).whenComplete((response, | ||
throwable) -> { | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (throwable != null) { | ||
if (isS3RedirectException(throwable)) { | ||
bucketToRegionCache.remove(bucketName); | ||
Optional<String> bucketRegion = getBucketRegionFromException((S3Exception) throwable.getCause()); | ||
|
||
if (bucketRegion.isPresent()) { | ||
bucketToRegionCache.put(bucketName, Region.of(bucketRegion.get())); | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sendRequestWithRightRegion(request, operation, bucketName, returnFuture, bucketRegion); | ||
} else { | ||
returnFuture.completeExceptionally(throwable); | ||
} | ||
} else { | ||
returnFuture.completeExceptionally(throwable); | ||
} | ||
} else { | ||
CompletableFuture<ReturnT> newFuture = operation.apply(request); | ||
CompletableFutureUtils.forwardResultTo(newFuture, returnFuture); | ||
CompletableFutureUtils.forwardExceptionTo(returnFuture, newFuture); | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
}); | ||
} | ||
|
||
//TODO: add cross region logic | ||
static final class BucketEndpointProvider implements S3EndpointProvider { | ||
private final S3EndpointProvider delegate; | ||
private final String bucket; | ||
|
||
private BucketEndpointProvider(S3EndpointProvider delegate, String bucket) { | ||
this.delegate = delegate; | ||
this.bucket = bucket; | ||
} | ||
|
||
public static BucketEndpointProvider create(S3EndpointProvider delegate, String bucket) { | ||
return new BucketEndpointProvider(delegate, bucket); | ||
} | ||
private <T extends S3Request, ReturnT> void sendRequestWithRightRegion(T request, | ||
Function<T, CompletableFuture<ReturnT>> operation, | ||
String bucketName, | ||
CompletableFuture<ReturnT> returnFuture, | ||
Optional<String> bucketRegionFromException) { | ||
String region = bucketRegionFromException.get(); | ||
bucketToRegionCache.put(bucketName, Region.of(region)); | ||
doSendRequestWithRightRegion(request, operation, returnFuture, region); | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Override | ||
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) { | ||
return delegate.resolveEndpoint(endpointParams); | ||
} | ||
private <T extends S3Request, ReturnT> void doSendRequestWithRightRegion(T request, | ||
Function<T, CompletableFuture<ReturnT>> operation, | ||
CompletableFuture<ReturnT> returnFuture, | ||
String region) { | ||
CompletableFuture<ReturnT> newFuture = operation.apply( | ||
requestWithDecoratedEndpointProvider(request, | ||
() -> Region.of(region), | ||
serviceClientConfiguration().endpointProvider().get())); | ||
CompletableFutureUtils.forwardResultTo(newFuture, returnFuture); | ||
// forward exception | ||
cenedhryn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
CompletableFutureUtils.forwardExceptionTo(returnFuture, newFuture); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.function.Supplier; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.endpoints.Endpoint; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams; | ||
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider; | ||
|
||
/** | ||
* Decorator S3EndpointProvider which updates the region with the one that is supplied during its instantiation. | ||
*/ | ||
@SdkInternalApi | ||
public class BucketEndpointProvider implements S3EndpointProvider { | ||
private final S3EndpointProvider delegateEndPointProvider; | ||
private final Supplier<Region> regionSupplier; | ||
|
||
private BucketEndpointProvider(S3EndpointProvider delegateEndPointProvider, Supplier<Region> regionSupplier) { | ||
this.delegateEndPointProvider = delegateEndPointProvider; | ||
this.regionSupplier = regionSupplier; | ||
} | ||
|
||
public static BucketEndpointProvider create(S3EndpointProvider delegateEndPointProvider, Supplier<Region> regionSupplier) { | ||
return new BucketEndpointProvider(delegateEndPointProvider, regionSupplier); | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) { | ||
Region crossRegion = regionSupplier.get(); | ||
return delegateEndPointProvider.resolveEndpoint( | ||
crossRegion != null ? endpointParams.copy(c -> c.region(crossRegion)) : endpointParams); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the sync case, the try-catch contains both the call with cache hit and the cache miss, so the catch covers both cases. In this async instance, we will not catch a potential redirect as a result of calling with a cached region and we won't clear the cache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line number 66 clears the cache.
Note that HeadBucket fall back logic comes into picture only when first fall back fails to get region and in this case we clear the cache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant the code within the if-statement
if (bucketToRegionCache.containsKey(bucketName))
. No other code will execute below that. If the call torequestWithDecoratedEndpointProvider
gets a redirect, it will not be handled. So the sync and async code will have slightly different behavior in that edge case. Question is if it's likely to occur at all and therefore worth handling - i.e., you get a region from the cache but it's the wrong region. Probably not a huge risk.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch !! This can happen if the bucket is changed, thanks for catching this bug.
Will update the code and add a test case for below standalone API call scenarios