
Add KafkaClient.api_version(operation) for best available from api_versions #2495


Merged
merged 9 commits on Feb 26, 2025
68 changes: 17 additions & 51 deletions kafka/admin/client.py
@@ -215,11 +215,7 @@ def __init__(self, **configs):
)

# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
self.config['api_version'] = self._client.config['api_version']
else:
# need to run check_version for get_api_versions()
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
self.config['api_version'] = self._client.config['api_version']

self._closed = False
self._refresh_controller_id()
@@ -236,35 +232,6 @@ def close(self):
self._closed = True
log.debug("KafkaAdminClient is now closed.")

def _matching_api_version(self, operation):
"""Find the latest version of the protocol operation supported by both
this library and the broker.

This resolves to the lesser of either the latest api version this
library supports, or the max version supported by the broker.

Arguments:
operation: A list of protocol operation versions from kafka.protocol.

Returns:
int: The max matching version number between client and broker.
"""
broker_api_versions = self._client.get_api_versions()
api_key = operation[0].API_KEY
if broker_api_versions is None or api_key not in broker_api_versions:
raise IncompatibleBrokerVersion(
"Kafka broker does not support the '{}' Kafka protocol."
.format(operation[0].__name__))
min_version, max_version = broker_api_versions[api_key]
version = min(len(operation) - 1, max_version)
if version < min_version:
# max library version is less than min broker version. Currently,
# no Kafka versions specify a min msg version. Maybe in the future?
raise IncompatibleBrokerVersion(
"No version of the '{}' Kafka protocol is supported by both the client and broker."
.format(operation[0].__name__))
return version

def _validate_timeout(self, timeout_ms):
"""Validate the timeout is set or use the configuration default.

@@ -278,7 +245,7 @@ def _validate_timeout(self, timeout_ms):

def _refresh_controller_id(self, timeout_ms=30000):
"""Determine the Kafka cluster controller."""
version = self._matching_api_version(MetadataRequest)
version = self._client.api_version(MetadataRequest, max_version=6)
if 1 <= version <= 6:
timeout_at = time.time() + timeout_ms / 1000
while time.time() < timeout_at:
@@ -323,8 +290,7 @@ def _find_coordinator_id_send_request(self, group_id):
# When I experimented with this, the coordinator value returned in
# GroupCoordinatorResponse_v1 didn't match the value returned by
# GroupCoordinatorResponse_v0 and I couldn't figure out why.
version = 0
# version = self._matching_api_version(GroupCoordinatorRequest)
version = self._client.api_version(GroupCoordinatorRequest, max_version=0)
if version <= 0:
request = GroupCoordinatorRequest[version](group_id)
else:
@@ -493,7 +459,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
Returns:
Appropriate version of CreateTopicResponse class.
"""
version = self._matching_api_version(CreateTopicsRequest)
version = self._client.api_version(CreateTopicsRequest, max_version=3)
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
if validate_only:
@@ -531,7 +497,7 @@ def delete_topics(self, topics, timeout_ms=None):
Returns:
Appropriate version of DeleteTopicsResponse class.
"""
version = self._matching_api_version(DeleteTopicsRequest)
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
timeout_ms = self._validate_timeout(timeout_ms)
if version <= 3:
request = DeleteTopicsRequest[version](
Expand All @@ -550,7 +516,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
"""
topics == None means "get all topics"
"""
version = self._matching_api_version(MetadataRequest)
version = self._client.api_version(MetadataRequest, max_version=5)
if version <= 3:
if auto_topic_creation:
raise IncompatibleBrokerVersion(
@@ -667,7 +633,7 @@ def describe_acls(self, acl_filter):
tuple of a list of matching ACL objects and a KafkaError (NoError if successful)
"""

version = self._matching_api_version(DescribeAclsRequest)
version = self._client.api_version(DescribeAclsRequest, max_version=1)
if version == 0:
request = DescribeAclsRequest[version](
resource_type=acl_filter.resource_pattern.resource_type,
@@ -801,7 +767,7 @@ def create_acls(self, acls):
if not isinstance(acl, ACL):
raise IllegalArgumentError("acls must contain ACL objects")

version = self._matching_api_version(CreateAclsRequest)
version = self._client.api_version(CreateAclsRequest, max_version=1)
if version == 0:
request = CreateAclsRequest[version](
creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls]
@@ -923,7 +889,7 @@ def delete_acls(self, acl_filters):
if not isinstance(acl, ACLFilter):
raise IllegalArgumentError("acl_filters must contain ACLFilter type objects")

version = self._matching_api_version(DeleteAclsRequest)
version = self._client.api_version(DeleteAclsRequest, max_version=1)

if version == 0:
request = DeleteAclsRequest[version](
@@ -992,7 +958,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
topic_resources.append(self._convert_describe_config_resource_request(config_resource))

futures = []
version = self._matching_api_version(DescribeConfigsRequest)
version = self._client.api_version(DescribeConfigsRequest, max_version=2)
if version == 0:
if include_synonyms:
raise IncompatibleBrokerVersion(
@@ -1077,7 +1043,7 @@ def alter_configs(self, config_resources):
Returns:
Appropriate version of AlterConfigsResponse class.
"""
version = self._matching_api_version(AlterConfigsRequest)
version = self._client.api_version(AlterConfigsRequest, max_version=1)
if version <= 1:
request = AlterConfigsRequest[version](
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
@@ -1138,7 +1104,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
Returns:
Appropriate version of CreatePartitionsResponse class.
"""
version = self._matching_api_version(CreatePartitionsRequest)
version = self._client.api_version(CreatePartitionsRequest, max_version=1)
timeout_ms = self._validate_timeout(timeout_ms)
if version <= 1:
request = CreatePartitionsRequest[version](
@@ -1177,7 +1143,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
Returns:
A message future.
"""
version = self._matching_api_version(DescribeGroupsRequest)
version = self._client.api_version(DescribeGroupsRequest, max_version=3)
if version <= 2:
if include_authorized_operations:
raise IncompatibleBrokerVersion(
@@ -1311,7 +1277,7 @@ def _list_consumer_groups_send_request(self, broker_id):
Returns:
A message future
"""
version = self._matching_api_version(ListGroupsRequest)
version = self._client.api_version(ListGroupsRequest, max_version=2)
if version <= 2:
request = ListGroupsRequest[version]()
else:
@@ -1394,7 +1360,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
Returns:
A message future
"""
version = self._matching_api_version(OffsetFetchRequest)
version = self._client.api_version(OffsetFetchRequest, max_version=3)
if version <= 3:
if partitions is None:
if version <= 1:
@@ -1564,7 +1530,7 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
Returns:
A future representing the in-flight DeleteGroupsRequest.
"""
version = self._matching_api_version(DeleteGroupsRequest)
version = self._client.api_version(DeleteGroupsRequest, max_version=1)
if version <= 1:
request = DeleteGroupsRequest[version](group_ids)
else:
@@ -1595,7 +1561,7 @@ def describe_log_dirs(self):
Returns:
A message future
"""
version = self._matching_api_version(DescribeLogDirsRequest)
version = self._client.api_version(DescribeLogDirsRequest, max_version=0)
if version <= 0:
request = DescribeLogDirsRequest[version]()
future = self._send_request_to_node(self._client.least_loaded_node(), request)
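All of the admin-client call sites above now share one pattern: ask the shared KafkaClient for the highest request version supported by both sides, capped at the newest schema the caller knows how to build, then index the versioned request class. A minimal sketch of that pattern (an illustrative helper loosely modeled on _get_cluster_metadata, not the library's exact code):

```python
from kafka.protocol.metadata import MetadataRequest

def build_metadata_request(client, topics=None, allow_auto_topic_creation=False):
    # Highest MetadataRequest version both client and broker support,
    # capped at v5 because that is the newest schema this sketch builds.
    version = client.api_version(MetadataRequest, max_version=5)
    if version <= 3:
        # v0-v3 take only the topic list
        return MetadataRequest[version](topics)
    # v4+ add the allow_auto_topic_creation flag
    return MetadataRequest[version](topics, allow_auto_topic_creation)
```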
60 changes: 54 additions & 6 deletions kafka/client_async.py
@@ -130,12 +130,23 @@ class KafkaClient(object):
format. If no cipher can be selected (because compile-time options
or other configuration forbids use of all the specified ciphers),
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
api_version (tuple): Specify which Kafka API version to use. If set
to None, KafkaClient will attempt to infer the broker version by
probing various APIs. Example: (0, 10, 2). Default: None
api_version (tuple): Specify which Kafka API version to use. If set to
None, the client will attempt to determine the broker version via the
ApiVersionsRequest API or, for brokers earlier than 0.10, by probing
various known APIs. Dynamic version checking is performed eagerly
during __init__ and can raise NoBrokersAvailable if no connection
is made before the timeout expires (see api_version_auto_timeout_ms below).
Different versions enable different functionality.

Examples:
(3, 9) most recent broker release, enables all supported features
(0, 10, 0) enables sasl authentication
(0, 8, 0) enables basic functionality only

Default: None
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version is None
api version. Only applies if api_version is set to None.
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
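A short sketch of the two configuration styles described in the hunk above; the bootstrap address and timeout value are illustrative assumptions:

```python
from kafka.client_async import KafkaClient
from kafka.errors import NoBrokersAvailable

# Pin the broker version: no version probing at construction time.
pinned = KafkaClient(bootstrap_servers='localhost:9092', api_version=(3, 9))

# Leave api_version=None: __init__ probes the broker eagerly and raises
# NoBrokersAvailable if nothing answers within api_version_auto_timeout_ms.
try:
    probed = KafkaClient(bootstrap_servers='localhost:9092',
                         api_version=None,
                         api_version_auto_timeout_ms=5000)
except NoBrokersAvailable:
    pass  # no reachable broker within 5 seconds
```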
@@ -868,9 +879,9 @@ def _maybe_refresh_metadata(self, wakeup=False):
if not topics and self.cluster.is_bootstrap(node_id):
topics = list(self.config['bootstrap_topics_filter'])

api_version = self.api_version(MetadataRequest, max_version=1)
if self.cluster.need_all_topic_metadata or not topics:
topics = [] if self.config['api_version'] < (0, 10, 0) else None
api_version = 0 if self.config['api_version'] < (0, 10, 0) else 1
topics = MetadataRequest[api_version].ALL_TOPICS
request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request, wakeup=wakeup)
@@ -962,6 +973,43 @@ def check_version(self, node_id=None, timeout=2, strict=False):
self._lock.release()
raise Errors.NoBrokersAvailable()

def api_version(self, operation, max_version=None):
"""Find the latest version of the protocol operation supported by both
this library and the broker.

This resolves to the lesser of either the latest api version this
library supports, or the max version supported by the broker.

Arguments:
operation: A list of protocol operation versions from kafka.protocol.

Keyword Arguments:
max_version (int, optional): Provide an alternate maximum api version
to reflect limitations in user code.

Returns:
int: The highest api version number compatible between client and broker.

Raises: IncompatibleBrokerVersion if no matching version is found
"""
# Cap max_version at the largest available version in operation list
max_version = min(len(operation) - 1, max_version if max_version is not None else float('inf'))
broker_api_versions = self._api_versions
api_key = operation[0].API_KEY
if broker_api_versions is None or api_key not in broker_api_versions:
raise IncompatibleBrokerVersion(
"Kafka broker does not support the '{}' Kafka protocol."
.format(operation[0].__name__))
broker_min_version, broker_max_version = broker_api_versions[api_key]
version = min(max_version, broker_max_version)
if version < broker_min_version:
# max library version is less than min broker version. Currently,
# no Kafka versions specify a min msg version. Maybe in the future?
raise IncompatibleBrokerVersion(
"No version of the '{}' Kafka protocol is supported by both the client and broker."
.format(operation[0].__name__))
return version

def wakeup(self):
if self._waking or self._wake_w is None:
return
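A usage sketch for the new KafkaClient.api_version() helper added above, assuming a reachable broker so the ApiVersions data is collected during construction:

```python
from kafka.client_async import KafkaClient
from kafka.errors import IncompatibleBrokerVersion
from kafka.protocol.fetch import FetchRequest

client = KafkaClient(bootstrap_servers='localhost:9092')  # api_version=None -> eager probe

try:
    # Highest FetchRequest version both sides speak, never above v4 here
    # because v4 is the newest schema this caller is prepared to build.
    version = client.api_version(FetchRequest, max_version=4)
    print('negotiated FetchRequest_v%d' % version)
except IncompatibleBrokerVersion:
    # The broker does not advertise the API at all, or only versions
    # outside the range this library can build.
    raise
```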
4 changes: 2 additions & 2 deletions kafka/conn.py
@@ -24,7 +24,7 @@
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.oauth.abstract import AbstractTokenProvider
from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2, DescribeClientQuotasRequest
from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest, DescribeClientQuotasRequest
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest
@@ -1179,7 +1179,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
# format (<broker version>, <needed struct>)
# Make sure to update consumer_integration test check when adding newer versions.
((2, 6), DescribeClientQuotasRequest[0]),
((2, 5), DescribeAclsRequest_v2),
((2, 5), DescribeAclsRequest[2]),
((2, 4), ProduceRequest[8]),
((2, 3), FetchRequest[11]),
((2, 2), OffsetRequest[5]),
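The table entry changed above feeds the broker-version probe. A simplified sketch (not the library's exact code) of how that newest-first table of (release, request struct) pairs is checked against the broker's ApiVersions response:

```python
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest
from kafka.protocol.produce import ProduceRequest

# Newest release first; each probe struct is a request version introduced
# by that broker release.
PROBES = [
    ((2, 6), DescribeClientQuotasRequest[0]),
    ((2, 5), DescribeAclsRequest[2]),
    ((2, 4), ProduceRequest[8]),
]

def infer_release(api_versions):
    """api_versions: {api_key: (min_version, max_version)} from the broker."""
    for release, probe in PROBES:
        supported = api_versions.get(probe.API_KEY)
        if supported and supported[0] <= probe.API_VERSION <= supported[1]:
            return release
    return None  # older than this table covers; the real code keeps walking down
```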
22 changes: 5 additions & 17 deletions kafka/consumer/fetcher.py
@@ -57,7 +57,6 @@ class Fetcher(six.Iterator):
'check_crcs': True,
'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
'retry_backoff_ms': 100
}

@@ -561,18 +560,16 @@ def on_fail(err):
return list_offsets_future

def _send_offset_request(self, node_id, timestamps):
version = self._client.api_version(OffsetRequest, max_version=1)
by_topic = collections.defaultdict(list)
for tp, timestamp in six.iteritems(timestamps):
if self.config['api_version'] >= (0, 10, 1):
if version >= 1:
data = (tp.partition, timestamp)
else:
data = (tp.partition, timestamp, 1)
by_topic[tp.topic].append(data)

if self.config['api_version'] >= (0, 10, 1):
request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
else:
request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))
request = OffsetRequest[version](-1, list(six.iteritems(by_topic)))

# Client returns a future that only fails on network issues
# so create a separate future and attach a callback to update it
@@ -662,7 +659,7 @@ def _create_fetch_requests(self):
FetchRequests skipped if no leader, or node has requests in flight

Returns:
dict: {node_id: FetchRequest, ...} (version depends on api_version)
dict: {node_id: FetchRequest, ...} (version depends on client api_versions)
"""
# create the fetch info as a dict of lists of partition info tuples
# which can be passed to FetchRequest() via .items()
@@ -702,16 +699,7 @@ def _create_fetch_requests(self):
log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
partition, node_id)

if self.config['api_version'] >= (0, 11):
version = 4
elif self.config['api_version'] >= (0, 10, 1):
version = 3
elif self.config['api_version'] >= (0, 10, 0):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
else:
version = 0
version = self._client.api_version(FetchRequest, max_version=4)
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
if version < 3:
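A condensed sketch of the fetcher's new approach shown above: instead of mapping config['api_version'] tuples to request versions by hand, the version is negotiated through the client. The helper name and single-partition shape are illustrative:

```python
from kafka.protocol.offset import OffsetRequest

def build_list_offsets_request(client, topic, partition, timestamp):
    version = client.api_version(OffsetRequest, max_version=1)
    if version >= 1:
        partition_data = (partition, timestamp)       # v1+: timestamp only
    else:
        partition_data = (partition, timestamp, 1)    # v0: plus max_num_offsets
    # replica_id=-1 identifies an ordinary consumer
    return OffsetRequest[version](-1, [(topic, [partition_data])])
```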
16 changes: 11 additions & 5 deletions kafka/consumer/group.py
@@ -195,10 +195,17 @@ class KafkaConsumer(six.Iterator):
or other configuration forbids use of all the specified ciphers),
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
api_version (tuple): Specify which Kafka API version to use. If set to
None, the client will attempt to infer the broker version by probing
various APIs. Different versions enable different functionality.
None, the client will attempt to determine the broker version via the
ApiVersionsRequest API or, for brokers earlier than 0.10, by probing
various known APIs. Dynamic version checking is performed eagerly
during __init__ and can raise NoBrokersAvailable if no connection
is made before the timeout expires (see api_version_auto_timeout_ms below).
Different versions enable different functionality.

Examples:
(3, 9) most recent broker release, enables all supported features
(0, 11) enables message format v2 (internal)
(0, 10, 0) enables sasl authentication and message format v1
(0, 9) enables full group coordination features with automatic
partition assignment and rebalancing,
(0, 8, 2) enables kafka-storage offset commits with manual
@@ -357,9 +364,8 @@ def __init__(self, *topics, **configs):

self._client = self.config['kafka_client'](metrics=self._metrics, **self.config)

# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
self.config['api_version'] = self._client.config['api_version']
# Get auto-discovered / normalized version from client
self.config['api_version'] = self._client.config['api_version']

# Coordinator configurations are different for older brokers
# max_poll_interval_ms is not supported directly -- it must the be
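A hedged sketch of the api_version settings documented above for KafkaConsumer; the bootstrap address and 'my-topic' are placeholders:

```python
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

try:
    # Default behaviour: auto-detect the broker version during __init__.
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             api_version=None,
                             api_version_auto_timeout_ms=5000)
except NoBrokersAvailable:
    pass  # nothing answered within 5 seconds

# Or pin the version to skip probing, e.g. full group coordination on 0.9+:
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         api_version=(0, 9))
```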