15
15
16
16
package software .amazon .awssdk .services .s3 .internal .crossregion ;
17
17
18
+ import static software .amazon .awssdk .services .s3 .internal .crossregion .utils .CrossRegionUtils .getBucketRegionFromException ;
19
+ import static software .amazon .awssdk .services .s3 .internal .crossregion .utils .CrossRegionUtils .isS3RedirectException ;
20
+ import static software .amazon .awssdk .services .s3 .internal .crossregion .utils .CrossRegionUtils .requestWithDecoratedEndpointProvider ;
18
21
import static software .amazon .awssdk .services .s3 .internal .crossregion .utils .CrossRegionUtils .updateUserAgentInConfig ;
19
22
23
+ import java .util .Map ;
20
24
import java .util .Optional ;
21
25
import java .util .concurrent .CompletableFuture ;
26
+ import java .util .concurrent .ConcurrentHashMap ;
27
+ import java .util .function .BiConsumer ;
22
28
import java .util .function .Function ;
23
29
import software .amazon .awssdk .annotations .SdkInternalApi ;
24
30
import software .amazon .awssdk .awscore .AwsRequestOverrideConfiguration ;
25
- import software .amazon .awssdk .endpoints . Endpoint ;
31
+ import software .amazon .awssdk .regions . Region ;
26
32
import software .amazon .awssdk .services .s3 .DelegatingS3AsyncClient ;
27
33
import software .amazon .awssdk .services .s3 .S3AsyncClient ;
28
- import software .amazon .awssdk .services .s3 .endpoints .S3EndpointParams ;
29
- import software .amazon .awssdk .services .s3 .endpoints .S3EndpointProvider ;
34
+ import software .amazon .awssdk .services .s3 .model .S3Exception ;
30
35
import software .amazon .awssdk .services .s3 .model .S3Request ;
36
+ import software .amazon .awssdk .utils .CompletableFutureUtils ;
31
37
32
38
@ SdkInternalApi
33
39
public final class S3CrossRegionAsyncClient extends DelegatingS3AsyncClient {
40
+
41
+ private final Map <String , Region > bucketToRegionCache = new ConcurrentHashMap <>();
42
+
34
43
public S3CrossRegionAsyncClient (S3AsyncClient s3Client ) {
35
44
super (s3Client );
36
45
}
37
46
38
47
@ Override
39
- protected <T extends S3Request , ReturnT > CompletableFuture <ReturnT >
40
- invokeOperation ( T request , Function <T , CompletableFuture <ReturnT >> operation ) {
48
+ protected <T extends S3Request , ReturnT > CompletableFuture <ReturnT > invokeOperation (
49
+ T request , Function <T , CompletableFuture <ReturnT >> operation ) {
41
50
42
51
Optional <String > bucket = request .getValueForField ("Bucket" , String .class );
43
52
@@ -47,53 +56,94 @@ public S3CrossRegionAsyncClient(S3AsyncClient s3Client) {
47
56
if (!bucket .isPresent ()) {
48
57
return operation .apply (userAgentUpdatedRequest );
49
58
}
50
-
51
- return operation .apply (requestWithDecoratedEndpointProvider (userAgentUpdatedRequest , bucket .get ()))
52
- .whenComplete ((r , t ) -> handleOperationFailure (t , bucket .get ()));
59
+ String bucketName = bucket .get ();
60
+
61
+ CompletableFuture <ReturnT > returnFuture = new CompletableFuture <>();
62
+ CompletableFuture <ReturnT > apiOperationFuture = bucketToRegionCache .containsKey (bucketName ) ?
63
+ operation .apply (
64
+ requestWithDecoratedEndpointProvider (
65
+ userAgentUpdatedRequest ,
66
+ () -> bucketToRegionCache .get (bucketName ),
67
+ serviceClientConfiguration ().endpointProvider ().get ()
68
+ )
69
+ ) :
70
+ operation .apply (userAgentUpdatedRequest );
71
+
72
+ apiOperationFuture .whenComplete (redirectToCrossRegionIfRedirectException (operation ,
73
+ userAgentUpdatedRequest ,
74
+ bucketName ,
75
+ returnFuture ));
76
+ return returnFuture ;
53
77
}
54
78
55
- private void handleOperationFailure (Throwable t , String bucket ) {
56
- //TODO: handle failure case
79
+ private <T extends S3Request , ReturnT > BiConsumer <ReturnT , Throwable > redirectToCrossRegionIfRedirectException (
80
+ Function <T , CompletableFuture <ReturnT >> operation ,
81
+ T userAgentUpdatedRequest , String bucketName ,
82
+ CompletableFuture <ReturnT > returnFuture ) {
83
+
84
+ return (response , throwable ) -> {
85
+ if (throwable != null ) {
86
+ if (isS3RedirectException (throwable )) {
87
+ bucketToRegionCache .remove (bucketName );
88
+ requestWithCrossRegion (userAgentUpdatedRequest , operation , bucketName , returnFuture , throwable );
89
+ } else {
90
+ returnFuture .completeExceptionally (throwable );
91
+ }
92
+ } else {
93
+ returnFuture .complete (response );
94
+ }
95
+ };
57
96
}
58
97
59
- //Cannot avoid unchecked cast without upstream changes to supply builder function
60
- @ SuppressWarnings ("unchecked" )
61
- private <T extends S3Request > T requestWithDecoratedEndpointProvider (T request , String bucket ) {
62
- return (T ) request .toBuilder ()
63
- .overrideConfiguration (getOrCreateConfigWithEndpointProvider (request , bucket ))
64
- .build ();
98
+ private <T extends S3Request , ReturnT > void requestWithCrossRegion (T request ,
99
+ Function <T , CompletableFuture <ReturnT >> operation ,
100
+ String bucketName ,
101
+ CompletableFuture <ReturnT > returnFuture ,
102
+ Throwable throwable ) {
103
+
104
+ Optional <String > bucketRegionFromException = getBucketRegionFromException ((S3Exception ) throwable .getCause ());
105
+ if (bucketRegionFromException .isPresent ()) {
106
+ sendRequestWithRightRegion (request , operation , bucketName , returnFuture , bucketRegionFromException );
107
+ } else {
108
+ fetchRegionAndSendRequest (request , operation , bucketName , returnFuture );
109
+ }
65
110
}
66
111
67
- //TODO: optimize shared sync/async code
68
- private AwsRequestOverrideConfiguration getOrCreateConfigWithEndpointProvider (S3Request request , String bucket ) {
69
- AwsRequestOverrideConfiguration requestOverrideConfig =
70
- request .overrideConfiguration ().orElseGet (() -> AwsRequestOverrideConfiguration .builder ().build ());
71
-
72
- S3EndpointProvider delegateEndpointProvider = (S3EndpointProvider )
73
- requestOverrideConfig .endpointProvider ().orElseGet (() -> serviceClientConfiguration ().endpointProvider ().get ());
74
-
75
- return requestOverrideConfig .toBuilder ()
76
- .endpointProvider (BucketEndpointProvider .create (delegateEndpointProvider , bucket ))
77
- .build ();
112
+ private <T extends S3Request , ReturnT > void fetchRegionAndSendRequest (T request ,
113
+ Function <T , CompletableFuture <ReturnT >> operation ,
114
+ String bucketName ,
115
+ CompletableFuture <ReturnT > returnFuture ) {
116
+ // // TODO: will fix the casts with separate PR
117
+ ((S3AsyncClient ) delegate ()).headBucket (b -> b .bucket (bucketName )).whenComplete ((response ,
118
+ throwable ) -> {
119
+ if (throwable != null ) {
120
+ if (isS3RedirectException (throwable )) {
121
+ bucketToRegionCache .remove (bucketName );
122
+ Optional <String > bucketRegion = getBucketRegionFromException ((S3Exception ) throwable .getCause ());
123
+ if (bucketRegion .isPresent ()) {
124
+ sendRequestWithRightRegion (request , operation , bucketName , returnFuture , bucketRegion );
125
+ } else {
126
+ returnFuture .completeExceptionally (throwable );
127
+ }
128
+ } else {
129
+ returnFuture .completeExceptionally (throwable );
130
+ }
131
+ }
132
+ });
78
133
}
79
134
80
- //TODO: add cross region logic
81
- static final class BucketEndpointProvider implements S3EndpointProvider {
82
- private final S3EndpointProvider delegate ;
83
- private final String bucket ;
84
-
85
- private BucketEndpointProvider (S3EndpointProvider delegate , String bucket ) {
86
- this .delegate = delegate ;
87
- this .bucket = bucket ;
88
- }
89
-
90
- public static BucketEndpointProvider create (S3EndpointProvider delegate , String bucket ) {
91
- return new BucketEndpointProvider (delegate , bucket );
92
- }
93
-
94
- @ Override
95
- public CompletableFuture <Endpoint > resolveEndpoint (S3EndpointParams endpointParams ) {
96
- return delegate .resolveEndpoint (endpointParams );
97
- }
135
+ private <T extends S3Request , ReturnT > void sendRequestWithRightRegion (T request ,
136
+ Function <T , CompletableFuture <ReturnT >> operation ,
137
+ String bucketName ,
138
+ CompletableFuture <ReturnT > returnFuture ,
139
+ Optional <String > bucketRegionFromException ) {
140
+ String region = bucketRegionFromException .get ();
141
+ bucketToRegionCache .put (bucketName , Region .of (region ));
142
+ CompletableFuture <ReturnT > newFuture = operation .apply (
143
+ requestWithDecoratedEndpointProvider (request ,
144
+ () -> Region .of (region ),
145
+ serviceClientConfiguration ().endpointProvider ().get ()));
146
+ CompletableFutureUtils .forwardResultTo (newFuture , returnFuture );
147
+ CompletableFutureUtils .forwardExceptionTo (returnFuture , newFuture );
98
148
}
99
- }
149
+ }
0 commit comments