Fix offsets #106

Merged: 5 commits, Jan 28, 2014
109 changes: 70 additions & 39 deletions kafka/consumer.py
@@ -1,6 +1,5 @@
 from __future__ import absolute_import

-from collections import defaultdict
 from itertools import izip_longest, repeat
 import logging
 import time
@@ -235,6 +234,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
                  max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                  iter_timeout=None):
+        super(SimpleConsumer, self).__init__(
+            client, group, topic,
+            partitions=partitions,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)

         if max_buffer_size is not None and buffer_size > max_buffer_size:
             raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +250,10 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
-        self.fetch_started = defaultdict(bool)  # defaults to false
+        self.fetch_offsets = self.offsets.copy()
         self.iter_timeout = iter_timeout
         self.queue = Queue()

-        super(SimpleConsumer, self).__init__(
-            client, group, topic,
-            partitions=partitions,
-            auto_commit=auto_commit,
-            auto_commit_every_n=auto_commit_every_n,
-            auto_commit_every_t=auto_commit_every_t)
-
     def provide_partition_info(self):
         """
         Indicates that partition info must be returned by the consumer
@@ -301,6 +299,10 @@ def seek(self, offset, whence):
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)

+        # Reset queue and fetch offsets since they are invalid
+        self.fetch_offsets = self.offsets.copy()
+        self.queue = Queue()
+
     def get_messages(self, count=1, block=True, timeout=0.1):
         """
         Fetch the specified number of messages
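
The heart of this change is the split between the two offset dictionaries: self.fetch_offsets tracks where the next FetchRequest should start, while self.offsets advances only when a message is actually delivered to the caller, and is what auto-commit persists. seek() has to reset both, plus the queue, because any buffered messages were fetched for positions that no longer apply. A minimal sketch of the pattern, with illustrative names that are not part of the patch:

# Toy version of the two-position pattern behind this PR.
# `fetched` may run ahead of `consumed`; only `consumed` is safe to commit.
class OffsetTracker(object):
    def __init__(self, partitions):
        self.consumed = dict((p, 0) for p in partitions)  # commit source
        self.fetched = dict(self.consumed)                # next fetch position

    def on_fetched(self, partition, offset):
        # A message landed in the internal queue; not yet committable.
        self.fetched[partition] = offset + 1

    def on_consumed(self, partition, offset):
        # The caller has the message; committing past it is now safe.
        self.consumed[partition] = offset + 1

    def seek(self, partition, offset):
        # Buffered fetches are stale after a seek: reset both positions.
        self.consumed[partition] = offset
        self.fetched[partition] = offset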
@@ -312,33 +314,69 @@ def get_messages(self, count=1, block=True, timeout=0.1):
         it will block forever.
         """
         messages = []
-        if timeout:
+        if timeout is not None:
             max_time = time.time() + timeout

+        new_offsets = {}
         while count > 0 and (timeout is None or timeout > 0):
-            message = self.get_message(block, timeout)
-            if message:
-                messages.append(message)
+            result = self._get_message(block, timeout, get_partition_info=True,
+                                       update_offset=False)
+            if result:
+                partition, message = result
+                if self.partition_info:
+                    messages.append(result)
+                else:
+                    messages.append(message)
+                new_offsets[partition] = message.offset + 1
                 count -= 1
             else:
                 # Ran out of messages for the last request.
                 if not block:
                     # If we're not blocking, break.
                     break
-                if timeout:
+                if timeout is not None:
                     # If we're blocking and have a timeout, reduce it to the
                     # appropriate value
                     timeout = max_time - time.time()

+        # Update and commit offsets if necessary
+        self.offsets.update(new_offsets)
+        self.count_since_commit += len(messages)
+        self._auto_commit()
         return messages

-    def get_message(self, block=True, timeout=0.1):
+    def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+        return self._get_message(block, timeout, get_partition_info)
+
+    def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+                     update_offset=True):
+        """
+        If no messages can be fetched, returns None.
+        If get_partition_info is None, it defaults to self.partition_info
+        If get_partition_info is True, returns (partition, message)
+        If get_partition_info is False, returns message
+        """
         if self.queue.empty():
             # We're out of messages, go grab some more.
             with FetchContext(self, block, timeout):
                 self._fetch()
         try:
-            return self.queue.get_nowait()
+            partition, message = self.queue.get_nowait()
+
+            if update_offset:
+                # Update partition offset
+                self.offsets[partition] = message.offset + 1
+
+            # Count, check and commit messages if necessary
+            self.count_since_commit += 1
+            self._auto_commit()
+
+            if get_partition_info is None:
+                get_partition_info = self.partition_info
+            if get_partition_info:
+                return partition, message
+            else:
+                return message
         except Empty:
             return None

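The reworked get_messages() drives everything through _get_message(..., update_offset=False), so a whole batch is collected before any offsets move. From the caller's side the public API is unchanged apart from the optional get_partition_info flag. A hedged usage sketch (the broker address, group, and topic are placeholders; the constructor signatures follow kafka-python as of this era):

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Illustrative setup; host/port, group, and topic are made up.
client = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(client, "my-group", "my-topic")

# Default behaviour: just the message (its offset is consumed on delivery).
message = consumer.get_message()

# With partition info: a (partition, message) tuple, or None if empty.
result = consumer.get_message(get_partition_info=True)
if result is not None:
    partition, message = result
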
@@ -363,11 +401,11 @@ def __iter__(self):
     def _fetch(self):
         # Create fetch request payloads for all the partitions
         requests = []
-        partitions = self.offsets.keys()
+        partitions = self.fetch_offsets.keys()
         while partitions:
             for partition in partitions:
                 requests.append(FetchRequest(self.topic, partition,
-                                             self.offsets[partition],
+                                             self.fetch_offsets[partition],
                                              self.buffer_size))
             # Send request
             responses = self.client.send_fetch_request(
@@ -380,18 +418,9 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                 partition = resp.partition
                 try:
                     for message in resp.messages:
-                        # Update partition offset
-                        self.offsets[partition] = message.offset + 1
-
-                        # Count, check and commit messages if necessary
-                        self.count_since_commit += 1
-                        self._auto_commit()
-
                         # Put the message in our queue
-                        if self.partition_info:
-                            self.queue.put((partition, message))
-                        else:
-                            self.queue.put(message)
+                        self.queue.put((partition, message))
+                        self.fetch_offsets[partition] = message.offset + 1
                 except ConsumerFetchSizeTooSmall, e:
                     if (self.max_buffer_size is not None and
                             self.buffer_size == self.max_buffer_size):
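
After this hunk, _fetch() no longer commits anything: it always enqueues (partition, message) pairs and advances only self.fetch_offsets, leaving self.offsets to move at consumption time in _get_message(). The queue becomes a pure buffer between the fetch position and the consumed position, roughly like this (standalone toy sketch, not the patch):

from collections import namedtuple
from Queue import Queue  # Python 2 stdlib, matching the code above

Message = namedtuple("Message", ["offset", "value"])  # stand-in type

queue = Queue()
fetch_offsets = {0: 5}  # partition -> next offset to request (toy values)

def enqueue_fetched(partition, messages):
    # Mirror of the new _fetch() inner loop: buffer each message and move
    # only the fetch position forward; consumed offsets are untouched.
    for message in messages:
        queue.put((partition, message))
        fetch_offsets[partition] = message.offset + 1

enqueue_fetched(0, [Message(5, "a"), Message(6, "b")])
assert fetch_offsets[0] == 7  # fetch position ran ahead of consumption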
@@ -577,12 +606,11 @@ def __iter__(self):
                 break

             # Count, check and commit messages if necessary
-            self.offsets[partition] = message.offset
+            self.offsets[partition] = message.offset + 1
             self.start.clear()
-            yield message
-
             self.count_since_commit += 1
             self._auto_commit()
+            yield message

         self.start.clear()

@@ -605,9 +633,10 @@ def get_messages(self, count=1, block=True, timeout=10):
         self.size.value = count
         self.pause.clear()

-        if timeout:
+        if timeout is not None:
             max_time = time.time() + timeout

+        new_offsets = {}
         while count > 0 and (timeout is None or timeout > 0):
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
@@ -622,16 +651,18 @@ def get_messages(self, count=1, block=True, timeout=10):
                 break

             messages.append(message)
-
-            # Count, check and commit messages if necessary
-            self.offsets[partition] = message.offset
-            self.count_since_commit += 1
-            self._auto_commit()
+            new_offsets[partition] = message.offset + 1
             count -= 1
-            timeout = max_time - time.time()
+            if timeout is not None:
+                timeout = max_time - time.time()

         self.size.value = 0
         self.start.clear()
         self.pause.set()

+        # Update and commit offsets if necessary
+        self.offsets.update(new_offsets)
+        self.count_since_commit += len(messages)
+        self._auto_commit()
+
         return messages
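
Both consumers now share the same accumulate-then-commit shape: offsets are staged in new_offsets while the batch is drained, then folded into self.offsets and auto-committed once, so offsets are never committed for messages the caller did not receive. The shape in isolation (hypothetical helper, not part of the patch):

def drain_batch(pop, count, offsets, auto_commit):
    """Toy version of the new get_messages() flow in this PR.

    `pop` returns a (partition, message) tuple or None when exhausted.
    Offsets are staged locally and only published after the whole batch
    has been collected, then committed in a single step.
    """
    messages = []
    new_offsets = {}
    while count > 0:
        item = pop()
        if item is None:
            break
        partition, message = item
        messages.append(message)
        new_offsets[partition] = message.offset + 1  # next offset to read
        count -= 1
    offsets.update(new_offsets)  # publish staged offsets...
    auto_commit()                # ...then commit the batch as a unit
    return messages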