-
Notifications
You must be signed in to change notification settings - Fork 916
Splitting transformer #4826
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Splitting transformer #4826
Changes from all commits
fe243ae
e6d2ab9
a73ec72
f76307a
54223e9
076f5c1
57c0140
4b10284
c53a980
deeb551
1508140
99098de
1e18772
173b2d1
5d4e7e8
60fa61e
ac0f2ce
7b0dfac
3d2baaf
8f1c7e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer; | ||
import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer; | ||
import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer; | ||
import software.amazon.awssdk.core.internal.async.SplittingTransformer; | ||
import software.amazon.awssdk.utils.Validate; | ||
|
||
/** | ||
|
@@ -38,8 +39,8 @@ | |
* <h2>Synchronization</h2> | ||
* <p> | ||
* All operations, including those called on the {@link org.reactivestreams.Subscriber} of the stream are guaranteed to be | ||
* synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be | ||
* invoked concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread. | ||
* synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be invoked | ||
* concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread. | ||
* <p> | ||
* <h2>Invocation Order</h2> | ||
* <p> | ||
|
@@ -81,11 +82,10 @@ public interface AsyncResponseTransformer<ResponseT, ResultT> { | |
/** | ||
* Initial call to enable any setup required before the response is handled. | ||
* <p> | ||
* Note that this will be called for each request attempt, up to the number of retries allowed by the configured {@link | ||
* software.amazon.awssdk.core.retry.RetryPolicy}. | ||
* Note that this will be called for each request attempt, up to the number of retries allowed by the configured | ||
* {@link software.amazon.awssdk.core.retry.RetryPolicy}. | ||
* <p> | ||
* This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is | ||
* signaled. | ||
* This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is signaled. | ||
* | ||
* @return The future holding the transformed response. | ||
*/ | ||
|
@@ -106,18 +106,38 @@ public interface AsyncResponseTransformer<ResponseT, ResultT> { | |
void onStream(SdkPublisher<ByteBuffer> publisher); | ||
|
||
/** | ||
* Called when an error is encountered while making the request or receiving the response. | ||
* Implementations should free up any resources in this method. This method may be called | ||
* multiple times during the lifecycle of a request if automatic retries are enabled. | ||
* Called when an error is encountered while making the request or receiving the response. Implementations should free up any | ||
* resources in this method. This method may be called multiple times during the lifecycle of a request if automatic retries | ||
* are enabled. | ||
* | ||
* @param error Error that occurred. | ||
*/ | ||
void exceptionOccurred(Throwable error); | ||
|
||
/** | ||
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, | ||
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an | ||
* exception will be thrown. | ||
* todo - javadoc | ||
* todo - use configuration class as input instead of bufferSize | ||
* | ||
* @return | ||
*/ | ||
default SplitAsyncResponseTransformer<ResponseT, ResultT> split(long bufferSize) { | ||
CompletableFuture<ResultT> future = new CompletableFuture<>(); | ||
Comment on lines
+123
to
+124
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was trying to avoid returning the concrete type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any reason There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SplitAsyncResponseTransformer is just a holder for the publisher and a completable future, the class implementing AsyncResponseTransformer is SplittingTransformer.IndividualTransformer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we make it implement AsyncResponseTransformer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, I don't think so. The usage would be:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It just seems a bit odd that the static factory method returns a class that doesn't implement the same interface. Can we make it implement it? We did the same for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, I'll need to think a bit more on this. Let's keep this conversation open for now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a TODO to revisit the naming? It's a bit odd that this class is suffixed with AsyncResponseTransformer but doesn't implement it |
||
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer = SplittingTransformer | ||
.<ResponseT, ResultT>builder() | ||
.upstreamResponseTransformer(this) | ||
.maximumBufferSize(bufferSize) | ||
.returnFuture(future) | ||
.build(); | ||
return SplitAsyncResponseTransformer.<ResponseT, ResultT>builder() | ||
.asyncResponseTransformerPublisher(transformer) | ||
.future(future) | ||
.build(); | ||
} | ||
|
||
/** | ||
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, the | ||
* SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an exception will | ||
* be thrown. | ||
* | ||
* @param path Path to file to write to. | ||
* @param <ResponseT> Pojo Response type. | ||
|
@@ -129,8 +149,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa | |
} | ||
|
||
/** | ||
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link | ||
* FileTransformerConfiguration}. | ||
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified | ||
* {@link FileTransformerConfiguration}. | ||
* | ||
* @param path Path to file to write to. | ||
* @param config configuration for the transformer | ||
|
@@ -143,8 +163,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa | |
} | ||
|
||
/** | ||
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, | ||
* avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}. | ||
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, avoiding the | ||
* need to create one manually via {@link FileTransformerConfiguration#builder()}. | ||
* | ||
* @see #toFile(Path, FileTransformerConfiguration) | ||
*/ | ||
|
@@ -155,9 +175,9 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile( | |
} | ||
|
||
/** | ||
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, | ||
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an | ||
* exception will be thrown. | ||
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, the | ||
* SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an exception will | ||
* be thrown. | ||
* | ||
* @param file File to write to. | ||
* @param <ResponseT> Pojo Response type. | ||
|
@@ -168,8 +188,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File fi | |
} | ||
|
||
/** | ||
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link | ||
* FileTransformerConfiguration}. | ||
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified | ||
* {@link FileTransformerConfiguration}. | ||
* | ||
* @param file File to write to. | ||
* @param config configuration for the transformer | ||
|
@@ -182,8 +202,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File fi | |
} | ||
|
||
/** | ||
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, | ||
* avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}. | ||
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, avoiding the | ||
* need to create one manually via {@link FileTransformerConfiguration#builder()}. | ||
* | ||
* @see #toFile(File, FileTransformerConfiguration) | ||
*/ | ||
|
@@ -237,16 +257,14 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo | |
} | ||
|
||
/** | ||
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an | ||
* {@link InputStream}. | ||
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an {@link InputStream}. | ||
* <p> | ||
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will | ||
* be completed once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This | ||
* behavior differs from some other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only | ||
* have their {@link CompletableFuture} completed after the entire response body has finished streaming. | ||
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed | ||
* once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This behavior differs from some | ||
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture} | ||
* completed after the entire response body has finished streaming. | ||
* <p> | ||
* You are responsible for performing blocking reads from this input stream and closing the stream when you are | ||
* finished. | ||
* You are responsible for performing blocking reads from this input stream and closing the stream when you are finished. | ||
* <p> | ||
* Example usage: | ||
* <pre> | ||
|
@@ -260,7 +278,7 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo | |
* </pre> | ||
*/ | ||
static <ResponseT extends SdkResponse> | ||
AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() { | ||
AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() { | ||
return new InputStreamResponseTransformer<>(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.core.async; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import software.amazon.awssdk.annotations.SdkPublicApi; | ||
import software.amazon.awssdk.utils.Validate; | ||
|
||
/** | ||
* Helper class containing the result of | ||
* {@link AsyncResponseTransformer#split(long) splitting} an AsyncResponseTransformer. This class holds both the publisher of | ||
* the individual {@code AsyncResponseTransformer<ResponseT, ResponseT>} and the {@code CompletableFuture<ResulT>} which will | ||
* complete when the {@code AsyncResponseTransformer} that was split itself would complete. | ||
* | ||
* @see AsyncResponseTransformer#split(long) | ||
* @param <ResponseT> ResponseT of the original AsyncResponseTransformer that was split. | ||
* @param <ResultT> ResultT of the original AsyncResponseTransformer that was split. | ||
* | ||
*/ | ||
@SdkPublicApi | ||
public final class SplitAsyncResponseTransformer<ResponseT, ResultT> { | ||
private final SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher; | ||
private final CompletableFuture<ResultT> future; | ||
|
||
private SplitAsyncResponseTransformer(Builder<ResponseT, ResultT> builder) { | ||
this.asyncResponseTransformerPublisher = Validate.paramNotNull( | ||
builder.asyncResponseTransformerPublisher, "asyncResponseTransformerPublisher"); | ||
this.future = Validate.paramNotNull( | ||
builder.future, "future"); | ||
} | ||
|
||
/** | ||
* The individual {@link AsyncResponseTransformer} will be available through the publisher returned by this method. | ||
* @return the publisher which publishes the individual {@link AsyncResponseTransformer} | ||
*/ | ||
public SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> publisher() { | ||
L-Applin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return this.asyncResponseTransformerPublisher; | ||
} | ||
|
||
/** | ||
* The future returned by this method will be completed when the future returned by calling the | ||
* {@link AsyncResponseTransformer#prepare()} method on the AsyncResponseTransformer which was split completes. | ||
* @return The future | ||
*/ | ||
public CompletableFuture<ResultT> preparedFuture() { | ||
return this.future; | ||
} | ||
|
||
public static <ResponseT, ResultT> Builder<ResponseT, ResultT> builder() { | ||
return new Builder<>(); | ||
} | ||
|
||
public static class Builder<ResponseT, ResultT> { | ||
private SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher; | ||
private CompletableFuture<ResultT> future; | ||
|
||
public Builder<ResponseT, ResultT> asyncResponseTransformerPublisher( | ||
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher) { | ||
this.asyncResponseTransformerPublisher = asyncResponseTransformerPublisher; | ||
return this; | ||
} | ||
|
||
public Builder<ResponseT, ResultT> future(CompletableFuture<ResultT> future) { | ||
this.future = future; | ||
return this; | ||
} | ||
|
||
public SplitAsyncResponseTransformer<ResponseT, ResultT> build() { | ||
return new SplitAsyncResponseTransformer<>(this); | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.