Skip to content

Commit e69b80b

Browse files
committed
Update CRT Client
1 parent 244de0b commit e69b80b

File tree

7 files changed

+79
-35
lines changed

7 files changed

+79
-35
lines changed

http-clients/aws-crt-client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<dependency>
3434
<groupId>software.amazon.awssdk.crt</groupId>
3535
<artifactId>aws-crt</artifactId>
36-
<version>0.3.14</version>
36+
<version>0.3.17</version>
3737
</dependency>
3838

3939
<!--SDK dependencies-->

http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientS3IntegrationTest.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.TimeUnit;
2425
import org.junit.After;
2526
import org.junit.Assert;
2627
import org.junit.Before;
2728
import org.junit.Test;
2829
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
30+
import software.amazon.awssdk.core.ResponseBytes;
2931
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3032
import software.amazon.awssdk.crt.CrtResource;
3133
import software.amazon.awssdk.crt.io.ClientBootstrap;
@@ -35,6 +37,7 @@
3537
import software.amazon.awssdk.regions.Region;
3638
import software.amazon.awssdk.services.s3.S3AsyncClient;
3739
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
40+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
3841

3942

4043
public class AwsCrtClientS3IntegrationTest {
@@ -43,9 +46,10 @@ public class AwsCrtClientS3IntegrationTest {
4346
*/
4447
private static String BUCKET_NAME = "aws-crt-test-stuff";
4548

46-
private static String KEY = "http_test_doc.txt";
47-
48-
private static String FILE_SHA256 = "C7FDB5314B9742467B16BD5EA2F8012190B5E2C44A005F7984F89AAB58219534";
49+
private static String LARGE_FILE = "http_test_doc.txt";
50+
private static String SMALL_FILE = "random_32_byte.data";
51+
private static String LARGE_FILE_SHA256 = "C7FDB5314B9742467B16BD5EA2F8012190B5E2C44A005F7984F89AAB58219534";
52+
private static int NUM_REQUESTS = 1000;
4953

5054
private static Region REGION = Region.US_EAST_1;
5155

@@ -63,7 +67,7 @@ private void addResource(CrtResource resource) {
6367
public void setup() {
6468
Assert.assertEquals("Expected Zero allocated AwsCrtResources", 0, CrtResource.getAllocatedNativeResourceCount());
6569

66-
ClientBootstrap bootstrap = new ClientBootstrap(1);
70+
ClientBootstrap bootstrap = new ClientBootstrap(4);
6771
SocketOptions socketOptions = new SocketOptions();
6872
TlsContext tlsContext = new TlsContext();
6973

@@ -100,11 +104,31 @@ public void tearDown() {
100104
public void testDownloadFromS3() throws Exception {
101105
GetObjectRequest s3Request = GetObjectRequest.builder()
102106
.bucket(BUCKET_NAME)
103-
.key(KEY)
107+
.key(LARGE_FILE)
104108
.build();
105109

106110
byte[] responseBody = s3.getObject(s3Request, AsyncResponseTransformer.toBytes()).get(120, TimeUnit.SECONDS).asByteArray();
107111

108-
assertThat(sha256Hex(responseBody).toUpperCase()).isEqualTo(FILE_SHA256);
112+
assertThat(sha256Hex(responseBody).toUpperCase()).isEqualTo(LARGE_FILE_SHA256);
113+
}
114+
115+
@Test
116+
public void testParallelDownloadFromS3() throws Exception {
117+
List<CompletableFuture<ResponseBytes<GetObjectResponse>> > requestFutures = new ArrayList<>();
118+
119+
for (int i = 0; i < NUM_REQUESTS; i++) {
120+
GetObjectRequest s3Request = GetObjectRequest.builder()
121+
.bucket(BUCKET_NAME)
122+
.key(SMALL_FILE)
123+
.build();
124+
CompletableFuture<ResponseBytes<GetObjectResponse>> requestFuture = s3.getObject(s3Request, AsyncResponseTransformer.toBytes());
125+
requestFutures.add(requestFuture);
126+
}
127+
128+
for(CompletableFuture<ResponseBytes<GetObjectResponse>> f: requestFutures) {
129+
f.join();
130+
Assert.assertEquals(32, f.get().asByteArray().length);
131+
}
109132
}
133+
110134
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public class AwsCrtAsyncHttpClient implements SdkAsyncHttpClient {
5757
private static final Logger log = Logger.loggerFor(AwsCrtAsyncHttpClient.class);
5858
private static final String HOST_HEADER = "Host";
5959
private static final String CONTENT_LENGTH = "Content-Length";
60+
private static final String CONNECTION = "Connection";
61+
private static final String KEEP_ALIVE = "keep-alive";
6062
private static final String AWS_COMMON_RUNTIME = "AwsCommonRuntime";
6163
private static final int DEFAULT_STREAM_WINDOW_SIZE = 16 * 1024 * 1024; // 16 MB Total Buffer size
6264
private static final int DEFAULT_HTTP_BODY_UPDATE_SIZE = 4 * 1024 * 1024; // 4 MB Update size from Native
@@ -111,7 +113,7 @@ public String clientName() {
111113

112114
private HttpConnectionPoolManager createConnectionPool(URI uri) {
113115
Validate.notNull(uri, "URI must not be null");
114-
log.debug(() -> "Creating ConnectionPool for: " + uri);
116+
log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint);
115117
return new HttpConnectionPoolManager(bootstrap, socketOptions, tlsContext, uri, windowSize, maxConnectionsPerEndpoint);
116118
}
117119

@@ -144,6 +146,11 @@ private List<HttpHeader> createHttpHeaderList(URI uri, AsyncExecuteRequest async
144146
crtHeaderList.add(new HttpHeader(HOST_HEADER, uri.getHost()));
145147
}
146148

149+
// Add Connection Keep Alive Header to reuse this Http Connection as long as possible
150+
if (isNullOrEmpty(sdkRequest.headers().get(CONNECTION))) {
151+
crtHeaderList.add(new HttpHeader(CONNECTION, KEEP_ALIVE));
152+
}
153+
147154
// Set Content-Length if needed
148155
Optional<Long> contentLength = asyncRequest.requestContentPublisher().contentLength();
149156
if (isNullOrEmpty(sdkRequest.headers().get(CONTENT_LENGTH)) && contentLength.isPresent()) {
@@ -200,19 +207,20 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
200207
reqOptions.setBodyBufferSize(httpBodyUpdateSize);
201208

202209
// When a Connection is ready from the Connection Pool, schedule the Request on the connection
203-
crtConnPool.acquireConnection().whenComplete((crtConn, throwable) -> {
204-
// If we didn't get a connection for some reason, fail the request
205-
if (throwable != null) {
206-
requestFuture.completeExceptionally(throwable);
207-
return;
208-
}
209-
210-
// When the Request is complete, return our connection back to the Connection Pool
211-
requestFuture.whenComplete((v, t) -> crtConnPool.releaseConnection(crtConn));
212-
213-
// Submit the Request on this Connection
214-
invokeSafely(() -> crtConn.makeRequest(crtRequest, reqOptions, crtToSdkAdapter));
215-
});
210+
crtConnPool.acquireConnection()
211+
.whenComplete((crtConn, throwable) -> {
212+
// If we didn't get a connection for some reason, fail the request
213+
if (throwable != null) {
214+
requestFuture.completeExceptionally(throwable);
215+
return;
216+
}
217+
218+
// When the Request is complete, return our connection back to the Connection Pool
219+
requestFuture.whenComplete((v, t) -> crtConnPool.releaseConnection(crtConn));
220+
221+
// Submit the Request on this Connection
222+
invokeSafely(() -> crtConn.makeRequest(crtRequest, reqOptions, crtToSdkAdapter));
223+
});
216224

217225
return requestFuture;
218226
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtAsyncHttpStreamAdapter.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,20 @@
3737
public class AwsCrtAsyncHttpStreamAdapter implements CrtHttpStreamHandler {
3838
private static final Logger log = Logger.loggerFor(AwsCrtAsyncHttpStreamAdapter.class);
3939
private final AsyncExecuteRequest sdkRequest;
40-
private final CompletableFuture<Void> reqComplete;
40+
private final CompletableFuture<Void> responseComplete;
4141
private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder();
4242
private final int windowSize;
4343
private final AwsCrtRequestBodySubscriber requestBodySubscriber;
4444
private AwsCrtResponseBodyPublisher respBodyPublisher = null;
4545

46-
public AwsCrtAsyncHttpStreamAdapter(CompletableFuture<Void> reqComplete, AsyncExecuteRequest sdkRequest,
46+
public AwsCrtAsyncHttpStreamAdapter(CompletableFuture<Void> responseComplete, AsyncExecuteRequest sdkRequest,
4747
int windowSize) {
48-
Validate.notNull(reqComplete, "reqComplete Future is null");
48+
Validate.notNull(responseComplete, "reqComplete Future is null");
4949
Validate.notNull(sdkRequest, "AsyncExecuteRequest Future is null");
5050
Validate.isPositive(windowSize, "windowSize is <= 0");
5151

5252
this.sdkRequest = sdkRequest;
53-
this.reqComplete = reqComplete;
53+
this.responseComplete = responseComplete;
5454
this.windowSize = windowSize;
5555
this.requestBodySubscriber = new AwsCrtRequestBodySubscriber(windowSize);
5656

@@ -70,7 +70,7 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, HttpHea
7070
public void onResponseHeadersDone(HttpStream stream, boolean hasBody) {
7171
respBuilder.statusCode(stream.getResponseStatusCode());
7272
sdkRequest.responseHandler().onHeaders(respBuilder.build());
73-
respBodyPublisher = new AwsCrtResponseBodyPublisher(stream, windowSize);
73+
respBodyPublisher = new AwsCrtResponseBodyPublisher(stream, responseComplete, windowSize);
7474

7575

7676
if (!hasBody) {
@@ -92,6 +92,10 @@ public int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) {
9292
respBodyPublisher.queueBuffer(deepCopy(bodyBytesIn));
9393
respBodyPublisher.publishToSubscribers();
9494

95+
if (bodyBytesIn.remaining() != 0) {
96+
throw new IllegalStateException("Unprocessed bytes remain in bodyBytesIn Buffer!");
97+
}
98+
9599
return 0;
96100
}
97101

@@ -101,7 +105,7 @@ public void onResponseComplete(HttpStream stream, int errorCode) {
101105
log.debug(() -> "Response Completed Successfully");
102106
respBodyPublisher.setQueueComplete();
103107
respBodyPublisher.publishToSubscribers();
104-
reqComplete.complete(null);
108+
responseComplete.complete(null);
105109
} else {
106110
HttpException error = new HttpException(errorCode);
107111
log.error(() -> "Response Encountered an Error.", error);
@@ -115,7 +119,7 @@ public void onResponseComplete(HttpStream stream, int errorCode) {
115119
respBodyPublisher.publishToSubscribers();
116120
}
117121

118-
reqComplete.completeExceptionally(error);
122+
responseComplete.completeExceptionally(error);
119123
}
120124

121125
stream.close();

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtRequestBodySubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public synchronized boolean transferRequestBody(ByteBuffer out) {
126126
if (!endOfStream) {
127127
requestDataIfNecessary();
128128
} else {
129-
log.debug(() -> "End Of Stream reached");
129+
log.debug(() -> "End Of RequestBody reached");
130130
}
131131

132132
return endOfStream;

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.nio.ByteBuffer;
1919
import java.util.Queue;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.ConcurrentLinkedQueue;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,7 @@ public class AwsCrtResponseBodyPublisher implements Publisher<ByteBuffer> {
3839
private static final Logger log = Logger.loggerFor(AwsCrtResponseBodyPublisher.class);
3940
private static final LongUnaryOperator DECREMENT_IF_GREATER_THAN_ZERO = x -> ((x > 0) ? (x - 1) : (x));
4041

42+
private final CompletableFuture<Void> responseComplete;
4143
private final AtomicLong outstandingRequests = new AtomicLong(0);
4244
private final HttpStream stream;
4345
private final int windowSize;
@@ -56,10 +58,12 @@ public class AwsCrtResponseBodyPublisher implements Publisher<ByteBuffer> {
5658
* @param windowSize The max allowed bytes to be queued. The sum of the sizes of all queued ByteBuffers should
5759
* never exceed this value.
5860
*/
59-
public AwsCrtResponseBodyPublisher(HttpStream stream, int windowSize) {
61+
public AwsCrtResponseBodyPublisher(HttpStream stream, CompletableFuture<Void> responseComplete, int windowSize) {
6062
Validate.notNull(stream, "Stream must not be null");
63+
Validate.notNull(responseComplete, "Stream must not be null");
6164
Validate.isPositive(windowSize, "windowSize must be > 0");
6265
this.stream = stream;
66+
this.responseComplete = responseComplete;
6367
this.windowSize = windowSize;
6468
}
6569

@@ -153,18 +157,21 @@ protected void completeSubscriptionExactlyOnce() {
153157
return;
154158
}
155159

156-
Subscriber s = subscriberRef.getAndSet(null);
160+
Subscriber subscriber = subscriberRef.getAndSet(null);
157161

158-
if (s == null) {
162+
if (subscriber == null) {
159163
return;
160164
}
161165

162166
Throwable throwable = error.get();
163167

164168
if (throwable != null) {
165-
s.onError(throwable);
169+
log.error(() -> "Error before ResponseBodyPublisher could complete: " + throwable.getMessage());
170+
subscriber.onError(throwable);
166171
} else {
167-
s.onComplete();
172+
log.debug(() -> "ResponseBodyPublisher Completed Successfully");
173+
subscriber.onComplete();
174+
responseComplete.complete(null);
168175
}
169176
}
170177

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtResponseBodyPublisherReactiveStreamCompatTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.nio.ByteBuffer;
2121
import java.util.UUID;
22+
import java.util.concurrent.CompletableFuture;
2223
import org.reactivestreams.Publisher;
2324
import org.reactivestreams.tck.PublisherVerification;
2425
import org.reactivestreams.tck.TestEnvironment;
@@ -36,7 +37,7 @@ public AwsCrtResponseBodyPublisherReactiveStreamCompatTest() {
3637
@Override
3738
public Publisher<ByteBuffer> createPublisher(long elements) {
3839
HttpStream stream = mock(HttpStream.class);
39-
AwsCrtResponseBodyPublisher bodyPublisher = new AwsCrtResponseBodyPublisher(stream, Integer.MAX_VALUE);
40+
AwsCrtResponseBodyPublisher bodyPublisher = new AwsCrtResponseBodyPublisher(stream, new CompletableFuture<>(), Integer.MAX_VALUE);
4041

4142
for (long i = 0; i < elements; i++) {
4243
bodyPublisher.queueBuffer(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes()));

0 commit comments

Comments
 (0)