Skip to content

Commit bc42915

Browse files
committed
Do not call conn state_change_callback with lock
* Remove DISCONNECTING connection state
* Do not call state_change_callback with lock
* Pass node_id, socket, and connection object to callback
1 parent 27cd93b commit bc42915

File tree

3 files changed

+39
-31
lines changed

3 files changed

+39
-31
lines changed

kafka/client_async.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,16 +260,17 @@ def _can_connect(self, node_id):
260260
conn = self._conns[node_id]
261261
return conn.disconnected() and not conn.blacked_out()
262262

263-
def _conn_state_change(self, node_id, conn):
263+
def _conn_state_change(self, node_id, sock, conn):
264+
close_conns = []
264265
with self._lock:
265266
if conn.connecting():
266267
# SSL connections can enter this state 2x (second during Handshake)
267268
if node_id not in self._connecting:
268269
self._connecting.add(node_id)
269270
try:
270-
self._selector.register(conn._sock, selectors.EVENT_WRITE)
271+
key_selector = self._selector.register(sock, selectors.EVENT_WRITE)
271272
except KeyError:
272-
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
273+
key_selector = self._selector.modify(sock, selectors.EVENT_WRITE)
273274

274275
if self.cluster.is_bootstrap(node_id):
275276
self._last_bootstrap = time.time()
@@ -280,9 +281,9 @@ def _conn_state_change(self, node_id, conn):
280281
self._connecting.remove(node_id)
281282

282283
try:
283-
self._selector.modify(conn._sock, selectors.EVENT_READ, conn)
284+
self._selector.modify(sock, selectors.EVENT_READ, conn)
284285
except KeyError:
285-
self._selector.register(conn._sock, selectors.EVENT_READ, conn)
286+
self._selector.register(sock, selectors.EVENT_READ, conn)
286287

287288
if self._sensors:
288289
self._sensors.connection_created.record()
@@ -298,11 +299,11 @@ def _conn_state_change(self, node_id, conn):
298299
self._conns.pop(node_id).close()
299300

300301
# Connection failures imply that our metadata is stale, so let's refresh
301-
elif conn.state is ConnectionStates.DISCONNECTING:
302+
elif conn.state is ConnectionStates.DISCONNECTED:
302303
if node_id in self._connecting:
303304
self._connecting.remove(node_id)
304305
try:
305-
self._selector.unregister(conn._sock)
306+
self._selector.unregister(sock)
306307
except KeyError:
307308
pass
308309

