@@ -26,21 +26,90 @@ development, APIs are subject to change.
26
26
``` python
27
27
from kafka.client import KafkaClient
28
28
from kafka.consumer import SimpleConsumer
29
- from kafka.producer import SimpleProducer
29
+ from kafka.producer import SimpleProducer, KeyedProducer
30
30
31
31
kafka = KafkaClient(" localhost" , 9092 )
32
32
33
+ # To send messages synchronously
33
34
producer = SimpleProducer(kafka, " my-topic" )
34
35
producer.send_messages(" some message" )
35
36
producer.send_messages(" this method" , " is variadic" )
36
37
38
+ # To send messages asynchronously
39
+ producer = SimpleProducer(kafka, " my-topic" , async = True )
40
+ producer.send_messages(" async message" )
41
+
42
+ # To wait for acknowledgements
43
+ # ACK_AFTER_LOCAL_WRITE : server will wait until the data is written to
44
+ # a local log before sending a response
45
+ # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
46
+ # by all in sync replicas before sending a response
47
+ producer = SimpleProducer(kafka, " my-topic" , async = False ,
48
+ req_acks = SimpleProducer.ACK_AFTER_LOCAL_WRITE ,
49
+ acks_timeout = 2000 )
50
+
51
+ response = producer.send_messages(" async message" )
52
+
53
+ if response:
54
+ print (response[0 ].error)
55
+ print (response[0 ].offset)
56
+
57
+ # To send messages in batch. You can use any of the available
58
+ # producers for doing this. The following producer will collect
59
+ # messages in batch and send them to Kafka after 20 messages are
60
+ # collected or every 60 seconds
61
+ # Notes:
62
+ # * If the producer dies before the messages are sent, there will be losses
63
+ # * Call producer.stop() to send the messages and cleanup
64
+ producer = SimpleProducer(kafka, " my-topic" , batch_send = True ,
65
+ batch_send_every_n = 20 ,
66
+ batch_send_every_t = 60 )
67
+
68
+ # To consume messages
37
69
consumer = SimpleConsumer(kafka, " my-group" , " my-topic" )
38
70
for message in consumer:
39
71
print (message)
40
72
41
73
kafka.close()
42
74
```
43
75
76
+ ## Keyed messages
77
+ ``` python
78
+ from kafka.client import KafkaClient
79
+ from kafka.producer import KeyedProducer
80
+ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
81
+
82
+ kafka = KafkaClient(" localhost" , 9092 )
83
+
84
+ # HashedPartitioner is default
85
+ producer = KeyedProducer(kafka, " my-topic" )
86
+ producer.send(" key1" , " some message" )
87
+ producer.send(" key2" , " this method" )
88
+
89
+ producer = KeyedProducer(kafka, " my-topic" , partitioner = RoundRobinPartitioner)
90
+ ```
91
+
92
+ ## Multiprocess consumer
93
+ ``` python
94
+ from kafka.client import KafkaClient
95
+ from kafka.consumer import MultiProcessConsumer
96
+
97
+ kafka = KafkaClient(" localhost" , 9092 )
98
+
99
+ # This will split the number of partitions among two processes
100
+ consumer = MultiProcessConsumer(kafka, " my-group" , " my-topic" , num_procs = 2 )
101
+
102
+ # This will spawn processes such that each handles 2 partitions max
103
+ consumer = MultiProcessConsumer(kafka, " my-group" , " my-topic" ,
104
+ partitions_per_proc = 2 )
105
+
106
+ for message in consumer:
107
+ print (message)
108
+
109
+ for message in consumer.get_messages(count = 5 , block = True , timeout = 4 ):
110
+ print (message)
111
+ ```
112
+
44
113
## Low level
45
114
46
115
``` python
@@ -101,16 +170,18 @@ pip install python-snappy
101
170
102
171
# Tests
103
172
104
- Some of the tests will fail if Snappy is not installed. These tests will throw
105
- NotImplementedError. If you see other failures, they might be bugs - so please
106
- report them!
107
-
108
173
## Run the unit tests
109
174
110
175
_ These are broken at the moment_
111
176
112
177
``` shell
113
- python -m test.unit
178
+ tox ./test/test_unit.py
179
+ ```
180
+
181
+ or
182
+
183
+ ``` shell
184
+ python -m test.test_unit
114
185
```
115
186
116
187
## Run the integration tests
@@ -125,11 +196,15 @@ cd kafka-src
125
196
./sbt package
126
197
```
127
198
128
- Next start up a ZooKeeper server on localhost:2181
199
+ And then run the tests. This will actually start up a real local Zookeeper
200
+ instance and Kafka brokers, and send messages in using the client.
129
201
130
202
``` shell
131
- /opt/zookeeper/bin/zkServer.sh start
203
+ tox ./test/test_integration.py
132
204
```
133
205
134
- This will actually start up real Kafka brokers and send messages in using the
135
- client.
206
+ or
207
+
208
+ ``` shell
209
+ python -m test.test_integration
210
+ ```
0 commit comments