Skip to content

Commit 30d82da

Browse files
committed
Update AWS CRT Client to use Connection Manager
1 parent 6a2440e commit 30d82da

File tree

3 files changed

+108
-75
lines changed

3 files changed

+108
-75
lines changed

http-clients/aws-crt-client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<dependency>
3434
<groupId>software.amazon.awssdk.crt</groupId>
3535
<artifactId>aws-crt</artifactId>
36-
<version>0.3.12</version>
36+
<version>0.3.14</version>
3737
</dependency>
3838

3939
<!--SDK dependencies-->

http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientKmsIntegrationTest.java

Lines changed: 61 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
import software.amazon.awssdk.crt.CrtResource;
1414
import software.amazon.awssdk.crt.io.ClientBootstrap;
1515
import software.amazon.awssdk.crt.io.SocketOptions;
16+
import software.amazon.awssdk.crt.io.TlsCipherPreference;
1617
import software.amazon.awssdk.crt.io.TlsContext;
18+
import software.amazon.awssdk.crt.io.TlsContextOptions;
1719
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
1820
import software.amazon.awssdk.regions.Region;
1921
import software.amazon.awssdk.services.kms.KmsAsyncClient;
@@ -32,8 +34,7 @@
3234
public class AwsCrtClientKmsIntegrationTest {
3335
private static String KEY_ALIAS = "alias/aws-sdk-java-v2-integ-test";
3436
private static Region REGION = Region.US_EAST_1;
35-
private static SdkAsyncHttpClient client;
36-
private static KmsAsyncClient kms;
37+
private static List<SdkAsyncHttpClient> awsCrtHttpClients = new ArrayList<>();
3738

3839
List<CrtResource> crtResources = new ArrayList<>();
3940

@@ -45,40 +46,43 @@ private void addResource(CrtResource resource) {
4546
public void setup() {
4647
Assert.assertEquals("Expected Zero allocated AwsCrtResources", 0, CrtResource.getAllocatedNativeResourceCount());
4748

48-
ClientBootstrap bootstrap = new ClientBootstrap(1);
49-
SocketOptions socketOptions = new SocketOptions();
50-
TlsContext tlsContext = new TlsContext();
51-
52-
addResource(bootstrap);
53-
addResource(socketOptions);
54-
addResource(tlsContext);
55-
56-
client = AwsCrtAsyncHttpClient.builder()
57-
.bootstrap(bootstrap)
58-
.socketOptions(socketOptions)
59-
.tlsContext(tlsContext)
60-
.build();
61-
62-
kms = KmsAsyncClient.builder()
63-
.region(REGION)
64-
.httpClient(client)
65-
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
66-
.build();
67-
}
49+
// Create an Http Client for each TLS Cipher Preference supported on the current platform
50+
for (TlsCipherPreference pref: TlsCipherPreference.values()) {
51+
if (!TlsContextOptions.isCipherPreferenceSupported(pref)) {
52+
continue;
53+
}
6854

69-
@After
70-
public void tearDown() {
71-
kms.close();
72-
client.close();
55+
ClientBootstrap bootstrap = new ClientBootstrap(1);
56+
SocketOptions socketOptions = new SocketOptions();
57+
TlsContext tlsContext = new TlsContext();
58+
59+
addResource(bootstrap);
60+
addResource(socketOptions);
61+
addResource(tlsContext);
62+
63+
SdkAsyncHttpClient awsCrtHttpClient = AwsCrtAsyncHttpClient.builder()
64+
.bootstrap(bootstrap)
65+
.socketOptions(socketOptions)
66+
.tlsContext(tlsContext)
67+
.build();
7368

69+
awsCrtHttpClients.add(awsCrtHttpClient);
70+
}
71+
}
72+
73+
private void closeResources() {
7474
for (CrtResource r: crtResources) {
7575
r.close();
7676
}
77+
}
7778

79+
@After
80+
public void tearDown() {
81+
closeResources();
7882
Assert.assertEquals("Expected Zero allocated AwsCrtResources", 0, CrtResource.getAllocatedNativeResourceCount());
7983
}
8084

81-
private boolean doesKeyExist(String keyAlias) {
85+
private boolean doesKeyExist(KmsAsyncClient kms, String keyAlias) {
8286
try {
8387
DescribeKeyRequest req = DescribeKeyRequest.builder().keyId(keyAlias).build();
8488
DescribeKeyResponse resp = kms.describeKey(req).get();
@@ -89,51 +93,66 @@ private boolean doesKeyExist(String keyAlias) {
8993
}
9094
}
9195

92-
private void createKeyAlias(String keyId, String keyAlias) throws Exception {
96+
private void createKeyAlias(KmsAsyncClient kms, String keyId, String keyAlias) throws Exception {
9397
CreateAliasRequest req = CreateAliasRequest.builder().aliasName(keyAlias).targetKeyId(keyId).build();
9498
CreateAliasResponse resp = kms.createAlias(req).get();
9599
Assert.assertEquals(200, resp.sdkHttpResponse().statusCode());
96100
}
97101

98-
private String createKey() throws Exception {
102+
private String createKey(KmsAsyncClient kms) throws Exception {
99103
CreateKeyRequest req = CreateKeyRequest.builder().build();
100104
CreateKeyResponse resp = kms.createKey(req).get();
101105
Assert.assertEquals(200, resp.sdkHttpResponse().statusCode());
102106
return resp.keyMetadata().keyId();
103107
}
104108

105-
private void createKeyIfNotExists(String keyAlias) throws Exception {
106-
if (!doesKeyExist(keyAlias)) {
107-
String keyId = createKey();
108-
createKeyAlias(keyId, KEY_ALIAS);
109+
private void createKeyIfNotExists(KmsAsyncClient kms, String keyAlias) throws Exception {
110+
if (!doesKeyExist(kms, keyAlias)) {
111+
String keyId = createKey(kms);
112+
createKeyAlias(kms, keyId, KEY_ALIAS);
109113
}
110114
}
111115

112-
private SdkBytes encrypt(String keyId, String plaintext) throws Exception {
116+
private SdkBytes encrypt(KmsAsyncClient kms, String keyId, String plaintext) throws Exception {
113117
SdkBytes bytes = SdkBytes.fromUtf8String(plaintext);
114118
EncryptRequest req = EncryptRequest.builder().keyId(keyId).plaintext(bytes).build();
115119
EncryptResponse resp = kms.encrypt(req).get();
116120
Assert.assertEquals(200, resp.sdkHttpResponse().statusCode());
117121
return resp.ciphertextBlob();
118122
}
119123

120-
private String decrypt(SdkBytes ciphertext) throws Exception {
124+
private String decrypt(KmsAsyncClient kms, SdkBytes ciphertext) throws Exception {
121125
DecryptRequest req = DecryptRequest.builder().ciphertextBlob(ciphertext).build();
122126
DecryptResponse resp = kms.decrypt(req).get();
123127
Assert.assertEquals(200, resp.sdkHttpResponse().statusCode());
124128
return resp.plaintext().asUtf8String();
125129
}
126130

127-
@Test
128-
public void testEncryptDecryptWithKms() throws Exception {
129-
createKeyIfNotExists(KEY_ALIAS);
130-
Assert.assertTrue(doesKeyExist(KEY_ALIAS));
131-
Assert.assertFalse(doesKeyExist("alias/does-not-exist-" + UUID.randomUUID()));
131+
private void testEncryptDecryptWithKms(KmsAsyncClient kms) throws Exception {
132+
createKeyIfNotExists(kms, KEY_ALIAS);
133+
Assert.assertTrue(doesKeyExist(kms, KEY_ALIAS));
134+
Assert.assertFalse(doesKeyExist(kms, "alias/does-not-exist-" + UUID.randomUUID()));
132135

133136
String secret = UUID.randomUUID().toString();
134-
SdkBytes cipherText = encrypt(KEY_ALIAS, secret);
135-
String plainText = decrypt(cipherText);
137+
SdkBytes cipherText = encrypt(kms, KEY_ALIAS, secret);
138+
String plainText = decrypt(kms, cipherText);
136139

137140
Assert.assertEquals(plainText, secret);
138141
}
142+
143+
@Test
144+
public void testEncryptDecryptWithKms() throws Exception {
145+
for (SdkAsyncHttpClient awsCrtHttpClient: awsCrtHttpClients) {
146+
KmsAsyncClient kms = KmsAsyncClient.builder()
147+
.region(REGION)
148+
.httpClient(awsCrtHttpClient)
149+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
150+
.build();
151+
152+
testEncryptDecryptWithKms(kms);
153+
154+
kms.close();
155+
awsCrtHttpClient.close();
156+
}
157+
}
139158
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.http.crt;
1717

18+
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS;
19+
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.MAX_CONNECTIONS;
1820
import static software.amazon.awssdk.utils.CollectionUtils.isNullOrEmpty;
1921
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
2022

@@ -25,11 +27,11 @@
2527
import java.util.Optional;
2628
import java.util.concurrent.CompletableFuture;
2729
import java.util.concurrent.ConcurrentHashMap;
28-
2930
import software.amazon.awssdk.annotations.SdkPublicApi;
30-
import software.amazon.awssdk.crt.http.HttpConnection;
31+
import software.amazon.awssdk.crt.http.HttpConnectionPoolManager;
3132
import software.amazon.awssdk.crt.http.HttpHeader;
3233
import software.amazon.awssdk.crt.http.HttpRequest;
34+
import software.amazon.awssdk.crt.http.HttpRequestOptions;
3335
import software.amazon.awssdk.crt.io.ClientBootstrap;
3436
import software.amazon.awssdk.crt.io.SocketOptions;
3537
import software.amazon.awssdk.crt.io.TlsContext;
@@ -59,23 +61,26 @@ public class AwsCrtAsyncHttpClient implements SdkAsyncHttpClient {
5961
private static final int DEFAULT_STREAM_WINDOW_SIZE = 16 * 1024 * 1024; // 16 MB Total Buffer size
6062
private static final int DEFAULT_HTTP_BODY_UPDATE_SIZE = 4 * 1024 * 1024; // 4 MB Update size from Native
6163

62-
private final Map<URI, HttpConnection> connections = new ConcurrentHashMap<>();
64+
private final Map<URI, HttpConnectionPoolManager> connectionPools = new ConcurrentHashMap<>();
6365
private final ClientBootstrap bootstrap;
6466
private final SocketOptions socketOptions;
6567
private final TlsContext tlsContext;
6668
private final int windowSize;
69+
private final int maxConnectionsPerEndpoint;
6770
private final int httpBodyUpdateSize;
6871

69-
public AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap serviceDefaultsMap) {
70-
this(builder.bootstrap, builder.socketOptions, builder.tlsContext, builder.windowSize, builder.httpBodyUpdateSize);
72+
public AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) {
73+
this(builder.bootstrap, builder.socketOptions, builder.tlsContext, builder.windowSize,
74+
config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS), builder.httpBodyUpdateSize);
7175
}
7276

7377
public AwsCrtAsyncHttpClient(ClientBootstrap bootstrap, SocketOptions sockOpts, TlsContext tlsContext) {
74-
this(bootstrap, sockOpts, tlsContext, DEFAULT_STREAM_WINDOW_SIZE, DEFAULT_HTTP_BODY_UPDATE_SIZE);
78+
this(bootstrap, sockOpts, tlsContext, DEFAULT_STREAM_WINDOW_SIZE, GLOBAL_HTTP_DEFAULTS.get(MAX_CONNECTIONS),
79+
DEFAULT_HTTP_BODY_UPDATE_SIZE);
7580
}
7681

7782
public AwsCrtAsyncHttpClient(ClientBootstrap bootstrap, SocketOptions sockOpts, TlsContext tlsContext,
78-
int windowSize, int httpBodyUpdateSize) {
83+
int windowSize, int maxConns, int httpBodyUpdateSize) {
7984
Validate.notNull(bootstrap, "ClientBootstrap must not be null");
8085
Validate.notNull(sockOpts, "SocketOptions must not be null");
8186
Validate.notNull(tlsContext, "TlsContext must not be null");
@@ -85,6 +90,7 @@ public AwsCrtAsyncHttpClient(ClientBootstrap bootstrap, SocketOptions sockOpts,
8590
this.socketOptions = sockOpts;
8691
this.tlsContext = tlsContext;
8792
this.windowSize = windowSize;
93+
this.maxConnectionsPerEndpoint = maxConns;
8894
this.httpBodyUpdateSize = httpBodyUpdateSize;
8995
}
9096

@@ -103,38 +109,30 @@ public String clientName() {
103109
return AWS_COMMON_RUNTIME;
104110
}
105111

106-
private HttpConnection createConnection(URI uri) {
112+
private HttpConnectionPoolManager createConnectionPool(URI uri) {
107113
Validate.notNull(uri, "URI must not be null");
108-
log.debug(() -> "Creating Connection to: " + uri);
109-
return invokeSafely(() -> HttpConnection.createConnection(uri, bootstrap, socketOptions, tlsContext,
110-
windowSize, httpBodyUpdateSize).get());
114+
log.debug(() -> "Creating ConnectionPool for: " + uri);
115+
return new HttpConnectionPoolManager(bootstrap, socketOptions, tlsContext, uri, windowSize, maxConnectionsPerEndpoint);
111116
}
112117

113-
private HttpConnection getOrCreateConnection(URI uri) {
118+
private HttpConnectionPoolManager getOrCreateConnectionPool(URI uri) {
114119
Validate.notNull(uri, "URI must not be null");
115-
HttpConnection connToReturn = connections.get(uri);
120+
HttpConnectionPoolManager connPool = connectionPools.get(uri);
116121

117-
if (connToReturn == null) {
118-
HttpConnection newConn = createConnection(uri);
119-
HttpConnection alreadyExistingConn = connections.putIfAbsent(uri, newConn);
122+
if (connPool == null) {
123+
HttpConnectionPoolManager newConnPool = createConnectionPool(uri);
124+
HttpConnectionPoolManager alreadyExistingConnPool = connectionPools.putIfAbsent(uri, newConnPool);
120125

121-
if (alreadyExistingConn == null) {
122-
connToReturn = newConn;
126+
if (alreadyExistingConnPool == null) {
127+
connPool = newConnPool;
123128
} else {
124129
// Multiple threads trying to open connections to the same URI at once, close the newer one
125-
newConn.close();
126-
connToReturn = alreadyExistingConn;
130+
newConnPool.close();
131+
connPool = alreadyExistingConnPool;
127132
}
128133
}
129134

130-
// If connection was shutdown by peer, open a new connection
131-
if (connToReturn.getShutdownFuture().isDone()) {
132-
connections.remove(uri, connToReturn);
133-
connToReturn.close();
134-
return getOrCreateConnection(uri);
135-
}
136-
137-
return connToReturn;
135+
return connPool;
138136
}
139137

140138
private List<HttpHeader> createHttpHeaderList(URI uri, AsyncExecuteRequest asyncRequest) {
@@ -191,22 +189,38 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
191189
Validate.notNull(asyncRequest.responseHandler(), "ResponseHandler must not be null");
192190

193191
URI uri = toUri(asyncRequest.request());
194-
HttpConnection crtConn = getOrCreateConnection(uri);
192+
HttpConnectionPoolManager crtConnPool = getOrCreateConnectionPool(uri);
195193
HttpRequest crtRequest = toCrtRequest(uri, asyncRequest);
196194

197195
CompletableFuture<Void> requestFuture = new CompletableFuture<>();
198196
AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter =
199197
new AwsCrtAsyncHttpStreamAdapter(requestFuture, asyncRequest, windowSize);
200198

201-
invokeSafely(() -> crtConn.makeRequest(crtRequest, crtToSdkAdapter));
199+
HttpRequestOptions reqOptions = new HttpRequestOptions();
200+
reqOptions.setBodyBufferSize(httpBodyUpdateSize);
201+
202+
// When a Connection is ready from the Connection Pool, schedule the Request on the connection
203+
crtConnPool.acquireConnection().whenComplete((crtConn, throwable) -> {
204+
// If we didn't get a connection for some reason, fail the request
205+
if (throwable != null) {
206+
requestFuture.completeExceptionally(throwable);
207+
return;
208+
}
209+
210+
// When the Request is complete, return our connection back to the Connection Pool
211+
requestFuture.whenComplete((v, t) -> crtConnPool.releaseConnection(crtConn));
212+
213+
// Submit the Request on this Connection
214+
invokeSafely(() -> crtConn.makeRequest(crtRequest, reqOptions, crtToSdkAdapter));
215+
});
202216

203217
return requestFuture;
204218
}
205219

206220
@Override
207221
public void close() {
208-
for (HttpConnection conn : connections.values()) {
209-
conn.close();
222+
for (HttpConnectionPoolManager connPool : connectionPools.values()) {
223+
connPool.close();
210224
}
211225
}
212226

0 commit comments

Comments
 (0)