Skip to content

Add support for Metadata Request/Response v7 #2497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class KafkaClient(object):
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
allow_auto_create_topics (bool): Enable/disable auto topic creation
on metadata request. Only available with api_version >= (0, 11).
Default: True
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Default: PLAINTEXT.
Expand Down Expand Up @@ -184,6 +187,7 @@ class KafkaClient(object):
'sock_chunk_bytes': 4096, # undocumented experimental option
'sock_chunk_buffer_count': 1000, # undocumented experimental option
'retry_backoff_ms': 100,
'allow_auto_create_topics': True,
'metadata_max_age_ms': 300000,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
Expand Down Expand Up @@ -879,10 +883,13 @@ def _maybe_refresh_metadata(self, wakeup=False):
if not topics and self.cluster.is_bootstrap(node_id):
topics = list(self.config['bootstrap_topics_filter'])

api_version = self.api_version(MetadataRequest, max_version=1)
api_version = self.api_version(MetadataRequest, max_version=7)
if self.cluster.need_all_topic_metadata or not topics:
topics = MetadataRequest[api_version].ALL_TOPICS
request = MetadataRequest[api_version](topics)
if api_version >= 4:
request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics'])
else:
request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request, wakeup=wakeup)
future.add_callback(self.cluster.update_metadata)
Expand Down
27 changes: 24 additions & 3 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, **configs):
self.unauthorized_topics = set()
self.internal_topics = set()
self.controller = None
self.cluster_id = None

self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand Down Expand Up @@ -234,6 +235,9 @@ def update_metadata(self, metadata):

