Skip to content

Commit 2a86b23

Browse files
authored
Optionally return OffsetAndMetadata from consumer.committed(tp) (#1979)
1 parent 1a91a54 commit 2a86b23

File tree

6 files changed

+20
-13
lines changed

6 files changed

+20
-13
lines changed

kafka/consumer/fetcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ def update_fetch_positions(self, partitions):
185185
self._subscriptions.need_offset_reset(tp)
186186
self._reset_offset(tp)
187187
else:
188-
committed = self._subscriptions.assignment[tp].committed
188+
committed = self._subscriptions.assignment[tp].committed.offset
189189
log.debug("Resetting offset for partition %s to the committed"
190190
" offset %s", tp, committed)
191191
self._subscriptions.seek(tp, committed)

kafka/consumer/group.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ def commit(self, offsets=None):
525525
offsets = self._subscription.all_consumed_offsets()
526526
self._coordinator.commit_offsets_sync(offsets)
527527

528-
def committed(self, partition):
528+
def committed(self, partition, metadata=False):
529529
"""Get the last committed offset for the given partition.
530530
531531
This offset will be used as the position for the consumer
@@ -537,9 +537,11 @@ def committed(self, partition):
537537
538538
Arguments:
539539
partition (TopicPartition): The partition to check.
540+
metadata (bool, optional): If True, return OffsetAndMetadata struct
541+
instead of offset int. Default: False.
540542
541543
Returns:
542-
The last committed offset, or None if there was no prior commit.
544+
The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.
543545
"""
544546
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
545547
assert self.config['group_id'] is not None, 'Requires group_id'
@@ -553,10 +555,15 @@ def committed(self, partition):
553555
else:
554556
commit_map = self._coordinator.fetch_committed_offsets([partition])
555557
if partition in commit_map:
556-
committed = commit_map[partition].offset
558+
committed = commit_map[partition]
557559
else:
558560
committed = None
559-
return committed
561+
562+
if committed is not None:
563+
if metadata:
564+
return committed
565+
else:
566+
return committed.offset
560567

561568
def _fetch_all_topic_metadata(self):
562569
"""A blocking call that fetches topic metadata for all topics in the

kafka/consumer/subscription_state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ def _add_assigned_partition(self, partition):
374374

375375
class TopicPartitionState(object):
376376
def __init__(self):
377-
self.committed = None # last committed position
377+
self.committed = None # last committed OffsetAndMetadata
378378
self.has_valid_position = False # whether we have valid position
379379
self.paused = False # whether this partition has been paused by the user
380380
self.awaiting_reset = False # whether we are awaiting reset

kafka/coordinator/consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ def refresh_committed_offsets_if_needed(self):
387387
for partition, offset in six.iteritems(offsets):
388388
# verify assignment is still active
389389
if self._subscription.is_assigned(partition):
390-
self._subscription.assignment[partition].committed = offset.offset
390+
self._subscription.assignment[partition].committed = offset
391391
self._subscription.needs_fetch_committed_offsets = False
392392

393393
def fetch_committed_offsets(self, partitions):
@@ -641,7 +641,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
641641
log.debug("Group %s committed offset %s for partition %s",
642642
self.group_id, offset, tp)
643643
if self._subscription.is_assigned(tp):
644-
self._subscription.assignment[tp].committed = offset.offset
644+
self._subscription.assignment[tp].committed = offset
645645
elif error_type is Errors.GroupAuthorizationFailedError:
646646
log.error("Not authorized to commit offsets for group %s",
647647
self.group_id)
@@ -704,7 +704,7 @@ def _send_offset_fetch_request(self, partitions):
704704
partitions (list of TopicPartition): the partitions to fetch
705705
706706
Returns:
707-
Future: resolves to dict of offsets: {TopicPartition: int}
707+
Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata}
708708
"""
709709
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
710710
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))

test/test_coordinator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
OffsetCommitRequest, OffsetCommitResponse,
2121
OffsetFetchRequest, OffsetFetchResponse)
2222
from kafka.protocol.metadata import MetadataResponse
23-
from kafka.structs import TopicPartition, OffsetAndMetadata
23+
from kafka.structs import OffsetAndMetadata, TopicPartition
2424
from kafka.util import WeakMethod
2525

2626

@@ -211,7 +211,7 @@ def test_refresh_committed_offsets_if_needed(mocker, coordinator):
211211
assert coordinator._subscription.needs_fetch_committed_offsets is True
212212
coordinator.refresh_committed_offsets_if_needed()
213213
assignment = coordinator._subscription.assignment
214-
assert assignment[TopicPartition('foobar', 0)].committed == 123
214+
assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'')
215215
assert TopicPartition('foobar', 1) not in assignment
216216
assert coordinator._subscription.needs_fetch_committed_offsets is False
217217

test/test_fetcher.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
UnknownTopicOrPartitionError, OffsetOutOfRangeError
2222
)
2323
from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
24-
from kafka.structs import TopicPartition
24+
from kafka.structs import OffsetAndMetadata, TopicPartition
2525

2626

2727
@pytest.fixture
@@ -124,7 +124,7 @@ def test_update_fetch_positions(fetcher, topic, mocker):
124124
fetcher._reset_offset.reset_mock()
125125
fetcher._subscriptions.need_offset_reset(partition)
126126
fetcher._subscriptions.assignment[partition].awaiting_reset = False
127-
fetcher._subscriptions.assignment[partition].committed = 123
127+
fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'')
128128
mocker.patch.object(fetcher._subscriptions, 'seek')
129129
fetcher.update_fetch_positions([partition])
130130
assert fetcher._reset_offset.call_count == 0

0 commit comments

Comments
 (0)