Skip to content

Commit 35e6e4e

Browse files
authored
Create split method in AsyncRequestBody to return SplittingPublisher (#4188)
* Create split method in AsyncRequestBody to return SplittingPublisher * Fix Javadoc and build
1 parent afe5f58 commit 35e6e4e

File tree

6 files changed

+198
-38
lines changed

6 files changed

+198
-38
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@
2323
import java.nio.charset.StandardCharsets;
2424
import java.nio.file.Path;
2525
import java.util.Optional;
26+
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.ExecutorService;
2728
import org.reactivestreams.Publisher;
2829
import org.reactivestreams.Subscriber;
2930
import software.amazon.awssdk.annotations.SdkPublicApi;
3031
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
3132
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3233
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
34+
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
3335
import software.amazon.awssdk.core.internal.util.Mimetype;
3436
import software.amazon.awssdk.utils.BinaryUtils;
37+
import software.amazon.awssdk.utils.Validate;
3538

3639
/**
3740
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
@@ -246,4 +249,42 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
246249
static AsyncRequestBody empty() {
247250
return fromBytes(new byte[0]);
248251
}
252+
253+
254+
/**
255+
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
256+
* portion of the original data, based on the configured {code chunkSizeInBytes}.
257+
*
258+
* <p>
259+
* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the
260+
* subscriber right after it's initialized.
261+
* <p>
262+
* // TODO: API Surface Area review: should we make this behavior configurable?
263+
* If content length is null, it is sent after the entire content for that chunk is buffered.
264+
* In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}.
265+
*
266+
* @param chunkSizeInBytes the size for each divided chunk. The last chunk may be smaller than the configured size.
267+
* @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content
268+
* @return SplitAsyncRequestBodyResult
269+
*/
270+
default SplitAsyncRequestBodyResponse split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
271+
Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes");
272+
Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
273+
274+
if (!this.contentLength().isPresent()) {
275+
Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes,
276+
"maxMemoryUsageInBytes must be larger than or equal to " +
277+
"chunkSizeInBytes if the content length is unknown");
278+
}
279+
280+
CompletableFuture<Void> future = new CompletableFuture<>();
281+
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
282+
.asyncRequestBody(this)
283+
.chunkSizeInBytes(chunkSizeInBytes)
284+
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
285+
.resultFuture(future)
286+
.build();
287+
288+
return SplitAsyncRequestBodyResponse.create(splittingPublisher, future);
289+
}
249290
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import software.amazon.awssdk.annotations.SdkPublicApi;
21+
import software.amazon.awssdk.utils.Validate;
22+
23+
/**
24+
* Containing the result from {@link AsyncRequestBody#split(long, long)}
25+
*/
26+
@SdkPublicApi
27+
public final class SplitAsyncRequestBodyResponse {
28+
private final SdkPublisher<AsyncRequestBody> asyncRequestBody;
29+
private final CompletableFuture<Void> future;
30+
31+
private SplitAsyncRequestBodyResponse(SdkPublisher<AsyncRequestBody> asyncRequestBody, CompletableFuture<Void> future) {
32+
this.asyncRequestBody = Validate.paramNotNull(asyncRequestBody, "asyncRequestBody");
33+
this.future = Validate.paramNotNull(future, "future");
34+
}
35+
36+
public static SplitAsyncRequestBodyResponse create(SdkPublisher<AsyncRequestBody> asyncRequestBody,
37+
CompletableFuture<Void> future) {
38+
return new SplitAsyncRequestBodyResponse(asyncRequestBody, future);
39+
}
40+
41+
/**
42+
* Returns the converted {@link SdkPublisher} of {@link AsyncRequestBody}s. Each {@link AsyncRequestBody} publishes a specific
43+
* portion of the original data.
44+
*/
45+
public SdkPublisher<AsyncRequestBody> asyncRequestBodyPublisher() {
46+
return asyncRequestBody;
47+
}
48+
49+
/**
50+
* Returns {@link CompletableFuture} that will be notified when all data has been consumed or if an error occurs.
51+
*/
52+
public CompletableFuture<Void> future() {
53+
return future;
54+
}
55+
56+
@Override
57+
public boolean equals(Object o) {
58+
if (this == o) {
59+
return true;
60+
}
61+
if (o == null || getClass() != o.getClass()) {
62+
return false;
63+
}
64+
65+
SplitAsyncRequestBodyResponse that = (SplitAsyncRequestBodyResponse) o;
66+
67+
if (!asyncRequestBody.equals(that.asyncRequestBody)) {
68+
return false;
69+
}
70+
return future.equals(that.future);
71+
}
72+
73+
@Override
74+
public int hashCode() {
75+
int result = asyncRequestBody.hashCode();
76+
result = 31 * result + future.hashCode();
77+
return result;
78+
}
79+
}
80+

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,11 @@
3131
import software.amazon.awssdk.utils.async.SimplePublisher;
3232

