Skip to content

Commit 351e874

Browse files
Igor Macedo Quintanilhaigormq
Igor Macedo Quintanilha
authored andcommitted
support per-record observations in batch listeners
document recordObservationsInBatch container property describe new option in the change history add integration test for per-record observations
1 parent 9674334 commit 351e874

File tree

4 files changed

+436
-18
lines changed

4 files changed

+436
-18
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,23 @@
11
[[history]]
22
= Change History
33

4+
[[what-s-new-in-3-4-since-3-3]]
5+
== What's New in 3.4 Since 3.3
6+
:page-section-summary-toc: 1
7+
8+
This section covers the changes made from version 3.3 to version 3.4.
9+
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].
10+
11+
[[x34-record-observations-in-batch]]
12+
=== Individual Record Observations in Batch Processing
13+
14+
A new `ContainerProperties` property `recordObservationsInBatch` has been introduced to enable individual record observations within batch listeners.
15+
When enabled, and a batch listener is configured with observation enabled, an observation will be started for each record in the batch.
16+
This allows for proper distributed tracing of individual records within a batch, even when each record originated from different producers with different trace contexts.
17+
The default value is `false` to maintain backward compatibility.
18+
19+
See xref:kafka/micrometer.adoc#observation[Micrometer Observation] for more details.
20+
421
[[what-s-new-in-3-3-since-3-2]]
522
== What's New in 3.3 Since 3.2
623
:page-section-summary-toc: 1

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,8 @@ public enum EOSMode {
310310

311311
private boolean restartAfterAuthExceptions;
312312

313+
private boolean recordObservationsInBatch;
314+
313315
/**
314316
* Create properties for a container that will subscribe to the specified topics.
315317
* @param topics the topics.
@@ -1091,6 +1093,27 @@ public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions) {
10911093
this.restartAfterAuthExceptions = restartAfterAuthExceptions;
10921094
}
10931095

1096+
/**
1097+
* When true, and a batch listener is configured with observation enabled, an observation
1098+
* will be started for each record in the batch.
1099+
* @return recordObservationsInBatch.
1100+
* @since 3.4
1101+
*/
1102+
public boolean isRecordObservationsInBatch() {
1103+
return this.recordObservationsInBatch;
1104+
}
1105+
1106+
/**
1107+
* Set whether to enable individual record observations in a batch.
1108+
* When true, and a batch listener is configured with observation enabled, an observation
1109+
* will be started for each record in the batch. Default false.
1110+
* @param recordObservationsInBatch true to enable individual record observations.
1111+
* @since 3.4
1112+
*/
1113+
public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {
1114+
this.recordObservationsInBatch = recordObservationsInBatch;
1115+
}
1116+
10941117
@Override
10951118
public String toString() {
10961119
return "ContainerProperties ["
@@ -1141,6 +1164,7 @@ public String toString() {
11411164
? "\n observationRegistry=" + this.observationRegistry
11421165
: "")
11431166
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
1167+
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
11441168
+ "\n]";
11451169
}
11461170

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 88 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -898,7 +898,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
898898
this.isBatchListener = true;
899899
this.wantsFullRecords = this.batchListener.wantsPollResult();
900900
this.pollThreadStateProcessor = setUpPollProcessor(true);
901-
this.observationEnabled = false;
901+
this.observationEnabled = this.containerProperties.isObservationEnabled() && this.containerProperties.isRecordObservationsInBatch();
902902
}
903903
else if (listener instanceof MessageListener) {
904904
this.listener = (MessageListener<K, V>) listener;
@@ -2443,25 +2443,32 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24432443
}
24442444
}
24452445
Object sample = startMicrometerSample();
2446-
try {
2447-
if (this.wantsFullRecords) {
2448-
Objects.requireNonNull(this.batchListener).onMessage(records, // NOSONAR
2449-
this.isAnyManualAck
2450-
? new ConsumerBatchAcknowledgment(records, recordList)
2451-
: null,
2452-
this.consumer);
2446+
2447+
// Handle individual record tracing for batch mode if enabled
2448+
if (this.containerProperties.isRecordObservationsInBatch() && this.observationEnabled) {
2449+
invokeBatchWithIndividualRecordTracing(records, recordList, sample);
2450+
}
2451+
else {
2452+
try {
2453+
if (this.wantsFullRecords) {
2454+
Objects.requireNonNull(this.batchListener).onMessage(records, // NOSONAR
2455+
this.isAnyManualAck
2456+
? new ConsumerBatchAcknowledgment(records, recordList)
2457+
: null,
2458+
this.consumer);
2459+
}
2460+
else {
2461+
doInvokeBatchOnMessage(records, recordList); // NOSONAR
2462+
}
2463+
batchInterceptAfter(records, null);
2464+
successTimer(sample, null);
24532465
}
2454-
else {
2455-
doInvokeBatchOnMessage(records, recordList); // NOSONAR
2466+
catch (RuntimeException e) {
2467+
this.batchFailed = true;
2468+
failureTimer(sample, null, e);
2469+
batchInterceptAfter(records, e);
2470+
throw e;
24562471
}
2457-
batchInterceptAfter(records, null);
2458-
successTimer(sample, null);
2459-
}
2460-
catch (RuntimeException e) {
2461-
this.batchFailed = true;
2462-
failureTimer(sample, null, e);
2463-
batchInterceptAfter(records, e);
2464-
throw e;
24652472
}
24662473
}
24672474

@@ -4005,6 +4012,69 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti
40054012

40064013
}
40074014

4015+
private void invokeBatchWithIndividualRecordTracing(final ConsumerRecords<K, V> records,
4016+
List<ConsumerRecord<K, V>> recordList, @Nullable Object sample) {
4017+
4018+
List<Observation> observations = new ArrayList<>();
4019+
List<Observation.Scope> scopes = new ArrayList<>();
4020+
4021+
try {
4022+
// Create individual observations for each record
4023+
for (ConsumerRecord<K, V> record : recordList) {
4024+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
4025+
this.containerProperties.getObservationConvention(),
4026+
DefaultKafkaListenerObservationConvention.INSTANCE,
4027+
() -> new KafkaRecordReceiverContext(record, getListenerId(), getClientId(), this.consumerGroupId,
4028+
this::clusterId),
4029+
this.observationRegistry);
4030+
observation.start();
4031+
observations.add(observation);
4032+
scopes.add(observation.openScope());
4033+
}
4034+
4035+
// Invoke the batch listener
4036+
if (this.wantsFullRecords) {
4037+
Objects.requireNonNull(this.batchListener).onMessage(records,
4038+
this.isAnyManualAck
4039+
? new ConsumerBatchAcknowledgment(records, recordList)
4040+
: null,
4041+
this.consumer);
4042+
}
4043+
else {
4044+
doInvokeBatchOnMessage(records, recordList);
4045+
}
4046+
4047+
batchInterceptAfter(records, null);
4048+
successTimer(sample, null);
4049+
}
4050+
catch (RuntimeException e) {
4051+
this.batchFailed = true;
4052+
failureTimer(sample, null, e);
4053+
batchInterceptAfter(records, e);
4054+
4055+
// Mark all observations with error
4056+
for (Observation observation : observations) {
4057+
if (!isListenerAdapterObservationAware()) {
4058+
observation.error(e);
4059+
}
4060+
}
4061+
throw e;
4062+
}
4063+
finally {
4064+
// Close scopes in reverse order
4065+
for (int i = scopes.size() - 1; i >= 0; i--) {
4066+
scopes.get(i).close();
4067+
}
4068+
4069+
// Stop observations
4070+
for (Observation observation : observations) {
4071+
if (!isListenerAdapterObservationAware()) {
4072+
observation.stop();
4073+
}
4074+
}
4075+
}
4076+
}
4077+
40084078
}
40094079

40104080

0 commit comments

Comments
 (0)