21
21
import com .squareup .javapoet .CodeBlock ;
22
22
import com .squareup .javapoet .ParameterizedTypeName ;
23
23
import com .squareup .javapoet .TypeName ;
24
+ import com .squareup .javapoet .WildcardTypeName ;
24
25
import java .util .Map ;
25
26
import java .util .Optional ;
26
27
import java .util .concurrent .CompletableFuture ;
28
+ import software .amazon .awssdk .awscore .eventstream .EventStreamAsyncResponseTransformer ;
29
+ import software .amazon .awssdk .awscore .eventstream .EventStreamTaggedUnionPojoSupplier ;
30
+ import software .amazon .awssdk .awscore .eventstream .RestEventStreamAsyncResponseTransformer ;
31
+ import software .amazon .awssdk .awscore .exception .AwsServiceException ;
27
32
import software .amazon .awssdk .codegen .model .config .customization .S3ArnableFieldConfig ;
28
33
import software .amazon .awssdk .codegen .model .intermediate .IntermediateModel ;
29
34
import software .amazon .awssdk .codegen .model .intermediate .OperationModel ;
35
+ import software .amazon .awssdk .codegen .model .intermediate .ShapeModel ;
30
36
import software .amazon .awssdk .codegen .poet .PoetExtensions ;
31
37
import software .amazon .awssdk .codegen .poet .client .traits .HttpChecksumRequiredTrait ;
38
+ import software .amazon .awssdk .codegen .poet .eventstream .EventStreamUtils ;
39
+ import software .amazon .awssdk .codegen .poet .model .EventStreamSpecHelper ;
40
+ import software .amazon .awssdk .core .SdkPojoBuilder ;
32
41
import software .amazon .awssdk .core .client .handler .ClientExecutionParams ;
33
42
import software .amazon .awssdk .core .http .HttpResponseHandler ;
34
43
import software .amazon .awssdk .protocols .xml .AwsXmlProtocolFactory ;
@@ -60,21 +69,26 @@ protected Class<?> protocolFactoryClass() {
60
69
@ Override
61
70
public CodeBlock responseHandler (IntermediateModel model ,
62
71
OperationModel opModel ) {
63
-
64
72
if (opModel .hasStreamingOutput ()) {
65
73
return streamingResponseHandler (opModel );
66
74
}
67
75
68
76
ClassName responseType = poetExtensions .getModelClass (opModel .getReturnType ().getReturnType ());
69
77
78
+ if (opModel .hasEventStreamOutput ()) {
79
+ return CodeBlock .builder ()
80
+ .add (eventStreamResponseHandlers (opModel , responseType ))
81
+ .build ();
82
+ }
83
+
70
84
TypeName handlerType = ParameterizedTypeName .get (
71
85
ClassName .get (HttpResponseHandler .class ),
72
86
ParameterizedTypeName .get (ClassName .get (software .amazon .awssdk .core .Response .class ), responseType ));
73
87
74
88
return CodeBlock .builder ()
75
89
.addStatement ("\n \n $T responseHandler = protocolFactory.createCombinedResponseHandler($T::builder, "
76
90
+ "new $T().withHasStreamingSuccessResponse($L))" ,
77
- handlerType , responseType , XmlOperationMetadata .class , opModel .hasStreamingOutput ())
91
+ handlerType , responseType , XmlOperationMetadata .class , opModel .hasStreamingOutput ())
78
92
.build ();
79
93
}
80
94
@@ -160,34 +174,64 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
160
174
ClassName requestType = poetExtensions .getModelClass (opModel .getInput ().getVariableType ());
161
175
ClassName marshaller = poetExtensions .getRequestTransformClass (opModel .getInputShape ().getShapeName () + "Marshaller" );
162
176
177
+ CodeBlock .Builder builder = CodeBlock .builder ();
178
+
179
+ if (opModel .hasEventStreamOutput ()) {
180
+ builder .add (eventStreamResponseTransformers (opModel ));
181
+ }
182
+
163
183
TypeName executeFutureValueType = executeFutureValueType (opModel , poetExtensions );
164
- CodeBlock .Builder builder =
165
- CodeBlock .builder ()
166
- .add ("\n \n $T<$T> executeFuture = clientHandler.execute(new $T<$T, $T>()\n " ,
167
- CompletableFuture .class , executeFutureValueType ,
168
- ClientExecutionParams .class , requestType , pojoResponseType )
169
- .add (".withOperationName(\" $N\" )\n " , opModel .getOperationName ())
170
- .add (".withMarshaller($L)\n " , asyncMarshaller (intermediateModel , opModel , marshaller , "protocolFactory" ))
171
- .add (".withCombinedResponseHandler(responseHandler)\n " )
172
- .add (hostPrefixExpression (opModel ))
173
- .add (".withMetricCollector(apiCallMetricCollector)\n " )
174
- .add (asyncRequestBody (opModel ))
175
- .add (HttpChecksumRequiredTrait .putHttpChecksumAttribute (opModel ));
184
+ String executionResponseTransformerName = "asyncResponseTransformer" ;
185
+
186
+ if (opModel .hasEventStreamOutput ()) {
187
+ executionResponseTransformerName = "restAsyncResponseTransformer" ;
188
+ }
189
+
190
+ builder .add ("\n \n $T<$T> executeFuture = clientHandler.execute(new $T<$T, $T>()\n " ,
191
+ CompletableFuture .class , executeFutureValueType ,
192
+ ClientExecutionParams .class , requestType , pojoResponseType )
193
+ .add (".withOperationName(\" $N\" )\n " , opModel .getOperationName ())
194
+ .add (".withMarshaller($L)\n " , asyncMarshaller (intermediateModel , opModel , marshaller , "protocolFactory" ));
195
+
196
+ if (opModel .hasEventStreamOutput ()) {
197
+ builder .add (".withResponseHandler(responseHandler)" );
198
+ } else {
199
+ builder .add (".withCombinedResponseHandler(responseHandler)" );
200
+ }
201
+
202
+ builder .add (hostPrefixExpression (opModel ))
203
+ .add (".withMetricCollector(apiCallMetricCollector)\n " )
204
+ .add (asyncRequestBody (opModel ))
205
+ .add (HttpChecksumRequiredTrait .putHttpChecksumAttribute (opModel ));
176
206
177
207
s3ArnableFields (opModel , model ).ifPresent (builder ::add );
178
- builder .add (".withInput($L) $L);" , opModel .getInput ().getVariableName (), opModel .hasStreamingOutput () ?
179
- ", asyncResponseTransformer" : "" );
208
+
209
+ builder .add (".withInput($L)" , opModel .getInput ().getVariableName ());
210
+ if (opModel .hasStreamingOutput () || opModel .hasEventStreamOutput ()) {
211
+ builder .add (", $N" , executionResponseTransformerName );
212
+ }
213
+ builder .addStatement (")" );
214
+
180
215
String whenCompleteFutureName = "whenCompleteFuture" ;
181
216
builder .addStatement ("$T $N = null" , ParameterizedTypeName .get (ClassName .get (CompletableFuture .class ),
182
217
executeFutureValueType ), whenCompleteFutureName );
183
- if (opModel .hasStreamingOutput ()) {
218
+
219
+ if (opModel .hasStreamingOutput () || opModel .hasEventStreamOutput ()) {
184
220
builder .addStatement ("$N = executeFuture$L" , whenCompleteFutureName ,
185
- streamingOutputWhenComplete ( "asyncResponseTransformer " ));
221
+ whenCompleteBlock ( opModel , "asyncResponseHandler " ));
186
222
} else {
187
223
builder .addStatement ("$N = executeFuture$L" , whenCompleteFutureName , publishMetricsWhenComplete ());
188
224
}
189
- builder .addStatement ("return $T.forwardExceptionTo($N, executeFuture)" , CompletableFutureUtils .class ,
225
+
226
+ builder .addStatement ("$T.forwardExceptionTo($N, executeFuture)" , CompletableFutureUtils .class ,
190
227
whenCompleteFutureName );
228
+
229
+ if (opModel .hasEventStreamOutput ()) {
230
+ builder .addStatement ("return $T.forwardExceptionTo(future, executeFuture)" , CompletableFutureUtils .class );
231
+ } else {
232
+ builder .addStatement ("return $N" , whenCompleteFutureName );
233
+ }
234
+
191
235
return builder .build ();
192
236
}
193
237
@@ -198,4 +242,120 @@ private String asyncRequestBody(OperationModel opModel) {
198
242
private CodeBlock asyncStreamingExecutionHandler (IntermediateModel intermediateModel , OperationModel opModel ) {
199
243
return super .asyncExecutionHandler (intermediateModel , opModel );
200
244
}
245
+
246
+ private CodeBlock eventStreamResponseHandlers (OperationModel opModel , TypeName pojoResponseType ) {
247
+ CodeBlock streamResponseOpMd = CodeBlock .builder ()
248
+ .add ("$T.builder()" , XmlOperationMetadata .class )
249
+ .add (".hasStreamingSuccessResponse(true)" )
250
+ .add (".build()" )
251
+ .build ();
252
+
253
+
254
+ CodeBlock .Builder builder = CodeBlock .builder ();
255
+
256
+ // Response handler for handling the initial response from the operation. Note, this does not handle the event stream
257
+ // messages, that is the job of "eventResponseHandler" below
258
+ builder .addStatement ("$T<$T> responseHandler = protocolFactory.createResponseHandler($T::builder, $L)" ,
259
+ HttpResponseHandler .class ,
260
+ pojoResponseType ,
261
+ pojoResponseType ,
262
+ streamResponseOpMd );
263
+
264
+ // Response handler responsible for errors for the API call itself, as well as errors sent over the event stream
265
+ builder .addStatement ("$T errorResponseHandler = protocolFactory"
266
+ + ".createErrorResponseHandler()" , ParameterizedTypeName .get (HttpResponseHandler .class ,
267
+ AwsServiceException .class ));
268
+
269
+
270
+ ShapeModel eventStreamShape = EventStreamUtils .getEventStreamInResponse (opModel .getOutputShape ());
271
+ ClassName eventStream = poetExtensions .getModelClassFromShape (eventStreamShape );
272
+ EventStreamSpecHelper eventStreamSpecHelper = new EventStreamSpecHelper (eventStreamShape , intermediateModel );
273
+
274
+ CodeBlock .Builder supplierBuilder = CodeBlock .builder ()
275
+ .add ("$T.builder()" , EventStreamTaggedUnionPojoSupplier .class );
276
+ EventStreamUtils .getEvents (eventStreamShape ).forEach (m -> {
277
+ String builderName = eventStreamSpecHelper .eventBuilderMethodName (m );
278
+ supplierBuilder .add (".putSdkPojoSupplier($S, $T::$N)" , m .getName (), eventStream , builderName );
279
+ });
280
+ supplierBuilder .add (".defaultSdkPojoSupplier(() -> new $T($T.UNKNOWN))" , SdkPojoBuilder .class , eventStream );
281
+ CodeBlock supplierCodeBlock = supplierBuilder .add (".build()" ).build ();
282
+
283
+ CodeBlock nonStreamingOpMd = CodeBlock .builder ()
284
+ .add ("$T.builder()" , XmlOperationMetadata .class )
285
+ .add (".hasStreamingSuccessResponse(false)" )
286
+ .add (".build()" )
287
+ .build ();
288
+
289
+ // The response handler responsible for unmarshalling each event
290
+ builder .addStatement ("$T eventResponseHandler = protocolFactory.createResponseHandler($L, $L)" ,
291
+ ParameterizedTypeName .get (ClassName .get (HttpResponseHandler .class ),
292
+ WildcardTypeName .subtypeOf (eventStream )),
293
+ supplierCodeBlock ,
294
+ nonStreamingOpMd );
295
+
296
+
297
+ return builder .build ();
298
+ }
299
+
300
+ private CodeBlock eventStreamResponseTransformers (OperationModel opModel ) {
301
+ ShapeModel shapeModel = EventStreamUtils .getEventStreamInResponse (opModel .getOutputShape ());
302
+ ClassName pojoResponseType = poetExtensions .getModelClass (opModel .getReturnType ().getReturnType ());
303
+ ClassName eventStreamBaseClass = poetExtensions .getModelClassFromShape (shapeModel );
304
+
305
+ CodeBlock .Builder builder = CodeBlock .builder ();
306
+
307
+ ParameterizedTypeName transformerType = ParameterizedTypeName .get (
308
+ ClassName .get (EventStreamAsyncResponseTransformer .class ),
309
+ pojoResponseType ,
310
+ eventStreamBaseClass );
311
+
312
+ builder .addStatement ("$1T<$2T> future = new $1T<>()" , ClassName .get (CompletableFuture .class ), ClassName .get (Void .class ))
313
+ .add ("$T asyncResponseTransformer = $T.<$T, $T>builder()" ,
314
+ transformerType , ClassName .get (EventStreamAsyncResponseTransformer .class ), pojoResponseType ,
315
+ eventStreamBaseClass )
316
+ .add (".eventStreamResponseHandler(asyncResponseHandler)" )
317
+ .add (".eventResponseHandler(eventResponseHandler)" )
318
+ .add (".initialResponseHandler(responseHandler)" )
319
+ .add (".exceptionResponseHandler(errorResponseHandler)" )
320
+ .add (".future(future)" )
321
+ .add (".executor(executor)" )
322
+ .add (".serviceName(serviceName())" )
323
+ .addStatement (".build()" );
324
+
325
+ ParameterizedTypeName restTransformType =
326
+ ParameterizedTypeName .get (ClassName .get (RestEventStreamAsyncResponseTransformer .class ), pojoResponseType ,
327
+ eventStreamBaseClass );
328
+
329
+ // Wrap the event transformer with this so that the caller's response handler's onResponse() method is invoked. See
330
+ // docs for RestEventStreamAsyncResponseTransformer for more info on why it's needed
331
+ builder .addStatement ("$T restAsyncResponseTransformer = $T.<$T, $T>builder()"
332
+ + ".eventStreamAsyncResponseTransformer(asyncResponseTransformer)"
333
+ + ".eventStreamResponseHandler(asyncResponseHandler)"
334
+ + ".build()" , restTransformType , RestEventStreamAsyncResponseTransformer .class ,
335
+ pojoResponseType , eventStreamBaseClass );
336
+
337
+ return builder .build ();
338
+ }
339
+
340
+ private CodeBlock whenCompleteBlock (OperationModel operationModel , String responseHandlerName ) {
341
+ CodeBlock .Builder whenComplete = CodeBlock .builder ()
342
+ .add (".whenComplete((r, e) -> " )
343
+ .beginControlFlow ("" )
344
+ .beginControlFlow ("if (e != null)" )
345
+ .add ("runAndLogError(log, $S, () -> $N.exceptionOccurred(e));" ,
346
+ "Exception thrown in exceptionOccurred callback, ignoring" ,
347
+ responseHandlerName );
348
+
349
+ if (operationModel .hasEventStreamOutput ()) {
350
+ whenComplete .add ("future.completeExceptionally(e);" );
351
+ }
352
+
353
+ whenComplete .endControlFlow ()
354
+ .add (publishMetrics ())
355
+ .endControlFlow ()
356
+ .add (")" )
357
+ .build ();
358
+
359
+ return whenComplete .build ();
360
+ }
201
361
}
0 commit comments