Skip to content

Commit f465adb

Browse files
authored
Add baseline leader_epoch support for ListOffsets v4 / FetchRequest v10 (#2511)
1 parent ad8d1c4 commit f465adb

File tree

14 files changed

+309
-109
lines changed

14 files changed

+309
-109
lines changed

kafka/admin/client.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,7 +1353,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
13531353
Returns:
13541354
A message future
13551355
"""
1356-
version = self._client.api_version(OffsetFetchRequest, max_version=3)
1356+
version = self._client.api_version(OffsetFetchRequest, max_version=5)
13571357
if version <= 3:
13581358
if partitions is None:
13591359
if version <= 1:
@@ -1386,7 +1386,7 @@ def _list_consumer_group_offsets_process_response(self, response):
13861386
A dictionary composed of TopicPartition keys and
13871387
OffsetAndMetadata values.
13881388
"""
1389-
if response.API_VERSION <= 3:
1389+
if response.API_VERSION <= 5:
13901390

13911391
# OffsetFetchResponse_v1 lacks a top-level error_code
13921392
if response.API_VERSION > 1:
@@ -1401,13 +1401,18 @@ def _list_consumer_group_offsets_process_response(self, response):
14011401
# OffsetAndMetadata values--this is what the Java AdminClient returns
14021402
offsets = {}
14031403
for topic, partitions in response.topics:
1404-
for partition, offset, metadata, error_code in partitions:
1404+
for partition_data in partitions:
1405+
if response.API_VERSION <= 4:
1406+
partition, offset, metadata, error_code = partition_data
1407+
leader_epoch = -1
1408+
else:
1409+
partition, offset, leader_epoch, metadata, error_code = partition_data
14051410
error_type = Errors.for_code(error_code)
14061411
if error_type is not Errors.NoError:
14071412
raise error_type(
14081413
"Unable to fetch consumer group offsets for topic {}, partition {}"
14091414
.format(topic, partition))
1410-
offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
1415+
offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata, leader_epoch)
14111416
else:
14121417
raise NotImplementedError(
14131418
"Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient."
@@ -1439,7 +1444,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
14391444
14401445
Returns:
14411446
dictionary: A dictionary with TopicPartition keys and
1442-
OffsetAndMetada values. Partitions that are not specified and for
1447+
OffsetAndMetadata values. Partitions that are not specified and for
14431448
which the group_id does not have a recorded offset are omitted. An
14441449
offset value of `-1` indicates the group_id has no offset for that
14451450
TopicPartition. A `-1` can only happen for partitions that are

kafka/cluster.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ def leader_for_partition(self, partition):
141141
return None
142142
return self._partitions[partition.topic][partition.partition].leader
143143

144+
def leader_epoch_for_partition(self, partition):
145+
return self._partitions[partition.topic][partition.partition].leader_epoch
146+
144147
def partitions_for_broker(self, broker_id):
145148
"""Return TopicPartitions for which the broker is a leader.
146149

kafka/consumer/fetcher.py

Lines changed: 75 additions & 56 deletions
Large diffs are not rendered by default.

kafka/consumer/group.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
1818
from kafka.metrics import MetricConfig, Metrics
1919
from kafka.protocol.list_offsets import OffsetResetStrategy
20-
from kafka.structs import TopicPartition
20+
from kafka.structs import OffsetAndMetadata, TopicPartition
2121
from kafka.version import __version__
2222

2323
log = logging.getLogger(__name__)
@@ -732,16 +732,16 @@ def position(self, partition):
732732
partition (TopicPartition): Partition to check
733733
734734
Returns:
735-
int: Offset
735+
int: Offset or None
736736
"""
737737
if not isinstance(partition, TopicPartition):
738738
raise TypeError('partition must be a TopicPartition namedtuple')
739739
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
740-
offset = self._subscription.assignment[partition].position
741-
if offset is None:
740+
position = self._subscription.assignment[partition].position
741+
if position is None:
742742
self._update_fetch_positions([partition])
743-
offset = self._subscription.assignment[partition].position
744-
return offset
743+
position = self._subscription.assignment[partition].position
744+
return position.offset if position else None
745745

746746
def highwater(self, partition):
747747
"""Last known highwater offset for a partition.
@@ -1144,7 +1144,7 @@ def _message_generator_v2(self):
11441144
log.debug("Not returning fetched records for partition %s"
11451145
" since it is no longer fetchable", tp)
11461146
break
1147-
self._subscription.assignment[tp].position = record.offset + 1
1147+
self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, '', -1)
11481148
yield record
11491149

11501150
def _message_generator(self):

kafka/consumer/subscription_state.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ def all_consumed_offsets(self):
319319
all_consumed = {}
320320
for partition, state in six.iteritems(self.assignment):
321321
if state.has_valid_position:
322-
all_consumed[partition] = OffsetAndMetadata(state.position, '')
322+
all_consumed[partition] = state.position
323323
return all_consumed
324324

325325
def need_offset_reset(self, partition, offset_reset_strategy=None):
@@ -379,7 +379,7 @@ def __init__(self):
379379
self.paused = False # whether this partition has been paused by the user
380380
self.awaiting_reset = False # whether we are awaiting reset
381381
self.reset_strategy = None # the reset strategy if awaitingReset is set
382-
self._position = None # offset exposed to the user
382+
self._position = None # OffsetAndMetadata exposed to the user
383383
self.highwater = None
384384
self.drop_pending_record_batch = False
385385
# The last message offset hint available from a record batch with
@@ -388,6 +388,7 @@ def __init__(self):
388388

389389
def _set_position(self, offset):
390390
assert self.has_valid_position, 'Valid position required'
391+
assert isinstance(offset, OffsetAndMetadata)
391392
self._position = offset
392393

393394
def _get_position(self):
@@ -403,7 +404,7 @@ def await_reset(self, strategy):
403404
self.has_valid_position = False
404405

405406
def seek(self, offset):
406-
self._position = offset
407+
self._position = OffsetAndMetadata(offset, '', -1)
407408
self.awaiting_reset = False
408409
self.reset_strategy = None
409410
self.has_valid_position = True

kafka/coordinator/consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ def _send_offset_commit_request(self, offsets):
649649
topic, [(
650650
partition,
651651
offset.offset,
652-
-1, # leader_epoch
652+
offset.leader_epoch,
653653
offset.metadata
654654
) for partition, offset in six.iteritems(partitions)]
655655
) for topic, partitions in six.iteritems(offset_data)]
@@ -809,7 +809,6 @@ def _handle_offset_fetch_response(self, future, response):
809809
else:
810810
metadata, error_code = partition_data[2:]
811811
leader_epoch = -1
812-
# TODO: save leader_epoch!
813812
tp = TopicPartition(topic, partition)
814813
error_type = Errors.for_code(error_code)
815814
if error_type is not Errors.NoError:
@@ -836,7 +835,8 @@ def _handle_offset_fetch_response(self, future, response):
836835
elif offset >= 0:
837836
# record the position with the offset
838837
# (-1 indicates no committed offset to fetch)
839-
offsets[tp] = OffsetAndMetadata(offset, metadata)
838+
# TODO: save leader_epoch
839+
offsets[tp] = OffsetAndMetadata(offset, metadata, -1)
840840
else:
841841
log.debug("Group %s has no committed offset for partition"
842842
" %s", self.group_id, tp)

kafka/errors.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,7 @@ class UnknownLeaderEpochError(BrokerResponseError):
664664
message = 'UNKNOWN_LEADER_EPOCH'
665665
description = 'The leader epoch in the request is newer than the epoch on the broker.'
666666
retriable = True
667+
invalid_metadata = True
667668

668669

669670
class UnsupportedCompressionTypeError(BrokerResponseError):

kafka/protocol/list_offsets.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class ListOffsetsRequest_v4(Request):
166166
('topic', String('utf-8')),
167167
('partitions', Array(
168168
('partition', Int32),
169-
('current_leader_epoch', Int64),
169+
('current_leader_epoch', Int32),
170170
('timestamp', Int64)))))
171171
)
172172
DEFAULTS = {
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Int64, Schema, String, TaggedFields


# OffsetsForLeaderEpoch API (key 23): given a (partition, leader_epoch) pair,
# the broker returns the end offset of that epoch so the client can detect
# log truncation (KIP-320).
#
# NOTE: response classes must subclass Response, not Request — Request
# subclasses are validated to carry a RESPONSE_TYPE, and the protocol
# dispatch matches replies by Response type.

class OffsetForLeaderEpochResponse_v0(Response):
    API_KEY = 23
    API_VERSION = 0
    SCHEMA = Schema(
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('error_code', Int16),
                ('partition', Int32),
                ('end_offset', Int64))))))


class OffsetForLeaderEpochResponse_v1(Response):
    # v1 adds leader_epoch to each partition result
    API_KEY = 23
    API_VERSION = 1
    SCHEMA = Schema(
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('error_code', Int16),
                ('partition', Int32),
                ('leader_epoch', Int32),
                ('end_offset', Int64))))))


class OffsetForLeaderEpochResponse_v2(Response):
    # v2 adds top-level throttle_time_ms
    API_KEY = 23
    API_VERSION = 2
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('error_code', Int16),
                ('partition', Int32),
                ('leader_epoch', Int32),
                ('end_offset', Int64))))))


class OffsetForLeaderEpochResponse_v3(Response):
    # v3 is wire-identical to v2 (request side gained replica_id)
    API_KEY = 23
    API_VERSION = 3
    SCHEMA = OffsetForLeaderEpochResponse_v2.SCHEMA


class OffsetForLeaderEpochResponse_v4(Response):
    # v4 switches to the flexible (compact/tagged-fields) encoding
    API_KEY = 23
    API_VERSION = 4
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('topics', CompactArray(
            ('topic', CompactString('utf-8')),
            ('partitions', CompactArray(
                ('error_code', Int16),
                ('partition', Int32),
                ('leader_epoch', Int32),
                ('end_offset', Int64),
                ('tags', TaggedFields))),
            ('tags', TaggedFields))),
        ('tags', TaggedFields))


class OffsetForLeaderEpochRequest_v0(Request):
    API_KEY = 23
    API_VERSION = 0
    RESPONSE_TYPE = OffsetForLeaderEpochResponse_v0
    SCHEMA = Schema(
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('partition', Int32),
                ('leader_epoch', Int32))))))


class OffsetForLeaderEpochRequest_v1(Request):
    API_KEY = 23
    API_VERSION = 1
    RESPONSE_TYPE = OffsetForLeaderEpochResponse_v1
    SCHEMA = OffsetForLeaderEpochRequest_v0.SCHEMA


class OffsetForLeaderEpochRequest_v2(Request):
    # v2 adds current_leader_epoch for fencing stale metadata
    API_KEY = 23
    API_VERSION = 2
    RESPONSE_TYPE = OffsetForLeaderEpochResponse_v2
    SCHEMA = Schema(
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('partition', Int32),
                ('current_leader_epoch', Int32),
                ('leader_epoch', Int32))))))


class OffsetForLeaderEpochRequest_v3(Request):
    # v3 adds replica_id (consumers send -1)
    API_KEY = 23
    API_VERSION = 3
    RESPONSE_TYPE = OffsetForLeaderEpochResponse_v3
    SCHEMA = Schema(
        ('replica_id', Int32),
        ('topics', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('partition', Int32),
                ('current_leader_epoch', Int32),
                ('leader_epoch', Int32))))))


class OffsetForLeaderEpochRequest_v4(Request):
    # v4 switches to the flexible (compact/tagged-fields) encoding
    API_KEY = 23
    API_VERSION = 4
    RESPONSE_TYPE = OffsetForLeaderEpochResponse_v4
    SCHEMA = Schema(
        ('replica_id', Int32),
        ('topics', CompactArray(
            ('topic', CompactString('utf-8')),
            ('partitions', CompactArray(
                ('partition', Int32),
                ('current_leader_epoch', Int32),
                ('leader_epoch', Int32),
                ('tags', TaggedFields))),
            ('tags', TaggedFields))),
        ('tags', TaggedFields))


# Indexed by API version for kafka.client_async version negotiation
OffsetForLeaderEpochRequest = [
    OffsetForLeaderEpochRequest_v0, OffsetForLeaderEpochRequest_v1,
    OffsetForLeaderEpochRequest_v2, OffsetForLeaderEpochRequest_v3,
    OffsetForLeaderEpochRequest_v4,
]
OffsetForLeaderEpochResponse = [
    OffsetForLeaderEpochResponse_v0, OffsetForLeaderEpochResponse_v1,
    OffsetForLeaderEpochResponse_v2, OffsetForLeaderEpochResponse_v3,
    OffsetForLeaderEpochResponse_v4,
]

kafka/record/default_records.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ def __init__(self, buffer):
136136
def base_offset(self):
137137
return self._header_data[0]
138138

139+
@property
140+
def leader_epoch(self):
141+
return self._header_data[2]
142+
139143
@property
140144
def magic(self):
141145
return self._header_data[3]

kafka/structs.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,21 @@
5555
Keyword Arguments:
5656
offset (int): The offset to be committed
5757
metadata (str): Non-null metadata
58+
leader_epoch (int): The last known epoch from the leader / broker
5859
"""
5960
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
60-
# TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
61-
["offset", "metadata"])
61+
["offset", "metadata", "leader_epoch"])
6262

