Skip to content

Commit 6143fe5

Browse files
authored
Fixed a thread safety issue that could cause application to crash in … (#4839)
* Fixed a thread safety issue that could cause application to crash in the edge case in AWS CRT HTTP client * Add tests
1 parent 10121d4 commit 6143fe5

File tree

9 files changed

+195
-25
lines changed

9 files changed

+195
-25
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT HTTP Client",
4+
"contributor": "",
5+
"description": "Fixed a thread safety issue that could cause application to crash in the edge case where the SDK attempted to invoke `incrementWindow` after the stream is closed in AWS CRT HTTP Client."
6+
}

bom-internal/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,12 @@
255255
<version>${mockito.version}</version>
256256
<scope>test</scope>
257257
</dependency>
258+
<dependency>
259+
<groupId>org.mockito</groupId>
260+
<artifactId>mockito-inline</artifactId>
261+
<version>${mockito.version}</version>
262+
<scope>test</scope>
263+
</dependency>
258264
<dependency>
259265
<groupId>nl.jqno.equalsverifier</groupId>
260266
<artifactId>equalsverifier</artifactId>

http-clients/aws-crt-client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
</dependency>
106106
<dependency>
107107
<groupId>org.mockito</groupId>
108-
<artifactId>mockito-core</artifactId>
108+
<artifactId>mockito-inline</artifactId>
109109
<scope>test</scope>
110110
</dependency>
111111
<dependency>

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.ByteBuffer;
2121
import java.util.concurrent.CompletableFuture;
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2324
import software.amazon.awssdk.crt.CRT;
2425
import software.amazon.awssdk.crt.http.HttpClientConnection;
2526
import software.amazon.awssdk.crt.http.HttpException;
@@ -46,19 +47,29 @@ public final class CrtResponseAdapter implements HttpStreamResponseHandler {
4647
private final HttpClientConnection connection;
4748
private final CompletableFuture<Void> completionFuture;
4849
private final SdkAsyncHttpResponseHandler responseHandler;
49-
private final SimplePublisher<ByteBuffer> responsePublisher = new SimplePublisher<>();
50+
private final SimplePublisher<ByteBuffer> responsePublisher;
5051

5152
private final SdkHttpResponse.Builder responseBuilder;
5253
private final ResponseHandlerHelper responseHandlerHelper;
5354

5455
private CrtResponseAdapter(HttpClientConnection connection,
5556
CompletableFuture<Void> completionFuture,
5657
SdkAsyncHttpResponseHandler responseHandler) {
58+
this(connection, completionFuture, responseHandler, new SimplePublisher<>());
59+
}
60+
61+
62+
@SdkTestInternalApi
63+
public CrtResponseAdapter(HttpClientConnection connection,
64+
CompletableFuture<Void> completionFuture,
65+
SdkAsyncHttpResponseHandler responseHandler,
66+
SimplePublisher<ByteBuffer> simplePublisher) {
5767
this.connection = Validate.paramNotNull(connection, "connection");
5868
this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture");
5969
this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler");
6070
this.responseBuilder = SdkHttpResponse.builder();
6171
this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, connection);
72+
this.responsePublisher = simplePublisher;
6273
}
6374

6475
public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn,
@@ -95,9 +106,7 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
95106
return;
96107
}
97108

98-
if (!responseHandlerHelper.connectionClosed().get()) {
99-
stream.incrementWindow(bodyBytesIn.length);
100-
}
109+
responseHandlerHelper.incrementWindow(stream, bodyBytesIn.length);
101110
});
102111

103112
return 0;

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.ByteBuffer;
2121
import java.util.concurrent.CompletableFuture;
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2324
import software.amazon.awssdk.crt.CRT;
2425
import software.amazon.awssdk.crt.http.HttpClientConnection;
2526
import software.amazon.awssdk.crt.http.HttpException;
@@ -42,7 +43,7 @@
4243
public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
4344
private static final Logger log = Logger.loggerFor(InputStreamAdaptingHttpStreamResponseHandler.class);
4445
private volatile AbortableInputStreamSubscriber inputStreamSubscriber;
45-
private final SimplePublisher<ByteBuffer> simplePublisher = new SimplePublisher<>();
46+
private final SimplePublisher<ByteBuffer> simplePublisher;
4647

