Skip to content

Add config class for SplittingTransformer #4939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core;

import java.util.Objects;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.SplittingTransformer;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;

/**
* Configuration options for {@link AsyncResponseTransformer#split(SplittingTransformerConfiguration)} to configure how the SDK
* should split the {@link AsyncResponseTransformer}.
*
* @see #builder()
*/
@SdkPublicApi
public final class SplittingTransformerConfiguration implements ToCopyableBuilder<SplittingTransformerConfiguration.Builder,
SplittingTransformerConfiguration> {

private final Long bufferSize;

private SplittingTransformerConfiguration(DefaultBuilder builder) {
this.bufferSize = Validate.paramNotNull(builder.bufferSize, "bufferSize");
}

/**
* Create a {@link Builder}, used to create a {@link SplittingTransformerConfiguration}.
*/
public static Builder builder() {
return new DefaultBuilder();
}

/**
* @return the buffer size
*/
public Long bufferSize() {
return bufferSize;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

SplittingTransformerConfiguration that = (SplittingTransformerConfiguration) o;

return Objects.equals(bufferSize, that.bufferSize);
}

@Override
public int hashCode() {
return bufferSize != null ? bufferSize.hashCode() : 0;
}

@Override
public Builder toBuilder() {
return new DefaultBuilder(this);
}

public interface Builder extends CopyableBuilder<Builder, SplittingTransformerConfiguration> {

/**
* Configures the buffer size of the {@link SplittingTransformer}.
*
* @param bufferSize the buffer size
* @return This object for method chaining.
*/
Builder bufferSize(Long bufferSize);
}

private static final class DefaultBuilder implements Builder {
private Long bufferSize;

private DefaultBuilder(SplittingTransformerConfiguration configuration) {
this.bufferSize = configuration.bufferSize;
}

private DefaultBuilder() {
}

@Override
public Builder bufferSize(Long bufferSize) {
this.bufferSize = bufferSize;
return this;
}

@Override
public SplittingTransformerConfiguration build() {
return new SplittingTransformerConfiguration(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer;
Expand Down Expand Up @@ -115,17 +116,20 @@ public interface AsyncResponseTransformer<ResponseT, ResultT> {
void exceptionOccurred(Throwable error);

/**
* todo - javadoc
* todo - use configuration class as input instead of bufferSize
* Creates an {@link SplitAsyncResponseTransformer} which contains an {@link SplittingTransformer} that splits the
* {@link AsyncResponseTransformer} into multiple ones, publishing them as a {@link SdkPublisher}.
*
* @return
* @param splitConfig configuration for the split transformer
* @return SplitAsyncResponseTransformer instance.
* @see SplittingTransformer
* @see SplitAsyncResponseTransformer
*/
default SplitAsyncResponseTransformer<ResponseT, ResultT> split(long bufferSize) {
default SplitAsyncResponseTransformer<ResponseT, ResultT> split(SplittingTransformerConfiguration splitConfig) {
CompletableFuture<ResultT> future = new CompletableFuture<>();
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer = SplittingTransformer
.<ResponseT, ResultT>builder()
.upstreamResponseTransformer(this)
.maximumBufferSize(bufferSize)
.maximumBufferSize(splitConfig.bufferSize())
.returnFuture(future)
.build();
return SplitAsyncResponseTransformer.<ResponseT, ResultT>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
import software.amazon.awssdk.utils.Validate;

/**
* Helper class containing the result of
* {@link AsyncResponseTransformer#split(long) splitting} an AsyncResponseTransformer. This class holds both the publisher of
* the individual {@code AsyncResponseTransformer<ResponseT, ResponseT>} and the {@code CompletableFuture<ResulT>} which will
* complete when the {@code AsyncResponseTransformer} that was split itself would complete.
* {@link AsyncResponseTransformer#split(SplittingTransformerConfiguration) splitting} an AsyncResponseTransformer. This class
* holds both the publisher of the individual {@code AsyncResponseTransformer<ResponseT, ResponseT>} and the
* {@code CompletableFuture <ResultT>} which will complete when the {@code AsyncResponseTransformer} that was split itself would
* complete.
*
* @see AsyncResponseTransformer#split(long)
* @see AsyncResponseTransformer#split(SplittingTransformerConfiguration)
* @param <ResponseT> ResponseT of the original AsyncResponseTransformer that was split.
* @param <ResultT> ResultT of the original AsyncResponseTransformer that was split.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.CompletableFutureUtils;
Expand All @@ -32,8 +33,8 @@

/**
* Split a {@link AsyncResponseTransformer} into multiple ones, publishing them as a {@link SdkPublisher}. Created using the
* {@link AsyncResponseTransformer#split(long) split} method. The upstream {@link AsyncResponseTransformer} that is split will
* receive data from the individual transformers.
* {@link AsyncResponseTransformer#split(SplittingTransformerConfiguration) split} method. The upstream
* {@link AsyncResponseTransformer} that is split will receive data from the individual transformers.
* <p>
* This publisher also buffers an amount of data before sending it to the upstream transformer, as specified by the
* maximumBufferSize. ByteBuffers will be published once the buffer has been reached, or when the subscription to this publisher
Expand All @@ -48,7 +49,8 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
private static final Logger log = Logger.loggerFor(SplittingTransformer.class);

/**
* The AsyncResponseTransformer on which the {@link AsyncResponseTransformer#split(long) split} method was called.
* The AsyncResponseTransformer on which the {@link AsyncResponseTransformer#split(SplittingTransformerConfiguration) split}
* method was called.
*/
private final AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.jupiter.api.Test;

public class SplittingTransformerConfigurationTest {

@Test
void equalsHashcode() {
EqualsVerifier.forClass(SplittingTransformerConfiguration.class)
.withNonnullFields("bufferSize")
.verify();

}

@Test
void toBuilder() {
SplittingTransformerConfiguration configuration =
SplittingTransformerConfiguration.builder()
.bufferSize(4444L)
.build();

SplittingTransformerConfiguration another = configuration.toBuilder().build();
assertThat(configuration).isEqualTo(another);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.Function;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.ApiName;
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SplitAsyncResponseTransformer;
Expand Down Expand Up @@ -49,6 +50,7 @@ public final class MultipartS3AsyncClient extends DelegatingS3AsyncClient {

private final UploadObjectHelper mpuHelper;
private final CopyObjectHelper copyObjectHelper;
private final long apiCallBufferSize;

private MultipartS3AsyncClient(S3AsyncClient delegate, MultipartConfiguration multipartConfiguration) {
super(delegate);
Expand All @@ -57,6 +59,7 @@ private MultipartS3AsyncClient(S3AsyncClient delegate, MultipartConfiguration mu
MultipartConfigurationResolver resolver = new MultipartConfigurationResolver(validConfiguration);
long minPartSizeInBytes = resolver.minimalPartSizeInBytes();
long threshold = resolver.thresholdInBytes();
apiCallBufferSize = resolver.apiCallBufferSize();
mpuHelper = new UploadObjectHelper(delegate, resolver);
copyObjectHelper = new CopyObjectHelper(delegate, minPartSizeInBytes, threshold);
}
Expand All @@ -75,7 +78,7 @@ public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyOb
public <ReturnT> CompletableFuture<ReturnT> getObject(
GetObjectRequest getObjectRequest, AsyncResponseTransformer<GetObjectResponse, ReturnT> asyncResponseTransformer) {
SplitAsyncResponseTransformer<GetObjectResponse, ReturnT> split =
asyncResponseTransformer.split(1024L * 1024L * 32L); // todo take from config
asyncResponseTransformer.split(SplittingTransformerConfiguration.builder().bufferSize(apiCallBufferSize).build());
split.publisher().subscribe(new MultipartDownloaderSubscriber((S3AsyncClient) delegate(), getObjectRequest));
return split.preparedFuture();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.ResponsePublisher;
import software.amazon.awssdk.core.async.SplitAsyncResponseTransformer;
Expand All @@ -49,6 +50,9 @@ class MultipartDownloadIntegrationTest {
static final String key = String.format("debug-test-%smb", fileTestSize);

private S3AsyncClient s3;
private final SplittingTransformerConfiguration splitConfig = SplittingTransformerConfiguration.builder()
.bufferSize(1024 * 1024 * 32L)
.build();

@BeforeEach
void init() {
Expand All @@ -67,7 +71,7 @@ void testByteAsyncResponseTransformer() {
s3, GetObjectRequest.builder().bucket(bucket).key(key).build());

SplitAsyncResponseTransformer<GetObjectResponse, ResponseBytes<GetObjectResponse>> split =
transformer.split(1024 * 1024 * 32);
transformer.split(splitConfig);
split.publisher().subscribe(downloaderSubscriber);
ResponseBytes<GetObjectResponse> res = split.preparedFuture().join();
log.info(() -> "complete");
Expand All @@ -85,7 +89,7 @@ void testFileAsyncResponseTransformer() {
MultipartDownloaderSubscriber downloaderSubscriber = new MultipartDownloaderSubscriber(
s3, GetObjectRequest.builder().bucket(bucket).key(key).build());

SplitAsyncResponseTransformer<GetObjectResponse, GetObjectResponse> split = transformer.split(1024 * 1024 * 32);
SplitAsyncResponseTransformer<GetObjectResponse, GetObjectResponse> split = transformer.split(splitConfig);
split.publisher().subscribe(downloaderSubscriber);
GetObjectResponse res = split.preparedFuture().join();
log.info(() -> "complete");
Expand All @@ -101,7 +105,7 @@ void testPublisherAsyncResponseTransformer() {
MultipartDownloaderSubscriber downloaderSubscriber = new MultipartDownloaderSubscriber(
s3, GetObjectRequest.builder().bucket(bucket).key(key).build());
SplitAsyncResponseTransformer<GetObjectResponse, ResponsePublisher<GetObjectResponse>> split =
transformer.split(1024 * 1024 * 32);
transformer.split(splitConfig);
split.publisher().subscribe(downloaderSubscriber);
split.preparedFuture().whenComplete((res, e) -> {
log.info(() -> "complete");
Expand Down Expand Up @@ -143,7 +147,7 @@ void testBlockingInputStreamResponseTransformer() {
s3, GetObjectRequest.builder().bucket(bucket).key(key).build());

SplitAsyncResponseTransformer<GetObjectResponse, ResponseInputStream<GetObjectResponse>> split =
transformer.split(1024 * 1024 * 32);
transformer.split(splitConfig);
split.publisher().subscribe(downloaderSubscriber);
ResponseInputStream<GetObjectResponse> res = split.preparedFuture().join();
log.info(() -> "complete");
Expand Down