24
24
import java .util .concurrent .CompletableFuture ;
25
25
import java .util .concurrent .ConcurrentHashMap ;
26
26
import java .util .function .Function ;
27
- import java .util .function .Supplier ;
28
27
import software .amazon .awssdk .annotations .SdkInternalApi ;
29
- import software .amazon .awssdk .awscore .exception .AwsServiceException ;
30
28
import software .amazon .awssdk .regions .Region ;
31
29
import software .amazon .awssdk .services .s3 .DelegatingS3AsyncClient ;
32
30
import software .amazon .awssdk .services .s3 .S3AsyncClient ;
33
- import software .amazon .awssdk .services .s3 .model .HeadBucketRequest ;
34
31
import software .amazon .awssdk .services .s3 .model .S3Exception ;
35
32
import software .amazon .awssdk .services .s3 .model .S3Request ;
36
33
import software .amazon .awssdk .utils .CompletableFutureUtils ;
37
34
38
35
@ SdkInternalApi
39
36
public final class S3CrossRegionAsyncClient extends DelegatingS3AsyncClient {
40
37
41
- private final Map <String , CompletableFuture < Region > > bucketToRegionCache = new ConcurrentHashMap <>();
38
+ private final Map <String , Region > bucketToRegionCache = new ConcurrentHashMap <>();
42
39
43
40
public S3CrossRegionAsyncClient (S3AsyncClient s3Client ) {
44
41
super (s3Client );
@@ -57,46 +54,85 @@ protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> invokeOperat
57
54
58
55
if (bucketToRegionCache .containsKey (bucketName )) {
59
56
return operation .apply (requestWithDecoratedEndpointProvider (request ,
60
- regionSupplier (bucketName ),
57
+ () -> bucketToRegionCache . get (bucketName ),
61
58
serviceClientConfiguration ().endpointProvider ().get ()));
62
59
}
63
- return operation .apply (request ).thenApply (CompletableFuture ::completedFuture )
64
- .exceptionally (exception -> {
65
- if (isS3RedirectException (exception .getCause ())) {
66
- bucketToRegionCache .remove (bucketName );
67
- getBucketRegionFromException ((S3Exception ) exception .getCause ())
68
- .ifPresent (
69
- region -> bucketToRegionCache .put (bucketName ,
70
- CompletableFuture .completedFuture (Region .of (region ))));
71
- return operation .apply (
72
- requestWithDecoratedEndpointProvider (request ,
73
- regionSupplier (bucketName ),
74
- serviceClientConfiguration ().endpointProvider ().get ()));
75
- }
76
- return CompletableFutureUtils .failedFuture (exception );
77
- }).thenCompose (Function .identity ());
78
- }
79
60
61
+ CompletableFuture <ReturnT > returnFuture = new CompletableFuture <>();
62
+ operation .apply (request )
63
+ .whenComplete ((r , t ) -> {
64
+ if (t != null ) {
65
+ if (isS3RedirectException (t .getCause ())) {
66
+ bucketToRegionCache .remove (bucketName );
67
+ Optional <String > bucketRegionFromException =
68
+ getBucketRegionFromException ((S3Exception ) t .getCause ());
80
69
81
- private Supplier <Region > regionSupplier (String bucket ) {
82
- CompletableFuture <Region > completableFuture = bucketToRegionCache .computeIfAbsent (bucket , this ::regionCompletableFuture );
83
- return () -> completableFuture .join ();
70
+ if (bucketRegionFromException .isPresent ()) {
71
+ sendRequestWithRightRegion (request , operation , bucketName , returnFuture ,
72
+ bucketRegionFromException );
73
+ } else {
74
+ fetchRegionAndSendRequest (request , operation , bucketName , returnFuture );
75
+ }
76
+ return ;
77
+ }
78
+ returnFuture .completeExceptionally (t );
79
+ return ;
80
+ }
81
+ returnFuture .complete (r );
82
+ });
83
+ return returnFuture ;
84
84
}
85
85
86
- private CompletableFuture <Region > regionCompletableFuture (String bucketName ) {
87
- return CompletableFuture .supplyAsync (() -> {
88
- try {
89
- ((S3AsyncClient ) delegate ()).headBucket (HeadBucketRequest .builder ().bucket (bucketName ).build ()).join ();
90
- } catch (Exception exception ) {
91
- if (isS3RedirectException (exception .getCause ())) {
92
- String region = getBucketRegionFromException ((S3Exception ) exception .getCause ())
93
- .orElseThrow (() -> AwsServiceException .create ("Region name not found in Redirect error" ,
94
- exception ));
95
- return Region .of (region );
86
+ private <T extends S3Request , ReturnT > void fetchRegionAndSendRequest (T request ,
87
+ Function <T , CompletableFuture <ReturnT >> operation ,
88
+ String bucketName ,
89
+ CompletableFuture <ReturnT > returnFuture ) {
90
+
91
+ // // TODO: will fix the casts with separate PR
92
+ ((S3AsyncClient ) delegate ()).headBucket (b -> b .bucket (bucketName )).whenComplete ((response ,
93
+ throwable ) -> {
94
+ if (throwable != null ) {
95
+ if (isS3RedirectException (throwable .getCause ())) {
96
+ bucketToRegionCache .remove (bucketName );
97
+ Optional <String > bucketRegion = getBucketRegionFromException ((S3Exception ) throwable .getCause ());
98
+
99
+ if (bucketRegion .isPresent ()) {
100
+ bucketToRegionCache .put (bucketName , Region .of (bucketRegion .get ()));
101
+ sendRequestWithRightRegion (request , operation , bucketName , returnFuture , bucketRegion );
102
+ } else {
103
+ returnFuture .completeExceptionally (throwable );
104
+ }
105
+ } else {
106
+ returnFuture .completeExceptionally (throwable );
96
107
}
97
- throw exception ;
108
+ } else {
109
+ CompletableFuture <ReturnT > newFuture = operation .apply (request );
110
+ CompletableFutureUtils .forwardResultTo (newFuture , returnFuture );
111
+ CompletableFutureUtils .forwardExceptionTo (returnFuture , newFuture );
98
112
}
99
- return ((S3AsyncClient ) delegate ()).serviceClientConfiguration ().region ();
100
113
});
101
114
}
102
- }
115
+
116
+ private <T extends S3Request , ReturnT > void sendRequestWithRightRegion (T request ,
117
+ Function <T , CompletableFuture <ReturnT >> operation ,
118
+ String bucketName ,
119
+ CompletableFuture <ReturnT > returnFuture ,
120
+ Optional <String > bucketRegionFromException ) {
121
+ String region = bucketRegionFromException .get ();
122
+ bucketToRegionCache .put (bucketName , Region .of (region ));
123
+ doSendRequestWithRightRegion (request , operation , returnFuture , region );
124
+ }
125
+
126
+ private <T extends S3Request , ReturnT > void doSendRequestWithRightRegion (T request ,
127
+ Function <T , CompletableFuture <ReturnT >> operation ,
128
+ CompletableFuture <ReturnT > returnFuture ,
129
+ String region ) {
130
+ CompletableFuture <ReturnT > newFuture = operation .apply (
131
+ requestWithDecoratedEndpointProvider (request ,
132
+ () -> Region .of (region ),
133
+ serviceClientConfiguration ().endpointProvider ().get ()));
134
+ CompletableFutureUtils .forwardResultTo (newFuture , returnFuture );
135
+ // forward exception
136
+ CompletableFutureUtils .forwardExceptionTo (returnFuture , newFuture );
137
+ }
138
+ }
0 commit comments