4748
private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
4849
private final HttpClientConnection crtConn;
@@ -52,10 +53,18 @@ public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpS
5253

5354
public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
5455
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture) {
56+
this(crtConn, requestCompletionFuture, new SimplePublisher<>());
57+
}
58+
59+
@SdkTestInternalApi
60+
public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
61+
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture,
62+
SimplePublisher<ByteBuffer> simplePublisher) {
5563
this.crtConn = crtConn;
5664
this.requestCompletionFuture = requestCompletionFuture;
5765
this.responseBuilder = SdkHttpResponse.builder();
5866
this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, crtConn);
67+
this.simplePublisher = simplePublisher;
5968
}
6069

6170
@Override
@@ -101,11 +110,8 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
101110
failFutureAndCloseConnection(stream, failure);
102111
return;
103112
}
104-
105-
if (!responseHandlerHelper.connectionClosed().get()) {
106-
// increment the window upon buffer consumption.
107-
stream.incrementWindow(bodyBytesIn.length);
108-
}
113+
// increment the window upon buffer consumption.
114+
responseHandlerHelper.incrementWindow(stream, bodyBytesIn.length);
109115
});
110116

111117
// Window will be incremented after the subscriber consumes the data, returning 0 here to disable it.

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package software.amazon.awssdk.http.crt.internal.response;
1717

