Skip to content

Commit 9b4c595

Browse files
committed
Update CRT Client and add Stress Test
1 parent e69b80b commit 9b4c595

File tree

5 files changed

+317
-39
lines changed

5 files changed

+317
-39
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt;
17+
18+
import static software.amazon.awssdk.testutils.service.AwsTestBase.CREDENTIALS_PROVIDER_CHAIN;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import org.junit.Assert;
29+
import org.junit.experimental.theories.DataPoints;
30+
import org.junit.experimental.theories.FromDataPoints;
31+
import org.junit.experimental.theories.Theories;
32+
import org.junit.experimental.theories.Theory;
33+
import org.junit.runner.RunWith;
34+
import software.amazon.awssdk.crt.CrtResource;
35+
import software.amazon.awssdk.crt.io.ClientBootstrap;
36+
import software.amazon.awssdk.crt.io.SocketOptions;
37+
import software.amazon.awssdk.crt.io.TlsContext;
38+
import software.amazon.awssdk.crt.io.TlsContextOptions;
39+
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
40+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
41+
import software.amazon.awssdk.regions.Region;
42+
import software.amazon.awssdk.services.kms.KmsAsyncClient;
43+
import software.amazon.awssdk.services.kms.model.GenerateRandomRequest;
44+
import software.amazon.awssdk.services.kms.model.GenerateRandomResponse;
45+
import software.amazon.awssdk.utils.AttributeMap;
46+
47+
@RunWith(Theories.class)
48+
public class AwsCrtCombinatorialConfigStressTest {
49+
private final static String KEY_ALIAS = "alias/aws-sdk-java-v2-integ-test";
50+
private final static Region REGION = Region.US_EAST_1;
51+
private final static List<SdkAsyncHttpClient> awsCrtHttpClients = new ArrayList<>();
52+
private final static int DEFAULT_KEY_SIZE = 32;
53+
54+
// Success rate will currently never go above ~99% due to aws-c-http not detecting connection close headers, and KMS
55+
// closing the connection after the 100th Request on a Http Connection.
56+
// Tracking Issue: https://github.com/awslabs/aws-c-http/issues/106
57+
private static double MINIMUM_SUCCESS_RATE = 0.95;
58+
59+
private boolean testWithClient(KmsAsyncClient asyncKMSClient, int numberOfRequests) {
60+
List<CompletableFuture<GenerateRandomResponse>> futures = new ArrayList<>();
61+
62+
for (int i = 0; i < numberOfRequests; i++) {
63+
GenerateRandomRequest request = GenerateRandomRequest.builder().numberOfBytes(DEFAULT_KEY_SIZE).build();
64+
CompletableFuture<GenerateRandomResponse> future = asyncKMSClient.generateRandom(request);
65+
futures.add(future);
66+
}
67+
68+
List<Exception> failures = new ArrayList<>();
69+
int actualNumSucceeded = 0;
70+
for (CompletableFuture<GenerateRandomResponse> f : futures) {
71+
try {
72+
GenerateRandomResponse resp = f.get(5, TimeUnit.MINUTES);
73+
if (200 == resp.sdkHttpResponse().statusCode()) {
74+
actualNumSucceeded += 1;
75+
}
76+
} catch (Exception e) {
77+
failures.add(e);
78+
}
79+
}
80+
81+
int minimumNumSucceeded = (int)(numberOfRequests * (MINIMUM_SUCCESS_RATE));
82+
boolean succeeded = true;
83+
if (actualNumSucceeded < minimumNumSucceeded) {
84+
System.err.println("Failure Metrics: numRequests=" + numberOfRequests + ", numSucceeded=" + actualNumSucceeded);
85+
succeeded = false;
86+
}
87+
88+
if (!succeeded) {
89+
for(Exception e: failures) {
90+
System.err.println(e.getMessage());
91+
}
92+
failures.get(0).printStackTrace();
93+
}
94+
95+
96+
97+
return succeeded;
98+
}
99+
100+
private boolean testWithNewClient(int eventLoopSize, int numberOfRequests) {
101+
try (ClientBootstrap newBootstrap = new ClientBootstrap(eventLoopSize)) {
102+
try (SocketOptions newSocketOptions = new SocketOptions()) {
103+
try (TlsContextOptions newContextOptions = new TlsContextOptions()) {
104+
try (TlsContext newTlsContext = new TlsContext(newContextOptions)) {
105+
try (SdkAsyncHttpClient newAwsCrtHttpClient = AwsCrtAsyncHttpClient.builder()
106+
.bootstrap(newBootstrap)
107+
.socketOptions(newSocketOptions)
108+
.tlsContext(newTlsContext)
109+
.build()) {
110+
try (KmsAsyncClient newAsyncKMSClient = KmsAsyncClient.builder()
111+
.region(REGION)
112+
.httpClient(newAwsCrtHttpClient)
113+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
114+
.build()) {
115+
boolean succeeded = testWithClient(newAsyncKMSClient, numberOfRequests);
116+
return succeeded;
117+
}
118+
}
119+
}
120+
}
121+
}
122+
}
123+
}
124+
125+
@DataPoints("EventLoop")
126+
public static int[] eventLoopValues(){
127+
return new int[]{1, 4};
128+
}
129+
130+
@DataPoints("ConnectionPool")
131+
public static int[] connectionsValues(){
132+
/* Don't use 1 connection Pool of size 1, otherwise test takes too long */
133+
return new int[]{10, 100};
134+
}
135+
136+
@DataPoints("NumRequests")
137+
public static int[] requestValues(){
138+
return new int[]{1, 25, 250};
139+
}
140+
141+
@DataPoints("ParallelClients")
142+
public static int[] parallelClientValues(){
143+
return new int[]{1, 2, 8};
144+
}
145+
146+
@DataPoints("SharedClient")
147+
public static boolean[] sharedClientValue(){
148+
return new boolean[]{true, false};
149+
}
150+
151+
@Theory
152+
public void checkAllCombinations(@FromDataPoints("EventLoop") int eventLoopSize,
153+
@FromDataPoints("ConnectionPool") int connectionPoolSize,
154+
@FromDataPoints("NumRequests") int numberOfRequests,
155+
@FromDataPoints("ParallelClients") int numberOfParallelClients,
156+
@FromDataPoints("SharedClient") boolean useSharedClient) throws Exception {
157+
158+
try {
159+
if (CrtResource.getAllocatedNativeResourceCount() > 0) {
160+
System.err.println("Leaked Resources: " + String.join(", ", CrtResource.getAllocatedNativeResources()));
161+
}
162+
Assert.assertEquals("Expected Zero allocated AwsCrtResources", 0, CrtResource.getAllocatedNativeResourceCount());
163+
164+
String testName = String.format("Testing with eventLoopSize %d, connectionPoolSize %d, numberOfRequests %d, " +
165+
"numberOfParallelJavaClients %d, useSharedClient %b", eventLoopSize, connectionPoolSize,
166+
numberOfRequests, numberOfParallelClients, useSharedClient);
167+
System.out.println("\n" + testName);
168+
169+
CountDownLatch latch = new CountDownLatch(numberOfParallelClients);
170+
171+
AttributeMap attributes = AttributeMap.builder()
172+
.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, connectionPoolSize)
173+
.build();
174+
175+
ClientBootstrap bootstrap = new ClientBootstrap(eventLoopSize);
176+
SocketOptions socketOptions = new SocketOptions();
177+
TlsContext tlsContext = new TlsContext();
178+
SdkAsyncHttpClient awsCrtHttpClient = AwsCrtAsyncHttpClient.builder()
179+
.bootstrap(bootstrap)
180+
.socketOptions(socketOptions)
181+
.tlsContext(tlsContext)
182+
.buildWithDefaults(attributes);
183+
184+
KmsAsyncClient sharedAsyncKMSClient = KmsAsyncClient.builder()
185+
.region(REGION)
186+
.httpClient(awsCrtHttpClient)
187+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
188+
.build();
189+
190+
final AtomicBoolean failed = new AtomicBoolean(false);
191+
192+
long start = System.currentTimeMillis();
193+
ExecutorService pool = Executors.newCachedThreadPool();
194+
for (int threads = 0; threads < numberOfParallelClients; threads++) {
195+
pool.submit(() -> {
196+
if (useSharedClient) {
197+
if (!testWithClient(sharedAsyncKMSClient, numberOfRequests)) {
198+
System.err.println("Failed: " + testName);
199+
failed.set(true);
200+
}
201+
} else {
202+
if (!testWithNewClient(eventLoopSize, numberOfRequests)) {
203+
System.err.println("Failed: " + testName);
204+
failed.set(true);
205+
}
206+
}
207+
latch.countDown();
208+
});
209+
}
210+
211+
latch.await(5, TimeUnit.MINUTES);
212+
213+
sharedAsyncKMSClient.close();
214+
awsCrtHttpClient.close();
215+
tlsContext.close();
216+
socketOptions.close();
217+
bootstrap.close();
218+
219+
Assert.assertFalse(failed.get());
220+
221+
if (CrtResource.getAllocatedNativeResourceCount() > 0) {
222+
System.err.println("Leaked Resources: " + String.join(", ", CrtResource.getAllocatedNativeResources()));
223+
}
224+
225+
Assert.assertEquals("Expected Zero allocated AwsCrtResources", 0, CrtResource.getAllocatedNativeResourceCount());
226+
227+
float numSeconds = (float) ((System.currentTimeMillis() - start) / 1000.0);
228+
String timeElapsed = String.format("%.2f sec", numSeconds);
229+
230+
System.out.println("Passed: " + testName + ", Time " + timeElapsed);
231+
} catch (Exception e) {
232+
System.err.println(e.getMessage());
233+
e.printStackTrace();
234+
}
235+
}
236+
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,7 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
200200
HttpRequest crtRequest = toCrtRequest(uri, asyncRequest);
201201

