22
22
import static software .amazon .awssdk .services .s3 .internal .crt .S3InternalSdkHttpExecutionAttribute .HTTP_CHECKSUM ;
23
23
import static software .amazon .awssdk .services .s3 .internal .crt .S3InternalSdkHttpExecutionAttribute .OBJECT_FILE_PATH ;
24
24
import static software .amazon .awssdk .services .s3 .internal .crt .S3InternalSdkHttpExecutionAttribute .OPERATION_NAME ;
25
+ import static software .amazon .awssdk .services .s3 .internal .crt .S3InternalSdkHttpExecutionAttribute .SIGNING_NAME ;
25
26
import static software .amazon .awssdk .services .s3 .internal .crt .S3InternalSdkHttpExecutionAttribute .SIGNING_REGION ;
27
+ import static software .amazon .awssdk .services .s3 .internal .crt .S3InternalSdkHttpExecutionAttribute .USE_S3_EXPRESS_AUTH ;
26
28
import static software .amazon .awssdk .utils .FunctionalUtils .invokeSafely ;
27
29
28
30
import java .net .URI ;
46
48
import software .amazon .awssdk .crt .s3 .S3MetaRequest ;
47
49
import software .amazon .awssdk .crt .s3 .S3MetaRequestOptions ;
48
50
import software .amazon .awssdk .http .Header ;
51
+ import software .amazon .awssdk .http .SdkHttpExecutionAttributes ;
49
52
import software .amazon .awssdk .http .SdkHttpRequest ;
50
53
import software .amazon .awssdk .http .async .AsyncExecuteRequest ;
51
54
import software .amazon .awssdk .http .async .SdkAsyncHttpClient ;
@@ -70,9 +73,28 @@ public final class S3CrtAsyncHttpClient implements SdkAsyncHttpClient {
70
73
71
74
private S3CrtAsyncHttpClient (Builder builder ) {
72
75
s3NativeClientConfiguration = builder .clientConfiguration ;
76
+ this .s3ClientOptions = createS3ClientOption ();
77
+
78
+ this .crtS3Client = new S3Client (s3ClientOptions );
79
+ }
80
+
81
+ @ SdkTestInternalApi
82
+ S3CrtAsyncHttpClient (S3Client crtS3Client ,
83
+ Builder builder ) {
84
+ s3NativeClientConfiguration = builder .clientConfiguration ;
85
+ s3ClientOptions = createS3ClientOption ();
86
+ this .crtS3Client = crtS3Client ;
87
+ }
88
+
89
+ @ SdkTestInternalApi
90
+ public S3ClientOptions s3ClientOptions () {
91
+ return s3ClientOptions ;
92
+ }
93
+
94
+ private S3ClientOptions createS3ClientOption () {
73
95
Long initialWindowSize = s3NativeClientConfiguration .readBufferSizeInBytes ();
74
96
75
- this . s3ClientOptions =
97
+ S3ClientOptions options =
76
98
new S3ClientOptions ().withRegion (s3NativeClientConfiguration .signingRegion ())
77
99
.withEndpoint (s3NativeClientConfiguration .endpointOverride () == null ? null :
78
100
s3NativeClientConfiguration .endpointOverride ().toString ())
@@ -82,80 +104,64 @@ private S3CrtAsyncHttpClient(Builder builder) {
82
104
.withPartSize (s3NativeClientConfiguration .partSizeBytes ())
83
105
.withMultipartUploadThreshold (s3NativeClientConfiguration .thresholdInBytes ())
84
106
.withComputeContentMd5 (false )
107
+ .withEnableS3Express (true )
85
108
.withMaxConnections (s3NativeClientConfiguration .maxConcurrency ())
86
109
.withThroughputTargetGbps (s3NativeClientConfiguration .targetThroughputInGbps ())
87
110
.withInitialReadWindowSize (initialWindowSize )
88
111
.withReadBackpressureEnabled (true );
89
112
90
113
if (s3NativeClientConfiguration .standardRetryOptions () != null ) {
91
- this . s3ClientOptions .withStandardRetryOptions (s3NativeClientConfiguration .standardRetryOptions ());
114
+ options .withStandardRetryOptions (s3NativeClientConfiguration .standardRetryOptions ());
92
115
}
93
116
if (Boolean .FALSE .equals (s3NativeClientConfiguration .isUseEnvironmentVariableValues ())) {
94
- s3ClientOptions .withProxyEnvironmentVariableSetting (disabledHttpProxyEnvironmentVariableSetting ());
117
+ options .withProxyEnvironmentVariableSetting (disabledHttpProxyEnvironmentVariableSetting ());
95
118
}
96
- Optional .ofNullable (s3NativeClientConfiguration .proxyOptions ()).ifPresent (s3ClientOptions ::withProxyOptions );
119
+ Optional .ofNullable (s3NativeClientConfiguration .proxyOptions ()).ifPresent (options ::withProxyOptions );
97
120
Optional .ofNullable (s3NativeClientConfiguration .connectionTimeout ())
98
121
.map (Duration ::toMillis )
99
122
.map (NumericUtils ::saturatedCast )
100
- .ifPresent (s3ClientOptions ::withConnectTimeoutMs );
123
+ .ifPresent (options ::withConnectTimeoutMs );
101
124
Optional .ofNullable (s3NativeClientConfiguration .httpMonitoringOptions ())
102
- .ifPresent (s3ClientOptions ::withHttpMonitoringOptions );
103
-
104
- this .crtS3Client = new S3Client (s3ClientOptions );
105
- }
106
-
107
- @ SdkTestInternalApi
108
- S3CrtAsyncHttpClient (S3Client crtS3Client ,
109
- S3NativeClientConfiguration nativeClientConfiguration ) {
110
- this .crtS3Client = crtS3Client ;
111
- this .s3NativeClientConfiguration = nativeClientConfiguration ;
112
- this .s3ClientOptions = null ;
113
- }
114
-
115
- @ SdkTestInternalApi
116
- public S3ClientOptions s3ClientOptions () {
117
- return s3ClientOptions ;
125
+ .ifPresent (options ::withHttpMonitoringOptions );
126
+ return options ;
118
127
}
119
128
120
129
@ Override
121
130
public CompletableFuture <Void > execute (AsyncExecuteRequest asyncRequest ) {
122
131
CompletableFuture <Void > executeFuture = new CompletableFuture <>();
123
132
URI uri = asyncRequest .request ().getUri ();
124
133
HttpRequest httpRequest = toCrtRequest (asyncRequest );
134
+ SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest .httpExecutionAttributes ();
125
135
S3CrtResponseHandlerAdapter responseHandler =
126
136
new S3CrtResponseHandlerAdapter (executeFuture ,
127
137
asyncRequest .responseHandler (),
128
- asyncRequest . httpExecutionAttributes () .getAttribute (CRT_PROGRESS_LISTENER ));
138
+ httpExecutionAttributes .getAttribute (CRT_PROGRESS_LISTENER ));
129
139
130
140
S3MetaRequestOptions .MetaRequestType requestType = requestType (asyncRequest );
131
141
132
- HttpChecksum httpChecksum = asyncRequest . httpExecutionAttributes () .getAttribute (HTTP_CHECKSUM );
133
- ResumeToken resumeToken = asyncRequest . httpExecutionAttributes () .getAttribute (CRT_PAUSE_RESUME_TOKEN );
134
- Region signingRegion = asyncRequest . httpExecutionAttributes () .getAttribute (SIGNING_REGION );
135
- Path requestFilePath = asyncRequest . httpExecutionAttributes () .getAttribute (OBJECT_FILE_PATH );
142
+ HttpChecksum httpChecksum = httpExecutionAttributes .getAttribute (HTTP_CHECKSUM );
143
+ ResumeToken resumeToken = httpExecutionAttributes .getAttribute (CRT_PAUSE_RESUME_TOKEN );
144
+ Region signingRegion = httpExecutionAttributes .getAttribute (SIGNING_REGION );
145
+ Path requestFilePath = httpExecutionAttributes .getAttribute (OBJECT_FILE_PATH );
136
146
ChecksumConfig checksumConfig =
137
147
checksumConfig (httpChecksum , requestType , s3NativeClientConfiguration .checksumValidationEnabled ());
138
148
URI endpoint = getEndpoint (uri );
139
149
150
+ AwsSigningConfig defaultS3SigningConfig = awsSigningConfig (signingRegion , httpExecutionAttributes );
151
+
140
152
S3MetaRequestOptions requestOptions = new S3MetaRequestOptions ()
141
153
.withHttpRequest (httpRequest )
142
154
.withMetaRequestType (requestType )
143
155
.withChecksumConfig (checksumConfig )
144
156
.withEndpoint (endpoint )
145
157
.withResponseHandler (responseHandler )
146
158
.withResumeToken (resumeToken )
147
- .withRequestFilePath (requestFilePath );
148
-
149
- // Create a new SigningConfig object only if the signing region has changed from the previously configured region.
150
- if (signingRegion != null && !s3ClientOptions .getRegion ().equals (signingRegion .id ())) {
151
- requestOptions .withSigningConfig (
152
- AwsSigningConfig .getDefaultS3SigningConfig (signingRegion .id (),
153
- s3ClientOptions .getCredentialsProvider ()));
154
- }
159
+ .withRequestFilePath (requestFilePath )
160
+ .withSigningConfig (defaultS3SigningConfig );
155
161
156
162
S3MetaRequest s3MetaRequest = crtS3Client .makeMetaRequest (requestOptions );
157
163
S3MetaRequestPauseObservable observable =
158
- asyncRequest . httpExecutionAttributes () .getAttribute (METAREQUEST_PAUSE_OBSERVABLE );
164
+ httpExecutionAttributes .getAttribute (METAREQUEST_PAUSE_OBSERVABLE );
159
165
160
166
responseHandler .metaRequest (s3MetaRequest );
161
167
@@ -167,6 +173,22 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
167
173
return executeFuture ;
168
174
}
169
175
176
+ private AwsSigningConfig awsSigningConfig (Region signingRegion , SdkHttpExecutionAttributes httpExecutionAttributes ) {
177
+ AwsSigningConfig defaultS3SigningConfig =
178
+ AwsSigningConfig .getDefaultS3SigningConfig (s3ClientOptions .getRegion (), s3ClientOptions .getCredentialsProvider ());
179
+
180
+ // Override the region only if the signing region has changed from the previously configured region.
181
+ if (signingRegion != null && !s3ClientOptions .getRegion ().equals (signingRegion .id ())) {
182
+ defaultS3SigningConfig .setRegion (signingRegion .id ());
183
+ }
184
+
185
+ defaultS3SigningConfig .setService (httpExecutionAttributes .getAttribute (SIGNING_NAME ));
186
+
187
+ if (Boolean .TRUE .equals (httpExecutionAttributes .getAttribute (USE_S3_EXPRESS_AUTH ))) {
188
+ defaultS3SigningConfig .setAlgorithm (AwsSigningConfig .AwsSigningAlgorithm .SIGV4_S3EXPRESS );
189
+ }
190
+ return defaultS3SigningConfig ;
191
+ }
170
192
171
193
private static URI getEndpoint (URI uri ) {
172
194
return invokeSafely (() -> new URI (uri .getScheme (), null , uri .getHost (), uri .getPort (), null , null , null ));
0 commit comments