Skip to content

Commit ffa6f78

Browse files
garyrussellartembilan
authored andcommitted
GH-1770: Fix ClientId for @KafkaListener
Resolves #1770 The clientId property, if set on the container factory, is overwritten by an empty String if not defined on the annotation. Check for an empty String (groupId too). **cherry-pick to 2.6.x** * Use simpler JavaUtils method.
1 parent 9ab0e6b commit ffa6f78

File tree

2 files changed

+38
-4
lines changed

2 files changed

+38
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,8 @@ else if (this.autoStartup != null) {
411411
.acceptIfNotNull(this.phase, instance::setPhase)
412412
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
413413
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
414-
.acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
415-
.acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
414+
.acceptIfHasText(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
415+
.acceptIfHasText(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
416416
.acceptIfNotNull(endpoint.getConsumerProperties(),
417417
instance.getContainerProperties()::setKafkaConsumerProperties);
418418
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/ContainerFactoryTests.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,9 +23,14 @@
2323

2424
import org.junit.jupiter.api.Test;
2525

26+
import org.springframework.kafka.config.AbstractKafkaListenerEndpoint;
2627
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
2728
import org.springframework.kafka.core.ConsumerFactory;
2829
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
30+
import org.springframework.kafka.listener.MessageListenerContainer;
31+
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
32+
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
33+
import org.springframework.kafka.support.converter.MessageConverter;
2934
import org.springframework.kafka.test.utils.KafkaTestUtils;
3035

3136
/**
@@ -36,7 +41,7 @@
3641
public class ContainerFactoryTests {
3742

3843
@Test
39-
public void testConfigContainer() {
44+
void testConfigContainer() {
4045
ConcurrentKafkaListenerContainerFactory<String, String> factory =
4146
new ConcurrentKafkaListenerContainerFactory<>();
4247
factory.setAutoStartup(false);
@@ -56,4 +61,33 @@ public void testConfigContainer() {
5661
assertThat(customized).isTrue();
5762
}
5863

64+
@SuppressWarnings("unchecked")
65+
@Test
66+
void clientIdAndGroupIdTransferred() {
67+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
68+
new ConcurrentKafkaListenerContainerFactory<>();
69+
factory.getContainerProperties().setClientId("myClientId");
70+
factory.getContainerProperties().setGroupId("myGroup");
71+
factory.setConsumerFactory(mock(ConsumerFactory.class));
72+
AbstractKafkaListenerEndpoint<String, String> endpoint = new AbstractKafkaListenerEndpoint<String, String>() {
73+
74+
@Override
75+
protected MessagingMessageListenerAdapter<String, String> createMessageListener(
76+
MessageListenerContainer container, MessageConverter messageConverter) {
77+
78+
RecordMessagingMessageListenerAdapter<String, String> adapter =
79+
new RecordMessagingMessageListenerAdapter<String, String>(null, null);
80+
return adapter;
81+
}
82+
83+
};
84+
endpoint.setTopics("test");
85+
endpoint.setClientIdPrefix("");
86+
endpoint.setGroupId("");
87+
ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(
88+
endpoint);
89+
assertThat(container.getContainerProperties().getClientId()).isEqualTo("myClientId");
90+
assertThat(container.getContainerProperties().getGroupId()).isEqualTo("myGroup");
91+
}
92+
5993
}

0 commit comments

Comments
 (0)