Skip to content

Commit 8eb2c52

Browse files
authored
Merge branch 'master' into feature/skip_control_batch
2 parents 7fc565b + 3c124b2 commit 8eb2c52

File tree

110 files changed

+1359
-3410
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

110 files changed

+1359
-3410
lines changed

.covrc

Lines changed: 0 additions & 3 deletions
This file was deleted.

.github/workflows/python-package.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ jobs:
111111
needs:
112112
- build-sdist
113113
runs-on: ubuntu-latest
114-
timeout-minutes: 10
114+
timeout-minutes: 15
115115
strategy:
116116
fail-fast: false
117117
matrix:

CHANGES.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
# 2.0.3 (under development)
2+
3+
Consumer
4+
* KIP-345: Implement static membership support
5+
16
# 2.0.2 (Sep 29, 2020)
27

38
Consumer

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ test-local: build-integration
2929
cov-local: build-integration
3030
KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \
3131
--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
32-
--cov-config=.covrc --cov-report html $(FLAGS) kafka test
32+
--cov-report html $(FLAGS) kafka test
3333
@echo "open file://`pwd`/htmlcov/index.html"
3434

3535
# Check the readme for syntax errors, which can lead to invalid formatting on

README.rst

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,12 @@ that expose basic message attributes: topic, partition, offset, key, and value:
6464

6565
.. code-block:: python
6666
67+
# join a consumer group for dynamic partition assignment and offset commits
6768
from kafka import KafkaConsumer
68-
consumer = KafkaConsumer('my_favorite_topic')
69+
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
70+
# or as a static member with a fixed group member name
71+
# consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group',
72+
# group_instance_id='consumer-1', leave_group_on_close=False)
6973
for msg in consumer:
7074
print (msg)
7175

benchmarks/consumer_performance.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
import threading
1111
import traceback
1212

13-
from kafka.vendor.six.moves import range
14-
1513
from kafka import KafkaConsumer, KafkaProducer
1614
from test.fixtures import KafkaFixture, ZookeeperFixture
1715

benchmarks/producer_performance.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import threading
1010
import traceback
1111

12-
from kafka.vendor.six.moves import range
13-
1412
from kafka import KafkaProducer
1513
from test.fixtures import KafkaFixture, ZookeeperFixture
1614

benchmarks/varint_speed.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
#!/usr/bin/env python
2-
from __future__ import print_function
32
import pyperf
4-
from kafka.vendor import six
53

64

75
test_data = [
@@ -67,6 +65,10 @@
6765
BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC))
6866

6967

68+
def int2byte(i):
69+
    return bytes((i,))
70+
71+
7072
def _assert_valid_enc(enc_func):
7173
for encoded, decoded in test_data:
7274
assert enc_func(decoded) == encoded, decoded
@@ -116,7 +118,7 @@ def encode_varint_1(num):
116118
_assert_valid_enc(encode_varint_1)
117119

118120

119-
def encode_varint_2(value, int2byte=six.int2byte):
121+
def encode_varint_2(value, int2byte=int2byte):
120122
value = (value << 1) ^ (value >> 63)
121123

122124
bits = value & 0x7f
@@ -151,7 +153,7 @@ def encode_varint_3(value, buf):
151153
assert res == encoded
152154

153155

154-
def encode_varint_4(value, int2byte=six.int2byte):
156+
def encode_varint_4(value, int2byte=int2byte):
155157
value = (value << 1) ^ (value >> 63)
156158

157159
if value <= 0x7f: # 1 byte
@@ -301,22 +303,13 @@ def size_of_varint_2(value):
301303
_assert_valid_size(size_of_varint_2)
302304

303305

304-
if six.PY3:
305-
def _read_byte(memview, pos):
306-
""" Read a byte from memoryview as an integer
307-
308-
Raises:
309-
IndexError: if position is out of bounds
310-
"""
311-
return memview[pos]
312-
else:
313-
def _read_byte(memview, pos):
314-
""" Read a byte from memoryview as an integer
306+
def _read_byte(memview, pos):
307+
""" Read a byte from memoryview as an integer
315308
316309
Raises:
317310
IndexError: if position is out of bounds
318311
"""
319-
return ord(memview[pos])
312+
return memview[pos]
320313

321314

322315
def decode_varint_1(buffer, pos=0):

docs/changelog.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
Changelog
22
=========
33

4+
2.2.0
5+
####################
6+
7+
Consumer
8+
--------
9+
* KIP-345: Implement static membership support
10+
411

512
2.0.2 (Sep 29, 2020)
613
####################

docs/usage.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@ KafkaConsumer
4747
group_id='my-group',
4848
bootstrap_servers='my.server.com')
4949
50+
# Use multiple static consumers with Kafka 2.3.0+ brokers
51+
consumer1 = KafkaConsumer('my-topic',
52+
group_id='my-group',
53+
group_instance_id='process-1',
54+
leave_group_on_close=False,
55+
bootstrap_servers='my.server.com')
56+
consumer2 = KafkaConsumer('my-topic',
57+
group_id='my-group',
58+
group_instance_id='process-2',
59+
leave_group_on_close=False,
60+
bootstrap_servers='my.server.com')
61+
5062
5163
There are many configuration options for the consumer class. See
5264
:class:`~kafka.KafkaConsumer` API documentation for more details.

kafka/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from __future__ import absolute_import
2-
31
__title__ = 'kafka'
42
from kafka.version import __version__
53
__author__ = 'Dana Powers'

kafka/admin/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from __future__ import absolute_import
2-
31
from kafka.admin.config_resource import ConfigResource, ConfigResourceType
42
from kafka.admin.client import KafkaAdminClient
53
from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,

kafka/admin/acl_resource.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
1-
from __future__ import absolute_import
2-
from kafka.errors import IllegalArgumentError
1+
from enum import IntEnum
32

4-
# enum in stdlib as of py3.4
5-
try:
6-
from enum import IntEnum # pylint: disable=import-error
7-
except ImportError:
8-
# vendored backport module
9-
from kafka.vendor.enum34 import IntEnum
3+
from kafka.errors import IllegalArgumentError
104

115

126
class ResourceType(IntEnum):
@@ -69,7 +63,7 @@ class ACLResourcePatternType(IntEnum):
6963
PREFIXED = 4
7064

7165

72-
class ACLFilter(object):
66+
class ACLFilter:
7367
"""Represents a filter to use with describing and deleting ACLs
7468
7569
The difference between this class and the ACL class is mainly that
@@ -161,7 +155,7 @@ def __init__(
161155
permission_type,
162156
resource_pattern
163157
):
164-
super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
158+
super().__init__(principal, host, operation, permission_type, resource_pattern)
165159
self.validate()
166160

