Skip to content

Commit cff0d7d

Browse files
committed
move to kafka.protocol.broker_api_versions; set _api_versions from configured api_version
1 parent be01345 commit cff0d7d

File tree

4 files changed

+50
-66
lines changed

4 files changed

+50
-66
lines changed

kafka/client_async.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from kafka.metrics import AnonMeasurable
2626
from kafka.metrics.stats import Avg, Count, Rate
2727
from kafka.metrics.stats.rate import TimeUnit
28+
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
2829
from kafka.protocol.metadata import MetadataRequest
2930
from kafka.util import Dict, WeakMethod
3031
# Although this looks unused, it actually monkey-patches socket.socketpair()
@@ -239,6 +240,20 @@ def __init__(self, **configs):
239240
if self.config['api_version'] is None:
240241
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
241242
self.config['api_version'] = self.check_version(timeout=check_timeout)
243+
elif self.config['api_version'] in BROKER_API_VERSIONS:
244+
self._api_versions = BROKER_API_VERSIONS[self.config['api_version']]
245+
else:
246+
compatible_version = None
247+
for v in sorted(BROKER_API_VERSIONS.keys(), reverse=True):
248+
if v <= self.config['api_version']:
249+
compatible_version = v
250+
break
251+
if compatible_version:
252+
log.warning('Configured api_version %s not supported; using %s',
253+
self.config['api_version'], compatible_version)
254+
self._api_versions = BROKER_API_VERSIONS[compatible_version]
255+
else:
256+
raise Errors.UnrecognizedBrokerVersion(self.config['api_version'])
242257

243258
def _init_wakeup_socketpair(self):
244259
self._wake_r, self._wake_w = socket.socketpair()
@@ -925,7 +940,8 @@ def check_version(self, node_id=None, timeout=2, strict=False):
925940
try:
926941
remaining = end - time.time()
927942
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
928-
self._api_versions = conn.get_api_versions()
943+
if not self._api_versions:
944+
self._api_versions = conn.get_api_versions()
929945
self._lock.release()
930946
return version
931947
except Errors.NodeNotReadyError:

kafka/conn.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,7 +1233,8 @@ def reset_override_configs():
12331233
# request, both will be failed with a ConnectionError that wraps
12341234
# socket.error (32, 54, or 104)
12351235
from kafka.protocol.admin import ListGroupsRequest
1236-
from kafka.protocol.api_versions import ApiVersionsRequest, OLD_BROKER_API_VERSIONS
1236+
from kafka.protocol.api_versions import ApiVersionsRequest
1237+
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
12371238
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
12381239

