Skip to content

Commit 91eee4c

Browse files
committed
Fix request cancellation issue in the AWS CRT-based S3 client that could lead to memory leak
1 parent 2692379 commit 91eee4c

File tree

15 files changed

+377
-69
lines changed

15 files changed

+377
-69
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT-based S3 client",
4+
"contributor": "",
5+
"description": "Fixed memory leak issue when a request was cancelled in the AWS CRT-based S3 client."
6+
}

build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,8 @@
5353
<!-- Allow private field declaration before public, to have correct initialization order -->
5454
<suppress checks="DeclarationOrder"
5555
files=".*SdkAdvancedClientOption\.java$"/>
56+
57+
<!-- Ignore usage of S3MetaRequest in S3MetaRequestWrapper. !-->
58+
<suppress checks="Regexp"
59+
files="software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestWrapper.java"/>
5660
</suppressions>

build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,14 @@
359359
<property name="ignoreComments" value="true"/>
360360
</module>
361361

362+
<!-- Checks that we don't use S3MetaRequest -->
363+
<module name="Regexp">
364+
<property name="format" value="\bS3MetaRequest\b"/>
365+
<property name="illegalPattern" value="true"/>
366+
<property name="message" value="Don't use S3MetaRequest directly. Use software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestWrapper instead"/>
367+
<property name="ignoreComments" value="true"/>
368+
</module>
369+
362370
<!-- Checks that we don't implement AutoCloseable/Closeable -->
363371
<module name="Regexp">
364372
<property name="format" value="(class|interface).*(implements|extends).*[^\w](Closeable|AutoCloseable)[^\w]"/>

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import software.amazon.awssdk.crt.s3.ResumeToken;
4646
import software.amazon.awssdk.crt.s3.S3Client;
4747
import software.amazon.awssdk.crt.s3.S3ClientOptions;
48-
import software.amazon.awssdk.crt.s3.S3MetaRequest;
4948
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
5049
import software.amazon.awssdk.http.Header;
5150
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
@@ -133,10 +132,12 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
133132
URI uri = asyncRequest.request().getUri();
134133
HttpRequest httpRequest = toCrtRequest(asyncRequest);
135134
SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest.httpExecutionAttributes();
135+
CompletableFuture<S3MetaRequestWrapper> s3MetaRequestFuture = new CompletableFuture<>();
136136
S3CrtResponseHandlerAdapter responseHandler =
137137
new S3CrtResponseHandlerAdapter(executeFuture,
138138
asyncRequest.responseHandler(),
139-
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER));
139+
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
140+
s3MetaRequestFuture);
140141

141142
S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest);
142143

@@ -160,16 +161,19 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
160161
.withRequestFilePath(requestFilePath)
161162
.withSigningConfig(signingConfig);
162163

163-
S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions);
164-
S3MetaRequestPauseObservable observable =
165-
httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE);
164+
try {
165+
S3MetaRequestWrapper requestWrapper = new S3MetaRequestWrapper(crtS3Client.makeMetaRequest(requestOptions));
166+
s3MetaRequestFuture.complete(requestWrapper);
166167

167-
responseHandler.metaRequest(s3MetaRequest);
168+
S3MetaRequestPauseObservable observable =
169+
httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE);
168170

169-
if (observable != null) {
170-
observable.subscribe(s3MetaRequest);
171+
if (observable != null) {
172+
observable.subscribe(requestWrapper);
173+
}
174+
} finally {
175+
signingConfig.close();
171176
}
172-
closeResourceCallback(executeFuture, s3MetaRequest, responseHandler, signingConfig);
173177

174178
return executeFuture;
175179
}
@@ -215,23 +219,6 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ
215219
return S3MetaRequestOptions.MetaRequestType.DEFAULT;
216220
}
217221

