Skip to content

Commit a34c74c

Browse files
committed
Update HTTP Async SPI
This commit makes changes to the HTTP Async SPI as well as the AsyncResponseTransformer interface exposed by SDK clients. For the HTTP SPI, the main changes are to SdkHttpResponseHandler and SdkAsyncHttpClient. - SdkHttpResponseHandler The most important change is to complete(). It has been removed as it was a potential spot where the async client could get blocked or where the implementation would be forced to block, since the handler must return the transformed response from this method. We also don't necessarily care about what the response handler does at the HTTP client layer, so there's no need to return the transformed result. Secondly, the interface now guarantees that onStream() is always called, even for responses with no content. This should simplify implementation a bit since implementers know to always expect a stream. Other changes are mostly minor, such as naming: the interface is now SdkAsyncResponseHandler to be more intuitive since it follows our current naming conventions. - SdkAsyncHttpClient prepareRequest() has been replaced with execute() that takes a single parameter object. It now also returns a CompletableFuture rather than a RunnableRequest. - AsyncResponseTransformer This continues to be almost identical to SdkAsyncHttpResponseHandler. The most important change here is in complete(). Like in SdkHttpResponseHandler, this method was problematic since it had the protential to block the caller. To address this, complete() has been replaced with a method named prepare() that returns a CompletableFuture. We guarantee that this method is always called before executing the request to allow implementers to do any setup/cleanup. This method could be called more than once in the event of retries. Another important result of prepare() returning a CompletableFuture is this gives users the ability to trigger retries by completing it exceptionally with a instance of RetryableException similar to the sync ReponseTransformer. Finally, this change decouples response transformation from request completion. With the current behavior, the complete() is only called when the entire response is received. This makes doing things like returning the Publisher from onStream() as the result of the transformer awkward.
1 parent ca9af92 commit a34c74c

File tree

58 files changed

+2197
-896
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+2197
-896
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation
179179
if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
180180
String paramName = opModel.hasStreamingOutput() ? "asyncResponseTransformer" : "asyncResponseHandler";
181181
builder.addStatement("runAndLogError(log, \"Exception thrown in exceptionOccurred callback, ignoring\",\n" +
182-
"() -> $L.exceptionOccurred(t))", paramName);
182+
"() -> $N.exceptionOccurred(t))", paramName);
183183
}
184184

185185
return builder.addStatement("return $T.failedFuture(t)", CompletableFutureUtils.class)

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/QueryXmlProtocolSpec.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,14 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
154154
".withResponseHandler(responseHandler)" +
155155
".withErrorResponseHandler($N)\n" +
156156
asyncRequestBody +
157-
".withInput($L) $L)$L;",
157+
".withInput($L) $L);",
158158
ClientExecutionParams.class,
159159
requestType,
160160
pojoResponseType,
161161
marshaller,
162162
"errorResponseHandler",
163163
opModel.getInput().getVariableName(),
164-
opModel.hasStreamingOutput() ? ", asyncResponseTransformer" : "",
165-
// If it's a streaming operation we also need to notify the handler on exception
166-
opModel.hasStreamingOutput() ? ".whenComplete((r, e) -> {\n"
167-
+ " if (e != null) {\n"
168-
+ " asyncResponseTransformer.exceptionOccurred(e);\n"
169-
+ " }\n"
170-
+ "})" : "")
164+
opModel.hasStreamingOutput() ? ", asyncResponseTransformer" : "")
171165
.build();
172166
}
173167

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-async-client-class.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,7 @@ public CompletableFuture<Void> eventStreamOperation(EventStreamOperationRequest
282282
});
283283
return future;
284284
} catch (Throwable t) {
285-
runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring",
286-
() -> asyncResponseHandler.exceptionOccurred(t));
285+
runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", () -> asyncResponseHandler.exceptionOccurred(t));
287286
return CompletableFutureUtils.failedFuture(t);
288287
}
289288
}
@@ -764,7 +763,7 @@ public <ReturnT> CompletableFuture<ReturnT> streamingOutputOperation(
764763
});
765764
} catch (Throwable t) {
766765
runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring",
767-
() -> asyncResponseTransformer.exceptionOccurred(t));
766+
() -> asyncResponseTransformer.exceptionOccurred(t));
768767
return CompletableFutureUtils.failedFuture(t);
769768
}
770769
}

