Skip to content

Commit 441aeb8

Browse files
authored
Avoid consuming duplicate compressed messages from mid-batch (#1367)
1 parent 618c505 commit 441aeb8

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

kafka/consumer/fetcher.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -835,12 +835,21 @@ def _parse_fetched_data(self, completed_fetch):
835835

836836
return parsed_records
837837

838-
class PartitionRecords(six.Iterator):
838+
class PartitionRecords(object):
839839
def __init__(self, fetch_offset, tp, messages):
840840
self.fetch_offset = fetch_offset
841841
self.topic_partition = tp
842842
self.messages = messages
843-
self.message_idx = 0
843+
# When fetching an offset that is in the middle of a
844+
# compressed batch, we will get all messages in the batch.
845+
# But we want to start 'take' at the fetch_offset
846+
for i, msg in enumerate(messages):
847+
if msg.offset == fetch_offset:
848+
self.message_idx = i
849+
break
850+
else:
851+
self.message_idx = 0
852+
self.messages = None
844853

845854
# For truthiness evaluation we need to define __len__ or __nonzero__
846855
def __len__(self):

test/test_fetcher.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,3 +498,43 @@ def test__parse_fetched_data__out_of_range(fetcher, topic, mocker):
498498
partition_record = fetcher._parse_fetched_data(completed_fetch)
499499
assert partition_record is None
500500
assert fetcher._subscriptions.assignment[tp].awaiting_reset is True
501+
502+
503+
def test_partition_records_offset():
504+
"""Test that compressed messagesets are handled correctly
505+
when fetch offset is in the middle of the message list
506+
"""
507+
batch_start = 120
508+
batch_end = 130
509+
fetch_offset = 123
510+
tp = TopicPartition('foo', 0)
511+
messages = [ConsumerRecord(tp.topic, tp.partition, i,
512+
None, None, 'key', 'value', 'checksum', 0, 0)
513+
for i in range(batch_start, batch_end)]
514+
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
515+
assert len(records) > 0
516+
msgs = records.take(1)
517+
assert msgs[0].offset == 123
518+
assert records.fetch_offset == 124
519+
msgs = records.take(2)
520+
assert len(msgs) == 2
521+
assert len(records) > 0
522+
records.discard()
523+
assert len(records) == 0
524+
525+
526+
def test_partition_records_empty():
527+
records = Fetcher.PartitionRecords(0, None, [])
528+
assert len(records) == 0
529+
530+
531+
def test_partition_records_no_fetch_offset():
532+
batch_start = 0
533+
batch_end = 100
534+
fetch_offset = 123
535+
tp = TopicPartition('foo', 0)
536+
messages = [ConsumerRecord(tp.topic, tp.partition, i,
537+
None, None, 'key', 'value', 'checksum', 0, 0)
538+
for i in range(batch_start, batch_end)]
539+
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
540+
assert len(records) == 0

0 commit comments

Comments
 (0)