218-
private static void closeResourceCallback(CompletableFuture<Void> executeFuture,
219-
S3MetaRequest s3MetaRequest,
220-
S3CrtResponseHandlerAdapter responseHandler,
221-
AwsSigningConfig signingConfig) {
222-
executeFuture.whenComplete((r, t) -> {
223-
if (executeFuture.isCancelled()) {
224-
log.debug(() -> "The request is cancelled, cancelling meta request");
225-
responseHandler.cancelRequest();
226-
s3MetaRequest.cancel();
227-
signingConfig.close();
228-
} else {
229-
s3MetaRequest.close();
230-
signingConfig.close();
231-
}
232-
});
233-
}
234-
235222
private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {
236223
SdkHttpRequest sdkRequest = asyncRequest.request();
237224

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@
2525
import software.amazon.awssdk.crt.CRT;
2626
import software.amazon.awssdk.crt.http.HttpHeader;
2727
import software.amazon.awssdk.crt.s3.S3FinishedResponseContext;
28-
import software.amazon.awssdk.crt.s3.S3MetaRequest;
2928
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
3029
import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler;
31-
import software.amazon.awssdk.http.SdkCancellationException;
3230
import software.amazon.awssdk.http.SdkHttpResponse;
3331
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
3432
import software.amazon.awssdk.utils.Logger;
@@ -46,20 +44,44 @@ public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseH
4644
private final SimplePublisher<ByteBuffer> responsePublisher = new SimplePublisher<>();
4745

4846
private final SdkHttpResponse.Builder initialHeadersResponse = SdkHttpResponse.builder();
49-
private volatile S3MetaRequest metaRequest;
47+
private final CompletableFuture<S3MetaRequestWrapper> metaRequestFuture;
5048

5149
private final PublisherListener<S3MetaRequestProgress> progressListener;
5250

5351
private volatile boolean responseHandlingInitiated;
5452

5553
public S3CrtResponseHandlerAdapter(CompletableFuture<Void> executeFuture,
5654
SdkAsyncHttpResponseHandler responseHandler,
57-
PublisherListener<S3MetaRequestProgress> progressListener) {
55+
PublisherListener<S3MetaRequestProgress> progressListener,
56+
CompletableFuture<S3MetaRequestWrapper> metaRequestFuture) {
5857
this.resultFuture = executeFuture;
58+
this.metaRequestFuture = metaRequestFuture;
59+
60+
resultFuture.whenComplete((r, t) -> {
61+
S3MetaRequestWrapper s3MetaRequest = s3MetaRequest();
62+
if (s3MetaRequest == null) {
63+
return;
64+
}
65+
66+
if (executeFuture.isCancelled()) {
67+
s3MetaRequest.cancel();
68+
}
69+
s3MetaRequest.close();
70+
});
71+
5972
this.responseHandler = responseHandler;
6073
this.progressListener = progressListener == null ? new NoOpPublisherListener() : progressListener;
6174
}
6275

76+
private S3MetaRequestWrapper s3MetaRequest() {
77+
if (!metaRequestFuture.isDone()) {
78+
return null;
79+
}
80+
81+
S3MetaRequestWrapper s3MetaRequest = metaRequestFuture.join();
82+
return s3MetaRequest;
83+
}
84+
6385
@Override
6486
public void onResponseHeaders(int statusCode, HttpHeader[] headers) {
6587
// Note, we cannot call responseHandler.onHeaders() here because the response status code and headers may not represent
@@ -86,8 +108,7 @@ public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long ob
86108
failResponseHandlerAndFuture(failure);
87109
return;
88110
}
89-
90-
metaRequest.incrementReadWindow(bytesReceived);
111+
s3MetaRequest().incrementReadWindow(bytesReceived);
91112
});
92113

93114
// Returning 0 to disable flow control because we manually increase read window above
@@ -115,22 +136,10 @@ private void onSuccessfulResponseComplete() {
115136
return;
116137
}
117138
this.progressListener.subscriberOnComplete();
118-
completeFutureAndCloseRequest();
139+
resultFuture.complete(null);
119140
});
120141
}
121142

