Skip to content

Commit d815011

Browse files
committed
Implement offset fetch/commit via zookeeper.
* Tried to be consistent with the Scala client. * Support in MultiProcessConsumer not implemented. * Removed check for self.count_since_commit in commit() since we may have manually changed the offsets (e.g. with a seek)
1 parent 9644166 commit d815011

File tree

4 files changed

+157
-68
lines changed

4 files changed

+157
-68
lines changed

kafka/consumer.py

Lines changed: 121 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
from __future__ import absolute_import
22

33
from collections import defaultdict
4+
from copy import copy
45
from itertools import izip_longest, repeat
6+
from kazoo.client import KazooClient
57
import logging
6-
import time
7-
from threading import Lock
88
from multiprocessing import Process, Queue as MPQueue, Event, Value
9+
import os
910
from Queue import Empty, Queue
11+
import time
12+
from threading import Lock
1013

1114
from kafka.common import (
1215
ErrorMapping, FetchRequest,
@@ -30,6 +33,8 @@
3033
ITER_TIMEOUT_SECONDS = 60
3134
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
3235

36+
CONSUMERS_DIR = "consumers"
37+
OFFSETS_DIR = "offsets"
3338

3439
class FetchContext(object):
3540
"""
@@ -71,7 +76,8 @@ class Consumer(object):
7176
"""
7277
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
7378
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
74-
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
79+
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
80+
zk_hosts=None, zk_chroot='/'):
7581

7682
self.client = client
7783
self.topic = topic
@@ -89,34 +95,80 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
8995
self.auto_commit = auto_commit
9096
self.auto_commit_every_n = auto_commit_every_n
9197
self.auto_commit_every_t = auto_commit_every_t
98+
self.partitions_to_commit = set()
99+
100+
for partition in partitions:
101+
self.offsets[partition] = 0
102+
103+
# Zookeeper
104+
self._init_zk(zk_hosts, zk_chroot)
105+
106+
# Uncomment for 0.8.1
107+
# def get_or_init_offset_callback(resp):
108+
# if resp.error == ErrorMapping.NO_ERROR:
109+
# return resp.offset
110+
# elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
111+
# return 0
112+
# else:
113+
# raise Exception("OffsetFetchRequest for topic=%s, "
114+
# "partition=%d failed with errorcode=%s" % (
115+
# resp.topic, resp.partition, resp.error))
116+
#
117+
# for partition in partitions:
118+
# req = OffsetFetchRequest(topic, partition)
119+
# (offset,) = self.client.send_offset_fetch_request(group, [req],
120+
# callback=get_or_init_offset_callback,
121+
# fail_on_error=False)
122+
# self.offsets[partition] = offset
92123

93124
# Set up the auto-commit timer
94125
if auto_commit is True and auto_commit_every_t is not None:
95126
self.commit_timer = ReentrantTimer(auto_commit_every_t,
96127
self.commit)
97128
self.commit_timer.start()
98129

99-
def get_or_init_offset_callback(resp):
100-
if resp.error == ErrorMapping.NO_ERROR:
101-
return resp.offset
102-
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
103-
return 0
104-
else:
105-
raise Exception("OffsetFetchRequest for topic=%s, "
106-
"partition=%d failed with errorcode=%s" % (
107-
resp.topic, resp.partition, resp.error))
130+
def _get_offsets_dir(self):
131+
return os.path.join(self.zk_chroot, CONSUMERS_DIR, self.group,
132+
OFFSETS_DIR, self.topic)
133+
def _init_zk(self, zk_hosts, zk_chroot):
134+
if zk_hosts:
135+
self.zk = KazooClient(",".join(zk_hosts))
136+
self.zk.start()
137+
self.zk_chroot = zk_chroot
138+
self.offset_path = self._get_offsets_dir()
139+
self.zk_offset_counters = {}
140+
self._fetch_offsets()
141+
else:
142+
self.zk = None
143+
self.zk_chroot = None
144+
self.offset_path = None
145+
self.zk_offset_counters = None
108146

109-
# Uncomment for 0.8.1
110-
#
111-
#for partition in partitions:
112-
# req = OffsetFetchRequest(topic, partition)
113-
# (offset,) = self.client.send_offset_fetch_request(group, [req],
114-
# callback=get_or_init_offset_callback,
115-
# fail_on_error=False)
116-
# self.offsets[partition] = offset
147+
def _get_partition_offsets(self, partitions, offset_time):
148+
reqs = []
149+
for partition in partitions:
150+
reqs.append(OffsetRequest(self.topic, partition, offset_time, 1))
151+
return self.client.send_offset_request(reqs)
152+
153+
def _fetch_offsets(self, partitions=None):
154+
if self.zk is None:
155+
raise Exception("Cannot fetch offsets without zookeeper")
156+
157+
if partitions is None:
158+
partitions = self.offsets.keys()
159+
start_offsets = {}
160+
for resp in self._get_partition_offsets(partitions, -2):
161+
start_offsets[resp.partition] = resp.offsets[0]
162+
163+
self.zk_offset_counters = {}
117164

