Skip to content

Commit 06f0450

Browse files
wbarnhapetterroea
authored andcommitted
Add connection_timeout_ms and reset the timeout counter more often (#132)
* Add connection_timeout_ms and reset the timeout counter more often * Refactor last_attempt -> last_activity This semantically reflects the new usage of the variable better * Make tests work again * Add unit tests of new BrokerConnection functionality The test mocks parts of BrokerConnection in order to assert that the connection state machine allows long-lasting connections as long as the state progresses often enough * Re-introduce last_attempt to avoid breakage --------- Co-authored-by: Liam S. Crouch <[email protected]>
1 parent 1814f11 commit 06f0450

File tree

4 files changed

+107
-13
lines changed

4 files changed

+107
-13
lines changed

kafka/client_async.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class KafkaClient:
5959
rate. To avoid connection storms, a randomization factor of 0.2
6060
will be applied to the backoff resulting in a random range between
6161
20% below and 20% above the computed value. Default: 1000.
62+
connection_timeout_ms (int): Connection timeout in milliseconds.
63+
Default: None, which defaults it to the same value as
64+
request_timeout_ms.
6265
request_timeout_ms (int): Client request timeout in milliseconds.
6366
Default: 30000.
6467
connections_max_idle_ms: Close idle connections after the number of
@@ -145,6 +148,7 @@ class KafkaClient:
145148
'bootstrap_servers': 'localhost',
146149
'bootstrap_topics_filter': set(),
147150
'client_id': 'kafka-python-' + __version__,
151+
'connection_timeout_ms': None,
148152
'request_timeout_ms': 30000,
149153
'wakeup_timeout_ms': 3000,
150154
'connections_max_idle_ms': 9 * 60 * 1000,

kafka/conn.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ class BrokerConnection:
112112
rate. To avoid connection storms, a randomization factor of 0.2
113113
will be applied to the backoff resulting in a random range between
114114
20% below and 20% above the computed value. Default: 1000.
115+
connection_timeout_ms (int): Connection timeout in milliseconds.
116+
Default: None, which defaults it to the same value as
117+
request_timeout_ms.
115118
request_timeout_ms (int): Client request timeout in milliseconds.
116119
Default: 30000.
117120
max_in_flight_requests_per_connection (int): Requests are pipelined
@@ -188,6 +191,7 @@ class BrokerConnection:
188191
'client_id': 'kafka-python-' + __version__,
189192
'node_id': 0,
190193
'request_timeout_ms': 30000,
194+
'connection_timeout_ms': None,
191195
'reconnect_backoff_ms': 50,
192196
'reconnect_backoff_max_ms': 1000,
193197
'max_in_flight_requests_per_connection': 5,
@@ -231,6 +235,9 @@ def __init__(self, host, port, afi, **configs):
231235
for key in self.config:
232236
if key in configs:
233237
self.config[key] = configs[key]
238+
239+
if self.config['connection_timeout_ms'] is None:
240+
self.config['connection_timeout_ms'] = self.config['request_timeout_ms']
234241

235242
self.node_id = self.config.pop('node_id')
236243

@@ -284,7 +291,10 @@ def __init__(self, host, port, afi, **configs):
284291
if self.config['ssl_context'] is not None:
285292
self._ssl_context = self.config['ssl_context']
286293
self._sasl_auth_future = None
287-
self.last_attempt = 0
294+
self.last_activity = 0
295+
# This value is not used for internal state, but it is left to allow backwards-compatability
296+
# The variable last_activity is now used instead, but is updated more often may therefore break compatability with some hacks.
297+
self.last_attempt= 0
288298
self._gai = []
289299
self._sensors = None
290300
if self.config['metrics']:
@@ -362,6 +372,7 @@ def connect(self):
362372
self.config['state_change_callback'](self.node_id, self._sock, self)
363373
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
364374
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
375+
self.last_activity = time.time()
365376

366377
if self.state is ConnectionStates.CONNECTING:
367378
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -394,6 +405,7 @@ def connect(self):
394405
self.state = ConnectionStates.CONNECTED
395406
self._reset_reconnect_backoff()
396407
self.config['state_change_callback'](self.node_id, self._sock, self)
408+
self.last_activity = time.time()
397409

398410
# Connection failed
399411
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -419,6 +431,7 @@ def connect(self):
419431
self.state = ConnectionStates.CONNECTED
420432
self._reset_reconnect_backoff()
421433
self.config['state_change_callback'](self.node_id, self._sock, self)
434+
self.last_activity = time.time()
422435

423436
if self.state is ConnectionStates.AUTHENTICATING:
424437
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
@@ -429,12 +442,13 @@ def connect(self):
429442
self.state = ConnectionStates.CONNECTED
430443
self._reset_reconnect_backoff()
431444
self.config['state_change_callback'](self.node_id, self._sock, self)
445+
self.last_activity = time.time()
432446

433447
if self.state not in (ConnectionStates.CONNECTED,
434448
ConnectionStates.DISCONNECTED):
435449
# Connection timed out
436-
request_timeout = self.config['request_timeout_ms'] / 1000.0
437-
if time.time() > request_timeout + self.last_attempt:
450+
request_timeout = self.config['connection_timeout_ms'] / 1000.0
451+
if time.time() > request_timeout + self.last_activity:
438452
log.error('Connection attempt to %s timed out', self)
439453
self.close(Errors.KafkaConnectionError('timeout'))
440454
return self.state
@@ -595,7 +609,7 @@ def blacked_out(self):
595609
re-establish a connection yet
596610
"""
597611
if self.state is ConnectionStates.DISCONNECTED:
598-
if time.time() < self.last_attempt + self._reconnect_backoff:
612+
if time.time() < self.last_activity + self._reconnect_backoff:
599613
return True
600614
return False
601615

@@ -606,7 +620,7 @@ def connection_delay(self):
606620
the reconnect backoff time. When connecting or connected, returns a very
607621
large number to handle slow/stalled connections.
608622
"""
609-
time_waited = time.time() - (self.last_attempt or 0)
623+
time_waited = time.time() - (self.last_activity or 0)
610624
if self.state is ConnectionStates.DISCONNECTED:
611625
return max(self._reconnect_backoff - time_waited, 0) * 1000
612626
else:

kafka/producer/kafka.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ class KafkaProducer:
190190
brokers or partitions. Default: 300000
191191
retry_backoff_ms (int): Milliseconds to backoff when retrying on
192192
errors. Default: 100.
193+
connection_timeout_ms (int): Connection timeout in milliseconds.
194+
Default: None, which defaults it to the same value as
195+
request_timeout_ms.
193196
request_timeout_ms (int): Client request timeout in milliseconds.
194197
Default: 30000.
195198
receive_buffer_bytes (int): The size of the TCP receive buffer
@@ -300,6 +303,7 @@ class KafkaProducer:
300303
'max_request_size': 1048576,
301304
'metadata_max_age_ms': 300000,
302305
'retry_backoff_ms': 100,
306+
'connection_timeout_ms': None,
303307
'request_timeout_ms': 30000,
304308
'receive_buffer_bytes': None,
305309
'send_buffer_bytes': None,

test/test_conn.py

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import socket
77

88
import pytest
9+
import time
910

1011
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
1112
from kafka.protocol.api import RequestHeader
@@ -61,28 +62,99 @@ def test_connect_timeout(_socket, conn):
6162
# Initial connect returns EINPROGRESS
6263
# immediate inline connect returns EALREADY
6364
# second explicit connect returns EALREADY
64-
# third explicit connect returns EALREADY and times out via last_attempt
65+
# third explicit connect returns EALREADY and times out via last_activity
6566
_socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY]
6667
conn.connect()
6768
assert conn.state is ConnectionStates.CONNECTING
6869
conn.connect()
6970
assert conn.state is ConnectionStates.CONNECTING
71+
conn.last_activity = 0
7072
conn.last_attempt = 0
7173
conn.connect()
7274
assert conn.state is ConnectionStates.DISCONNECTED
7375