122-
private void completeFutureAndCloseRequest() {
123-
resultFuture.complete(null);
124-
runAndLogError(log.logger(), "Exception thrown in S3MetaRequest#close, ignoring",
125-
() -> metaRequest.close());
126-
}
127-
128-
public void cancelRequest() {
129-
SdkCancellationException sdkClientException =
130-
new SdkCancellationException("request is cancelled");
131-
failResponseHandlerAndFuture(sdkClientException);
132-
}
133-
134143
private void handleError(S3FinishedResponseContext context) {
135144
int crtCode = context.getErrorCode();
136145
HttpHeader[] headers = context.getErrorHeaders();
@@ -168,27 +177,21 @@ private void onErrorResponseComplete(byte[] errorPayload) {
168177
failResponseHandlerAndFuture(throwable);
169178
return null;
170179
}
171-
completeFutureAndCloseRequest();
180+
resultFuture.complete(null);
172181
return null;
173182
});
174183
}
175184

176185
private void failResponseHandlerAndFuture(Throwable exception) {
177-
resultFuture.completeExceptionally(exception);
178186
runAndLogError(log.logger(), "Exception thrown in SdkAsyncHttpResponseHandler#onError, ignoring",
179187
() -> responseHandler.onError(exception));
180-
runAndLogError(log.logger(), "Exception thrown in S3MetaRequest#close, ignoring",
181-
() -> metaRequest.close());
188+
resultFuture.completeExceptionally(exception);
182189
}
183190

184191
private static boolean isErrorResponse(int responseStatus) {
185192
return responseStatus != 0;
186193
}
187194

