Skip to content

Commit fa5c35e

Browse files
garyrussellartembilan
authored andcommitted
GH-1661: @KafkaListener Improvements
Resolves #1661 Allow properties that resolve to bean names to also resolve to instances of the required type instead.
1 parent f637add commit fa5c35e

File tree

3 files changed

+90
-26
lines changed

3 files changed

+90
-26
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,10 @@
104104
/**
105105
* The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
106106
* to use to create the message listener container responsible to serve this endpoint.
107-
* <p>If not specified, the default container factory is used, if any.
107+
* <p>
108+
* If not specified, the default container factory is used, if any. If a SpEL
109+
* expression is provided ({@code #{...}}), the expression can either evaluate to a
110+
* container factory instance or a bean name.
108111
* @return the container factory bean name.
109112
*/
110113
String containerFactory() default "";
@@ -156,7 +159,10 @@
156159

157160
/**
158161
* Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
159-
* name to invoke if the listener method throws an exception.
162+
* name to invoke if the listener method throws an exception. If a SpEL expression is
163+
* provided ({@code #{...}}), the expression can either evaluate to a
164+
* {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} instance or a
165+
* bean name.
160166
* @return the error handler.
161167
* @since 1.3
162168
*/

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -449,31 +449,75 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
449449
resolveKafkaProperties(endpoint, kafkaListener.properties());
450450
endpoint.setSplitIterables(kafkaListener.splitIterables());
451451

452-
KafkaListenerContainerFactory<?> factory = null;
453-
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
454-
if (StringUtils.hasText(containerFactoryBeanName)) {
455-
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
456-
try {
457-
factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
458-
}
459-
catch (NoSuchBeanDefinitionException ex) {
460-
throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
461-
+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
462-
+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
463-
}
452+
String containerFactory = resolve(kafkaListener.containerFactory());
453+
if (StringUtils.hasText(containerFactory)) {
454+
this.registrar.registerEndpoint(endpoint, resolveContainerFactory(kafkaListener, containerFactory, beanName));
455+
}
456+
else {
457+
this.registrar.registerEndpoint(endpoint);
464458
}
465459

466460
endpoint.setBeanFactory(this.beanFactory);
467461
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
468462
if (StringUtils.hasText(errorHandlerBeanName)) {
469-
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
463+
resolveErrorHandler(endpoint, kafkaListener);
470464
}
471-
this.registrar.registerEndpoint(endpoint, factory);
472465
if (StringUtils.hasText(beanRef)) {
473466
this.listenerScope.removeListener(beanRef);
474467
}
475468
}
476469

470+
private void resolveErrorHandler(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener KafkaListener) {
471+
Object errorHandler = resolveExpression(KafkaListener.errorHandler());
472+
if (errorHandler instanceof KafkaListenerErrorHandler) {
473+
endpoint.setErrorHandler((KafkaListenerErrorHandler) errorHandler);
474+
}
475+
else {
476+
String errorHandlerBeanName = resolveExpressionAsString(KafkaListener.errorHandler(), "errorHandler");
477+
if (StringUtils.hasText(errorHandlerBeanName)) {
478+
endpoint.setErrorHandler(
479+
this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
480+
}
481+
}
482+
}
483+
484+
@Nullable
485+
private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener KafkaListener,
486+
Object factoryTarget, String beanName) {
487+
488+
KafkaListenerContainerFactory<?> factory = null;
489+
Object resolved = resolveExpression(KafkaListener.containerFactory());
490+
if (resolved instanceof KafkaListenerContainerFactory) {
491+
return (KafkaListenerContainerFactory<?>) resolved;
492+
}
493+
String containerFactoryBeanName = resolveExpressionAsString(KafkaListener.containerFactory(),
494+
"containerFactory");
495+
if (StringUtils.hasText(containerFactoryBeanName)) {
496+
assertBeanFactory();
497+
try {
498+
factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
499+
}
500+
catch (NoSuchBeanDefinitionException ex) {
501+
throw new BeanInitializationException(
502+
noBeanFoundMessage(factoryTarget, beanName, containerFactoryBeanName,
503+
KafkaListenerContainerFactory.class), ex);
504+
}
505+
}
506+
return factory;
507+
}
508+
509+
protected void assertBeanFactory() {
510+
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
511+
}
512+
513+
protected String noBeanFoundMessage(Object target, String listenerBeanName, String requestedBeanName,
514+
Class<?> expectedClass) {
515+
516+
return "Could not register Kafka listener endpoint on ["
517+
+ target + "] for bean " + listenerBeanName + ", no '" + expectedClass.getSimpleName() + "' with id '"
518+
+ requestedBeanName + "' was found in the application context";
519+
}
520+
477521
private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
478522
if (propertyStrings.length > 0) {
479523
Properties properties = new Properties();

spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -72,13 +72,19 @@ public class BatchListenerConversionTests {
7272
@Autowired
7373
private Config config;
7474

75+
@Autowired
76+
private Listener listener1;
77+
78+
@Autowired
79+
private Listener listener2;
80+
7581
@Autowired
7682
private KafkaTemplate<Integer, Foo> template;
7783

7884
@Test
7985
public void testBatchOfPojos() throws Exception {
80-
doTest(this.config.listener1(), "blc1");
81-
doTest(this.config.listener2(), "blc2");
86+
doTest(this.listener1, "blc1");
87+
doTest(this.listener2, "blc2");
8288
}
8389

8490
private void doTest(Listener listener, String topic) throws InterruptedException {
@@ -173,13 +179,13 @@ public Map<String, Object> producerConfigs(EmbeddedKafkaBroker embeddedKafka) {
173179
}
174180

175181
@Bean
176-
public Listener listener1() {
177-
return new Listener("blc1");
182+
public Listener listener1(KafkaListenerContainerFactory<?> cf) {
183+
return new Listener("blc1", cf);
178184
}
179185

180186
@Bean
181-
public Listener listener2() {
182-
return new Listener("blc2");
187+
public Listener listener2(KafkaListenerContainerFactory<?> cf) {
188+
return new Listener("blc2", cf);
183189
}
184190

185191
@Bean
@@ -202,17 +208,25 @@ public static class Listener {
202208

203209
private final CountDownLatch latch2 = new CountDownLatch(1);
204210

211+
private final KafkaListenerContainerFactory<?> cf;
212+
205213
private List<Foo> received;
206214

207215
private List<String> receivedTopics;
208216

209217
private List<Integer> receivedPartitions;
210218

211-
public Listener(String topic) {
219+
public Listener(String topic, KafkaListenerContainerFactory<?> cf) {
212220
this.topic = topic;
221+
this.cf = cf;
222+
}
223+
224+
public KafkaListenerContainerFactory<?> getContainerFactory() {
225+
return this.cf;
213226
}
214227

215-
@KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.topic}.group")
228+
@KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.topic}.group",
229+
containerFactory = "#{__listener.containerFactory}")
216230
// @SendTo("foo") test WARN log for void return
217231
public void listen1(List<Foo> foos, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
218232
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions) {

0 commit comments

Comments
 (0)