Skip to content

Commit 42fef6e

Browse files
authored
GH-1982: Boot 2.6 Compatibility (Retryable Topics)
Resolves #1982 Spring Boot 2.6 disables circular bean references by default. When using `RetryTopicConfigurationBuilder.dltHandlerMethod` to specify the location of the bean handler, a circular reference is caused if the method is in the same bean as the listener (because the `EndpointHandlerMethod` attempts to resolve it while the listener bean is being post processed). Deprecate `RetryTopicConfigurationBuilder.dltHandlerMethod(Class<?> clazz, String methodName)`. As an aside, it is antithetical in Spring to expect a single bean with a type. Replace with `dltHandlerMethod(String beanName, String methodName)`. This allows deferring the bean lookup if a `BeanCurrentlyInCreationException` is thrown. When that happens, return the "endpoint" as the "bean" and a dummy `Method` to satisfy nullable requirements. When the registry detects that the "bean" is an "endpoint", resolve the bean and method at that time and re-populate the `MethodKafkaListenerEndpoint`. Move `EndpointHandlerMethod` to `support` to avoid a package tangle. Add a test to simulate Boot 2.6 default behavior. * Remove deprecated method from docs.
1 parent d2bf746 commit 42fef6e

15 files changed

+269
-94
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,7 @@ public void processMessage(MyPojo message) {
500500
----
501501
====
502502

503-
The DLT handler method can also be provided through the RetryTopicConfigurationBuilder.dltHandlerMethod(Class, String) method, passing as arguments the class and method name that should process the DLT's messages.
504-
If a bean instance of the provided class is found in the application context that bean is used for Dlt processing, otherwise an instance is created with full dependency injection support.
503+
The DLT handler method can also be provided through the RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) method, passing as arguments the bean name and method name that should process the DLT's messages.
505504

506505
====
507506
[source, java]
@@ -510,7 +509,7 @@ If a bean instance of the provided class is found in the application context tha
510509
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
511510
return RetryTopicConfigurationBuilder
512511
.newInstance()
513-
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
512+
.dltProcessor("myCustomDltProcessor", "processDltMessage")
514513
.create(template);
515514
}
516515

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@
3232
import org.springframework.core.annotation.AnnotationUtils;
3333
import org.springframework.expression.spel.support.StandardEvaluationContext;
3434
import org.springframework.kafka.core.KafkaOperations;
35-
import org.springframework.kafka.retrytopic.EndpointHandlerMethod;
3635
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
3736
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
3837
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
3938
import org.springframework.kafka.retrytopic.RetryTopicConstants;
4039
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
40+
import org.springframework.kafka.support.EndpointHandlerMethod;
4141
import org.springframework.retry.annotation.Backoff;
4242
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
4343
import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.kafka.listener.ContainerGroup;
4343
import org.springframework.kafka.listener.ListenerContainerRegistry;
4444
import org.springframework.kafka.listener.MessageListenerContainer;
45+
import org.springframework.kafka.support.EndpointHandlerMethod;
4546
import org.springframework.lang.Nullable;
4647
import org.springframework.util.Assert;
4748
import org.springframework.util.StringUtils;
@@ -214,6 +215,16 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe
214215
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
215216
KafkaListenerContainerFactory<?> factory) {
216217

218+
if (endpoint instanceof MethodKafkaListenerEndpoint) {
219+
MethodKafkaListenerEndpoint<?, ?> mkle = (MethodKafkaListenerEndpoint<?, ?>) endpoint;
220+
Object bean = mkle.getBean();
221+
if (bean instanceof EndpointHandlerMethod) {
222+
EndpointHandlerMethod ehm = (EndpointHandlerMethod) bean;
223+
ehm = new EndpointHandlerMethod(ehm.resolveBean(this.applicationContext), ehm.getMethodName());
224+
mkle.setBean(ehm.resolveBean(this.applicationContext));
225+
mkle.setMethod(ehm.getMethod());
226+
}
227+
}
217228
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
218229

219230
if (listenerContainer instanceof InitializingBean) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.springframework.beans.factory.BeanFactory;
2525
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
26+
import org.springframework.kafka.support.EndpointHandlerMethod;
2627
import org.springframework.kafka.support.TopicPartitionOffset;
2728

2829
/**

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointHandlerMethod.java

Lines changed: 0 additions & 87 deletions
This file was deleted.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020

2121
import org.springframework.kafka.support.AllowDenyCollectionManager;
22+
import org.springframework.kafka.support.EndpointHandlerMethod;
2223

2324
/**
2425
* Contains the provided configuration for the retryable topics.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
2525
import org.springframework.kafka.core.KafkaOperations;
2626
import org.springframework.kafka.support.AllowDenyCollectionManager;
27+
import org.springframework.kafka.support.EndpointHandlerMethod;
2728
import org.springframework.lang.Nullable;
2829
import org.springframework.retry.backoff.BackOffPolicy;
2930
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
@@ -80,11 +81,31 @@ public class RetryTopicConfigurationBuilder {
8081
private Boolean autoStartDltHandler;
8182

8283
/* ---------------- DLT Behavior -------------- */
84+
/**
85+
* Configure a DLT handler method.
86+
* @param clazz the class containing the method.
87+
* @param methodName the method name.
88+
* @return the builder.
89+
* @deprecated in favor of {@link #dltHandlerMethod(String, String)}.
90+
*/
91+
@Deprecated
8392
public RetryTopicConfigurationBuilder dltHandlerMethod(Class<?> clazz, String methodName) {
8493
this.dltHandlerMethod = RetryTopicConfigurer.createHandlerMethodWith(clazz, methodName);
8594
return this;
8695
}
8796

97+
/**
98+
* Configure a DLT handler method.
99+
* @param beanName the bean name.
100+
* @param methodName the method name.
101+
* @return the builder.
102+
* @since 2.8
103+
*/
104+
public RetryTopicConfigurationBuilder dltHandlerMethod(String beanName, String methodName) {
105+
this.dltHandlerMethod = RetryTopicConfigurer.createHandlerMethodWith(beanName, methodName);
106+
return this;
107+
}
108+
88109
public RetryTopicConfigurationBuilder dltHandlerMethod(
89110
EndpointHandlerMethod endpointHandlerMethod) {
90111

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
3535
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
3636
import org.springframework.kafka.listener.ListenerUtils;
37+
import org.springframework.kafka.support.EndpointHandlerMethod;
3738
import org.springframework.lang.Nullable;
3839

3940

@@ -387,8 +388,8 @@ private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
387388
}
388389
}
389390

