Skip to content

Support S3 SelectObjectContent #2943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-83e16fc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Amazon S3",
"contributor": "",
"type": "feature",
"description": "Add support for `SelectObjectContent`."
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,23 @@
import com.squareup.javapoet.CodeBlock;
import com.squareup.javapoet.ParameterizedTypeName;
import com.squareup.javapoet.TypeName;
import com.squareup.javapoet.WildcardTypeName;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.awscore.eventstream.EventStreamAsyncResponseTransformer;
import software.amazon.awssdk.awscore.eventstream.EventStreamTaggedUnionPojoSupplier;
import software.amazon.awssdk.awscore.eventstream.RestEventStreamAsyncResponseTransformer;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.codegen.model.config.customization.S3ArnableFieldConfig;
import software.amazon.awssdk.codegen.model.intermediate.IntermediateModel;
import software.amazon.awssdk.codegen.model.intermediate.OperationModel;
import software.amazon.awssdk.codegen.model.intermediate.ShapeModel;
import software.amazon.awssdk.codegen.poet.PoetExtensions;
import software.amazon.awssdk.codegen.poet.client.traits.HttpChecksumRequiredTrait;
import software.amazon.awssdk.codegen.poet.eventstream.EventStreamUtils;
import software.amazon.awssdk.codegen.poet.model.EventStreamSpecHelper;
import software.amazon.awssdk.core.SdkPojoBuilder;
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.protocols.xml.AwsXmlProtocolFactory;
Expand Down Expand Up @@ -60,21 +69,26 @@ protected Class<?> protocolFactoryClass() {
@Override
public CodeBlock responseHandler(IntermediateModel model,
OperationModel opModel) {

if (opModel.hasStreamingOutput()) {
return streamingResponseHandler(opModel);
}

ClassName responseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType());

if (opModel.hasEventStreamOutput()) {
return CodeBlock.builder()
.add(eventStreamResponseHandlers(opModel, responseType))
.build();
}

TypeName handlerType = ParameterizedTypeName.get(
ClassName.get(HttpResponseHandler.class),
ParameterizedTypeName.get(ClassName.get(software.amazon.awssdk.core.Response.class), responseType));

return CodeBlock.builder()
.addStatement("\n\n$T responseHandler = protocolFactory.createCombinedResponseHandler($T::builder, "
+ "new $T().withHasStreamingSuccessResponse($L))",
handlerType, responseType, XmlOperationMetadata.class, opModel.hasStreamingOutput())
handlerType, responseType, XmlOperationMetadata.class, opModel.hasStreamingOutput())
.build();
}

Expand Down Expand Up @@ -159,35 +173,68 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
ClassName pojoResponseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType());
ClassName requestType = poetExtensions.getModelClass(opModel.getInput().getVariableType());
ClassName marshaller = poetExtensions.getRequestTransformClass(opModel.getInputShape().getShapeName() + "Marshaller");
String eventStreamTransformFutureName = "eventStreamTransformFuture";

CodeBlock.Builder builder = CodeBlock.builder();

if (opModel.hasEventStreamOutput()) {
builder.add(eventStreamResponseTransformers(opModel, eventStreamTransformFutureName));
}

TypeName executeFutureValueType = executeFutureValueType(opModel, poetExtensions);
CodeBlock.Builder builder =
CodeBlock.builder()
.add("\n\n$T<$T> executeFuture = clientHandler.execute(new $T<$T, $T>()\n",
CompletableFuture.class, executeFutureValueType,
ClientExecutionParams.class, requestType, pojoResponseType)
.add(".withOperationName(\"$N\")\n", opModel.getOperationName())
.add(".withMarshaller($L)\n", asyncMarshaller(intermediateModel, opModel, marshaller, "protocolFactory"))
.add(".withCombinedResponseHandler(responseHandler)\n")
.add(hostPrefixExpression(opModel))
.add(".withMetricCollector(apiCallMetricCollector)\n")
.add(asyncRequestBody(opModel))
.add(HttpChecksumRequiredTrait.putHttpChecksumAttribute(opModel));
String executionResponseTransformerName = "asyncResponseTransformer";

if (opModel.hasEventStreamOutput()) {
executionResponseTransformerName = "restAsyncResponseTransformer";
}

builder.add("\n\n$T<$T> executeFuture = clientHandler.execute(new $T<$T, $T>()\n",
CompletableFuture.class, executeFutureValueType,
ClientExecutionParams.class, requestType, pojoResponseType)
.add(".withOperationName(\"$N\")\n", opModel.getOperationName())
.add(".withMarshaller($L)\n", asyncMarshaller(intermediateModel, opModel, marshaller, "protocolFactory"));

if (opModel.hasEventStreamOutput()) {
builder.add(".withResponseHandler(responseHandler)");
} else {
builder.add(".withCombinedResponseHandler(responseHandler)");
}

