Skip to content

Commit 248cd70

Browse files
garyrussellartembilan
authored andcommitted
GH-2055: Containers Must Implement DisposableBean
Resolves #2055 If context initialization fails, `Lifecycle.stop()` is not called. Containers must be stopped from `DisposableBean` in this case. **cherry-pick to 2.7.x, 2.6.x, 2.5.x**
1 parent 5e4e6b4 commit 248cd70

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -253,14 +253,7 @@ protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint
253253
@Override
254254
public void destroy() {
255255
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
256-
if (listenerContainer instanceof DisposableBean) {
257-
try {
258-
((DisposableBean) listenerContainer).destroy();
259-
}
260-
catch (Exception ex) {
261-
this.logger.warn(ex, "Failed to destroy message listener container");
262-
}
263-
}
256+
listenerContainer.destroy();
264257
}
265258
}
266259

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.MetricName;
2424
import org.apache.kafka.common.TopicPartition;
2525

26+
import org.springframework.beans.factory.DisposableBean;
2627
import org.springframework.context.SmartLifecycle;
2728
import org.springframework.lang.Nullable;
2829

@@ -35,7 +36,7 @@
3536
* @author Vladimir Tsanev
3637
* @author Tomaz Fernandes
3738
*/
38-
public interface MessageListenerContainer extends SmartLifecycle {
39+
public interface MessageListenerContainer extends SmartLifecycle, DisposableBean {
3940

4041
/**
4142
* Setup the message listener to use. Throws an {@link IllegalArgumentException}
@@ -225,4 +226,9 @@ default void stopAbnormally(Runnable callback) {
225226
stop(callback);
226227
}
227228

229+
@Override
230+
default void destroy() {
231+
stop();
232+
}
233+
228234
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -650,8 +650,9 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
650650
inOrder.verify(consumer).commitSync(anyMap(), any());
651651
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
652652
inOrder.verify(consumer).commitSync(anyMap(), any());
653-
container.stop();
653+
container.destroy();
654654
assertThat(advised).containsExactly("one", "two", "one", "two");
655+
assertThat(container.isRunning()).isFalse();
655656
}
656657

657658
@Test

0 commit comments

Comments
 (0)