118165
for partition in partitions:
119-
self.offsets[partition] = 0
166+
partition_offset_path = os.path.join(self.offset_path,
167+
str(partition))
168+
counter = self.zk.Counter(partition_offset_path,
169+
default=start_offsets[partition])
170+
self.zk_offset_counters[partition] = counter
171+
self.offsets[partition] = counter.value
120172

121173
def commit(self, partitions=None):
122174
"""
@@ -125,50 +177,60 @@ def commit(self, partitions=None):
125177
partitions: list of partitions to commit, default is to commit
126178
all of them
127179
"""
180+
if self.zk is None:
181+
raise Exception("Cannot commit offsets without zookeeper")
128182

129-
# short circuit if nothing happened. This check is kept outside
130-
# to prevent un-necessarily acquiring a lock for checking the state
131-
if self.count_since_commit == 0:
132-
return
183+
if partitions is None:
184+
partitions = self.offsets.keys()
133185

134186
with self.commit_lock:
135-
# Do this check again, just in case the state has changed
136-
# during the lock acquiring timeout
137-
if self.count_since_commit == 0:
138-
return
139-
140-
reqs = []
141-
if not partitions: # commit all partitions
142-
partitions = self.offsets.keys()
143-
144187
for partition in partitions:
145188
offset = self.offsets[partition]
146-
log.debug("Commit offset %d in SimpleConsumer: "
147-
"group=%s, topic=%s, partition=%s" %
148-
(offset, self.group, self.topic, partition))
149-
150-
reqs.append(OffsetCommitRequest(self.topic, partition,
151-
offset, None))
152-
153-
resps = self.client.send_offset_commit_request(self.group, reqs)
154-
for resp in resps:
155-
assert resp.error == 0
156-
157-
self.count_since_commit = 0
189+
counter = self.zk_offset_counters[partition]
190+
if counter.value == offset:
191+
# Nothing changed for this partition
192+
continue
193+
log.debug("Commit offset %d for consumer group=%s, topic=%s, "
194+
"partition=%s", offset, self.group, self.topic,
195+
partition)
196+
counter += offset - counter.value
197+
self.zk.sync(counter.path)
198+
199+
# Do this instead for kafka 0.8.1
200+
# reqs = []
201+
# if not partitions: # demmit all partitions
202+
# partitions = self.offsets.keys()
203+
#
204+
# for partition in partitions:
205+
# offset = self.offsets[partition]
206+
# log.debug("Commit offset %d in SimpleConsumer: "
207+
# "group=%s, topic=%s, partition=%s" %
208+
# (offset, self.group, self.topic, partition))
209+
#
210+
# reqs.append(OffsetCommitRequest(self.topic, partition,
211+
# offset, None))
212+
#
213+
# resps = self.client.send_offset_commit_request(self.group, reqs)
214+
# for resp in resps:
215+
# assert resp.error == 0
158216

159217
def _auto_commit(self):
160218
"""
161219
Check if we have to commit based on number of messages and commit
162220
"""
163221

164222
# Check if we are supposed to do an auto-commit
165-
if not self.auto_commit or self.auto_commit_every_n is None:
223+
if (self.zk is None or not self.auto_commit or
224+
self.auto_commit_every_n is None):
166225
return
167226

168-
if self.count_since_commit > self.auto_commit_every_n:
227+
if self.count_since_commit >= self.auto_commit_every_n:
169228
self.commit()
229+
self.count_since_commit = 0
170230

171231
def stop(self):
232+
if self.zk:
233+
self.zk.stop()
172234
if self.commit_timer is not None:
173235
self.commit_timer.stop()
174236
self.commit()
@@ -234,7 +296,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
234296
fetch_size_bytes=FETCH_MIN_BYTES,
235297
buffer_size=FETCH_BUFFER_SIZE_BYTES,
236298
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
237-
iter_timeout=None):
299+
iter_timeout=None, zk_hosts=None,
300+
zk_chroot='/'):
238301

239302
if max_buffer_size is not None and buffer_size > max_buffer_size:
240303
raise ValueError("buffer_size (%d) is greater than "
@@ -250,11 +313,11 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
250313
self.queue = Queue()
251314

252315
super(SimpleConsumer, self).__init__(
253-
client, group, topic,
254-
partitions=partitions,
316+
client, group, topic, partitions=partitions,
255317
auto_commit=auto_commit,
256318
auto_commit_every_n=auto_commit_every_n,
257-
auto_commit_every_t=auto_commit_every_t)
319+
auto_commit_every_t=auto_commit_every_t,
320+
zk_hosts=zk_hosts, zk_chroot=zk_chroot)
258321

