Skip to content

Commit be01345

Browse files
committed
Fake api_versions for old brokers; rename to ApiVersionsRequest; handle errors
1 parent fe98d6c commit be01345

File tree

4 files changed

+169
-79
lines changed

4 files changed

+169
-79
lines changed

kafka/client_async.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -925,10 +925,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
925925
try:
926926
remaining = end - time.time()
927927
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
928-
if version >= (0, 10, 0):
929-
# cache the api versions map if it's available (starting
930-
# in 0.10 cluster version)
931-
self._api_versions = conn.get_api_versions()
928+
self._api_versions = conn.get_api_versions()
932929
self._lock.release()
933930
return version
934931
except Errors.NodeNotReadyError:

kafka/conn.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,9 +1154,10 @@ def next_ifr_request_timeout_ms(self):
11541154
else:
11551155
return float('inf')
11561156

1157-
def _handle_api_version_response(self, response):
1157+
def _handle_api_versions_response(self, response):
11581158
error_type = Errors.for_code(response.error_code)
1159-
assert error_type is Errors.NoError, "API version check failed"
1159+
if error_type is not Errors.NoError:
1160+
return False
11601161
self._api_versions = dict([
11611162
(api_key, (min_version, max_version))
11621163
for api_key, min_version, max_version in response.api_versions
@@ -1168,12 +1169,7 @@ def get_api_versions(self):
11681169
return self._api_versions
11691170

11701171
version = self.check_version()
1171-
if version < (0, 10, 0):
1172-
raise Errors.UnsupportedVersionError(
1173-
"ApiVersion not supported by cluster version {} < 0.10.0"
1174-
.format(version))
1175-
# _api_versions is set as a side effect of check_versions() on a cluster
1176-
# that supports 0.10.0 or later
1172+
# _api_versions is set as a side effect of check_versions()
11771173
return self._api_versions
11781174

11791175
def _infer_broker_version_from_api_versions(self, api_versions):
@@ -1204,7 +1200,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
12041200
if min_version <= struct.API_VERSION <= max_version:
12051201
return broker_version
12061202

1207-
# We know that ApiVersionResponse is only supported in 0.10+
1203+
# We know that ApiVersionsResponse is only supported in 0.10+
12081204
# so if all else fails, choose that
12091205
return (0, 10, 0)
12101206

@@ -1236,12 +1232,14 @@ def reset_override_configs():
12361232
# vanilla MetadataRequest. If the server did not recognize the first
12371233
# request, both will be failed with a ConnectionError that wraps
12381234
# socket.error (32, 54, or 104)
1239-
from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest
1235+
from kafka.protocol.admin import ListGroupsRequest
1236+
from kafka.protocol.api_versions import ApiVersionsRequest, OLD_BROKER_API_VERSIONS
12401237
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
12411238

12421239
test_cases = [
1243-
# All cases starting from 0.10 will be based on ApiVersionResponse
1244-
((0, 10), ApiVersionRequest[0]()),
1240+
# All cases starting from 0.10 will be based on ApiVersionsResponse
1241+
((0, 11), ApiVersionsRequest[1]()),
1242+
((0, 10), ApiVersionsRequest[0]()),
12451243
((0, 9), ListGroupsRequest[0]()),
12461244
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
12471245
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
@@ -1274,11 +1272,15 @@ def reset_override_configs():
12741272
selector.close()
12751273

12761274
if f.succeeded():
1277-
if isinstance(request, ApiVersionRequest[0]):
1275+
if version >= (0, 10):
12781276
# Starting from 0.10 kafka broker we determine version
1279-
# by looking at ApiVersionResponse
1280-
api_versions = self._handle_api_version_response(f.value)
1277+
# by looking at ApiVersionsResponse
1278+
api_versions = self._handle_api_versions_response(f.value)
1279+
if not api_versions:
1280+
continue
12811281
version = self._infer_broker_version_from_api_versions(api_versions)
1282+
else:
1283+
self._api_versions = OLD_BROKER_API_VERSIONS.get(version)
12821284
log.info('Broker version identified as %s', '.'.join(map(str, version)))
12831285
log.info('Set configuration api_version=%s to skip auto'
12841286
' check_version requests on startup', version)

kafka/protocol/admin.py

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4,66 +4,6 @@
44
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields
55

66

7-
class ApiVersionResponse_v0(Response):
8-
API_KEY = 18
9-
API_VERSION = 0
10-
SCHEMA = Schema(
11-
('error_code', Int16),
12-
('api_versions', Array(
13-
('api_key', Int16),
14-
('min_version', Int16),
15-
('max_version', Int16)))
16-
)
17-
18-
19-
class ApiVersionResponse_v1(Response):
20-
API_KEY = 18
21-
API_VERSION = 1
22-
SCHEMA = Schema(
23-
('error_code', Int16),
24-
('api_versions', Array(
25-
('api_key', Int16),
26-
('min_version', Int16),
27-
('max_version', Int16))),
28-
('throttle_time_ms', Int32)
29-
)
30-
31-
32-
class ApiVersionResponse_v2(Response):
33-
API_KEY = 18
34-
API_VERSION = 2
35-
SCHEMA = ApiVersionResponse_v1.SCHEMA
36-
37-
38-
class ApiVersionRequest_v0(Request):
39-
API_KEY = 18
40-
API_VERSION = 0
41-
RESPONSE_TYPE = ApiVersionResponse_v0
42-
SCHEMA = Schema()
43-
44-
45-
class ApiVersionRequest_v1(Request):
46-
API_KEY = 18
47-
API_VERSION = 1
48-
RESPONSE_TYPE = ApiVersionResponse_v1
49-
SCHEMA = ApiVersionRequest_v0.SCHEMA
50-
51-
52-
class ApiVersionRequest_v2(Request):
53-
API_KEY = 18
54-
API_VERSION = 2
55-
RESPONSE_TYPE = ApiVersionResponse_v1
56-
SCHEMA = ApiVersionRequest_v0.SCHEMA
57-
58-
59-
ApiVersionRequest = [
60-
ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2,
61-
]
62-
ApiVersionResponse = [
63-
ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2,
64-
]
65-
66-
677
class CreateTopicsResponse_v0(Response):
688
API_KEY = 19
699
API_VERSION = 0

kafka/protocol/api_versions.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.protocol.api import Request, Response
4+
from kafka.protocol.types import Array, Int16, Int32, Schema
5+
6+
7+
class BaseApiVersionsResponse(Response):
8+
API_KEY = 18
9+
API_VERSION = 0
10+
SCHEMA = Schema(
11+
('error_code', Int16),
12+
('api_versions', Array(
13+
('api_key', Int16),
14+
('min_version', Int16),
15+
('max_version', Int16)))
16+
)
17+
18+
@classmethod
19+
def decode(cls, data):
20+
if isinstance(data, bytes):
21+
data = BytesIO(data)
22+
# Check error_code, decode as v0 if any error
23+
curr = data.tell()
24+
err = Int16.decode(data)
25+
data.seek(curr)
26+
if err != 0:
27+
return ApiVersionsResponse_v0.decode(data)
28+
return super(BaseApiVersionsResponse, cls).decode(data)
29+
30+
31+
class ApiVersionsResponse_v0(Response):
32+
API_KEY = 18
33+
API_VERSION = 0
34+
SCHEMA = Schema(
35+
('error_code', Int16),
36+
('api_versions', Array(
37+
('api_key', Int16),
38+
('min_version', Int16),
39+
('max_version', Int16)))
40+
)
41+
42+
43+
class ApiVersionsResponse_v1(BaseApiVersionsResponse):
44+
API_KEY = 18
45+
API_VERSION = 1
46+
SCHEMA = Schema(
47+
('error_code', Int16),
48+
('api_versions', Array(
49+
('api_key', Int16),
50+
('min_version', Int16),
51+
('max_version', Int16))),
52+
('throttle_time_ms', Int32)
53+
)
54+
55+
56+
class ApiVersionsResponse_v2(BaseApiVersionsResponse):
57+
API_KEY = 18
58+
API_VERSION = 2
59+
SCHEMA = ApiVersionsResponse_v1.SCHEMA
60+
61+
62+
class ApiVersionsRequest_v0(Request):
63+
API_KEY = 18
64+
API_VERSION = 0
65+
RESPONSE_TYPE = ApiVersionsResponse_v0
66+
SCHEMA = Schema()
67+
68+
69+
class ApiVersionsRequest_v1(Request):
70+
API_KEY = 18
71+
API_VERSION = 1
72+
RESPONSE_TYPE = ApiVersionsResponse_v1
73+
SCHEMA = ApiVersionsRequest_v0.SCHEMA
74+
75+
76+
class ApiVersionsRequest_v2(Request):
77+
API_KEY = 18
78+
API_VERSION = 2
79+
RESPONSE_TYPE = ApiVersionsResponse_v1
80+
SCHEMA = ApiVersionsRequest_v0.SCHEMA
81+
82+
83+
ApiVersionsRequest = [
84+
ApiVersionsRequest_v0, ApiVersionsRequest_v1, ApiVersionsRequest_v2,
85+
]
86+
ApiVersionsResponse = [
87+
ApiVersionsResponse_v0, ApiVersionsResponse_v1, ApiVersionsResponse_v2,
88+
]
89+
90+
91+
OLD_BROKER_API_VERSIONS = {
92+
(0, 8, 0): {
93+
0: (0, 0),
94+
1: (0, 0),
95+
2: (0, 0),
96+
3: (0, 0),
97+
},
98+
(0, 8, 1): { # adds offset commit + fetch
99+
0: (0, 0),
100+
1: (0, 0),
101+
2: (0, 0),
102+
3: (0, 0),
103+
8: (0, 0),
104+
9: (0, 0),
105+
},
106+
(0, 8, 2): { # adds find coordinator
107+
0: (0, 0),
108+
1: (0, 0),
109+
2: (0, 0),
110+
3: (0, 0),
111+
8: (0, 1),
112+
9: (0, 1),
113+
10: (0, 0),
114+
},
115+
(0, 9): { # adds group management (join/sync/leave/heartbeat)
116+
0: (0, 1),
117+
1: (0, 1),
118+
2: (0, 0),
119+
3: (0, 0),
120+
8: (0, 2),
121+
9: (0, 1),
122+
10: (0, 0),
123+
11: (0, 0),
124+
12: (0, 0),
125+
13: (0, 0),
126+
14: (0, 0),
127+
15: (0, 0),
128+
16: (0, 0),
129+
},
130+
(0, 10): { # adds sasl + api versions [pulled from api_versions response]
131+
0: (0, 2),
132+
1: (0, 2),
133+
2: (0, 0),
134+
3: (0, 1),
135+
4: (0, 0),
136+
5: (0, 0),
137+
6: (0, 2),
138+
7: (1, 1),
139+
8: (0, 2),
140+
9: (0, 1),
141+
10: (0, 0),
142+
11: (0, 0),
143+
12: (0, 0),
144+
13: (0, 0),
145+
14: (0, 0),
146+
15: (0, 0),
147+
16: (0, 0),
148+
17: (0, 0),
149+
18: (0, 0),
150+
},
151+
}

0 commit comments

Comments
 (0)