Skip to content

Attach SdkHttpResponse to the responses of event streaming opeartions #712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-7b7c331.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "AWS SDK for Java v2",
"type": "bugfix",
"description": "Attach `SdkHttpResponse` to the responses of event streaming operations."
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import software.amazon.awssdk.codegen.poet.PoetExtensions;
import software.amazon.awssdk.codegen.poet.eventstream.EventStreamUtils;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.client.handler.AttachHttpMetadataResponseHandler;
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.internal.protocol.json.VoidJsonUnmarshaller;
Expand Down Expand Up @@ -112,51 +113,20 @@ public CodeBlock responseHandler(IntermediateModel model, OperationModel opModel

// TODO remove this once kinesis supports CBOR for event streaming
String protocolFactory = opModel.hasEventStreamOutput() ? "jsonProtocolFactory" : "protocolFactory";
CodeBlock.Builder builder = CodeBlock
.builder()
.add("\n\n$T<$T> responseHandler = $L.createResponseHandler(new $T()" +
" .withPayloadJson($L)" +
" .withHasStreamingSuccessResponse($L), new $T());",
HttpResponseHandler.class,
pojoResponseType,
protocolFactory,
JsonOperationMetadata.class,
!opModel.getHasBlobMemberAsPayload(),
opModel.hasStreamingOutput(),
unmarshaller);
CodeBlock.Builder builder = CodeBlock.builder();
if (opModel.hasEventStreamOutput()) {
builder.add("\n\n$T<$T> voidResponseHandler = $L.createResponseHandler(new $T()" +
" .withPayloadJson(false)" +
" .withHasStreamingSuccessResponse(true), new $T());",
responseHandlersForEventStreaming(opModel, unmarshaller, pojoResponseType, protocolFactory, builder);
} else {
builder.add("\n\n$T<$T> responseHandler = $L.createResponseHandler(new $T()" +
" .withPayloadJson($L)" +
" .withHasStreamingSuccessResponse($L), new $T());",
HttpResponseHandler.class,
SdkResponse.class,
pojoResponseType,
protocolFactory,
JsonOperationMetadata.class,
VoidJsonUnmarshaller.class);
EventStreamUtils eventStreamUtils = EventStreamUtils.create(poetExtensions, opModel);
ClassName eventStreamBaseClass = eventStreamUtils.eventStreamBaseClass();
builder
.add("\n\n$T<$T> eventResponseHandler = $L.createResponseHandler(new $T()" +
" .withPayloadJson($L)" +
" .withHasStreamingSuccessResponse($L), "
+ "$T.builder()",
HttpResponseHandler.class,
WildcardTypeName.subtypeOf(eventStreamBaseClass),
protocolFactory,
JsonOperationMetadata.class,
true,
false,
ClassName.get(EventStreamTaggedUnionJsonUnmarshaller.class));

eventStreamUtils.getEventStreamMembers()
.forEach(m -> {
String unmarshallerClassName = m.getShape().getVariable().getVariableType() + "Unmarshaller";
builder.add(".addUnmarshaller(\"$L\", $T.getInstance())\n",
m.getC2jName(),
poetExtensions.getTransformClass(unmarshallerClassName));
});
builder.add(".defaultUnmarshaller((in) -> $T.UNKNOWN)\n"
+ ".build());\n", eventStreamUtils.eventStreamBaseClass());
!opModel.getHasBlobMemberAsPayload(),
opModel.hasStreamingOutput(),
unmarshaller);
}
return builder.build();
}
Expand Down Expand Up @@ -375,4 +345,56 @@ private ClassName baseExceptionClassName(IntermediateModel model) {

return ClassName.get(exceptionPath, model.getSdkModeledExceptionBaseClassName());
}