3333
/**
34-
* Splits an {@link SdkPublisher} to multiple smaller {@link AsyncRequestBody}s, each of which publishes a specific portion of the
35-
* original data.
34+
* Splits an {@link AsyncRequestBody} to multiple smaller {@link AsyncRequestBody}s, each of which publishes a specific portion of
35+
* the original data.
3636
*
3737
* <p>If content length is known, each {@link AsyncRequestBody} is sent to the subscriber right after it's initialized.
3838
* Otherwise, it is sent after the entire content for that chunk is buffered. This is required to get content length.
39-
*
40-
* // TODO: create a default method in AsyncRequestBody for this
4139
*/
4240
@SdkInternalApi
4341
public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
@@ -51,9 +49,9 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
5149

5250
private SplittingPublisher(Builder builder) {
5351
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
54-
this.chunkSizeInBytes = Validate.paramNotNull(builder.chunkSizeInBytes, "chunkSizeInBytes");
52+
this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes");
5553
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
56-
this.maxMemoryUsageInBytes = builder.maxMemoryUsageInBytes == null ? Long.MAX_VALUE : builder.maxMemoryUsageInBytes;
54+
this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
5755
this.future = builder.future;
5856

5957
// We need to cancel upstream subscription if the future gets cancelled.
@@ -304,13 +302,13 @@ public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
304302
* @param chunkSizeInBytes The new chunkSizeInBytes value.
305303
* @return This object for method chaining.
306304
*/
307-
public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
305+
public Builder chunkSizeInBytes(long chunkSizeInBytes) {
308306
this.chunkSizeInBytes = chunkSizeInBytes;
309307
return this;
310308
}
311309

312310
/**
313-
* Sets the maximum memory usage in bytes. By default, it uses unlimited memory.
311+
* Sets the maximum memory usage in bytes.
314312
*
315313
* @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value.
316314
* @return This object for method chaining.
@@ -319,7 +317,7 @@ public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
319317
// on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size
320318
// for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum
321319
// buffer size instead?
322-
public Builder maxMemoryUsageInBytes(Long maxMemoryUsageInBytes) {
320+
public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
323321
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
324322
return this;
325323
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,30 @@
1515

1616
package software.amazon.awssdk.core.async;
1717

18-
import static java.nio.charset.StandardCharsets.UTF_8;
1918
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2020

2121
import com.google.common.jimfs.Configuration;
2222
import com.google.common.jimfs.Jimfs;
2323
import io.reactivex.Flowable;
24-
import java.io.File;
25-
import java.io.FileWriter;
2624
import java.io.IOException;
27-
import java.io.InputStream;
2825
import java.nio.ByteBuffer;
2926
import java.nio.charset.StandardCharsets;
3027
import java.nio.file.FileSystem;
3128
import java.nio.file.Files;
3229
import java.nio.file.Path;
33-
import java.time.Instant;
34-
import java.util.Collections;
3530
import java.util.List;
36-
import java.util.concurrent.Callable;
3731
import java.util.concurrent.CountDownLatch;
3832
import java.util.stream.Collectors;
3933
import org.assertj.core.util.Lists;
40-
import org.junit.Rule;
4134
import org.junit.Test;
42-
import org.junit.rules.TemporaryFolder;
4335
import org.junit.runner.RunWith;
4436
import org.junit.runners.Parameterized;
4537
import org.reactivestreams.Publisher;
4638
import org.reactivestreams.Subscriber;
4739
import software.amazon.awssdk.core.internal.util.Mimetype;
4840
import software.amazon.awssdk.http.async.SimpleSubscriber;
4941
import software.amazon.awssdk.utils.BinaryUtils;
50-
import software.amazon.awssdk.utils.StringInputStream;
5142

5243
@RunWith(Parameterized.class)
5344
public class AsyncRequestBodyTest {
@@ -177,4 +168,25 @@ public void fromBytes_byteArrayNotNull_createsCopy() {
177168
ByteBuffer publishedBb = Flowable.fromPublisher(body).toList().blockingGet().get(0);
178169
assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(original);
179170
}
171+
172+
@Test
173+
public void split_nonPositiveInput_shouldThrowException() {
174+
AsyncRequestBody body = AsyncRequestBody.fromString("test");
175+
assertThatThrownBy(() -> body.split(0, 4)).hasMessageContaining("must be positive");
176+
assertThatThrownBy(() -> body.split(-1, 4)).hasMessageContaining("must be positive");
177+
assertThatThrownBy(() -> body.split(5, 0)).hasMessageContaining("must be positive");
178+
assertThatThrownBy(() -> body.split(5, -1)).hasMessageContaining("must be positive");
179+
}
180+
181+
@Test
182+
public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() {
183+
AsyncRequestBody body = AsyncRequestBody.fromPublisher(new Publisher<ByteBuffer>() {
184+
@Override
185+
public void subscribe(Subscriber<? super ByteBuffer> s) {
186+
187+
}
188+
});
189+
assertThatThrownBy(() -> body.split(10, 4))
190+
.hasMessageContaining("must be larger than or equal");
191+
}
180192
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import nl.jqno.equalsverifier.EqualsVerifier;
19+
import org.junit.jupiter.api.Test;
20+
21+
public class SplitAsyncRequestBodyResponseTest {
22+
23+
@Test
24+
void equalsHashcode() {
25+
EqualsVerifier.forClass(SplitAsyncRequestBodyResponse.class)
26+
.withNonnullFields("asyncRequestBody", "future")
27+
.verify();
28+
}
29+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.function.Function;
2626
import software.amazon.awssdk.annotations.SdkInternalApi;
2727
import software.amazon.awssdk.core.async.AsyncRequestBody;
28-
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
28+
import software.amazon.awssdk.core.async.SplitAsyncRequestBodyResponse;
2929
import software.amazon.awssdk.services.s3.S3AsyncClient;
3030
import software.amazon.awssdk.services.s3.model.CompletedPart;
3131
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -169,26 +169,26 @@ private CompletableFuture<Void> sendUploadPartRequests(MpuRequestContext mpuRequ
169169
CompletableFuture<PutObjectResponse> returnFuture,
170170
Collection<CompletableFuture<CompletedPart>> futures) {
171171

172-
CompletableFuture<Void> splittingPublisherFuture = new CompletableFuture<>();
172+
173173

174174
AsyncRequestBody asyncRequestBody = mpuRequestContext.request.right();
175-
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
176-
.asyncRequestBody(asyncRequestBody)
177-
.chunkSizeInBytes(mpuRequestContext.partSize)
178-
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
179-
.resultFuture(splittingPublisherFuture)
180-
.build();
181-
182-
splittingPublisher.map(new BodyToRequestConverter(mpuRequestContext.request.left(), mpuRequestContext.uploadId))
183-
.subscribe(pair -> sendIndividualUploadPartRequest(mpuRequestContext.uploadId,
184-
completedParts,
185-
futures,
186-
pair,
187-
splittingPublisherFuture))
188-
.exceptionally(throwable -> {
189-
returnFuture.completeExceptionally(throwable);
190-
return null;
191-
});
175+
176+
SplitAsyncRequestBodyResponse result = asyncRequestBody.split(mpuRequestContext.partSize, maxMemoryUsageInBytes);
177+
178+
CompletableFuture<Void> splittingPublisherFuture = result.future();
179+
180+
result.asyncRequestBodyPublisher()
181+
.map(new BodyToRequestConverter(mpuRequestContext.request.left(),
182+
mpuRequestContext.uploadId))
183+
.subscribe(pair -> sendIndividualUploadPartRequest(mpuRequestContext.uploadId,
184+
completedParts,
185+
futures,
186+
pair,
187+
splittingPublisherFuture))
188+
.exceptionally(throwable -> {
189+
returnFuture.completeExceptionally(throwable);
190+
return null;
191+
});
192192
return splittingPublisherFuture;
193193
}
194194

0 commit comments

Comments
 (0)