6363

6464
"""An offset and timestamp tuple
6565
6666
Keyword Arguments:
6767
offset (int): An offset
6868
timestamp (int): The timestamp associated to the offset
69+
leader_epoch (int): The last known epoch from the leader / broker
6970
"""
7071
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
71-
["offset", "timestamp"])
72+
["offset", "timestamp", "leader_epoch"])
7273

7374
MemberInformation = namedtuple("MemberInformation",
7475
["member_id", "client_id", "client_host", "member_metadata", "member_assignment"])

test/test_consumer_integration.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
import time
33

4-
from mock import patch
4+
from mock import patch, ANY
55
import pytest
66
from kafka.vendor.six.moves import range
77

@@ -258,9 +258,10 @@ def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_pro
258258
tp1: send_time
259259
})
260260

261+
leader_epoch = ANY if env_kafka_version() >= (2, 1) else -1
261262
assert offsets == {
262-
tp0: OffsetAndTimestamp(p0msg.offset, send_time),
263-
tp1: OffsetAndTimestamp(p1msg.offset, send_time)
263+
tp0: OffsetAndTimestamp(p0msg.offset, send_time, leader_epoch),
264+
tp1: OffsetAndTimestamp(p1msg.offset, send_time, leader_epoch)
264265
}
265266

266267
offsets = consumer.beginning_offsets([tp0, tp1])

0 commit comments

Comments
 (0)