Skip to content

Commit 5a3d95d

Browse files
authored
Add KafkaClient.api_version(operation) for best available from api_versions (#2495)
1 parent b697808 commit 5a3d95d

File tree

14 files changed

+173
-174
lines changed

14 files changed

+173
-174
lines changed

kafka/admin/client.py

Lines changed: 17 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,7 @@ def __init__(self, **configs):
215215
)
216216

217217
# Get auto-discovered version from client if necessary
218-
if self.config['api_version'] is None:
219-
self.config['api_version'] = self._client.config['api_version']
220-
else:
221-
# need to run check_version for get_api_versions()
222-
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
218+
self.config['api_version'] = self._client.config['api_version']
223219

224220
self._closed = False
225221
self._refresh_controller_id()
@@ -236,35 +232,6 @@ def close(self):
236232
self._closed = True
237233
log.debug("KafkaAdminClient is now closed.")
238234

239-
def _matching_api_version(self, operation):
240-
"""Find the latest version of the protocol operation supported by both
241-
this library and the broker.
242-
243-
This resolves to the lesser of either the latest api version this
244-
library supports, or the max version supported by the broker.
245-
246-
Arguments:
247-
operation: A list of protocol operation versions from kafka.protocol.
248-
249-
Returns:
250-
int: The max matching version number between client and broker.
251-
"""
252-
broker_api_versions = self._client.get_api_versions()
253-
api_key = operation[0].API_KEY
254-
if broker_api_versions is None or api_key not in broker_api_versions:
255-
raise IncompatibleBrokerVersion(
256-
"Kafka broker does not support the '{}' Kafka protocol."
257-
.format(operation[0].__name__))
258-
min_version, max_version = broker_api_versions[api_key]
259-
version = min(len(operation) - 1, max_version)
260-
if version < min_version:
261-
# max library version is less than min broker version. Currently,
262-
# no Kafka versions specify a min msg version. Maybe in the future?
263-
raise IncompatibleBrokerVersion(
264-
"No version of the '{}' Kafka protocol is supported by both the client and broker."
265-
.format(operation[0].__name__))
266-
return version
267-
268235
def _validate_timeout(self, timeout_ms):
269236
"""Validate the timeout is set or use the configuration default.
270237
@@ -278,7 +245,7 @@ def _validate_timeout(self, timeout_ms):
278245

279246
def _refresh_controller_id(self, timeout_ms=30000):
280247
"""Determine the Kafka cluster controller."""
281-
version = self._matching_api_version(MetadataRequest)
248+
version = self._client.api_version(MetadataRequest, max_version=6)
282249
if 1 <= version <= 6:
283250
timeout_at = time.time() + timeout_ms / 1000
284251
while time.time() < timeout_at:
@@ -323,8 +290,7 @@ def _find_coordinator_id_send_request(self, group_id):
323290
# When I experimented with this, the coordinator value returned in
324291
# GroupCoordinatorResponse_v1 didn't match the value returned by
325292
# GroupCoordinatorResponse_v0 and I couldn't figure out why.
326-
version = 0
327-
# version = self._matching_api_version(GroupCoordinatorRequest)
293+
version = self._client.api_version(GroupCoordinatorRequest, max_version=0)
328294
if version <= 0:
329295
request = GroupCoordinatorRequest[version](group_id)
330296
else:
@@ -493,7 +459,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
493459
Returns:
494460
Appropriate version of CreateTopicResponse class.
495461
"""
496-
version = self._matching_api_version(CreateTopicsRequest)
462+
version = self._client.api_version(CreateTopicsRequest, max_version=3)
497463
timeout_ms = self._validate_timeout(timeout_ms)
498464
if version == 0:
499465
if validate_only:
@@ -531,7 +497,7 @@ def delete_topics(self, topics, timeout_ms=None):
531497
Returns:
532498
Appropriate version of DeleteTopicsResponse class.
533499
"""
534-
version = self._matching_api_version(DeleteTopicsRequest)
500+
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
535501
timeout_ms = self._validate_timeout(timeout_ms)
536502
if version <= 3:
537503
request = DeleteTopicsRequest[version](
@@ -550,7 +516,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
550516
"""
551517
topics == None means "get all topics"
552518
"""
553-
version = self._matching_api_version(MetadataRequest)
519+
version = self._client.api_version(MetadataRequest, max_version=5)
554520
if version <= 3:
555521
if auto_topic_creation:
556522
raise IncompatibleBrokerVersion(
@@ -667,7 +633,7 @@ def describe_acls(self, acl_filter):
667633
tuple of a list of matching ACL objects and a KafkaError (NoError if successful)
668634
"""
669635

