Skip to content

Commit 25b5331

Browse files
committed
Fixing bug where more events than requested could be delivered.
1 parent 34a6884 commit 25b5331

File tree

6 files changed

+530
-108
lines changed

6 files changed

+530
-108
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.squareup.javapoet.FieldSpec;
2626
import com.squareup.javapoet.MethodSpec;
2727
import com.squareup.javapoet.TypeSpec;
28+
import java.util.concurrent.Executor;
2829
import javax.lang.model.element.Modifier;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@
3839
import software.amazon.awssdk.codegen.poet.PoetUtils;
3940
import software.amazon.awssdk.codegen.poet.StaticImport;
4041
import software.amazon.awssdk.codegen.poet.client.specs.ProtocolSpec;
42+
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
4143
import software.amazon.awssdk.core.client.handler.AsyncClientHandler;
4244
import software.amazon.awssdk.core.internal.client.config.SdkClientConfiguration;
4345
import software.amazon.awssdk.utils.CompletableFutureUtils;
@@ -60,25 +62,25 @@ public AsyncClientClass(GeneratorTaskParams dependencies) {
6062
@Override
6163
public TypeSpec poetSpec() {
6264
ClassName interfaceClass = poetExtensions.getClientClass(model.getMetadata().getAsyncInterface());
63-
Builder classBuilder = PoetUtils.createClassBuilder(className)
64-
.addAnnotation(SdkInternalApi.class)
65-
.addModifiers(Modifier.FINAL)
66-
.addField(FieldSpec.builder(ClassName.get(Logger.class), "log")
67-
.addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL)
68-
.initializer("$T.getLogger($T.class)", LoggerFactory.class,
69-
className)
70-
.build())
71-
.addField(AsyncClientHandler.class, "clientHandler", Modifier.PRIVATE, Modifier.FINAL)
72-
.addField(protocolSpec.protocolFactory(model))
73-
.addSuperinterface(interfaceClass)
74-
.addJavadoc("Internal implementation of {@link $1T}.\n\n@see $1T#builder()",
75-
interfaceClass)
76-
.addMethod(constructor())
77-
.addMethod(nameMethod())
78-
.addMethods(operations())
79-
.addMethod(closeMethod())
80-
.addMethods(protocolSpec.additionalMethods())
81-
.addMethod(protocolSpec.initProtocolFactory(model));
65+
Builder classBuilder = PoetUtils.createClassBuilder(className);
66+
classBuilder.addAnnotation(SdkInternalApi.class)
67+
.addModifiers(Modifier.FINAL)
68+
.addField(FieldSpec.builder(ClassName.get(Logger.class), "log")
69+
.addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL)
70+
.initializer("$T.getLogger($T.class)", LoggerFactory.class,
71+
className)
72+
.build())
73+
.addField(AsyncClientHandler.class, "clientHandler", Modifier.PRIVATE, Modifier.FINAL)
74+
.addField(protocolSpec.protocolFactory(model))
75+
.addSuperinterface(interfaceClass)
76+
.addJavadoc("Internal implementation of {@link $1T}.\n\n@see $1T#builder()",
77+
interfaceClass)
78+
.addMethod(constructor(classBuilder))
79+
.addMethod(nameMethod())
80+
.addMethods(operations())
81+
.addMethod(closeMethod())
82+
.addMethods(protocolSpec.additionalMethods())
83+
.addMethod(protocolSpec.initProtocolFactory(model));
8284

