Skip to content

S3CrossRegion Sync and Async Clients Redirect implementation #4089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,80 +15,131 @@

package software.amazon.awssdk.services.s3.internal.crossregion;

import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.getBucketRegionFromException;
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.isS3RedirectException;
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.requestWithDecoratedEndpointProvider;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.endpoints.Endpoint;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Request;
import software.amazon.awssdk.utils.CompletableFutureUtils;

@SdkInternalApi
public final class S3CrossRegionAsyncClient extends DelegatingS3AsyncClient {

private final Map<String, Region> bucketToRegionCache = new ConcurrentHashMap<>();

public S3CrossRegionAsyncClient(S3AsyncClient s3Client) {
super(s3Client);
}

@Override
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT>
invokeOperation(T request, Function<T, CompletableFuture<ReturnT>> operation) {
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> invokeOperation(
T request, Function<T, CompletableFuture<ReturnT>> operation) {

Optional<String> bucket = request.getValueForField("Bucket", String.class);

if (!bucket.isPresent()) {
return operation.apply(request);
}
String bucketName = bucket.get();

return operation.apply(requestWithDecoratedEndpointProvider(request, bucket.get()))
.whenComplete((r, t) -> handleOperationFailure(t, bucket.get()));
}
if (bucketToRegionCache.containsKey(bucketName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the sync case, the try-catch contains both the call with cache hit and the cache miss, so the catch covers both cases. In this async instance, we will not catch a potential redirect as a result of calling with a cached region and we won't clear the cache?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line number 66 clears the cache.
Note that HeadBucket fall back logic comes into picture only when first fall back fails to get region and in this case we clear the cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant the code within the if-statement if (bucketToRegionCache.containsKey(bucketName)). No other code will execute below that. If the call to requestWithDecoratedEndpointProvider gets a redirect, it will not be handled. So the sync and async code will have slightly different behavior in that edge case. Question is if it's likely to occur at all and therefore worth handling - i.e., you get a region from the cache but it's the wrong region. Probably not a huge risk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch !! This can happen if the bucket is changed, thanks for catching this bug.
Will update the code and add a test case for below standalone API call scenarios

  1. Redirect Cached and Success ==> 1st call
  2. Success ==> 2nd call
  3. Redirect and success ==> 3 rd Call

return operation.apply(requestWithDecoratedEndpointProvider(request,
() -> bucketToRegionCache.get(bucketName),
serviceClientConfiguration().endpointProvider().get()));
}

private void handleOperationFailure(Throwable t, String bucket) {
//TODO: handle failure case
CompletableFuture<ReturnT> returnFuture = new CompletableFuture<>();
operation.apply(request)
.whenComplete((r, t) -> {
if (t != null) {
if (isS3RedirectException(t)) {
bucketToRegionCache.remove(bucketName);
requestWithCrossRegion(request, operation, bucketName, returnFuture, t);
return;
}
returnFuture.completeExceptionally(t);
return;
}
returnFuture.complete(r);
});
return returnFuture;
}

//Cannot avoid unchecked cast without upstream changes to supply builder function
@SuppressWarnings("unchecked")
private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, String bucket) {
return (T) request.toBuilder()
.overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, bucket))
.build();
private <T extends S3Request, ReturnT> void requestWithCrossRegion(T request,
Function<T, CompletableFuture<ReturnT>> operation,
String bucketName,
CompletableFuture<ReturnT> returnFuture,
Throwable t) {

Optional<String> bucketRegionFromException = getBucketRegionFromException((S3Exception) t.getCause());
if (bucketRegionFromException.isPresent()) {
sendRequestWithRightRegion(request, operation, bucketName, returnFuture,
bucketRegionFromException);
} else {
fetchRegionAndSendRequest(request, operation, bucketName, returnFuture);
}
}

//TODO: optimize shared sync/async code
private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, String bucket) {
AwsRequestOverrideConfiguration requestOverrideConfig =
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());

S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider)
requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get());

