@@ -190,25 +190,27 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
190
190
191
191
private final AtomicBoolean enhancerIsBuilt = new AtomicBoolean ();
192
192
193
+ @ SuppressWarnings ("NullAway.Init" )
193
194
private KafkaListenerEndpointRegistry endpointRegistry ;
194
195
195
196
private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME ;
196
197
197
- @ Nullable
198
- private ApplicationContext applicationContext ;
198
+ private @ Nullable ApplicationContext applicationContext ;
199
199
200
+ @ SuppressWarnings ("NullAway.Init" )
200
201
private BeanFactory beanFactory ;
201
202
202
203
private BeanExpressionResolver resolver = new StandardBeanExpressionResolver ();
203
204
205
+ @ SuppressWarnings ("NullAway.Init" )
204
206
private BeanExpressionContext expressionContext ;
205
207
206
208
private Charset charset = StandardCharsets .UTF_8 ;
207
209
210
+ @ SuppressWarnings ("NullAway.Init" )
208
211
private AnnotationEnhancer enhancer ;
209
212
210
- @ Nullable
211
- private RetryTopicConfigurer retryTopicConfigurer ;
213
+ private @ Nullable RetryTopicConfigurer retryTopicConfigurer ;
212
214
213
215
private final Lock globalLock = new ReentrantLock ();
214
216
@@ -473,9 +475,6 @@ private void processMultiMethodListeners(Collection<KafkaListener> classLevelLis
473
475
Method checked = checkProxy (method , bean );
474
476
KafkaHandler annotation = AnnotationUtils .findAnnotation (method , KafkaHandler .class );
475
477
if (annotation != null && annotation .isDefault ()) {
476
- Method toAssert = defaultMethod ;
477
- Assert .state (toAssert == null , () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
478
- + toAssert .toString () + " and " + method );
479
478
defaultMethod = checked ;
480
479
}
481
480
checkedMethods .add (checked );
@@ -737,9 +736,8 @@ private void resolveFilter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaList
737
736
}
738
737
}
739
738
740
- @ Nullable
741
- private KafkaListenerContainerFactory <?> resolveContainerFactory (KafkaListener kafkaListener ,
742
- Object factoryTarget , String beanName ) {
739
+ private @ Nullable KafkaListenerContainerFactory <?> resolveContainerFactory (KafkaListener kafkaListener ,
740
+ @ Nullable Object factoryTarget , String beanName ) {
743
741
744
742
String containerFactory = kafkaListener .containerFactory ();
745
743
if (!StringUtils .hasText (containerFactory )) {
@@ -786,7 +784,7 @@ protected void assertBeanFactory() {
786
784
Assert .state (this .beanFactory != null , "BeanFactory must be set to obtain container factory by bean name" );
787
785
}
788
786
789
- protected String noBeanFoundMessage (Object target , String listenerBeanName , String requestedBeanName ,
787
+ protected String noBeanFoundMessage (@ Nullable Object target , String listenerBeanName , String requestedBeanName ,
790
788
Class <?> expectedClass ) {
791
789
792
790
return "Could not register Kafka listener endpoint on ["
@@ -833,7 +831,7 @@ private void loadProperty(Properties properties, String property, Object value)
833
831
}
834
832
}
835
833
836
- private String getEndpointId (KafkaListener kafkaListener ) {
834
+ private @ Nullable String getEndpointId (KafkaListener kafkaListener ) {
837
835
if (StringUtils .hasText (kafkaListener .id ())) {
838
836
return resolveExpressionAsString (kafkaListener .id (), "id" );
839
837
}
@@ -875,7 +873,7 @@ private String[] resolveTopics(KafkaListener kafkaListener) {
875
873
return result .toArray (new String [0 ]);
876
874
}
877
875
878
- private Pattern resolvePattern (KafkaListener kafkaListener ) {
876
+ private @ Nullable Pattern resolvePattern (KafkaListener kafkaListener ) {
879
877
Pattern pattern = null ;
880
878
String text = kafkaListener .topicPattern ();
881
879
if (StringUtils .hasText (text )) {
@@ -896,6 +894,7 @@ else if (resolved != null) {
896
894
897
895
private List <TopicPartitionOffset > resolveTopicPartitionsList (TopicPartition topicPartition ) {
898
896
Object topic = resolveExpression (topicPartition .topic ());
897
+ Assert .state (topic != null , "Topic must not be null" );
899
898
Assert .state (topic instanceof String ,
900
899
() -> "topic in @TopicPartition must resolve to a String, not " + topic .getClass ());
901
900
Assert .state (StringUtils .hasText ((String ) topic ), "topic in @TopicPartition must not be empty" );
@@ -907,19 +906,22 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
907
906
for (String partition : partitions ) {
908
907
resolvePartitionAsInteger ((String ) topic , resolveExpression (partition ), result );
909
908
}
910
- if (partitionOffsets .length == 1 && resolveExpression (partitionOffsets [0 ].partition ()).equals ("*" )) {
911
- result .forEach (tpo -> {
912
- tpo .setOffset (resolveInitialOffset (tpo .getTopic (), partitionOffsets [0 ]));
913
- tpo .setRelativeToCurrent (isRelative (tpo .getTopic (), partitionOffsets [0 ]));
914
- });
915
- }
916
- else {
917
- for (PartitionOffset partitionOffset : partitionOffsets ) {
918
- Assert .isTrue (!partitionOffset .partition ().equals ("*" ), () ->
919
- "Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result );
920
- resolvePartitionAsInteger ((String ) topic , resolveExpression (partitionOffset .partition ()), result ,
921
- resolveInitialOffset (topic , partitionOffset ), isRelative (topic , partitionOffset ), true ,
922
- resolveExpression (partitionOffset .seekPosition ()));
909
+ if (partitionOffsets .length > 0 ) {
910
+ Object resolvedExpression = resolveExpression (partitionOffsets [0 ].partition ());
911
+ if (partitionOffsets .length == 1 && resolvedExpression != null && resolvedExpression .equals ("*" )) {
912
+ result .forEach (tpo -> {
913
+ tpo .setOffset (resolveInitialOffset (tpo .getTopic (), partitionOffsets [0 ]));
914
+ tpo .setRelativeToCurrent (isRelative (tpo .getTopic (), partitionOffsets [0 ]));
915
+ });
916
+ }
917
+ else {
918
+ for (PartitionOffset partitionOffset : partitionOffsets ) {
919
+ Assert .isTrue (!partitionOffset .partition ().equals ("*" ), () ->
920
+ "Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result );
921
+ resolvePartitionAsInteger ((String ) topic , resolveExpression (partitionOffset .partition ()), result ,
922
+ resolveInitialOffset (topic , partitionOffset ), isRelative (topic , partitionOffset ), true ,
923
+ resolveExpression (partitionOffset .seekPosition ()));
924
+ }
923
925
}
924
926
}
925
927
Assert .isTrue (!result .isEmpty (), () -> "At least one partition required for " + topic );
@@ -938,9 +940,14 @@ else if (initialOffsetValue instanceof Long lng) {
938
940
initialOffset = lng ;
939
941
}
940
942
else {
941
- throw new IllegalArgumentException (String .format (
942
- "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'" ,
943
- topic , partitionOffset .initialOffset (), initialOffsetValue .getClass ()));
943
+ if (initialOffsetValue != null ) {
944
+ throw new IllegalArgumentException (String .format (
945
+ "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'" ,
946
+ topic , partitionOffset .initialOffset (), initialOffsetValue .getClass ()));
947
+ }
948
+ else {
949
+ throw new IllegalArgumentException ("@PartitionOffset for topic '" + topic + "' cannot be empty. Initial offset is null" );
950
+ }
944
951
}
945
952
return initialOffset ;
946
953
}
@@ -955,15 +962,20 @@ else if (relativeToCurrentValue instanceof Boolean bool) {
955
962
relativeToCurrent = bool ;
956
963
}
957
964
else {
958
- throw new IllegalArgumentException (String .format (
959
- "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'" ,
960
- topic , partitionOffset .relativeToCurrent (), relativeToCurrentValue .getClass ()));
965
+ if (relativeToCurrentValue != null ) {
966
+ throw new IllegalArgumentException (String .format (
967
+ "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'" ,
968
+ topic , partitionOffset .relativeToCurrent (), relativeToCurrentValue .getClass ()));
969
+ }
970
+ else {
971
+ throw new IllegalArgumentException ("@PartitionOffset for topic '" + topic + "' cannot be empty. Relative to current value is null" );
972
+ }
961
973
}
962
974
return relativeToCurrent ;
963
975
}
964
976
965
977
@ SuppressWarnings (UNCHECKED )
966
- private void resolveAsString (Object resolvedValue , List <String > result ) {
978
+ private void resolveAsString (@ Nullable Object resolvedValue , List <String > result ) {
967
979
if (resolvedValue instanceof String [] strArr ) {
968
980
for (Object object : strArr ) {
969
981
resolveAsString (object , result );
@@ -983,12 +995,12 @@ else if (resolvedValue instanceof Iterable) {
983
995
}
984
996
}
985
997
986
- private void resolvePartitionAsInteger (String topic , Object resolvedValue , List <TopicPartitionOffset > result ) {
998
+ private void resolvePartitionAsInteger (String topic , @ Nullable Object resolvedValue , List <TopicPartitionOffset > result ) {
987
999
resolvePartitionAsInteger (topic , resolvedValue , result , null , false , false , null );
988
1000
}
989
1001
990
1002
@ SuppressWarnings (UNCHECKED )
991
- private void resolvePartitionAsInteger (String topic , Object resolvedValue , List <TopicPartitionOffset > result ,
1003
+ private void resolvePartitionAsInteger (String topic , @ Nullable Object resolvedValue , List <TopicPartitionOffset > result ,
992
1004
@ Nullable Long offset , boolean isRelative , boolean checkDups , @ Nullable Object seekPosition ) {
993
1005
994
1006
if (resolvedValue instanceof String [] strArr ) {
@@ -1034,6 +1046,7 @@ else if (resolvedValue instanceof Integer intgr) {
1034
1046
}
1035
1047
}
1036
1048
1049
+ @ SuppressWarnings ("NullAway" ) // Overridden method does not define nullness
1037
1050
private TopicPartitionOffset .SeekPosition resloveTopicPartitionOffsetSeekPosition (@ Nullable Object seekPosition ) {
1038
1051
TopicPartitionOffset .SeekPosition resloveTpoSp = null ;
1039
1052
if (seekPosition instanceof String seekPositionName ) {
@@ -1062,7 +1075,7 @@ private TopicPartitionOffset createTopicPartitionOffset(String topic, int partit
1062
1075
}
1063
1076
}
1064
1077
1065
- private String resolveExpressionAsString (String value , String attribute ) {
1078
+ private @ Nullable String resolveExpressionAsString (String value , String attribute ) {
1066
1079
Object resolved = resolveExpression (value );
1067
1080
if (resolved instanceof String str ) {
1068
1081
return str ;
@@ -1074,7 +1087,7 @@ else if (resolved != null) {
1074
1087
return null ;
1075
1088
}
1076
1089
1077
- @ Nullable
1090
+ @ SuppressWarnings ( "NullAway" ) // Dataflow analysis limitation
1078
1091
private byte [] resolveExpressionAsBytes (String value , String attribute ) {
1079
1092
Object resolved = resolveExpression (value );
1080
1093
if (resolved instanceof String str ) {
@@ -1092,7 +1105,7 @@ else if (resolved != null) {
1092
1105
return null ;
1093
1106
}
1094
1107
1095
- private Integer resolveExpressionAsInteger (String value , String attribute ) {
1108
+ private @ Nullable Integer resolveExpressionAsInteger (String value , String attribute ) {
1096
1109
Object resolved = resolveExpression (value );
1097
1110
Integer result = null ;
1098
1111
if (resolved instanceof String str ) {
@@ -1109,7 +1122,7 @@ else if (resolved != null) {
1109
1122
return result ;
1110
1123
}
1111
1124
1112
- private Boolean resolveExpressionAsBoolean (String value , String attribute ) {
1125
+ private @ Nullable Boolean resolveExpressionAsBoolean (String value , String attribute ) {
1113
1126
Object resolved = resolveExpression (value );
1114
1127
Boolean result = null ;
1115
1128
if (resolved instanceof Boolean bool ) {
@@ -1126,7 +1139,7 @@ else if (resolved != null) {
1126
1139
return result ;
1127
1140
}
1128
1141
1129
- private Object resolveExpression (String value ) {
1142
+ private @ Nullable Object resolveExpression (String value ) {
1130
1143
return this .resolver .evaluate (resolve (value ), this .expressionContext );
1131
1144
}
1132
1145
@@ -1136,7 +1149,7 @@ private Object resolveExpression(String value) {
1136
1149
* @return the resolved value
1137
1150
* @see ConfigurableBeanFactory#resolveEmbeddedValue
1138
1151
*/
1139
- private String resolve (String value ) {
1152
+ private @ Nullable String resolve (String value ) {
1140
1153
if (this .beanFactory instanceof ConfigurableBeanFactory cbf ) {
1141
1154
return cbf .resolveEmbeddedValue (value );
1142
1155
}
@@ -1210,7 +1223,7 @@ private final class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMe
1210
1223
private final DefaultFormattingConversionService defaultFormattingConversionService =
1211
1224
new DefaultFormattingConversionService ();
1212
1225
1213
- private MessageHandlerMethodFactory handlerMethodFactory ;
1226
+ private @ Nullable MessageHandlerMethodFactory handlerMethodFactory ;
1214
1227
1215
1228
public void setHandlerMethodFactory (MessageHandlerMethodFactory kafkaHandlerMethodFactory1 ) {
1216
1229
this .handlerMethodFactory = kafkaHandlerMethodFactory1 ;
@@ -1264,7 +1277,7 @@ public String convert(byte[] source) {
1264
1277
1265
1278
static class ListenerScope implements Scope {
1266
1279
1267
- private final Map <String , Object > listeners = new HashMap <>();
1280
+ private final Map <String , @ Nullable Object > listeners = new HashMap <>();
1268
1281
1269
1282
ListenerScope () {
1270
1283
}
@@ -1277,11 +1290,13 @@ public void removeListener(String key) {
1277
1290
this .listeners .remove (key );
1278
1291
}
1279
1292
1293
+ @ SuppressWarnings ("NullAway" ) // Overridden method does not define nullness
1280
1294
@ Override
1281
- public Object get (String name , ObjectFactory <?> objectFactory ) {
1295
+ public @ Nullable Object get (String name , ObjectFactory <?> objectFactory ) {
1282
1296
return this .listeners .get (name );
1283
1297
}
1284
1298
1299
+ @ SuppressWarnings ("NullAway" ) // Overridden method does not define nullness
1285
1300
@ Override
1286
1301
public Object remove (String name ) {
1287
1302
return null ;
@@ -1292,12 +1307,12 @@ public void registerDestructionCallback(String name, Runnable callback) {
1292
1307
}
1293
1308
1294
1309
@ Override
1295
- public Object resolveContextualObject (String key ) {
1310
+ public @ Nullable Object resolveContextualObject (String key ) {
1296
1311
return this .listeners .get (key );
1297
1312
}
1298
1313
1299
1314
@ Override
1300
- public String getConversationId () {
1315
+ public @ Nullable String getConversationId () {
1301
1316
return null ;
1302
1317
}
1303
1318
@@ -1319,7 +1334,6 @@ private static final class BytesToNumberConverter implements ConditionalGenericC
1319
1334
}
1320
1335
1321
1336
@ Override
1322
- @ Nullable
1323
1337
public Set <ConvertiblePair > getConvertibleTypes () {
1324
1338
HashSet <ConvertiblePair > pairs = new HashSet <>();
1325
1339
pairs .add (new ConvertiblePair (byte [].class , long .class ));
@@ -1334,8 +1348,7 @@ public Set<ConvertiblePair> getConvertibleTypes() {
1334
1348
}
1335
1349
1336
1350
@ Override
1337
- @ Nullable
1338
- public Object convert (@ Nullable Object source , TypeDescriptor sourceType , TypeDescriptor targetType ) {
1351
+ public @ Nullable Object convert (@ Nullable Object source , TypeDescriptor sourceType , TypeDescriptor targetType ) {
1339
1352
byte [] bytes = (byte []) source ;
1340
1353
if (bytes == null ) {
1341
1354
return null ;
0 commit comments