builder.add(hostPrefixExpression(opModel))
.add(".withMetricCollector(apiCallMetricCollector)\n")
.add(asyncRequestBody(opModel))
.add(HttpChecksumRequiredTrait.putHttpChecksumAttribute(opModel));

s3ArnableFields(opModel, model).ifPresent(builder::add);
builder.add(".withInput($L) $L);", opModel.getInput().getVariableName(), opModel.hasStreamingOutput() ?
", asyncResponseTransformer" : "");

builder.add(".withInput($L)", opModel.getInput().getVariableName());
if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
builder.add(", $N", executionResponseTransformerName);
}
builder.addStatement(")");

String whenCompleteFutureName = "whenCompleteFuture";
builder.addStatement("$T $N = null", ParameterizedTypeName.get(ClassName.get(CompletableFuture.class),
executeFutureValueType), whenCompleteFutureName);
if (opModel.hasStreamingOutput()) {

if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
builder.addStatement("$N = executeFuture$L", whenCompleteFutureName,
streamingOutputWhenComplete("asyncResponseTransformer"));
whenCompleteBlock(opModel, "asyncResponseHandler",
eventStreamTransformFutureName));
} else {
builder.addStatement("$N = executeFuture$L", whenCompleteFutureName, publishMetricsWhenComplete());
}
builder.addStatement("return $T.forwardExceptionTo($N, executeFuture)", CompletableFutureUtils.class,

builder.addStatement("$T.forwardExceptionTo($N, executeFuture)", CompletableFutureUtils.class,
whenCompleteFutureName);

if (opModel.hasEventStreamOutput()) {
builder.addStatement("return $T.forwardExceptionTo($N, executeFuture)", CompletableFutureUtils.class,
eventStreamTransformFutureName);
} else {
builder.addStatement("return $N", whenCompleteFutureName);
}

return builder.build();
}

Expand All @@ -198,4 +245,122 @@ private String asyncRequestBody(OperationModel opModel) {
private CodeBlock asyncStreamingExecutionHandler(IntermediateModel intermediateModel, OperationModel opModel) {
return super.asyncExecutionHandler(intermediateModel, opModel);
}

private CodeBlock eventStreamResponseHandlers(OperationModel opModel, TypeName pojoResponseType) {
CodeBlock streamResponseOpMd = CodeBlock.builder()
.add("$T.builder()", XmlOperationMetadata.class)
.add(".hasStreamingSuccessResponse(true)")
.add(".build()")
.build();


CodeBlock.Builder builder = CodeBlock.builder();

// Response handler for handling the initial response from the operation. Note, this does not handle the event stream
// messages, that is the job of "eventResponseHandler" below
builder.addStatement("$T<$T> responseHandler = protocolFactory.createResponseHandler($T::builder, $L)",
HttpResponseHandler.class,
pojoResponseType,
pojoResponseType,
streamResponseOpMd);

// Response handler responsible for errors for the API call itself, as well as errors sent over the event stream
builder.addStatement("$T errorResponseHandler = protocolFactory"
+ ".createErrorResponseHandler()", ParameterizedTypeName.get(HttpResponseHandler.class,
AwsServiceException.class));


ShapeModel eventStreamShape = EventStreamUtils.getEventStreamInResponse(opModel.getOutputShape());
ClassName eventStream = poetExtensions.getModelClassFromShape(eventStreamShape);
EventStreamSpecHelper eventStreamSpecHelper = new EventStreamSpecHelper(eventStreamShape, intermediateModel);

CodeBlock.Builder supplierBuilder = CodeBlock.builder()
.add("$T.builder()", EventStreamTaggedUnionPojoSupplier.class);
EventStreamUtils.getEvents(eventStreamShape).forEach(m -> {
String builderName = eventStreamSpecHelper.eventBuilderMethodName(m);
supplierBuilder.add(".putSdkPojoSupplier($S, $T::$N)", m.getName(), eventStream, builderName);
});
supplierBuilder.add(".defaultSdkPojoSupplier(() -> new $T($T.UNKNOWN))", SdkPojoBuilder.class, eventStream);
CodeBlock supplierCodeBlock = supplierBuilder.add(".build()").build();

CodeBlock nonStreamingOpMd = CodeBlock.builder()
.add("$T.builder()", XmlOperationMetadata.class)
.add(".hasStreamingSuccessResponse(false)")
.add(".build()")
.build();

// The response handler responsible for unmarshalling each event
builder.addStatement("$T eventResponseHandler = protocolFactory.createResponseHandler($L, $L)",
ParameterizedTypeName.get(ClassName.get(HttpResponseHandler.class),
WildcardTypeName.subtypeOf(eventStream)),
supplierCodeBlock,
nonStreamingOpMd);


return builder.build();
}

private CodeBlock eventStreamResponseTransformers(OperationModel opModel, String eventTransformerFutureName) {
ShapeModel shapeModel = EventStreamUtils.getEventStreamInResponse(opModel.getOutputShape());
ClassName pojoResponseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType());
ClassName eventStreamBaseClass = poetExtensions.getModelClassFromShape(shapeModel);

CodeBlock.Builder builder = CodeBlock.builder();

ParameterizedTypeName transformerType = ParameterizedTypeName.get(
ClassName.get(EventStreamAsyncResponseTransformer.class),
pojoResponseType,
eventStreamBaseClass);

builder.addStatement("$1T<$2T> $3N = new $1T<>()", ClassName.get(CompletableFuture.class),
ClassName.get(Void.class), eventTransformerFutureName)
.add("$T asyncResponseTransformer = $T.<$T, $T>builder()\n",
transformerType, ClassName.get(EventStreamAsyncResponseTransformer.class), pojoResponseType,
eventStreamBaseClass)
.add(".eventStreamResponseHandler(asyncResponseHandler)\n")
.add(".eventResponseHandler(eventResponseHandler)\n")
.add(".initialResponseHandler(responseHandler)\n")
.add(".exceptionResponseHandler(errorResponseHandler)\n")
.add(".future($N)\n", eventTransformerFutureName)
.add(".executor(executor)\n")
.add(".serviceName(serviceName())\n")
.addStatement(".build()");

ParameterizedTypeName restTransformType =
ParameterizedTypeName.get(ClassName.get(RestEventStreamAsyncResponseTransformer.class), pojoResponseType,
eventStreamBaseClass);

// Wrap the event transformer with this so that the caller's response handler's onResponse() method is invoked. See
// docs for RestEventStreamAsyncResponseTransformer for more info on why it's needed
builder.addStatement("$T restAsyncResponseTransformer = $T.<$T, $T>builder()\n"
+ ".eventStreamAsyncResponseTransformer(asyncResponseTransformer)\n"
+ ".eventStreamResponseHandler(asyncResponseHandler)\n"
+ ".build()", restTransformType, RestEventStreamAsyncResponseTransformer.class,
pojoResponseType, eventStreamBaseClass);

return builder.build();
}

