Skip to content

Commit 4851f0d

Browse files
authored
Fixed the issue in AWS CRT HTTP clients where the connection is shut down unnecessarily (#4825)
* Fixed the issue where the connection is shut down unnecessarily * Refactoring * Address feedback * Fix typo
1 parent a4de6e7 commit 4851f0d

File tree

6 files changed

+306
-37
lines changed

6 files changed

+306
-37
lines changed

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
2828
import software.amazon.awssdk.crt.http.HttpStream;
2929
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
30-
import software.amazon.awssdk.http.HttpStatusFamily;
3130
import software.amazon.awssdk.http.SdkHttpResponse;
3231
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
3332
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
@@ -49,14 +48,17 @@ public final class CrtResponseAdapter implements HttpStreamResponseHandler {
4948
private final SdkAsyncHttpResponseHandler responseHandler;
5049
private final SimplePublisher<ByteBuffer> responsePublisher = new SimplePublisher<>();
5150

52-
private final SdkHttpResponse.Builder responseBuilder = SdkHttpResponse.builder();
51+
private final SdkHttpResponse.Builder responseBuilder;
52+
private final ResponseHandlerHelper responseHandlerHelper;
5353

5454
private CrtResponseAdapter(HttpClientConnection connection,
5555
CompletableFuture<Void> completionFuture,
5656
SdkAsyncHttpResponseHandler responseHandler) {
5757
this.connection = Validate.paramNotNull(connection, "connection");
5858
this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture");
5959
this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler");
60+
this.responseBuilder = SdkHttpResponse.builder();
61+
this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, connection);
6062
}
6163

6264
public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn,
@@ -66,18 +68,13 @@ public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnectio
6668
}
6769

6870
@Override
69-
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
70-
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
71-
for (HttpHeader h : nextHeaders) {
72-
responseBuilder.appendHeader(h.getName(), h.getValue());
73-
}
74-
}
71+
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) {
72+
responseHandlerHelper.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
7573
}
7674

7775
@Override
7876
public void onResponseHeadersDone(HttpStream stream, int headerType) {
7977
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
80-
responseBuilder.statusCode(stream.getResponseStatusCode());
8178
responseHandler.onHeaders(responseBuilder.build());
8279
responseHandler.onStream(responsePublisher);
8380
}
@@ -94,7 +91,7 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
9491

9592
writeFuture.whenComplete((result, failure) -> {
9693
if (failure != null) {
97-
failResponseHandlerAndFuture(stream, failure);
94+
handlePublisherError(stream, failure);
9895
return;
9996
}
10097

@@ -116,18 +113,18 @@ public void onResponseComplete(HttpStream stream, int errorCode) {
116113
private void onSuccessfulResponseComplete(HttpStream stream) {
117114
responsePublisher.complete().whenComplete((result, failure) -> {
118115
if (failure != null) {
119-
failResponseHandlerAndFuture(stream, failure);
116+
handlePublisherError(stream, failure);
120117
return;
121118
}
122-
123-
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
124-
connection.shutdown();
125-
}
126-
127-
connection.close();
128-
stream.close();
129119
completionFuture.complete(null);
130120
});
121+
122+
responseHandlerHelper.cleanUpConnectionBasedOnStatusCode(stream);
123+
}
124+
125+
private void handlePublisherError(HttpStream stream, Throwable failure) {
126+
failResponseHandlerAndFuture(stream, failure);
127+
responseHandlerHelper.releaseConnection(stream);
131128
}
132129

133130
private void onFailedResponseComplete(HttpStream stream, HttpException error) {
@@ -136,14 +133,12 @@ private void onFailedResponseComplete(HttpStream stream, HttpException error) {
136133
Throwable toThrow = wrapWithIoExceptionIfRetryable(error);;
137134
responsePublisher.error(toThrow);
138135
failResponseHandlerAndFuture(stream, toThrow);
136+
responseHandlerHelper.closeConnection(stream);
139137
}
140138

141139
private void failResponseHandlerAndFuture(HttpStream stream, Throwable error) {
142140
callResponseHandlerOnError(error);
143141
completionFuture.completeExceptionally(error);
144-
connection.shutdown();
145-
connection.close();
146-
stream.close();
147142
}
148143

149144
private void callResponseHandlerOnError(Throwable error) {

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import software.amazon.awssdk.crt.http.HttpStream;
2929
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
3030
import software.amazon.awssdk.http.AbortableInputStream;
31-
import software.amazon.awssdk.http.HttpStatusFamily;
3231
import software.amazon.awssdk.http.SdkHttpFullResponse;
32+
import software.amazon.awssdk.http.SdkHttpResponse;
3333
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
3434
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
3535
import software.amazon.awssdk.utils.async.SimplePublisher;
@@ -39,17 +39,22 @@
3939
*/
4040
@SdkInternalApi
4141
public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
42-
private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder();
42+
4343
private volatile InputStreamSubscriber inputStreamSubscriber;
4444
private final SimplePublisher<ByteBuffer> simplePublisher = new SimplePublisher<>();
4545

4646
private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
4747
private final HttpClientConnection crtConn;
4848

49+
private final SdkHttpFullResponse.Builder responseBuilder;
50+
private final ResponseHandlerHelper responseHandlerHelper;
51+
4952
public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
5053
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture) {
5154
this.crtConn = crtConn;
5255
this.requestCompletionFuture = requestCompletionFuture;
56+
this.responseBuilder = SdkHttpResponse.builder();
57+
this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, crtConn);
5358
}
5459

5560
@Override
@@ -59,9 +64,8 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blo
5964
for (HttpHeader h : nextHeaders) {
6065
responseBuilder.appendHeader(h.getName(), h.getValue());
6166
}
67+
responseBuilder.statusCode(responseStatusCode);
6268
}
63-
64-
responseBuilder.statusCode(responseStatusCode);
6569
}
6670

