Skip to content

Splitting transformer #4826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer;
import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.SplittingTransformer;
import software.amazon.awssdk.utils.Validate;

/**
Expand All @@ -38,8 +39,8 @@
* <h2>Synchronization</h2>
* <p>
* All operations, including those called on the {@link org.reactivestreams.Subscriber} of the stream are guaranteed to be
* synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be
* invoked concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread.
* synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be invoked
* concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread.
* <p>
* <h2>Invocation Order</h2>
* <p>
Expand Down Expand Up @@ -81,11 +82,10 @@ public interface AsyncResponseTransformer<ResponseT, ResultT> {
/**
* Initial call to enable any setup required before the response is handled.
* <p>
* Note that this will be called for each request attempt, up to the number of retries allowed by the configured {@link
* software.amazon.awssdk.core.retry.RetryPolicy}.
* Note that this will be called for each request attempt, up to the number of retries allowed by the configured
* {@link software.amazon.awssdk.core.retry.RetryPolicy}.
* <p>
* This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is
* signaled.
* This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is signaled.
*
* @return The future holding the transformed response.
*/
Expand All @@ -106,18 +106,38 @@ public interface AsyncResponseTransformer<ResponseT, ResultT> {
void onStream(SdkPublisher<ByteBuffer> publisher);

/**
* Called when an error is encountered while making the request or receiving the response.
* Implementations should free up any resources in this method. This method may be called
* multiple times during the lifecycle of a request if automatic retries are enabled.
* Called when an error is encountered while making the request or receiving the response. Implementations should free up any
* resources in this method. This method may be called multiple times during the lifecycle of a request if automatic retries
* are enabled.
*
* @param error Error that occurred.
*/
void exceptionOccurred(Throwable error);

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an
* exception will be thrown.
* todo - javadoc
* todo - use configuration class as input instead of bufferSize
*
* @return
*/
default SplitAsyncResponseTransformer<ResponseT, ResultT> split(long bufferSize) {
CompletableFuture<ResultT> future = new CompletableFuture<>();
Comment on lines +123 to +124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't the split() method return a SplittingTransformer instead of a SplitAsyncResponseTransformer? Can't the splitting transformer create the future and make it available to the user in the same way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to avoid returning the concrete type SplittingTransformer and instead return the interface type SdkPublisher<AsyncResponseTransformer<T>>. If we return a SplittingTransformer here, we would need to make it @SdkPublicAPI

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reasonSplitAsyncResponseTransformer doesn't implement AsyncResponseTransformer interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SplitAsyncResponseTransformer is just a holder for the publisher and a completable future, the class implementing AsyncResponseTransformer is SplittingTransformer.IndividualTransformer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make it implement AsyncResponseTransformer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, I don't think so. The usage would be:

SplitAsyncResponseTransformer<GetObjectResponse, GetObjectResponse> split = transformer.split(...);
split.publisher().subscribe(downloaderSubscriber);
CompletableFuture<GetObjectResponse> future = split.preparedFuture();
// do whatever we want with the future
GetObjectResponse response = future.join();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just seems a bit odd that the static factory method returns a class that doesn't implement the same interface. Can we make it implement it? We did the same for toPublisher method

https://github.com/aws/aws-sdk-java-v2/blob/master/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java#L235

Copy link
Contributor Author

@L-Applin L-Applin Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.split() is not a static method though, it is an instance method called on a specific instance of an AsyncResponseTransformer just like the split() method on AsyncRequestBody. We could instead return just the SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> like AsyncRequestBody.split. I would need to find another way to deal with the future that needs to be returned from the service call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I'll need to think a bit more on this. Let's keep this conversation open for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a TODO to revisit the naming? It's a bit odd that this class is suffixed with AsyncResponseTransformer but doesn't implement it

SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer = SplittingTransformer
.<ResponseT, ResultT>builder()
.upstreamResponseTransformer(this)
.maximumBufferSize(bufferSize)
.returnFuture(future)
.build();
return SplitAsyncResponseTransformer.<ResponseT, ResultT>builder()
.asyncResponseTransformerPublisher(transformer)
.future(future)
.build();
}

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, the
* SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an exception will
* be thrown.
*
* @param path Path to file to write to.
* @param <ResponseT> Pojo Response type.
Expand All @@ -129,8 +149,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa
}

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link
* FileTransformerConfiguration}.
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified
* {@link FileTransformerConfiguration}.
*
* @param path Path to file to write to.
* @param config configuration for the transformer
Expand All @@ -143,8 +163,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa
}

