Skip to content

Fixing bug where more events than requested could be delivered. #654

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.squareup.javapoet.FieldSpec;
import com.squareup.javapoet.MethodSpec;
import com.squareup.javapoet.TypeSpec;
import java.util.concurrent.Executor;
import javax.lang.model.element.Modifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +39,7 @@
import software.amazon.awssdk.codegen.poet.PoetUtils;
import software.amazon.awssdk.codegen.poet.StaticImport;
import software.amazon.awssdk.codegen.poet.client.specs.ProtocolSpec;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.client.handler.AsyncClientHandler;
import software.amazon.awssdk.core.internal.client.config.SdkClientConfiguration;
import software.amazon.awssdk.utils.CompletableFutureUtils;
Expand All @@ -60,25 +62,25 @@ public AsyncClientClass(GeneratorTaskParams dependencies) {
@Override
public TypeSpec poetSpec() {
ClassName interfaceClass = poetExtensions.getClientClass(model.getMetadata().getAsyncInterface());
Builder classBuilder = PoetUtils.createClassBuilder(className)
.addAnnotation(SdkInternalApi.class)
.addModifiers(Modifier.FINAL)
.addField(FieldSpec.builder(ClassName.get(Logger.class), "log")
.addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL)
.initializer("$T.getLogger($T.class)", LoggerFactory.class,
className)
.build())
.addField(AsyncClientHandler.class, "clientHandler", Modifier.PRIVATE, Modifier.FINAL)
.addField(protocolSpec.protocolFactory(model))
.addSuperinterface(interfaceClass)
.addJavadoc("Internal implementation of {@link $1T}.\n\n@see $1T#builder()",
interfaceClass)
.addMethod(constructor())
.addMethod(nameMethod())
.addMethods(operations())
.addMethod(closeMethod())
.addMethods(protocolSpec.additionalMethods())
.addMethod(protocolSpec.initProtocolFactory(model));
Builder classBuilder = PoetUtils.createClassBuilder(className);
classBuilder.addAnnotation(SdkInternalApi.class)
.addModifiers(Modifier.FINAL)
.addField(FieldSpec.builder(ClassName.get(Logger.class), "log")
.addModifiers(Modifier.PRIVATE, Modifier.STATIC, Modifier.FINAL)
.initializer("$T.getLogger($T.class)", LoggerFactory.class,
className)
.build())
.addField(AsyncClientHandler.class, "clientHandler", Modifier.PRIVATE, Modifier.FINAL)
.addField(protocolSpec.protocolFactory(model))
.addSuperinterface(interfaceClass)
.addJavadoc("Internal implementation of {@link $1T}.\n\n@see $1T#builder()",
interfaceClass)
.addMethod(constructor(classBuilder))
.addMethod(nameMethod())
.addMethods(operations())
.addMethod(closeMethod())
.addMethods(protocolSpec.additionalMethods())
.addMethod(protocolSpec.initProtocolFactory(model));

// Kinesis doesn't support CBOR for STS yet so need another protocol factory for JSON
if (model.getMetadata().isCborProtocol()) {
Expand All @@ -93,7 +95,7 @@ public TypeSpec poetSpec() {
return classBuilder.build();
}

private MethodSpec constructor() {
private MethodSpec constructor(Builder classBuilder) {
MethodSpec.Builder builder = MethodSpec.constructorBuilder()
.addModifiers(Modifier.PROTECTED)
.addParameter(SdkClientConfiguration.class, "clientConfiguration")
Expand All @@ -108,9 +110,20 @@ private MethodSpec constructor() {
if (model.getMetadata().isCborProtocol()) {
builder.addStatement("this.jsonProtocolFactory = init(false)");
}
if (hasOperationWithEventStreamOutput()) {
classBuilder.addField(FieldSpec.builder(ClassName.get(Executor.class), "executor",
Modifier.PRIVATE, Modifier.FINAL)
.build());
builder.addStatement("this.executor = clientConfiguration.option($T.FUTURE_COMPLETION_EXECUTOR)",
SdkAdvancedAsyncClientOption.class);
}
return builder.build();
}

private boolean hasOperationWithEventStreamOutput() {
return model.getOperations().values().stream().anyMatch(OperationModel::hasEventStreamOutput);
}

private MethodSpec nameMethod() {
return MethodSpec.methodBuilder("serviceName")
.addAnnotation(Override.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.squareup.javapoet.WildcardTypeName;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.lang.model.element.Modifier;
import software.amazon.awssdk.awscore.eventstream.EventStreamAsyncResponseTransformer;
Expand All @@ -42,7 +43,6 @@
import software.amazon.awssdk.codegen.poet.PoetExtensions;
import software.amazon.awssdk.codegen.poet.eventstream.EventStreamUtils;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.internal.protocol.json.VoidJsonUnmarshaller;
Expand Down Expand Up @@ -213,38 +213,105 @@ public CodeBlock asyncExecutionHandler(OperationModel opModel) {
: "";
CodeBlock.Builder builder = CodeBlock.builder();
if (opModel.hasEventStreamOutput()) {
builder.add("$T<$T, $T> asyncResponseTransformer = new $T<>(\n"
+ "asyncResponseHandler, responseHandler, eventResponseHandler, errorResponseHandler, serviceName());\n",
ClassName.get(AsyncResponseTransformer.class),
ClassName.get(SdkResponse.class),
ClassName.get(Void.class),
ClassName.get(EventStreamAsyncResponseTransformer.class));
ClassName eventStreamBaseClass = EventStreamUtils.create(poetExtensions, opModel).eventStreamBaseClass();
ParameterizedTypeName transformerType = ParameterizedTypeName.get(
ClassName.get(EventStreamAsyncResponseTransformer.class), pojoResponseType, eventStreamBaseClass);
builder.addStatement("$1T<$2T> future = new $1T<>()",
ClassName.get(CompletableFuture.class),
ClassName.get(Void.class));
builder.add("$T asyncResponseTransformer = $T.<$T, $T>builder()\n" +
" .eventStreamResponseHandler(asyncResponseHandler)\n"
+ " .eventResponseHandler(eventResponseHandler)\n"
+ " .initialResponseHandler(responseHandler)\n"
+ " .exceptionResponseHandler(errorResponseHandler)\n"
+ " .future(future)\n"
+ " .executor(executor)\n"
+ " .serviceName(serviceName())\n"
+ " .build();",
transformerType,
ClassName.get(EventStreamAsyncResponseTransformer.class),
pojoResponseType,
eventStreamBaseClass);
}
boolean isStreaming = opModel.hasStreamingOutput() || opModel.hasEventStreamOutput();
String protocolFactory = opModel.hasEventStreamOutput() ? "jsonProtocolFactory" : "protocolFactory";
String customerResponseHandler = opModel.hasEventStreamOutput() ? "asyncResponseHandler" : "asyncResponseTransformer";
return builder.add("\n\nreturn clientHandler.execute(new $T<$T, $T>()\n" +
".withMarshaller(new $T($L))\n" +
".withResponseHandler($L)\n" +
".withErrorResponseHandler(errorResponseHandler)\n" +
asyncRequestBody +
".withInput($L)$L)$L;",
ClientExecutionParams.class,
requestType,
opModel.hasEventStreamOutput() ? SdkResponse.class : pojoResponseType,
marshaller,
protocolFactory,
opModel.hasEventStreamOutput() ? "voidResponseHandler" : "responseHandler",
opModel.getInput().getVariableName(),
isStreaming ? ", asyncResponseTransformer" : "",
// If it's a streaming operation we also need to notify the handler on exception.
isStreaming ? String.format(".whenComplete((r, e) -> {%n"
+ " if (e != null) {%n"
+ " %s.exceptionOccurred(e);%n"
+ " }%n"
+ "})", customerResponseHandler)
: "")
.build();
builder.add("\n\n$L clientHandler.execute(new $T<$T, $T>()\n" +
".withMarshaller(new $T($L))\n" +
".withResponseHandler($L)\n" +
".withErrorResponseHandler(errorResponseHandler)\n" +
asyncRequestBody +
".withInput($L)$L)$L;",
// If the operation has an event stream output we use a different future so we don't return the one
// from the client.
opModel.hasEventStreamOutput() ? "" : "return",
ClientExecutionParams.class,
requestType,
opModel.hasEventStreamOutput() ? SdkResponse.class : pojoResponseType,
marshaller,
protocolFactory,
opModel.hasEventStreamOutput() ? "voidResponseHandler" : "responseHandler",
opModel.getInput().getVariableName(),
isStreaming ? ", asyncResponseTransformer" : "",
whenCompleteBody(opModel, customerResponseHandler));
if (opModel.hasEventStreamOutput()) {
builder.addStatement("return future");
}
return builder.build();
}

/**
* For streaming operations we need to notify the response handler or response transformer on exception so
* we add a .whenComplete to the future.
*
* @param operationModel Op model.
* @param responseHandlerName Variable name of response handler customer passed in.
* @return whenComplete to append to future.
*/
private String whenCompleteBody(OperationModel operationModel, String responseHandlerName) {
if (operationModel.hasEventStreamOutput()) {
return eventStreamOutputWhenComplete(responseHandlerName);
} else if (operationModel.hasStreamingOutput()) {
return streamingOutputWhenComplete(responseHandlerName);
} else {
// Non streaming can just return the future as is
return "";
}
}

/**
* Need to notify the response handler/response transformer if the future is completed exceptionally.
*
* @param responseHandlerName Variable name of response handler customer passed in.
* @return whenComplete to append to future.
*/
private String streamingOutputWhenComplete(String responseHandlerName) {
return String.format(".whenComplete((r, e) -> {%n"
+ " if (e != null) {%n"
+ " %s.exceptionOccurred(e);%n"
+ " }%n"
+ "})", responseHandlerName);
}

/**
* For event streaming our future notification is a bit complicated. We create a different future that is not tied
* to the lifecycle of the wire request. Successful completion of the future is signalled in
* {@link EventStreamAsyncResponseTransformer}. Failure is notified via the normal future (the one returned by the client
* handler).
*
* @param responseHandlerName Variable name of response handler customer passed in.
* @return whenComplete to append to future.
*/
private String eventStreamOutputWhenComplete(String responseHandlerName) {
return String.format(".whenComplete((r, e) -> {%n"
+ " if (e != null) {%n"
+ " try {"
+ " %s.exceptionOccurred(e);%n"
+ " } finally {"
+ " future.completeExceptionally(e);"
+ " }"
+ " }%n"
+ "})", responseHandlerName);
}

private ClassName getUnmarshallerType(OperationModel opModel) {
Expand Down
Loading