core/aws-core/src/main/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer.java

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@
4646
import software.amazon.awssdk.core.http.HttpResponseHandler;
4747
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
4848
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
49-
import software.amazon.awssdk.core.internal.util.ThrowableUtils;
5049
import software.amazon.awssdk.http.AbortableInputStream;
50+
import software.amazon.awssdk.http.SdkCancellationException;
5151
import software.amazon.awssdk.http.SdkHttpFullResponse;
5252
import software.amazon.awssdk.utils.BinaryUtils;
5353
import software.amazon.eventstream.Message;
@@ -114,11 +114,6 @@ public class EventStreamAsyncResponseTransformer<ResponseT, EventT>
114114
*/
115115
private volatile boolean isDone = false;
116116

117-
/**
118-
* Holds a reference to any exception delivered to exceptionOccurred.
119-
*/
120-
private final AtomicReference<Throwable> error = new AtomicReference<>();
121-
122117
/**
123118
* Executor to deliver events to the subscriber
124119
*/
@@ -161,6 +156,8 @@ public class EventStreamAsyncResponseTransformer<ResponseT, EventT>
161156
*/
162157
private String requestId = null;
163158

159+
private volatile CompletableFuture<Void> transformFuture;
160+
164161
@Deprecated
165162
@ReviewBeforeRelease("Remove this on full GA of 2.0.0")
166163
public EventStreamAsyncResponseTransformer(
@@ -191,7 +188,16 @@ private EventStreamAsyncResponseTransformer(
191188
}
192189

193190
@Override
194-
public void responseReceived(SdkResponse response) {
191+
public CompletableFuture<Void> prepare() {
192+
transformFuture = new CompletableFuture<>();
193+
return transformFuture;
194+
}
195+
196+
@Override
197+
public void onResponse(SdkResponse response) {
198+
// We use a void unmarshaller and unmarshall the actual response in the message
199+
// decoder when we receive the initial-response frame. TODO not clear
200+
// how we would handle REST protocol which would unmarshall the response from the HTTP headers
195201
if (response != null && response.sdkHttpResponse() != null) {
196202
this.requestId = response.sdkHttpResponse()
197203
.firstMatchingHeader(X_AMZN_REQUEST_ID_HEADER)
@@ -223,31 +229,17 @@ public void exceptionOccurred(Throwable throwable) {
223229
synchronized (this) {
224230
if (!isDone) {
225231
isDone = true;
226-
error.set(throwable);
227232
// If we have a Subscriber at this point notify it as well
228-
if (subscriberRef.get() != null) {
233+
if (subscriberRef.get() != null && shouldSurfaceErrorToEventSubscriber(throwable)) {
229234
runAndLogError(log, "Error thrown from Subscriber#onError, ignoring.",
230235
() -> subscriberRef.get().onError(throwable));
231236
}
232237
eventStreamResponseHandler.exceptionOccurred(throwable);
238+
transformFuture.completeExceptionally(throwable);
233239
}
234240
}
235241
}
236242

237-
@Override
238-
public Void complete() {
239-
if (error.get() == null) {
240-
// Add the special on complete event to signal drainEvents to complete the subscriber
241-
eventsToDeliver.add(ON_COMPLETE_EVENT);
242-
drainEventsIfNotAlready();
243-
return null;
244-
} else {
245-
// Need to propagate the failure up so the future is completed exceptionally. This should only happen
246-
// when there is a frame level exception that the upper layers don't know about.
247-
throw ThrowableUtils.failure(error.get());
248-
}
249-
}
250-
251243
/**
252244
* Called when all events have been delivered to the downstream subscriber.
253245
*/
@@ -340,6 +332,10 @@ private SdkHttpFullResponse adaptMessageToResponse(Message message, boolean isEx
340332
.build();
341333
}
342334

335+
private static boolean shouldSurfaceErrorToEventSubscriber(Throwable t) {
336+
return !(t instanceof SdkCancellationException);
337+
}
338+
343339
/**
344340
* Subscriber for the raw bytes from the stream. Feeds them to the {@link MessageDecoder} as they arrive
345341
* and will request as much as needed to fulfill any outstanding demand.
@@ -390,8 +386,10 @@ public void onError(Throwable throwable) {
390386

391387
@Override
392388
public void onComplete() {
393-
// Notified in onEventComplete method because we have more context on what we've delivered to
394-
// the event stream subscriber there.
389+
// Add the special on complete event to signal drainEvents to complete the subscriber
390+
eventsToDeliver.add(ON_COMPLETE_EVENT);
391+
drainEventsIfNotAlready();
392+
transformFuture.complete(null);
395393
}
396394
}
397395

core/aws-core/src/main/java/software/amazon/awssdk/awscore/eventstream/RestEventStreamAsyncResponseTransformer.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package software.amazon.awssdk.awscore.eventstream;
1717

1818
import java.nio.ByteBuffer;
19+
import java.util.concurrent.CompletableFuture;
20+
1921
import software.amazon.awssdk.annotations.SdkProtectedApi;
2022
import software.amazon.awssdk.core.SdkResponse;
2123
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -43,8 +45,13 @@ private RestEventStreamAsyncResponseTransformer(
4345
}
4446

4547
@Override
46-
public void responseReceived(ResponseT response) {
47-
delegate.responseReceived(response);
48+
public CompletableFuture<Void> prepare() {
49+
return delegate.prepare();
50+
}
51+
52+
@Override
53+
public void onResponse(ResponseT response) {
54+
delegate.onResponse(response);
4855
eventStreamResponseHandler.responseReceived(response);
4956
}
5057

@@ -58,11 +65,6 @@ public void exceptionOccurred(Throwable throwable) {
5865
delegate.exceptionOccurred(throwable);
5966
}
6067

61-
@Override
62-
public Void complete() {
63-
return delegate.complete();
64-
}
65-
6668
public static <ResponseT extends SdkResponse, EventT> Builder<ResponseT, EventT> builder() {
6769
return new Builder<>();
6870
}

core/aws-core/src/test/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformerTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashMap;
2424
import java.util.Map;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.CompletionException;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.Executors;
2829
import java.util.concurrent.atomic.AtomicInteger;
@@ -123,10 +124,19 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {
123124
.executor(Executors.newSingleThreadExecutor())
124125
.future(new CompletableFuture<>())
125126
.build();
126-
transformer.responseReceived(null);
127+
CompletableFuture<Void> cf = transformer.prepare();
128+
transformer.onResponse(null);
127129
transformer.onStream(SdkPublisher.adapt(bytePublisher));
128130

129-
assertThatThrownBy(transformer::complete).isSameAs(exception);
131+
assertThatThrownBy(() -> {
132+
try {
133+
cf.join();
134+
} catch (CompletionException e) {
135+
if (e.getCause() instanceof SdkServiceException) {
136+
throw ((SdkServiceException) e.getCause());
137+
}
138+
}
139+
}).isSameAs(exception);
130140
}
131141

132142
private static class SubscribingResponseHandler implements EventStreamResponseHandler<Object, Object> {

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,70 +18,92 @@
1818
import java.io.File;
1919
import java.nio.ByteBuffer;
2020
import java.nio.file.Path;
21-
import org.reactivestreams.Publisher;
22-
import org.reactivestreams.Subscription;
21+
import java.util.concurrent.CompletableFuture;
22+
2323
import software.amazon.awssdk.annotations.SdkPublicApi;
2424
import software.amazon.awssdk.core.ResponseBytes;
2525
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
2626
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
2727

2828
/**
2929
* Callback interface to handle a streaming asynchronous response.
30+
* <p>
31+
* <h2>Synchronization</h2>
32+
* <p>
33+
* All operations, including those called on the {@link org.reactivestreams.Subscriber} of the stream are guaranteed to be
34+
* synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be
35+
* invoked concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread.
36+
* <p>
37+
* <h2>Invocation Order</h2>
38+
* <p>
39+
* The methods are called in the following order:
40+
* <ul>
41+
* <li>
42+
* {@link #prepare()}: This method is always called first. Implementations should use this to setup or perform any
43+
* cleanup necessary. <b>Note that this will be called upon each request attempt</b>. If the {@link CompletableFuture}
44+
* returned from the previous invocation has already been completed, the implementation should return a new instance.
45+
* </li>
46+
* <li>
47+
* {@link #onResponse}: If the response was received successfully, this method is called next.
48+
* </li>
49+
* <li>
50+
* {@link #onStream(SdkPublisher)}: Called after {@code onResponse}. This is always invoked, even if the service
51+
* operation response does not contain a body. If the response does not have a body, then the {@link SdkPublisher} will
52+
* complete the subscription without signaling any elements.
53+
* </li>
54+
* <li>
55+
* {@link #exceptionOccurred(Throwable)}: If there is an error sending the request. This method is called before {@link
56+
* org.reactivestreams.Subscriber#onError(Throwable)}.
57+
* </li>
58+
* <li>
59+
* {@link org.reactivestreams.Subscriber#onError(Throwable)}: If an error is encountered while the {@code Publisher} is
60+
* publishing to a {@link org.reactivestreams.Subscriber}.
61+
* </li>
62+
* </ul>
63+
* <p>
64+
* <h2>Retries</h2>
65+
* <p>
66+
* The transformer has the ability to trigger retries at any time by completing the {@link CompletableFuture} with an
67+
* exception that is deemed retryable by the configured {@link software.amazon.awssdk.core.retry.RetryPolicy}.
3068
*
3169
* @param <ResponseT> POJO response type.
32-
* @param <ReturnT> Type this response handler produces. I.E. the type you are transforming the response into.
70+
* @param <ResultT> Type this response handler produces. I.E. the type you are transforming the response into.
3371
*/
3472
@SdkPublicApi
35-
public interface AsyncResponseTransformer<ResponseT, ReturnT> {
36-
73+
public interface AsyncResponseTransformer<ResponseT, ResultT> {
3774
/**
38-
* Called when the initial response has been received and the POJO response has
39-
* been unmarshalled. This is guaranteed to be called before onStream.
40-
*
41-
* <p>In the event of a retryable error, this callback may be called multiple times. It
42-
* also may never be invoked if the request never succeeds.</p>
75+
* Initial call to enable any setup required before the response is handled.
76+
* <p>
77+
* Note that this will be called for each request attempt, up to the number of retries allowed by the configured {@link
78+
* software.amazon.awssdk.core.retry.RetryPolicy}.
79+
* <p>
80+
* This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is
81+
* signaled.
4382
*
44-
* @param response Unmarshalled POJO containing metadata about the streamed data.
83+
* @return The future holding the transformed response.
4584
*/
46-
void responseReceived(ResponseT response);
85+
CompletableFuture<ResultT> prepare();
4786

4887
/**
49-
* Called when events are ready to be streamed. Implementations must subscribe to the {@link Publisher} and request data via
50-
* a {@link org.reactivestreams.Subscription} as they can handle it.
88+
* Called when the unmarshalled response object is ready.
5189
*
52-
* <p>
53-
* If at any time the subscriber wishes to stop receiving data, it may call {@link Subscription#cancel()}. This
54-
* will be treated as a failure of the response and the {@link #exceptionOccurred(Throwable)} callback will be invoked.
55-
* </p>
56-
*
57-
* <p>This callback may never be called if the response has no content or if an error occurs.</p>
58-
*
59-
* <p>
60-
* In the event of a retryable error, this callback may be called multiple times with different Publishers.
61-
* If this method is called more than once, implementation must either reset any state to prepare for another
62-
* stream of data or must throw an exception indicating they cannot reset. If any exception is thrown then no
63-
* automatic retry is performed.
64-
* </p>
90+
* @param response The unmarshalled response.
6591
*/
66-
void onStream(SdkPublisher<ByteBuffer> publisher);
92+
void onResponse(ResponseT response);
6793

6894
/**
69-
* Called when an exception occurs while establishing the connection or streaming the response. Implementations
70-
* should free up any resources in this method. This method may be called multiple times during the lifecycle
71-
* of a request if automatic retries are enabled.
95+
* Called when the response stream is ready.
7296
*
73-
* @param throwable Exception that occurred.
97+
* @param publisher The publisher.
7498
*/
75-
void exceptionOccurred(Throwable throwable);
99+
void onStream(SdkPublisher<ByteBuffer> publisher);
76100

77101
/**
78-
* Called when all data has been successfully published to the {@link org.reactivestreams.Subscriber}. This will
79-
* only be called once during the lifecycle of the request. Implementors should free up any resources they have
80-
* opened and do final transformations to produce the return object.
102+
* Called when a error is encountered while making the request or receiving the response.
81103
*
82-
* @return Transformed object as a result of the streamed data.
104+
* @param error Error that occurred.
83105
*/
84-
ReturnT complete();
106+
void exceptionOccurred(Throwable error);
85107

86108
/**
87109
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,

0 commit comments

Comments
 (0)