Skip to content

Commit 3ad62b0

Browse files
committed
Add KafkaClient.api_version(operation) for best available from api_versions
1 parent b697808 commit 3ad62b0

File tree

2 files changed

+52
-51
lines changed

2 files changed

+52
-51
lines changed

kafka/admin/client.py

Lines changed: 17 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,7 @@ def __init__(self, **configs):
215215
)
216216

217217
# Get auto-discovered version from client if necessary
218-
if self.config['api_version'] is None:
219-
self.config['api_version'] = self._client.config['api_version']
220-
else:
221-
# need to run check_version for get_api_versions()
222-
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
218+
self.config['api_version'] = self._client.config['api_version']
223219

224220
self._closed = False
225221
self._refresh_controller_id()
@@ -236,35 +232,6 @@ def close(self):
236232
self._closed = True
237233
log.debug("KafkaAdminClient is now closed.")
238234

239-
def _matching_api_version(self, operation):
240-
"""Find the latest version of the protocol operation supported by both
241-
this library and the broker.
242-
243-
This resolves to the lesser of either the latest api version this
244-
library supports, or the max version supported by the broker.
245-
246-
Arguments:
247-
operation: A list of protocol operation versions from kafka.protocol.
248-
249-
Returns:
250-
int: The max matching version number between client and broker.
251-
"""
252-
broker_api_versions = self._client.get_api_versions()
253-
api_key = operation[0].API_KEY
254-
if broker_api_versions is None or api_key not in broker_api_versions:
255-
raise IncompatibleBrokerVersion(
256-
"Kafka broker does not support the '{}' Kafka protocol."
257-
.format(operation[0].__name__))
258-
min_version, max_version = broker_api_versions[api_key]
259-
version = min(len(operation) - 1, max_version)
260-
if version < min_version:
261-
# max library version is less than min broker version. Currently,
262-
# no Kafka versions specify a min msg version. Maybe in the future?
263-
raise IncompatibleBrokerVersion(
264-
"No version of the '{}' Kafka protocol is supported by both the client and broker."
265-
.format(operation[0].__name__))
266-
return version
267-
268235
def _validate_timeout(self, timeout_ms):
269236
"""Validate the timeout is set or use the configuration default.
270237
@@ -278,7 +245,7 @@ def _validate_timeout(self, timeout_ms):
278245

279246
def _refresh_controller_id(self, timeout_ms=30000):
280247
"""Determine the Kafka cluster controller."""
281-
version = self._matching_api_version(MetadataRequest)
248+
version = self._client.api_version(MetadataRequest, max_version=6)
282249
if 1 <= version <= 6:
283250
timeout_at = time.time() + timeout_ms / 1000
284251
while time.time() < timeout_at:
@@ -323,8 +290,7 @@ def _find_coordinator_id_send_request(self, group_id):
323290
# When I experimented with this, the coordinator value returned in
324291
# GroupCoordinatorResponse_v1 didn't match the value returned by
325292
# GroupCoordinatorResponse_v0 and I couldn't figure out why.
326-
version = 0
327-
# version = self._matching_api_version(GroupCoordinatorRequest)
293+
version = self._client.api_version(GroupCoordinatorRequest, max_version=0)
328294
if version <= 0:
329295
request = GroupCoordinatorRequest[version](group_id)
330296
else:
@@ -493,7 +459,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
493459
Returns:
494460
Appropriate version of CreateTopicResponse class.
495461
"""
496-
version = self._matching_api_version(CreateTopicsRequest)
462+
version = self._client.api_version(CreateTopicsRequest, max_version=3)
497463
timeout_ms = self._validate_timeout(timeout_ms)
498464
if version == 0:
499465
if validate_only:
@@ -531,7 +497,7 @@ def delete_topics(self, topics, timeout_ms=None):
531497
Returns:
532498
Appropriate version of DeleteTopicsResponse class.
533499
"""
534-
version = self._matching_api_version(DeleteTopicsRequest)
500+
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
535501
timeout_ms = self._validate_timeout(timeout_ms)
536502
if version <= 3:
537503
request = DeleteTopicsRequest[version](
@@ -550,7 +516,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
550516
"""
551517
topics == None means "get all topics"
552518
"""
553-
version = self._matching_api_version(MetadataRequest)
519+
version = self._client.api_version(MetadataRequest, max_version=5)
554520
if version <= 3:
555521
if auto_topic_creation:
556522
raise IncompatibleBrokerVersion(
@@ -667,7 +633,7 @@ def describe_acls(self, acl_filter):
667633
tuple of a list of matching ACL objects and a KafkaError (NoError if successful)
668634
"""
669635