return requestOverrideConfig.toBuilder()
.endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, bucket))
.build();
private <T extends S3Request, ReturnT> void fetchRegionAndSendRequest(T request,
Function<T, CompletableFuture<ReturnT>> operation,
String bucketName,
CompletableFuture<ReturnT> returnFuture) {

// // TODO: will fix the casts with separate PR
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a @SuppressWarnings("unchecked") statement to requestWithDecoratedEndpointProvider in CrossRegionUtils in that PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done ...Was wondering if there is a tool/scan which suggest this automatically ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually notice because IntelliJ gets angry and makes lines yellow.

((S3AsyncClient) delegate()).headBucket(b -> b.bucket(bucketName)).whenComplete((response,
throwable) -> {
if (throwable != null) {
if (isS3RedirectException(throwable)) {
bucketToRegionCache.remove(bucketName);
Optional<String> bucketRegion = getBucketRegionFromException((S3Exception) throwable.getCause());

if (bucketRegion.isPresent()) {
bucketToRegionCache.put(bucketName, Region.of(bucketRegion.get()));
sendRequestWithRightRegion(request, operation, bucketName, returnFuture, bucketRegion);
} else {
returnFuture.completeExceptionally(throwable);
}
} else {
returnFuture.completeExceptionally(throwable);
}
} else {
CompletableFuture<ReturnT> newFuture = operation.apply(request);
CompletableFutureUtils.forwardResultTo(newFuture, returnFuture);
CompletableFutureUtils.forwardExceptionTo(returnFuture, newFuture);
}
});
}

//TODO: add cross region logic
static final class BucketEndpointProvider implements S3EndpointProvider {
private final S3EndpointProvider delegate;
private final String bucket;

private BucketEndpointProvider(S3EndpointProvider delegate, String bucket) {
this.delegate = delegate;
this.bucket = bucket;
}

public static BucketEndpointProvider create(S3EndpointProvider delegate, String bucket) {
return new BucketEndpointProvider(delegate, bucket);
}
private <T extends S3Request, ReturnT> void sendRequestWithRightRegion(T request,
Function<T, CompletableFuture<ReturnT>> operation,
String bucketName,
CompletableFuture<ReturnT> returnFuture,
Optional<String> bucketRegionFromException) {
String region = bucketRegionFromException.get();
bucketToRegionCache.put(bucketName, Region.of(region));
doSendRequestWithRightRegion(request, operation, returnFuture, region);
}

@Override
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) {
return delegate.resolveEndpoint(endpointParams);
}
private <T extends S3Request, ReturnT> void doSendRequestWithRightRegion(T request,
Function<T, CompletableFuture<ReturnT>> operation,
CompletableFuture<ReturnT> returnFuture,
String region) {
CompletableFuture<ReturnT> newFuture = operation.apply(
requestWithDecoratedEndpointProvider(request,
() -> Region.of(region),
serviceClientConfiguration().endpointProvider().get()));
CompletableFutureUtils.forwardResultTo(newFuture, returnFuture);
// forward exception
CompletableFutureUtils.forwardExceptionTo(returnFuture, newFuture);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,80 +15,86 @@

package software.amazon.awssdk.services.s3.internal.crossregion;

import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.getBucketRegionFromException;
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.isS3RedirectException;
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.requestWithDecoratedEndpointProvider;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.endpoints.Endpoint;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.DelegatingS3Client;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Request;

/**
* Decorator S3 Sync client that will fetch the region name whenever there is Redirect 301 error due to cross region bucket
* access.
*/
@SdkInternalApi
public final class S3CrossRegionSyncClient extends DelegatingS3Client {

private final Map<String, Region> bucketToRegionCache = new ConcurrentHashMap<>();

public S3CrossRegionSyncClient(S3Client s3Client) {
super(s3Client);
}

private static <T extends S3Request> Optional<String> bucketNameFromRequest(T request) {
return request.getValueForField("Bucket", String.class);
}

@Override
protected <T extends S3Request, ReturnT> ReturnT invokeOperation(T request, Function<T, ReturnT> operation) {

Optional<String> bucket = request.getValueForField("Bucket", String.class);

if (bucket.isPresent()) {
try {
return operation.apply(requestWithDecoratedEndpointProvider(request, bucket.get()));
} catch (Exception e) {
handleOperationFailure(e, bucket.get());
Optional<String> bucketRequest = bucketNameFromRequest(request);
if (!bucketRequest.isPresent()) {
return operation.apply(request);
}
String bucketName = bucketRequest.get();
try {
if (bucketToRegionCache.containsKey(bucketName)) {
return operation.apply(
requestWithDecoratedEndpointProvider(request,
() -> bucketToRegionCache.get(bucketName),
serviceClientConfiguration().endpointProvider().get()));
}
return operation.apply(request);
} catch (S3Exception exception) {
if (isS3RedirectException(exception)) {
updateCacheFromRedirectException(exception, bucketName);
return operation.apply(
requestWithDecoratedEndpointProvider(
request,
() -> bucketToRegionCache.computeIfAbsent(bucketName, this::fetchBucketRegion),
serviceClientConfiguration().endpointProvider().get()));
}
throw exception;
}

return operation.apply(request);
}

private void handleOperationFailure(Throwable t, String bucket) {
//TODO: handle failure case
}

@SuppressWarnings("unchecked")
private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, String bucket) {
return (T) request.toBuilder()
.overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, bucket))
.build();
}

//TODO: optimize shared sync/async code
private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, String bucket) {
AwsRequestOverrideConfiguration requestOverrideConfig =
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());

