Skip to content

Commit d1753de

Browse files
garyrussellartembilan
authored andcommitted
GH-920: Fix Class Tangle
- move `EndpointHandlerMethod` to its own file - remove backward reference from `...Configuration` to its builder - fix docs
1 parent b399f29 commit d1753de

File tree

9 files changed

+226
-174
lines changed

9 files changed

+226
-174
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 101 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ You can also configure the non-blocking retry support by creating `RetryTopicCon
6666
----
6767
@Bean
6868
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
69-
return RetryTopicConfiguration
70-
.builder()
71-
.create(template);
69+
return RetryTopicConfigurationBuilder
70+
.newInstance()
71+
.create(template);
7272
}
7373
----
7474
====
@@ -82,8 +82,8 @@ To achieve more fine-grained control over how to handle non-blocking retrials fo
8282
----
8383
@Bean
8484
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
85-
return RetryTopicConfiguration
86-
.builder()
85+
return RetryTopicConfigurationBuilder
86+
.newInstance()
8787
.fixedBackoff(3000)
8888
.maxAttempts(5)
8989
.includeTopics("my-topic", "my-other-topic")
@@ -92,13 +92,13 @@ return RetryTopicConfiguration
9292
9393
@Bean
9494
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
95-
return RetryTopicConfiguration
96-
.builder()
97-
.exponentialBackoff(1000, 2, 5000)
98-
.maxAttempts(4)
99-
.excludeTopics("my-topic", "my-other-topic")
100-
.retryOn(MyException.class)
101-
.create(template);
95+
return RetryTopicConfigurationBuilder
96+
.newInstance()
97+
.exponentialBackoff(1000, 2, 5000)
98+
.maxAttempts(4)
99+
.excludeTopics("my-topic", "my-other-topic")
100+
.retryOn(MyException.class)
101+
.create(template);
102102
}
103103
----
104104
====
@@ -137,11 +137,12 @@ public void processMessage(MyPojo message) {
137137
----
138138
@Bean
139139
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
140-
return RetryTopicConfiguration
141-
.builder()
142-
.fixedBackoff(3000)
143-
.maxAttempts(4)
144-
.build
140+
return RetryTopicConfigurationBuilder
141+
.newInstance()
142+
.fixedBackoff(3000)
143+
.maxAttempts(4)
144+
.build();
145+
}
145146
----
146147
====
147148

@@ -152,11 +153,12 @@ You can also provide a custom implementation of Spring Retry's `SleepingBackOffP
152153
----
153154
@Bean
154155
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
155-
return RetryTopicConfiguration
156-
.builder()
157-
.customBackOff(new MyCustomBackOffPolicy())
158-
.maxAttempts(5)
159-
.build
156+
return RetryTopicConfigurationBuilder
157+
.newInstance()
158+
.customBackOff(new MyCustomBackOffPolicy())
159+
.maxAttempts(5)
160+
.build();
161+
}
160162
----
161163
====
162164

@@ -185,12 +187,13 @@ public void processMessage(MyPojo message) {
185187
----
186188
@Bean
187189
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
188-
return RetryTopicConfiguration
189-
.builder()
190-
.fixedBackoff(3000)
191-
.maxAttempts(5)
192-
.useSingleTopicForFixedDelays()
193-
.build
190+
return RetryTopicConfigurationBuilder
191+
.newInstance()
192+
.fixedBackoff(3000)
193+
.maxAttempts(5)
194+
.useSingleTopicForFixedDelays()
195+
.build();
196+
}
194197
----
195198
====
196199

@@ -217,11 +220,12 @@ public void processMessage(MyPojo message) {
217220
----
218221
@Bean
219222
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
220-
return RetryTopicConfiguration
221-
.builder()
222-
.fixedBackoff(2000)
223-
.timeoutAfter(5000)
224-
.build
223+
return RetryTopicConfigurationBuilder
224+
.newInstance()
225+
.fixedBackoff(2000)
226+
.timeoutAfter(5000)
227+
.build();
228+
}
225229
----
226230
====
227231

@@ -249,11 +253,11 @@ public void processMessage(MyPojo message) {
249253
----
250254
@Bean
251255
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
252-
return RetryTopicConfiguration
253-
.builder()
254-
.notRetryOn(MyDontRetryException.class)
255-
.create(template);
256-
}
256+
return RetryTopicConfigurationBuilder
257+
.newInstance()
258+
.notRetryOn(MyDontRetryException.class)
259+
.create(template);
260+
}
257261
----
258262
====
259263

