Skip to content

Commit 5e60bb5

Browse files
committed
with self._lock
1 parent f892b91 commit 5e60bb5

File tree

1 file changed

+37
-39
lines changed

1 file changed

+37
-39
lines changed

kafka/client_async.py

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,47 +1002,45 @@ def check_version(self, node_id=None, timeout=None, strict=False):
10021002
NoBrokersAvailable (if node_id is None)
10031003
"""
10041004
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
1005-
self._lock.acquire()
1006-
end = time.time() + timeout
1007-
while time.time() < end:
1008-
time_remaining = max(end - time.time(), 0)
1009-
if node_id is not None and self.connection_delay(node_id) > 0:
1010-
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
1011-
if sleep_time > 0:
1012-
time.sleep(sleep_time)
1013-
continue
1014-
try_node = node_id or self.least_loaded_node()
1015-
if try_node is None:
1016-
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000.0)
1017-
if sleep_time > 0:
1018-
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
1019-
time.sleep(sleep_time)
1020-
continue
1021-
log.debug('Attempting to check version with node %s', try_node)
1022-
if not self._init_connect(try_node):
1023-
if try_node == node_id:
1024-
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
1025-
else:
1005+
with self._lock:
1006+
end = time.time() + timeout
1007+
while time.time() < end:
1008+
time_remaining = max(end - time.time(), 0)
1009+
if node_id is not None and self.connection_delay(node_id) > 0:
1010+
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
1011+
if sleep_time > 0:
1012+
time.sleep(sleep_time)
10261013
continue
1027-
conn = self._conns[try_node]
1028-
1029-
while not conn.disconnected() and conn._api_version is None and time.time() < end:
1030-
timeout_ms = min((end - time.time()) * 1000, 200)
1031-
self.poll(timeout_ms=timeout_ms)
1032-
1033-
if conn._api_version is not None:
1034-
self._lock.release()
1035-
if not self._api_versions:
1036-
self._api_versions = conn._api_versions
1037-
return conn._api_version
1038-
1039-
# Timeout
1040-
else:
1041-
self._lock.release()
1042-
if node_id is not None:
1043-
raise Errors.NodeNotReadyError(node_id)
1014+
try_node = node_id or self.least_loaded_node()
1015+
if try_node is None:
1016+
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000.0)
1017+
if sleep_time > 0:
1018+
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
1019+
time.sleep(sleep_time)
1020+
continue
1021+
log.debug('Attempting to check version with node %s', try_node)
1022+
if not self._init_connect(try_node):
1023+
if try_node == node_id:
1024+
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
1025+
else:
1026+
continue
1027+
conn = self._conns[try_node]
1028+
1029+
while not conn.disconnected() and conn._api_version is None and time.time() < end:
1030+
timeout_ms = min((end - time.time()) * 1000, 200)
1031+
self.poll(timeout_ms=timeout_ms)
1032+
1033+
if conn._api_version is not None:
1034+
if not self._api_versions:
1035+
self._api_versions = conn._api_versions
1036+
return conn._api_version
1037+
1038+
# Timeout
10441039
else:
1045-
raise Errors.NoBrokersAvailable()
1040+
if node_id is not None:
1041+
raise Errors.NodeNotReadyError(node_id)
1042+
else:
1043+
raise Errors.NoBrokersAvailable()
10461044

10471045
def api_version(self, operation, max_version=None):
10481046
"""Find the latest version of the protocol operation supported by both

0 commit comments

Comments
 (0)