Skip to content

Commit 9174f11

Browse files
authored
Close the underlying crt connection and stream properly if the stream… (#4835)
* Close the underlying crt connection and stream properly if the stream is aborted or closed in the AWS CRT HTTP client * Add more tests
1 parent e5e9fa8 commit 9174f11

File tree

8 files changed

+305
-13
lines changed

8 files changed

+305
-13
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT HTTP Client",
4+
"contributor": "",
5+
"description": "Fixed the issue in the AWS CRT sync HTTP client where the connection was left open after the stream was aborted."
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt.internal.response;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.nio.ByteBuffer;
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.http.Abortable;
25+
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
26+
27+
/**
28+
* Wrapper of {@link InputStreamSubscriber} that also implements {@link Abortable} and closes the underlying connections when
29+
* {@link #close()} or {@link #abort()} is invoked.
30+
*/
31+
@SdkInternalApi
32+
public final class AbortableInputStreamSubscriber extends InputStream implements Subscriber<ByteBuffer>, Abortable {
33+
34+
private final InputStreamSubscriber delegate;
35+
private final Runnable closeConnection;
36+
37+
public AbortableInputStreamSubscriber(Runnable onClose, InputStreamSubscriber inputStreamSubscriber) {
38+
this.delegate = inputStreamSubscriber;
39+
this.closeConnection = onClose;
40+
}
41+
42+
@Override
43+
public void abort() {
44+
close();
45+
}
46+
47+
@Override
48+
public int read() throws IOException {
49+
return delegate.read();
50+
}
51+
52+
@Override
53+
public int read(byte[] b, int off, int len) throws IOException {
54+
return delegate.read(b, off, len);
55+
}
56+
57+
@Override
58+
public int read(byte[] b) throws IOException {
59+
return delegate.read(b);
60+
}
61+
62+
@Override
63+
public void onSubscribe(Subscription s) {
64+
delegate.onSubscribe(s);
65+
}
66+
67+
@Override
68+
public void onNext(ByteBuffer byteBuffer) {
69+
delegate.onNext(byteBuffer);
70+
}
71+
72+
@Override
73+
public void onError(Throwable t) {
74+
delegate.onError(t);
75+
}
76+
77+
@Override
78+
public void onComplete() {
79+
delegate.onComplete();
80+
}
81+
82+
@Override
83+
public void close() {
84+
closeConnection.run();
85+
delegate.close();
86+
}
87+
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ private void onSuccessfulResponseComplete(HttpStream stream) {
126126

127127
private void handlePublisherError(HttpStream stream, Throwable failure) {
128128
failResponseHandlerAndFuture(stream, failure);
129-
responseHandlerHelper.releaseConnection(stream);
129+
responseHandlerHelper.closeConnection(stream);
130130
}
131131

132132
private void onFailedResponseComplete(HttpStream stream, HttpException error) {

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import software.amazon.awssdk.http.SdkHttpFullResponse;
3232
import software.amazon.awssdk.http.SdkHttpResponse;
3333
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
34+
import software.amazon.awssdk.utils.Logger;
3435
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
3536
import software.amazon.awssdk.utils.async.SimplePublisher;
3637

@@ -39,8 +40,8 @@
3940
*/
4041
@SdkInternalApi
4142
public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
42-
43-
private volatile InputStreamSubscriber inputStreamSubscriber;
43+
private static final Logger log = Logger.loggerFor(InputStreamAdaptingHttpStreamResponseHandler.class);
44+
private volatile AbortableInputStreamSubscriber inputStreamSubscriber;
4445
private final SimplePublisher<ByteBuffer> simplePublisher = new SimplePublisher<>();
4546

4647
private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
@@ -66,12 +67,19 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blo
6667
}
6768
responseBuilder.statusCode(responseStatusCode);
6869
}
70+
71+
// Propagate cancellation
72+
requestCompletionFuture.exceptionally(t -> {
73+
responseHandlerHelper.closeConnection(stream);
74+
return null;
75+
});
6976
}
7077

7178
@Override
7279
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
7380
if (inputStreamSubscriber == null) {
74-
inputStreamSubscriber = new InputStreamSubscriber();
81+
inputStreamSubscriber = new AbortableInputStreamSubscriber(() -> responseHandlerHelper.closeConnection(stream),
82+
new InputStreamSubscriber());
7583
simplePublisher.subscribe(inputStreamSubscriber);
7684
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
7785
// the stream directly.
@@ -88,7 +96,9 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
8896

8997
writeFuture.whenComplete((result, failure) -> {
9098
if (failure != null) {
91-
failFutureAndReleaseConnection(stream, failure);
99+
log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future",
100+
failure);
101+
failFutureAndCloseConnection(stream, failure);
92102
return;
93103
}
94104

@@ -111,11 +121,6 @@ public void onResponseComplete(HttpStream stream, int errorCode) {
111121
}
112122
}
113123

114-
private void failFutureAndReleaseConnection(HttpStream stream, Throwable failure) {
115-
requestCompletionFuture.completeExceptionally(failure);
116-
responseHandlerHelper.releaseConnection(stream);
117-
}
118-
119124
private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) {
120125
requestCompletionFuture.completeExceptionally(failure);
121126
responseHandlerHelper.closeConnection(stream);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt.internal;
17+
18+
import static org.mockito.Mockito.verify;
19+
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.api.extension.ExtendWith;
23+
import org.mockito.Mock;
24+
import org.mockito.junit.jupiter.MockitoExtension;
25+
import software.amazon.awssdk.http.crt.internal.response.AbortableInputStreamSubscriber;
26+
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
27+
28+
@ExtendWith(MockitoExtension.class)
29+
public class AbortableInputStreamSubscriberTest {
30+
31+
private AbortableInputStreamSubscriber abortableInputStreamSubscriber;
32+
33+
@Mock
34+
private Runnable onClose;
35+
36+
@BeforeEach
37+
void setUp() {
38+
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(onClose, new InputStreamSubscriber());
39+
}
40+
41+
@Test
42+
void close_shouldInvokeOnClose() {
43+
abortableInputStreamSubscriber.close();
44+
verify(onClose).run();
45+
}
46+
47+
@Test
48+
void abort_shouldInvokeOnClose() {
49+
abortableInputStreamSubscriber.abort();
50+
verify(onClose).run();
51+
}
52+
}

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ public abstract class BaseHttpStreamResponseHandlerTest {
4242
CompletableFuture requestFuture;
4343

4444
@Mock
45-
private HttpStream httpStream;
45+
HttpStream httpStream;
4646

47-
private HttpStreamResponseHandler responseHandler;
47+
HttpStreamResponseHandler responseHandler;
4848

4949
abstract HttpStreamResponseHandler responseHandler();
5050

@@ -113,7 +113,7 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException {
113113
verify(httpStream, never()).incrementWindow(anyInt());
114114
}
115115

116-
private static HttpHeader[] getHttpHeaders() {
116+
static HttpHeader[] getHttpHeaders() {
117117
HttpHeader[] httpHeaders = new HttpHeader[1];
118118
httpHeaders[0] = new HttpHeader("Content-Length", "1");
119119
return httpHeaders;

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,29 @@
1515

1616
package software.amazon.awssdk.http.crt.internal;
1717

18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static org.mockito.ArgumentMatchers.anyInt;
20+
import static org.mockito.Mockito.never;
21+
import static org.mockito.Mockito.verify;
22+
23+
import java.nio.ByteBuffer;
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.concurrent.CancellationException;
1826
import java.util.concurrent.CompletableFuture;
1927
import java.util.function.Function;
28+
import org.junit.jupiter.api.Test;
2029
import org.mockito.Mockito;
30+
import org.reactivestreams.Publisher;
31+
import org.reactivestreams.Subscriber;
32+
import org.reactivestreams.Subscription;
2133
import software.amazon.awssdk.core.http.HttpResponseHandler;
2234
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
2335
import software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler;
36+
import software.amazon.awssdk.crt.http.HttpHeader;
37+
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
2438
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
2539
import software.amazon.awssdk.http.SdkHttpFullResponse;
40+
import software.amazon.awssdk.http.SdkHttpResponse;
2641
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
2742
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
2843
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
@@ -37,4 +52,62 @@ HttpStreamResponseHandler responseHandler() {
3752
responseHandler.prepare();
3853
return CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler);
3954
}
55+
56+
@Test
57+
void publisherFailedToDeliverEvents_shouldShutDownConnection() {
58+
SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler();
59+
60+
HttpStreamResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler);
61+
HttpHeader[] httpHeaders = getHttpHeaders();
62+
crtResponseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
63+
httpHeaders);
64+
crtResponseHandler.onResponseHeadersDone(httpStream, 0);
65+
crtResponseHandler.onResponseBody(httpStream, "{}".getBytes(StandardCharsets.UTF_8));
66+
67+
crtResponseHandler.onResponseComplete(httpStream, 0);
68+
assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasMessageContaining(
69+
"subscription has been cancelled");
70+
verify(crtConn).shutdown();
71+
verify(crtConn).close();
72+
verify(httpStream).close();
73+
}
74+
75+
private static class TestAsyncHttpResponseHandler implements SdkAsyncHttpResponseHandler {
76+
77+
@Override
78+
public void onHeaders(SdkHttpResponse headers) {
79+
}
80+
81+
@Override
82+
public void onStream(Publisher<ByteBuffer> stream) {
83+
stream.subscribe(new Subscriber<ByteBuffer>() {
84+
private Subscription subscription;
85+
@Override
86+
public void onSubscribe(Subscription s) {
87+
subscription = s;
88+
s.request(1);
89+
}
90+
91+
@Override
92+
public void onNext(ByteBuffer byteBuffer) {
93+
subscription.cancel();
94+
}
95+
96+
@Override
97+
public void onError(Throwable t) {
98+
99+
}
100+
101+
@Override
102+
public void onComplete() {
103+
104+
}
105+
});
106+
}
107+
108+
@Override
109+
public void onError(Throwable error) {
110+
111+
}
112+
}
40113
}

0 commit comments

Comments
 (0)