S3CrossRegion Sync and Async Clients Redirect implementation #4089

Merged
Changes from 5 commits
@@ -15,80 +15,95 @@

package software.amazon.awssdk.services.s3.internal.crossregion;

import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.getBucketRegionFromException;
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.isS3RedirectException;
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.requestWithDecoratedEndpointProvider;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.endpoints.Endpoint;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Request;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.StringUtils;

@SdkInternalApi
public final class S3CrossRegionAsyncClient extends DelegatingS3AsyncClient {

private final Map<String, CompletableFuture<Region>> bucketToRegionCache = new ConcurrentHashMap<>();

Contributor:

Should we use LruCache to prevent this map from growing unbounded?

Contributor Author:

LruCache does not provide an API to modify the cache; it only caches the content and provides get.
I will create a separate PR to make this a bounded cache.

Contributor:

Maybe we can update LruCache to support this? @cenedhryn WDYT?

Contributor (@cenedhryn, Jun 20, 2023):

It seems most use cases for Lru can be solved with a LinkedHashMap. (It may make sense to rewrite our LruCache on that model, but that's a different question)

For instance:

    private static final int CAPACITY = 100;

    private final Map<String, Region> bucketToRegionCache = Collections.synchronizedMap(
        new LinkedHashMap<String, Region>(CAPACITY) {
            // Evict the eldest (oldest-inserted) entry once the map exceeds CAPACITY.
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Region> eldest) {
                return size() > CAPACITY;
            }
        });
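
The LinkedHashMap suggestion above evicts by insertion order unless the map is built in access-order mode. The following is a minimal sketch of an access-ordered variant, assuming least-recently-used eviction is what is wanted here. The class name `BoundedLruMapSketch` and its `create` method are hypothetical and not part of the SDK; only `java.util` classes are used.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not an SDK class: a small synchronized LRU map. */
final class BoundedLruMapSketch {

    private BoundedLruMapSketch() {
    }

    /** Creates a synchronized map that drops the least recently accessed entry once capacity is exceeded. */
    static <K, V> Map<K, V> create(int capacity) {
        // accessOrder = true moves an entry to the "most recent" end on get() and put(),
        // so removeEldestEntry sees the least recently used key rather than the oldest insert.
        return Collections.synchronizedMap(new LinkedHashMap<K, V>(capacity, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > capacity;
            }
        });
    }

    public static void main(String[] args) {
        Map<String, String> cache = create(2);
        cache.put("bucket-a", "us-east-1");
        cache.put("bucket-b", "eu-west-1");
        cache.get("bucket-a");               // touch bucket-a so bucket-b becomes the eldest
        cache.put("bucket-c", "ap-south-1"); // exceeds capacity, so bucket-b is evicted
        System.out.println(cache.keySet());
    }
}
```

In access-order mode even `get` reorders the underlying list, which is one reason the synchronized wrapper matters when the map is shared between threads.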

Contributor Author:

Looks like we cannot implement this using ConcurrentHashMap, since it internally just does a putAll and does not actually use the LinkedHashMap above.
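
One reading of this comment is that passing the LinkedHashMap to the `ConcurrentHashMap(Map)` constructor copies its entries and leaves the eviction hook behind, whereas `Collections.synchronizedMap` wraps the same instance. The sketch below illustrates that difference under this assumption. `CopyVersusWrapSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical demo class, not part of the SDK. */
final class CopyVersusWrapSketch {

    private static final int CAPACITY = 2;

    private CopyVersusWrapSketch() {
    }

    /** A LinkedHashMap that evicts its eldest entry once it holds more than CAPACITY entries. */
    private static Map<String, String> evictingMap() {
        return new LinkedHashMap<String, String>(CAPACITY) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > CAPACITY;
            }
        };
    }

    private static int sizeAfterThreePuts(Map<String, String> map) {
        map.put("bucket-a", "us-east-1");
        map.put("bucket-b", "eu-west-1");
        map.put("bucket-c", "ap-south-1");
        return map.size();
    }

    /** The copy constructor only copies entries, so nothing is ever evicted. */
    static int copiedSize() {
        return sizeAfterThreePuts(new ConcurrentHashMap<>(evictingMap()));
    }

    /** The synchronized wrapper delegates to the LinkedHashMap, so eviction still applies. */
    static int wrappedSize() {
        return sizeAfterThreePuts(Collections.synchronizedMap(evictingMap()));
    }

    public static void main(String[] args) {
        System.out.println("copied into ConcurrentHashMap: " + copiedSize());
        System.out.println("wrapped with synchronizedMap: " + wrappedSize());
    }
}
```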


public S3CrossRegionAsyncClient(S3AsyncClient s3Client) {
super(s3Client);
}

@Override
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT>
invokeOperation(T request, Function<T, CompletableFuture<ReturnT>> operation) {
protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> invokeOperation(
T request, Function<T, CompletableFuture<ReturnT>> operation) {

Optional<String> bucket = request.getValueForField("Bucket", String.class);

if (!bucket.isPresent()) {
return operation.apply(request);
}
String bucketName = bucket.get();

return operation.apply(requestWithDecoratedEndpointProvider(request, bucket.get()))
.whenComplete((r, t) -> handleOperationFailure(t, bucket.get()));
if (bucketToRegionCache.containsKey(bucketName)) {

Contributor:

In the sync case, the try-catch contains both the call with cache hit and the cache miss, so the catch covers both cases. In this async instance, we will not catch a potential redirect as a result of calling with a cached region and we won't clear the cache?

Contributor Author:

Line number 66 clears the cache.
Note that the HeadBucket fallback logic comes into the picture only when the first fallback fails to get the region, and in this case we clear the cache.

Contributor:

I meant the code within the if-statement `if (bucketToRegionCache.containsKey(bucketName))`. No other code will execute below that. If the call to `requestWithDecoratedEndpointProvider` gets a redirect, it will not be handled. So the sync and async code will have slightly different behavior in that edge case. Question is if it's likely to occur at all and therefore worth handling - i.e., you get a region from the cache but it's the wrong region. Probably not a huge risk.

Contributor Author:

Good catch! This can happen if the bucket is changed; thanks for catching this bug.
I will update the code and add a test case for the standalone API call scenarios below:

  1. Redirect, cached, and success ==> 1st call
  2. Success ==> 2nd call
  3. Redirect and success ==> 3rd call
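
The three call scenarios above can be modelled without the SDK. The sketch below is a simplified illustration, not the PR's implementation: `RedirectRetrySketch`, `RedirectException`, and the `(bucket, region)` operation signature are hypothetical stand-ins, and regions are plain strings. It routes both the cache-hit and cache-miss paths through one handler, so a redirect on a stale cached region also refreshes the cache and retries once.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

/** Hypothetical model of the retry flow; none of these types are SDK classes. */
final class RedirectRetrySketch {

    /** Stand-in for a 301 response that names the bucket's actual region. */
    static final class RedirectException extends RuntimeException {
        final String region;

        RedirectException(String region) {
            super("Bucket is in " + region);
            this.region = region;
        }
    }

    private final Map<String, String> bucketToRegion = new ConcurrentHashMap<>();
    private final String clientRegion;
    // (bucket, region) -> result of the call made against that region
    private final BiFunction<String, String, CompletableFuture<String>> operation;

    RedirectRetrySketch(String clientRegion,
                        BiFunction<String, String, CompletableFuture<String>> operation) {
        this.clientRegion = clientRegion;
        this.operation = operation;
    }

    CompletableFuture<String> invoke(String bucket) {
        // Cache hit and cache miss share the same handler, so a stale cached region is corrected too.
        String region = bucketToRegion.getOrDefault(bucket, clientRegion);
        return operation.apply(bucket, region)
            .<CompletableFuture<String>>handle((result, error) -> {
                Throwable cause = unwrap(error);
                if (cause instanceof RedirectException) {
                    String actualRegion = ((RedirectException) cause).region;
                    bucketToRegion.put(bucket, actualRegion);     // replace the stale or missing entry
                    return operation.apply(bucket, actualRegion); // retry once against the new region
                }
                if (cause != null) {
                    CompletableFuture<String> failed = new CompletableFuture<>();
                    failed.completeExceptionally(cause);
                    return failed;
                }
                return CompletableFuture.completedFuture(result);
            })
            .thenCompose(future -> future);
    }

    private static Throwable unwrap(Throwable error) {
        return error instanceof CompletionException && error.getCause() != null ? error.getCause() : error;
    }

    public static void main(String[] args) {
        AtomicReference<String> bucketHome = new AtomicReference<>("eu-west-1");
        RedirectRetrySketch client = new RedirectRetrySketch("us-east-1", (bucket, region) -> {
            if (region.equals(bucketHome.get())) {
                return CompletableFuture.completedFuture("served from " + region);
            }
            CompletableFuture<String> redirect = new CompletableFuture<>();
            redirect.completeExceptionally(new RedirectException(bucketHome.get()));
            return redirect;
        });
        System.out.println(client.invoke("my-bucket").join()); // 1st call: redirect, cache, retry
        System.out.println(client.invoke("my-bucket").join()); // 2nd call: cache hit
        bucketHome.set("ap-south-1");                          // the cached region is now stale
        System.out.println(client.invoke("my-bucket").join()); // 3rd call: redirect, cache refreshed
    }
}
```

Using `handle` followed by `thenCompose` keeps the retry asynchronous; nothing in this path blocks on `join()` inside the handler.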

return operation.apply(requestWithDecoratedEndpointProvider(request,
regionSupplier(bucketName),
serviceClientConfiguration().endpointProvider().get()));
}
return operation.apply(request).thenApply(CompletableFuture::completedFuture)
.exceptionally(exception -> {
if (isS3RedirectException(exception.getCause())) {
bucketToRegionCache.remove(bucketName);
getBucketRegionFromException((S3Exception) exception.getCause())
.ifPresent(
region -> bucketToRegionCache.put(bucketName,
CompletableFuture.completedFuture(Region.of(region))));
return operation.apply(
requestWithDecoratedEndpointProvider(request,
regionSupplier(bucketName),
serviceClientConfiguration().endpointProvider().get()));
}
return CompletableFutureUtils.failedFuture(exception);
}).thenCompose(Function.identity());
}

private void handleOperationFailure(Throwable t, String bucket) {
//TODO: handle failure case
}

//Cannot avoid unchecked cast without upstream changes to supply builder function
@SuppressWarnings("unchecked")
private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, String bucket) {
return (T) request.toBuilder()
.overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, bucket))
.build();
private Supplier<Region> regionSupplier(String bucket) {
return () -> bucketToRegionCache.computeIfAbsent(bucket, this::regionCompletableFuture).join();
}

//TODO: optimize shared sync/async code
private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, String bucket) {
AwsRequestOverrideConfiguration requestOverrideConfig =
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());

S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider)
requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get());

return requestOverrideConfig.toBuilder()
.endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, bucket))
.build();
}

//TODO: add cross region logic
static final class BucketEndpointProvider implements S3EndpointProvider {
private final S3EndpointProvider delegate;
private final String bucket;

private BucketEndpointProvider(S3EndpointProvider delegate, String bucket) {
this.delegate = delegate;
this.bucket = bucket;
}

public static BucketEndpointProvider create(S3EndpointProvider delegate, String bucket) {
return new BucketEndpointProvider(delegate, bucket);
}

@Override
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) {
return delegate.resolveEndpoint(endpointParams);
}
private CompletableFuture<Region> regionCompletableFuture(String bucketName) {
StringBuilder stringBuilder = new StringBuilder();
return CompletableFuture.supplyAsync(
() -> ((S3AsyncClient) delegate()).headBucket(HeadBucketRequest.builder()
.bucket(bucketName)
.build())
.exceptionally(exception -> {
if (isS3RedirectException(exception)) {
getBucketRegionFromException(
(S3Exception) exception).ifPresent(
stringBuilder::append);
} else {
CompletableFutureUtils.failedFuture(exception);
}
return null;
}))
.thenApplyAsync(headResponse -> {
headResponse.join();
if (headResponse != null && StringUtils.isNotBlank(stringBuilder.toString())) {
return Region.of(stringBuilder.toString());
}
return null;
});
}
}
@@ -15,80 +15,89 @@

package software.amazon.awssdk.services.s3.internal.crossregion;

import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.REDIRECT_STATUS_CODE;
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.getBucketRegionFromException;
import static software.amazon.awssdk.services.s3.internal.crossregion.utils.CrossRegionUtils.requestWithDecoratedEndpointProvider;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.endpoints.Endpoint;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.DelegatingS3Client;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Request;

/**
* Decorator S3 Sync client that will fetch the region name whenever there is Redirect 301 error due to cross region bucket
* access.
*/
@SdkInternalApi
public final class S3CrossRegionSyncClient extends DelegatingS3Client {

private final Map<String, Region> bucketToRegionCache = new ConcurrentHashMap<>();

public S3CrossRegionSyncClient(S3Client s3Client) {
super(s3Client);
}

private static <T extends S3Request> Optional<String> bucketNameFromRequest(T request) {
return request.getValueForField("Bucket", String.class);
}

@Override
protected <T extends S3Request, ReturnT> ReturnT invokeOperation(T request, Function<T, ReturnT> operation) {

Optional<String> bucket = request.getValueForField("Bucket", String.class);

if (bucket.isPresent()) {
try {
return operation.apply(requestWithDecoratedEndpointProvider(request, bucket.get()));
} catch (Exception e) {
handleOperationFailure(e, bucket.get());
Optional<String> bucketRequest = bucketNameFromRequest(request);
if (!bucketRequest.isPresent()) {
return operation.apply(request);
}
String bucketName = bucketRequest.get();
try {
if (bucketToRegionCache.containsKey(bucketName)) {
return operation.apply(
requestWithDecoratedEndpointProvider(request,
() -> bucketToRegionCache.get(bucketName),
serviceClientConfiguration().endpointProvider().get()));
}
return operation.apply(request);
} catch (S3Exception exception) {
if (exception.statusCode() == REDIRECT_STATUS_CODE) {
updateCacheFromRedirectException(exception, bucketName);
return operation.apply(
requestWithDecoratedEndpointProvider(request, regionSupplier(bucketName),
serviceClientConfiguration().endpointProvider().get()));
}
throw exception;
}

return operation.apply(request);
}

private void handleOperationFailure(Throwable t, String bucket) {
//TODO: handle failure case
private void updateCacheFromRedirectException(S3Exception exception, String bucketName) {
Optional<String> regionStr = getBucketRegionFromException(exception);
// If redirected, clear previous values due to region change.
bucketToRegionCache.remove(bucketName);
regionStr.ifPresent(region -> bucketToRegionCache.put(bucketName, Region.of(region)));
}

@SuppressWarnings("unchecked")
private <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, String bucket) {
return (T) request.toBuilder()
.overrideConfiguration(getOrCreateConfigWithEndpointProvider(request, bucket))
.build();
private Supplier<Region> regionSupplier(String bucket) {
return () -> bucketToRegionCache.computeIfAbsent(bucket, this::fetchBucketRegion);
}

//TODO: optimize shared sync/async code
private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider(S3Request request, String bucket) {
AwsRequestOverrideConfiguration requestOverrideConfig =
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());

S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider)
requestOverrideConfig.endpointProvider().orElseGet(() -> serviceClientConfiguration().endpointProvider().get());

return requestOverrideConfig.toBuilder()
.endpointProvider(BucketEndpointProvider.create(delegateEndpointProvider, bucket))
.build();
}

static final class BucketEndpointProvider implements S3EndpointProvider {
private final S3EndpointProvider delegate;
private final String bucket;

private BucketEndpointProvider(S3EndpointProvider delegate, String bucket) {
this.delegate = delegate;
this.bucket = bucket;
private Region fetchBucketRegion(String bucketName) {
try {
((S3Client) delegate()).headBucket(HeadBucketRequest.builder().bucket(bucketName).build());
} catch (S3Exception exception) {
if (exception.statusCode() == REDIRECT_STATUS_CODE) {
return Region.of(getBucketRegionFromException(exception).orElseThrow(() -> exception));
}
throw exception;
}
return null;
}

public static BucketEndpointProvider create(S3EndpointProvider delegate, String bucket) {
return new BucketEndpointProvider(delegate, bucket);
}

@Override
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) {
return delegate.resolveEndpoint(endpointParams);
}
}
}
@@ -0,0 +1,50 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.endpoints.Endpoint;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointParams;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;

/**
* Decorator S3EndpointProvider which updates the region with the one that is supplied during its instantiation.
*/
@SdkInternalApi
public class BucketEndpointProvider implements S3EndpointProvider {
private final S3EndpointProvider delegateEndPointProvider;
private final Supplier<Region> regionSupplier;

private BucketEndpointProvider(S3EndpointProvider delegateEndPointProvider, Supplier<Region> regionSupplier) {
this.delegateEndPointProvider = delegateEndPointProvider;
this.regionSupplier = regionSupplier;
}

public static BucketEndpointProvider create(S3EndpointProvider delegateEndPointProvider, Supplier<Region> regionSupplier) {
return new BucketEndpointProvider(delegateEndPointProvider, regionSupplier);
}

@Override
public CompletableFuture<Endpoint> resolveEndpoint(S3EndpointParams endpointParams) {
Region crossRegion = regionSupplier.get();
return delegateEndPointProvider.resolveEndpoint(
crossRegion != null ? endpointParams.copy(c -> c.region(crossRegion)) : endpointParams);
}
}

@@ -0,0 +1,64 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crossregion.utils;


import java.util.Optional;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.endpoints.EndpointProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.endpoints.S3EndpointProvider;
import software.amazon.awssdk.services.s3.internal.crossregion.endpointprovider.BucketEndpointProvider;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Request;

@SdkInternalApi
public final class CrossRegionUtils {
public static final int REDIRECT_STATUS_CODE = 301;
public static final String AMZ_BUCKET_REGION_HEADER = "x-amz-bucket-region";

private CrossRegionUtils() {
}

public static Optional<String> getBucketRegionFromException(S3Exception exception) {
return exception.awsErrorDetails()
.sdkHttpResponse()
.firstMatchingHeader(AMZ_BUCKET_REGION_HEADER);
}

public static boolean isS3RedirectException(Throwable exception) {
return exception instanceof S3Exception && ((S3Exception) exception).statusCode() == REDIRECT_STATUS_CODE;
}


public static <T extends S3Request> T requestWithDecoratedEndpointProvider(T request, Supplier<Region> regionSupplier,
EndpointProvider clientEndpointProvider) {
AwsRequestOverrideConfiguration requestOverrideConfig =
request.overrideConfiguration().orElseGet(() -> AwsRequestOverrideConfiguration.builder().build());

S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider) requestOverrideConfig.endpointProvider()
.orElse(clientEndpointProvider);
return (T) request.toBuilder()
.overrideConfiguration(
requestOverrideConfig.toBuilder()
.endpointProvider(
BucketEndpointProvider.create(delegateEndpointProvider, regionSupplier))
.build())
.build();
}
}