Skip to content

Commit d15008e

Browse files
authored
Merge pull request #654 from aws/shorea-backpressure-bug
Fixing bug where more events than requested could be delivered.
2 parents 8cb9942 + 0eddef8 commit d15008e

File tree

7 files changed

+708
-218
lines changed

7 files changed

+708
-218
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.squareup.javapoet.FieldSpec;
2626
import com.squareup.javapoet.MethodSpec;
2727
import com.squareup.javapoet.TypeSpec;
28+
import java.util.concurrent.Executor;
2829
import javax.lang.model.element.Modifier;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@
3839
import software.amazon.awssdk.codegen.poet.PoetUtils;
3940
import software.amazon.awssdk.codegen.poet.StaticImport;
4041
import software.amazon.awssdk.codegen.poet.client.specs.ProtocolSpec;
42+
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
4143
import software.amazon.awssdk.core.client.handler.AsyncClientHandler;
4244
import software.amazon.awssdk.core.internal.client.config.SdkClientConfiguration;
4345
import software.amazon.awssdk.utils.CompletableFutureUtils;
@@ -60,25 +62,25 @@ public AsyncClientClass(GeneratorTaskParams dependencies) {
6062
@Override
6163
public TypeSpec poetSpec() {
6264
ClassName interfaceClass = poetExtensions.getClientClass(model.getMetadata().getAsyncInterface());
63-
Builder classBuilder = PoetUtils.createClassBuilder(className)
64-
.addAnnotation(SdkInternalApi.class)
65-
.addModifiers(Modifier.FINAL)
66-
.addField(FieldSpec.builder(ClassName.get(Logger.class), "log")
67-
.addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL)
68-
.initializer("$T.getLogger($T.class)", LoggerFactory.class,
69-
className)
70-
.build())
71-
.addField(AsyncClientHandler.class, "clientHandler", Modifier.PRIVATE, Modifier.FINAL)
72-
.addField(protocolSpec.protocolFactory(model))
73-
.addSuperinterface(interfaceClass)
74-
.addJavadoc("Internal implementation of {@link $1T}.\n\n@see $1T#builder()",
75-
interfaceClass)
76-
.addMethod(constructor())
77-
.addMethod(nameMethod())
78-
.addMethods(operations())
79-
.addMethod(closeMethod())
80-
.addMethods(protocolSpec.additionalMethods())
81-
.addMethod(protocolSpec.initProtocolFactory(model));
65+
Builder classBuilder = PoetUtils.createClassBuilder(className);
66+
classBuilder.addAnnotation(SdkInternalApi.class)
67+
.addModifiers(Modifier.FINAL)
68+
.addField(FieldSpec.builder(ClassName.get(Logger.class), "log")
69+
.addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL)
70+
.initializer("$T.getLogger($T.class)", LoggerFactory.class,
71+
className)
72+
.build())
73+
.addField(AsyncClientHandler.class, "clientHandler", Modifier.PRIVATE, Modifier.FINAL)
74+
.addField(protocolSpec.protocolFactory(model))
75+
.addSuperinterface(interfaceClass)
76+
.addJavadoc("Internal implementation of {@link $1T}.\n\n@see $1T#builder()",
77+
interfaceClass)
78+
.addMethod(constructor(classBuilder))
79+
.addMethod(nameMethod())
80+
.addMethods(operations())
81+
.addMethod(closeMethod())
82+
.addMethods(protocolSpec.additionalMethods())
83+
.addMethod(protocolSpec.initProtocolFactory(model));
8284

