Skip to content

Commit 09ce0ec

Browse files
committed
Fixed an issue in S3 multipart client where BlockingInputStreamAsyncRequestBody#writeInputStream could get stuck if any of the multipart request fails.
1 parent 44bf0d9 commit 09ce0ec

File tree

4 files changed

+143
-1
lines changed

4 files changed

+143
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon S3",
4+
"contributor": "",
5+
"description": "Fixed an issue in S3 multipart client that could cause `BlockingInputStreamAsyncRequestBody#writeInputStream` to get stuck if any of the multipart request fails. See [#4801](https://github.com/aws/aws-sdk-java-v2/issues/4801)"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public void onError(Throwable t) {
202202

203203
private void sendCurrentBody(AsyncRequestBody body) {
204204
downstreamPublisher.send(body).exceptionally(t -> {
205-
downstreamPublisher.error(t);
205+
upstreamSubscription.cancel();
206206
return null;
207207
});
208208
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.core.internal.async;
1717

1818
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static org.assertj.core.api.Assertions.fail;
1920
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
2021
import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody;
2122
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
@@ -146,6 +147,20 @@ public Optional<Long> contentLength() {
146147

147148
}
148149

150+
@Test
151+
void downStreamFailed_shouldPropagateCancellation() {
152+
CompletableFuture<Void> future = new CompletableFuture<>();
153+
TestAsyncRequestBody asyncRequestBody = new TestAsyncRequestBody();
154+
SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody, AsyncRequestBodySplitConfiguration.builder()
155+
.chunkSizeInBytes((long) CHUNK_SIZE)
156+
.bufferSizeInBytes(10L)
157+
.build());
158+
159+
assertThatThrownBy(() -> splittingPublisher.subscribe(requestBody -> {
160+
throw new RuntimeException("foobar");
161+
}).get(5, TimeUnit.SECONDS)).hasMessageContaining("foobar");
162+
assertThat(asyncRequestBody.cancelled).isTrue();
163+
}
149164

150165
private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception {
151166
SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.multipart;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.delete;
21+
import static com.github.tomakehurst.wiremock.client.WireMock.post;
22+
import static com.github.tomakehurst.wiremock.client.WireMock.put;
23+
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
24+
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
25+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
26+
27+
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
28+
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
29+
import java.io.InputStream;
30+
import java.net.URI;
31+
import java.util.concurrent.CancellationException;
32+
import java.util.concurrent.CompletableFuture;
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.api.Timeout;
36+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
37+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
38+
import software.amazon.awssdk.core.async.AsyncRequestBody;
39+
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
40+
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
41+
import software.amazon.awssdk.regions.Region;
42+
import software.amazon.awssdk.services.s3.S3AsyncClient;
43+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
44+
import software.amazon.awssdk.services.s3.model.S3Exception;
45+
46+
@WireMockTest
47+
@Timeout(10)
48+
public class S3MultipartClientPutObjectWiremockTest {
49+
50+
private static final String BUCKET = "Example-Bucket";
51+
private static final String KEY = "Example-Object";
52+
private static final String CREATE_MULTIPART_PAYLOAD = "<InitiateMultipartUploadResult>\n"
53+
+ " <Bucket>string</Bucket>\n"
54+
+ " <Key>string</Key>\n"
55+
+ " <UploadId>string</UploadId>\n"
56+
+ "</InitiateMultipartUploadResult>";
57+
private S3AsyncClient s3AsyncClient;
58+
59+
@BeforeEach
60+
public void setup(WireMockRuntimeInfo wiremock) {
61+
stubPutObjectCalls();
62+
s3AsyncClient = S3AsyncClient.builder()
63+
.region(Region.US_EAST_1)
64+
.endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort()))
65+
.credentialsProvider(
66+
StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret")))
67+
.multipartEnabled(true)
68+
.multipartConfiguration(b -> b.minimumPartSizeInBytes(10L).apiCallBufferSizeInBytes(10L))
69+
.httpClientBuilder(AwsCrtAsyncHttpClient.builder())
70+
.build();
71+
}
72+
73+
private void stubPutObjectCalls() {
74+
stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody(CREATE_MULTIPART_PAYLOAD)));
75+
stubFor(put(anyUrl()).willReturn(aResponse().withStatus(404)));
76+
stubFor(put(urlEqualTo("/Example-Bucket/Example-Object?partNumber=1&uploadId=string")).willReturn(aResponse().withStatus(200)));
77+
stubFor(delete(anyUrl()).willReturn(aResponse().withStatus(200)));
78+
}
79+
80+
// https://github.com/aws/aws-sdk-java-v2/issues/4801
81+
@Test
82+
void uploadWithUnknownContentLength_onePartFails_shouldCancelUpstream() {
83+
BlockingInputStreamAsyncRequestBody blockingInputStreamAsyncRequestBody = AsyncRequestBody.forBlockingInputStream(null);
84+
CompletableFuture<PutObjectResponse> putObjectResponse = s3AsyncClient.putObject(
85+
r -> r.bucket(BUCKET).key(KEY), blockingInputStreamAsyncRequestBody);
86+
87+
assertThatThrownBy(() -> {
88+
try (InputStream inputStream = createUnlimitedInputStream()) {
89+
blockingInputStreamAsyncRequestBody.writeInputStream(inputStream);
90+
}
91+
}).isInstanceOf(CancellationException.class);
92+
93+
assertThatThrownBy(() -> putObjectResponse.join()).hasRootCauseInstanceOf(S3Exception.class);
94+
}
95+
96+
@Test
97+
void uploadWithKnownContentLength_onePartFails_shouldCancelUpstream() {
98+
BlockingInputStreamAsyncRequestBody blockingInputStreamAsyncRequestBody =
99+
AsyncRequestBody.forBlockingInputStream(1024L * 20); // must be larger than the buffer used in
100+
// InputStreamConsumingPublisher to trigger the error
101+
CompletableFuture<PutObjectResponse> putObjectResponse = s3AsyncClient.putObject(
102+
r -> r.bucket(BUCKET).key(KEY), blockingInputStreamAsyncRequestBody);
103+
104+
assertThatThrownBy(() -> {
105+
try (InputStream inputStream = createUnlimitedInputStream()) {
106+
blockingInputStreamAsyncRequestBody.writeInputStream(inputStream);
107+
}
108+
}).isInstanceOf(CancellationException.class);
109+
110+
assertThatThrownBy(() -> putObjectResponse.join()).hasRootCauseInstanceOf(S3Exception.class);
111+
}
112+
113+
private InputStream createUnlimitedInputStream() {
114+
return new InputStream() {
115+
@Override
116+
public int read() {
117+
return 1;
118+
}
119+
};
120+
}
121+
}

0 commit comments

Comments
 (0)