6771
@Override
@@ -84,15 +88,15 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
8488

8589
writeFuture.whenComplete((result, failure) -> {
8690
if (failure != null) {
87-
failFutureAndCloseConnection(stream, failure);
91+
failFutureAndReleaseConnection(stream, failure);
8892
return;
8993
}
9094

9195
// increment the window upon buffer consumption.
9296
stream.incrementWindow(bodyBytesIn.length);
9397
});
9498

95-
// the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing.
99+
// Window will be incremented after the subscriber consumes the data, returning 0 here to disable it.
96100
return 0;
97101
}
98102

@@ -105,11 +109,14 @@ public void onResponseComplete(HttpStream stream, int errorCode) {
105109
}
106110
}
107111

112+
private void failFutureAndReleaseConnection(HttpStream stream, Throwable failure) {
113+
requestCompletionFuture.completeExceptionally(failure);
114+
responseHandlerHelper.releaseConnection(stream);
115+
}
116+
108117
private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) {
109118
requestCompletionFuture.completeExceptionally(failure);
110-
crtConn.shutdown();
111-
crtConn.close();
112-
stream.close();
119+
responseHandlerHelper.closeConnection(stream);
113120
}
114121

115122
private void onFailedResponseComplete(HttpStream stream, int errorCode) {
@@ -121,16 +128,12 @@ private void onFailedResponseComplete(HttpStream stream, int errorCode) {
121128
}
122129

123130
private void onSuccessfulResponseComplete(HttpStream stream) {
124-
// always close the connection on a 5XX response code.
125-
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
126-
crtConn.shutdown();
127-
}
128-
129131
// For response without a payload, for example, S3 PutObjectResponse, we need to complete the future
130132
// in onResponseComplete callback since onResponseBody will never be invoked.
131133
requestCompletionFuture.complete(responseBuilder.build());
134+
135+
// requestCompletionFuture has been completed at this point, no need to notify the future
132136
simplePublisher.complete();
133-
crtConn.close();
134-
stream.close();
137+
responseHandlerHelper.cleanUpConnectionBasedOnStatusCode(stream);
135138
}
136139
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt.internal.response;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import software.amazon.awssdk.annotations.SdkInternalApi;
20+
import software.amazon.awssdk.crt.http.HttpClientConnection;
21+
import software.amazon.awssdk.crt.http.HttpHeader;
22+
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
23+
import software.amazon.awssdk.crt.http.HttpStream;
24+
import software.amazon.awssdk.http.HttpStatusFamily;
25+
import software.amazon.awssdk.http.SdkHttpResponse;
26+
27+
/**
28+
* This is the helper class that contains common logic shared between {@link CrtResponseAdapter} and
29+
* {@link InputStreamAdaptingHttpStreamResponseHandler}.
30+
*
31+
* CRT connection will only be closed, i.e., not reused, in one of the following conditions:
32+
* 1. 5xx server error OR
33+
* 2. It fails to read the response.
34+
*/
35+
@SdkInternalApi
36+
public class ResponseHandlerHelper {
37+
38+
private final SdkHttpResponse.Builder responseBuilder;
39+
private final HttpClientConnection connection;
40+
private AtomicBoolean connectionClosed = new AtomicBoolean(false);
41+
42+
public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder, HttpClientConnection connection) {
43+
this.responseBuilder = responseBuilder;
44+
this.connection = connection;
45+
}
46+
47+
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
48+
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
49+
for (HttpHeader h : nextHeaders) {
50+
responseBuilder.appendHeader(h.getName(), h.getValue());
51+
}
52+
responseBuilder.statusCode(responseStatusCode);
53+
}
54+
}
55+
56+
/**
57+
* Release the connection back to the pool so that it can be reused.
58+
*/
59+
public void releaseConnection(HttpStream stream) {
60+
if (connectionClosed.compareAndSet(false, true)) {
61+
connection.close();
62+
stream.close();
63+
}
64+
}
65+
66+
/**
67+
* Close the connection completely
68+
*/
69+
public void closeConnection(HttpStream stream) {
70+
if (connectionClosed.compareAndSet(false, true)) {
71+
connection.shutdown();
72+
connection.close();
73+
stream.close();
74+
}
75+
}
76+
77+
public void cleanUpConnectionBasedOnStatusCode(HttpStream stream) {
78+
// always close the connection on a 5XX response code.
79+
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
80+
closeConnection(stream);
81+
} else {
82+
releaseConnection(stream);
83+
}
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt.internal;
17+
18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static org.mockito.Mockito.never;
20+
import static org.mockito.Mockito.verify;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.extension.ExtendWith;
26+
import org.junit.jupiter.params.ParameterizedTest;
27+
import org.junit.jupiter.params.provider.ValueSource;
28+
import org.mockito.Mock;
29+
import org.mockito.junit.jupiter.MockitoExtension;
30+
import software.amazon.awssdk.crt.http.HttpClientConnection;
31+
import software.amazon.awssdk.crt.http.HttpException;
32+
import software.amazon.awssdk.crt.http.HttpHeader;
33+
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
34+
import software.amazon.awssdk.crt.http.HttpStream;
35+
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
36+
37+
@ExtendWith(MockitoExtension.class)
38+
public abstract class BaseHttpStreamResponseHandlerTest {
39+
@Mock HttpClientConnection crtConn;
40+
CompletableFuture requestFuture;
41+
42+
@Mock
43+
private HttpStream httpStream;
44+
45+
private HttpStreamResponseHandler responseHandler;
46+
47+
abstract HttpStreamResponseHandler responseHandler();
48+
49+
@BeforeEach
50+
public void setUp() {
51+
requestFuture = new CompletableFuture<>();
52+
responseHandler = responseHandler();
53+
}
54+
55+
@Test
56+
void serverError_shouldShutdownConnection() {
57+
HttpHeader[] httpHeaders = getHttpHeaders();
58+
responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(),
59+
httpHeaders);
60+
61+
responseHandler.onResponseHeadersDone(httpStream, 0);
62+
responseHandler.onResponseComplete(httpStream, 0);
63+
requestFuture.join();
64+
verify(crtConn).shutdown();
65+
verify(crtConn).close();
66+
verify(httpStream).close();
67+
}
68+
69+
@ParameterizedTest
70+
@ValueSource(ints = { 200, 400, 202, 403 })
71+
void nonServerError_shouldNotShutdownConnection(int statusCode) {
72+
HttpHeader[] httpHeaders = getHttpHeaders();
73+
responseHandler.onResponseHeaders(httpStream, statusCode, HttpHeaderBlock.MAIN.getValue(),
74+
httpHeaders);
75+
76+
responseHandler.onResponseHeadersDone(httpStream, 0);
77+
responseHandler.onResponseComplete(httpStream, 0);
78+
79+
requestFuture.join();
80+
verify(crtConn, never()).shutdown();
81+
verify(crtConn).close();
82+
verify(httpStream).close();
83+
}
84+
85+
@Test
86+
void failedToGetResponse_shouldShutdownConnection() {
87+
HttpHeader[] httpHeaders = getHttpHeaders();
88+
responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
89+
httpHeaders);
90+
91+
responseHandler.onResponseComplete(httpStream, 1);
92+
assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class);
93+
verify(crtConn).shutdown();
94+
verify(crtConn).close();
95+
verify(httpStream).close();
96+
}
97+
98+
private static HttpHeader[] getHttpHeaders() {
99+
HttpHeader[] httpHeaders = new HttpHeader[1];
100+
httpHeaders[0] = new HttpHeader("Content-Length", "1");
101+
return httpHeaders;
102+
}
103+
}

0 commit comments

Comments
 (0)