Skip to content

Commit 8cc2301

Browse files
committed
GH-1633: Do not stop parent container when fenced
Resolves #1633 Only the child container instance should be stopped when a producer is fenced. Tested with a Boot Application: ```java @SpringBootApplication public class Kgh1633Application { public static void main(String[] args) { SpringApplication.run(Kgh1633Application.class, args); } @Autowired KafkaTemplate<String, String> template; @KafkaListener(id = "kgh1633a", topics = "kgh1633") public void listen(String in) throws InterruptedException { System.out.println(in); this.template.send("kgh1633-1", "foo"); Thread.sleep(10_000); } @bean public NewTopic topic1() { return TopicBuilder.name("kgh1633").partitions(1).replicas(1).build(); } @bean public NewTopic topic2() { return TopicBuilder.name("kgh1633-1").partitions(1).replicas(1).build(); } // @eventlistener // public void started(ConsumerStartedEvent event) { // // temporary until GH-1633 is fixed // KafkaMessageListenerContainer<?, ?> container = (KafkaMessageListenerContainer<?, ?>) event.getSource(); // container.setEmergencyStop(() -> container.stop(() -> { })); // } @eventlistener public void stopped(ConsumerStoppedEvent event) { ((KafkaMessageListenerContainer<?, ?>) event.getSource()).start(); } } @component class Customizer { Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) { factory.getContainerProperties().setStopContainerWhenFenced(true); } } ``` ```properties spring.kafka.producer.transaction-id-prefix=tx- spring.kafka.producer.acks=all spring.kafka.producer.properties.transaction.timeout.ms=5000 ```
1 parent 9d61d73 commit 8cc2301

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1072,7 +1072,11 @@ public void run() {
10721072
+ "' has been fenced");
10731073
break;
10741074
}
1075-
catch (StopAfterFenceException | Error e) { // NOSONAR - rethrown
1075+
catch (StopAfterFenceException e) {
1076+
this.logger.error(e, "Stopping container due to fencing");
1077+
stop();
1078+
}
1079+
catch (Error e) { // NOSONAR - rethrown
10761080
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
10771081
if (runnable != null) {
10781082
runnable.run();

0 commit comments

Comments
 (0)