/**
* Add responseHandlers for event streaming operations
*/
private void responseHandlersForEventStreaming(OperationModel opModel, ClassName unmarshaller, TypeName pojoResponseType,
String protocolFactory, CodeBlock.Builder builder) {
builder.add("\n\n$T<$T> responseHandler = new $T($L.createResponseHandler(new $T()" +
" .withPayloadJson($L)" +
" .withHasStreamingSuccessResponse($L), new $T()));",
HttpResponseHandler.class,
pojoResponseType,
AttachHttpMetadataResponseHandler.class,
protocolFactory,
JsonOperationMetadata.class,
!opModel.getHasBlobMemberAsPayload(),
opModel.hasStreamingOutput(),
unmarshaller);

builder.add("\n\n$T<$T> voidResponseHandler = $L.createResponseHandler(new $T()" +
" .withPayloadJson(false)" +
" .withHasStreamingSuccessResponse(true), new $T());",
HttpResponseHandler.class,
SdkResponse.class,
protocolFactory,
JsonOperationMetadata.class,
VoidJsonUnmarshaller.class);
EventStreamUtils eventStreamUtils = EventStreamUtils.create(poetExtensions, opModel);
ClassName eventStreamBaseClass = eventStreamUtils.eventStreamBaseClass();
builder
.add("\n\n$T<$T> eventResponseHandler = $L.createResponseHandler(new $T()" +
" .withPayloadJson($L)" +
" .withHasStreamingSuccessResponse($L), "
+ "$T.builder()",
HttpResponseHandler.class,
WildcardTypeName.subtypeOf(eventStreamBaseClass),
protocolFactory,
JsonOperationMetadata.class,
true,
false,
ClassName.get(EventStreamTaggedUnionJsonUnmarshaller.class));

eventStreamUtils.getEventStreamMembers()
.forEach(m -> {
String unmarshallerClassName = m.getShape().getVariable().getVariableType() + "Unmarshaller";
builder.add(".addUnmarshaller(\"$L\", $T.getInstance())\n",
m.getC2jName(),
poetExtensions.getTransformClass(unmarshallerClassName));
});
builder.add(".defaultUnmarshaller((in) -> $T.UNKNOWN)\n"
+ ".build());\n", eventStreamUtils.eventStreamBaseClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.client.handler.AsyncClientHandler;
import software.amazon.awssdk.core.client.handler.AttachHttpMetadataResponseHandler;
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.internal.client.config.SdkClientConfiguration;
Expand Down Expand Up @@ -216,9 +217,9 @@ public CompletableFuture<Void> eventStreamOperation(EventStreamOperationRequest
EventStreamOperationResponseHandler asyncResponseHandler) {
try {

HttpResponseHandler<EventStreamOperationResponse> responseHandler = jsonProtocolFactory.createResponseHandler(
new JsonOperationMetadata().withPayloadJson(true).withHasStreamingSuccessResponse(false),
new EventStreamOperationResponseUnmarshaller());
HttpResponseHandler<EventStreamOperationResponse> responseHandler = new AttachHttpMetadataResponseHandler(
jsonProtocolFactory.createResponseHandler(new JsonOperationMetadata().withPayloadJson(true)
.withHasStreamingSuccessResponse(false), new EventStreamOperationResponseUnmarshaller()));

HttpResponseHandler<SdkResponse> voidResponseHandler = jsonProtocolFactory.createResponseHandler(
new JsonOperationMetadata().withPayloadJson(false).withHasStreamingSuccessResponse(true),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.client.handler;

import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;

/**
* Decorate {@link HttpResponseHandler} to attach {@link SdkHttpResponse} to the response object.
*/
@SdkProtectedApi
public final class AttachHttpMetadataResponseHandler<T extends SdkResponse> implements HttpResponseHandler<T> {

private final HttpResponseHandler<T> delegate;

public AttachHttpMetadataResponseHandler(HttpResponseHandler<T> delegate) {
this.delegate = delegate;
}

@Override
@SuppressWarnings("unchecked")
public T handle(SdkHttpFullResponse response, ExecutionAttributes executionAttributes) throws Exception {
return (T) delegate.handle(response, executionAttributes)
.toBuilder()
.sdkHttpResponse(response)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import software.amazon.awssdk.core.internal.interceptor.ExecutionInterceptorChain;
import software.amazon.awssdk.core.internal.interceptor.InterceptorContext;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpResponse;

@SdkProtectedApi
public abstract class BaseClientHandler {
Expand Down Expand Up @@ -117,21 +116,6 @@ private static <OutputT extends SdkResponse> OutputT runAfterUnmarshallingInterc
return (OutputT) interceptorContext.response();
}

/**
* Add {@link SdkHttpResponse} to SdkResponse.
*/
@SuppressWarnings("unchecked")
private static <OutputT extends SdkResponse> HttpResponseHandler<OutputT> addHttpResponseMetadataResponseHandler(
HttpResponseHandler<OutputT> delegate) {
return (response, executionAttributes) -> {
OutputT sdkResponse = delegate.handle(response, executionAttributes);

return (OutputT) sdkResponse.toBuilder()
.sdkHttpResponse(response)
.build();
};
}

static <OutputT extends SdkResponse> HttpResponseHandler<OutputT> interceptorCalling(
HttpResponseHandler<OutputT> delegate, ExecutionContext context) {
return (response, executionAttributes) ->
Expand Down Expand Up @@ -182,6 +166,6 @@ protected boolean isCalculateCrc32FromCompressedData() {
<OutputT extends SdkResponse> HttpResponseHandler<OutputT> decorateResponseHandlers(
HttpResponseHandler<OutputT> delegate, ExecutionContext executionContext) {
HttpResponseHandler<OutputT> interceptorCallingResponseHandler = interceptorCalling(delegate, executionContext);
return addHttpResponseMetadataResponseHandler(interceptorCallingResponseHandler);
return new AttachHttpMetadataResponseHandler<>(interceptorCallingResponseHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
Expand Down Expand Up @@ -106,13 +107,13 @@ public void subscribeToShard_ReceivesAllData() {
SubscribeToShardResponseHandler.builder()
.onEventStream(p -> p.filter(SubscribeToShardEvent.class)
.subscribe(eventConsumer))
.onResponse(this::verifyHttpMetadata)
.build())
.join();
producer.shutdown();
// Make sure we all the data we received was data we published, we may have published more
// if the producer isn't shutdown immediately after we finish subscribing.
assertThat(producedData).containsSequence(receivedData);

}

@Test
Expand All @@ -126,7 +127,7 @@ public void cancelledSubscription_DoesNotCallTerminalMethods() {
new SubscribeToShardResponseHandler() {
@Override
public void responseReceived(SubscribeToShardResponse response) {

verifyHttpMetadata(response);
}

@Override
Expand Down Expand Up @@ -222,4 +223,10 @@ private Optional<SdkBytes> putRecord() {
}
}

private void verifyHttpMetadata(SubscribeToShardResponse response) {
SdkHttpResponse sdkHttpResponse = response.sdkHttpResponse();
assertThat(sdkHttpResponse).isNotNull();
assertThat(sdkHttpResponse.isSuccessful()).isTrue();
assertThat(sdkHttpResponse.headers()).isNotEmpty();
}
}