@@ -267,21 +271,20 @@ You can decide which topics will and will not be handled by a `RetryTopicConfigu
267271
[source, java]
268272
----
269273
@Bean
270-
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
271-
return RetryTopicConfigurer
272-
.builder()
273-
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
274-
.create(template);
274+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
275+
return RetryTopicConfigurationBuilder
276+
.newInstance()
277+
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
278+
.create(template);
275279
}
276280
277281
@Bean
278-
public RetryTopicConfigurer myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
279-
return RetryTopicConfigurer
280-
.builder()
281-
.excludeTopic("my-excluded-topic")
282-
.create(template);
282+
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
283+
return RetryTopicConfigurationBuilder
284+
.newInstance()
285+
.excludeTopic("my-excluded-topic")
286+
.create(template);
283287
}
284-
285288
----
286289
====
287290

@@ -313,19 +316,19 @@ public void processMessage(MyPojo message) {
313316
[source, java]
314317
----
315318
@Bean
316-
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
317-
return RetryTopicConfigurer
318-
.builder()
319-
.autoCreateTopicsWith(2, 3)
320-
.create(template);
319+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
320+
return RetryTopicConfigurationBuilder
321+
.newInstance()
322+
.autoCreateTopicsWith(2, 3)
323+
.create(template);
321324
}
322325
323326
@Bean
324-
public RetryTopicConfigurer myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
325-
return RetryTopicConfigurer
326-
.builder()
327-
.doNotAutoCreateRetryTopics()
328-
.create(template);
327+
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
328+
return RetryTopicConfigurationBuilder
329+
.newInstance()
330+
.doNotAutoCreateRetryTopics()
331+
.create(template);
329332
}
330333
----
331334
====
@@ -363,12 +366,12 @@ public void processMessage(MyPojo message) {
363366
----
364367
@Bean
365368
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
366-
return RetryTopicConfiguration
367-
.builder()
368-
.retryTopicSuffix("-my-retry-suffix")
369-
.dltTopicSuffix("-my-dlt-suffix")
370-
.create(template);
371-
}
369+
return RetryTopicConfigurationBuilder
370+
.newInstance()
371+
.retryTopicSuffix("-my-retry-suffix")
372+
.dltTopicSuffix("-my-dlt-suffix")
373+
.create(template);
374+
}
372375
----
373376
====
374377

@@ -394,11 +397,11 @@ public void processMessage(MyPojo message) {
394397
----
395398
@Bean
396399
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
397-
return RetryTopicConfiguration
398-
.builder()
399-
.suffixTopicsWithIndexValues()
400-
.create(template);
401-
}
400+
return RetryTopicConfigurationBuilder
401+
.newInstance()
402+
.suffixTopicsWithIndexValues()
403+
.create(template);
404+
}
402405
----
403406
====
404407

@@ -438,11 +441,11 @@ If a bean instance of the provided class is found in the application context tha
438441
[source, java]
439442
----
440443
@Bean
441-
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
442-
return RetryTopicConfigurer
443-
.builder()
444-
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
445-
.create(template);
444+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
445+
return RetryTopicConfigurationBuilder
446+
.newInstance()
447+
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
448+
.create(template);
446449
}
447450
448451
@Component
@@ -485,12 +488,12 @@ public void processMessage(MyPojo message) {
485488
[source, java]
486489
----
487490
@Bean
488-
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
489-
return RetryTopicConfigurer
490-
.builder()
491-
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
492-
.doNotRetryOnDltFailure()
493-
.create(template);
491+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
492+
return RetryTopicConfigurationBuilder
493+
.newInstance()
494+
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
495+
.doNotRetryOnDltFailure()
496+
.create(template);
494497
}
495498
----
496499
====
@@ -517,11 +520,11 @@ public void processMessage(MyPojo message) {
517520
[source, java]
518521
----
519522
@Bean
520-
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
521-
return RetryTopicConfigurer
522-
.builder()
523-
.doNotConfigureDlt()
524-
.create(template);
523+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
524+
return RetryTopicConfigurationBuilder
525+
.newInstance()
526+
.doNotConfigureDlt()
527+
.create(template);
525528
}
526529
----
527530
====
@@ -547,19 +550,21 @@ public void processMessage(MyPojo message) {
547550
[source, java]
548551
----
549552
@Bean
550-
public RetryTopicConfigurer myRetryTopic(KafkaTemplate<Integer, MyPojo> template, ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {
551-
return RetryTopicConfigurer
552-
.builder()
553-
.listenerFactory(factory)
554-
.create(template);
553+
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
554+
ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {
555+
556+
return RetryTopicConfigurationBuilder
557+
.newInstance()
558+
.listenerFactory(factory)
559+
.create(template);
555560
}
556561
557562
@Bean
558-
public RetryTopicConfigurer myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
559-
return RetryTopicConfigurer
560-
.builder()
561-
.listenerFactory("my-retry-topic-factory")
562-
.create(template);
563+
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
564+
return RetryTopicConfigurationBuilder
565+
.newInstance()
566+
.listenerFactory("my-retry-topic-factory")
567+
.create(template);
563568
}
564569
----
565570
====

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import org.springframework.expression.spel.support.StandardEvaluationContext;
3030
import org.springframework.kafka.core.KafkaOperations;
3131
import org.springframework.kafka.listener.adapter.AdapterUtils;
32+
import org.springframework.kafka.retrytopic.EndpointHandlerMethod;
3233
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
34+
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
3335
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
3436
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
3537
import org.springframework.retry.annotation.Backoff;
@@ -69,7 +71,7 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) {
6971
public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation,
7072
Object bean) {
7173

72-
return RetryTopicConfiguration.builder()
74+
return RetryTopicConfigurationBuilder.newInstance()
7375
.maxAttempts(annotation.attempts())
7476
.customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory))
7577
.retryTopicSuffix(annotation.retryTopicSuffix())
@@ -140,7 +142,7 @@ private String resolve(String value, BeanFactory beanFactory) {
140142
return value;
141143
}
142144

143-
private RetryTopicConfigurer.EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) {
145+
private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) {
144146
Class<?> declaringClass = listenerMethod.getDeclaringClass();
145147
return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass))
146148
.filter(method -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null)

0 commit comments

Comments
 (0)