Skip to content

Commit 3e4d295

Browse files
committed
use leader_epoch from metadata for list_offsets
1 parent 2ec578d commit 3e4d295

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

kafka/cluster.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ def leader_for_partition(self, partition):
141141
return None
142142
return self._partitions[partition.topic][partition.partition].leader
143143

144+
def leader_epoch_for_partition(self, partition):
145+
return self._partitions[partition.topic][partition.partition].leader_epoch
146+
144147
def partitions_for_broker(self, broker_id):
145148
"""Return TopicPartitions for which the broker is a leader.
146149

kafka/consumer/fetcher.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,8 @@ def _send_list_offsets_requests(self, timestamps):
549549
return Future().failure(
550550
Errors.LeaderNotAvailableError(partition))
551551
else:
552-
timestamps_by_node[node_id][partition] = timestamp
552+
leader_epoch = self._client.cluster.leader_epoch_for_partition(partition)
553+
timestamps_by_node[node_id][partition] = (timestamp, leader_epoch)
553554

554555
# Aggregate results until we have all
555556
list_offsets_future = Future()
@@ -574,13 +575,12 @@ def on_fail(err):
574575
_f.add_errback(on_fail)
575576
return list_offsets_future
576577

577-
def _send_list_offsets_request(self, node_id, timestamps):
578+
def _send_list_offsets_request(self, node_id, timestamps_and_epochs):
578579
version = self._client.api_version(ListOffsetsRequest, max_version=3)
579580
by_topic = collections.defaultdict(list)
580-
for tp, timestamp in six.iteritems(timestamps):
581+
for tp, (timestamp, leader_epoch) in six.iteritems(timestamps_and_epochs):
581582
if version >= 4:
582-
# TODO: leader_epoch
583-
data = (tp.partition, -1, timestamp)
583+
data = (tp.partition, leader_epoch, timestamp)
584584
elif version >= 1:
585585
data = (tp.partition, timestamp)
586586
else:

0 commit comments

Comments
 (0)