18
18
import static software .amazon .awssdk .services .s3 .internal .crossregion .utils .CrossRegionUtils .getBucketRegionFromException ;
19
19
import static software .amazon .awssdk .services .s3 .internal .crossregion .utils .CrossRegionUtils .isS3RedirectException ;
20
20
import static software .amazon .awssdk .services .s3 .internal .crossregion .utils .CrossRegionUtils .requestWithDecoratedEndpointProvider ;
21
+ import static software .amazon .awssdk .services .s3 .internal .crossregion .utils .CrossRegionUtils .updateUserAgentInConfig ;
21
22
22
23
import java .util .Map ;
23
24
import java .util .Optional ;
24
25
import java .util .concurrent .CompletableFuture ;
25
26
import java .util .concurrent .ConcurrentHashMap ;
27
+ import java .util .function .BiConsumer ;
26
28
import java .util .function .Function ;
27
29
import software .amazon .awssdk .annotations .SdkInternalApi ;
30
+ import software .amazon .awssdk .awscore .AwsRequestOverrideConfiguration ;
28
31
import software .amazon .awssdk .regions .Region ;
29
32
import software .amazon .awssdk .services .s3 .DelegatingS3AsyncClient ;
30
33
import software .amazon .awssdk .services .s3 .S3AsyncClient ;
@@ -47,44 +50,60 @@ protected <T extends S3Request, ReturnT> CompletableFuture<ReturnT> invokeOperat
47
50
48
51
Optional <String > bucket = request .getValueForField ("Bucket" , String .class );
49
52
53
+ AwsRequestOverrideConfiguration overrideConfiguration = updateUserAgentInConfig (request );
54
+ T userAgentUpdatedRequest = (T ) request .toBuilder ().overrideConfiguration (overrideConfiguration ).build ();
55
+
50
56
if (!bucket .isPresent ()) {
51
- return operation .apply (request );
57
+ return operation .apply (userAgentUpdatedRequest );
52
58
}
53
59
String bucketName = bucket .get ();
54
60
55
- if (bucketToRegionCache .containsKey (bucketName )) {
56
- return operation .apply (requestWithDecoratedEndpointProvider (request ,
57
- () -> bucketToRegionCache .get (bucketName ),
58
- serviceClientConfiguration ().endpointProvider ().get ()));
59
- }
60
-
61
61
CompletableFuture <ReturnT > returnFuture = new CompletableFuture <>();
62
- operation .apply (request )
63
- .whenComplete ((r , t ) -> {
64
- if (t != null ) {
65
- if (isS3RedirectException (t )) {
66
- bucketToRegionCache .remove (bucketName );
67
- requestWithCrossRegion (request , operation , bucketName , returnFuture , t );
68
- return ;
69
- }
70
- returnFuture .completeExceptionally (t );
71
- return ;
72
- }
73
- returnFuture .complete (r );
74
- });
62
+ CompletableFuture <ReturnT > apiOperationFuture = bucketToRegionCache .containsKey (bucketName ) ?
63
+ operation .apply (
64
+ requestWithDecoratedEndpointProvider (
65
+ userAgentUpdatedRequest ,
66
+ () -> bucketToRegionCache .get (bucketName ),
67
+ serviceClientConfiguration ().endpointProvider ().get ()
68
+ )
69
+ ) :
70
+ operation .apply (userAgentUpdatedRequest );
71
+
72
+ apiOperationFuture .whenComplete (redirectToCrossRegionIfRedirectException (operation ,
73
+ userAgentUpdatedRequest ,
74
+ bucketName ,
75
+ returnFuture ));
75
76
return returnFuture ;
76
77
}
77
78
79
+ private <T extends S3Request , ReturnT > BiConsumer <ReturnT , Throwable > redirectToCrossRegionIfRedirectException (
80
+ Function <T , CompletableFuture <ReturnT >> operation ,
81
+ T userAgentUpdatedRequest , String bucketName ,
82
+ CompletableFuture <ReturnT > returnFuture ) {
83
+
84
+ return (response , throwable ) -> {
85
+ if (throwable != null ) {
86
+ if (isS3RedirectException (throwable )) {
87
+ bucketToRegionCache .remove (bucketName );
88
+ requestWithCrossRegion (userAgentUpdatedRequest , operation , bucketName , returnFuture , throwable );
89
+ } else {
90
+ returnFuture .completeExceptionally (throwable );
91
+ }
92
+ } else {
93
+ returnFuture .complete (response );
94
+ }
95
+ };
96
+ }
97
+
78
98
private <T extends S3Request , ReturnT > void requestWithCrossRegion (T request ,
79
99
Function <T , CompletableFuture <ReturnT >> operation ,
80
100
String bucketName ,
81
101
CompletableFuture <ReturnT > returnFuture ,
82
- Throwable t ) {
102
+ Throwable throwable ) {
83
103
84
- Optional <String > bucketRegionFromException = getBucketRegionFromException ((S3Exception ) t .getCause ());
104
+ Optional <String > bucketRegionFromException = getBucketRegionFromException ((S3Exception ) throwable .getCause ());
85
105
if (bucketRegionFromException .isPresent ()) {
86
- sendRequestWithRightRegion (request , operation , bucketName , returnFuture ,
87
- bucketRegionFromException );
106
+ sendRequestWithRightRegion (request , operation , bucketName , returnFuture , bucketRegionFromException );
88
107
} else {
89
108
fetchRegionAndSendRequest (request , operation , bucketName , returnFuture );
90
109
}
@@ -94,28 +113,21 @@ private <T extends S3Request, ReturnT> void fetchRegionAndSendRequest(T request,
94
113
Function <T , CompletableFuture <ReturnT >> operation ,
95
114
String bucketName ,
96
115
CompletableFuture <ReturnT > returnFuture ) {
97
-
98
116
// // TODO: will fix the casts with separate PR
99
117
((S3AsyncClient ) delegate ()).headBucket (b -> b .bucket (bucketName )).whenComplete ((response ,
100
118
throwable ) -> {
101
119
if (throwable != null ) {
102
120
if (isS3RedirectException (throwable )) {
103
121
bucketToRegionCache .remove (bucketName );
104
122
Optional <String > bucketRegion = getBucketRegionFromException ((S3Exception ) throwable .getCause ());
105
-
106
123
if (bucketRegion .isPresent ()) {
107
- bucketToRegionCache .put (bucketName , Region .of (bucketRegion .get ()));
108
124
sendRequestWithRightRegion (request , operation , bucketName , returnFuture , bucketRegion );
109
125
} else {
110
126
returnFuture .completeExceptionally (throwable );
111
127
}
112
128
} else {
113
129
returnFuture .completeExceptionally (throwable );
114
130
}
115
- } else {
116
- CompletableFuture <ReturnT > newFuture = operation .apply (request );
117
- CompletableFutureUtils .forwardResultTo (newFuture , returnFuture );
118
- CompletableFutureUtils .forwardExceptionTo (returnFuture , newFuture );
119
131
}
120
132
});
121
133
}
@@ -127,19 +139,11 @@ private <T extends S3Request, ReturnT> void sendRequestWithRightRegion(T request
127
139
Optional <String > bucketRegionFromException ) {
128
140
String region = bucketRegionFromException .get ();
129
141
bucketToRegionCache .put (bucketName , Region .of (region ));
130
- doSendRequestWithRightRegion (request , operation , returnFuture , region );
131
- }
132
-
133
- private <T extends S3Request , ReturnT > void doSendRequestWithRightRegion (T request ,
134
- Function <T , CompletableFuture <ReturnT >> operation ,
135
- CompletableFuture <ReturnT > returnFuture ,
136
- String region ) {
137
142
CompletableFuture <ReturnT > newFuture = operation .apply (
138
143
requestWithDecoratedEndpointProvider (request ,
139
144
() -> Region .of (region ),
140
145
serviceClientConfiguration ().endpointProvider ().get ()));
141
146
CompletableFutureUtils .forwardResultTo (newFuture , returnFuture );
142
- // forward exception
143
147
CompletableFutureUtils .forwardExceptionTo (returnFuture , newFuture );
144
148
}
145
149
}
0 commit comments