Skip to content

Commit e0f7262

Browse files
committed
Make get_messages() update and commit offsets just before returning
1 parent 8b3793a commit e0f7262

File tree

1 file changed

+35
-16
lines changed

1 file changed

+35
-16
lines changed

kafka/consumer.py

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import absolute_import
22

3-
from collections import defaultdict
43
from itertools import izip_longest, repeat
54
import logging
65
import time
@@ -318,10 +317,17 @@ def get_messages(self, count=1, block=True, timeout=0.1):
318317
if timeout is not None:
319318
max_time = time.time() + timeout
320319

320+
new_offsets = {}
321321
while count > 0 and (timeout is None or timeout > 0):
322-
message = self.get_message(block, timeout)
323-
if message:
324-
messages.append(message)
322+
result = self._get_message(block, timeout, get_partition_info=True,
323+
update_offset=False)
324+
if result:
325+
partition, message = result
326+
if self.partition_info:
327+
messages.append(result)
328+
else:
329+
messages.append(message)
330+
new_offsets[partition] = message.offset + 1
325331
count -= 1
326332
else:
327333
# Ran out of messages for the last request.
@@ -333,24 +339,35 @@ def get_messages(self, count=1, block=True, timeout=0.1):
333339
# appropriate value
334340
timeout = max_time - time.time()
335341

342+
# Update and commit offsets if necessary
343+
self.offsets.update(new_offsets)
344+
self.count_since_commit += len(messages)
345+
self._auto_commit()
336346
return messages
337347

338-
def get_message(self, block=True, timeout=0.1):
348+
def get_message(self, block=True, timeout=0.1, get_partition_info=None):
349+
return self._get_message(block, timeout, get_partition_info)
350+
351+
def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
352+
update_offset=True):
339353
if self.queue.empty():
340354
# We're out of messages, go grab some more.
341355
with FetchContext(self, block, timeout):
342356
self._fetch()
343357
try:
344358
partition, message = self.queue.get_nowait()
345359

346-
# Update partition offset
347-
self.offsets[partition] = message.offset + 1
360+
if update_offset:
361+
# Update partition offset
362+
self.offsets[partition] = message.offset + 1
348363

349-
# Count, check and commit messages if necessary
350-
self.count_since_commit += 1
351-
self._auto_commit()
364+
# Count, check and commit messages if necessary
365+
self.count_since_commit += 1
366+
self._auto_commit()
352367

353-
if self.partition_info:
368+
if get_partition_info is None:
369+
get_partition_info = self.partition_info
370+
if get_partition_info:
354371
return partition, message
355372
else:
356373
return message
@@ -613,6 +630,7 @@ def get_messages(self, count=1, block=True, timeout=10):
613630
if timeout is not None:
614631
max_time = time.time() + timeout
615632

633+
new_offsets = {}
616634
while count > 0 and (timeout is None or timeout > 0):
617635
# Trigger consumption only if the queue is empty
618636
# By doing this, we will ensure that consumers do not
@@ -627,11 +645,7 @@ def get_messages(self, count=1, block=True, timeout=10):
627645
break
628646

629647
messages.append(message)
630-
631-
# Count, check and commit messages if necessary
632-
self.offsets[partition] = message.offset + 1
633-
self.count_since_commit += 1
634-
self._auto_commit()
648+
new_offsets[partition] = message.offset + 1
635649
count -= 1
636650
if timeout is not None:
637651
timeout = max_time - time.time()
@@ -640,4 +654,9 @@ def get_messages(self, count=1, block=True, timeout=10):
640654
self.start.clear()
641655
self.pause.set()
642656

657+
# Update and commit offsets if necessary
658+
self.offsets.update(new_offsets)
659+
self.count_since_commit += len(messages)
660+
self._auto_commit()
661+
643662
return messages

0 commit comments

Comments
 (0)