Skip to content

Commit f9494b3

Browse files
committed
Fix deprecated ConsumerRecords constructor calls and lossy conversion
- Update all ConsumerRecords constructors to use the non-deprecated API by adding Map.of() as the second parameter - Fix lossy conversion in KafkaMessageListenerContainer by adding explicit cast to long when multiplying idleEventInterval by containerProperties.getIdleBeforeDataMultiplier() which is of type double. Signed-off-by: Soby Chacko <[email protected]>
1 parent 8fca3da commit f9494b3

File tree

4 files changed

+11
-11
lines changed

4 files changed

+11
-11
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, D
389389
}
390390
}
391391
while (count < minRecords && remaining > 0);
392-
return new ConsumerRecords<>(records);
392+
return new ConsumerRecords<>(records, Map.of());
393393
}
394394

395395
/**

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested,
772772
consumerRecords.add(one);
773773
}
774774
});
775-
return new ConsumerRecords<>(records);
775+
return new ConsumerRecords<>(records, Map.of());
776776
}
777777
}
778778

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
278278
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
279279
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
280280
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
281-
return new ConsumerRecords<>(remains);
281+
return new ConsumerRecords<>(remains, Map.of());
282282
}
283283
}
284284

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,7 +1669,7 @@ private ConsumerRecords<K, V> doPoll() {
16691669
}
16701670
TopicPartition next = this.batchIterator.next();
16711671
List<ConsumerRecord<K, V>> subBatch = Objects.requireNonNull(this.lastBatch).records(next);
1672-
records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch));
1672+
records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch), Map.of());
16731673
if (!this.batchIterator.hasNext()) {
16741674
this.batchIterator = null;
16751675
}
@@ -1911,7 +1911,7 @@ private void checkIdle() {
19111911
long idleEventInterval2 = idleEventInterval;
19121912
long now = System.currentTimeMillis();
19131913
if (!this.receivedSome) {
1914-
idleEventInterval2 *= this.containerProperties.getIdleBeforeDataMultiplier();
1914+
idleEventInterval2 = (long) (idleEventInterval2 * this.containerProperties.getIdleBeforeDataMultiplier());
19151915
}
19161916
if (now > this.lastReceive + idleEventInterval2
19171917
&& now > this.lastAlertAt + idleEventInterval2) {
@@ -2649,7 +2649,7 @@ private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
26492649
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
26502650
}
26512651
if (!remaining.isEmpty()) {
2652-
this.remainingRecords = new ConsumerRecords<>(remaining);
2652+
this.remainingRecords = new ConsumerRecords<>(remaining, Map.of());
26532653
return true;
26542654
}
26552655
}
@@ -2950,7 +2950,7 @@ private void invokeErrorHandlerBySingleRecord(FailedRecordTuple<K, V> failedReco
29502950
tp -> new ArrayList<>()).add(cRecord);
29512951
}
29522952
if (!records.isEmpty()) {
2953-
this.remainingRecords = new ConsumerRecords<>(records);
2953+
this.remainingRecords = new ConsumerRecords<>(records, Map.of());
29542954
this.pauseForPending = true;
29552955
}
29562956
}
@@ -2996,7 +2996,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
29962996
}
29972997
}
29982998
if (!records.isEmpty()) {
2999-
this.remainingRecords = new ConsumerRecords<>(records);
2999+
this.remainingRecords = new ConsumerRecords<>(records, Map.of());
30003000
this.pauseForPending = true;
30013001
}
30023002
}
@@ -3614,7 +3614,7 @@ public void acknowledge(int index) {
36143614
tp -> new ArrayList<>()).add(record);
36153615
}
36163616
if (!offsetsToCommit.isEmpty()) {
3617-
processAcks(new ConsumerRecords<>(offsetsToCommit));
3617+
processAcks(new ConsumerRecords<>(offsetsToCommit, Map.of()));
36183618
}
36193619
this.partial = index;
36203620
}
@@ -3644,7 +3644,7 @@ public void nack(int index, Duration sleep) {
36443644
newRecords.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
36453645
tp -> new LinkedList<>()).add(cRecord);
36463646
}
3647-
processAcks(new ConsumerRecords<K, V>(newRecords));
3647+
processAcks(new ConsumerRecords<K, V>(newRecords, Map.of()));
36483648
}
36493649

36503650
@Override
@@ -3727,7 +3727,7 @@ private void removeRevocationsFromPending(Collection<TopicPartition> partitions)
37273727
if (!remainingParts.isEmpty()) {
37283728
Map<TopicPartition, List<ConsumerRecord<K, V>>> trimmed = new LinkedHashMap<>();
37293729
remainingParts.forEach(part -> trimmed.computeIfAbsent(part, tp -> remaining.records(tp)));
3730-
ListenerConsumer.this.remainingRecords = new ConsumerRecords<>(trimmed);
3730+
ListenerConsumer.this.remainingRecords = new ConsumerRecords<>(trimmed, Map.of());
37313731
}
37323732
else {
37333733
ListenerConsumer.this.remainingRecords = null;

0 commit comments

Comments
 (0)