Skip to content

Commit d094c39

Browse files
garyrussellartembilan
authored andcommitted
GH-1708: Fix New Package Tangles
Resolves #1708 - add a new interface `ListenerContainerRegistry` in the `listener` package - eliminate the `destinationtopic` sub-package - move classes with references to the `annotation` package to `annotation`. - add `package-info.java`
1 parent d963813 commit d094c39

31 files changed

+144
-81
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@
7878
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
7979
import org.springframework.kafka.retrytopic.RetryTopicBootstrapper;
8080
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
81-
import org.springframework.kafka.retrytopic.RetryTopicConfigurationProvider;
8281
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
8382
import org.springframework.kafka.support.KafkaNull;
8483
import org.springframework.kafka.support.TopicPartitionOffset;
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.retrytopic;
17+
package org.springframework.kafka.annotation;
1818

1919
import java.lang.reflect.Method;
2020
import java.util.Map;
@@ -25,7 +25,8 @@
2525
import org.springframework.beans.factory.ListableBeanFactory;
2626
import org.springframework.core.annotation.AnnotationUtils;
2727
import org.springframework.core.log.LogAccessor;
28-
import org.springframework.kafka.annotation.RetryableTopic;
28+
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
29+
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
2930

3031

3132
/**
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.retrytopic;
17+
package org.springframework.kafka.annotation;
1818

1919
import java.lang.reflect.Method;
2020
import java.util.Arrays;
@@ -27,10 +27,11 @@
2727
import org.springframework.core.annotation.AnnotationUtils;
2828
import org.springframework.expression.spel.standard.SpelExpressionParser;
2929
import org.springframework.expression.spel.support.StandardEvaluationContext;
30-
import org.springframework.kafka.annotation.DltHandler;
31-
import org.springframework.kafka.annotation.RetryableTopic;
3230
import org.springframework.kafka.core.KafkaOperations;
3331
import org.springframework.kafka.listener.adapter.AdapterUtils;
32+
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
33+
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
34+
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
3435
import org.springframework.retry.annotation.Backoff;
3536
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
3637
import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;
@@ -42,7 +43,6 @@
4243
import org.springframework.util.StringUtils;
4344

4445
/**
45-
*
4646
* Processes the provided {@link RetryableTopic} annotation
4747
* returning an {@link RetryTopicConfiguration}.
4848
*

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
3939
import org.springframework.context.event.ContextRefreshedEvent;
4040
import org.springframework.core.log.LogAccessor;
4141
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
42+
import org.springframework.kafka.listener.ListenerContainerRegistry;
4243
import org.springframework.kafka.listener.MessageListenerContainer;
4344
import org.springframework.util.Assert;
4445
import org.springframework.util.StringUtils;
@@ -66,8 +67,8 @@
6667
* @see MessageListenerContainer
6768
* @see KafkaListenerContainerFactory
6869
*/
69-
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
70-
ApplicationListener<ContextRefreshedEvent> {
70+
public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry, DisposableBean, SmartLifecycle,
71+
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
7172

7273
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR
7374

@@ -96,6 +97,7 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
9697
* @see KafkaListenerEndpoint#getId()
9798
* @see #getListenerContainerIds()
9899
*/
100+
@Override
99101
public MessageListenerContainer getListenerContainer(String id) {
100102
Assert.hasText(id, "Container identifier must not be empty");
101103
return this.listenerContainers.get(id);
@@ -106,6 +108,7 @@ public MessageListenerContainer getListenerContainer(String id) {
106108
* @return the ids.
107109
* @see #getListenerContainer(String)
108110
*/
111+
@Override
109112
public Set<String> getListenerContainerIds() {
110113
return Collections.unmodifiableSet(this.listenerContainers.keySet());
111114
}
@@ -115,6 +118,7 @@ public Set<String> getListenerContainerIds() {
115118
* @return the managed {@link MessageListenerContainer} instance(s).
116119
* @see #getAllListenerContainers()
117120
*/
121+
@Override
118122
public Collection<MessageListenerContainer> getListenerContainers() {
119123
return Collections.unmodifiableCollection(this.listenerContainers.values());
120124
}
@@ -128,6 +132,7 @@ public Collection<MessageListenerContainer> getListenerContainers() {
128132
* @since 2.2.5
129133
* @see #getListenerContainers()
130134
*/
135+
@Override
131136
public Collection<MessageListenerContainer> getAllListenerContainers() {
132137
List<MessageListenerContainer> containers = new ArrayList<>();
133138
containers.addAll(getListenerContainers());
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Package for kafka configuration
33
*/
4+
@org.springframework.lang.NonNullApi
45
package org.springframework.kafka.config;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import org.springframework.beans.factory.annotation.Qualifier;
2828
import org.springframework.context.ApplicationListener;
29-
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
3029
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
3130

3231
/**
@@ -36,6 +35,7 @@
3635
* again after partition consumption is resumed (or seek it manually by other means).
3736
*
3837
* @author Tomaz Fernandes
38+
* @author Gary Russell
3939
* @since 2.7
4040
* @see SeekToCurrentErrorHandler
4141
*/
@@ -46,14 +46,15 @@ public class KafkaConsumerBackoffManager implements ApplicationListener<Listener
4646
*/
4747
public static final String INTERNAL_BACKOFF_CLOCK_BEAN_NAME = "internalBackOffClock";
4848

49-
private final KafkaListenerEndpointRegistry registry;
49+
private final ListenerContainerRegistry registry;
5050

5151
private final Map<TopicPartition, Context> backOffTimes;
5252

5353
private final Clock clock;
5454

55-
public KafkaConsumerBackoffManager(KafkaListenerEndpointRegistry registry,
55+
public KafkaConsumerBackoffManager(ListenerContainerRegistry registry,
5656
@Qualifier(INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
57+
5758
this.registry = registry;
5859
this.clock = clock;
5960
this.backOffTimes = new HashMap<>();
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.Collection;
20+
import java.util.Set;
21+
22+
import org.springframework.kafka.config.KafkaListenerEndpoint;
23+
24+
/**
25+
* A registry for listener containers.
26+
*
27+
* @author Gary Russell
28+
* @since 2.7
29+
*
30+
*/
31+
public interface ListenerContainerRegistry {
32+
33+
/**
34+
* Return the {@link MessageListenerContainer} with the specified id or
35+
* {@code null} if no such container exists.
36+
* @param id the id of the container
37+
* @return the container or {@code null} if no container with that id exists
38+
* @see KafkaListenerEndpoint#getId()
39+
* @see #getListenerContainerIds()
40+
*/
41+
MessageListenerContainer getListenerContainer(String id);
42+
43+
/**
44+
* Return the ids of the managed {@link MessageListenerContainer} instance(s).
45+
* @return the ids.
46+
* @see #getListenerContainer(String)
47+
*/
48+
Set<String> getListenerContainerIds();
49+
50+
/**
51+
* Return the managed {@link MessageListenerContainer} instance(s).
52+
* @return the managed {@link MessageListenerContainer} instance(s).
53+
* @see #getAllListenerContainers()
54+
*/
55+
Collection<MessageListenerContainer> getListenerContainers();
56+
57+
/**
58+
* Return all {@link MessageListenerContainer} instances including those managed by
59+
* this registry and those declared as beans in the application context.
60+
* Prototype-scoped containers will be included. Lazy beans that have not yet been
61+
* created will not be initialized by a call to this method.
62+
* @return the {@link MessageListenerContainer} instance(s).
63+
* @see #getListenerContainers()
64+
*/
65+
Collection<MessageListenerContainer> getAllListenerContainers();
66+
67+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
public class KafkaBackoffAwareMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<AcknowledgingConsumerAwareMessageListener<K, V>> implements AcknowledgingConsumerAwareMessageListener<K, V> {
4848

4949
private final String listenerId;
50+
5051
private final String backoffTimestampHeader;
52+
5153
private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;
5254

5355
public KafkaBackoffAwareMessageListenerAdapter(AcknowledgingConsumerAwareMessageListener<K, V> delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId, String backoffTimestampHeader) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import org.springframework.kafka.core.KafkaOperations;
3131
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
3232
import org.springframework.kafka.listener.KafkaBackoffException;
33-
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopic;
34-
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopicResolver;
3533
import org.springframework.util.Assert;
3634

3735
/**
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.retrytopic.destinationtopic;
17+
package org.springframework.kafka.retrytopic;
1818

1919
import java.util.ArrayList;
2020
import java.util.Collection;
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.retrytopic.destinationtopic;
17+
package org.springframework.kafka.retrytopic;
1818

1919
import java.util.Objects;
2020
import java.util.function.BiPredicate;
2121

2222
import org.springframework.kafka.core.KafkaOperations;
23-
import org.springframework.kafka.retrytopic.DltStrategy;
2423

2524
/**
2625
*
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.retrytopic.destinationtopic;
17+
package org.springframework.kafka.retrytopic;
1818

1919
import java.time.Clock;
2020
import java.time.Instant;
@@ -25,13 +25,12 @@
2525
import org.springframework.context.ApplicationListener;
2626
import org.springframework.context.event.ContextRefreshedEvent;
2727
import org.springframework.kafka.listener.ListenerExecutionFailedException;
28-
import org.springframework.kafka.retrytopic.RetryTopicConstants;
2928

3029

3130
/**
3231
*
3332
* Contains the destination topics and correlates them with their source via the
34-
* Map&lt;String, {@link org.springframework.kafka.retrytopic.destinationtopic.DestinationTopicResolver.DestinationsHolder}&gt; map.
33+
* Map&lt;String, {@link org.springframework.kafka.retrytopic.DestinationTopicResolver.DestinationsHolder}&gt; map.
3534
*
3635
* Implements the {@link DestinationTopicResolver} interface.
3736
*
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.retrytopic.destinationtopic;
17+
package org.springframework.kafka.retrytopic;
1818

1919
import java.util.Collection;
2020
import java.util.HashMap;
Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.retrytopic.destinationtopic;
17+
package org.springframework.kafka.retrytopic;
1818

1919
import java.util.Arrays;
2020
import java.util.List;
@@ -24,11 +24,6 @@
2424

2525
import org.springframework.classify.BinaryExceptionClassifier;
2626
import org.springframework.kafka.core.KafkaOperations;
27-
import org.springframework.kafka.retrytopic.BackOffValuesGenerator;
28-
import org.springframework.kafka.retrytopic.DltStrategy;
29-
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
30-
import org.springframework.kafka.retrytopic.RetryTopicConstants;
31-
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
3227
import org.springframework.retry.backoff.BackOffPolicy;
3328
import org.springframework.util.StringUtils;
3429

@@ -195,9 +190,10 @@ private String joinWithRetrySuffix(long parameter) {
195190
public static class DestinationTopicSuffixes {
196191

197192
private final String retryTopicSuffix;
193+
198194
private final String dltSuffix;
199195

200-
DestinationTopicSuffixes(String retryTopicSuffix, String dltSuffix) {
196+
public DestinationTopicSuffixes(String retryTopicSuffix, String dltSuffix) {
201197
this.retryTopicSuffix = StringUtils.hasText(retryTopicSuffix)
202198
? retryTopicSuffix
203199
: RetryTopicConstants.DEFAULT_RETRY_SUFFIX;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.retrytopic.destinationtopic;
17+
package org.springframework.kafka.retrytopic;
1818

1919
import java.util.Map;
2020

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBootstrapper.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.springframework.context.ApplicationContext;
2626
import org.springframework.context.ConfigurableApplicationContext;
2727
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
28-
import org.springframework.kafka.retrytopic.destinationtopic.DefaultDestinationTopicProcessor;
29-
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopicContainer;
3028

3129
/**
3230
*

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfiguration.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.util.List;
2020

21-
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopic;
2221
import org.springframework.kafka.support.AllowDenyCollectionManager;
2322

2423
/**

0 commit comments

Comments
 (0)