Skip to content

Commit 760f741

Browse files
authored
Call ApiVersionsRequest during connection, prior to Sasl Handshake (#2493)
1 parent 5141d01 commit 760f741

File tree

4 files changed

+238
-199
lines changed

4 files changed

+238
-199
lines changed

kafka/client_async.py

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def _can_connect(self, node_id):
303303

304304
def _conn_state_change(self, node_id, sock, conn):
305305
with self._lock:
306-
if conn.connecting():
306+
if conn.state is ConnectionStates.CONNECTING:
307307
# SSL connections can enter this state 2x (second during Handshake)
308308
if node_id not in self._connecting:
309309
self._connecting.add(node_id)
@@ -315,7 +315,19 @@ def _conn_state_change(self, node_id, sock, conn):
315315
if self.cluster.is_bootstrap(node_id):
316316
self._last_bootstrap = time.time()
317317

318-
elif conn.connected():
318+
elif conn.state is ConnectionStates.API_VERSIONS_SEND:
319+
try:
320+
self._selector.register(sock, selectors.EVENT_WRITE, conn)
321+
except KeyError:
322+
self._selector.modify(sock, selectors.EVENT_WRITE, conn)
323+
324+
elif conn.state in (ConnectionStates.API_VERSIONS_RECV, ConnectionStates.AUTHENTICATING):
325+
try:
326+
self._selector.register(sock, selectors.EVENT_READ, conn)
327+
except KeyError:
328+
self._selector.modify(sock, selectors.EVENT_READ, conn)
329+
330+
elif conn.state is ConnectionStates.CONNECTED:
319331
log.debug("Node %s connected", node_id)
320332
if node_id in self._connecting:
321333
self._connecting.remove(node_id)
@@ -332,6 +344,8 @@ def _conn_state_change(self, node_id, sock, conn):
332344

333345
if self.cluster.is_bootstrap(node_id):
334346
self._bootstrap_fails = 0
347+
if self._api_versions is None:
348+
self._api_versions = conn._api_versions
335349

336350
else:
337351
for node_id in list(self._conns.keys()):
@@ -970,15 +984,14 @@ def refresh_done(val_or_error):
970984
def get_api_versions(self):
971985
"""Return the ApiVersions map, if available.
972986
973-
Note: A call to check_version must previously have succeeded and returned
974-
version 0.10.0 or later
987+
Note: Only available after bootstrap; requires broker version 0.10.0 or later.
975988
976989
Returns: a map of dict mapping {api_key : (min_version, max_version)},
977990
or None if ApiVersion is not supported by the kafka cluster.
978991
"""
979992
return self._api_versions
980993

981-
def check_version(self, node_id=None, timeout=None, strict=False):
994+
def check_version(self, node_id=None, timeout=None, **kwargs):
982995
"""Attempt to guess the version of a Kafka broker.
983996
984997
Keyword Arguments:
@@ -994,50 +1007,45 @@ def check_version(self, node_id=None, timeout=None, strict=False):
9941007
Raises:
9951008
NodeNotReadyError (if node_id is provided)
9961009
NoBrokersAvailable (if node_id is None)
997-
UnrecognizedBrokerVersion: please file bug if seen!
998-
AssertionError (if strict=True): please file bug if seen!
9991010
"""
10001011
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
1001-
self._lock.acquire()
1002-
end = time.time() + timeout
1003-
while time.time() < end:
1004-
1005-
# It is possible that least_loaded_node falls back to bootstrap,
1006-
# which can block for an increasing backoff period
1007-
try_node = node_id or self.least_loaded_node()
1008-
if try_node is None:
1009-
self._lock.release()
1010-
raise Errors.NoBrokersAvailable()
1011-
if not self._init_connect(try_node):
1012-
if try_node == node_id:
1013-
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
1014-
else:
1012+
with self._lock:
1013+
end = time.time() + timeout
1014+
while time.time() < end:
1015+
time_remaining = max(end - time.time(), 0)
1016+
if node_id is not None and self.connection_delay(node_id) > 0:
1017+
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
1018+
if sleep_time > 0:
1019+
time.sleep(sleep_time)
10151020
continue
1016-
1017-
conn = self._conns[try_node]
1018-
1019-
# We will intentionally cause socket failures
1020-
# These should not trigger metadata refresh
1021-
self._refresh_on_disconnects = False
1022-
try:
1023-
remaining = end - time.time()
1024-
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
1025-
if not self._api_versions:
1026-
self._api_versions = conn.get_api_versions()
1027-
self._lock.release()
1028-
return version
1029-
except Errors.NodeNotReadyError:
1030-
# Only raise to user if this is a node-specific request
1021+
try_node = node_id or self.least_loaded_node()
1022+
if try_node is None:
1023+
sleep_time = min(time_remaining, self.least_loaded_node_refresh_ms() / 1000.0)
1024+
if sleep_time > 0:
1025+
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
1026+
time.sleep(sleep_time)
1027+
continue
1028+
log.debug('Attempting to check version with node %s', try_node)
1029+
if not self._init_connect(try_node):
1030+
if try_node == node_id:
1031+
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
1032+
else:
1033+
continue
1034+
conn = self._conns[try_node]
1035+
1036+
while conn.connecting() and time.time() < end:
1037+
timeout_ms = min((end - time.time()) * 1000, 200)
1038+
self.poll(timeout_ms=timeout_ms)
1039+
1040+
if conn._api_version is not None:
1041+
return conn._api_version
1042+
1043+
# Timeout
1044+
else:
10311045
if node_id is not None:
1032-
self._lock.release()
1033-
raise
1034-
finally:
1035-
self._refresh_on_disconnects = True
1036-
1037-
# Timeout
1038-
else:
1039-
self._lock.release()
1040-
raise Errors.NoBrokersAvailable()
1046+
raise Errors.NodeNotReadyError(node_id)
1047+
else:
1048+
raise Errors.NoBrokersAvailable()
10411049

10421050
def api_version(self, operation, max_version=None):
10431051
"""Find the latest version of the protocol operation supported by both
@@ -1063,15 +1071,15 @@ def api_version(self, operation, max_version=None):
10631071
broker_api_versions = self._api_versions
10641072
api_key = operation[0].API_KEY
10651073
if broker_api_versions is None or api_key not in broker_api_versions:
1066-
raise IncompatibleBrokerVersion(
1074+
raise Errors.IncompatibleBrokerVersion(
10671075
"Kafka broker does not support the '{}' Kafka protocol."
10681076
.format(operation[0].__name__))
10691077
broker_min_version, broker_max_version = broker_api_versions[api_key]
10701078
version = min(max_version, broker_max_version)
10711079
if version < broker_min_version:
10721080
# max library version is less than min broker version. Currently,
10731081
# no Kafka versions specify a min msg version. Maybe in the future?
1074-
raise IncompatibleBrokerVersion(
1082+
raise Errors.IncompatibleBrokerVersion(
10751083
"No version of the '{}' Kafka protocol is supported by both the client and broker."
10761084
.format(operation[0].__name__))
10771085
return version

0 commit comments

Comments
 (0)