Skip to content

Commit 8c18e39

Browse files
authored
Splitting transformer (#4826)
* working - no buffering * create DelegatingBufferingSubscriber and use it in SplittingTransformer * split now returns SplitAsyncResponseTransformer * clean up, doc, more tests * clean up * added todo for buffer size * DelegatingBufferingSubscriber Internal API and some clean up * fix checkstyle * fix checkstyle * added demand management in SplittingTransformer * clean-up * fix checkstyle * TCK tests * TCK tests - optional maxElement for SplittingTransformer - white box verification tests for DelegatingBufferingSubscriber and IndividualPartSubscriber - make DelegatingBufferingSubscriber.onNext iterative * checkstyle * Some PR comment - SplittingTransformer builder - DelegatingBufferingSubscriber protected API - remove spec comments - refactor emit method * checkstyle * removed maxElements, fix checkstyle, rename bufferSize in builder to maximumBufferSize * Use FlatteningSubscriber as base for DelegatingBufferingSubscriber. Move common logic for FlatteningSubscriber and DelegatingBufferingSubscriber to new class AbstractFlatteningSubscriber * japicmp fix, small javadoc
1 parent 7100df4 commit 8c18e39

File tree

14 files changed

+2022
-247
lines changed

14 files changed

+2022
-247
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 50 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
3131
import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer;
3232
import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer;
33+
import software.amazon.awssdk.core.internal.async.SplittingTransformer;
3334
import software.amazon.awssdk.utils.Validate;
3435

3536
/**
@@ -38,8 +39,8 @@
3839
* <h2>Synchronization</h2>
3940
* <p>
4041
* All operations, including those called on the {@link org.reactivestreams.Subscriber} of the stream are guaranteed to be
41-
* synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be
42-
* invoked concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread.
42+
* synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be invoked
43+
* concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread.
4344
* <p>
4445
* <h2>Invocation Order</h2>
4546
* <p>
@@ -81,11 +82,10 @@ public interface AsyncResponseTransformer<ResponseT, ResultT> {
8182
/**
8283
* Initial call to enable any setup required before the response is handled.
8384
* <p>
84-
* Note that this will be called for each request attempt, up to the number of retries allowed by the configured {@link
85-
* software.amazon.awssdk.core.retry.RetryPolicy}.
85+
* Note that this will be called for each request attempt, up to the number of retries allowed by the configured
86+
* {@link software.amazon.awssdk.core.retry.RetryPolicy}.
8687
* <p>
87-
* This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is
88-
* signaled.
88+
* This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is signaled.
8989
*
9090
* @return The future holding the transformed response.
9191
*/
@@ -106,18 +106,38 @@ public interface AsyncResponseTransformer<ResponseT, ResultT> {
106106
void onStream(SdkPublisher<ByteBuffer> publisher);
107107

108108
/**
109-
* Called when an error is encountered while making the request or receiving the response.
110-
* Implementations should free up any resources in this method. This method may be called
111-
* multiple times during the lifecycle of a request if automatic retries are enabled.
109+
* Called when an error is encountered while making the request or receiving the response. Implementations should free up any
110+
* resources in this method. This method may be called multiple times during the lifecycle of a request if automatic retries
111+
* are enabled.
112112
*
113113
* @param error Error that occurred.
114114
*/
115115
void exceptionOccurred(Throwable error);
116116

117117
/**
118-
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
119-
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an
120-
* exception will be thrown.
118+
* todo - javadoc
119+
* todo - use configuration class as input instead of bufferSize
120+
*
121+
* @return
122+
*/
123+
default SplitAsyncResponseTransformer<ResponseT, ResultT> split(long bufferSize) {
124+
CompletableFuture<ResultT> future = new CompletableFuture<>();
125+
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer = SplittingTransformer
126+
.<ResponseT, ResultT>builder()
127+
.upstreamResponseTransformer(this)
128+
.maximumBufferSize(bufferSize)
129+
.returnFuture(future)
130+
.build();
131+
return SplitAsyncResponseTransformer.<ResponseT, ResultT>builder()
132+
.asyncResponseTransformerPublisher(transformer)
133+
.future(future)
134+
.build();
135+
}
136+
137+
/**
138+
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, the
139+
* SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an exception will
140+
* be thrown.
121141
*
122142
* @param path Path to file to write to.
123143
* @param <ResponseT> Pojo Response type.
@@ -129,8 +149,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa
129149
}
130150

131151
/**
132-
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link
133-
* FileTransformerConfiguration}.
152+
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified
153+
* {@link FileTransformerConfiguration}.
134154
*
135155
* @param path Path to file to write to.
136156
* @param config configuration for the transformer
@@ -143,8 +163,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa
143163
}
144164

145165
/**
146-
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder,
147-
* avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}.
166+
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, avoiding the
167+
* need to create one manually via {@link FileTransformerConfiguration#builder()}.
148168
*
149169
* @see #toFile(Path, FileTransformerConfiguration)
150170
*/
@@ -155,9 +175,9 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(
155175
}
156176

157177
/**
158-
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
159-
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an
160-
* exception will be thrown.
178+
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, the
179+
* SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an exception will
180+
* be thrown.
161181
*
162182
* @param file File to write to.
163183
* @param <ResponseT> Pojo Response type.
@@ -168,8 +188,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File fi
168188
}
169189

170190
/**
171-
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link
172-
* FileTransformerConfiguration}.
191+
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified
192+
* {@link FileTransformerConfiguration}.
173193
*
174194
* @param file File to write to.
175195
* @param config configuration for the transformer
@@ -182,8 +202,8 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File fi
182202
}
183203

184204
/**
185-
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder,
186-
* avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}.
205+
* This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, avoiding the
206+
* need to create one manually via {@link FileTransformerConfiguration#builder()}.
187207
*
188208
* @see #toFile(File, FileTransformerConfiguration)
189209
*/
@@ -237,16 +257,14 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo
237257
}
238258

239259
/**
240-
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an
241-
* {@link InputStream}.
260+
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an {@link InputStream}.
242261
* <p>
243-
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will
244-
* be completed once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This
245-
* behavior differs from some other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only
246-
* have their {@link CompletableFuture} completed after the entire response body has finished streaming.
262+
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed
263+
* once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This behavior differs from some
264+
* other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
265+
* completed after the entire response body has finished streaming.
247266
* <p>
248-
* You are responsible for performing blocking reads from this input stream and closing the stream when you are
249-
* finished.
267+
* You are responsible for performing blocking reads from this input stream and closing the stream when you are finished.
250268
* <p>
251269
* Example usage:
252270
* <pre>
@@ -260,7 +278,7 @@ static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, Respo
260278
* </pre>
261279
*/
262280
static <ResponseT extends SdkResponse>
263-
AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() {
281+
AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() {
264282
return new InputStreamResponseTransformer<>();
265283
}
266284
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import software.amazon.awssdk.annotations.SdkPublicApi;
20+
import software.amazon.awssdk.utils.Validate;
21+
22+
/**
23+
* Helper class containing the result of
24+
* {@link AsyncResponseTransformer#split(long) splitting} an AsyncResponseTransformer. This class holds both the publisher of
25+
* the individual {@code AsyncResponseTransformer<ResponseT, ResponseT>} and the {@code CompletableFuture<ResulT>} which will
26+
* complete when the {@code AsyncResponseTransformer} that was split itself would complete.
27+
*
28+
* @see AsyncResponseTransformer#split(long)
29+
* @param <ResponseT> ResponseT of the original AsyncResponseTransformer that was split.
30+
* @param <ResultT> ResultT of the original AsyncResponseTransformer that was split.
31+
*
32+
*/
33+
@SdkPublicApi
34+
public final class SplitAsyncResponseTransformer<ResponseT, ResultT> {
35+
private final SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher;
36+
private final CompletableFuture<ResultT> future;
37+
38+
private SplitAsyncResponseTransformer(Builder<ResponseT, ResultT> builder) {
39+
this.asyncResponseTransformerPublisher = Validate.paramNotNull(
40+
builder.asyncResponseTransformerPublisher, "asyncResponseTransformerPublisher");
41+
this.future = Validate.paramNotNull(
42+
builder.future, "future");
43+
}
44+
45+
/**
46+
* The individual {@link AsyncResponseTransformer} will be available through the publisher returned by this method.
47+
* @return the publisher which publishes the individual {@link AsyncResponseTransformer}
48+
*/
49+
public SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> publisher() {
50+
return this.asyncResponseTransformerPublisher;
51+
}
52+
53+
/**
54+
* The future returned by this method will be completed when the future returned by calling the
55+
* {@link AsyncResponseTransformer#prepare()} method on the AsyncResponseTransformer which was split completes.
56+
* @return The future
57+
*/
58+
public CompletableFuture<ResultT> preparedFuture() {
59+
return this.future;
60+
}
61+
62+
public static <ResponseT, ResultT> Builder<ResponseT, ResultT> builder() {
63+
return new Builder<>();
64+
}
65+
66+
public static class Builder<ResponseT, ResultT> {
67+
private SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher;
68+
private CompletableFuture<ResultT> future;
69+
70+
public Builder<ResponseT, ResultT> asyncResponseTransformerPublisher(
71+
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> asyncResponseTransformerPublisher) {
72+
this.asyncResponseTransformerPublisher = asyncResponseTransformerPublisher;
73+
return this;
74+
}
75+
76+
public Builder<ResponseT, ResultT> future(CompletableFuture<ResultT> future) {
77+
this.future = future;
78+
return this;
79+
}
80+
81+
public SplitAsyncResponseTransformer<ResponseT, ResultT> build() {
82+
return new SplitAsyncResponseTransformer<>(this);
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)