Skip to content

Commit 4103da4

Browse files
author
Ashish Walia
committed
Don't replay the message from last commited offset
1 parent 5154a5e commit 4103da4

File tree

1 file changed

+18
-0
lines changed

1 file changed

+18
-0
lines changed

kafka/consumer.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ def get_or_init_offset_callback(resp):
104104
# (offset,) = self.client.send_offset_fetch_request(group, [req],
105105
# callback=get_or_init_offset_callback,
106106
# fail_on_error=False)
107+
#
108+
# #Increment the offset with 1 so that message corresponding to last
109+
# #committed offset doesn't get replayed
110+
# if offset != 0:
111+
# offset +=1
112+
#
107113
# self.offsets[partition] = offset
108114

109115
for partition in partitions:
@@ -399,6 +405,18 @@ def __iter_partition__(self, partition, offset):
399405
except ConsumerNoMoreData, e:
400406
log.debug("Iteration was ended by %r", e)
401407

408+
# Uncomment for 0.8.1
409+
#
410+
# Decrement the value of offset for the partition from where
411+
# no messages have been consumed. We initially incremented the offset value
412+
# before making fetch request so that message corresponding to the
413+
# last committed offset doesn't get replayed.
414+
# This step ensures that correct value of offset per partition get
415+
# committed if user decides to make an explicit call to commit method.
416+
#
417+
#if self.offsets[partition] == offset and offset != 0:
418+
# self.offsets[partition] -= 1
419+
402420
if next_offset is None:
403421
break
404422
else:

0 commit comments

Comments
 (0)