@@ -369,7 +370,7 @@ def _maybe_connect(self, node_id):
369370
log.debug("Initiating connection to node %s at %s:%s",
370371
node_id, broker.host, broker.port)
371372
host, port, afi = get_ip_port_afi(broker.host)
372-
cb = functools.partial(WeakMethod(self._conn_state_change), node_id)
373+
cb = WeakMethod(self._conn_state_change)
373374
conn = BrokerConnection(host, broker.port, afi,
374375
state_change_callback=cb,
375376
node_id=node_id,

kafka/conn.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ class BrokerConnection(object):
212212
'ssl_ciphers': None,
213213
'api_version': (0, 8, 2), # default to most restrictive
214214
'selector': selectors.DefaultSelector,
215-
'state_change_callback': lambda conn: True,
215+
'state_change_callback': lambda node_id, sock, conn: True,
216216
'metrics': None,
217217
'metric_group_prefix': '',
218218
'sasl_mechanism': None,
@@ -357,6 +357,7 @@ def connect(self):
357357
return self.state
358358
else:
359359
log.debug('%s: creating new socket', self)
360+
assert self._sock is None
360361
self._sock_afi, self._sock_addr = next_lookup
361362
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
362363

@@ -366,7 +367,7 @@ def connect(self):
366367

367368
self._sock.setblocking(False)
368369
self.state = ConnectionStates.CONNECTING
369-
self.config['state_change_callback'](self)
370+
self.config['state_change_callback'](self.node_id, self._sock, self)
370371
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
371372
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
372373

@@ -386,21 +387,21 @@ def connect(self):
386387
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
387388
log.debug('%s: initiating SSL handshake', self)
388389
self.state = ConnectionStates.HANDSHAKE
389-
self.config['state_change_callback'](self)
390+
self.config['state_change_callback'](self.node_id, self._sock, self)
390391
# _wrap_ssl can alter the connection state -- disconnects on failure
391392
self._wrap_ssl()
392393

393394
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
394395
log.debug('%s: initiating SASL authentication', self)
395396
self.state = ConnectionStates.AUTHENTICATING
396-
self.config['state_change_callback'](self)
397+
self.config['state_change_callback'](self.node_id, self._sock, self)
397398

398399
else:
399400
# security_protocol PLAINTEXT
400401
log.info('%s: Connection complete.', self)
401402
self.state = ConnectionStates.CONNECTED
402403
self._reset_reconnect_backoff()
403-
self.config['state_change_callback'](self)
404+
self.config['state_change_callback'](self.node_id, self._sock, self)
404405

405406
# Connection failed
406407
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -425,7 +426,7 @@ def connect(self):
425426
log.info('%s: Connection complete.', self)
426427
self.state = ConnectionStates.CONNECTED
427428
self._reset_reconnect_backoff()
428-
self.config['state_change_callback'](self)
429+
self.config['state_change_callback'](self.node_id, self._sock, self)
429430

430431
if self.state is ConnectionStates.AUTHENTICATING:
431432
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
@@ -435,7 +436,7 @@ def connect(self):
435436
log.info('%s: Connection complete.', self)
436437
self.state = ConnectionStates.CONNECTED
437438
self._reset_reconnect_backoff()
438-
self.config['state_change_callback'](self)
439+
self.config['state_change_callback'](self.node_id, self._sock, self)
439440

440441
if self.state not in (ConnectionStates.CONNECTED,
441442
ConnectionStates.DISCONNECTED):
@@ -806,11 +807,7 @@ def close(self, error=None):
806807
if self.state is ConnectionStates.DISCONNECTED:
807808
return
808809
log.info('%s: Closing connection. %s', self, error or '')
809-
self.state = ConnectionStates.DISCONNECTING
810-
self.config['state_change_callback'](self)
811810
self._update_reconnect_backoff()
812-
self._close_socket()
813-
self.state = ConnectionStates.DISCONNECTED
814811
self._sasl_auth_future = None
815812
self._protocol = KafkaProtocol(
816813
client_id=self.config['client_id'],
@@ -819,9 +816,18 @@ def close(self, error=None):
819816
error = Errors.Cancelled(str(self))
820817
ifrs = list(self.in_flight_requests.items())
821818
self.in_flight_requests.clear()
822-
self.config['state_change_callback'](self)
819+
self.state = ConnectionStates.DISCONNECTED
820+
# To avoid race conditions and/or deadlocks
821+
# keep a reference to the socket but leave it
822+
# open until after the state_change_callback
823+
# This should give clients a chance to deregister
824+
# the socket fd from selectors cleanly.
825+
sock = self._sock
826+
self._sock = None
823827

824-
# drop lock before processing futures
828+
# drop lock before state change callback and processing futures
829+
self.config['state_change_callback'](self.node_id, sock, self)
830+
sock.close()
825831
for (_correlation_id, (future, _timestamp)) in ifrs:
826832
future.failure(error)
827833

test/test_client_async.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,28 +95,29 @@ def test_conn_state_change(mocker, cli, conn):
9595
node_id = 0
9696
cli._conns[node_id] = conn
9797
conn.state = ConnectionStates.CONNECTING
98-
cli._conn_state_change(node_id, conn)
98+
sock = conn._sock
99+
cli._conn_state_change(node_id, sock, conn)
99100
assert node_id in cli._connecting
100-
sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE)
101+
sel.register.assert_called_with(sock, selectors.EVENT_WRITE)
101102

102103
conn.state = ConnectionStates.CONNECTED
103-
cli._conn_state_change(node_id, conn)
104+
cli._conn_state_change(node_id, sock, conn)
104105
assert node_id not in cli._connecting
105-
sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
106+
sel.modify.assert_called_with(sock, selectors.EVENT_READ, conn)
106107

107108
# Failure to connect should trigger metadata update
108109
assert cli.cluster._need_update is False
109-
conn.state = ConnectionStates.DISCONNECTING
110-
cli._conn_state_change(node_id, conn)
110+
conn.state = ConnectionStates.DISCONNECTED
111+
cli._conn_state_change(node_id, sock, conn)
111112
assert node_id not in cli._connecting
112113
assert cli.cluster._need_update is True
113-
sel.unregister.assert_called_with(conn._sock)
114+
sel.unregister.assert_called_with(sock)
114115

115116
conn.state = ConnectionStates.CONNECTING
116-
cli._conn_state_change(node_id, conn)
117+
cli._conn_state_change(node_id, sock, conn)
117118
assert node_id in cli._connecting
118-
conn.state = ConnectionStates.DISCONNECTING
119-
cli._conn_state_change(node_id, conn)
119+
conn.state = ConnectionStates.DISCONNECTED
120+
cli._conn_state_change(node_id, sock, conn)
120121
assert node_id not in cli._connecting
121122

122123

0 commit comments

Comments
 (0)