Skip to content

Commit 6919bab

Browse files
committed
refactor client.check_version()
1 parent 3f53f43 commit 6919bab

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

kafka/client_async.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -970,17 +970,19 @@ def check_version(self, node_id=None, timeout=None, strict=False):
970970
self._lock.acquire()
971971
end = time.time() + timeout
972972
while time.time() < end:
973+
time_remaining = max(end - time.time(), 0)
974+
if node_id is not None and self.connection_delay(node_id) > 0:
975+
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
976+
if sleep_time > 0:
977+
time.sleep(sleep_time)
978+
continue
973979
try_node = node_id or self.least_loaded_node()
974980
if try_node is None:
975-
time_remaining = end - time.time()
976-
if time_remaining <= 0:
977-
self._lock.release()
978-
raise Errors.NoBrokersAvailable()
979-
else:
980-
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000)
981+
sleep_time = min(time_remaining, least_loaded_node_refresh_ms / 1000.0)
982+
if sleep_time > 0:
981983
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
982984
time.sleep(sleep_time)
983-
continue
985+
continue
984986
log.debug('Attempting to check version with node %s', try_node)
985987
if not self._maybe_connect(try_node):
986988
if try_node == node_id:
@@ -1002,7 +1004,10 @@ def check_version(self, node_id=None, timeout=None, strict=False):
10021004
# Timeout
10031005
else:
10041006
self._lock.release()
1005-
raise Errors.NoBrokersAvailable()
1007+
if node_id is not None:
1008+
raise Errors.NodeNotReadyError(node_id)
1009+
else:
1010+
raise Errors.NoBrokersAvailable()
10061011

10071012
def api_version(self, operation, max_version=None):
10081013
"""Find the latest version of the protocol operation supported by both

0 commit comments

Comments
 (0)