18-
import java.util.concurrent.atomic.AtomicBoolean;
1918
import software.amazon.awssdk.annotations.SdkInternalApi;
2019
import software.amazon.awssdk.crt.http.HttpClientConnection;
2120
import software.amazon.awssdk.crt.http.HttpHeader;
@@ -30,14 +29,16 @@
3029
*
3130
* CRT connection will only be closed, i.e., not reused, in one of the following conditions:
3231
* 1. 5xx server error OR
33-
* 2. It fails to read the response.
32+
* 2. It fails to read the response OR
33+
* 3. the response stream is closed/aborted by the caller.
3434
*/
3535
@SdkInternalApi
3636
public class ResponseHandlerHelper {
3737

3838
private final SdkHttpResponse.Builder responseBuilder;
3939
private final HttpClientConnection connection;
40-
private AtomicBoolean connectionClosed = new AtomicBoolean(false);
40+
private boolean connectionClosed;
41+
private final Object lock = new Object();
4142

4243
public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder, HttpClientConnection connection) {
4344
this.responseBuilder = responseBuilder;
@@ -57,20 +58,34 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int hea
5758
* Release the connection back to the pool so that it can be reused.
5859
*/
5960
public void releaseConnection(HttpStream stream) {
60-
if (connectionClosed.compareAndSet(false, true)) {
61-
connection.close();
62-
stream.close();
61+
synchronized (lock) {
62+
if (!connectionClosed) {
63+
connectionClosed = true;
64+
connection.close();
65+
stream.close();
66+
}
67+
}
68+
}
69+
70+
public void incrementWindow(HttpStream stream, int windowSize) {
71+
synchronized (lock) {
72+
if (!connectionClosed) {
73+
stream.incrementWindow(windowSize);
74+
}
6375
}
6476
}
6577

6678
/**
6779
* Close the connection completely
6880
*/
6981
public void closeConnection(HttpStream stream) {
70-
if (connectionClosed.compareAndSet(false, true)) {
71-
connection.shutdown();
72-
connection.close();
73-
stream.close();
82+
synchronized (lock) {
83+
if (!connectionClosed) {
84+
connectionClosed = true;
85+
connection.shutdown();
86+
connection.close();
87+
stream.close();
88+
}
7489
}
7590
}
7691

@@ -82,8 +97,4 @@ public void cleanUpConnectionBasedOnStatusCode(HttpStream stream) {
8297
releaseConnection(stream);
8398
}
8499
}
85-
86-
public AtomicBoolean connectionClosed() {
87-
return connectionClosed;
88-
}
89100
}

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,34 @@
1616
package software.amazon.awssdk.http.crt.internal;
1717

1818
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static org.mockito.ArgumentMatchers.any;
1920
import static org.mockito.ArgumentMatchers.anyInt;
2021
import static org.mockito.Mockito.never;
2122
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.when;
2224

25+
import java.nio.ByteBuffer;
2326
import java.nio.charset.StandardCharsets;
2427
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import org.apache.commons.lang3.RandomStringUtils;
2531
import org.junit.jupiter.api.BeforeEach;
2632
import org.junit.jupiter.api.Test;
2733
import org.junit.jupiter.api.extension.ExtendWith;
2834
import org.junit.jupiter.params.ParameterizedTest;
2935
import org.junit.jupiter.params.provider.ValueSource;
3036
import org.mockito.Mock;
37+
import org.mockito.Mockito;
3138
import org.mockito.junit.jupiter.MockitoExtension;
3239
import software.amazon.awssdk.crt.http.HttpClientConnection;
3340
import software.amazon.awssdk.crt.http.HttpException;
3441
import software.amazon.awssdk.crt.http.HttpHeader;
3542
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
3643
import software.amazon.awssdk.crt.http.HttpStream;
3744
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
45+
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
46+
import software.amazon.awssdk.utils.async.SimplePublisher;
3847

3948
@ExtendWith(MockitoExtension.class)
4049
public abstract class BaseHttpStreamResponseHandlerTest {
@@ -44,10 +53,15 @@ public abstract class BaseHttpStreamResponseHandlerTest {
4453
@Mock
4554
HttpStream httpStream;
4655

56+
@Mock
57+
SimplePublisher<ByteBuffer> simplePublisher;
58+
4759
HttpStreamResponseHandler responseHandler;
4860

4961
abstract HttpStreamResponseHandler responseHandler();
5062

63+
abstract HttpStreamResponseHandler responseHandlerWithMockedPublisher(SimplePublisher<ByteBuffer> simplePublisher);
64+
5165
@BeforeEach
5266
public void setUp() {
5367
requestFuture = new CompletableFuture<>();
@@ -113,6 +127,101 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException {
113127
verify(httpStream, never()).incrementWindow(anyInt());
114128
}
115129

130+
@Test
131+
void publisherWritesFutureFails_shouldShutdownConnection() {
132+
SimplePublisher<ByteBuffer> simplePublisher = Mockito.mock(SimplePublisher.class);
133+
CompletableFuture<Void> future = new CompletableFuture<>();
134+
when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future);
135+
136+
HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
137+
HttpHeader[] httpHeaders = getHttpHeaders();
138+
139+
handler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
140+
httpHeaders);
141+
handler.onResponseHeadersDone(httpStream, 0);
142+
handler.onResponseBody(httpStream,
143+
RandomStringUtils.random(1 * 1024 * 1024).getBytes(StandardCharsets.UTF_8));
144+
RuntimeException runtimeException = new RuntimeException();
145+
future.completeExceptionally(runtimeException);
146+
147+
try {
148+
requestFuture.join();
149+
} catch (Exception e) {
150+
// we don't verify here because it behaves differently in async and sync
151+
}
152+
153+
verify(crtConn).shutdown();
154+
verify(crtConn).close();
155+
verify(httpStream).close();
156+
verify(httpStream, never()).incrementWindow(anyInt());
157+
}
158+
159+
@Test
160+
void publisherWritesFutureCompletesAfterConnectionClosed_shouldNotInvokeIncrementWindow() {
161+
CompletableFuture<Void> future = new CompletableFuture<>();
162+
when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future);
163+
when(simplePublisher.complete()).thenReturn(future);
164+
165+
HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
166+
167+
168+
HttpHeader[] httpHeaders = getHttpHeaders();
169+
170+
handler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
171+
httpHeaders);
172+
handler.onResponseHeadersDone(httpStream, 0);
173+
handler.onResponseBody(httpStream,
174+
RandomStringUtils.random(1 * 1024 * 1024).getBytes(StandardCharsets.UTF_8));
175+
handler.onResponseComplete(httpStream, 0);
176+
future.complete(null);
177+
178+
requestFuture.join();
179+
verify(crtConn, never()).shutdown();
180+
verify(crtConn).close();
181+
verify(httpStream).close();
182+
verify(httpStream, never()).incrementWindow(anyInt());
183+
}
184+
185+
@Test
186+
void publisherWritesFutureCompletesWhenConnectionClosed_shouldNotInvokeIncrementWindow() {
187+
CompletableFuture<Void> future = new CompletableFuture<>();
188+
when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future);
189+
when(simplePublisher.complete()).thenReturn(future);
190+
191+
HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher(simplePublisher);
192+
193+
194+
HttpHeader[] httpHeaders = getHttpHeaders();
195+
196+
handler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
197+
httpHeaders);
198+
handler.onResponseHeadersDone(httpStream, 0);
199+
handler.onResponseBody(httpStream,
200+
RandomStringUtils.random(1 * 1024 * 1024).getBytes(StandardCharsets.UTF_8));
201+
202+
// This tracker tracks which of the two operation completes first
203+
AtomicInteger whenCompleteTracker = new AtomicInteger(0);
204+
CompletableFuture<Void> onResponseComplete = CompletableFuture.runAsync(() -> handler.onResponseComplete(httpStream, 0))
205+
.whenComplete((r, t) -> whenCompleteTracker.compareAndSet(0, 1));
206+
207+
CompletableFuture<Void> writeComplete = CompletableFuture.runAsync(() -> future.complete(null))
208+
.whenComplete((r, t) -> whenCompleteTracker.compareAndSet(0, 2));
209+
requestFuture.join();
210+
211+
CompletableFuture.allOf(onResponseComplete, writeComplete).join();
212+
213+
if (whenCompleteTracker.get() == 1) {
214+
// onResponseComplete finishes first
215+
verify(httpStream, never()).incrementWindow(anyInt());
216+
} else {
217+
verify(httpStream).incrementWindow(anyInt());
218+
}
219+
220+
verify(crtConn, never()).shutdown();
221+
verify(crtConn).close();
222+
verify(httpStream).close();
223+
}
224+
116225
static HttpHeader[] getHttpHeaders() {
117226
HttpHeader[] httpHeaders = new HttpHeader[1];
118227
httpHeaders[0] = new HttpHeader("Content-Length", "1");

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
4242
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
4343
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
44+
import software.amazon.awssdk.utils.async.SimplePublisher;
4445

4546
public class CrtResponseHandlerTest extends BaseHttpStreamResponseHandlerTest {
4647

@@ -53,6 +54,15 @@ HttpStreamResponseHandler responseHandler() {
5354
return CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler);
5455
}
5556

57+
@Override
58+
HttpStreamResponseHandler responseHandlerWithMockedPublisher(SimplePublisher<ByteBuffer> simplePublisher) {
59+
AsyncResponseHandler<Void> responseHandler = new AsyncResponseHandler<>((response,
60+
executionAttributes) -> null, Function.identity(), new ExecutionAttributes());
61+
62+
responseHandler.prepare();
63+
return new CrtResponseAdapter(crtConn, requestFuture, responseHandler, simplePublisher);
64+
}
65+
5666
@Test
5767
void publisherFailedToDeliverEvents_shouldShutDownConnection() {
5868
SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler();

0 commit comments

Comments
 (0)