15
15
16
16
package software .amazon .awssdk .http .crt ;
17
17
18
- import static software .amazon .awssdk .utils .CollectionUtils .isNullOrEmpty ;
19
- import static software .amazon .awssdk .utils .FunctionalUtils .invokeSafely ;
20
18
import static software .amazon .awssdk .utils .Validate .paramNotNull ;
21
19
22
- import java .io .IOException ;
23
20
import java .net .URI ;
24
21
import java .time .Duration ;
25
- import java .util .ArrayList ;
26
22
import java .util .LinkedList ;
27
- import java .util .List ;
28
23
import java .util .Map ;
29
- import java .util .Optional ;
30
24
import java .util .concurrent .CompletableFuture ;
31
25
import java .util .concurrent .ConcurrentHashMap ;
32
26
import java .util .function .Consumer ;
33
27
import software .amazon .awssdk .annotations .SdkPublicApi ;
34
28
import software .amazon .awssdk .crt .CrtResource ;
35
- import software .amazon .awssdk .crt .CrtRuntimeException ;
36
29
import software .amazon .awssdk .crt .http .HttpClientConnectionManager ;
37
30
import software .amazon .awssdk .crt .http .HttpClientConnectionManagerOptions ;
38
- import software .amazon .awssdk .crt .http .HttpHeader ;
39
31
import software .amazon .awssdk .crt .http .HttpMonitoringOptions ;
40
32
import software .amazon .awssdk .crt .http .HttpProxyOptions ;
41
- import software .amazon .awssdk .crt .http .HttpRequest ;
42
33
import software .amazon .awssdk .crt .io .ClientBootstrap ;
43
34
import software .amazon .awssdk .crt .io .EventLoopGroup ;
44
35
import software .amazon .awssdk .crt .io .HostResolver ;
45
36
import software .amazon .awssdk .crt .io .SocketOptions ;
46
37
import software .amazon .awssdk .crt .io .TlsCipherPreference ;
47
38
import software .amazon .awssdk .crt .io .TlsContext ;
48
39
import software .amazon .awssdk .crt .io .TlsContextOptions ;
49
- import software .amazon .awssdk .http .Header ;
50
40
import software .amazon .awssdk .http .SdkHttpConfigurationOption ;
51
- import software .amazon .awssdk .http .SdkHttpRequest ;
52
41
import software .amazon .awssdk .http .async .AsyncExecuteRequest ;
53
42
import software .amazon .awssdk .http .async .SdkAsyncHttpClient ;
54
- import software .amazon .awssdk .http .crt .internal .AwsCrtAsyncHttpStreamAdapter ;
43
+ import software .amazon .awssdk .http .crt .internal .CrtRequestContext ;
44
+ import software .amazon .awssdk .http .crt .internal .CrtRequestExecutor ;
55
45
import software .amazon .awssdk .utils .AttributeMap ;
56
46
import software .amazon .awssdk .utils .IoUtils ;
57
47
import software .amazon .awssdk .utils .Logger ;
58
48
import software .amazon .awssdk .utils .Validate ;
59
- import software .amazon .awssdk .utils .http .SdkHttpUtils ;
60
49
61
50
/**
62
51
* An implementation of {@link SdkAsyncHttpClient} that uses the AWS Common Runtime (CRT) Http Client to communicate with
@@ -79,7 +68,7 @@ public final class AwsCrtAsyncHttpClient implements SdkAsyncHttpClient {
79
68
private final HttpProxyOptions proxyOptions ;
80
69
private final HttpMonitoringOptions monitoringOptions ;
81
70
private final long maxConnectionIdleInMilliseconds ;
82
- private final int initialWindowSize ;
71
+ private final int readBufferSize ;
83
72
private final int maxConnectionsPerEndpoint ;
84
73
private boolean isClosed = false ;
85
74
@@ -88,7 +77,7 @@ private AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) {
88
77
89
78
Validate .isPositive (maxConns , "maxConns" );
90
79
Validate .notNull (builder .cipherPreference , "cipherPreference" );
91
- Validate .isPositive (builder .initialWindowSize , "initialWindowSize " );
80
+ Validate .isPositive (builder .readBufferSize , "readBufferSize " );
92
81
Validate .notNull (builder .eventLoopGroup , "eventLoopGroup" );
93
82
Validate .notNull (builder .hostResolver , "hostResolver" );
94
83
@@ -102,12 +91,10 @@ private AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) {
102
91
this .bootstrap = registerOwnedResource (clientBootstrap );
103
92
this .socketOptions = registerOwnedResource (clientSocketOptions );
104
93
this .tlsContext = registerOwnedResource (clientTlsContext );
105
-
106
- this .initialWindowSize = builder .initialWindowSize ;
94
+ this .readBufferSize = builder .readBufferSize ;
107
95
this .maxConnectionsPerEndpoint = maxConns ;
108
96
this .monitoringOptions = revolveHttpMonitoringOptions (builder .connectionHealthChecksConfiguration );
109
97
this .maxConnectionIdleInMilliseconds = config .get (SdkHttpConfigurationOption .CONNECTION_MAX_IDLE_TIMEOUT ).toMillis ();
110
-
111
98
this .proxyOptions = buildProxyOptions (builder .proxyConfiguration );
112
99
}
113
100
}
@@ -164,11 +151,6 @@ private <T extends CrtResource> T registerOwnedResource(T subresource) {
164
151
return subresource ;
165
152
}
166
153
167
- private static URI toUri (SdkHttpRequest sdkRequest ) {
168
- return invokeSafely (() -> new URI (sdkRequest .protocol (), null , sdkRequest .host (), sdkRequest .port (),
169
- null , null , null ));
170
- }
171
-
172
154
public static Builder builder () {
173
155
return new DefaultBuilder ();
174
156
}
@@ -195,7 +177,7 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
195
177
.withSocketOptions (socketOptions )
196
178
.withTlsContext (tlsContext )
197
179
.withUri (uri )
198
- .withWindowSize (initialWindowSize )
180
+ .withWindowSize (readBufferSize )
199
181
.withMaxConnections (maxConnectionsPerEndpoint )
200
182
.withManualWindowManagement (true )
201
183
.withProxyOptions (proxyOptions )
@@ -232,60 +214,6 @@ private HttpClientConnectionManager getOrCreateConnectionPool(URI uri) {
232
214
}
233
215
}
234
216
235
- private List <HttpHeader > createHttpHeaderList (URI uri , AsyncExecuteRequest asyncRequest ) {
236
- SdkHttpRequest sdkRequest = asyncRequest .request ();
237
- // worst case we may add 3 more headers here
238
- List <HttpHeader > crtHeaderList = new ArrayList <>(sdkRequest .headers ().size () + 3 );
239
-
240
- // Set Host Header if needed
241
- if (isNullOrEmpty (sdkRequest .headers ().get (Header .HOST ))) {
242
- crtHeaderList .add (new HttpHeader (Header .HOST , uri .getHost ()));
243
- }
244
-
245
- // Add Connection Keep Alive Header to reuse this Http Connection as long as possible
246
- if (isNullOrEmpty (sdkRequest .headers ().get (Header .CONNECTION ))) {
247
- crtHeaderList .add (new HttpHeader (Header .CONNECTION , Header .KEEP_ALIVE_VALUE ));
248
- }
249
-
250
- // Set Content-Length if needed
251
- Optional <Long > contentLength = asyncRequest .requestContentPublisher ().contentLength ();
252
- if (isNullOrEmpty (sdkRequest .headers ().get (Header .CONTENT_LENGTH )) && contentLength .isPresent ()) {
253
- crtHeaderList .add (new HttpHeader (Header .CONTENT_LENGTH , Long .toString (contentLength .get ())));
254
- }
255
-
256
- // Add the rest of the Headers
257
- for (Map .Entry <String , List <String >> headerList : sdkRequest .headers ().entrySet ()) {
258
- for (String val : headerList .getValue ()) {
259
- HttpHeader h = new HttpHeader (headerList .getKey (), val );
260
- crtHeaderList .add (h );
261
- }
262
- }
263
-
264
- return crtHeaderList ;
265
- }
266
-
267
- private HttpHeader [] asArray (List <HttpHeader > crtHeaderList ) {
268
- return crtHeaderList .toArray (new HttpHeader [0 ]);
269
- }
270
-
271
- private HttpRequest toCrtRequest (URI uri , AsyncExecuteRequest asyncRequest , AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter ) {
272
- SdkHttpRequest sdkRequest = asyncRequest .request ();
273
-
274
- String method = sdkRequest .method ().name ();
275
- String encodedPath = sdkRequest .encodedPath ();
276
- if (encodedPath == null || encodedPath .length () == 0 ) {
277
- encodedPath = "/" ;
278
- }
279
-
280
- String encodedQueryString = SdkHttpUtils .encodeAndFlattenQueryParameters (sdkRequest .rawQueryParameters ())
281
- .map (value -> "?" + value )
282
- .orElse ("" );
283
-
284
- HttpHeader [] crtHeaderArray = asArray (createHttpHeaderList (uri , asyncRequest ));
285
-
286
- return new HttpRequest (method , encodedPath + encodedQueryString , crtHeaderArray , crtToSdkAdapter );
287
- }
288
-
289
217
@ Override
290
218
public CompletableFuture <Void > execute (AsyncExecuteRequest asyncRequest ) {
291
219
@@ -294,8 +222,6 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
294
222
paramNotNull (asyncRequest .requestContentPublisher (), "RequestContentPublisher" );
295
223
paramNotNull (asyncRequest .responseHandler (), "ResponseHandler" );
296
224
297
- URI uri = toUri (asyncRequest .request ());
298
-
299
225
/*
300
226
* See the note on getOrCreateConnectionPool()
301
227
*
@@ -306,38 +232,14 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
306
232
* we have a pool and no one can destroy it underneath us until we've finished submitting the
307
233
* request)
308
234
*/
309
- try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool (uri )) {
310
- CompletableFuture <Void > requestFuture = new CompletableFuture <>();
311
- // When a Connection is ready from the Connection Pool, schedule the Request on the connection
312
- crtConnPool .acquireConnection ()
313
- .whenComplete ((crtConn , throwable ) -> {
314
- // If we didn't get a connection for some reason, fail the request
315
- if (throwable != null ) {
316
- try {
317
- asyncRequest .responseHandler ().onError (throwable );
318
- } catch (Exception e ) {
319
- log .warn (() -> "Exception while handling error" , e );
320
- }
321
- requestFuture .completeExceptionally (new IOException (
322
- "Crt exception while acquiring connection" , throwable ));
323
- return ;
324
- }
325
- AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter =
326
- new AwsCrtAsyncHttpStreamAdapter (crtConn , requestFuture , asyncRequest , initialWindowSize );
327
- HttpRequest crtRequest = toCrtRequest (uri , asyncRequest , crtToSdkAdapter );
328
- // Submit the Request on this Connection
329
- invokeSafely (() -> {
330
- try {
331
- crtConn .makeRequest (crtRequest , crtToSdkAdapter ).activate ();
332
- } catch (IllegalStateException | CrtRuntimeException e ) {
333
- log .error (() -> "Exception occurred when making the request" , e );
334
- requestFuture .completeExceptionally (
335
- new IOException ("Exception throw while submitting request to CRT http connection" , e ));
336
- }
337
- });
338
- });
339
-
340
- return requestFuture ;
235
+ try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool (asyncRequest .request ().getUri ())) {
236
+ CrtRequestContext context = CrtRequestContext .builder ()
237
+ .crtConnPool (crtConnPool )
238
+ .readBufferSize (readBufferSize )
239
+ .request (asyncRequest )
240
+ .build ();
241
+
242
+ return new CrtRequestExecutor ().execute (context );
341
243
}
342
244
}
343
245
@@ -456,7 +358,7 @@ Builder connectionHealthChecksConfiguration(Consumer<ConnectionHealthChecksConfi
456
358
private static final class DefaultBuilder implements Builder {
457
359
private final AttributeMap .Builder standardOptions = AttributeMap .builder ();
458
360
private TlsCipherPreference cipherPreference = TlsCipherPreference .TLS_CIPHER_SYSTEM_DEFAULT ;
459
- private int initialWindowSize = DEFAULT_STREAM_WINDOW_SIZE ;
361
+ private int readBufferSize = DEFAULT_STREAM_WINDOW_SIZE ;
460
362
private EventLoopGroup eventLoopGroup ;
461
363
private HostResolver hostResolver ;
462
364
private ProxyConfiguration proxyConfiguration ;
@@ -495,9 +397,9 @@ public Builder tlsCipherPreference(TlsCipherPreference tlsCipherPreference) {
495
397
}
496
398
497
399
@ Override
498
- public Builder readBufferSize (int initialWindowSize ) {
499
- Validate .isPositive (initialWindowSize , "initialWindowSize " );
500
- this .initialWindowSize = initialWindowSize ;
400
+ public Builder readBufferSize (int readBufferSize ) {
401
+ Validate .isPositive (readBufferSize , "readBufferSize " );
402
+ this .readBufferSize = readBufferSize ;
501
403
return this ;
502
404
}
503
405
0 commit comments