Skip to content

Commit b53c838

Browse files
authored
Merge branch 'master' into feature/skip_control_batch
2 parents 8eb2c52 + 6756974 commit b53c838

File tree

19 files changed

+516
-394
lines changed

19 files changed

+516
-394
lines changed

kafka/client_async.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class KafkaClient:
5959
rate. To avoid connection storms, a randomization factor of 0.2
6060
will be applied to the backoff resulting in a random range between
6161
20% below and 20% above the computed value. Default: 1000.
62+
connection_timeout_ms (int): Connection timeout in milliseconds.
63+
Default: None, which defaults it to the same value as
64+
request_timeout_ms.
6265
request_timeout_ms (int): Client request timeout in milliseconds.
6366
Default: 30000.
6467
connections_max_idle_ms: Close idle connections after the number of
@@ -145,6 +148,7 @@ class KafkaClient:
145148
'bootstrap_servers': 'localhost',
146149
'bootstrap_topics_filter': set(),
147150
'client_id': 'kafka-python-' + __version__,
151+
'connection_timeout_ms': None,
148152
'request_timeout_ms': 30000,
149153
'wakeup_timeout_ms': 3000,
150154
'connections_max_idle_ms': 9 * 60 * 1000,

kafka/conn.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,6 @@ class SSLWantWriteError(Exception):
6868
gssapi = None
6969
GSSError = None
7070