8385
// Kinesis doesn't support CBOR for STS yet so need another protocol factory for JSON
8486
if (model.getMetadata().isCborProtocol()) {
@@ -93,7 +95,7 @@ public TypeSpec poetSpec() {
9395
return classBuilder.build();
9496
}
9597

96-
private MethodSpec constructor() {
98+
private MethodSpec constructor(Builder classBuilder) {
9799
MethodSpec.Builder builder = MethodSpec.constructorBuilder()
98100
.addModifiers(Modifier.PROTECTED)
99101
.addParameter(SdkClientConfiguration.class, "clientConfiguration")
@@ -108,9 +110,20 @@ private MethodSpec constructor() {
108110
if (model.getMetadata().isCborProtocol()) {
109111
builder.addStatement("this.jsonProtocolFactory = init(false)");
110112
}
113+
if (hasOperationWithEventStreamOutput()) {
114+
classBuilder.addField(FieldSpec.builder(ClassName.get(Executor.class), "executor",
115+
Modifier.PRIVATE, Modifier.FINAL)
116+
.build());
117+
builder.addStatement("this.executor = clientConfiguration.option($T.FUTURE_COMPLETION_EXECUTOR)",
118+
SdkAdvancedAsyncClientOption.class);
119+
}
111120
return builder.build();
112121
}
113122

123+
private boolean hasOperationWithEventStreamOutput() {
124+
return model.getOperations().values().stream().anyMatch(OperationModel::hasEventStreamOutput);
125+
}
126+
114127
private MethodSpec nameMethod() {
115128
return MethodSpec.methodBuilder("serviceName")
116129
.addAnnotation(Override.class)

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java

Lines changed: 96 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.squareup.javapoet.WildcardTypeName;
2727
import java.util.List;
2828
import java.util.Optional;
29+
import java.util.concurrent.CompletableFuture;
2930
import java.util.stream.Collectors;
3031
import javax.lang.model.element.Modifier;
3132
import software.amazon.awssdk.awscore.eventstream.EventStreamAsyncResponseTransformer;
@@ -42,7 +43,6 @@
4243
import software.amazon.awssdk.codegen.poet.PoetExtensions;
4344
import software.amazon.awssdk.codegen.poet.eventstream.EventStreamUtils;
4445
import software.amazon.awssdk.core.SdkResponse;
45-
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
4646
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
4747
import software.amazon.awssdk.core.http.HttpResponseHandler;
4848
import software.amazon.awssdk.core.internal.protocol.json.VoidJsonUnmarshaller;
@@ -213,38 +213,105 @@ public CodeBlock asyncExecutionHandler(OperationModel opModel) {
213213
: "";
214214
CodeBlock.Builder builder = CodeBlock.builder();
215215
if (opModel.hasEventStreamOutput()) {
216-
builder.add("$T<$T, $T> asyncResponseTransformer = new $T<>(\n"
217-
+ "asyncResponseHandler, responseHandler, eventResponseHandler, errorResponseHandler, serviceName());\n",
218-
ClassName.get(AsyncResponseTransformer.class),
219-
ClassName.get(SdkResponse.class),
220-
ClassName.get(Void.class),
221-
ClassName.get(EventStreamAsyncResponseTransformer.class));
216+
ClassName eventStreamBaseClass = EventStreamUtils.create(poetExtensions, opModel).eventStreamBaseClass();
217+
ParameterizedTypeName transformerType = ParameterizedTypeName.get(
218+
ClassName.get(EventStreamAsyncResponseTransformer.class), pojoResponseType, eventStreamBaseClass);
219+
builder.addStatement("$1T<$2T> future = new $1T<>()",
220+
ClassName.get(CompletableFuture.class),
221+
ClassName.get(Void.class));
222+
builder.add("$T asyncResponseTransformer = $T.<$T, $T>builder()\n" +
223+
" .eventStreamResponseHandler(asyncResponseHandler)\n"
224+
+ " .eventResponseHandler(eventResponseHandler)\n"
225+
+ " .initialResponseHandler(responseHandler)\n"
226+
+ " .exceptionResponseHandler(errorResponseHandler)\n"
227+
+ " .future(future)\n"
228+
+ " .executor(executor)\n"
229+
+ " .serviceName(serviceName())\n"
230+
+ " .build();",
231+
transformerType,
232+
ClassName.get(EventStreamAsyncResponseTransformer.class),
233+
pojoResponseType,
234+
eventStreamBaseClass);
222235
}
223236
boolean isStreaming = opModel.hasStreamingOutput() || opModel.hasEventStreamOutput();
224237
String protocolFactory = opModel.hasEventStreamOutput() ? "jsonProtocolFactory" : "protocolFactory";
225238
String customerResponseHandler = opModel.hasEventStreamOutput() ? "asyncResponseHandler" : "asyncResponseTransformer";
226-
return builder.add("\n\nreturn clientHandler.execute(new $T<$T, $T>()\n" +
227-
".withMarshaller(new $T($L))\n" +
228-
".withResponseHandler($L)\n" +
229-
".withErrorResponseHandler(errorResponseHandler)\n" +
230-
asyncRequestBody +
231-
".withInput($L)$L)$L;",
232-
ClientExecutionParams.class,
233-
requestType,
234-
opModel.hasEventStreamOutput() ? SdkResponse.class : pojoResponseType,
235-
marshaller,
236-
protocolFactory,
237-
opModel.hasEventStreamOutput() ? "voidResponseHandler" : "responseHandler",
238-
opModel.getInput().getVariableName(),
239-
isStreaming ? ", asyncResponseTransformer" : "",
240-
// If it's a streaming operation we also need to notify the handler on exception.
241-
isStreaming ? String.format(".whenComplete((r, e) -> {%n"
242-
+ " if (e != null) {%n"
243-
+ " %s.exceptionOccurred(e);%n"
244-
+ " }%n"
245-
+ "})", customerResponseHandler)
246-
: "")
247-
.build();
239+
builder.add("\n\n$L clientHandler.execute(new $T<$T, $T>()\n" +
240+
".withMarshaller(new $T($L))\n" +
241+
".withResponseHandler($L)\n" +
242+
".withErrorResponseHandler(errorResponseHandler)\n" +
243+
asyncRequestBody +
244+
".withInput($L)$L)$L;",
245+
// If the operation has an event stream output we use a different future so we don't return the one
246+
// from the client.
247+
opModel.hasEventStreamOutput() ? "" : "return",
248+
ClientExecutionParams.class,
249+
requestType,
250+
opModel.hasEventStreamOutput() ? SdkResponse.class : pojoResponseType,
251+
marshaller,
252+
protocolFactory,
253+
opModel.hasEventStreamOutput() ? "voidResponseHandler" : "responseHandler",
254+
opModel.getInput().getVariableName(),
255+
isStreaming ? ", asyncResponseTransformer" : "",
256+
whenCompleteBody(opModel, customerResponseHandler));
257+
if (opModel.hasEventStreamOutput()) {
258+
builder.addStatement("return future");
259+
}
260+
return builder.build();
261+
}
262+
263+
/**
264+
* For streaming operations we need to notify the response handler or response transformer on exception so
265+
* we add a .whenComplete to the future.
266+
*
267+
* @param operationModel Op model.
268+
* @param responseHandlerName Variable name of response handler customer passed in.
269+
* @return whenComplete to append to future.
270+
*/
271+
private String whenCompleteBody(OperationModel operationModel, String responseHandlerName) {
272+
if (operationModel.hasEventStreamOutput()) {
273+
return eventStreamOutputWhenComplete(responseHandlerName);
274+
} else if (operationModel.hasStreamingOutput()) {
275+
return streamingOutputWhenComplete(responseHandlerName);
276+
} else {
277+
// Non streaming can just return the future as is
278+
return "";
279+
}
280+
}
281+
282+
/**
283+
* Need to notify the response handler/response transformer if the future is completed exceptionally.
284+
*
285+
* @param responseHandlerName Variable name of response handler customer passed in.
286+
* @return whenComplete to append to future.
287+
*/
288+
private String streamingOutputWhenComplete(String responseHandlerName) {
289+
return String.format(".whenComplete((r, e) -> {%n"
290+
+ " if (e != null) {%n"
291+
+ " %s.exceptionOccurred(e);%n"
292+
+ " }%n"
293+
+ "})", responseHandlerName);
294+
}
295+
296+
/**
297+
* For event streaming our future notification is a bit complicated. We create a different future that is not tied
298+
* to the lifecycle of the wire request. Successful completion of the future is signalled in
299+
* {@link EventStreamAsyncResponseTransformer}. Failure is notified via the normal future (the one returned by the client
300+
* handler).
301+
*
302+
* @param responseHandlerName Variable name of response handler customer passed in.
303+
* @return whenComplete to append to future.
304+
*/
305+
private String eventStreamOutputWhenComplete(String responseHandlerName) {
306+
return String.format(".whenComplete((r, e) -> {%n"
307+
+ " if (e != null) {%n"
308+
+ " try {"
309+
+ " %s.exceptionOccurred(e);%n"
310+
+ " } finally {"
311+
+ " future.completeExceptionally(e);"
312+
+ " }"
313+
+ " }%n"
314+
+ "})", responseHandlerName);
248315
}
249316

250317
private ClassName getUnmarshallerType(OperationModel opModel) {

0 commit comments

Comments
 (0)