Skip to content

Commit c36cb61

Browse files
committed
Fix offset increments:
* Increment the offset before returning a message rather than when putting it in the internal queue. This prevents committing the wrong offsets. * In MultiProcessConsumer, store the offset of the next message
1 parent 9644166 commit c36cb61

File tree

1 file changed

+17
-16
lines changed

1 file changed

+17
-16
lines changed

kafka/consumer.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,19 @@ def get_message(self, block=True, timeout=0.1):
338338
with FetchContext(self, block, timeout):
339339
self._fetch()
340340
try:
341-
return self.queue.get_nowait()
341+
partition, message = self.queue.get_nowait()
342+
343+
# Update partition offset
344+
self.offsets[partition] = message.offset + 1
345+
346+
# Count, check and commit messages if necessary
347+
self.count_since_commit += 1
348+
self._auto_commit()
349+
350+
if self.partition_info:
351+
return partition, message
352+
else:
353+
return message
342354
except Empty:
343355
return None
344356

@@ -380,18 +392,8 @@ def _fetch(self):
380392
partition = resp.partition
381393
try:
382394
for message in resp.messages:
383-
# Update partition offset
384-
self.offsets[partition] = message.offset + 1
385-
386-
# Count, check and commit messages if necessary
387-
self.count_since_commit += 1
388-
self._auto_commit()
389-
390395
# Put the message in our queue
391-
if self.partition_info:
392-
self.queue.put((partition, message))
393-
else:
394-
self.queue.put(message)
396+
self.queue.put((partition, message))
395397
except ConsumerFetchSizeTooSmall, e:
396398
if (self.max_buffer_size is not None and
397399
self.buffer_size == self.max_buffer_size):
@@ -577,12 +579,11 @@ def __iter__(self):
577579
break
578580

579581
# Count, check and commit messages if necessary
580-
self.offsets[partition] = message.offset
582+
self.offsets[partition] = message.offset + 1
581583
self.start.clear()
582-
yield message
583-
584584
self.count_since_commit += 1
585585
self._auto_commit()
586+
yield message
586587

587588
self.start.clear()
588589

@@ -624,7 +625,7 @@ def get_messages(self, count=1, block=True, timeout=10):
624625
messages.append(message)
625626

626627
# Count, check and commit messages if necessary
627-
self.offsets[partition] = message.offset
628+
self.offsets[partition] = message.offset + 1
628629
self.count_since_commit += 1
629630
self._auto_commit()
630631
count -= 1

0 commit comments

Comments
 (0)