8385
// Kinesis doesn't support CBOR for STS yet so need another protocol factory for JSON
8486
if (model.getMetadata().isCborProtocol()) {
@@ -93,7 +95,7 @@ public TypeSpec poetSpec() {
9395
return classBuilder.build();
9496
}
9597

96-
private MethodSpec constructor() {
98+
private MethodSpec constructor(Builder classBuilder) {
9799
MethodSpec.Builder builder = MethodSpec.constructorBuilder()
98100
.addModifiers(Modifier.PROTECTED)
99101
.addParameter(SdkClientConfiguration.class, "clientConfiguration")
@@ -108,9 +110,20 @@ private MethodSpec constructor() {
108110
if (model.getMetadata().isCborProtocol()) {
109111
builder.addStatement("this.jsonProtocolFactory = init(false)");
110112
}
113+
if (hasOperationWithEventStreamOutput()) {
114+
classBuilder.addField(FieldSpec.builder(ClassName.get(Executor.class), "executor",
115+
Modifier.PRIVATE, Modifier.FINAL)
116+
.build());
117+
builder.addStatement("this.executor = clientConfiguration.option($T.FUTURE_COMPLETION_EXECUTOR)",
118+
SdkAdvancedAsyncClientOption.class);
119+
}
111120
return builder.build();
112121
}
113122

123+
private boolean hasOperationWithEventStreamOutput() {
124+
return model.getOperations().values().stream().anyMatch(OperationModel::hasEventStreamOutput);
125+
}
126+
114127
private MethodSpec nameMethod() {
115128
return MethodSpec.methodBuilder("serviceName")
116129
.addAnnotation(Override.class)

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.squareup.javapoet.WildcardTypeName;
2727
import java.util.List;
2828
import java.util.Optional;
29+
import java.util.concurrent.CompletableFuture;
2930
import java.util.stream.Collectors;
3031
import javax.lang.model.element.Modifier;
3132

@@ -228,38 +229,59 @@ public CodeBlock asyncExecutionHandler(OperationModel opModel) {
228229
: "";
229230
CodeBlock.Builder builder = CodeBlock.builder();
230231
if (opModel.hasEventStreamOutput()) {
231-
builder.add("$T<$T, $T> asyncResponseTransformer = new $T<>(\n"
232-
+ "asyncResponseHandler, responseHandler, eventResponseHandler, exceptionHandler);\n",
233-
ClassName.get(AsyncResponseTransformer.class),
234-
ClassName.get(SdkResponse.class),
235-
ClassName.get(Void.class),
236-
ClassName.get(EventStreamAsyncResponseTransformer.class));
232+
ClassName eventStreamBaseClass = EventStreamUtils.create(poetExtensions, opModel).eventStreamBaseClass();
233+
ParameterizedTypeName transformerType = ParameterizedTypeName.get(ClassName.get(EventStreamAsyncResponseTransformer.class),
234+
235+
pojoResponseType, eventStreamBaseClass);
236+
builder.addStatement("$1T<$2T> future = new $1T<>()",
237+
ClassName.get(CompletableFuture.class),
238+
ClassName.get(Void.class));
239+
builder.add("$T asyncResponseTransformer = $T.<$T, $T>builder()\n" +
240+
" .eventStreamResponseTransformer(asyncResponseHandler)\n"
241+
+ " .eventUnmarshaller(eventResponseHandler)\n"
242+
+ " .initialResponseUnmarshaller(responseHandler)\n"
243+
+ " .exceptionUnmarshaller(exceptionHandler)\n"
244+
+ " .future(future)\n"
245+
+ " .executor(executor)\n"
246+
+ " .build();",
247+
transformerType,
248+
ClassName.get(EventStreamAsyncResponseTransformer.class),
249+
pojoResponseType,
250+
eventStreamBaseClass);
237251
}
238252
boolean isStreaming = opModel.hasStreamingOutput() || opModel.hasEventStreamOutput();
239253
String protocolFactory = opModel.hasEventStreamOutput() ? "jsonProtocolFactory" : "protocolFactory";
240254
String customerResponseHandler = opModel.hasEventStreamOutput() ? "asyncResponseHandler" : "asyncResponseTransformer";
241-
return builder.add("\n\nreturn clientHandler.execute(new $T<$T, $T>()\n" +
242-
".withMarshaller(new $T($L))\n" +
243-
".withResponseHandler($L)\n" +
244-
".withErrorResponseHandler(errorResponseHandler)\n" +
245-
asyncRequestBody +
246-
".withInput($L)$L)$L;",
247-
ClientExecutionParams.class,
248-
requestType,
249-
opModel.hasEventStreamOutput() ? SdkResponse.class : pojoResponseType,
250-
marshaller,
251-
protocolFactory,
252-
opModel.hasEventStreamOutput() ? "voidResponseHandler" : "responseHandler",
253-
opModel.getInput().getVariableName(),
254-
isStreaming ? ", asyncResponseTransformer" : "",
255-
// If it's a streaming operation we also need to notify the handler on exception.
256-
isStreaming ? String.format(".whenComplete((r, e) -> {%n"
257-
+ " if (e != null) {%n"
258-
+ " %s.exceptionOccurred(e);%n"
259-
+ " }%n"
260-
+ "})", customerResponseHandler)
261-
: "")
262-
.build();
255+
builder.add("\n\n$L clientHandler.execute(new $T<$T, $T>()\n" +
256+
".withMarshaller(new $T($L))\n" +
257+
".withResponseHandler($L)\n" +
258+
".withErrorResponseHandler(errorResponseHandler)\n" +
259+
asyncRequestBody +
260+
".withInput($L)$L)$L;",
261+
// If the operation has an event stream output we use a different future so we don't return the one
262+
// from the client.
263+
opModel.hasEventStreamOutput() ? "" : "return",
264+
ClientExecutionParams.class,
265+
requestType,
266+
opModel.hasEventStreamOutput() ? SdkResponse.class : pojoResponseType,
267+
marshaller,
268+
protocolFactory,
269+
opModel.hasEventStreamOutput() ? "voidResponseHandler" : "responseHandler",
270+
opModel.getInput().getVariableName(),
271+
isStreaming ? ", asyncResponseTransformer" : "",
272+
// If it's a streaming operation we also need to notify the handler on exception.
273+
isStreaming ? String.format(".whenComplete((r, e) -> {%n"
274+
+ " if (e != null) {%n"
275+
+ " %s.exceptionOccurred(e);%n"
276+
+ " %s"
277+
+ " }%n"
278+
+ "})", customerResponseHandler,
279+
opModel.hasEventStreamOutput() ? "future.completeExceptionally(e);" : "")
280+
: "");
281+
if (opModel.hasEventStreamOutput()) {
282+
builder.addStatement("return future");
283+
}
284+
return builder.build();
263285
}
264286

265287
private ClassName getUnmarshallerType(OperationModel opModel) {

0 commit comments

Comments
 (0)