12391240
test_cases = [
@@ -1280,7 +1281,9 @@ def reset_override_configs():
12801281
continue
12811282
version = self._infer_broker_version_from_api_versions(api_versions)
12821283
else:
1283-
self._api_versions = OLD_BROKER_API_VERSIONS.get(version)
1284+
if version not in BROKER_API_VERSIONS:
1285+
raise Errors.UnrecognizedBrokerVersion(version)
1286+
self._api_versions = BROKER_API_VERSIONS[version]
12841287
log.info('Broker version identified as %s', '.'.join(map(str, version)))
12851288
log.info('Set configuration api_version=%s to skip auto'
12861289
' check_version requests on startup', version)

kafka/protocol/api_versions.py

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -86,66 +86,3 @@ class ApiVersionsRequest_v2(Request):
8686
ApiVersionsResponse = [
8787
ApiVersionsResponse_v0, ApiVersionsResponse_v1, ApiVersionsResponse_v2,
8888
]
89-
90-
91-
OLD_BROKER_API_VERSIONS = {
92-
(0, 8, 0): {
93-
0: (0, 0),
94-
1: (0, 0),
95-
2: (0, 0),
96-
3: (0, 0),
97-
},
98-
(0, 8, 1): { # adds offset commit + fetch
99-
0: (0, 0),
100-
1: (0, 0),
101-
2: (0, 0),
102-
3: (0, 0),
103-
8: (0, 0),
104-
9: (0, 0),
105-
},
106-
(0, 8, 2): { # adds find coordinator
107-
0: (0, 0),
108-
1: (0, 0),
109-
2: (0, 0),
110-
3: (0, 0),
111-
8: (0, 1),
112-
9: (0, 1),
113-
10: (0, 0),
114-
},
115-
(0, 9): { # adds group management (join/sync/leave/heartbeat)
116-
0: (0, 1),
117-
1: (0, 1),
118-
2: (0, 0),
119-
3: (0, 0),
120-
8: (0, 2),
121-
9: (0, 1),
122-
10: (0, 0),
123-
11: (0, 0),
124-
12: (0, 0),
125-
13: (0, 0),
126-
14: (0, 0),
127-
15: (0, 0),
128-
16: (0, 0),
129-
},
130-
(0, 10): { # adds sasl + api versions [pulled from api_versions response]
131-
0: (0, 2),
132-
1: (0, 2),
133-
2: (0, 0),
134-
3: (0, 1),
135-
4: (0, 0),
136-
5: (0, 0),
137-
6: (0, 2),
138-
7: (1, 1),
139-
8: (0, 2),
140-
9: (0, 1),
141-
10: (0, 0),
142-
11: (0, 0),
143-
12: (0, 0),
144-
13: (0, 0),
145-
14: (0, 0),
146-
15: (0, 0),
147-
16: (0, 0),
148-
17: (0, 0),
149-
18: (0, 0),
150-
},
151-
}

kafka/protocol/broker_api_versions.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
BROKER_API_VERSIONS = {
2+
# api_versions responses prior to (0, 10) are synthesized for compatibility
3+
(0, 8, 0): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0)},
4+
# adds offset commit + fetch
5+
(0, 8, 1): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0), 8: (0, 0), 9: (0, 0)},
6+
# adds find coordinator
7+
(0, 8, 2): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0), 8: (0, 1), 9: (0, 1), 10: (0, 0)},
8+
# adds group management (join/sync/leave/heartbeat)
9+
(0, 9): {0: (0, 1), 1: (0, 1), 2: (0, 0), 3: (0, 0), 8: (0, 2), 9: (0, 1), 10: (0, 0), 11: (0, 0), 12: (0, 0), 13: (0, 0), 14: (0, 0), 15: (0, 0), 16: (0, 0)},
10+
# adds message format v1, sasl, and api versions
11+
(0, 10): {0: (0, 2), 1: (0, 2), 2: (0, 0), 3: (0, 1), 4: (0, 0), 5: (0, 0), 6: (0, 2), 7: (1, 1), 8: (0, 2), 9: (0, 1), 10: (0, 0), 11: (0, 0), 12: (0, 0), 13: (0, 0), 14: (0, 0), 15: (0, 0), 16: (0, 0), 17: (0, 0), 18: (0, 0)},
12+
# Adds message format v2, and initial admin apis (create topics, etc)
13+
(0, 11): {0: (0, 3), 1: (0, 5), 2: (0, 2), 3: (0, 4), 4: (0, 0), 5: (0, 0), 6: (0, 3), 7: (1, 1), 8: (0, 3), 9: (0, 3), 10: (0, 1), 11: (0, 2), 12: (0, 1), 13: (0, 1), 14: (0, 1), 15: (0, 1), 16: (0, 1), 17: (0, 0), 18: (0, 1), 19: (0, 2), 20: (0, 1), 21: (0, 0), 22: (0, 0), 23: (0, 0), 24: (0, 0), 25: (0, 0), 26: (0, 0), 27: (0, 0), 28: (0, 0), 29: (0, 0), 30: (0, 0), 31: (0, 0), 32: (0, 0), 33: (0, 0)},
14+
15+
# Adds Sasl Authenticate, and additional admin apis (alter configs, etc)
16+
(1, 0): {0: (0, 5), 1: (0, 6), 2: (0, 2), 3: (0, 5), 4: (0, 1), 5: (0, 0), 6: (0, 4), 7: (0, 1), 8: (0, 3), 9: (0, 3), 10: (0, 1), 11: (0, 2), 12: (0, 1), 13: (0, 1), 14: (0, 1), 15: (0, 1), 16: (0, 1), 17: (0, 1), 18: (0, 1), 19: (0, 2), 20: (0, 1), 21: (0, 0), 22: (0, 0), 23: (0, 0), 24: (0, 0), 25: (0, 0), 26: (0, 0), 27: (0, 0), 28: (0, 0), 29: (0, 0), 30: (0, 0), 31: (0, 0), 32: (0, 0), 33: (0, 0), 34: (0, 0), 35: (0, 0), 36: (0, 0), 37: (0, 0)},
17+
18+
(2, 0): {0: (0, 6), 1: (0, 8), 2: (0, 3), 3: (0, 6), 4: (0, 1), 5: (0, 0), 6: (0, 4), 7: (0, 1), 8: (0, 4), 9: (0, 4), 10: (0, 2), 11: (0, 3), 12: (0, 2), 13: (0, 2), 14: (0, 2), 15: (0, 2), 16: (0, 2), 17: (0, 1), 18: (0, 2), 19: (0, 3), 20: (0, 2), 21: (0, 1), 22: (0, 1), 23: (0, 1), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 1), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 0), 37: (0, 1), 38: (0, 1), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 1)},
19+
20+
(2, 1): {0: (0, 7), 1: (0, 10), 2: (0, 4), 3: (0, 7), 4: (0, 1), 5: (0, 0), 6: (0, 4), 7: (0, 1), 8: (0, 6), 9: (0, 5), 10: (0, 2), 11: (0, 3), 12: (0, 2), 13: (0, 2), 14: (0, 2), 15: (0, 2), 16: (0, 2), 17: (0, 1), 18: (0, 2), 19: (0, 3), 20: (0, 3), 21: (0, 1), 22: (0, 1), 23: (0, 2), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 2), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 0), 37: (0, 1), 38: (0, 1), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 1)},
21+
22+
(2, 2): {0: (0, 7), 1: (0, 10), 2: (0, 5), 3: (0, 7), 4: (0, 2), 5: (0, 1), 6: (0, 5), 7: (0, 2), 8: (0, 6), 9: (0, 5), 10: (0, 2), 11: (0, 4), 12: (0, 2), 13: (0, 2), 14: (0, 2), 15: (0, 2), 16: (0, 2), 17: (0, 1), 18: (0, 2), 19: (0, 3), 20: (0, 3), 21: (0, 1), 22: (0, 1), 23: (0, 2), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 2), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 1), 37: (0, 1), 38: (0, 1), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 1), 43: (0, 0)},
23+
24+
(2, 3): {0: (0, 7), 1: (0, 11), 2: (0, 5), 3: (0, 8), 4: (0, 2), 5: (0, 1), 6: (0, 5), 7: (0, 2), 8: (0, 7), 9: (0, 5), 10: (0, 2), 11: (0, 5), 12: (0, 3), 13: (0, 2), 14: (0, 3), 15: (0, 3), 16: (0, 2), 17: (0, 1), 18: (0, 2), 19: (0, 3), 20: (0, 3), 21: (0, 1), 22: (0, 1), 23: (0, 3), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 2), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 1), 37: (0, 1), 38: (0, 1), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 1), 43: (0, 0), 44: (0, 0)},
25+
26+
(2, 4): {0: (0, 8), 1: (0, 11), 2: (0, 5), 3: (0, 9), 4: (0, 4), 5: (0, 2), 6: (0, 6), 7: (0, 3), 8: (0, 8), 9: (0, 6), 10: (0, 3), 11: (0, 6), 12: (0, 4), 13: (0, 4), 14: (0, 4), 15: (0, 5), 16: (0, 3), 17: (0, 1), 18: (0, 3), 19: (0, 5), 20: (0, 4), 21: (0, 1), 22: (0, 2), 23: (0, 3), 24: (0, 1), 25: (0, 1), 26: (0, 1), 27: (0, 0), 28: (0, 2), 29: (0, 1), 30: (0, 1), 31: (0, 1), 32: (0, 2), 33: (0, 1), 34: (0, 1), 35: (0, 1), 36: (0, 1), 37: (0, 1), 38: (0, 2), 39: (0, 1), 40: (0, 1), 41: (0, 1), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0)},
27+
28+
}

0 commit comments

Comments
 (0)