Skip to content

Commit d34394e

Browse files
garyrussellartembilan
authored andcommitted
GH-2505: Use Correct Brokers for Cluster Id
Resolves #2505 When observation is enabled, we use the context's `KafkaAdmin`, if present, to determine the cluster id. The context's admin may be configured with different bootstrap servers than the producer/consumer (e.g. in Spring Cloud Stream). When looking up the cluster id for observation, use the appropriate bootstrap servers for the component.
1 parent 47aacf2 commit d34394e

File tree

3 files changed

+71
-3
lines changed

3 files changed

+71
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.Future;
3131

3232
import org.apache.commons.logging.LogFactory;
33+
import org.apache.kafka.clients.admin.AdminClientConfig;
3334
import org.apache.kafka.clients.consumer.Consumer;
3435
import org.apache.kafka.clients.consumer.ConsumerConfig;
3536
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -38,6 +39,7 @@
3839
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3940
import org.apache.kafka.clients.producer.Callback;
4041
import org.apache.kafka.clients.producer.Producer;
42+
import org.apache.kafka.clients.producer.ProducerConfig;
4143
import org.apache.kafka.clients.producer.ProducerInterceptor;
4244
import org.apache.kafka.clients.producer.ProducerRecord;
4345
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -426,6 +428,14 @@ public void afterSingletonsInstantiated() {
426428
.getIfUnique(() -> this.observationRegistry);
427429
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
428430
if (this.kafkaAdmin != null) {
431+
Object producerServers = this.producerFactory.getConfigurationProperties()
432+
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
433+
String adminServers = this.kafkaAdmin.getBootstrapServers();
434+
if (!producerServers.equals(adminServers)) {
435+
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
436+
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
437+
this.kafkaAdmin = new KafkaAdmin(props);
438+
}
429439
this.clusterId = this.kafkaAdmin.clusterId();
430440
}
431441
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.regex.Pattern;
4747
import java.util.stream.Collectors;
4848

49+
import org.apache.kafka.clients.admin.AdminClientConfig;
4950
import org.apache.kafka.clients.consumer.CommitFailedException;
5051
import org.apache.kafka.clients.consumer.Consumer;
5152
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -784,6 +785,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
784785
@Nullable
785786
private final KafkaAdmin kafkaAdmin;
786787

788+
private final Object bootstrapServers;
789+
787790
private String clusterId;
788791

789792
private Map<TopicPartition, OffsetMetadata> definedPartitions;
@@ -848,6 +851,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
848851
this.containerProperties.getClientId(),
849852
KafkaMessageListenerContainer.this.clientIdSuffix,
850853
consumerProperties);
854+
this.bootstrapServers = determineBootstrapServers(consumerProperties);
851855

852856
this.clientId = determineClientId();
853857
this.transactionTemplate = determineTransactionTemplate();
@@ -921,12 +925,30 @@ else if (listener instanceof MessageListener) {
921925
this.kafkaAdmin = obtainAdmin();
922926
}
923927

928+
@Nullable
929+
private Object determineBootstrapServers(Properties consumerProperties) {
930+
Object servers = consumerProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
931+
if (servers == null) {
932+
servers = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties()
933+
.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
934+
}
935+
return servers;
936+
}
937+
924938
@Nullable
925939
private KafkaAdmin obtainAdmin() {
926940
if (this.containerProperties.isObservationEnabled()) {
927941
ApplicationContext applicationContext = getApplicationContext();
928942
if (applicationContext != null) {
929-
return applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
943+
KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
944+
if (admin != null) {
945+
Map<String, Object> props = new HashMap<>(admin.getConfigurationProperties());
946+
if (!props.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG).equals(this.bootstrapServers)) {
947+
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
948+
admin = new KafkaAdmin(props);
949+
}
950+
}
951+
return admin;
930952
}
931953
}
932954
return null;

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.TimeoutException;
3030

31+
import org.apache.kafka.clients.admin.AdminClientConfig;
32+
import org.apache.kafka.clients.consumer.ConsumerConfig;
3133
import org.apache.kafka.clients.consumer.ConsumerRecord;
34+
import org.apache.kafka.clients.producer.ProducerConfig;
3235
import org.apache.kafka.common.header.Headers;
3336
import org.junit.jupiter.api.Test;
3437

@@ -42,6 +45,7 @@
4245
import org.springframework.kafka.core.ConsumerFactory;
4346
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
4447
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
48+
import org.springframework.kafka.core.KafkaAdmin;
4549
import org.springframework.kafka.core.KafkaTemplate;
4650
import org.springframework.kafka.core.ProducerFactory;
4751
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
@@ -84,7 +88,8 @@ public class ObservationTests {
8488
@Test
8589
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
8690
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
87-
@Autowired MeterRegistry meterRegistry)
91+
@Autowired MeterRegistry meterRegistry, @Autowired EmbeddedKafkaBroker broker,
92+
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin)
8893
throws InterruptedException, ExecutionException, TimeoutException {
8994

9095
template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
@@ -171,21 +176,51 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
171176
.hasTimerWithNameAndTags("spring.kafka.listener",
172177
KeyValues.of("spring.kafka.listener.id", "obs1-0", "baz", "qux"))
173178
.hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs2-0"));
179+
assertThat(admin.getConfigurationProperties())
180+
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
181+
// producer factory broker different to admin
182+
assertThat(
183+
KafkaTestUtils.getPropertyValue(template, "kafkaAdmin", KafkaAdmin.class).getConfigurationProperties())
184+
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
185+
broker.getBrokersAsString() + "," + broker.getBrokersAsString());
186+
Object container = KafkaTestUtils
187+
.getPropertyValue(endpointRegistry.getListenerContainer("obs1"), "containers", List.class).get(0);
188+
// consumer factory broker different to admin
189+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class)
190+
.getConfigurationProperties())
191+
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
192+
broker.getBrokersAsString() + "," + broker.getBrokersAsString() + ","
193+
+ broker.getBrokersAsString());
194+
// broker override in annotation
195+
container = KafkaTestUtils
196+
.getPropertyValue(endpointRegistry.getListenerContainer("obs2"), "containers", List.class).get(0);
197+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class)
198+
.getConfigurationProperties())
199+
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
174200
}
175201

176202
@Configuration
177203
@EnableKafka
178204
public static class Config {
179205

206+
@Bean
207+
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
208+
return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
209+
}
210+
180211
@Bean
181212
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
182213
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
214+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
215+
+ broker.getBrokersAsString());
183216
return new DefaultKafkaProducerFactory<>(producerProps);
184217
}
185218

186219
@Bean
187220
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
188221
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker);
222+
consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
223+
+ broker.getBrokersAsString() + "," + broker.getBrokersAsString());
189224
return new DefaultKafkaConsumerFactory<>(consumerProps);
190225
}
191226

@@ -288,7 +323,8 @@ void listen1(ConsumerRecord<Integer, String> in) {
288323
this.template.send("observation.testT2", in.value());
289324
}
290325

291-
@KafkaListener(id = "obs2", topics = "observation.testT2")
326+
@KafkaListener(id = "obs2", topics = "observation.testT2",
327+
properties = ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":" + "#{@embeddedKafka.brokersAsString}")
292328
void listen2(ConsumerRecord<?, ?> in) {
293329
this.record = in;
294330
this.latch1.countDown();

0 commit comments

Comments
 (0)