71-
# needed for AWS_MSK_IAM authentication:
72-
try:
73-
from botocore.session import Session as BotoSession
74-
except ImportError:
75-
# no botocore available, will disable AWS_MSK_IAM mechanism
76-
BotoSession = None
77-
7871
AFI_NAMES = {
7972
socket.AF_UNSPEC: "unspecified",
8073
socket.AF_INET: "IPv4",
@@ -112,6 +105,9 @@ class BrokerConnection:
112105
rate. To avoid connection storms, a randomization factor of 0.2
113106
will be applied to the backoff resulting in a random range between
114107
20% below and 20% above the computed value. Default: 1000.
108+
connection_timeout_ms (int): Connection timeout in milliseconds.
109+
Default: None, which defaults it to the same value as
110+
request_timeout_ms.
115111
request_timeout_ms (int): Client request timeout in milliseconds.
116112
Default: 30000.
117113
max_in_flight_requests_per_connection (int): Requests are pipelined
@@ -188,6 +184,7 @@ class BrokerConnection:
188184
'client_id': 'kafka-python-' + __version__,
189185
'node_id': 0,
190186
'request_timeout_ms': 30000,
187+
'connection_timeout_ms': None,
191188
'reconnect_backoff_ms': 50,
192189
'reconnect_backoff_max_ms': 1000,
193190
'max_in_flight_requests_per_connection': 5,
@@ -232,6 +229,9 @@ def __init__(self, host, port, afi, **configs):
232229
if key in configs:
233230
self.config[key] = configs[key]
234231

232+
if self.config['connection_timeout_ms'] is None:
233+
self.config['connection_timeout_ms'] = self.config['request_timeout_ms']
234+
235235
self.node_id = self.config.pop('node_id')
236236

237237
if self.config['receive_buffer_bytes'] is not None:
@@ -246,19 +246,15 @@ def __init__(self, host, port, afi, **configs):
246246
assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
247247
'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))
248248

249-
250249
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
251250
assert ssl_available, "Python wasn't built with SSL support"
252251

253-
if self.config['sasl_mechanism'] == 'AWS_MSK_IAM':
254-
assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package'
255-
assert self.config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL'
256-
257252
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
258253
assert self.config['sasl_mechanism'] in sasl.MECHANISMS, (
259254
'sasl_mechanism must be one of {}'.format(', '.join(sasl.MECHANISMS.keys()))
260255
)
261256
sasl.MECHANISMS[self.config['sasl_mechanism']].validate_config(self)
257+
262258
# This is not a general lock / this class is not generally thread-safe yet
263259
# However, to avoid pushing responsibility for maintaining
264260
# per-connection locks to the upstream client, we will use this lock to
@@ -284,7 +280,10 @@ def __init__(self, host, port, afi, **configs):
284280
if self.config['ssl_context'] is not None:
285281
self._ssl_context = self.config['ssl_context']
286282
self._sasl_auth_future = None
287-
self.last_attempt = 0
283+
self.last_activity = 0
284+
# This value is not used for internal state, but it is left to allow backwards-compatibility
285+
# The variable last_activity is now used instead, but it is updated more often and may therefore break compatibility with some hacks.
286+
self.last_attempt= 0
288287
self._gai = []
289288
self._sensors = None
290289
if self.config['metrics']:
@@ -362,6 +361,7 @@ def connect(self):
362361
self.config['state_change_callback'](self.node_id, self._sock, self)
363362
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
364363
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
364+
self.last_activity = time.time()
365365

366366
if self.state is ConnectionStates.CONNECTING:
367367
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -394,6 +394,7 @@ def connect(self):
394394
self.state = ConnectionStates.CONNECTED
395395
self._reset_reconnect_backoff()
396396
self.config['state_change_callback'](self.node_id, self._sock, self)
397+
self.last_activity = time.time()
397398

398399
# Connection failed
399400
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -419,6 +420,7 @@ def connect(self):
419420
self.state = ConnectionStates.CONNECTED
420421
self._reset_reconnect_backoff()
421422
self.config['state_change_callback'](self.node_id, self._sock, self)
423+
self.last_activity = time.time()
422424

423425
if self.state is ConnectionStates.AUTHENTICATING:
424426
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
@@ -429,12 +431,13 @@ def connect(self):
429431
self.state = ConnectionStates.CONNECTED
430432
self._reset_reconnect_backoff()
431433
self.config['state_change_callback'](self.node_id, self._sock, self)
434+
self.last_activity = time.time()
432435

433436
if self.state not in (ConnectionStates.CONNECTED,
434437
ConnectionStates.DISCONNECTED):
435438
# Connection timed out
436-
request_timeout = self.config['request_timeout_ms'] / 1000.0
437-
if time.time() > request_timeout + self.last_attempt:
439+
request_timeout = self.config['connection_timeout_ms'] / 1000.0
440+
if time.time() > request_timeout + self.last_activity:
438441
log.error('Connection attempt to %s timed out', self)
439442
self.close(Errors.KafkaConnectionError('timeout'))
440443
return self.state
@@ -595,7 +598,7 @@ def blacked_out(self):
595598
re-establish a connection yet
596599
"""
597600
if self.state is ConnectionStates.DISCONNECTED:
598-
if time.time() < self.last_attempt + self._reconnect_backoff:
601+
if time.time() < self.last_activity + self._reconnect_backoff:
599602
return True
600603
return False
601604

@@ -606,7 +609,7 @@ def connection_delay(self):
606609
the reconnect backoff time. When connecting or connected, returns a very
607610
large number to handle slow/stalled connections.
608611
"""
609-
time_waited = time.time() - (self.last_attempt or 0)
612+
time_waited = time.time() - (self.last_activity or 0)
610613
if self.state is ConnectionStates.DISCONNECTED:
611614
return max(self._reconnect_backoff - time_waited, 0) * 1000
612615
else:

kafka/coordinator/assignors/abstract.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class AbstractPartitionAssignor(object):
1212
partition counts which are always needed in assignors).
1313
"""
1414

15-
@abc.abstractproperty
15+
@abc.abstractmethod
1616
def name(self):
1717
""".name should be a string identifying the assignor"""
1818
pass

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from collections import defaultdict, namedtuple
33
from copy import deepcopy
44

5-
from kafka.cluster import ClusterMetadata
65
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
76
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
87
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet

kafka/errors.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import inspect
22
import sys
3+
from typing import Any
34

45

56
class KafkaError(RuntimeError):
67
retriable = False
78
# whether metadata should be refreshed on error
89
invalid_metadata = False
910

10-
def __str__(self):
11+
def __str__(self) -> str:
1112
if not self.args:
1213
return self.__class__.__name__
1314
return '{}: {}'.format(self.__class__.__name__,
@@ -65,7 +66,7 @@ class IncompatibleBrokerVersion(KafkaError):
6566

6667

6768
class CommitFailedError(KafkaError):
68-
def __init__(self, *args, **kwargs):
69+
def __init__(self, *args, **kwargs) -> None:
6970
super().__init__(
7071
"""Commit cannot be completed since the group has already
7172
rebalanced and assigned the partitions to another member.
@@ -92,7 +93,7 @@ class BrokerResponseError(KafkaError):
9293
message = None
9394
description = None
9495

