Skip to content

Commit 9223613

Browse files
garyrussellartembilan
authored andcommitted
ConsumerRecord Logging Metadata Option
1 parent eb3c6a0 commit 9223613

File tree

2 files changed

+16
-6
lines changed

2 files changed

+16
-6
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ ext {
9898
soapVersion = '1.4.0'
9999
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.3.0-M1'
100100
springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : '2020.0.0-M1'
101-
springKafkaVersion = '2.5.3.RELEASE'
101+
springKafkaVersion = '2.5.4.BUILD-SNAPSHOT'
102102
springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '5.4.0-M1'
103103
springRetryVersion = '1.3.0'
104104
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.0-M1'

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5656
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
5757
import org.springframework.kafka.listener.ConsumerProperties;
58+
import org.springframework.kafka.listener.ListenerUtils;
5859
import org.springframework.kafka.listener.LoggingCommitCallback;
5960
import org.springframework.kafka.support.Acknowledgment;
6061
import org.springframework.kafka.support.KafkaHeaders;
@@ -667,6 +668,8 @@ public static class KafkaAckCallback<K, V> implements AcknowledgmentCallback, Ac
667668

668669
private final boolean isSyncCommits;
669670

671+
private final boolean logOnlyMetadata;
672+
670673
private volatile boolean acknowledged;
671674

672675
private boolean autoAckEnabled = true;
@@ -690,6 +693,7 @@ public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable ConsumerProperties
690693
consumerProperties != null
691694
? consumerProperties.getCommitLogLevel()
692695
: LogIfLevelEnabled.Level.DEBUG);
696+
this.logOnlyMetadata = consumerProperties.isOnlyLogRecordMetadata();
693697
}
694698

695699
@Override
@@ -739,7 +743,8 @@ private void rollback(ConsumerRecord<K, V> record) {
739743
})
740744
.collect(Collectors.toList());
741745
if (rewound.size() > 0 && this.logger.isWarnEnabled()) {
742-
this.logger.warn("Rolled back " + record + " later in-flight offsets "
746+
this.logger.warn("Rolled back " + ListenerUtils.recordToString(record, this.logOnlyMetadata)
747+
+ " later in-flight offsets "
743748
+ rewound + " will also be re-fetched");
744749
}
745750
}
@@ -749,7 +754,8 @@ private void rollback(ConsumerRecord<K, V> record) {
749754
private void commitIfPossible(ConsumerRecord<K, V> record) {
750755
if (this.ackInfo.isRolledBack()) {
751756
if (this.logger.isWarnEnabled()) {
752-
this.logger.warn("Cannot commit offset for " + record
757+
this.logger.warn("Cannot commit offset for "
758+
+ ListenerUtils.recordToString(record, this.logOnlyMetadata)
753759
+ "; an earlier offset was rolled back");
754760
}
755761
}
@@ -773,13 +779,17 @@ private void commitIfPossible(ConsumerRecord<K, V> record) {
773779
if (toCommit.size() > 0) {
774780
ackInformation = toCommit.get(toCommit.size() - 1);
775781
KafkaAckInfo<K, V> ackInformationToLog = ackInformation;
776-
this.commitLogger.log(() -> "Committing pending offsets for " + record
777-
+ " and all deferred to " + ackInformationToLog.getRecord());
782+
this.commitLogger.log(() -> "Committing pending offsets for "
783+
+ ListenerUtils.recordToString(record, this.logOnlyMetadata)
784+
+ " and all deferred to "
785+
+ ListenerUtils.recordToString(ackInformationToLog.getRecord(),
786+
this.logOnlyMetadata));
778787
candidates.removeAll(toCommit);
779788
}
780789
else {
781790
ackInformation = this.ackInfo;
782-
this.commitLogger.log(() -> "Committing offset for " + record);
791+
this.commitLogger.log(() -> "Committing offset for "
792+
+ ListenerUtils.recordToString(record, this.logOnlyMetadata));
783793
}
784794
}
785795
else { // earlier offsets present

0 commit comments

Comments
 (0)