Skip to content

Commit 91d3149

Browse files
authored
Do not call state_change_callback with lock (#1775)
1 parent 27cd93b commit 91d3149

File tree

3 files changed

+40
-31
lines changed

3 files changed

+40
-31
lines changed

kafka/client_async.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,16 +260,16 @@ def _can_connect(self, node_id):
260260
conn = self._conns[node_id]
261261
return conn.disconnected() and not conn.blacked_out()
262262

263-
def _conn_state_change(self, node_id, conn):
263+
def _conn_state_change(self, node_id, sock, conn):
264264
with self._lock:
265265
if conn.connecting():
266266
# SSL connections can enter this state 2x (second during Handshake)
267267
if node_id not in self._connecting:
268268
self._connecting.add(node_id)
269269
try:
270-
self._selector.register(conn._sock, selectors.EVENT_WRITE)
270+
self._selector.register(sock, selectors.EVENT_WRITE)
271271
except KeyError:
272-
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
272+
self._selector.modify(sock, selectors.EVENT_WRITE)
273273

274274
if self.cluster.is_bootstrap(node_id):
275275
self._last_bootstrap = time.time()
@@ -280,9 +280,9 @@ def _conn_state_change(self, node_id, conn):
280280
self._connecting.remove(node_id)
281281

282282
try:
283-
self._selector.modify(conn._sock, selectors.EVENT_READ, conn)
283+
self._selector.modify(sock, selectors.EVENT_READ, conn)
284284
except KeyError:
285-
self._selector.register(conn._sock, selectors.EVENT_READ, conn)
285+
self._selector.register(sock, selectors.EVENT_READ, conn)
286286

287287
if self._sensors:
288288
self._sensors.connection_created.record()
@@ -298,11 +298,11 @@ def _conn_state_change(self, node_id, conn):
298298
self._conns.pop(node_id).close()
299299

300300
# Connection failures imply that our metadata is stale, so let's refresh
301-
elif conn.state is ConnectionStates.DISCONNECTING:
301+
elif conn.state is ConnectionStates.DISCONNECTED:
302302
if node_id in self._connecting:
303303
self._connecting.remove(node_id)
304304
try:
305-
self._selector.unregister(conn._sock)
305+
self._selector.unregister(sock)
306306
except KeyError:
307307
pass
308308

@@ -369,7 +369,7 @@ def _maybe_connect(self, node_id):
369369
log.debug("Initiating connection to node %s at %s:%s",
370370
node_id, broker.host, broker.port)
371371
host, port, afi = get_ip_port_afi(broker.host)
372-
cb = functools.partial(WeakMethod(self._conn_state_change), node_id)
372+
cb = WeakMethod(self._conn_state_change)
373373
conn = BrokerConnection(host, broker.port, afi,
374374
state_change_callback=cb,
375375
node_id=node_id,

kafka/conn.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ class BrokerConnection(object):
212212
'ssl_ciphers': None,
213213
'api_version': (0, 8, 2), # default to most restrictive
214214
'selector': selectors.DefaultSelector,
215-
'state_change_callback': lambda conn: True,
215+
'state_change_callback': lambda node_id, sock, conn: True,
216216
'metrics': None,
217217
'metric_group_prefix': '',
218218
'sasl_mechanism': None,
@@ -357,6 +357,7 @@ def connect(self):
357357
return self.state
358358
else:
359359
log.debug('%s: creating new socket', self)
360+
assert self._sock is None
360361
self._sock_afi, self._sock_addr = next_lookup
361362
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
362363

@@ -366,7 +367,7 @@ def connect(self):
366367

367368
self._sock.setblocking(False)
368369
self.state = ConnectionStates.CONNECTING
369-
self.config['state_change_callback'](self)
370+
self.config['state_change_callback'](self.node_id, self._sock, self)
370371
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
371372
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
372373

@@ -386,21 +387,21 @@ def connect(self):
386387
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
387388
log.debug('%s: initiating SSL handshake', self)
388389
self.state = ConnectionStates.HANDSHAKE
389-
self.config['state_change_callback'](self)
390+
self.config['state_change_callback'](self.node_id, self._sock, self)
390391
# _wrap_ssl can alter the connection state -- disconnects on failure
391392
self._wrap_ssl()
392393

393394
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
394395
log.debug('%s: initiating SASL authentication', self)
395396
self.state = ConnectionStates.AUTHENTICATING
396-
self.config['state_change_callback'](self)
397+
self.config['state_change_callback'](self.node_id, self._sock, self)
397398

398399
else:
399400
# security_protocol PLAINTEXT
400401
log.info('%s: Connection complete.', self)
401402
self.state = ConnectionStates.CONNECTED
402403
self._reset_reconnect_backoff()
403-
self.config['state_change_callback'](self)
404+
self.config['state_change_callback'](self.node_id, self._sock, self)
404405

405406
# Connection failed
406407
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -425,7 +426,7 @@ def connect(self):
425426
log.info('%s: Connection complete.', self)
426427
self.state = ConnectionStates.CONNECTED
427428
self._reset_reconnect_backoff()
428-
self.config['state_change_callback'](self)
429+
self.config['state_change_callback'](self.node_id, self._sock, self)
429430

430431
if self.state is ConnectionStates.AUTHENTICATING:
431432
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
@@ -435,7 +436,7 @@ def connect(self):
435436
log.info('%s: Connection complete.', self)
436437
self.state = ConnectionStates.CONNECTED
437438
self._reset_reconnect_backoff()
438-
self.config['state_change_callback'](self)
439+
self.config['state_change_callback'](self.node_id, self._sock, self)
439440

440441
if self.state not in (ConnectionStates.CONNECTED,
441442
ConnectionStates.DISCONNECTED):
@@ -802,15 +803,13 @@ def close(self, error=None):
802803
will be failed with this exception.
803804
Default: kafka.errors.KafkaConnectionError.
804805
"""
806+
if self.state is ConnectionStates.DISCONNECTED:
807+
return
805808
with self._lock:
806809
if self.state is ConnectionStates.DISCONNECTED:
807810
return
808811
log.info('%s: Closing connection. %s', self, error or '')
809-
self.state = ConnectionStates.DISCONNECTING
810-
self.config['state_change_callback'](self)
811812
self._update_reconnect_backoff()
812-
self._close_socket()
813-
self.state = ConnectionStates.DISCONNECTED
814813
self._sasl_auth_future = None
815814
self._protocol = KafkaProtocol(
816815
client_id=self.config['client_id'],
@@ -819,9 +818,18 @@ def close(self, error=None):
819818
error = Errors.Cancelled(str(self))
820819
ifrs = list(self.in_flight_requests.items())
821820
self.in_flight_requests.clear()
822-
self.config['state_change_callback'](self)
821+
self.state = ConnectionStates.DISCONNECTED
822+
# To avoid race conditions and/or deadlocks
823+
# keep a reference to the socket but leave it
824+
# open until after the state_change_callback
825+
# This should give clients a chance to deregister
826+
# the socket fd from selectors cleanly.
827+
sock = self._sock
828+
self._sock = None
823829

824-
# drop lock before processing futures
830+
# drop lock before state change callback and processing futures
831+
self.config['state_change_callback'](self.node_id, sock, self)
832+
sock.close()
825833
for (_correlation_id, (future, _timestamp)) in ifrs:
826834
future.failure(error)
827835

test/test_client_async.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,28 +95,29 @@ def test_conn_state_change(mocker, cli, conn):
9595
node_id = 0
9696
cli._conns[node_id] = conn
9797
conn.state = ConnectionStates.CONNECTING
98-
cli._conn_state_change(node_id, conn)
98+
sock = conn._sock
99+
cli._conn_state_change(node_id, sock, conn)
99100
assert node_id in cli._connecting
100-
sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE)
101+
sel.register.assert_called_with(sock, selectors.EVENT_WRITE)
101102

102103
conn.state = ConnectionStates.CONNECTED
103-
cli._conn_state_change(node_id, conn)
104+
cli._conn_state_change(node_id, sock, conn)
104105
assert node_id not in cli._connecting
105-
sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
106+
sel.modify.assert_called_with(sock, selectors.EVENT_READ, conn)
106107

107108
# Failure to connect should trigger metadata update
108109
assert cli.cluster._need_update is False
109-
conn.state = ConnectionStates.DISCONNECTING
110-
cli._conn_state_change(node_id, conn)
110+
conn.state = ConnectionStates.DISCONNECTED
111+
cli._conn_state_change(node_id, sock, conn)
111112
assert node_id not in cli._connecting
112113
assert cli.cluster._need_update is True
113-
sel.unregister.assert_called_with(conn._sock)
114+
sel.unregister.assert_called_with(sock)
114115

115116
conn.state = ConnectionStates.CONNECTING
116-
cli._conn_state_change(node_id, conn)
117+
cli._conn_state_change(node_id, sock, conn)
117118
assert node_id in cli._connecting
118-
conn.state = ConnectionStates.DISCONNECTING
119-
cli._conn_state_change(node_id, conn)
119+
conn.state = ConnectionStates.DISCONNECTED
120+
cli._conn_state_change(node_id, sock, conn)
120121
assert node_id not in cli._connecting
121122

122123

0 commit comments

Comments
 (0)