202202
CompletableFuture<Void> requestFuture = new CompletableFuture<>();
203-
AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter =
204-
new AwsCrtAsyncHttpStreamAdapter(requestFuture, asyncRequest, windowSize);
203+
205204

206205
HttpRequestOptions reqOptions = new HttpRequestOptions();
207206
reqOptions.setBodyBufferSize(httpBodyUpdateSize);
@@ -215,8 +214,8 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
215214
return;
216215
}
217216

218-
// When the Request is complete, return our connection back to the Connection Pool
219-
requestFuture.whenComplete((v, t) -> crtConnPool.releaseConnection(crtConn));
217+
AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter =
218+
new AwsCrtAsyncHttpStreamAdapter(crtConn, requestFuture, asyncRequest, windowSize);
220219

221220
// Submit the Request on this Connection
222221
invokeSafely(() -> crtConn.makeRequest(crtRequest, reqOptions, crtToSdkAdapter));

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtAsyncHttpStreamAdapter.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
2323
import software.amazon.awssdk.crt.CRT;
2424
import software.amazon.awssdk.crt.http.CrtHttpStreamHandler;
25+
import software.amazon.awssdk.crt.http.HttpConnection;
2526
import software.amazon.awssdk.crt.http.HttpException;
2627
import software.amazon.awssdk.crt.http.HttpHeader;
2728
import software.amazon.awssdk.crt.http.HttpStream;
@@ -36,29 +37,41 @@
3637
@SdkInternalApi
3738
public class AwsCrtAsyncHttpStreamAdapter implements CrtHttpStreamHandler {
3839
private static final Logger log = Logger.loggerFor(AwsCrtAsyncHttpStreamAdapter.class);
39-
private final AsyncExecuteRequest sdkRequest;
40+
41+
private final HttpConnection connection;
4042
private final CompletableFuture<Void> responseComplete;
43+
private final AsyncExecuteRequest sdkRequest;
4144
private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder();
4245
private final int windowSize;
4346
private final AwsCrtRequestBodySubscriber requestBodySubscriber;
4447
private AwsCrtResponseBodyPublisher respBodyPublisher = null;
4548

46-
public AwsCrtAsyncHttpStreamAdapter(CompletableFuture<Void> responseComplete, AsyncExecuteRequest sdkRequest,
47-
int windowSize) {
49+
public AwsCrtAsyncHttpStreamAdapter(HttpConnection connection, CompletableFuture<Void> responseComplete,
50+
AsyncExecuteRequest sdkRequest, int windowSize) {
51+
Validate.notNull(connection, "HttpConnection is null");
4852
Validate.notNull(responseComplete, "reqComplete Future is null");
4953
Validate.notNull(sdkRequest, "AsyncExecuteRequest Future is null");
5054
Validate.isPositive(windowSize, "windowSize is <= 0");
5155

52-
this.sdkRequest = sdkRequest;
56+
this.connection = connection;
5357
this.responseComplete = responseComplete;
58+
this.sdkRequest = sdkRequest;
5459
this.windowSize = windowSize;
5560
this.requestBodySubscriber = new AwsCrtRequestBodySubscriber(windowSize);
5661

5762
sdkRequest.requestContentPublisher().subscribe(requestBodySubscriber);
5863
}
5964

65+
private void initRespBodyPublisherIfNeeded(HttpStream stream) {
66+
if (respBodyPublisher == null) {
67+
respBodyPublisher = new AwsCrtResponseBodyPublisher(connection, stream, responseComplete, windowSize);
68+
}
69+
}
70+
6071
@Override
6172
public void onResponseHeaders(HttpStream stream, int responseStatusCode, HttpHeader[] nextHeaders) {
73+
initRespBodyPublisherIfNeeded(stream);
74+
6275
respBuilder.statusCode(responseStatusCode);
6376

6477
for (HttpHeader h : nextHeaders) {
@@ -68,10 +81,10 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, HttpHea
6881

6982
@Override
7083
public void onResponseHeadersDone(HttpStream stream, boolean hasBody) {
84+
initRespBodyPublisherIfNeeded(stream);
85+
7186
respBuilder.statusCode(stream.getResponseStatusCode());
7287
sdkRequest.responseHandler().onHeaders(respBuilder.build());
73-
respBodyPublisher = new AwsCrtResponseBodyPublisher(stream, responseComplete, windowSize);
74-
7588

7689
if (!hasBody) {
7790
respBodyPublisher.setQueueComplete();
@@ -82,6 +95,8 @@ public void onResponseHeadersDone(HttpStream stream, boolean hasBody) {
8295

8396
@Override
8497
public int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) {
98+
initRespBodyPublisherIfNeeded(stream);
99+
85100
if (respBodyPublisher == null) {
86101
log.error(() -> "Publisher is null, onResponseHeadersDone() was never called");
87102
throw new IllegalStateException("Publisher is null, onResponseHeadersDone() was never called");
@@ -101,11 +116,12 @@ public int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) {
101116

102117
@Override
103118
public void onResponseComplete(HttpStream stream, int errorCode) {
119+
initRespBodyPublisherIfNeeded(stream);
120+
104121
if (errorCode == CRT.AWS_CRT_SUCCESS) {
105122
log.debug(() -> "Response Completed Successfully");
106123
respBodyPublisher.setQueueComplete();
107124
respBodyPublisher.publishToSubscribers();
108-
responseComplete.complete(null);
109125
} else {
110126
HttpException error = new HttpException(errorCode);
111127
log.error(() -> "Response Encountered an Error.", error);
@@ -114,15 +130,9 @@ public void onResponseComplete(HttpStream stream, int errorCode) {
114130
sdkRequest.responseHandler().onError(error);
115131

116132
// Invoke Error Callback on any Subscriber's of the Response Body
117-
if (respBodyPublisher != null) {
118-
respBodyPublisher.setError(error);
119-
respBodyPublisher.publishToSubscribers();
120-
}
121-
122-
responseComplete.completeExceptionally(error);
133+
respBodyPublisher.setError(error);
134+
respBodyPublisher.publishToSubscribers();
123135
}
124-
125-
stream.close();
126136
}
127137

128138
@Override

0 commit comments

Comments
 (0)