188-
public void metaRequest(S3MetaRequest s3MetaRequest) {
189-
metaRequest = s3MetaRequest;
190-
}
191-
192195
@Override
193196
public void onProgress(S3MetaRequestProgress progress) {
194197
this.progressListener.subscriberOnNext(progress);

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestPauseObservable.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,24 @@
1818
import java.util.function.Function;
1919
import software.amazon.awssdk.annotations.SdkInternalApi;
2020
import software.amazon.awssdk.crt.s3.ResumeToken;
21-
import software.amazon.awssdk.crt.s3.S3MetaRequest;
2221

2322
/**
2423
* An observable that notifies the observer {@link S3CrtAsyncHttpClient} to pause the request.
2524
*/
2625
@SdkInternalApi
2726
public class S3MetaRequestPauseObservable {
2827

29-
private final Function<S3MetaRequest, ResumeToken> pause;
30-
private volatile S3MetaRequest request;
28+
private final Function<S3MetaRequestWrapper, ResumeToken> pause;
29+
private volatile S3MetaRequestWrapper request;
3130

3231
public S3MetaRequestPauseObservable() {
33-
this.pause = S3MetaRequest::pause;
32+
this.pause = S3MetaRequestWrapper::pause;
3433
}
3534

3635
/**
37-
* Subscribe {@link S3MetaRequest} to be potentially paused later.
36+
* Subscribe {@link S3MetaRequestWrapper} to be potentially paused later.
3837
*/
39-
public void subscribe(S3MetaRequest request) {
38+
public void subscribe(S3MetaRequestWrapper request) {
4039
this.request = request;
4140
}
4241

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.crt;
17+
18+
import software.amazon.awssdk.annotations.SdkInternalApi;
19+
import software.amazon.awssdk.crt.s3.ResumeToken;
20+
import software.amazon.awssdk.crt.s3.S3MetaRequest;
21+
22+
/**
23+
* A wrapper class that manages the lifecycle of the underlying {@link S3MetaRequest}. This class is needed to ensure we don't
24+
* invoke methods on {@link S3MetaRequest} after it's closed, otherwise CRT will crash.
25+
*/
26+
@SdkInternalApi
27+
public class S3MetaRequestWrapper {
28+
private final S3MetaRequest delegate;
29+
private volatile boolean isClosed;
30+
private final Object lock = new Object();
31+
32+
public S3MetaRequestWrapper(S3MetaRequest delegate) {
33+
this.delegate = delegate;
34+
}
35+
36+
public void close() {
37+
synchronized (lock) {
38+
if (!isClosed) {
39+
isClosed = true;
40+
delegate.close();
41+
}
42+
}
43+
}
44+
45+
public void incrementReadWindow(long windowSize) {
46+
synchronized (lock) {
47+
if (!isClosed) {
48+
delegate.incrementReadWindow(windowSize);
49+
}
50+
}
51+
}
52+
53+
public ResumeToken pause() {
54+
synchronized (lock) {
55+
if (!isClosed) {
56+
return delegate.pause();
57+
}
58+
}
59+
return null;
60+
}
61+
62+
public void cancel() {
63+
synchronized (lock) {
64+
if (!isClosed) {
65+
delegate.cancel();
66+
}
67+
}
68+
}
69+
}

services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressCreateSessionTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,12 @@
6464
import software.amazon.awssdk.services.s3.model.Protocol;
6565
import software.amazon.awssdk.services.s3.model.S3Exception;
6666
import software.amazon.awssdk.utils.AttributeMap;
67+
import software.amazon.awssdk.utils.Logger;
6768
import software.amazon.awssdk.utils.http.SdkHttpUtils;
6869

6970
@WireMockTest(httpsEnabled = true)
7071
public class S3ExpressCreateSessionTest extends BaseRuleSetClientTest {
72+
private static final Logger log = Logger.loggerFor(S3ExpressCreateSessionTest.class);
7173

7274
private static final Function<WireMockRuntimeInfo, URI> WM_HTTP_ENDPOINT = wm -> URI.create(wm.getHttpBaseUrl());
7375
private static final Function<WireMockRuntimeInfo, URI> WM_HTTPS_ENDPOINT = wm -> URI.create(wm.getHttpsBaseUrl());
@@ -329,9 +331,8 @@ private static final class CapturingInterceptor implements ExecutionInterceptor
329331
public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
330332
SdkHttpRequest sdkHttpRequest = context.httpRequest();
331333
this.headers = sdkHttpRequest.headers();
332-
System.out.printf("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath());
333-
headers.forEach((k, strings) -> System.out.printf("%s, %s%n", k, strings));
334-
System.out.println();
334+
log.debug(() -> String.format("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath()));
335+
headers.forEach((k, strings) -> log.debug(() -> String.format("%s, %s%n", k, strings)));
335336
}
336337
}
337338
}

services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/S3ExpressTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,12 @@
7070
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
7171
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
7272
import software.amazon.awssdk.utils.AttributeMap;
73+
import software.amazon.awssdk.utils.Logger;
7374
import software.amazon.awssdk.utils.http.SdkHttpUtils;
7475

7576
@WireMockTest(httpsEnabled = true)
7677
public class S3ExpressTest extends BaseRuleSetClientTest {
77-
78+
private static final Logger log = Logger.loggerFor(S3ExpressTest.class);
7879
private static final Function<WireMockRuntimeInfo, URI> WM_HTTP_ENDPOINT = wm -> URI.create(wm.getHttpBaseUrl());
7980
private static final Function<WireMockRuntimeInfo, URI> WM_HTTPS_ENDPOINT = wm -> URI.create(wm.getHttpsBaseUrl());
8081
private static final AwsCredentialsProvider CREDENTIALS_PROVIDER =
@@ -431,9 +432,8 @@ private static final class CapturingInterceptor implements ExecutionInterceptor
431432
public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
432433
SdkHttpRequest sdkHttpRequest = context.httpRequest();
433434
this.headers = sdkHttpRequest.headers();
434-
System.out.printf("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath());
435-
headers.forEach((k, strings) -> System.out.printf("%s, %s%n", k, strings));
436-
System.out.println();
435+
log.debug(() -> String.format("%s %s%n", sdkHttpRequest.method(), sdkHttpRequest.encodedPath()));
436+
headers.forEach((k, strings) -> log.debug(() -> String.format("%s, %s%n", k, strings)));
437437
}
438438
}
439439
}

0 commit comments

Comments
 (0)