Skip to content

Commit a731b18

Browse files
authored
Add support for Metadata Request/Response v7 (#2497)
1 parent 85a113a commit a731b18

File tree

7 files changed

+214
-8
lines changed

7 files changed

+214
-8
lines changed

kafka/client_async.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ class KafkaClient(object):
102102
which we force a refresh of metadata even if we haven't seen any
103103
partition leadership changes to proactively discover any new
104104
brokers or partitions. Default: 300000
105+
allow_auto_create_topics (bool): Enable/disable auto topic creation
106+
on metadata request. Only available with api_version >= (0, 11).
107+
Default: True
105108
security_protocol (str): Protocol used to communicate with brokers.
106109
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
107110
Default: PLAINTEXT.
@@ -184,6 +187,7 @@ class KafkaClient(object):
184187
'sock_chunk_bytes': 4096, # undocumented experimental option
185188
'sock_chunk_buffer_count': 1000, # undocumented experimental option
186189
'retry_backoff_ms': 100,
190+
'allow_auto_create_topics': True,
187191
'metadata_max_age_ms': 300000,
188192
'security_protocol': 'PLAINTEXT',
189193
'ssl_context': None,
@@ -878,10 +882,13 @@ def _maybe_refresh_metadata(self, wakeup=False):
878882
if not topics and self.cluster.is_bootstrap(node_id):
879883
topics = list(self.config['bootstrap_topics_filter'])
880884

881-
api_version = self.api_version(MetadataRequest, max_version=1)
885+
api_version = self.api_version(MetadataRequest, max_version=7)
882886
if self.cluster.need_all_topic_metadata or not topics:
883887
topics = MetadataRequest[api_version].ALL_TOPICS
884-
request = MetadataRequest[api_version](topics)
888+
if api_version >= 4:
889+
request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics'])
890+
else:
891+
request = MetadataRequest[api_version](topics)
885892
log.debug("Sending metadata request %s to node %s", request, node_id)
886893
future = self.send(node_id, request, wakeup=wakeup)
887894
future.add_callback(self.cluster.update_metadata)

kafka/cluster.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def __init__(self, **configs):
5858
self.unauthorized_topics = set()
5959
self.internal_topics = set()
6060
self.controller = None
61+
self.cluster_id = None
6162

6263
self.config = copy.copy(self.DEFAULT_CONFIG)
6364
for key in self.config:
@@ -234,6 +235,9 @@ def update_metadata(self, metadata):
234235
235236
Returns: None
236237
"""
238+
if metadata.API_VERSION >= 3 and metadata.throttle_time_ms > 0:
239+
log.warning("MetadataRequest throttled by broker (%d ms)", metadata.throttle_time_ms)
240+
237241
# In the common case where we ask for a single topic and get back an
238242
# error, we should fail the future
239243
if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno:
@@ -261,6 +265,11 @@ def update_metadata(self, metadata):
261265
else:
262266
_new_controller = _new_brokers.get(metadata.controller_id)
263267

268+
if metadata.API_VERSION < 2:
269+
_new_cluster_id = None
270+
else:
271+
_new_cluster_id = metadata.cluster_id
272+
264273
_new_partitions = {}
265274
_new_broker_partitions = collections.defaultdict(set)
266275
_new_unauthorized_topics = set()
@@ -277,10 +286,21 @@ def update_metadata(self, metadata):
277286
error_type = Errors.for_code(error_code)
278287
if error_type is Errors.NoError:
279288
_new_partitions[topic] = {}
280-
for p_error, partition, leader, replicas, isr in partitions:
289+
for partition_data in partitions:
290+
leader_epoch = -1
291+
offline_replicas = []
292+
if metadata.API_VERSION >= 7:
293+
p_error, partition, leader, leader_epoch, replicas, isr, offline_replicas = partition_data
294+
elif metadata.API_VERSION >= 5:
295+
p_error, partition, leader, replicas, isr, offline_replicas = partition_data
296+
else:
297+
p_error, partition, leader, replicas, isr = partition_data
298+
281299
_new_partitions[topic][partition] = PartitionMetadata(
282-
topic=topic, partition=partition, leader=leader,
283-
replicas=replicas, isr=isr, error=p_error)
300+
topic=topic, partition=partition,
301+
leader=leader, leader_epoch=leader_epoch,
302+
replicas=replicas, isr=isr, offline_replicas=offline_replicas,
303+
error=p_error)
284304
if leader != -1:
285305
_new_broker_partitions[leader].add(
286306
TopicPartition(topic, partition))
@@ -306,6 +326,7 @@ def update_metadata(self, metadata):
306326
with self._lock:
307327
self._brokers = _new_brokers
308328
self.controller = _new_controller
329+
self.cluster_id = _new_cluster_id
309330
self._partitions = _new_partitions
310331
self._broker_partitions = _new_broker_partitions
311332
self.unauthorized_topics = _new_unauthorized_topics

kafka/consumer/group.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ class KafkaConsumer(six.Iterator):
118118
consumed. This ensures no on-the-wire or on-disk corruption to
119119
the messages occurred. This check adds some overhead, so it may
120120
be disabled in cases seeking extreme performance. Default: True
121+
allow_auto_create_topics (bool): Enable/disable auto topic creation
122+
on metadata request. Only available with api_version >= (0, 11).
123+
Default: True
121124
metadata_max_age_ms (int): The period of time in milliseconds after
122125
which we force a refresh of metadata, even if we haven't seen any
123126
partition leadership changes to proactively discover any new
@@ -277,6 +280,7 @@ class KafkaConsumer(six.Iterator):
277280
'auto_commit_interval_ms': 5000,
278281
'default_offset_commit_callback': lambda offsets, response: True,
279282
'check_crcs': True,
283+
'allow_auto_create_topics': True,
280284
'metadata_max_age_ms': 5 * 60 * 1000,
281285
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
282286
'max_poll_records': 500,

kafka/producer/kafka.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ class KafkaProducer(object):
188188
This setting will limit the number of record batches the producer
189189
will send in a single request to avoid sending huge requests.
190190
Default: 1048576.
191+
allow_auto_create_topics (bool): Enable/disable auto topic creation
192+
on metadata request. Only available with api_version >= (0, 11).
193+
Default: True
191194
metadata_max_age_ms (int): The period of time in milliseconds after
192195
which we force a refresh of metadata even if we haven't seen any
193196
partition leadership changes to proactively discover any new
@@ -314,6 +317,7 @@ class KafkaProducer(object):
314317
'connections_max_idle_ms': 9 * 60 * 1000,
315318
'max_block_ms': 60000,
316319
'max_request_size': 1048576,
320+
'allow_auto_create_topics': True,
317321
'metadata_max_age_ms': 300000,
318322
'retry_backoff_ms': 100,
319323
'request_timeout_ms': 30000,

kafka/protocol/metadata.py

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,42 @@ class MetadataResponse_v5(Response):
128128
)
129129

130130

131+
class MetadataResponse_v6(Response):
    """Identical wire format to v5.

    The only v6 change is broker-side behavior: on quota violation the
    broker sends the response before throttling instead of after.
    """
    API_KEY = 3
    API_VERSION = 6
    SCHEMA = MetadataResponse_v5.SCHEMA
137+
138+
139+
class MetadataResponse_v7(Response):
    """Extends v5/v6 by adding a per-partition leader_epoch field."""
    API_KEY = 3
    API_VERSION = 7
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('brokers', Array(
            ('node_id', Int32),
            ('host', String('utf-8')),
            ('port', Int32),
            ('rack', String('utf-8')))),
        ('cluster_id', String('utf-8')),
        ('controller_id', Int32),
        ('topics', Array(
            ('error_code', Int16),
            ('topic', String('utf-8')),
            ('is_internal', Boolean),
            ('partitions', Array(
                ('error_code', Int16),
                ('partition', Int32),
                ('leader', Int32),
                ('leader_epoch', Int32),  # new in v7
                ('replicas', Array(Int32)),
                ('isr', Array(Int32)),
                ('offline_replicas', Array(Int32))))))
    )
165+
166+
131167
class MetadataRequest_v0(Request):
132168
API_KEY = 3
133169
API_VERSION = 0
@@ -190,11 +226,31 @@ class MetadataRequest_v5(Request):
190226
NO_TOPICS = []
191227

192228

229+
class MetadataRequest_v6(Request):
    """Request schema unchanged from v5; paired with MetadataResponse_v6."""
    API_KEY = 3
    API_VERSION = 6
    RESPONSE_TYPE = MetadataResponse_v6
    SCHEMA = MetadataRequest_v5.SCHEMA
    # Sentinel topic lists: None asks the broker for all topics,
    # an empty list asks for none.
    ALL_TOPICS = None
    NO_TOPICS = []
236+
237+
238+
class MetadataRequest_v7(Request):
    """Request schema unchanged from v6; paired with MetadataResponse_v7."""
    API_KEY = 3
    API_VERSION = 7
    RESPONSE_TYPE = MetadataResponse_v7
    SCHEMA = MetadataRequest_v6.SCHEMA
    # Sentinel topic lists: None asks the broker for all topics,
    # an empty list asks for none.
    ALL_TOPICS = None
    NO_TOPICS = []
245+
246+
193247
# Version-indexed registries: MetadataRequest[v] / MetadataResponse[v]
# select the schema class for Metadata API version v.
MetadataRequest = [
    MetadataRequest_v0,
    MetadataRequest_v1,
    MetadataRequest_v2,
    MetadataRequest_v3,
    MetadataRequest_v4,
    MetadataRequest_v5,
    MetadataRequest_v6,
    MetadataRequest_v7,
]
MetadataResponse = [
    MetadataResponse_v0,
    MetadataResponse_v1,
    MetadataResponse_v2,
    MetadataResponse_v3,
    MetadataResponse_v4,
    MetadataResponse_v5,
    MetadataResponse_v6,
    MetadataResponse_v7,
]

kafka/structs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
this partition metadata.
4343
"""
4444
# Immutable record of one partition's metadata as reported by the broker.
# leader_epoch is -1 and offline_replicas is [] when the metadata response
# version does not carry those fields (pre-v7 and pre-v5 respectively).
PartitionMetadata = namedtuple("PartitionMetadata", [
    "topic", "partition", "leader", "leader_epoch",
    "replicas", "isr", "offline_replicas", "error",
])
4646

4747

4848
"""The Kafka offset commit API

test/test_cluster.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,117 @@ def test_empty_broker_list():
2020
[], # empty brokers
2121
[(17, 'foo', []), (17, 'bar', [])])) # topics w/ error
2222
assert len(cluster.brokers()) == 2
23+
24+
25+
# Brokers as encoded before v1 (no rack field) and from v1 onward (with rack).
BROKERS_V0 = [(0, 'foo', 12), (1, 'bar', 34)]
BROKERS = [(0, 'foo', 12, 'rack-1'), (1, 'bar', 34, 'rack-2')]


def _check_cluster(cluster, cluster_id, has_controller, offline, epoch):
    """Shared assertions for the per-version metadata tests below.

    Args:
        cluster: ClusterMetadata already updated with one 'topic-1' response.
        cluster_id: expected cluster.cluster_id (None before response v2).
        has_controller: whether the response version carries controller_id.
        offline: expected offline_replicas for partition 0 ([] before v5).
        epoch: expected leader_epoch for partition 0 (-1 before v7).
    """
    assert len(cluster.topics()) == 1
    if has_controller:
        assert cluster.controller == cluster.broker_metadata(0)
    else:
        assert cluster.controller is None
    assert cluster.cluster_id == cluster_id
    partition = cluster._partitions['topic-1'][0]
    assert partition.offline_replicas == offline
    assert partition.leader_epoch == epoch


def test_metadata_v0():
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[0](
        BROKERS_V0,
        [(0, 'topic-1', [(0, 0, 0, [0], [0])])]))
    _check_cluster(cluster, None, False, [], -1)


def test_metadata_v1():
    # v1 adds broker rack, controller_id, and per-topic is_internal.
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[1](
        BROKERS,
        0,  # controller_id
        [(0, 'topic-1', False, [(0, 0, 0, [0], [0])])]))
    _check_cluster(cluster, None, True, [], -1)


def test_metadata_v2():
    # v2 adds cluster_id.
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[2](
        BROKERS,
        'cluster-foo',  # cluster_id
        0,  # controller_id
        [(0, 'topic-1', False, [(0, 0, 0, [0], [0])])]))
    _check_cluster(cluster, 'cluster-foo', True, [], -1)


def test_metadata_v3():
    # v3 adds throttle_time_ms.
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[3](
        0,  # throttle_time_ms
        BROKERS,
        'cluster-foo',  # cluster_id
        0,  # controller_id
        [(0, 'topic-1', False, [(0, 0, 0, [0], [0])])]))
    _check_cluster(cluster, 'cluster-foo', True, [], -1)


def test_metadata_v4():
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[4](
        0,  # throttle_time_ms
        BROKERS,
        'cluster-foo',  # cluster_id
        0,  # controller_id
        [(0, 'topic-1', False, [(0, 0, 0, [0], [0])])]))
    _check_cluster(cluster, 'cluster-foo', True, [], -1)


def test_metadata_v5():
    # v5 adds per-partition offline_replicas.
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[5](
        0,  # throttle_time_ms
        BROKERS,
        'cluster-foo',  # cluster_id
        0,  # controller_id
        [(0, 'topic-1', False, [(0, 0, 0, [0], [0], [12])])]))
    _check_cluster(cluster, 'cluster-foo', True, [12], -1)


def test_metadata_v6():
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[6](
        0,  # throttle_time_ms
        BROKERS,
        'cluster-foo',  # cluster_id
        0,  # controller_id
        [(0, 'topic-1', False, [(0, 0, 0, [0], [0], [12])])]))
    _check_cluster(cluster, 'cluster-foo', True, [12], -1)


def test_metadata_v7():
    # v7 adds per-partition leader_epoch.
    cluster = ClusterMetadata()
    cluster.update_metadata(MetadataResponse[7](
        0,  # throttle_time_ms
        BROKERS,
        'cluster-foo',  # cluster_id
        0,  # controller_id
        [(0, 'topic-1', False, [(0, 0, 0, 0, [0], [0], [12])])]))
    _check_cluster(cluster, 'cluster-foo', True, [12], 0)

0 commit comments

Comments
 (0)