670-
version = self._matching_api_version(DescribeAclsRequest)
636+
version = self._client.api_version(DescribeAclsRequest, max_version=1)
671637
if version == 0:
672638
request = DescribeAclsRequest[version](
673639
resource_type=acl_filter.resource_pattern.resource_type,
@@ -801,7 +767,7 @@ def create_acls(self, acls):
801767
if not isinstance(acl, ACL):
802768
raise IllegalArgumentError("acls must contain ACL objects")
803769

804-
version = self._matching_api_version(CreateAclsRequest)
770+
version = self._client.api_version(CreateAclsRequest, max_version=1)
805771
if version == 0:
806772
request = CreateAclsRequest[version](
807773
creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls]
@@ -923,7 +889,7 @@ def delete_acls(self, acl_filters):
923889
if not isinstance(acl, ACLFilter):
924890
raise IllegalArgumentError("acl_filters must contain ACLFilter type objects")
925891

926-
version = self._matching_api_version(DeleteAclsRequest)
892+
version = self._client.api_version(DeleteAclsRequest, max_version=1)
927893

928894
if version == 0:
929895
request = DeleteAclsRequest[version](
@@ -992,7 +958,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
992958
topic_resources.append(self._convert_describe_config_resource_request(config_resource))
993959

994960
futures = []
995-
version = self._matching_api_version(DescribeConfigsRequest)
961+
version = self._client.api_version(DescribeConfigsRequest, max_version=2)
996962
if version == 0:
997963
if include_synonyms:
998964
raise IncompatibleBrokerVersion(
@@ -1077,7 +1043,7 @@ def alter_configs(self, config_resources):
10771043
Returns:
10781044
Appropriate version of AlterConfigsResponse class.
10791045
"""
1080-
version = self._matching_api_version(AlterConfigsRequest)
1046+
version = self._client.api_version(AlterConfigsRequest, max_version=1)
10811047
if version <= 1:
10821048
request = AlterConfigsRequest[version](
10831049
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
@@ -1138,7 +1104,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
11381104
Returns:
11391105
Appropriate version of CreatePartitionsResponse class.
11401106
"""
1141-
version = self._matching_api_version(CreatePartitionsRequest)
1107+
version = self._client.api_version(CreatePartitionsRequest, max_version=1)
11421108
timeout_ms = self._validate_timeout(timeout_ms)
11431109
if version <= 1:
11441110
request = CreatePartitionsRequest[version](
@@ -1177,7 +1143,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
11771143
Returns:
11781144
A message future.
11791145
"""
1180-
version = self._matching_api_version(DescribeGroupsRequest)
1146+
version = self._client.api_version(DescribeGroupsRequest, max_version=3)
11811147
if version <= 2:
11821148
if include_authorized_operations:
11831149
raise IncompatibleBrokerVersion(
@@ -1311,7 +1277,7 @@ def _list_consumer_groups_send_request(self, broker_id):
13111277
Returns:
13121278
A message future
13131279
"""
1314-
version = self._matching_api_version(ListGroupsRequest)
1280+
version = self._client.api_version(ListGroupsRequest, max_version=2)
13151281
if version <= 2:
13161282
request = ListGroupsRequest[version]()
13171283
else:
@@ -1394,7 +1360,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
13941360
Returns:
13951361
A message future
13961362
"""
1397-
version = self._matching_api_version(OffsetFetchRequest)
1363+
version = self._client.api_version(OffsetFetchRequest, max_version=3)
13981364
if version <= 3:
13991365
if partitions is None:
14001366
if version <= 1:
@@ -1564,7 +1530,7 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
15641530
Returns:
15651531
A future representing the in-flight DeleteGroupsRequest.
15661532
"""
1567-
version = self._matching_api_version(DeleteGroupsRequest)
1533+
version = self._client.api_version(DeleteGroupsRequest, max_version=1)
15681534
if version <= 1:
15691535
request = DeleteGroupsRequest[version](group_ids)
15701536
else:
@@ -1595,7 +1561,7 @@ def describe_log_dirs(self):
15951561
Returns:
15961562
A message future
15971563
"""
1598-
version = self._matching_api_version(DescribeLogDirsRequest)
1564+
version = self._client.api_version(DescribeLogDirsRequest, max_version=0)
15991565
if version <= 0:
16001566
request = DescribeLogDirsRequest[version]()
16011567
future = self._send_request_to_node(self._client.least_loaded_node(), request)

kafka/client_async.py

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,23 @@ class KafkaClient(object):
130130
format. If no cipher can be selected (because compile-time options
131131
or other configuration forbids use of all the specified ciphers),
132132
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
133-
api_version (tuple): Specify which Kafka API version to use. If set
134-
to None, KafkaClient will attempt to infer the broker version by
135-
probing various APIs. Example: (0, 10, 2). Default: None
133+
api_version (tuple): Specify which Kafka API version to use. If set to
134+
None, the client will attempt to determine the broker version via
135+
ApiVersionsRequest API or, for brokers earlier than 0.10, probing
136+
various known APIs. Dynamic version checking is performed eagerly
137+
during __init__ and can raise NoBrokersAvailableError if no connection
138+
was made before timeout (see api_version_auto_timeout_ms below).
139+
Different versions enable different functionality.
140+
141+
Examples:
142+
(3, 9) most recent broker release, enable all supported features
143+
(0, 10, 0) enables sasl authentication
144+
(0, 8, 0) enables basic functionality only
145+
146+
Default: None
136147
api_version_auto_timeout_ms (int): number of milliseconds to throw a
137148
timeout exception from the constructor when checking the broker
138-
api version. Only applies if api_version is None
149+
api version. Only applies if api_version set to None.
139150
selector (selectors.BaseSelector): Provide a specific selector
140151
implementation to use for I/O multiplexing.
141152
Default: selectors.DefaultSelector
@@ -868,9 +879,9 @@ def _maybe_refresh_metadata(self, wakeup=False):
868879
if not topics and self.cluster.is_bootstrap(node_id):
869880
topics = list(self.config['bootstrap_topics_filter'])
870881

882+
api_version = self.api_version(MetadataRequest, max_version=1)
871883
if self.cluster.need_all_topic_metadata or not topics:
872-
topics = [] if self.config['api_version'] < (0, 10, 0) else None
873-
api_version = 0 if self.config['api_version'] < (0, 10, 0) else 1
884+
topics = MetadataRequest[api_version].ALL_TOPICS
874885
request = MetadataRequest[api_version](topics)
875886
log.debug("Sending metadata request %s to node %s", request, node_id)
876887
future = self.send(node_id, request, wakeup=wakeup)
@@ -962,6 +973,43 @@ def check_version(self, node_id=None, timeout=2, strict=False):
962973
self._lock.release()
963974
raise Errors.NoBrokersAvailable()
964975

976+
def api_version(self, operation, max_version=None):
977+
"""Find the latest version of the protocol operation supported by both
978+
this library and the broker.
979+
980+
This resolves to the lesser of either the latest api version this
981+
library supports, or the max version supported by the broker.
982+
983+
Arguments:
984+
operation: A list of protocol operation versions from kafka.protocol.
985+
986+
Keyword Arguments:
987+
max_version (int, optional): Provide an alternate maximum api version
988+
to reflect limitations in user code.
989+
990+
Returns:
991+
int: The highest api version number compatible between client and broker.
992+
993+
Raises: IncompatibleBrokerVersion if no matching version is found
994+
"""
995+
# Cap max_version at the largest available version in operation list
996+
max_version = min(len(operation) - 1, max_version if max_version is not None else float('inf'))
997+
broker_api_versions = self._api_versions
998+
api_key = operation[0].API_KEY
999+
if broker_api_versions is None or api_key not in broker_api_versions:
1000+
raise IncompatibleBrokerVersion(
1001+
"Kafka broker does not support the '{}' Kafka protocol."
1002+
.format(operation[0].__name__))
1003+
broker_min_version, broker_max_version = broker_api_versions[api_key]
1004+
version = min(max_version, broker_max_version)
1005+
if version < broker_min_version:
1006+
# max library version is less than min broker version. Currently,
1007+
# no Kafka versions specify a min msg version. Maybe in the future?
1008+
raise IncompatibleBrokerVersion(
1009+
"No version of the '{}' Kafka protocol is supported by both the client and broker."
1010+
.format(operation[0].__name__))
1011+
return version
1012+
9651013
def wakeup(self):
9661014
if self._waking or self._wake_w is None:
9671015
return

kafka/conn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from kafka.future import Future
2525
from kafka.metrics.stats import Avg, Count, Max, Rate
2626
from kafka.oauth.abstract import AbstractTokenProvider
27-
from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2, DescribeClientQuotasRequest
27+
from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest, DescribeClientQuotasRequest
2828
from kafka.protocol.commit import OffsetFetchRequest
2929
from kafka.protocol.offset import OffsetRequest
3030
from kafka.protocol.produce import ProduceRequest
@@ -1179,7 +1179,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
11791179
# format (<broker version>, <needed struct>)
11801180
# Make sure to update consumer_integration test check when adding newer versions.
11811181
((2, 6), DescribeClientQuotasRequest[0]),
1182-
((2, 5), DescribeAclsRequest_v2),
1182+
((2, 5), DescribeAclsRequest[2]),
11831183
((2, 4), ProduceRequest[8]),
11841184
((2, 3), FetchRequest[11]),
11851185
((2, 2), OffsetRequest[5]),

kafka/consumer/fetcher.py

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ class Fetcher(six.Iterator):
5757
'check_crcs': True,
5858
'iterator_refetch_records': 1, # undocumented -- interface may change
5959
'metric_group_prefix': 'consumer',
60-
'api_version': (0, 8, 0),
6160
'retry_backoff_ms': 100
6261
}
6362

@@ -561,18 +560,16 @@ def on_fail(err):
561560
return list_offsets_future
562561

563562
def _send_offset_request(self, node_id, timestamps):
563+
version = self._client.api_version(OffsetRequest, max_version=1)
564564
by_topic = collections.defaultdict(list)
565565
for tp, timestamp in six.iteritems(timestamps):
566-
if self.config['api_version'] >= (0, 10, 1):
566+
if version >= 1:
567567
data = (tp.partition, timestamp)
568568
else:
569569
data = (tp.partition, timestamp, 1)
570570
by_topic[tp.topic].append(data)
571571

572-
if self.config['api_version'] >= (0, 10, 1):
573-
request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
574-
else:
575-
request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))
572+
request = OffsetRequest[version](-1, list(six.iteritems(by_topic)))
576573

577574
# Client returns a future that only fails on network issues
578575
# so create a separate future and attach a callback to update it
@@ -662,7 +659,7 @@ def _create_fetch_requests(self):
662659
FetchRequests skipped if no leader, or node has requests in flight
663660
664661
Returns:
665-
dict: {node_id: FetchRequest, ...} (version depends on api_version)
662+
dict: {node_id: FetchRequest, ...} (version depends on client api_versions)
666663
"""
667664
# create the fetch info as a dict of lists of partition info tuples
668665
# which can be passed to FetchRequest() via .items()
@@ -702,16 +699,7 @@ def _create_fetch_requests(self):
702699
log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
703700
partition, node_id)
704701

705-
if self.config['api_version'] >= (0, 11):
706-
version = 4
707-
elif self.config['api_version'] >= (0, 10, 1):
708-
version = 3
709-
elif self.config['api_version'] >= (0, 10, 0):
710-
version = 2
711-
elif self.config['api_version'] == (0, 9):
712-
version = 1
713-
else:
714-
version = 0
702+
version = self._client.api_version(FetchRequest, max_version=4)
715703
requests = {}
716704
for node_id, partition_data in six.iteritems(fetchable):
717705
if version < 3:

kafka/consumer/group.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,10 +195,17 @@ class KafkaConsumer(six.Iterator):
195195
or other configuration forbids use of all the specified ciphers),
196196
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
197197
api_version (tuple): Specify which Kafka API version to use. If set to
198-
None, the client will attempt to infer the broker version by probing
199-
various APIs. Different versions enable different functionality.
198+
None, the client will attempt to determine the broker version via
199+
ApiVersionsRequest API or, for brokers earlier than 0.10, probing
200+
various known APIs. Dynamic version checking is performed eagerly
201+
during __init__ and can raise NoBrokersAvailableError if no connection
202+
was made before timeout (see api_version_auto_timeout_ms below).
203+
Different versions enable different functionality.
200204
201205
Examples:
206+
(3, 9) most recent broker release, enable all supported features
207+
(0, 11) enables message format v2 (internal)
208+
(0, 10, 0) enables sasl authentication and message format v1
202209
(0, 9) enables full group coordination features with automatic
203210
partition assignment and rebalancing,
204211
(0, 8, 2) enables kafka-storage offset commits with manual
@@ -357,9 +364,8 @@ def __init__(self, *topics, **configs):
357364

358365
self._client = self.config['kafka_client'](metrics=self._metrics, **self.config)
359366

360-
# Get auto-discovered version from client if necessary
361-
if self.config['api_version'] is None:
362-
self.config['api_version'] = self._client.config['api_version']
367+
# Get auto-discovered / normalized version from client
368+
self.config['api_version'] = self._client.config['api_version']
363369

364370
# Coordinator configurations are different for older brokers
365371
# max_poll_interval_ms is not supported directly -- it must the be

0 commit comments

Comments
 (0)