Skip to content

Commit 3574c5c

Browse files
garyrussellartembilan
authored andcommitted
GH-2055: Containers Must Implement DisposableBean
Resolves #2055 If context initialization fails, `Lifecycle.stop()` is not called. Containers must be stopped from `DisposableBean` in this case. **cherry-pick to 2.7.x, 2.6.x, 2.5.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java
1 parent 7f51078 commit 3574c5c

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -242,14 +242,7 @@ protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint
242242
@Override
243243
public void destroy() {
244244
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
245-
if (listenerContainer instanceof DisposableBean) {
246-
try {
247-
((DisposableBean) listenerContainer).destroy();
248-
}
249-
catch (Exception ex) {
250-
this.logger.warn(ex, "Failed to destroy message listener container");
251-
}
252-
}
245+
listenerContainer.destroy();
253246
}
254247
}
255248

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.MetricName;
2424
import org.apache.kafka.common.TopicPartition;
2525

26+
import org.springframework.beans.factory.DisposableBean;
2627
import org.springframework.context.SmartLifecycle;
2728
import org.springframework.lang.Nullable;
2829

@@ -35,7 +36,7 @@
3536
* @author Vladimir Tsanev
3637
* @author Tomaz Fernandes
3738
*/
38-
public interface MessageListenerContainer extends SmartLifecycle {
39+
public interface MessageListenerContainer extends SmartLifecycle, DisposableBean {
3940

4041
/**
4142
* Setup the message listener to use. Throws an {@link IllegalArgumentException}
@@ -202,4 +203,9 @@ default String getListenerId() {
202203
default boolean isChildRunning() {
203204
return isRunning();
204205
}
206+
@Override
207+
default void destroy() {
208+
stop();
209+
}
210+
205211
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -648,8 +648,9 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
648648
inOrder.verify(consumer).commitSync(anyMap(), any());
649649
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
650650
inOrder.verify(consumer).commitSync(anyMap(), any());
651-
container.stop();
651+
container.destroy();
652652
assertThat(advised).containsExactly("one", "two", "one", "two");
653+
assertThat(container.isRunning()).isFalse();
653654
}
654655

655656
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)