Skip to content

Commit c84f513

Browse files
committed
Check broker version inline during connection; send api versions request before sasl
1 parent 2de3c34 commit c84f513

File tree

3 files changed

+152
-164
lines changed

3 files changed

+152
-164
lines changed

kafka/client_async.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def _can_connect(self, node_id):
303303

304304
def _conn_state_change(self, node_id, sock, conn):
305305
with self._lock:
306-
if conn.connecting():
306+
if conn.state is ConnectionStates.CONNECTING:
307307
# SSL connections can enter this state 2x (second during Handshake)
308308
if node_id not in self._connecting:
309309
self._connecting.add(node_id)
@@ -315,7 +315,13 @@ def _conn_state_change(self, node_id, sock, conn):
315315
if self.cluster.is_bootstrap(node_id):
316316
self._last_bootstrap = time.time()
317317

318-
elif conn.connected():
318+
elif conn.state in (ConnectionStates.API_VERSIONS, ConnectionStates.AUTHENTICATING):
319+
try:
320+
self._selector.register(sock, selectors.EVENT_READ, conn)
321+
except KeyError:
322+
self._selector.modify(sock, selectors.EVENT_READ, conn)
323+
324+
elif conn.state is ConnectionStates.CONNECTED:
319325
log.debug("Node %s connected", node_id)
320326
if node_id in self._connecting:
321327
self._connecting.remove(node_id)
@@ -959,45 +965,39 @@ def check_version(self, node_id=None, timeout=None, strict=False):
959965
Raises:
960966
NodeNotReadyError (if node_id is provided)
961967
NoBrokersAvailable (if node_id is None)
962-
UnrecognizedBrokerVersion: please file bug if seen!
963-
AssertionError (if strict=True): please file bug if seen!
964968
"""
965969
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
966970
self._lock.acquire()
967971
end = time.time() + timeout
968972
while time.time() < end:
969-
970-
# It is possible that least_loaded_node falls back to bootstrap,
971-
# which can block for an increasing backoff period
972973
try_node = node_id or self.least_loaded_node()
973974
if try_node is None:
974-
self._lock.release()
975-
raise Errors.NoBrokersAvailable()
975+
time_remaining = end - time.time()
976+
if time_remaining <= 0:
977+
self._lock.release()
978+
raise Errors.NoBrokersAvailable()
979+
else:
980+
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000)
981+
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
982+
time.sleep(sleep_time)
983+
continue
984+
log.debug('Attempting to check version with node %s', try_node)
976985
if not self._maybe_connect(try_node):
977986
if try_node == node_id:
978987
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
979988
else:
980989
continue
981-
982990
conn = self._conns[try_node]
983991

984-
# We will intentionally cause socket failures
985-
# These should not trigger metadata refresh
986-
self._refresh_on_disconnects = False
987-
try:
988-
remaining = end - time.time()
989-
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
990-
if not self._api_versions:
991-
self._api_versions = conn.get_api_versions()
992+
while not conn.disconnected() and conn._api_version is None and time.time() < end:
993+
timeout_ms = min((end - time.time()) * 1000, 200)
994+
self.poll(timeout_ms=timeout_ms)
995+
996+
if conn._api_version is not None:
992997
self._lock.release()
993-
return version
994-
except Errors.NodeNotReadyError:
995-
# Only raise to user if this is a node-specific request
996-
if node_id is not None:
997-
self._lock.release()
998-
raise
999-
finally:
1000-
self._refresh_on_disconnects = True
998+
if not self._api_versions:
999+
self._api_versions = conn._api_versions
1000+
return conn._api_version
10011001

10021002
# Timeout
10031003
else:

0 commit comments

Comments
 (0)