
Commit 7fc565b

Merge branch 'master' into feature/skip_control_batch
2 parents 7e65560 + 18eaa2d commit 7fc565b

9 files changed: +193 additions, -125 deletions


.github/workflows/python-package.yml

Lines changed: 9 additions & 5 deletions
@@ -68,9 +68,6 @@ jobs:
           - "3.12"
           - "pypy3.9"
         experimental: [ false ]
-        include:
-          - python-version: "~3.13.0-0"
-            experimental: true
     steps:
       - name: Checkout the source code
         uses: actions/checkout@v4
@@ -110,7 +107,7 @@ jobs:
           KAFKA_VERSION: ${{ env.KAFKA_LATEST }}

   test-kafka:
-    name: Tests for Kafka ${{ matrix.kafka-version }}
+    name: Tests for Kafka ${{ matrix.kafka-version }} (Python ${{ matrix.python-version }})
     needs:
       - build-sdist
     runs-on: ubuntu-latest
@@ -127,10 +124,17 @@ jobs:
           - "2.4.0"
           - "2.5.0"
           - "2.6.0"
+        python-version: ['3.12']
         experimental: [false]
         include:
           - kafka-version: '0.8.2.2'
             experimental: true
+            python-version: "3.12"
+          - kafka-version: '0.8.2.2'
+            experimental: false
+            python-version: "3.10"
+    env:
+      PYTHON_LATEST: ${{ matrix.python-version }}
     continue-on-error: ${{ matrix.experimental }}
     steps:
       - name: Checkout the source code
@@ -145,7 +149,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
-          python-version: ${{ env.PYTHON_LATEST }}
+          python-version: ${{ matrix.python-version }}
           cache: pip
           cache-dependency-path: |
             requirements-dev.txt
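Note on the matrix change above: GitHub Actions builds the job list as the cross-product of the declared matrix dimensions and then appends any ``include`` entry that matches no existing combination. A rough Python model of the resulting test-kafka job list follows; this sketch is illustrative only, not part of the commit, and the kafka-version list is abbreviated to the entries visible in the hunk.

.. code-block:: python

    from itertools import product

    # Declared matrix dimensions (kafka-version list abbreviated).
    kafka_versions = ["2.4.0", "2.5.0", "2.6.0"]
    python_versions = ["3.12"]
    experimental = [False]

    # Base jobs: cross-product of the dimensions.
    jobs = [
        {"kafka-version": k, "python-version": p, "experimental": e}
        for k, p, e in product(kafka_versions, python_versions, experimental)
    ]

    # The `include` entries name a kafka-version outside the base list,
    # so each is appended as an extra job: 0.8.2.2 runs on two Pythons.
    jobs.append({"kafka-version": "0.8.2.2", "python-version": "3.12", "experimental": True})
    jobs.append({"kafka-version": "0.8.2.2", "python-version": "3.10", "experimental": False})

    for job in jobs:
        print(job)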

README.rst

Lines changed: 104 additions & 61 deletions
@@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can
 achieve something similar by manually assigning different partitions to each
 consumer instance with config management tools like chef, ansible, etc. This
 approach will work fine, though it does not support rebalancing on failures.
-See <https://kafka-python-ng.readthedocs.io/en/master/compatibility.html>
+
+See https://kafka-python.readthedocs.io/en/master/compatibility.html
+
 for more details.

 Please note that the master branch may contain unreleased features. For release
 documentation, please see readthedocs and/or python's inline help.

->>> pip install kafka-python-ng
+
+.. code-block:: bash
+
+    $ pip install kafka-python-ng
+


 KafkaConsumer
@@ -48,89 +54,123 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly
 as possible to the official java client. Full support for coordinated
 consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+.

-See <https://kafka-python-ng.readthedocs.io/en/master/apidoc/KafkaConsumer.html>
+
+See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
+
 for API and configuration details.

 The consumer iterator returns ConsumerRecords, which are simple namedtuples
 that expose basic message attributes: topic, partition, offset, key, and value:

->>> from kafka import KafkaConsumer
->>> consumer = KafkaConsumer('my_favorite_topic')
->>> for msg in consumer:
-...     print (msg)
+.. code-block:: python

->>> # join a consumer group for dynamic partition assignment and offset commits
->>> from kafka import KafkaConsumer
->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
->>> for msg in consumer:
-...     print (msg)
+    from kafka import KafkaConsumer
+    consumer = KafkaConsumer('my_favorite_topic')
+    for msg in consumer:
+        print (msg)

->>> # manually assign the partition list for the consumer
->>> from kafka import TopicPartition
->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
->>> consumer.assign([TopicPartition('foobar', 2)])
->>> msg = next(consumer)
+.. code-block:: python

->>> # Deserialize msgpack-encoded values
->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
->>> consumer.subscribe(['msgpackfoo'])
->>> for msg in consumer:
-...     assert isinstance(msg.value, dict)
+    # join a consumer group for dynamic partition assignment and offset commits
+    from kafka import KafkaConsumer
+    consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
+    for msg in consumer:
+        print (msg)

->>> # Access record headers. The returned value is a list of tuples
->>> # with str, bytes for key and value
->>> for msg in consumer:
-...     print (msg.headers)
+.. code-block:: python

->>> # Get consumer metrics
->>> metrics = consumer.metrics()
+    # manually assign the partition list for the consumer
+    from kafka import TopicPartition
+    consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
+    consumer.assign([TopicPartition('foobar', 2)])
+    msg = next(consumer)
+
+.. code-block:: python
+
+    # Deserialize msgpack-encoded values
+    consumer = KafkaConsumer(value_deserializer=msgpack.loads)
+    consumer.subscribe(['msgpackfoo'])
+    for msg in consumer:
+        assert isinstance(msg.value, dict)
+
+.. code-block:: python
+
+    # Access record headers. The returned value is a list of tuples
+    # with str, bytes for key and value
+    for msg in consumer:
+        print (msg.headers)
+
+.. code-block:: python
+
+    # Get consumer metrics
+    metrics = consumer.metrics()


 KafkaProducer
 *************

 KafkaProducer is a high-level, asynchronous message producer. The class is
 intended to operate as similarly as possible to the official java client.
-See <https://kafka-python-ng.readthedocs.io/en/master/apidoc/KafkaProducer.html>
+
+See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
+
 for more details.

->>> from kafka import KafkaProducer
->>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
->>> for _ in range(100):
-...     producer.send('foobar', b'some_message_bytes')
+.. code-block:: python
+
+    from kafka import KafkaProducer
+    producer = KafkaProducer(bootstrap_servers='localhost:1234')
+    for _ in range(100):
+        producer.send('foobar', b'some_message_bytes')
+
+.. code-block:: python
+
+    # Block until a single message is sent (or timeout)
+    future = producer.send('foobar', b'another_message')
+    result = future.get(timeout=60)
+
+.. code-block:: python
+
+    # Block until all pending messages are at least put on the network
+    # NOTE: This does not guarantee delivery or success! It is really
+    # only useful if you configure internal batching using linger_ms
+    producer.flush()
+
+.. code-block:: python

->>> # Block until a single message is sent (or timeout)
->>> future = producer.send('foobar', b'another_message')
->>> result = future.get(timeout=60)
+    # Use a key for hashed-partitioning
+    producer.send('foobar', key=b'foo', value=b'bar')

->>> # Block until all pending messages are at least put on the network
->>> # NOTE: This does not guarantee delivery or success! It is really
->>> # only useful if you configure internal batching using linger_ms
->>> producer.flush()
+.. code-block:: python

->>> # Use a key for hashed-partitioning
->>> producer.send('foobar', key=b'foo', value=b'bar')
+    # Serialize json messages
+    import json
+    producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    producer.send('fizzbuzz', {'foo': 'bar'})

->>> # Serialize json messages
->>> import json
->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
->>> producer.send('fizzbuzz', {'foo': 'bar'})
+.. code-block:: python

->>> # Serialize string keys
->>> producer = KafkaProducer(key_serializer=str.encode)
->>> producer.send('flipflap', key='ping', value=b'1234')
+    # Serialize string keys
+    producer = KafkaProducer(key_serializer=str.encode)
+    producer.send('flipflap', key='ping', value=b'1234')

->>> # Compress messages
->>> producer = KafkaProducer(compression_type='gzip')
->>> for i in range(1000):
-...     producer.send('foobar', b'msg %d' % i)
+.. code-block:: python

->>> # Include record headers. The format is list of tuples with string key
->>> # and bytes value.
->>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
+    # Compress messages
+    producer = KafkaProducer(compression_type='gzip')
+    for i in range(1000):
+        producer.send('foobar', b'msg %d' % i)

->>> # Get producer performance metrics
->>> metrics = producer.metrics()
+.. code-block:: python
+
+    # Include record headers. The format is list of tuples with string key
+    # and bytes value.
+    producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
+
+.. code-block:: python
+
+    # Get producer performance metrics
+    metrics = producer.metrics()


 Thread safety
@@ -154,16 +194,19 @@ kafka-python-ng supports the following compression formats:
 - Zstandard (zstd)

 gzip is supported natively, the others require installing additional libraries.
-See <https://kafka-python-ng.readthedocs.io/en/master/install.html> for more information.
+
+See https://kafka-python.readthedocs.io/en/master/install.html for more information.
+


 Optimized CRC32 Validation
 **************************

 Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure
 python implementation for compatibility. To improve performance for high-throughput
-applications, kafka-python-ng will use `crc32c` for optimized native code if installed.
-See <https://kafka-python-ng.readthedocs.io/en/master/install.html> for installation instructions.
+applications, kafka-python will use `crc32c` for optimized native code if installed.
+See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions.
+
 See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.

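The producer examples introduced in the README diff call ``future.get(timeout=60)`` without showing the failure path. Below is a minimal sketch of the same pattern with error handling, assuming a broker at localhost:9092 (a placeholder address); ``kafka.errors.KafkaError`` is the library's base exception class.

.. code-block:: python

    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = KafkaProducer(bootstrap_servers='localhost:9092')  # placeholder address

    # send() is asynchronous and returns a future for the broker response.
    future = producer.send('foobar', b'raw_bytes')
    try:
        # Block until the send completes; raises on delivery failure or timeout.
        record_metadata = future.get(timeout=10)
        print(record_metadata.topic, record_metadata.partition, record_metadata.offset)
    except KafkaError:
        # Delivery failed; log or retry as appropriate.
        pass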