Skip to content

Commit 5ecfb2d

Browse files
committed
Various refactoring of crt branch
1 parent 0a249ae commit 5ecfb2d

File tree

11 files changed

+128
-163
lines changed

11 files changed

+128
-163
lines changed

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static software.amazon.awssdk.utils.CollectionUtils.isNullOrEmpty;
1919
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
20+
import static software.amazon.awssdk.utils.Validate.paramNotNull;
2021

2122
import java.io.IOException;
2223
import java.net.URI;
@@ -46,7 +47,6 @@
4647
import software.amazon.awssdk.crt.io.TlsContext;
4748
import software.amazon.awssdk.crt.io.TlsContextOptions;
4849
import software.amazon.awssdk.http.Header;
49-
import software.amazon.awssdk.http.SdkHttpClient;
5050
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
5151
import software.amazon.awssdk.http.SdkHttpRequest;
5252
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
@@ -59,7 +59,7 @@
5959
import software.amazon.awssdk.utils.http.SdkHttpUtils;
6060

6161
/**
62-
* An implementation of {@link SdkHttpClient} that uses the AWS Common Runtime (CRT) Http Client to communicate with
62+
* An implementation of {@link SdkAsyncHttpClient} that uses the AWS Common Runtime (CRT) Http Client to communicate with
6363
* Http Web Services. This client is asynchronous and uses non-blocking IO.
6464
*
6565
* <p>This can be created via {@link #builder()}</p>
@@ -69,8 +69,6 @@ public final class AwsCrtAsyncHttpClient implements SdkAsyncHttpClient {
6969
private static final Logger log = Logger.loggerFor(AwsCrtAsyncHttpClient.class);
7070

7171
private static final String AWS_COMMON_RUNTIME = "AwsCommonRuntime";
72-
private static final String NULL_REQUEST_ERROR_MESSAGE = "SdkHttpRequest must not be null";
73-
private static final String NULL_URI_ERROR_MESSAGE = "URI must not be null";
7472
private static final int DEFAULT_STREAM_WINDOW_SIZE = 16 * 1024 * 1024; // 16 MB
7573

7674
private final Map<URI, HttpClientConnectionManager> connectionPools = new ConcurrentHashMap<>();
@@ -115,27 +113,28 @@ private AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) {
115113
}
116114

117115
private HttpProxyOptions buildProxyOptions(ProxyConfiguration proxyConfiguration) {
118-
if (proxyConfiguration != null) {
119-
HttpProxyOptions clientProxyOptions = new HttpProxyOptions();
116+
if (proxyConfiguration == null) {
117+
return null;
118+
}
120119

121-
clientProxyOptions.setHost(proxyConfiguration.host());
122-
clientProxyOptions.setPort(proxyConfiguration.port());
123-
if (proxyConfiguration.scheme() != null && proxyConfiguration.scheme().equalsIgnoreCase("https")) {
124-
clientProxyOptions.setTlsContext(tlsContext);
125-
}
120+
HttpProxyOptions clientProxyOptions = new HttpProxyOptions();
126121

127-
if (proxyConfiguration.username() != null && proxyConfiguration.password() != null) {
128-
clientProxyOptions.setAuthorizationUsername(proxyConfiguration.username());
129-
clientProxyOptions.setAuthorizationPassword(proxyConfiguration.password());
130-
clientProxyOptions.setAuthorizationType(HttpProxyOptions.HttpProxyAuthorizationType.Basic);
131-
} else {
132-
clientProxyOptions.setAuthorizationType(HttpProxyOptions.HttpProxyAuthorizationType.None);
133-
}
122+
clientProxyOptions.setHost(proxyConfiguration.host());
123+
clientProxyOptions.setPort(proxyConfiguration.port());
124+
125+
if ("https".equalsIgnoreCase(proxyConfiguration.scheme())) {
126+
clientProxyOptions.setTlsContext(tlsContext);
127+
}
134128

135-
return clientProxyOptions;
129+
if (proxyConfiguration.username() != null && proxyConfiguration.password() != null) {
130+
clientProxyOptions.setAuthorizationUsername(proxyConfiguration.username());
131+
clientProxyOptions.setAuthorizationPassword(proxyConfiguration.password());
132+
clientProxyOptions.setAuthorizationType(HttpProxyOptions.HttpProxyAuthorizationType.Basic);
136133
} else {
137-
return null;
134+
clientProxyOptions.setAuthorizationType(HttpProxyOptions.HttpProxyAuthorizationType.None);
138135
}
136+
137+
return clientProxyOptions;
139138
}
140139

141140
/**
@@ -154,7 +153,6 @@ private <T extends CrtResource> T registerOwnedResource(T subresource) {
154153
}
155154

156155
private static URI toUri(SdkHttpRequest sdkRequest) {
157-
Validate.notNull(sdkRequest, NULL_REQUEST_ERROR_MESSAGE);
158156
return invokeSafely(() -> new URI(sdkRequest.protocol(), null, sdkRequest.host(), sdkRequest.port(),
159157
null, null, null));
160158
}
@@ -202,7 +200,6 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
202200
* pool implementation.
203201
*/
204202
private HttpClientConnectionManager getOrCreateConnectionPool(URI uri) {
205-
Validate.notNull(uri, NULL_URI_ERROR_MESSAGE);
206203
synchronized (this) {
207204
if (isClosed) {
208205
throw new IllegalStateException("Client is closed. No more requests can be made with this client.");
@@ -247,13 +244,11 @@ private List<HttpHeader> createHttpHeaderList(URI uri, AsyncExecuteRequest async
247244
}
248245

249246
private HttpHeader[] asArray(List<HttpHeader> crtHeaderList) {
250-
return crtHeaderList.toArray(new HttpHeader[crtHeaderList.size()]);
247+
return crtHeaderList.toArray(new HttpHeader[0]);
251248
}
252249

253250
private HttpRequest toCrtRequest(URI uri, AsyncExecuteRequest asyncRequest, AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter) {
254251
SdkHttpRequest sdkRequest = asyncRequest.request();
255-
Validate.notNull(uri, NULL_URI_ERROR_MESSAGE);
256-
Validate.notNull(sdkRequest, NULL_REQUEST_ERROR_MESSAGE);
257252

258253
String method = sdkRequest.method().name();
259254
String encodedPath = sdkRequest.encodedPath();
@@ -273,10 +268,10 @@ private HttpRequest toCrtRequest(URI uri, AsyncExecuteRequest asyncRequest, AwsC
273268
@Override
274269
public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
275270

276-
Validate.notNull(asyncRequest, "AsyncExecuteRequest must not be null");
277-
Validate.notNull(asyncRequest.request(), NULL_REQUEST_ERROR_MESSAGE);
278-
Validate.notNull(asyncRequest.requestContentPublisher(), "RequestContentPublisher must not be null");
279-
Validate.notNull(asyncRequest.responseHandler(), "ResponseHandler must not be null");
271+
paramNotNull(asyncRequest, "asyncRequest");
272+
paramNotNull(asyncRequest.request(), "SdkHttpRequest");
273+
paramNotNull(asyncRequest.requestContentPublisher(), "RequestContentPublisher");
274+
paramNotNull(asyncRequest.responseHandler(), "ResponseHandler");
280275

281276
URI uri = toUri(asyncRequest.request());
282277

@@ -292,7 +287,6 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
292287
*/
293288
try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(uri)) {
294289
CompletableFuture<Void> requestFuture = new CompletableFuture<>();
295-
296290
// When a Connection is ready from the Connection Pool, schedule the Request on the connection
297291
crtConnPool.acquireConnection()
298292
.whenComplete((crtConn, throwable) -> {
@@ -301,23 +295,23 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
301295
try {
302296
asyncRequest.responseHandler().onError(throwable);
303297
} catch (Exception e) {
304-
log.error(() -> String.format("Exception while handling error: %s", e.toString()));
298+
log.warn(() -> "Exception while handling error", e);
305299
}
306300
requestFuture.completeExceptionally(new IOException(
307301
"Crt exception while acquiring connection", throwable));
308302
return;
309303
}
310-
311304
AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter =
312305
new AwsCrtAsyncHttpStreamAdapter(crtConn, requestFuture, asyncRequest, initialWindowSize);
313306
HttpRequest crtRequest = toCrtRequest(uri, asyncRequest, crtToSdkAdapter);
314-
315307
// Submit the Request on this Connection
316308
invokeSafely(() -> {
317309
try {
318310
crtConn.makeRequest(crtRequest, crtToSdkAdapter).activate();
319311
} catch (IllegalStateException | CrtRuntimeException e) {
320-
throw new IOException("Exception throw while submitting request to CRT http connection", e);
312+
log.error(() -> "Exception occurred when making the request", e);
313+
requestFuture.completeExceptionally(
314+
new IOException("Exception throw while submitting request to CRT http connection", e));
321315
}
322316
});
323317
});
@@ -348,11 +342,11 @@ public void close() {
348342
public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClient.Builder> {
349343

350344
/**
351-
* The maximum number of connections allowed per distinct endpoint
352-
* @param maxConnections maximum connections per endpoint
345+
* The Maximum number of allowed concurrent requests. For HTTP/1.1 this is the same as max connections.
346+
* @param maxConcurrency maximum concurrency per endpoint
353347
* @return The builder of the method chaining.
354348
*/
355-
Builder maxConnections(int maxConnections);
349+
Builder maxConcurrency(int maxConcurrency);
356350

357351
/**
358352
* The AWS CRT TlsCipherPreference to use for this Client
@@ -445,9 +439,9 @@ public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
445439
}
446440

447441
@Override
448-
public Builder maxConnections(int maxConnections) {
449-
Validate.isPositive(maxConnections, "maxConnections");
450-
standardOptions.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, maxConnections);
442+
public Builder maxConcurrency(int maxConcurrency) {
443+
Validate.isPositive(maxConcurrency, "maxConcurrency");
444+
standardOptions.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, maxConcurrency);
451445
return this;
452446
}
453447

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtAsyncHttpStreamAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
3737
*/
3838
@SdkInternalApi
39-
public class AwsCrtAsyncHttpStreamAdapter implements HttpStreamResponseHandler, HttpRequestBodyStream {
39+
public final class AwsCrtAsyncHttpStreamAdapter implements HttpStreamResponseHandler, HttpRequestBodyStream {
4040
private static final Logger log = Logger.loggerFor(AwsCrtAsyncHttpStreamAdapter.class);
4141

4242
private final HttpClientConnection connection;

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtRequestBodySubscriber.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* Implements the Subscriber<ByteBuffer> API to be be callable from AwsCrtAsyncHttpStreamAdapter.sendRequestBody()
3434
*/
3535
@SdkInternalApi
36-
public class AwsCrtRequestBodySubscriber implements Subscriber<ByteBuffer> {
36+
public final class AwsCrtRequestBodySubscriber implements Subscriber<ByteBuffer> {
3737
private static final Logger log = Logger.loggerFor(AwsCrtRequestBodySubscriber.class);
3838

3939
private final int windowSize;
@@ -66,7 +66,7 @@ protected void requestDataIfNecessary() {
6666

6767
@Override
6868
public void onSubscribe(Subscription s) {
69-
Validate.notNull(s, "Subscription should not be null");
69+
Validate.paramNotNull(s, "s");
7070

7171
boolean wasFirstSubscription = subscriptionRef.compareAndSet(null, s);
7272

@@ -81,7 +81,7 @@ public void onSubscribe(Subscription s) {
8181

8282
@Override
8383
public void onNext(ByteBuffer byteBuffer) {
84-
Validate.notNull(byteBuffer, "ByteBuffer should not be null");
84+
Validate.paramNotNull(byteBuffer, "byteBuffer");
8585
queuedBuffers.add(byteBuffer);
8686
queuedByteCount.addAndGet(byteBuffer.remaining());
8787
requestDataIfNecessary();

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* Adapts an AWS Common Runtime Response Body stream from CrtHttpStreamHandler to a Publisher<ByteBuffer>
3939
*/
4040
@SdkInternalApi
41-
public class AwsCrtResponseBodyPublisher implements Publisher<ByteBuffer> {
41+
public final class AwsCrtResponseBodyPublisher implements Publisher<ByteBuffer> {
4242
private static final Logger log = Logger.loggerFor(AwsCrtResponseBodyPublisher.class);
4343
private static final LongUnaryOperator DECREMENT_IF_GREATER_THAN_ZERO = x -> ((x > 0) ? (x - 1) : (x));
4444

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void callsOnStreamForEmptyResponseContent() throws Exception {
127127
stubFor(any(urlEqualTo("/")).willReturn(aResponse().withStatus(204).withHeader("foo", "bar")));
128128

129129
CompletableFuture<Boolean> streamReceived = new CompletableFuture<>();
130-
final AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
130+
AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
131131

132132
SdkAsyncHttpResponseHandler handler = new TestResponseHandler() {
133133
@Override
@@ -167,9 +167,9 @@ public void testGetRequest() throws Exception {
167167
.withBody(body)));
168168

169169
CompletableFuture<Boolean> streamReceived = new CompletableFuture<>();
170-
final AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
170+
AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
171171
Sha256BodySubscriber bodySha256Subscriber = new Sha256BodySubscriber();
172-
final AtomicReference<Throwable> error = new AtomicReference<>(null);
172+
AtomicReference<Throwable> error = new AtomicReference<>(null);
173173

174174
SdkAsyncHttpResponseHandler handler = new SdkAsyncHttpResponseHandler() {
175175
@Override
@@ -208,8 +208,8 @@ public void onError(Throwable t) {
208208

209209
private void makePutRequest(String path, byte[] reqBody, int expectedStatus) throws Exception {
210210
CompletableFuture<Boolean> streamReceived = new CompletableFuture<>();
211-
final AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
212-
final AtomicReference<Throwable> error = new AtomicReference<>(null);
211+
AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
212+
AtomicReference<Throwable> error = new AtomicReference<>(null);
213213

214214
Subscriber<ByteBuffer> subscriber = CrtHttpClientTestUtils.createDummySubscriber();
215215

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/H1ServerBehaviorTest.java

Lines changed: 9 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,69 +15,30 @@
1515

1616
package software.amazon.awssdk.http.crt;
1717

18-
import org.junit.After;
19-
import org.junit.Before;
20-
import org.junit.Test;
18+
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;
19+
2120
import software.amazon.awssdk.crt.io.EventLoopGroup;
2221
import software.amazon.awssdk.crt.io.HostResolver;
23-
import software.amazon.awssdk.http.H1ServerBehaviorTestBase;
22+
import software.amazon.awssdk.http.SdkAsyncHttpClientH1TestSuite;
2423
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
25-
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
2624
import software.amazon.awssdk.utils.AttributeMap;
2725

28-
import static org.assertj.core.api.Assertions.assertThat;
29-
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;
30-
3126
/**
3227
* Testing the scenario where h1 server sends 5xx errors.
3328
*/
34-
public class H1ServerBehaviorTest extends H1ServerBehaviorTestBase {
35-
private SdkAsyncHttpClient crtClient;
29+
public class H1ServerBehaviorTest extends SdkAsyncHttpClientH1TestSuite {
3630

3731
@Override
38-
protected SdkAsyncHttpClient getTestClient() { return crtClient; }
39-
40-
@Before
41-
public void setup() throws Exception {
42-
super.setup();
43-
32+
protected SdkAsyncHttpClient setupClient() {
4433
int numThreads = Runtime.getRuntime().availableProcessors();
4534
try (EventLoopGroup eventLoopGroup = new EventLoopGroup(numThreads);
4635
HostResolver hostResolver = new HostResolver(eventLoopGroup)) {
4736

48-
crtClient = AwsCrtAsyncHttpClient.builder()
49-
.eventLoopGroup(eventLoopGroup)
50-
.hostResolver(hostResolver)
51-
.buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build());
52-
}
53-
}
54-
55-
56-
@After
57-
public void teardown() throws InterruptedException {
58-
super.teardown();
59-
60-
if (crtClient != null) {
61-
crtClient.close();
37+
return AwsCrtAsyncHttpClient.builder()
38+
.eventLoopGroup(eventLoopGroup)
39+
.hostResolver(hostResolver)
40+
.buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build());
6241
}
63-
crtClient = null;
64-
}
65-
66-
@Test
67-
public void connectionReceiveServerErrorStatusShouldNotReuseConnection() {
68-
assertThat(crtClient).isNotNull();
69-
super.connectionReceiveServerErrorStatusShouldNotReuseConnection();
70-
}
71-
72-
@Test
73-
public void connectionReceiveOkStatusShouldReuseConnection() {
74-
assertThat(crtClient).isNotNull();
75-
super.connectionReceiveOkStatusShouldReuseConnection();
7642
}
7743

78-
@Test
79-
public void connectionReceiveCloseHeaderShouldNotReuseConnection() throws InterruptedException {
80-
assertThat(crtClient).isNotNull();
81-
super.connectionReceiveCloseHeaderShouldNotReuseConnection();
82-
}
8344
}

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/ProxyWireMockTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ public void teardown() {
109109
public void proxyConfigured_httpGet() throws Throwable {
110110

111111
CompletableFuture<Boolean> streamReceived = new CompletableFuture<>();
112-
final AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
113-
final AtomicReference<Throwable> error = new AtomicReference<>(null);
112+
AtomicReference<SdkHttpResponse> response = new AtomicReference<>(null);
113+
AtomicReference<Throwable> error = new AtomicReference<>(null);
114114

115115
Subscriber<ByteBuffer> subscriber = CrtHttpClientTestUtils.createDummySubscriber();
116116

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#
2+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License").
5+
# You may not use this file except in compliance with the License.
6+
# A copy of the License is located at
7+
#
8+
# http://aws.amazon.com/apache2.0
9+
#
10+
# or in the "license" file accompanying this file. This file is distributed
11+
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
# express or implied. See the License for the specific language governing
13+
# permissions and limitations under the License.
14+
#
15+
16+
log4j.rootLogger=WARN, A1
17+
log4j.appender.A1=org.apache.log4j.ConsoleAppender
18+
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
19+
20+
# Print the date in ISO 8601 format
21+
log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
22+
23+
24+

0 commit comments

Comments
 (0)