@@ -288,7 +288,8 @@ protected final Object invokeHandler(Object data, Acknowledgment acknowledgment,
288
288
if (this .hasAckParameter && acknowledgment == null ) {
289
289
throw new ListenerExecutionFailedException ("invokeHandler Failed" ,
290
290
new IllegalStateException ("No Acknowledgment available as an argument, "
291
- + "the listener container must have a MANUAL Ackmode to populate the Acknowledgment." , ex ));
291
+ + "the listener container must have a MANUAL Ackmode to populate the Acknowledgment." ,
292
+ ex ));
292
293
}
293
294
throw new ListenerExecutionFailedException (createMessagingErrorMessage ("Listener method could not " +
294
295
"be invoked with the incoming message" , message .getPayload ()),
@@ -347,8 +348,8 @@ private String evaluateTopic(Object request, Object source, Object result, Expre
347
348
boolean isByteArray = value instanceof byte [];
348
349
if (!(value == null || value instanceof String || isByteArray )) {
349
350
throw new IllegalStateException (
350
- "replyTopic expression must evaluate to a String or byte[], it is: "
351
- + value .getClass ().getName ());
351
+ "replyTopic expression must evaluate to a String or byte[], it is: "
352
+ + value .getClass ().getName ());
352
353
}
353
354
if (isByteArray ) {
354
355
return new String ((byte []) value , StandardCharsets .UTF_8 );
@@ -427,14 +428,14 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
427
428
.setHeader (KafkaHeaders .TOPIC , topic );
428
429
if (this .replyHeadersConfigurer != null ) {
429
430
Map <String , Object > headersToCopy = ((Message <?>) source ).getHeaders ().entrySet ().stream ()
430
- .filter (e -> {
431
- String key = e .getKey ();
432
- return !key .equals (MessageHeaders .ID ) && !key .equals (MessageHeaders .TIMESTAMP )
433
- && !key .equals (KafkaHeaders .CORRELATION_ID )
434
- && !key .startsWith (KafkaHeaders .RECEIVED );
435
- })
436
- .filter (e -> this .replyHeadersConfigurer .shouldCopy (e .getKey (), e .getValue ()))
437
- .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
431
+ .filter (e -> {
432
+ String key = e .getKey ();
433
+ return !key .equals (MessageHeaders .ID ) && !key .equals (MessageHeaders .TIMESTAMP )
434
+ && !key .equals (KafkaHeaders .CORRELATION_ID )
435
+ && !key .startsWith (KafkaHeaders .RECEIVED );
436
+ })
437
+ .filter (e -> this .replyHeadersConfigurer .shouldCopy (e .getKey (), e .getValue ()))
438
+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
438
439
if (headersToCopy .size () > 0 ) {
439
440
builder .copyHeaders (headersToCopy );
440
441
}
@@ -488,42 +489,12 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
488
489
&& (methodParameter .getParameterAnnotations ().length == 0
489
490
|| methodParameter .hasParameterAnnotation (Payload .class ))) {
490
491
if (genericParameterType == null ) {
491
- genericParameterType = methodParameter .getGenericParameterType ();
492
- if (genericParameterType instanceof ParameterizedType ) {
493
- ParameterizedType parameterizedType = (ParameterizedType ) genericParameterType ;
494
- if (parameterizedType .getRawType ().equals (Message .class )) {
495
- genericParameterType = ((ParameterizedType ) genericParameterType )
496
- .getActualTypeArguments ()[0 ];
497
- }
498
- else if (parameterizedType .getRawType ().equals (List .class )
499
- && parameterizedType .getActualTypeArguments ().length == 1 ) { // NOSONAR complex
500
- Type paramType = parameterizedType .getActualTypeArguments ()[0 ];
501
- this .isConsumerRecordList = paramType .equals (ConsumerRecord .class )
502
- || (paramType instanceof ParameterizedType
503
- && ((ParameterizedType ) paramType ).getRawType ().equals (ConsumerRecord .class )
504
- || (paramType instanceof WildcardType
505
- && ((WildcardType ) paramType ).getUpperBounds () != null
506
- && ((WildcardType ) paramType ).getUpperBounds ().length > 0
507
- && ((WildcardType ) paramType ).getUpperBounds ()[0 ] instanceof ParameterizedType
508
- && ((ParameterizedType ) ((WildcardType )
509
- paramType ).getUpperBounds ()[0 ]).getRawType ().equals (ConsumerRecord .class ))
510
- );
511
- boolean messageHasGeneric = paramType instanceof ParameterizedType
512
- && ((ParameterizedType ) paramType ).getRawType ().equals (Message .class );
513
- this .isMessageList = paramType .equals (Message .class ) || messageHasGeneric ;
514
- if (messageHasGeneric ) {
515
- genericParameterType = ((ParameterizedType ) paramType ).getActualTypeArguments ()[0 ];
516
- }
517
- }
518
- else {
519
- this .isConsumerRecords = parameterizedType .getRawType ().equals (ConsumerRecords .class );
520
- }
521
- }
492
+ genericParameterType = extractGenericParameterTypFromMethodParameter (methodParameter );
522
493
}
523
494
else {
524
495
if (this .logger .isDebugEnabled ()) {
525
496
this .logger .debug ("Ambiguous parameters for target payload for method " + method
526
- + "; no inferred type available" );
497
+ + "; no inferred type available" );
527
498
}
528
499
break ;
529
500
}
@@ -534,7 +505,7 @@ else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class))
534
505
}
535
506
else if (methodParameter .hasParameterAnnotation (Header .class )) {
536
507
Header header = methodParameter .getParameterAnnotation (Header .class );
537
- if (KafkaHeaders .GROUP_ID .equals (header .value ())) {
508
+ if (header != null && KafkaHeaders .GROUP_ID .equals (header .value ())) {
538
509
allowedBatchParameters ++;
539
510
}
540
511
}
@@ -551,6 +522,7 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) {
551
522
}
552
523
}
553
524
}
525
+
554
526
boolean validParametersForBatch = method .getGenericParameterTypes ().length <= allowedBatchParameters ;
555
527
556
528
if (!validParametersForBatch ) {
@@ -568,6 +540,41 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) {
568
540
return genericParameterType ;
569
541
}
570
542
543
+ private Type extractGenericParameterTypFromMethodParameter (MethodParameter methodParameter ) {
544
+ Type genericParameterType = methodParameter .getGenericParameterType ();
545
+ if (genericParameterType instanceof ParameterizedType ) {
546
+ ParameterizedType parameterizedType = (ParameterizedType ) genericParameterType ;
547
+ if (parameterizedType .getRawType ().equals (Message .class )) {
548
+ genericParameterType = ((ParameterizedType ) genericParameterType ).getActualTypeArguments ()[0 ];
549
+ }
550
+ else if (parameterizedType .getRawType ().equals (List .class )
551
+ && parameterizedType .getActualTypeArguments ().length == 1 ) {
552
+
553
+ Type paramType = parameterizedType .getActualTypeArguments ()[0 ];
554
+ this .isConsumerRecordList = paramType .equals (ConsumerRecord .class )
555
+ || (paramType instanceof ParameterizedType
556
+ && ((ParameterizedType ) paramType ).getRawType ().equals (ConsumerRecord .class )
557
+ || (paramType instanceof WildcardType
558
+ && ((WildcardType ) paramType ).getUpperBounds () != null
559
+ && ((WildcardType ) paramType ).getUpperBounds ().length > 0
560
+ && ((WildcardType ) paramType ).getUpperBounds ()[0 ] instanceof ParameterizedType
561
+ && ((ParameterizedType ) ((WildcardType )
562
+ paramType ).getUpperBounds ()[0 ]).getRawType ().equals (ConsumerRecord .class ))
563
+ );
564
+ boolean messageHasGeneric = paramType instanceof ParameterizedType
565
+ && ((ParameterizedType ) paramType ).getRawType ().equals (Message .class );
566
+ this .isMessageList = paramType .equals (Message .class ) || messageHasGeneric ;
567
+ if (messageHasGeneric ) {
568
+ genericParameterType = ((ParameterizedType ) paramType ).getActualTypeArguments ()[0 ];
569
+ }
570
+ }
571
+ else {
572
+ this .isConsumerRecords = parameterizedType .getRawType ().equals (ConsumerRecords .class );
573
+ }
574
+ }
575
+ return genericParameterType ;
576
+ }
577
+
571
578
/*
572
579
* Don't consider parameter types that are available after conversion.
573
580
* Acknowledgment, ConsumerRecord, Consumer, ConsumerRecord<...>, Consumer<...>, and Message<?>.
0 commit comments