private CodeBlock whenCompleteBlock(OperationModel operationModel, String responseHandlerName,
String eventTransformerFutureName) {
CodeBlock.Builder whenComplete = CodeBlock.builder()
.add(".whenComplete((r, e) -> ")
.beginControlFlow("")
.beginControlFlow("if (e != null)")
.add("runAndLogError(log, $S, () -> $N.exceptionOccurred(e));",
"Exception thrown in exceptionOccurred callback, ignoring",
responseHandlerName);

if (operationModel.hasEventStreamOutput()) {
whenComplete.add("$N.completeExceptionally(e);", eventTransformerFutureName);
}

whenComplete.endControlFlow()
.add(publishMetrics())
.endControlFlow()
.add(")")
.build();

return whenComplete.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@
"shape": "StructureWithStreamingMember"
},
"documentation": "Some operation with a streaming output"
},
"EventStreamOperation": {
"name": "EventStreamOperation",
"http": {
"method": "POST",
"requestUri": "/2016-03-11/eventStreamOperation"
},
"input": {
"shape": "EventStreamOperationRequest"
},
"output": {
"shape": "EventStreamOutput"
}
}
},
"shapes": {
Expand Down Expand Up @@ -162,6 +175,9 @@
},
"documentation": "<p>A shape with nested sub-members"
},
"String": {
"type": "string"
},
"subMember": {
"type": "string",
"max": 63,
Expand All @@ -187,6 +203,61 @@
}
},
"payload": "StreamingMember"
},
"EventStreamOperationRequest": {
"type": "structure",
"members": {
}
},
"EventStreamOutput": {
"type": "structure",
"required": [
"EventStream"
],
"members": {
"HeaderMember": {
"shape": "String",
"location": "header",
"locationName": "Header-Member"
},
"EventStream": {
"shape": "EventStream"
}
}
},
"EventStream": {
"type": "structure",
"members": {
"EventPayloadEvent": {
"shape": "EventPayloadEvent"
},
"NonEventPayloadEvent": {
"shape": "NonEventPayloadEvent"
},
"SecondEventPayloadEvent": {
"shape": "EventPayloadEvent"
}
},
"eventstream": true
},
"EventPayloadEvent": {
"type": "structure",
"members": {
"Foo": {
"shape": "String",
"eventpayload": true
}
},
"event": true
},
"NonEventPayloadEvent": {
"type": "structure",
"members": {
"Bar": {
"shape": "String"
}
},
"event": true
}
},
"documentation": "A service that is implemented using the xml protocol"
Expand Down
Loading