Skip to content

Commit 907fb35

Browse files
committed
conn: use api_version_auto_timeout_ms for timeout in version checking
1 parent 0e289bd commit 907fb35

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

kafka/conn.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class BrokerConnection(object):
171171
Default: None
172172
api_version_auto_timeout_ms (int): number of milliseconds to throw a
173173
timeout exception from the constructor when checking the broker
174-
api version. Only applies if api_version is None
174+
api version. Only applies if api_version is None. Default: 2000.
175175
selector (selectors.BaseSelector): Provide a specific selector
176176
implementation to use for I/O multiplexing.
177177
Default: selectors.DefaultSelector
@@ -217,6 +217,7 @@ class BrokerConnection(object):
217217
'ssl_password': None,
218218
'ssl_ciphers': None,
219219
'api_version': None,
220+
'api_version_auto_timeout_ms': 2000,
220221
'selector': selectors.DefaultSelector,
221222
'state_change_callback': lambda node_id, sock, conn: True,
222223
'metrics': None,
@@ -549,14 +550,14 @@ def _try_api_versions_check(self):
549550
# ((0, 10), ApiVersionsRequest[0]()),
550551
request = ApiVersionsRequest[0]()
551552
future = Future()
552-
response = self._send(request, blocking=True)
553+
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
553554
response.add_callback(self._handle_api_versions_response, future)
554555
response.add_errback(self._handle_api_versions_failure, future)
555556
self._api_versions_future = future
556557
elif self._check_version_idx < len(self.VERSION_CHECKS):
557558
version, request = self.VERSION_CHECKS[self._check_version_idx]
558559
future = Future()
559-
response = self._send(request, blocking=True)
560+
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
560561
response.add_callback(self._handle_check_version_response, future, version)
561562
response.add_errback(self._handle_check_version_failure, future)
562563
self._api_versions_future = future

0 commit comments

Comments
 (0)