Skip to content

Commit f892b91

Browse files
committed
refactor client.check_version()
1 parent 31419ac commit f892b91

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

kafka/client_async.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,17 +1005,19 @@ def check_version(self, node_id=None, timeout=None, strict=False):
10051005
self._lock.acquire()
10061006
end = time.time() + timeout
10071007
while time.time() < end:
1008+
time_remaining = max(end - time.time(), 0)
1009+
if node_id is not None and self.connection_delay(node_id) > 0:
1010+
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
1011+
if sleep_time > 0:
1012+
time.sleep(sleep_time)
1013+
continue
10081014
try_node = node_id or self.least_loaded_node()
10091015
if try_node is None:
1010-
time_remaining = end - time.time()
1011-
if time_remaining <= 0:
1012-
self._lock.release()
1013-
raise Errors.NoBrokersAvailable()
1014-
else:
1015-
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000)
1016+
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000.0)
1017+
if sleep_time > 0:
10161018
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
10171019
time.sleep(sleep_time)
1018-
continue
1020+
continue
10191021
log.debug('Attempting to check version with node %s', try_node)
10201022
if not self._init_connect(try_node):
10211023
if try_node == node_id:
@@ -1037,7 +1039,10 @@ def check_version(self, node_id=None, timeout=None, strict=False):
10371039
# Timeout
10381040
else:
10391041
self._lock.release()
1040-
raise Errors.NoBrokersAvailable()
1042+
if node_id is not None:
1043+
raise Errors.NodeNotReadyError(node_id)
1044+
else:
1045+
raise Errors.NoBrokersAvailable()
10411046

10421047
def api_version(self, operation, max_version=None):
10431048
"""Find the latest version of the protocol operation supported by both

0 commit comments

Comments
 (0)