Returns: None
"""
if metadata.API_VERSION >= 3 and metadata.throttle_time_ms > 0:
log.warning("MetadataRequest throttled by broker (%d ms)", metadata.throttle_time_ms)

# In the common case where we ask for a single topic and get back an
# error, we should fail the future
if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno:
Expand Down Expand Up @@ -261,6 +265,11 @@ def update_metadata(self, metadata):
else:
_new_controller = _new_brokers.get(metadata.controller_id)

if metadata.API_VERSION < 2:
_new_cluster_id = None
else:
_new_cluster_id = metadata.cluster_id

_new_partitions = {}
_new_broker_partitions = collections.defaultdict(set)
_new_unauthorized_topics = set()
Expand All @@ -277,10 +286,21 @@ def update_metadata(self, metadata):
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
_new_partitions[topic] = {}
for p_error, partition, leader, replicas, isr in partitions:
for partition_data in partitions:
leader_epoch = -1
offline_replicas = []
if metadata.API_VERSION >= 7:
p_error, partition, leader, leader_epoch, replicas, isr, offline_replicas = partition_data
elif metadata.API_VERSION >= 5:
p_error, partition, leader, replicas, isr, offline_replicas = partition_data
else:
p_error, partition, leader, replicas, isr = partition_data

_new_partitions[topic][partition] = PartitionMetadata(
topic=topic, partition=partition, leader=leader,
replicas=replicas, isr=isr, error=p_error)
topic=topic, partition=partition,
leader=leader, leader_epoch=leader_epoch,
replicas=replicas, isr=isr, offline_replicas=offline_replicas,
error=p_error)
if leader != -1:
_new_broker_partitions[leader].add(
TopicPartition(topic, partition))
Expand All @@ -306,6 +326,7 @@ def update_metadata(self, metadata):
with self._lock:
self._brokers = _new_brokers
self.controller = _new_controller
self.cluster_id = _new_cluster_id
self._partitions = _new_partitions
self._broker_partitions = _new_broker_partitions
self.unauthorized_topics = _new_unauthorized_topics
Expand Down
4 changes: 4 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class KafkaConsumer(six.Iterator):
consumed. This ensures no on-the-wire or on-disk corruption to
the messages occurred. This check adds some overhead, so it may
be disabled in cases seeking extreme performance. Default: True
allow_auto_create_topics (bool): Enable/disable auto topic creation
on metadata request. Only available with api_version >= (0, 11).
Default: True
metadata_max_age_ms (int): The period of time in milliseconds after
which we force a refresh of metadata, even if we haven't seen any
partition leadership changes to proactively discover any new
Expand Down Expand Up @@ -277,6 +280,7 @@ class KafkaConsumer(six.Iterator):
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, response: True,
'check_crcs': True,
'allow_auto_create_topics': True,
'metadata_max_age_ms': 5 * 60 * 1000,
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'max_poll_records': 500,
Expand Down
4 changes: 4 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ class KafkaProducer(object):
This setting will limit the number of record batches the producer
will send in a single request to avoid sending huge requests.
Default: 1048576.
allow_auto_create_topics (bool): Enable/disable auto topic creation
on metadata request. Only available with api_version >= (0, 11).
Default: True
metadata_max_age_ms (int): The period of time in milliseconds after
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
Expand Down Expand Up @@ -314,6 +317,7 @@ class KafkaProducer(object):
'connections_max_idle_ms': 9 * 60 * 1000,
'max_block_ms': 60000,
'max_request_size': 1048576,
'allow_auto_create_topics': True,
'metadata_max_age_ms': 300000,
'retry_backoff_ms': 100,
'request_timeout_ms': 30000,
Expand Down
60 changes: 58 additions & 2 deletions kafka/protocol/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,42 @@ class MetadataResponse_v5(Response):
)


class MetadataResponse_v6(Response):
    """Metadata Request/Response v6 is the same as v5,
    but on quota violation, brokers send out responses before throttling.

    Wire schema is therefore identical to v5; only the version number changes.
    """
    API_KEY = 3
    API_VERSION = 6
    SCHEMA = MetadataResponse_v5.SCHEMA  # unchanged from v5


class MetadataResponse_v7(Response):
    """v7 adds per-partition leader_epoch field.

    Otherwise identical to v5/v6: throttle_time_ms, broker rack,
    cluster_id, controller_id, is_internal and offline_replicas are
    all carried over unchanged.
    """
    API_KEY = 3
    API_VERSION = 7
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('brokers', Array(
            ('node_id', Int32),
            ('host', String('utf-8')),
            ('port', Int32),
            ('rack', String('utf-8')))),
        ('cluster_id', String('utf-8')),
        ('controller_id', Int32),
        ('topics', Array(
            ('error_code', Int16),
            ('topic', String('utf-8')),
            ('is_internal', Boolean),
            ('partitions', Array(
                ('error_code', Int16),
                ('partition', Int32),
                ('leader', Int32),
                ('leader_epoch', Int32),  # new in v7
                ('replicas', Array(Int32)),
                ('isr', Array(Int32)),
                ('offline_replicas', Array(Int32))))))
    )


class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
Expand Down Expand Up @@ -190,11 +226,31 @@ class MetadataRequest_v5(Request):
NO_TOPICS = []


class MetadataRequest_v6(Request):
    """Same request schema as v5; version bumped so that, on quota
    violation, brokers send the response before throttling (see
    MetadataResponse_v6)."""
    API_KEY = 3
    API_VERSION = 6
    RESPONSE_TYPE = MetadataResponse_v6
    SCHEMA = MetadataRequest_v5.SCHEMA  # unchanged from v5
    ALL_TOPICS = None  # None requests metadata for all topics
    NO_TOPICS = []     # empty list requests no topic metadata


class MetadataRequest_v7(Request):
    """Same request schema as v6; the paired v7 response adds a
    per-partition leader_epoch field (see MetadataResponse_v7)."""
    API_KEY = 3
    API_VERSION = 7
    RESPONSE_TYPE = MetadataResponse_v7
    SCHEMA = MetadataRequest_v6.SCHEMA  # unchanged from v6
    ALL_TOPICS = None  # None requests metadata for all topics
    NO_TOPICS = []     # empty list requests no topic metadata


# Version registries: the list index equals the API version, so
# MetadataRequest[n] / MetadataResponse[n] select the v-n classes.
# Keep both lists the same length and in version order.
MetadataRequest = [
    MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
    MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5,
    MetadataRequest_v6, MetadataRequest_v7,
]
MetadataResponse = [
    MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
    MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5,
    MetadataResponse_v6, MetadataResponse_v7,
]
2 changes: 1 addition & 1 deletion kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
this partition metadata.
"""
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
["topic", "partition", "leader", "leader_epoch", "replicas", "isr", "offline_replicas", "error"])