670-
version = self._matching_api_version(DescribeAclsRequest)
636+
version = self._client.api_version(DescribeAclsRequest, max_version=1)
671637
if version == 0:
672638
request = DescribeAclsRequest[version](
673639
resource_type=acl_filter.resource_pattern.resource_type,
@@ -801,7 +767,7 @@ def create_acls(self, acls):
801767
if not isinstance(acl, ACL):
802768
raise IllegalArgumentError("acls must contain ACL objects")
803769

804-
version = self._matching_api_version(CreateAclsRequest)
770+
version = self._client.api_version(CreateAclsRequest, max_version=1)
805771
if version == 0:
806772
request = CreateAclsRequest[version](
807773
creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls]
@@ -923,7 +889,7 @@ def delete_acls(self, acl_filters):
923889
if not isinstance(acl, ACLFilter):
924890
raise IllegalArgumentError("acl_filters must contain ACLFilter type objects")
925891

926-
version = self._matching_api_version(DeleteAclsRequest)
892+
version = self._client.api_version(DeleteAclsRequest, max_version=1)
927893

928894
if version == 0:
929895
request = DeleteAclsRequest[version](
@@ -992,7 +958,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
992958
topic_resources.append(self._convert_describe_config_resource_request(config_resource))
993959

994960
futures = []
995-
version = self._matching_api_version(DescribeConfigsRequest)
961+
version = self._client.api_version(DescribeConfigsRequest, max_version=2)
996962
if version == 0:
997963
if include_synonyms:
998964
raise IncompatibleBrokerVersion(
@@ -1077,7 +1043,7 @@ def alter_configs(self, config_resources):
10771043
Returns:
10781044
Appropriate version of AlterConfigsResponse class.
10791045
"""
1080-
version = self._matching_api_version(AlterConfigsRequest)
1046+
version = self._client.api_version(AlterConfigsRequest, max_version=1)
10811047
if version <= 1:
10821048
request = AlterConfigsRequest[version](
10831049
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
@@ -1138,7 +1104,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
11381104
Returns:
11391105
Appropriate version of CreatePartitionsResponse class.
11401106
"""
1141-
version = self._matching_api_version(CreatePartitionsRequest)
1107+
version = self._client.api_version(CreatePartitionsRequest, max_version=1)
11421108
timeout_ms = self._validate_timeout(timeout_ms)
11431109
if version <= 1:
11441110
request = CreatePartitionsRequest[version](
@@ -1177,7 +1143,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
11771143
Returns:
11781144
A message future.
11791145
"""
1180-
version = self._matching_api_version(DescribeGroupsRequest)
1146+
version = self._client.api_version(DescribeGroupsRequest, max_version=3)
11811147
if version <= 2:
11821148
if include_authorized_operations:
11831149
raise IncompatibleBrokerVersion(
@@ -1311,7 +1277,7 @@ def _list_consumer_groups_send_request(self, broker_id):
13111277
Returns:
13121278
A message future
13131279
"""
1314-
version = self._matching_api_version(ListGroupsRequest)
1280+
version = self._client.api_version(ListGroupsRequest, max_version=2)
13151281
if version <= 2:
13161282
request = ListGroupsRequest[version]()
13171283
else:
@@ -1394,7 +1360,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
13941360
Returns:
13951361
A message future
13961362
"""
1397-
version = self._matching_api_version(OffsetFetchRequest)
1363+
version = self._client.api_version(OffsetFetchRequest, max_version=3)
13981364
if version <= 3:
13991365
if partitions is None:
14001366
if version <= 1:
@@ -1564,7 +1530,7 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
15641530
Returns:
15651531
A future representing the in-flight DeleteGroupsRequest.
15661532
"""
1567-
version = self._matching_api_version(DeleteGroupsRequest)
1533+
version = self._client.api_version(DeleteGroupsRequest, max_version=1)
15681534
if version <= 1:
15691535
request = DeleteGroupsRequest[version](group_ids)
15701536
else:
@@ -1595,7 +1561,7 @@ def describe_log_dirs(self):
15951561
Returns:
15961562
A message future
15971563
"""
1598-
version = self._matching_api_version(DescribeLogDirsRequest)
1564+
version = self._client.api_version(DescribeLogDirsRequest, max_version=0)
15991565
if version <= 0:
16001566
request = DescribeLogDirsRequest[version]()
16011567
future = self._send_request_to_node(self._client.least_loaded_node(), request)

kafka/client_async.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,41 @@ def check_version(self, node_id=None, timeout=2, strict=False):
962962
self._lock.release()
963963
raise Errors.NoBrokersAvailable()
964964

965+
def api_version(self, operation, max_version=None):
966+
"""Find the latest version of the protocol operation supported by both
967+
this library and the broker.
968+
969+
This resolves to the lesser of either the latest api version this
970+
library supports, or the max version supported by the broker.
971+
972+
Arguments:
973+
operation: A list of protocol operation versions from kafka.protocol.
974+
975+
Keyword Arguments:
976+
max_version (int, optional): Provide an alternate maximum api version
977+
to reflect limitations in user code.
978+
979+
Returns:
980+
int: The highest api version number compatible between client and broker.
981+
"""
982+
if max_version is None:
983+
max_version = len(operation) - 1
984+
broker_api_versions = self._api_versions
985+
api_key = operation[0].API_KEY
986+
if broker_api_versions is None or api_key not in broker_api_versions:
987+
raise IncompatibleBrokerVersion(
988+
"Kafka broker does not support the '{}' Kafka protocol."
989+
.format(operation[0].__name__))
990+
broker_min_version, broker_max_version = broker_api_versions[api_key]
991+
version = min(max_version, broker_max_version)
992+
if version < broker_min_version:
993+
# max library version is less than min broker version. Currently,
994+
# no Kafka versions specify a min msg version. Maybe in the future?
995+
raise IncompatibleBrokerVersion(
996+
"No version of the '{}' Kafka protocol is supported by both the client and broker."
997+
.format(operation[0].__name__))
998+
return version
999+
9651000
def wakeup(self):
9661001
if self._waking or self._wake_w is None:
9671002
return

0 commit comments

Comments
 (0)