Skip to content

Commit 4acd084

Browse files
committed
with self._lock
1 parent 39095a9 commit 4acd084

File tree

1 file changed

+37
-39
lines changed

1 file changed

+37
-39
lines changed

kafka/client_async.py

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -965,47 +965,45 @@ def check_version(self, node_id=None, timeout=None, strict=False):
965965
NoBrokersAvailable (if node_id is None)
966966
"""
967967
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
968-
self._lock.acquire()
969-
end = time.time() + timeout
970-
while time.time() < end:
971-
time_remaining = max(end - time.time(), 0)
972-
if node_id is not None and self.connection_delay(node_id) > 0:
973-
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
974-
if sleep_time > 0:
975-
time.sleep(sleep_time)
976-
continue
977-
try_node = node_id or self.least_loaded_node()
978-
if try_node is None:
979-
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000.0)
980-
if sleep_time > 0:
981-
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
982-
time.sleep(sleep_time)
983-
continue
984-
log.debug('Attempting to check version with node %s', try_node)
985-
if not self._init_connect(try_node):
986-
if try_node == node_id:
987-
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
988-
else:
968+
with self._lock:
969+
end = time.time() + timeout
970+
while time.time() < end:
971+
time_remaining = max(end - time.time(), 0)
972+
if node_id is not None and self.connection_delay(node_id) > 0:
973+
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
974+
if sleep_time > 0:
975+
time.sleep(sleep_time)
989976
continue
990-
conn = self._conns[try_node]
991-
992-
while not conn.disconnected() and conn._api_version is None and time.time() < end:
993-
timeout_ms = min((end - time.time()) * 1000, 200)
994-
self.poll(timeout_ms=timeout_ms)
995-
996-
if conn._api_version is not None:
997-
self._lock.release()
998-
if not self._api_versions:
999-
self._api_versions = conn._api_versions
1000-
return conn._api_version
1001-
1002-
# Timeout
1003-
else:
1004-
self._lock.release()
1005-
if node_id is not None:
1006-
raise Errors.NodeNotReadyError(node_id)
977+
try_node = node_id or self.least_loaded_node()
978+
if try_node is None:
979+
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000.0)
980+
if sleep_time > 0:
981+
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
982+
time.sleep(sleep_time)
983+
continue
984+
log.debug('Attempting to check version with node %s', try_node)
985+
if not self._init_connect(try_node):
986+
if try_node == node_id:
987+
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
988+
else:
989+
continue
990+
conn = self._conns[try_node]
991+
992+
while not conn.disconnected() and conn._api_version is None and time.time() < end:
993+
timeout_ms = min((end - time.time()) * 1000, 200)
994+
self.poll(timeout_ms=timeout_ms)
995+
996+
if conn._api_version is not None:
997+
if not self._api_versions:
998+
self._api_versions = conn._api_versions
999+
return conn._api_version
1000+
1001+
# Timeout
10071002
else:
1008-
raise Errors.NoBrokersAvailable()
1003+
if node_id is not None:
1004+
raise Errors.NodeNotReadyError(node_id)
1005+
else:
1006+
raise Errors.NoBrokersAvailable()
10091007

10101008
def api_version(self, operation, max_version=None):
10111009
"""Find the latest version of the protocol operation supported by both

0 commit comments

Comments
 (0)