"""The Kafka offset commit API
Expand Down
114 changes: 114 additions & 0 deletions test/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,117 @@ def test_empty_broker_list():
[], # empty brokers
[(17, 'foo', []), (17, 'bar', [])])) # topics w/ error
assert len(cluster.brokers()) == 2


def test_metadata_v0():
    """v0 responses carry no cluster_id/controller; epoch and offline replicas default."""
    meta = ClusterMetadata()
    brokers = [(0, 'foo', 12), (1, 'bar', 34)]
    topics = [(0, 'topic-1', [(0, 0, 0, [0], [0])])]
    meta.update_metadata(MetadataResponse[0](brokers, topics))
    assert len(meta.topics()) == 1
    assert meta.controller is None
    assert meta.cluster_id is None
    partition = meta._partitions['topic-1'][0]
    assert partition.leader_epoch == -1
    assert partition.offline_replicas == []


def test_metadata_v1():
    """v1 adds broker rack and controller_id; still no cluster_id or leader_epoch."""
    meta = ClusterMetadata()
    brokers = [(0, 'foo', 12, 'rack-1'), (1, 'bar', 34, 'rack-2')]
    topics = [(0, 'topic-1', False, [(0, 0, 0, [0], [0])])]
    meta.update_metadata(MetadataResponse[1](brokers, 0, topics))
    assert len(meta.topics()) == 1
    assert meta.controller == meta.broker_metadata(0)
    assert meta.cluster_id is None
    partition = meta._partitions['topic-1'][0]
    assert partition.leader_epoch == -1
    assert partition.offline_replicas == []


def test_metadata_v2():
    """v2 adds cluster_id; partition entries are otherwise unchanged."""
    meta = ClusterMetadata()
    brokers = [(0, 'foo', 12, 'rack-1'), (1, 'bar', 34, 'rack-2')]
    topics = [(0, 'topic-1', False, [(0, 0, 0, [0], [0])])]
    meta.update_metadata(MetadataResponse[2](brokers, 'cluster-foo', 0, topics))
    assert len(meta.topics()) == 1
    assert meta.controller == meta.broker_metadata(0)
    assert meta.cluster_id == 'cluster-foo'
    partition = meta._partitions['topic-1'][0]
    assert partition.leader_epoch == -1
    assert partition.offline_replicas == []


def test_metadata_v3():
    """v3 adds throttle_time_ms ahead of the broker list."""
    meta = ClusterMetadata()
    brokers = [(0, 'foo', 12, 'rack-1'), (1, 'bar', 34, 'rack-2')]
    topics = [(0, 'topic-1', False, [(0, 0, 0, [0], [0])])]
    meta.update_metadata(
        MetadataResponse[3](0, brokers, 'cluster-foo', 0, topics))
    assert len(meta.topics()) == 1
    assert meta.controller == meta.broker_metadata(0)
    assert meta.cluster_id == 'cluster-foo'
    partition = meta._partitions['topic-1'][0]
    assert partition.leader_epoch == -1
    assert partition.offline_replicas == []


def test_metadata_v4():
    """v4 response wire format matches v3 for the fields checked here."""
    meta = ClusterMetadata()
    brokers = [(0, 'foo', 12, 'rack-1'), (1, 'bar', 34, 'rack-2')]
    topics = [(0, 'topic-1', False, [(0, 0, 0, [0], [0])])]
    meta.update_metadata(
        MetadataResponse[4](0, brokers, 'cluster-foo', 0, topics))
    assert len(meta.topics()) == 1
    assert meta.controller == meta.broker_metadata(0)
    assert meta.cluster_id == 'cluster-foo'
    partition = meta._partitions['topic-1'][0]
    assert partition.leader_epoch == -1
    assert partition.offline_replicas == []


def test_metadata_v5():
    """v5 adds offline_replicas to each partition tuple; no leader_epoch yet."""
    meta = ClusterMetadata()
    brokers = [(0, 'foo', 12, 'rack-1'), (1, 'bar', 34, 'rack-2')]
    topics = [(0, 'topic-1', False, [(0, 0, 0, [0], [0], [12])])]
    meta.update_metadata(
        MetadataResponse[5](0, brokers, 'cluster-foo', 0, topics))
    assert len(meta.topics()) == 1
    assert meta.controller == meta.broker_metadata(0)
    assert meta.cluster_id == 'cluster-foo'
    partition = meta._partitions['topic-1'][0]
    assert partition.leader_epoch == -1
    assert partition.offline_replicas == [12]


def test_metadata_v6():
    """v6 wire format is identical to v5; same assertions apply."""
    meta = ClusterMetadata()
    brokers = [(0, 'foo', 12, 'rack-1'), (1, 'bar', 34, 'rack-2')]
    topics = [(0, 'topic-1', False, [(0, 0, 0, [0], [0], [12])])]
    meta.update_metadata(
        MetadataResponse[6](0, brokers, 'cluster-foo', 0, topics))
    assert len(meta.topics()) == 1
    assert meta.controller == meta.broker_metadata(0)
    assert meta.cluster_id == 'cluster-foo'
    partition = meta._partitions['topic-1'][0]
    assert partition.leader_epoch == -1
    assert partition.offline_replicas == [12]


def test_metadata_v7():
    """v7 partitions carry a real leader_epoch (position 4 of the tuple)."""
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[7](
        0, # throttle_time_ms
        [(0, 'foo', 12, 'rack-1'), (1, 'bar', 34, 'rack-2')],
        'cluster-foo', # cluster_id
        0, # controller_id
        # partition tuple: (error, partition, leader, leader_epoch,
        #                   replicas, isr, offline_replicas)
        [(0, 'topic-1', False, [(0, 0, 0, 0, [0], [0], [12])])]))
    assert len(cluster.topics()) == 1
    assert cluster.controller == cluster.broker_metadata(0)
    assert cluster.cluster_id == 'cluster-foo'
    assert cluster._partitions['topic-1'][0].offline_replicas == [12]
    assert cluster._partitions['topic-1'][0].leader_epoch == 0