95-
def __str__(self):
96+
def __str__(self) -> str:
9697
"""Add errno to standard KafkaError str"""
9798
return '[Error {}] {}'.format(
9899
self.errno,
@@ -509,7 +510,7 @@ def _iter_broker_errors():
509510
kafka_errors = {x.errno: x for x in _iter_broker_errors()}
510511

511512

512-
def for_code(error_code):
513+
def for_code(error_code: int) -> Any:
513514
return kafka_errors.get(error_code, UnknownError)
514515

515516

kafka/producer/kafka.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ class KafkaProducer:
190190
brokers or partitions. Default: 300000
191191
retry_backoff_ms (int): Milliseconds to backoff when retrying on
192192
errors. Default: 100.
193+
connection_timeout_ms (int): Connection timeout in milliseconds.
194+
Default: None, which defaults it to the same value as
195+
request_timeout_ms.
193196
request_timeout_ms (int): Client request timeout in milliseconds.
194197
Default: 30000.
195198
receive_buffer_bytes (int): The size of the TCP receive buffer
@@ -300,6 +303,7 @@ class KafkaProducer:
300303
'max_request_size': 1048576,
301304
'metadata_max_age_ms': 300000,
302305
'retry_backoff_ms': 100,
306+
'connection_timeout_ms': None,
303307
'request_timeout_ms': 30000,
304308
'receive_buffer_bytes': None,
305309
'send_buffer_bytes': None,

kafka/protocol/api.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,22 @@ class Request(Struct):
5252

5353
FLEXIBLE_VERSION = False
5454

55-
@abc.abstractproperty
55+
@abc.abstractmethod
5656
def API_KEY(self):
5757
"""Integer identifier for api request"""
5858
pass
5959

60-
@abc.abstractproperty
60+
@abc.abstractmethod
6161
def API_VERSION(self):
6262
"""Integer of api request version"""
6363
pass
6464

65-
@abc.abstractproperty
65+
@abc.abstractmethod
6666
def SCHEMA(self):
6767
"""An instance of Schema() representing the request structure"""
6868
pass
6969

70-
@abc.abstractproperty
70+
@abc.abstractmethod
7171
def RESPONSE_TYPE(self):
7272
"""The Response class associated with the api request"""
7373
pass
@@ -93,17 +93,17 @@ def parse_response_header(self, read_buffer):
9393
class Response(Struct):
9494
__metaclass__ = abc.ABCMeta
9595

96-
@abc.abstractproperty
96+
@abc.abstractmethod
9797
def API_KEY(self):
9898
"""Integer identifier for api request/response"""
9999
pass
100100

101-
@abc.abstractproperty
101+
@abc.abstractmethod
102102
def API_VERSION(self):
103103
"""Integer of api request/response version"""
104104
pass
105105

106-
@abc.abstractproperty
106+
@abc.abstractmethod
107107
def SCHEMA(self):
108108
"""An instance of Schema() representing the response structure"""
109109
pass

kafka/protocol/struct.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
from io import BytesIO
2+
from typing import List, Union
23

34
from kafka.protocol.abstract import AbstractType
45
from kafka.protocol.types import Schema
56

7+
68
from kafka.util import WeakMethod
79

810

911
class Struct(AbstractType):
1012
SCHEMA = Schema()
1113

12-
def __init__(self, *args, **kwargs):
14+
def __init__(self, *args, **kwargs) -> None:
1315
if len(args) == len(self.SCHEMA.fields):
1416
for i, name in enumerate(self.SCHEMA.names):
1517
self.__dict__[name] = args[i]
@@ -36,23 +38,23 @@ def encode(cls, item): # pylint: disable=E0202
3638
bits.append(field.encode(item[i]))
3739
return b''.join(bits)
3840

39-
def _encode_self(self):
41+
def _encode_self(self) -> bytes:
4042
return self.SCHEMA.encode(
4143
[self.__dict__[name] for name in self.SCHEMA.names]
4244
)
4345

4446
@classmethod
45-
def decode(cls, data):
47+
def decode(cls, data: Union[BytesIO, bytes]) -> "Struct":
4648
if isinstance(data, bytes):
4749
data = BytesIO(data)
4850
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
4951

50-
def get_item(self, name):
52+
def get_item(self, name: str) -> Union[int, List[List[Union[int, str, bool, List[List[Union[int, List[int]]]]]]], str, List[List[Union[int, str]]]]:
5153
if name not in self.SCHEMA.names:
5254
raise KeyError("%s is not in the schema" % name)
5355
return self.__dict__[name]
5456

55-
def __repr__(self):
57+
def __repr__(self) -> str:
5658
key_vals = []
5759
for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
5860
key_vals.append(f'{name}={field.repr(self.__dict__[name])}')
@@ -61,7 +63,7 @@ def __repr__(self):
6163
def __hash__(self):
6264
return hash(self.encode())
6365

64-
def __eq__(self, other):
66+
def __eq__(self, other: "Struct") -> bool:
6567
if self.SCHEMA != other.SCHEMA:
6668
return False
6769
for attr in self.SCHEMA.names:

kafka/record/_crc32c.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
_MASK = 0xFFFFFFFF
9898

9999

100-
def crc_update(crc, data):
100+
def crc_update(crc: int, data: bytes) -> int:
101101
"""Update CRC-32C checksum with data.
102102
Args:
103103
crc: 32-bit checksum to update as long.
@@ -116,7 +116,7 @@ def crc_update(crc, data):
116116
return crc ^ _MASK
117117

118118

119-
def crc_finalize(crc):
119+
def crc_finalize(crc: int) -> int:
120120
"""Finalize CRC-32C checksum.
121121
This function should be called as last step of crc calculation.
122122
Args:
@@ -127,7 +127,7 @@ def crc_finalize(crc):
127127
return crc & _MASK
128128

129129

130-
def crc(data):
130+
def crc(data: bytes) -> int:
131131
"""Compute CRC-32C checksum of the data.
132132
Args:
133133
data: byte array, string or iterable over bytes.

0 commit comments

Comments
 (0)