Skip to content

Commit a341bb1

Browse files
Linus Wallgren88manpreet
authored andcommitted
Describe consumer thread-safety
1 parent d8c2058 commit a341bb1

File tree

4 files changed

+25
-3
lines changed

4 files changed

+25
-3
lines changed

README.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ for more details.
110110
>>> for i in range(1000):
111111
... producer.send('foobar', b'msg %d' % i)
112112

113+
Thread safety
114+
*************
115+
116+
The KafkaProducer can be used across threads without issue, unlike the
117+
KafkaConsumer which cannot.
118+
119+
While it is possible to use the KafkaConsumer in a thread-local manner,
120+
multiprocessing is recommended.
121+
113122
Compression
114123
***********
115124

docs/index.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,16 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
109109
... producer.send('foobar', b'msg %d' % i)
110110

111111

112+
Thread safety
113+
*************
114+
115+
The KafkaProducer can be used across threads without issue, unlike the
116+
KafkaConsumer which cannot.
117+
118+
While it is possible to use the KafkaConsumer in a thread-local manner,
119+
multiprocessing is recommended.
120+
121+
112122
Compression
113123
***********
114124

example.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#!/usr/bin/env python
22
import threading, logging, time
3+
import multiprocessing
34

45
from kafka import KafkaConsumer, KafkaProducer
56

@@ -16,7 +17,7 @@ def run(self):
1617
time.sleep(1)
1718

1819

19-
class Consumer(threading.Thread):
20+
class Consumer(multiprocessing.Process):
2021
daemon = True
2122

2223
def run(self):
@@ -29,12 +30,12 @@ def run(self):
2930

3031

3132
def main():
32-
threads = [
33+
tasks = [
3334
Producer(),
3435
Consumer()
3536
]
3637

37-
for t in threads:
38+
for t in tasks:
3839
t.start()
3940

4041
time.sleep(10)

kafka/consumer/group.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class KafkaConsumer(six.Iterator):
3333
to allow multiple consumers to load balance consumption of topics (requires
3434
kafka >= 0.9.0.0).
3535
36+
The consumer is not thread safe and should not be shared across threads.
37+
3638
Arguments:
3739
*topics (str): optional list of topics to subscribe to. If not set,
3840
call :meth:`~kafka.KafkaConsumer.subscribe` or

0 commit comments

Comments
 (0)