/**
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder,
* avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}.
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, avoiding the
* need to create one manually via {@link FileTransformerConfiguration#builder()}.
*
* @see #toFile(Path, FileTransformerConfiguration)
*/
Expand All @@ -155,9 +175,9 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(
}

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an
* exception will be thrown.
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, the
* SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an exception will
* be thrown.
*
* @param file File to write to.
* @param <ResponseT> Pojo Response type.
Expand All @@ -168,8 +188,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File fi
}

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link
* FileTransformerConfiguration}.
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified
* {@link FileTransformerConfiguration}.
*
* @param file File to write to.
* @param config configuration for the transformer
Expand All @@ -182,8 +202,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File fi
}

/**
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder,
* avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}.
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, avoiding the
* need to create one manually via {@link FileTransformerConfiguration#builder()}.
*
* @see #toFile(File, FileTransformerConfiguration)
*/
Expand Down Expand Up @@ -237,16 +257,14 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo
}

/**
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an
* {@link InputStream}.
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an {@link InputStream}.
* <p>
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will
* be completed once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This
* behavior differs from some other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only
* have their {@link CompletableFuture} completed after the entire response body has finished streaming.
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed
* once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This behavior differs from some
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
* completed after the entire response body has finished streaming.
* <p>
* You are responsible for performing blocking reads from this input stream and closing the stream when you are
* finished.
* You are responsible for performing blocking reads from this input stream and closing the stream when you are finished.
* <p>
* Example usage:
* <pre>
Expand All @@ -260,7 +278,7 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo
* </pre>
*/
static <ResponseT extends SdkResponse>
AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() {
AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() {
return new InputStreamResponseTransformer<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.async;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.Validate;

/**
* Helper class containing the result of
* {@link AsyncResponseTransformer#split(long) splitting} an AsyncResponseTransformer. This class holds both the publisher of
* the individual {@code AsyncResponseTransformer<ResponseT, ResponseT>} and the {@code CompletableFuture<ResulT>} which will
* complete when the {@code AsyncResponseTransformer} that was split itself would complete.
*
* @see AsyncResponseTransformer#split(long)
* @param <ResponseT> ResponseT of the original AsyncResponseTransformer that was split.
* @param <ResultT> ResultT of the original AsyncResponseTransformer that was split.
*
*/
@SdkPublicApi
public final class SplitAsyncResponseTransformer<ResponseT, ResultT> {
private final SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher;
private final CompletableFuture<ResultT> future;

private SplitAsyncResponseTransformer(Builder<ResponseT, ResultT> builder) {
this.asyncResponseTransformerPublisher = Validate.paramNotNull(
builder.asyncResponseTransformerPublisher, "asyncResponseTransformerPublisher");
this.future = Validate.paramNotNull(
builder.future, "future");
}

/**
* The individual {@link AsyncResponseTransformer} will be available through the publisher returned by this method.
* @return the publisher which publishes the individual {@link AsyncResponseTransformer}
*/
public SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> publisher() {
return this.asyncResponseTransformerPublisher;
}

/**
* The future returned by this method will be completed when the future returned by calling the
* {@link AsyncResponseTransformer#prepare()} method on the AsyncResponseTransformer which was split completes.
* @return The future
*/
public CompletableFuture<ResultT> preparedFuture() {
return this.future;
}

public static <ResponseT, ResultT> Builder<ResponseT, ResultT> builder() {
return new Builder<>();
}

public static class Builder<ResponseT, ResultT> {
private SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher;
private CompletableFuture<ResultT> future;

public Builder<ResponseT, ResultT> asyncResponseTransformerPublisher(
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher) {
this.asyncResponseTransformerPublisher = asyncResponseTransformerPublisher;
return this;
}

public Builder<ResponseT, ResultT> future(CompletableFuture<ResultT> future) {
this.future = future;
return this;
}

public SplitAsyncResponseTransformer<ResponseT, ResultT> build() {
return new SplitAsyncResponseTransformer<>(this);
}
}
}
Loading