Skip to content

Commit 85a113a

Browse files
authored
Default client.check_version timeout to api_version_auto_timeout_ms (#2496)
1 parent 5a3d95d commit 85a113a

File tree

2 files changed

+11
-8
lines changed

2 files changed

+11
-8
lines changed

kafka/admin/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ def _refresh_controller_id(self, timeout_ms=30000):
261261
time.sleep(1)
262262
continue
263263
# verify the controller is new enough to support our requests
264-
controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
264+
controller_version = self._client.check_version(node_id=controller_id)
265265
if controller_version < (0, 10, 0):
266266
raise IncompatibleBrokerVersion(
267267
"The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."

kafka/client_async.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,7 @@ def __init__(self, **configs):
249249

250250
# Check Broker Version if not set explicitly
251251
if self.config['api_version'] is None:
252-
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
253-
self.config['api_version'] = self.check_version(timeout=check_timeout)
252+
self.config['api_version'] = self.check_version()
254253
elif self.config['api_version'] in BROKER_API_VERSIONS:
255254
self._api_versions = BROKER_API_VERSIONS[self.config['api_version']]
256255
elif (self.config['api_version'] + (0,)) in BROKER_API_VERSIONS:
@@ -921,13 +920,16 @@ def get_api_versions(self):
921920
"""
922921
return self._api_versions
923922

924-
def check_version(self, node_id=None, timeout=2, strict=False):
923+
def check_version(self, node_id=None, timeout=None, strict=False):
925924
"""Attempt to guess the version of a Kafka broker.
926925
927-
Note: It is possible that this method blocks longer than the
928-
specified timeout. This can happen if the entire cluster
929-
is down and the client enters a bootstrap backoff sleep.
930-
This is only possible if node_id is None.
926+
Keyword Arguments:
927+
node_id (str, optional): Broker node id from cluster metadata. If None, attempts
928+
to connect to any available broker until version is identified.
929+
Default: None
930+
timeout (num, optional): Maximum time in seconds to try to check broker version.
931+
If unable to identify version before timeout, raise error (see below).
932+
Default: api_version_auto_timeout_ms / 1000
931933
932934
Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc
933935
@@ -937,6 +939,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
937939
UnrecognizedBrokerVersion: please file bug if seen!
938940
AssertionError (if strict=True): please file bug if seen!
939941
"""
942+
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
940943
self._lock.acquire()
941944
end = time.time() + timeout
942945
while time.time() < end:

0 commit comments

Comments
 (0)