Skip to content

support per-record observations in batch listeners #3944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ public enum EOSMode {

private boolean restartAfterAuthExceptions;

private boolean recordObservationsInBatch;

/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
Expand Down Expand Up @@ -1091,6 +1093,27 @@ public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions) {
this.restartAfterAuthExceptions = restartAfterAuthExceptions;
}

/**
* When true, and a batch listener is configured with observation enabled, an observation
* will be started for each record in the batch.
* @return recordObservationsInBatch.
* @since 4.0
*/
public boolean isRecordObservationsInBatch() {
return this.recordObservationsInBatch;
}

/**
* Set whether to enable individual record observations in a batch.
* When true, and a batch listener is configured with observation enabled, an observation
* will be started for each record in the batch. Default false.
* @param recordObservationsInBatch true to enable individual record observations.
* @since 4.0
*/
public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {
this.recordObservationsInBatch = recordObservationsInBatch;
}

@Override
public String toString() {
return "ContainerProperties ["
Expand Down Expand Up @@ -1141,6 +1164,7 @@ public String toString() {
? "\n observationRegistry=" + this.observationRegistry
: "")
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
+ "\n]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.isBatchListener = true;
this.wantsFullRecords = this.batchListener.wantsPollResult();
this.pollThreadStateProcessor = setUpPollProcessor(true);
this.observationEnabled = false;
this.observationEnabled = this.containerProperties.isObservationEnabled() && this.containerProperties.isRecordObservationsInBatch();
}
else if (listener instanceof MessageListener) {
this.listener = (MessageListener<K, V>) listener;
Expand Down Expand Up @@ -2443,7 +2443,13 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
}
}
Object sample = startMicrometerSample();


try {
if (this.containerProperties.isObservationEnabled() ) {
invokeBatchWithIndividualRecordObservation(recordList);
}

if (this.wantsFullRecords) {
Objects.requireNonNull(this.batchListener).onMessage(records, // NOSONAR
this.isAnyManualAck
Expand Down Expand Up @@ -4005,6 +4011,21 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti

}

private void invokeBatchWithIndividualRecordObservation(List<ConsumerRecord<K, V>> recordList) {
// Create individual observations for each record without scopes
for (ConsumerRecord<K, V> record : recordList) {
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
this.containerProperties.getObservationConvention(),
DefaultKafkaListenerObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, getListenerId(), getClientId(), this.consumerGroupId,
this::clusterId),
this.observationRegistry);
observation.observe(() -> {
logger.debug(() -> "processing record: " + KafkaUtils.format(record));
});
}
}

}


Expand Down
Loading