Skip to content

Commit 189bbcf

Browse files
authored
Fixed the issue where invoking abort on AsyncResponseTransformer.toBl… (#4932)
* Fixed the issue where invoking abort on AsyncResponseTransformer.toBlockingInputStream could lead to memory leak * Fix build * Remove doAfterAbort and fix build * Make test less flaky
1 parent 350cf3a commit 189bbcf

File tree

9 files changed

+305
-21
lines changed

9 files changed

+305
-21
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamResponseTransformer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import software.amazon.awssdk.core.SdkResponse;
2323
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2424
import software.amazon.awssdk.core.async.SdkPublisher;
25-
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
25+
import software.amazon.awssdk.http.async.AbortableInputStreamSubscriber;
2626

2727
/**
2828
* A {@link AsyncResponseTransformer} that allows performing blocking reads on the response data.
@@ -50,7 +50,7 @@ public void onResponse(ResponseT response) {
5050

5151
@Override
5252
public void onStream(SdkPublisher<ByteBuffer> publisher) {
53-
InputStreamSubscriber inputStreamSubscriber = new InputStreamSubscriber();
53+
AbortableInputStreamSubscriber inputStreamSubscriber = AbortableInputStreamSubscriber.builder().build();
5454
publisher.subscribe(inputStreamSubscriber);
5555
future.complete(new ResponseInputStream<>(response, inputStreamSubscriber));
5656
}

http-client-spi/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@
9090
<artifactId>byte-buddy</artifactId>
9191
<scope>test</scope>
9292
</dependency>
93+
<dependency>
94+
<groupId>org.mockito</groupId>
95+
<artifactId>mockito-junit-jupiter</artifactId>
96+
<scope>test</scope>
97+
</dependency>
98+
<dependency>
99+
<groupId>org.mockito</groupId>
100+
<artifactId>mockito-inline</artifactId>
101+
<scope>test</scope>
102+
</dependency>
93103
</dependencies>
94104

95105
<build>
Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,43 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.http.crt.internal.response;
16+
package software.amazon.awssdk.http.async;
1717

1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.nio.ByteBuffer;
2121
import org.reactivestreams.Subscriber;
2222
import org.reactivestreams.Subscription;
23-
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.annotations.SdkProtectedApi;
24+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2425
import software.amazon.awssdk.http.Abortable;
26+
import software.amazon.awssdk.utils.FunctionalUtils;
2527
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
2628

2729
/**
28-
* Wrapper of {@link InputStreamSubscriber} that also implements {@link Abortable} and closes the underlying connections when
29-
* {@link #close()} or {@link #abort()} is invoked.
30+
* Wrapper of {@link InputStreamSubscriber} that also implements {@link Abortable}. It will invoke {@link #close()}
31+
* when {@link #abort()} is invoked. Upon closing, the underlying {@link InputStreamSubscriber} will be closed, and additional
32+
* action can be added via {@link Builder#doAfterClose(Runnable)}.
33+
*
3034
*/
31-
@SdkInternalApi
35+
@SdkProtectedApi
3236
public final class AbortableInputStreamSubscriber extends InputStream implements Subscriber<ByteBuffer>, Abortable {
33-
3437
private final InputStreamSubscriber delegate;
35-
private final Runnable closeConnection;
3638

37-
public AbortableInputStreamSubscriber(Runnable onClose, InputStreamSubscriber inputStreamSubscriber) {
38-
this.delegate = inputStreamSubscriber;
39-
this.closeConnection = onClose;
39+
private final Runnable doAfterClose;
40+
41+
private AbortableInputStreamSubscriber(Builder builder) {
42+
this(builder, new InputStreamSubscriber());
43+
}
44+
45+
@SdkTestInternalApi
46+
AbortableInputStreamSubscriber(Builder builder, InputStreamSubscriber delegate) {
47+
this.delegate = delegate;
48+
this.doAfterClose = builder.doAfterClose == null ? FunctionalUtils.noOpRunnable() : builder.doAfterClose;
49+
}
50+
51+
public static Builder builder() {
52+
return new Builder();
4053
}
4154

4255
@Override
@@ -81,7 +94,23 @@ public void onComplete() {
8194

8295
@Override
8396
public void close() {
84-
closeConnection.run();
8597
delegate.close();
98+
FunctionalUtils.invokeSafely(() -> doAfterClose.run());
99+
}
100+
101+
public static final class Builder {
102+
private Runnable doAfterClose;
103+
104+
/**
105+
* Additional action to run when {@link #close()} is invoked
106+
*/
107+
public Builder doAfterClose(Runnable doAfterClose) {
108+
this.doAfterClose = doAfterClose;
109+
return this;
110+
}
111+
112+
public AbortableInputStreamSubscriber build() {
113+
return new AbortableInputStreamSubscriber(this);
114+
}
86115
}
87116
}
Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.http.crt.internal;
16+
package software.amazon.awssdk.http.async;
1717

1818
import static org.mockito.Mockito.verify;
1919

@@ -22,7 +22,6 @@
2222
import org.junit.jupiter.api.extension.ExtendWith;
2323
import org.mockito.Mock;
2424
import org.mockito.junit.jupiter.MockitoExtension;
25-
import software.amazon.awssdk.http.crt.internal.response.AbortableInputStreamSubscriber;
2625
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
2726

2827
@ExtendWith(MockitoExtension.class)
@@ -33,20 +32,39 @@ public class AbortableInputStreamSubscriberTest {
3332
@Mock
3433
private Runnable onClose;
3534

35+
@Mock
36+
private InputStreamSubscriber inputStreamSubscriber;
37+
3638
@BeforeEach
3739
void setUp() {
38-
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(onClose, new InputStreamSubscriber());
40+
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(AbortableInputStreamSubscriber.builder()
41+
.doAfterClose(onClose),
42+
inputStreamSubscriber);
43+
44+
3945
}
4046

4147
@Test
42-
void close_shouldInvokeOnClose() {
48+
void close_closeConfigured_shouldInvokeOnClose() {
4349
abortableInputStreamSubscriber.close();
50+
verify(inputStreamSubscriber).close();
4451
verify(onClose).run();
4552
}
4653

4754
@Test
4855
void abort_shouldInvokeOnClose() {
56+
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(AbortableInputStreamSubscriber.builder()
57+
.doAfterClose(onClose),
58+
inputStreamSubscriber);
4959
abortableInputStreamSubscriber.abort();
5060
verify(onClose).run();
5161
}
62+
63+
@Test
64+
void close_closeNotConfigured_shouldCloseDelegate() {
65+
abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(AbortableInputStreamSubscriber.builder(),
66+
inputStreamSubscriber);
67+
abortableInputStreamSubscriber.close();
68+
verify(inputStreamSubscriber).close();
69+
}
5270
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
import software.amazon.awssdk.http.AbortableInputStream;
3232
import software.amazon.awssdk.http.SdkHttpFullResponse;
3333
import software.amazon.awssdk.http.SdkHttpResponse;
34+
import software.amazon.awssdk.http.async.AbortableInputStreamSubscriber;
3435
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
3536
import software.amazon.awssdk.utils.Logger;
36-
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
3737
import software.amazon.awssdk.utils.async.SimplePublisher;
3838

3939
/**
@@ -87,8 +87,10 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blo
8787
@Override
8888
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
8989
if (inputStreamSubscriber == null) {
90-
inputStreamSubscriber = new AbortableInputStreamSubscriber(() -> responseHandlerHelper.closeConnection(stream),
91-
new InputStreamSubscriber());
90+
inputStreamSubscriber =
91+
AbortableInputStreamSubscriber.builder()
92+
.doAfterClose(() -> responseHandlerHelper.closeConnection(stream))
93+
.build();
9294
simplePublisher.subscribe(inputStreamSubscriber);
9395
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
9496
// the stream directly.

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ private void onCancel() {
314314
try {
315315
SdkCancellationException e = new SdkCancellationException(
316316
"Subscriber cancelled before all events were published");
317-
log.warn(channelContext.channel(), () -> "Subscriber cancelled before all events were published");
317+
log.debug(channelContext.channel(), () -> "Subscriber cancelled before all events were published");
318318
executeFuture.completeExceptionally(e);
319319
} finally {
320320
runAndLogError(channelContext.channel(), () -> "Could not release channel back to the pool",

test/codegen-generated-classes-test/pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,12 @@
235235
<version>${awsjavasdk.version}</version>
236236
<scope>test</scope>
237237
</dependency>
238+
<dependency>
239+
<groupId>software.amazon.awssdk</groupId>
240+
<artifactId>netty-nio-client</artifactId>
241+
<version>${awsjavasdk.version}</version>
242+
<scope>test</scope>
243+
</dependency>
238244
<dependency>
239245
<groupId>io.reactivex.rxjava2</groupId>
240246
<artifactId>rxjava</artifactId>
@@ -305,6 +311,17 @@
305311
<skip>true</skip>
306312
</configuration>
307313
</plugin>
314+
<!-- Disable dependency check for this test module to speed up the build. -->
315+
<plugin>
316+
<groupId>org.apache.maven.plugins</groupId>
317+
<artifactId>maven-dependency-plugin</artifactId>
318+
<version>${maven-dependency-plugin.version}</version>
319+
<configuration>
320+
<failOnWarning>false</failOnWarning>
321+
<ignoreNonCompile>false</ignoreNonCompile>
322+
</configuration>
323+
</plugin>
324+
308325
</plugins>
309326
</build>
310327

0 commit comments

Comments
 (0)