76+
def test_connect_timeout_slowconn(_socket, conn, mocker):
77+
# Same as test_connect_timeout,
78+
# but we make the connection run longer than the timeout in order to test that
79+
# BrokerConnection resets the timer whenever things happen during the connection
80+
# See https://github.com/dpkp/kafka-python/issues/2386
81+
_socket.connect_ex.side_effect = [EINPROGRESS, EISCONN]
82+
83+
# 0.8 = we guarantee that when testing with three intervals of this we are past the timeout
84+
time_between_connect = (conn.config['connection_timeout_ms']/1000) * 0.8
85+
start = time.time()
86+
87+
# Use plaintext auth for simplicity
88+
last_activity = conn.last_activity
89+
last_attempt = conn.last_attempt
90+
conn.config['security_protocol'] = 'SASL_PLAINTEXT'
91+
conn.connect()
92+
assert conn.state is ConnectionStates.CONNECTING
93+
# Ensure the last_activity counter was updated
94+
# Last_attempt should also be updated
95+
assert conn.last_activity > last_activity
96+
assert conn.last_attempt > last_attempt
97+
last_attempt = conn.last_attempt
98+
last_activity = conn.last_activity
99+
100+
# Simulate time being passed
101+
# This shouldn't be enough time to time out the connection
102+
conn._try_authenticate = mocker.Mock(side_effect=[False, False, True])
103+
with mock.patch("time.time", return_value=start+time_between_connect):
104+
# This should trigger authentication
105+
# Note that an authentication attempt isn't actually made until now.
106+
# We simulate that authentication does not succeed at this point
107+
# This is technically incorrect, but it lets us see what happens
108+
# to the state machine when the state doesn't change for two function calls
109+
conn.connect()
110+
assert conn.last_activity > last_activity
111+
# Last attempt is kept as a legacy variable, should not update
112+
assert conn.last_attempt == last_attempt
113+
last_activity = conn.last_activity
114+
115+
assert conn.state is ConnectionStates.AUTHENTICATING
116+
117+
118+
# This time around we should be way past timeout.
119+
# Now we care about connect() not terminating the attempt,
120+
# because connection state was progressed in the meantime.
121+
with mock.patch("time.time", return_value=start+time_between_connect*2):
122+
# Simulate this one not succeeding as well. This is so we can ensure things don't time out
123+
conn.connect()
124+
125+
# No state change = no activity change
126+
assert conn.last_activity == last_activity
127+
assert conn.last_attempt == last_attempt
128+
129+
# If last_activity was not reset when the state transitioned to AUTHENTICATING,
130+
# the connection state would be timed out now.
131+
assert conn.state is ConnectionStates.AUTHENTICATING
132+
133+
134+
# This time around, the connection should succeed.
135+
with mock.patch("time.time", return_value=start+time_between_connect*3):
136+
# This should finalize the connection
137+
conn.connect()
138+
139+
assert conn.last_activity > last_activity
140+
assert conn.last_attempt == last_attempt
141+
last_activity = conn.last_activity
142+
143+
assert conn.state is ConnectionStates.CONNECTED
144+
145+
74146