167161
def validate(self):
@@ -173,7 +167,7 @@ def validate(self):
173167
raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")
174168

175169

176-
class ResourcePatternFilter(object):
170+
class ResourcePatternFilter:
177171
def __init__(
178172
self,
179173
resource_type,
@@ -232,13 +226,13 @@ def __init__(
232226
resource_name,
233227
pattern_type=ACLResourcePatternType.LITERAL
234228
):
235-
super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
229+
super().__init__(resource_type, resource_name, pattern_type)
236230
self.validate()
237231

238232
def validate(self):
239233
if self.resource_type == ResourceType.ANY:
240234
raise IllegalArgumentError("resource_type cannot be ANY")
241235
if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
242236
raise IllegalArgumentError(
243-
"pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
237+
f"pattern_type cannot be {self.pattern_type.name} on a concrete ResourcePattern"
244238
)

kafka/admin/client.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
from __future__ import absolute_import
2-
31
from collections import defaultdict
42
import copy
53
import logging
64
import socket
75

86
from . import ConfigResourceType
9-
from kafka.vendor import six
107

118
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
129
ACLResourcePatternType
@@ -20,7 +17,7 @@
2017
from kafka.protocol.admin import (
2118
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
2219
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
23-
DeleteGroupsRequest
20+
DeleteGroupsRequest, DescribeLogDirsRequest
2421
)
2522
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2623
from kafka.protocol.metadata import MetadataRequest
@@ -32,7 +29,7 @@
3229
log = logging.getLogger(__name__)
3330

3431

35-
class KafkaAdminClient(object):
32+
class KafkaAdminClient:
3633
"""A class for administering the Kafka cluster.
3734
3835
Warning:
@@ -194,7 +191,7 @@ def __init__(self, **configs):
194191
log.debug("Starting KafkaAdminClient with configuration: %s", configs)
195192
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
196193
if extra_configs:
197-
raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
194+
raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}")
198195

199196
self.config = copy.copy(self.DEFAULT_CONFIG)
200197
self.config.update(configs)
@@ -874,7 +871,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
874871
))
875872
else:
876873
raise NotImplementedError(
877-
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
874+
f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient.")
878875

879876
self._wait_for_futures(futures)
880877
return [f.value for f in futures]
@@ -1197,7 +1194,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
11971194
topics_partitions_dict = defaultdict(set)
11981195
for topic, partition in partitions:
11991196
topics_partitions_dict[topic].add(partition)
1200-
topics_partitions = list(six.iteritems(topics_partitions_dict))
1197+
topics_partitions = list(topics_partitions_dict.items())
12011198
request = OffsetFetchRequest[version](group_id, topics_partitions)
12021199
else:
12031200
raise NotImplementedError(
@@ -1345,3 +1342,19 @@ def _wait_for_futures(self, futures):
13451342

13461343
if future.failed():
13471344
raise future.exception # pylint: disable-msg=raising-bad-type
1345+
1346+
def describe_log_dirs(self):
1347+
"""Send a DescribeLogDirsRequest request to a broker.
1348+
1349+
:return: A message future
1350+
"""
1351+
version = self._matching_api_version(DescribeLogDirsRequest)
1352+
if version <= 1:
1353+
request = DescribeLogDirsRequest[version]()
1354+
future = self._send_request_to_node(self._client.least_loaded_node(), request)
1355+
self._wait_for_futures([future])
1356+
else:
1357+
raise NotImplementedError(
1358+
"Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
1359+
.format(version))
1360+
return future.value

kafka/admin/config_resource.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,4 @@
1-
from __future__ import absolute_import
2-
3-
# enum in stdlib as of py3.4
4-
try:
5-
from enum import IntEnum # pylint: disable=import-error
6-
except ImportError:
7-
# vendored backport module
8-
from kafka.vendor.enum34 import IntEnum
1+
from enum import IntEnum
92

103

114
class ConfigResourceType(IntEnum):
@@ -15,7 +8,7 @@ class ConfigResourceType(IntEnum):
158
TOPIC = 2
169

1710

18-
class ConfigResource(object):
11+
class ConfigResource:
1912
"""A class for specifying config resources.
2013
Arguments:
2114
resource_type (ConfigResourceType): the type of kafka resource

kafka/admin/new_partitions.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
from __future__ import absolute_import
2-
3-
4-
class NewPartitions(object):
1+
class NewPartitions:
52
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
63
must be the difference between the new total number of partitions and the existing number of partitions.
74
Arguments:

kafka/admin/new_topic.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
from __future__ import absolute_import
2-
31
from kafka.errors import IllegalArgumentError
42

53

6-
class NewTopic(object):
4+
class NewTopic:
75
""" A class for new topic creation
86
Arguments:
97
name (string): name of the topic

0 commit comments

Comments
 (0)