Skip to content

Commit 2a3f8c1

Browse files
committed
Handle api versions UnsupportedVersionError
1 parent 0d4b837 commit 2a3f8c1

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

kafka/conn.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ def __init__(self, host, port, afi, **configs):
249249
self._api_versions = None
250250
self._api_version = None
251251
self._check_version_idx = None
252+
self._api_versions_idx = 2
252253
self._throttle_time = None
253254

254255
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -544,13 +545,7 @@ def _try_api_versions_check(self):
544545
self._api_version = self.config['api_version']
545546
return True
546547
elif self._check_version_idx is None:
547-
# TODO: Implement newer versions
548-
# ((3, 9), ApiVersionsRequest[4]()),
549-
# ((2, 4), ApiVersionsRequest[3]()),
550-
# ((2, 0), ApiVersionsRequest[2]()),
551-
# ((0, 11), ApiVersionsRequest[1]()),
552-
# ((0, 10), ApiVersionsRequest[0]()),
553-
request = ApiVersionsRequest[0]()
548+
request = ApiVersionsRequest[self._api_versions_idx]()
554549
future = Future()
555550
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
556551
response.add_callback(self._handle_api_versions_response, future)
@@ -585,7 +580,17 @@ def _try_api_versions_check(self):
585580
def _handle_api_versions_response(self, future, response):
586581
error_type = Errors.for_code(response.error_code)
587582
# if error_type i UNSUPPORTED_VERSION: retry w/ latest version from response
588-
assert error_type is Errors.NoError, "API version check failed"
583+
if error_type is not Errors.NoError:
584+
future.failure(error_type())
585+
if error_type is Errors.UnsupportedVersionError:
586+
self._api_versions_idx -= 1
587+
if self._api_versions_idx >= 0:
588+
self._api_versions_future = None
589+
self.state = ConnectionStates.API_VERSIONS_SEND
590+
self.config['state_change_callback'](self.node_id, self._sock, self)
591+
else:
592+
self.close(error=error_type())
593+
return
589594
self._api_versions = dict([
590595
(api_key, (min_version, max_version))
591596
for api_key, min_version, max_version in response.api_versions
@@ -597,6 +602,7 @@ def _handle_api_versions_response(self, future, response):
597602
def _handle_api_versions_failure(self, future, ex):
598603
future.failure(ex)
599604
self._check_version_idx = 0
605+
# after failure connection is closed, so state should already be DISCONNECTED
600606

601607
def _handle_check_version_response(self, future, version, _response):
602608
log.info('Broker version identified as %s', '.'.join(map(str, version)))
@@ -610,6 +616,7 @@ def _handle_check_version_response(self, future, version, _response):
610616
def _handle_check_version_failure(self, future, ex):
611617
future.failure(ex)
612618
self._check_version_idx += 1
619+
# after failure connection is closed, so state should already be DISCONNECTED
613620

614621
def _try_authenticate(self):
615622
assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10, 0)

0 commit comments

Comments
 (0)