S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider)
requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get());

return requestOverrideConfig.toBuilder()
.endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, bucket))
.build();
private void updateCacheFromRedirectException(S3Exception exception, String bucketName) {
Optional<String> regionStr = getBucketRegionFromException(exception);
// If redirected, clear previous values due to region change.
bucketToRegionCache.remove(bucketName);
regionStr.ifPresent(region -> bucketToRegionCache.put(bucketName, Region.of(region)));
}

static final class BucketEndpointProvider implements S3EndpointProvider {
private final S3EndpointProvider delegate;
private final String bucket;

private BucketEndpointProvider(S3EndpointProvider delegate, String bucket) {
this.delegate = delegate;
this.bucket = bucket;
private Region fetchBucketRegion(String bucketName) {
try {
((S3Client) delegate()).headBucket(HeadBucketRequest.builder().bucket(bucketName).build());
} catch (S3Exception exception) {
if (isS3RedirectException(exception)) {
return Region.of(getBucketRegionFromException(exception).orElseThrow(() -> exception));
}
throw exception;
}
return null;
}

public static BucketEndpointProvider create(S3EndpointProvider delegate, String bucket) {
return new BucketEndpointProvider(delegate, bucket);
}

@Override
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) {
return delegate.resolveEndpoint(endpointParams);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.endpoints.Endpoint;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;

/**
* Decorator S3EndpointProvider which updates the region with the one that is supplied during its instantiation.
*/
@SdkInternalApi
public class BucketEndpointProvider implements S3EndpointProvider {
private final S3EndpointProvider delegateEndPointProvider;
private final Supplier<Region> regionSupplier;

private BucketEndpointProvider(S3EndpointProvider delegateEndPointProvider, Supplier<Region> regionSupplier) {
this.delegateEndPointProvider = delegateEndPointProvider;
this.regionSupplier = regionSupplier;
}

public static BucketEndpointProvider create(S3EndpointProvider delegateEndPointProvider, Supplier<Region> regionSupplier) {
return new BucketEndpointProvider(delegateEndPointProvider, regionSupplier);
}

@Override
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) {
Region crossRegion = regionSupplier.get();
return delegateEndPointProvider.resolveEndpoint(
crossRegion != null ? endpointParams.copy(c -> c.region(crossRegion)) : endpointParams);
}
}

Loading