259322
def provide_partition_info(self):
260323
"""
@@ -272,7 +335,7 @@ def seek(self, offset, whence):
272335
1 is relative to the current offset
273336
2 is relative to the latest known offset (tail)
274337
"""
275-
338+
partitions = self.offsets.keys()
276339
if whence == 1: # relative to current position
277340
for partition, _offset in self.offsets.items():
278341
self.offsets[partition] = _offset + offset
@@ -281,20 +344,15 @@ def seek(self, offset, whence):
281344
# distribute the remained evenly
282345
(delta, rem) = divmod(offset, len(self.offsets))
283346
deltas = {}
284-
for partition, r in izip_longest(self.offsets.keys(),
347+
for partition, r in izip_longest(partitions,
285348
repeat(1, rem), fillvalue=0):
286349
deltas[partition] = delta + r
287350

288-
reqs = []
289-
for partition in self.offsets.keys():
290-
if whence == 0:
291-
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
292-
elif whence == 2:
293-
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
294-
else:
295-
pass
351+
if whence == 0:
352+
resps = self._get_partition_offsets(partitions, -2)
353+
else:
354+
resps = self._get_partition_offsets(partitions, -1)
296355

297-
resps = self.client.send_offset_request(reqs)
298356
for resp in resps:
299357
self.offsets[resp.partition] = \
300358
resp.offsets[0] + deltas[resp.partition]

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
kazoo==1.3.1

test/test_integration.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,13 @@ def tearDownClass(cls): # noqa
587587
cls.server2.close()
588588
cls.zk.close()
589589

590+
def setUp(self):
591+
super(TestConsumer, self).setUp()
592+
self.zk_chroot = self.id()[self.id().rindex(".")+1:]
593+
590594
def test_simple_consumer(self):
595+
group = "group1"
596+
591597
# Produce 100 messages to partition 0
592598
produce1 = ProduceRequest(self.topic, 0, messages=[
593599
create_message("Test message 0 %d" % i) for i in range(100)
@@ -607,9 +613,11 @@ def test_simple_consumer(self):
607613
self.assertEquals(resp.offset, 0)
608614

609615
# Start a consumer
610-
consumer = SimpleConsumer(self.client, "group1",
611-
self.topic, auto_commit=False,
612-
iter_timeout=0)
616+
zk_server = "%s:%d" % (self.zk.host, self.zk.port)
617+
consumer = SimpleConsumer(self.client, group, self.topic,
618+
auto_commit=False, iter_timeout=0,
619+
zk_hosts=[zk_server],
620+
zk_chroot=self.zk_chroot)
613621
all_messages = []
614622
for message in consumer:
615623
all_messages.append(message)
@@ -631,7 +639,27 @@ def test_simple_consumer(self):
631639
all_messages.append(message)
632640

633641
self.assertEquals(len(all_messages), 13)
642+
self.assertEqual(consumer.offsets, {0: 100, 1: 100})
643+
consumer.commit()
644+
consumer.stop()
645+
646+
# Start another consumer with the same group and check the offsets
647+
consumer = SimpleConsumer(self.client, group, self.topic,
648+
auto_commit=False, iter_timeout=0,
649+
zk_hosts=[zk_server],
650+
zk_chroot=self.zk_chroot)
651+
self.assertEqual(consumer.offsets, {0: 100, 1: 100})
652+
consumer.seek(-10, 1)
653+
# Commit offset for a single partition
654+
consumer.commit([1])
655+
consumer.stop()
656+
# Start another consumer with the same group and check the offsets
657+
consumer = SimpleConsumer(self.client, group, self.topic,
658+
auto_commit=False, iter_timeout=0,
659+
zk_hosts=[zk_server],
660+
zk_chroot=self.zk_chroot)
634661

662+
self.assertEqual(consumer.offsets, {0: 100, 1:90})
635663
consumer.stop()
636664

637665
def test_simple_consumer_blocking(self):
@@ -746,7 +774,7 @@ def test_multi_process_consumer(self):
746774
messages = consumer.get_messages(count=10, block=True, timeout=5)
747775
self.assertEqual(len(messages), 5)
748776
diff = (datetime.now() - start).total_seconds()
749-
self.assertGreaterEqual(diff, 5)
777+
self.assertGreaterEqual(diff, 4.9)
750778

751779
consumer.stop()
752780

tox.ini

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
[tox]
22
envlist = py26, py27
33
[testenv]
4-
deps = pytest
4+
deps =
5+
pytest
6+
-rrequirements.txt
57
commands = py.test --basetemp={envtmpdir} []
68
setenv =
79
PROJECT_ROOT = {toxinidir}

0 commit comments

Comments
 (0)