Skip to content

Commit b6b4952

Browse files
author
ebartkus
committed
log less messages while waiting for kafka commit on shard end
1 parent fbd6754 commit b6b4952

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,14 +211,19 @@ private void onTerminate(ShutdownInput shutdownInput) throws InvalidStateExcepti
211211
if (lastProcessedSeqNo != null && !lastProcessedSeqNo.isEmpty()) {
212212
ShardInfo processorRegister = shardRegister.getOrDefault(this.shardId, null);
213213
if (processorRegister != null) {
214+
int i = 0;
214215
while (!processorRegister.getLastCommittedRecordSeqNo().equals(this.lastProcessedSeqNo)) {
215-
LOGGER.info(
216-
"Shard ended. Waiting for all data table: {} from shard: {} to be committed. " +
217-
"lastCommittedRecordSeqNo: {} lastProcessedSeqNo: {}",
218-
tableName,
219-
shardId,
220-
processorRegister.getLastCommittedRecordSeqNo(),
221-
this.lastProcessedSeqNo);
216+
if (i % 20 == 0) {
217+
LOGGER.info(
218+
"Shard ended. Waiting for all data table: {} from shard: {} to be committed. " +
219+
"lastCommittedRecordSeqNo: {} lastProcessedSeqNo: {}",
220+
tableName,
221+
shardId,
222+
processorRegister.getLastCommittedRecordSeqNo(),
223+
this.lastProcessedSeqNo);
224+
}
225+
i += 1;
226+
222227
Thread.sleep(500);
223228
}
224229
}

0 commit comments

Comments
 (0)