Commit f31b57c

Merge pull request #1 from dpkp/master
Update fork
2 parents f9e0264 + e485a6e commit f31b57c

34 files changed: +664 -98 lines

.travis.yml

Lines changed: 3 additions & 0 deletions
@@ -6,6 +6,7 @@ python:
   - 2.7
   - 3.4
   - 3.7
+  - 3.8
   - pypy2.7-6.0
 
 env:
@@ -15,11 +16,13 @@ env:
   - KAFKA_VERSION=0.11.0.3
   - KAFKA_VERSION=1.1.1
   - KAFKA_VERSION=2.4.0
+  - KAFKA_VERSION=2.5.0
 
 addons:
   apt:
     packages:
       - libsnappy-dev
+      - libzstd-dev
       - openjdk-8-jdk
 
 cache:

README.rst

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ Kafka Python client
 ------------------------
 
 .. image:: https://img.shields.io/badge/kafka-2.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
-   :target: https://kafka-python.readthedocs.io/compatibility.html
+   :target: https://kafka-python.readthedocs.io/en/master/compatibility.html
 .. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
    :target: https://pypi.python.org/pypi/kafka-python
 .. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
File renamed without changes.

build_integration.sh

Lines changed: 4 additions & 2 deletions
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"}
+: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"}
 : ${SCALA_VERSION:=2.11}
 : ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
 : ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
@@ -33,12 +33,14 @@ pushd servers
   echo "-------------------------------------"
   echo "Checking kafka binaries for ${kafka}"
   echo
-  # kafka 0.8.0 is only available w/ scala 2.8.0
   if [ "$kafka" == "0.8.0" ]; then
     KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz"
+  else if [ "$kafka" \> "2.4.0" ]; then
+    KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz"
   else
     KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz"
   fi
+  fi
   if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then
     if [ -f "${KAFKA_ARTIFACT}" ]; then
       echo "Using cached artifact: ${KAFKA_ARTIFACT}"

docs/compatibility.rst

Lines changed: 1 addition & 1 deletion
@@ -16,6 +16,6 @@ Although kafka-python is tested and expected to work on recent broker versions,
 not all features are supported. Specifically, authentication codecs, and
 transactional producer/consumer support are not fully implemented. PRs welcome!
 
-kafka-python is tested on python 2.7, 3.4, 3.7, and pypy2.7.
+kafka-python is tested on python 2.7, 3.4, 3.7, 3.8 and pypy2.7.
 
 Builds and tests via Travis-CI. See https://travis-ci.org/dpkp/kafka-python

docs/conf.py

Lines changed: 0 additions & 1 deletion
@@ -12,7 +12,6 @@
 # All configuration values have a default; values that are commented out
 # serve to show the default.
 
-import sys
 import os
 
 # If extensions (or modules to document with autodoc) are in another directory,

docs/index.rst

Lines changed: 5 additions & 4 deletions
@@ -122,11 +122,12 @@ multiprocessing is recommended.
 Compression
 ***********
 
-kafka-python supports gzip compression/decompression natively. To produce or
-consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
-To enable snappy, install python-snappy (also requires snappy library).
-See `Installation <install.html#optional-snappy-install>`_ for more information.
+kafka-python supports multiple compression types:
 
+- gzip : supported natively
+- lz4 : requires `python-lz4 <https://pypi.org/project/lz4/>`_ installed
+- snappy : requires the `python-snappy <https://pypi.org/project/python-snappy/>`_ package (which requires the snappy C library)
+- zstd : requires the `python-zstandard <https://github.com/indygreg/python-zstandard>`_ package installed
 
 Protocol
 ********
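For context, a minimal sketch of selecting one of these codecs from the producer side (not part of this diff; it assumes a broker reachable at localhost:9092 and, for 'zstd', that python-zstandard is installed and the broker supports the codec):

```python
from kafka import KafkaProducer

# Pick any codec from the list above; kafka-python raises an error at
# construction time if the required compression library is missing.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='zstd')  # or 'gzip', 'snappy', 'lz4'
producer.send('my-topic', b'compressed payload')
producer.flush()
```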

example.py

Lines changed: 28 additions & 18 deletions
@@ -1,15 +1,15 @@
 #!/usr/bin/env python
-import threading, logging, time
-import multiprocessing
+import threading, time
 
-from kafka import KafkaConsumer, KafkaProducer
+from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
+from kafka.admin import NewTopic
 
 
 class Producer(threading.Thread):
     def __init__(self):
         threading.Thread.__init__(self)
         self.stop_event = threading.Event()
-
+
     def stop(self):
         self.stop_event.set()
 
@@ -23,14 +23,15 @@ def run(self):
 
         producer.close()
 
-class Consumer(multiprocessing.Process):
+
+class Consumer(threading.Thread):
     def __init__(self):
-        multiprocessing.Process.__init__(self)
-        self.stop_event = multiprocessing.Event()
-
+        threading.Thread.__init__(self)
+        self.stop_event = threading.Event()
+
     def stop(self):
         self.stop_event.set()
-
+
     def run(self):
         consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                  auto_offset_reset='earliest',
@@ -44,29 +45,38 @@ def run(self):
                     break
 
         consumer.close()
-
-
+
+
 def main():
+    # Create 'my-topic' Kafka topic
+    try:
+        admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
+
+        topic = NewTopic(name='my-topic',
+                         num_partitions=1,
+                         replication_factor=1)
+        admin.create_topics([topic])
+    except Exception:
+        pass
+
     tasks = [
         Producer(),
         Consumer()
     ]
 
+    # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
     for t in tasks:
         t.start()
 
     time.sleep(10)
-
+
+    # Stop threads
    for task in tasks:
         task.stop()
 
     for task in tasks:
         task.join()
-
-
+
+
 if __name__ == "__main__":
-    logging.basicConfig(
-        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
-        level=logging.INFO
-        )
     main()
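One judgment call in the new main(): the bare `except Exception` swallows every failure, not only the expected "topic already exists" case. A narrower variant (a sketch, not part of the commit) would catch just that error:

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
try:
    topic = NewTopic(name='my-topic', num_partitions=1, replication_factor=1)
    admin.create_topics([topic])
except TopicAlreadyExistsError:
    pass  # expected on re-runs; any other error would still surface
```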

kafka/admin/client.py

Lines changed: 43 additions & 16 deletions
@@ -1,14 +1,17 @@
 from __future__ import absolute_import
 
-from collections import defaultdict
+from collections import defaultdict, namedtuple
 import copy
 import logging
 import socket
 
 from . import ConfigResourceType
 from kafka.vendor import six
 
+from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
+    ACLResourcePatternType
 from kafka.client_async import KafkaClient, selectors
+from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
 import kafka.errors as Errors
 from kafka.errors import (
     IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
@@ -19,9 +22,8 @@
     ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
 from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
 from kafka.protocol.metadata import MetadataRequest
-from kafka.structs import TopicPartition, OffsetAndMetadata
-from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
-    ACLResourcePatternType
+from kafka.protocol.types import Array
+from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
 from kafka.version import __version__
 
 
@@ -204,7 +206,7 @@ def __init__(self, **configs):
         self._client = KafkaClient(metrics=self._metrics,
                                    metric_group_prefix='admin',
                                    **self.config)
-        self._client.check_version()
+        self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
 
         # Get auto-discovered version from client if necessary
         if self.config['api_version'] is None:
@@ -271,7 +273,7 @@ def _refresh_controller_id(self):
         response = future.value
         controller_id = response.controller_id
         # verify the controller is new enough to support our requests
-        controller_version = self._client.check_version(controller_id)
+        controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
         if controller_version < (0, 10, 0):
             raise IncompatibleBrokerVersion(
                 "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
@@ -1000,22 +1002,47 @@ def _describe_consumer_groups_process_response(self, response):
         """Process a DescribeGroupsResponse into a group description."""
         if response.API_VERSION <= 3:
             assert len(response.groups) == 1
-            # TODO need to implement converting the response tuple into
-            # a more accessible interface like a namedtuple and then stop
-            # hardcoding tuple indices here. Several Java examples,
-            # including KafkaAdminClient.java
-            group_description = response.groups[0]
-            error_code = group_description[0]
+            for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
+                if isinstance(response_field, Array):
+                    described_groups = response.__dict__[response_name]
+                    described_groups_field_schema = response_field.array_of
+                    described_group = response.__dict__[response_name][0]
+                    described_group_information_list = []
+                    protocol_type_is_consumer = False
+                    for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
+                        if group_information_name == 'protocol_type':
+                            protocol_type = described_group_information
+                            protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
+                        if isinstance(group_information_field, Array):
+                            member_information_list = []
+                            member_schema = group_information_field.array_of
+                            for members in described_group_information:
+                                member_information = []
+                                for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
+                                    if protocol_type_is_consumer:
+                                        if member_name == 'member_metadata' and member:
+                                            member_information.append(ConsumerProtocolMemberMetadata.decode(member))
+                                        elif member_name == 'member_assignment' and member:
+                                            member_information.append(ConsumerProtocolMemberAssignment.decode(member))
+                                        else:
+                                            member_information.append(member)
+                                member_info_tuple = MemberInformation._make(member_information)
+                                member_information_list.append(member_info_tuple)
+                            described_group_information_list.append(member_information_list)
+                        else:
+                            described_group_information_list.append(described_group_information)
+                    # Version 3 of the DescribeGroups API introduced the "authorized_operations" field. This will cause the namedtuple to fail
+                    # Therefore, appending a placeholder of None in it.
+                    if response.API_VERSION <=2:
+                        described_group_information_list.append(None)
+            group_description = GroupInformation._make(described_group_information_list)
+            error_code = group_description.error_code
             error_type = Errors.for_code(error_code)
             # Java has the note: KAFKA-6789, we can retry based on the error code
             if error_type is not Errors.NoError:
                 raise error_type(
                     "DescribeGroupsResponse failed with response '{}'."
                     .format(response))
-            # TODO Java checks the group protocol type, and if consumer
-            # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
-            # the members' partition assignments... that hasn't yet been
-            # implemented here so just return the raw struct results
         else:
             raise NotImplementedError(
                 "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."

kafka/client_async.py

Lines changed: 6 additions & 4 deletions
@@ -2,7 +2,6 @@
 
 import collections
 import copy
-import functools
 import logging
 import random
 import socket
@@ -202,18 +201,22 @@ def __init__(self, **configs):
             if key in configs:
                 self.config[key] = configs[key]
 
+        # these properties need to be set on top of the initialization pipeline
+        # because they are used when __del__ method is called
+        self._closed = False
+        self._wake_r, self._wake_w = socket.socketpair()
+        self._selector = self.config['selector']()
+
         self.cluster = ClusterMetadata(**self.config)
         self._topics = set()  # empty set will fetch all topic metadata
         self._metadata_refresh_in_progress = False
-        self._selector = self.config['selector']()
         self._conns = Dict()  # object to support weakrefs
         self._api_versions = None
         self._connecting = set()
         self._sending = set()
         self._refresh_on_disconnects = True
         self._last_bootstrap = 0
         self._bootstrap_fails = 0
-        self._wake_r, self._wake_w = socket.socketpair()
         self._wake_r.setblocking(False)
         self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
         self._wake_lock = threading.Lock()
@@ -227,7 +230,6 @@ def __init__(self, **configs):
 
         self._selector.register(self._wake_r, selectors.EVENT_READ)
         self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
-        self._closed = False
         self._sensors = None
         if self.config['metrics']:
             self._sensors = KafkaClientMetrics(self.config['metrics'],
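Why the reordering matters: __del__ can run on a partially constructed object, so any attribute it (or close()) touches must be assigned before anything later in __init__ can raise. A stripped-down illustration of the failure mode (hypothetical class, not from this diff):

```python
class Fragile(object):
    def __init__(self, fail=False):
        # assign attributes used by __del__ first, before any step that can raise
        self._closed = False
        if fail:
            raise RuntimeError("failure partway through __init__")

    def close(self):
        self._closed = True

    def __del__(self):
        # without the early assignment above, a failed __init__ would leave
        # the object without _closed and this check would raise AttributeError
        if not self._closed:
            self.close()
```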

kafka/codec.py

Lines changed: 25 additions & 0 deletions
@@ -10,12 +10,18 @@
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
+ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
 
 try:
     import snappy
 except ImportError:
     snappy = None
 
+try:
+    import zstandard as zstd
+except ImportError:
+    zstd = None
+
 try:
     import lz4.frame as lz4
 
@@ -58,6 +64,10 @@ def has_snappy():
     return snappy is not None
 
 
+def has_zstd():
+    return zstd is not None
+
+
 def has_lz4():
     if lz4 is not None:
         return True
@@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload):
         payload[header_size:]
     ])
     return lz4_decode(munged_payload)
+
+
+def zstd_encode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    return zstd.ZstdCompressor().compress(payload)
+
+
+def zstd_decode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    try:
+        return zstd.ZstdDecompressor().decompress(payload)
+    except zstd.ZstdError:
+        return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
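A quick round trip through the new helpers (a sketch; requires `pip install zstandard`). The ZstdError fallback appears to handle frames whose header omits the content size, in which case python-zstandard's decompress() needs an explicit max_output_size cap:

```python
from kafka.codec import has_zstd, zstd_encode, zstd_decode

payload = b'repetitive payload ' * 64
if has_zstd():
    blob = zstd_encode(payload)
    assert zstd_decode(blob) == payload
    print('compressed %d -> %d bytes' % (len(payload), len(blob)))
```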
