Skip to content

Commit e24dee5

Browse files
committed
Add leader_epoch to OffsetAndMetadata struct
1 parent d036034 commit e24dee5

File tree

5 files changed

+22
-18
lines changed

5 files changed

+22
-18
lines changed

kafka/admin/client.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,7 +1353,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
13531353
Returns:
13541354
A message future
13551355
"""
1356-
version = self._client.api_version(OffsetFetchRequest, max_version=3)
1356+
version = self._client.api_version(OffsetFetchRequest, max_version=5)
13571357
if version <= 3:
13581358
if partitions is None:
13591359
if version <= 1:
@@ -1386,7 +1386,7 @@ def _list_consumer_group_offsets_process_response(self, response):
13861386
A dictionary composed of TopicPartition keys and
13871387
OffsetAndMetadata values.
13881388
"""
1389-
if response.API_VERSION <= 3:
1389+
if response.API_VERSION <= 5:
13901390

13911391
# OffsetFetchResponse_v1 lacks a top-level error_code
13921392
if response.API_VERSION > 1:
@@ -1401,13 +1401,18 @@ def _list_consumer_group_offsets_process_response(self, response):
14011401
# OffsetAndMetadata values--this is what the Java AdminClient returns
14021402
offsets = {}
14031403
for topic, partitions in response.topics:
1404-
for partition, offset, metadata, error_code in partitions:
1404+
for partition_data in partitions:
1405+
if response.API_VERSION <= 4:
1406+
partition, offset, metadata, error_code = partition_data
1407+
leader_epoch = -1
1408+
else:
1409+
partition, offset, leader_epoch, metadata, error_code = partition_data
14051410
error_type = Errors.for_code(error_code)
14061411
if error_type is not Errors.NoError:
14071412
raise error_type(
14081413
"Unable to fetch consumer group offsets for topic {}, partition {}"
14091414
.format(topic, partition))
1410-
offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
1415+
offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata, leader_epoch)
14111416
else:
14121417
raise NotImplementedError(
14131418
"Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient."
@@ -1439,7 +1444,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
14391444
14401445
Returns:
14411446
dictionary: A dictionary with TopicPartition keys and
1442-
OffsetAndMetada values. Partitions that are not specified and for
1447+
OffsetAndMetadata values. Partitions that are not specified and for
14431448
which the group_id does not have a recorded offset are omitted. An
14441449
offset value of `-1` indicates the group_id has no offset for that
14451450
TopicPartition. A `-1` can only happen for partitions that are

kafka/coordinator/consumer.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ def _send_offset_commit_request(self, offsets):
649649
topic, [(
650650
partition,
651651
offset.offset,
652-
-1, # leader_epoch
652+
offset.leader_epoch,
653653
offset.metadata
654654
) for partition, offset in six.iteritems(partitions)]
655655
) for topic, partitions in six.iteritems(offset_data)]
@@ -815,7 +815,6 @@ def _handle_offset_fetch_response(self, future, response):
815815
else:
816816
metadata, error_code = partition_data[2:]
817817
leader_epoch = -1
818-
# TODO: save leader_epoch!
819818
tp = TopicPartition(topic, partition)
820819
error_type = Errors.for_code(error_code)
821820
if error_type is not Errors.NoError:
@@ -842,7 +841,7 @@ def _handle_offset_fetch_response(self, future, response):
842841
elif offset >= 0:
843842
# record the position with the offset
844843
# (-1 indicates no committed offset to fetch)
845-
offsets[tp] = OffsetAndMetadata(offset, metadata)
844+
offsets[tp] = OffsetAndMetadata(offset, metadata, leader_epoch)
846845
else:
847846
log.debug("Group %s has no committed offset for partition"
848847
" %s", self.group_id, tp)

kafka/structs.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,17 @@
4848
"""The Kafka offset commit API
4949
5050
The Kafka offset commit API allows users to provide additional metadata
51-
(in the form of a string) when an offset is committed. This can be useful
51+
(in the form of raw bytes) when an offset is committed. This can be useful
5252
(for example) to store information about which node made the commit,
5353
what time the commit was made, etc.
5454
5555
Keyword Arguments:
5656
offset (int): The offset to be committed
57-
metadata (str): Non-null metadata
57+
metadata (bytes): Non-null metadata
58+
leader_epoch (int): The last known epoch from the leader / broker
5859
"""
5960
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
60-
# TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
61-
["offset", "metadata"])
61+
["offset", "metadata", "leader_epoch"])
6262

6363

6464
"""An offset and timestamp tuple

test/test_coordinator.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,13 +230,13 @@ def test_need_rejoin(coordinator):
230230
def test_refresh_committed_offsets_if_needed(mocker, coordinator):
231231
mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
232232
return_value = {
233-
TopicPartition('foobar', 0): OffsetAndMetadata(123, b''),
234-
TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')})
233+
TopicPartition('foobar', 0): OffsetAndMetadata(123, b'', -1),
234+
TopicPartition('foobar', 1): OffsetAndMetadata(234, b'', -1)})
235235
coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)])
236236
assert coordinator._subscription.needs_fetch_committed_offsets is True
237237
coordinator.refresh_committed_offsets_if_needed()
238238
assignment = coordinator._subscription.assignment
239-
assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'')
239+
assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'', -1)
240240
assert TopicPartition('foobar', 1) not in assignment
241241
assert coordinator._subscription.needs_fetch_committed_offsets is False
242242

@@ -303,8 +303,8 @@ def test_close(mocker, coordinator):
303303
@pytest.fixture
304304
def offsets():
305305
return {
306-
TopicPartition('foobar', 0): OffsetAndMetadata(123, b''),
307-
TopicPartition('foobar', 1): OffsetAndMetadata(234, b''),
306+
TopicPartition('foobar', 0): OffsetAndMetadata(123, b'', -1),
307+
TopicPartition('foobar', 1): OffsetAndMetadata(234, b'', -1),
308308
}
309309

310310

test/test_fetcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def test_update_fetch_positions(fetcher, topic, mocker):
138138
fetcher._reset_offset.reset_mock()
139139
fetcher._subscriptions.need_offset_reset(partition)
140140
fetcher._subscriptions.assignment[partition].awaiting_reset = False
141-
fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'')
141+
fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'', -1)
142142
mocker.patch.object(fetcher._subscriptions, 'seek')
143143
fetcher.update_fetch_positions([partition])
144144
assert fetcher._reset_offset.call_count == 0

0 commit comments

Comments
 (0)