75147
def test_blacked_out(conn):
76148
with mock.patch("time.time", return_value=1000):
77-
conn.last_attempt = 0
149+
conn.last_activity = 0
78150
assert conn.blacked_out() is False
79-
conn.last_attempt = 1000
151+
conn.last_activity = 1000
80152
assert conn.blacked_out() is True
81153

82154

83155
def test_connection_delay(conn):
84156
with mock.patch("time.time", return_value=1000):
85-
conn.last_attempt = 1000
157+
conn.last_activity = 1000
86158
assert conn.connection_delay() == conn.config['reconnect_backoff_ms']
87159
conn.state = ConnectionStates.CONNECTING
88160
assert conn.connection_delay() == float('inf')
@@ -286,7 +358,7 @@ def test_lookup_on_connect():
286358
]
287359

288360
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
289-
conn.last_attempt = 0
361+
conn.last_activity = 0
290362
conn.connect()
291363
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
292364
assert conn._sock_afi == afi2
@@ -301,11 +373,10 @@ def test_relookup_on_failure():
301373
assert conn.host == hostname
302374
mock_return1 = []
303375
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
304-
last_attempt = conn.last_attempt
376+
last_activity = conn.last_activity
305377
conn.connect()
306378
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
307379
assert conn.disconnected()
308-
assert conn.last_attempt > last_attempt
309380

310381
afi2 = socket.AF_INET
311382
sockaddr2 = ('127.0.0.2', 9092)
@@ -314,12 +385,13 @@ def test_relookup_on_failure():
314385
]
315386

316387
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
317-
conn.last_attempt = 0
388+
conn.last_activity = 0
318389
conn.connect()
319390
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
320391
assert conn._sock_afi == afi2
321392
assert conn._sock_addr == sockaddr2
322393
conn.close()
394+
assert conn.last_activity > last_activity
323395

324396

325397
def test_requests_timed_out(conn):

0 commit comments

Comments
 (0)