390-
public static EndpointHandlerMethod createHandlerMethodWith(Class<?> beanClass, String methodName) {
391-
return new EndpointHandlerMethod(beanClass, methodName);
391+
public static EndpointHandlerMethod createHandlerMethodWith(Object beanOrClass, String methodName) {
392+
return new EndpointHandlerMethod(beanOrClass, methodName);
392393
}
393394

394395
public static EndpointHandlerMethod createHandlerMethodWith(Object bean, Method method) {
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.Arrays;
21+
22+
import org.springframework.beans.factory.BeanCurrentlyInCreationException;
23+
import org.springframework.beans.factory.BeanFactory;
24+
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
25+
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
26+
import org.springframework.beans.factory.support.RootBeanDefinition;
27+
import org.springframework.util.Assert;
28+
import org.springframework.util.ReflectionUtils;
29+
30+
/**
31+
* Handler method for retrying endpoints.
32+
*
33+
* @author Tomaz Fernandes
34+
* @author Gary Russell
35+
* @since 2.7
36+
*
37+
*/
38+
public class EndpointHandlerMethod {
39+
40+
private final Object beanOrClass;
41+
42+
private final String methodName;
43+
44+
private Object bean;
45+
46+
private Method method;
47+
48+
public EndpointHandlerMethod(Object beanOrClass, String methodName) {
49+
Assert.notNull(beanOrClass, () -> "No destination bean or class provided!");
50+
Assert.notNull(methodName, () -> "No method name for destination bean class provided!");
51+
this.beanOrClass = beanOrClass;
52+
this.methodName = methodName;
53+
}
54+
55+
public EndpointHandlerMethod(Object bean, Method method) {
56+
Assert.notNull(bean, () -> "No bean for destination provided!");
57+
Assert.notNull(method, () -> "No method for destination bean class provided!");
58+
this.method = method;
59+
this.bean = bean;
60+
this.beanOrClass = bean.getClass();
61+
this.methodName = method.getName();
62+
}
63+
64+
/**
65+
* Return the method.
66+
* @return the method.
67+
*/
68+
public Method getMethod() {
69+
if (this.beanOrClass instanceof Class) {
70+
return forClass((Class<?>) this.beanOrClass);
71+
}
72+
Assert.state(this.bean != null, "Bean must be resolved before accessing its method");
73+
if (this.bean instanceof EndpointHandlerMethod) {
74+
try {
75+
return Object.class.getMethod("toString");
76+
}
77+
catch (NoSuchMethodException | SecurityException e) {
78+
}
79+
}
80+
return forClass(this.bean.getClass());
81+
}
82+
83+
/**
84+
* Return the method name.
85+
* @return the name.
86+
* @since 2.8
87+
*/
88+
public String getMethodName() {
89+
Assert.state(this.methodName != null, "Unexpected call to getMethodName()");
90+
return this.methodName;
91+
}
92+
93+
public Object resolveBean(BeanFactory beanFactory) {
94+
if (this.bean instanceof EndpointHandlerMethod) {
95+
return ((EndpointHandlerMethod) this.bean).beanOrClass;
96+
}
97+
if (this.bean == null) {
98+
try {
99+
if (this.beanOrClass instanceof Class) {
100+
Class<?> clazz = (Class<?>) this.beanOrClass;
101+
try {
102+
this.bean = beanFactory.getBean(clazz);
103+
}
104+
catch (NoSuchBeanDefinitionException e) {
105+
String beanName = clazz.getSimpleName() + "-handlerMethod";
106+
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(beanName,
107+
new RootBeanDefinition(clazz));
108+
this.bean = beanFactory.getBean(beanName);
109+
}
110+
}
111+
else {
112+
String beanName = (String) this.beanOrClass;
113+
this.bean = beanFactory.getBean(beanName);
114+
}
115+
}
116+
catch (BeanCurrentlyInCreationException ex) {
117+
this.bean = this;
118+
}
119+
}
120+
return this.bean;
121+
}
122+
123+
private Method forClass(Class<?> clazz) {
124+
if (this.method == null) {
125+
this.method = Arrays.stream(ReflectionUtils.getDeclaredMethods(clazz))
126+
.filter(mthd -> mthd.getName().equals(this.methodName))
127+
.findFirst()
128+
.orElseThrow(() -> new IllegalArgumentException(
129+
String.format("No method %s in class %s", this.methodName, clazz)));
130+
}
131+
return this.method;
132+
}
133+
134+
}

0 commit comments

Comments
 (0)