Skip to content

Commit 8cc36dd

Browse files
committed
Store fetched offsets separately.
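Fetch requests can be repeated if we get a ConsumerFetchSizeTooSmall error or if _fetch() is called more than once. We don't want to re-fetch messages that are already in our queue, so for each partition we keep a separate fetch offset (fetch_offsets), set to the offset of the last enqueued message plus one, i.e. the next offset to request. These fetch offsets are tracked independently of the consumer's regular offsets and are reset, together with the queue, whenever seek() is called.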
1 parent c36cb61 commit 8cc36dd

File tree

1 file changed

+14
-10
lines changed

1 file changed

+14
-10
lines changed

kafka/consumer.py

Lines changed: 14 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -235,6 +235,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
235235
buffer_size=FETCH_BUFFER_SIZE_BYTES,
236236
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
237237
iter_timeout=None):
238+
super(SimpleConsumer, self).__init__(
239+
client, group, topic,
240+
partitions=partitions,
241+
auto_commit=auto_commit,
242+
auto_commit_every_n=auto_commit_every_n,
243+
auto_commit_every_t=auto_commit_every_t)
238244

239245
if max_buffer_size is not None and buffer_size > max_buffer_size:
240246
raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +251,10 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
245251
self.partition_info = False # Do not return partition info in msgs
246252
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
247253
self.fetch_min_bytes = fetch_size_bytes
248-
self.fetch_started = defaultdict(bool) # defaults to false
254+
self.fetch_offsets = self.offsets.copy()
249255
self.iter_timeout = iter_timeout
250256
self.queue = Queue()
251257

252-
super(SimpleConsumer, self).__init__(
253-
client, group, topic,
254-
partitions=partitions,
255-
auto_commit=auto_commit,
256-
auto_commit_every_n=auto_commit_every_n,
257-
auto_commit_every_t=auto_commit_every_t)
258-
259258
def provide_partition_info(self):
260259
"""
261260
Indicates that partition info must be returned by the consumer
@@ -301,6 +300,10 @@ def seek(self, offset, whence):
301300
else:
302301
raise ValueError("Unexpected value for `whence`, %d" % whence)
303302

303+
# Reset queue and fetch offsets since they are invalid
304+
self.fetch_offsets = self.offsets.copy()
305+
self.queue = Queue()
306+
304307
def get_messages(self, count=1, block=True, timeout=0.1):
305308
"""
306309
Fetch the specified number of messages
@@ -375,11 +378,11 @@ def __iter__(self):
375378
def _fetch(self):
376379
# Create fetch request payloads for all the partitions
377380
requests = []
378-
partitions = self.offsets.keys()
381+
partitions = self.fetch_offsets.keys()
379382
while partitions:
380383
for partition in partitions:
381384
requests.append(FetchRequest(self.topic, partition,
382-
self.offsets[partition],
385+
self.fetch_offsets[partition],
383386
self.buffer_size))
384387
# Send request
385388
responses = self.client.send_fetch_request(
@@ -394,6 +397,7 @@ def _fetch(self):
394397
for message in resp.messages:
395398
# Put the message in our queue
396399
self.queue.put((partition, message))
400+
self.fetch_offsets[partition] = message.offset + 1
397401
except ConsumerFetchSizeTooSmall, e:
398402
if (self.max_buffer_size is not None and
399403
self.buffer_size == self.max_buffer_size):

0 commit comments

Comments
 (0)