Skip to content

Commit 42467c8

Browse files
frosiereartembilan
authored andcommitted
GH-3808: ReplyingKafkaTemplate observation on reply
Fixes: #3808 Issue link: #3808 Since `ReplyingKafkaTemplate` is a `BatchMessageListener` for the provided listener container, we cannot rely on the observation from that container. * Implement consumer observation from the `ReplyingKafkaTemplate.BatchMessageListener`. Signed-off-by: Francois Rosiere <[email protected]> [[email protected] Improve commit message] Signed-off-by: Artem Bilan <[email protected]> # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java # spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
1 parent 7d0eea9 commit 42467c8

File tree

3 files changed

+101
-35
lines changed

3 files changed

+101
-35
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
* @author Soby Chacko
104104
* @author Gurps Bassi
105105
* @author Valentina Armenise
106+
* @author Francois Rosiere
106107
*/
107108
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
108109
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
@@ -456,6 +457,15 @@ public void setObservationConvention(KafkaTemplateObservationConvention observat
456457
this.observationConvention = observationConvention;
457458
}
458459

460+
/**
461+
* Return the {@link ObservationRegistry} used by the template.
462+
* @return the observation registry
463+
* @since 3.2.9
464+
*/
465+
protected ObservationRegistry getObservationRegistry() {
466+
return this.observationRegistry;
467+
}
468+
459469
/**
460470
* Return the {@link KafkaAdmin}, used to find the cluster id for observation, if
461471
* present.
@@ -520,8 +530,13 @@ private String getAdminBootstrapAddress() {
520530
return removeLeadingAndTrailingBrackets(adminServers);
521531
}
522532

533+
/**
534+
* Return the cluster id, if available.
535+
* @return the cluster id.
536+
* @since 3.2.9
537+
*/
523538
@Nullable
524-
private String clusterId() {
539+
protected String clusterId() {
525540
if (this.kafkaAdmin != null && this.clusterId == null) {
526541
this.clusterIdLock.lock();
527542
try {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.TimeUnit;
3030
import java.util.function.Function;
3131

32+
import io.micrometer.observation.Observation;
3233
import org.apache.kafka.clients.consumer.ConsumerRecord;
3334
import org.apache.kafka.clients.producer.ProducerRecord;
3435
import org.apache.kafka.common.TopicPartition;
@@ -51,6 +52,8 @@
5152
import org.springframework.kafka.support.KafkaHeaders;
5253
import org.springframework.kafka.support.KafkaUtils;
5354
import org.springframework.kafka.support.TopicPartitionOffset;
55+
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
56+
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
5457
import org.springframework.kafka.support.serializer.DeserializationException;
5558
import org.springframework.kafka.support.serializer.SerializationUtils;
5659
import org.springframework.lang.Nullable;
@@ -69,6 +72,7 @@
6972
* @author Gary Russell
7073
* @author Artem Bilan
7174
* @author Borahm Lee
75+
* @author Francois Rosiere
7276
*
7377
* @since 2.1.3
7478
*
@@ -501,39 +505,50 @@ private static <K, V> CorrelationKey defaultCorrelationIdStrategy(
501505
@Override
502506
public void onMessage(List<ConsumerRecord<K, R>> data) {
503507
data.forEach(record -> {
504-
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
505-
Object correlationId = null;
506-
if (correlationHeader != null) {
507-
correlationId = this.binaryCorrelation
508-
? new CorrelationKey(correlationHeader.value())
509-
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
510-
}
511-
if (correlationId == null) {
512-
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
513-
+ " - to use request/reply semantics, the responding server must return the correlation id "
514-
+ " in the '" + this.correlationHeaderName + "' header");
508+
ContainerProperties containerProperties = this.replyContainer.getContainerProperties();
509+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
510+
containerProperties.getObservationConvention(),
511+
KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE,
512+
() -> new KafkaRecordReceiverContext(record, this.replyContainer.getListenerId(), containerProperties.getClientId(), this.replyContainer.getGroupId(),
513+
this::clusterId),
514+
getObservationRegistry());
515+
observation.observe(() -> handleReply(record));
516+
});
517+
}
518+
519+
private void handleReply(ConsumerRecord<K, R> record) {
520+
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
521+
Object correlationId = null;
522+
if (correlationHeader != null) {
523+
correlationId = this.binaryCorrelation
524+
? new CorrelationKey(correlationHeader.value())
525+
: new String(correlationHeader.value(), StandardCharsets.UTF_8);
526+
}
527+
if (correlationId == null) {
528+
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
529+
+ " - to use request/reply semantics, the responding server must return the correlation id "
530+
+ " in the '" + this.correlationHeaderName + "' header");
531+
}
532+
else {
533+
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
534+
Object correlationKey = correlationId;
535+
if (future == null) {
536+
logLateArrival(record, correlationId);
515537
}
516538
else {
517-
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
518-
Object correlationKey = correlationId;
519-
if (future == null) {
520-
logLateArrival(record, correlationId);
539+
boolean ok = true;
540+
Exception exception = checkForErrors(record);
541+
if (exception != null) {
542+
ok = false;
543+
future.completeExceptionally(exception);
521544
}
522-
else {
523-
boolean ok = true;
524-
Exception exception = checkForErrors(record);
525-
if (exception != null) {
526-
ok = false;
527-
future.completeExceptionally(exception);
528-
}
529-
if (ok) {
530-
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
531-
+ WITH_CORRELATION_ID + correlationKey);
532-
future.complete(record);
533-
}
545+
if (ok) {
546+
this.logger.debug(() -> "Received: " + KafkaUtils.format(record)
547+
+ WITH_CORRELATION_ID + correlationKey);
548+
future.complete(record);
534549
}
535550
}
536-
});
551+
}
537552
}
538553

539554
/**

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.micrometer;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.time.Duration;
2021
import java.util.Arrays;
2122
import java.util.Deque;
2223
import java.util.List;
@@ -76,13 +77,14 @@
7677
import org.springframework.kafka.core.KafkaTemplate;
7778
import org.springframework.kafka.core.ProducerFactory;
7879
import org.springframework.kafka.listener.MessageListenerContainer;
80+
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
7981
import org.springframework.kafka.support.ProducerListener;
8082
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
8183
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
8284
import org.springframework.kafka.test.EmbeddedKafkaBroker;
8385
import org.springframework.kafka.test.context.EmbeddedKafka;
8486
import org.springframework.kafka.test.utils.KafkaTestUtils;
85-
import org.springframework.lang.Nullable;
87+
import org.springframework.messaging.handler.annotation.SendTo;
8688
import org.springframework.test.annotation.DirtiesContext;
8789
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
8890
import org.springframework.util.StringUtils;
@@ -98,13 +100,15 @@
98100
* @author Wang Zhiyang
99101
* @author Christian Mergenthaler
100102
* @author Soby Chacko
103+
* @author Francois Rosiere
101104
*
102105
* @since 3.0
103106
*/
104107
@SpringJUnitConfig
105108
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
106-
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION,
107-
ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
109+
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
110+
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
111+
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
108112
@DirtiesContext
109113
public class ObservationTests {
110114

@@ -114,6 +118,10 @@ public class ObservationTests {
114118

115119
public final static String OBSERVATION_TEST_3 = "observation.testT3";
116120

121+
public final static String OBSERVATION_TEST_4 = "observation.testT4";
122+
123+
public final static String OBSERVATION_REPLY = "observation.reply";
124+
117125
public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
118126

119127
public final static String OBSERVATION_ERROR = "observation.error";
@@ -356,7 +364,7 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri
356364
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
357365
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
358366
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
359-
throws ExecutionException, InterruptedException, TimeoutException {
367+
throws ExecutionException, InterruptedException, TimeoutException {
360368

361369
runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
362370
assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue();
@@ -459,6 +467,19 @@ public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMeta
459467
tracer.getSpans().clear();
460468
}
461469

470+
@Test
471+
void testReplyingKafkaTemplateObservation(
472+
@Autowired ReplyingKafkaTemplate<Integer, String, String> template,
473+
@Autowired ObservationRegistry observationRegistry) {
474+
assertThat(template.sendAndReceive(new ProducerRecord<>(OBSERVATION_TEST_4, "test"))
475+
// the current observation must be retrieved from the consumer thread of the reply
476+
.thenApply(replyRecord -> observationRegistry.getCurrentObservation().getContext()))
477+
.succeedsWithin(Duration.ofSeconds(30))
478+
.isInstanceOf(KafkaRecordReceiverContext.class)
479+
.extracting("name")
480+
.isEqualTo("spring.kafka.listener");
481+
}
482+
462483
@Configuration
463484
@EnableKafka
464485
public static class Config {
@@ -530,13 +551,22 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
530551
return template;
531552
}
532553

554+
@Bean
555+
ReplyingKafkaTemplate<Integer, String, String> replyingKafkaTemplate(ProducerFactory<Integer, String> pf, ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
556+
ReplyingKafkaTemplate<Integer, String, String> kafkaTemplate = new ReplyingKafkaTemplate<>(pf, containerFactory.createContainer(OBSERVATION_REPLY));
557+
kafkaTemplate.setObservationEnabled(true);
558+
return kafkaTemplate;
559+
}
560+
533561
@Bean
534562
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
535-
ConsumerFactory<Integer, String> cf) {
563+
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry,
564+
KafkaTemplate<Integer, String> kafkaTemplate) {
536565

537566
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
538567
new ConcurrentKafkaListenerContainerFactory<>();
539568
factory.setConsumerFactory(cf);
569+
factory.setReplyTemplate(kafkaTemplate);
540570
factory.getContainerProperties().setObservationEnabled(true);
541571
factory.setContainerCustomizer(container -> {
542572
if (container.getListenerId().equals("obs3")) {
@@ -585,7 +615,7 @@ public List<String> fields() {
585615
// This is called on the producer side when the message is being sent
586616
// Normally we would pass information from tracing context - for tests we don't need to
587617
@Override
588-
public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
618+
public <C> void inject(TraceContext context, C carrier, Setter<C> setter) {
589619
setter.set(carrier, "foo", "some foo value");
590620
setter.set(carrier, "bar", "some bar value");
591621

@@ -649,6 +679,12 @@ void listen2(ConsumerRecord<?, ?> in) {
649679
void listen3(ConsumerRecord<Integer, String> in) {
650680
}
651681

682+
@KafkaListener(id = "obsReply", topics = OBSERVATION_TEST_4)
683+
@SendTo // default REPLY_TOPIC header
684+
public String replyListener(ConsumerRecord<Integer, String> in) {
685+
return in.value().toUpperCase();
686+
}
687+
652688
}
653689

654690
public static class ExceptionListener {

0 commit comments

Comments
 (0)