Skip to content

Commit e93ab64

Browse files
authored
Do not ignore metadata response for single topic with error (#2640)
1 parent 9f7384c commit e93ab64

File tree

3 files changed

+27
-9
lines changed

3 files changed

+27
-9
lines changed

kafka/cluster.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,6 @@ def update_metadata(self, metadata):
245245
246246
Returns: None
247247
"""
248-
# In the common case where we ask for a single topic and get back an
249-
# error, we should fail the future
250-
if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno:
251-
error_code, topic = metadata.topics[0][:2]
252-
error = Errors.for_code(error_code)(topic)
253-
return self.failed_update(error)
254-
255248
if not metadata.brokers:
256249
log.warning("No broker metadata found in MetadataResponse -- ignoring.")
257250
return self.failed_update(Errors.MetadataEmptyBrokerList(metadata))
@@ -349,7 +342,15 @@ def update_metadata(self, metadata):
349342
self._last_successful_refresh_ms = now
350343

351344
if f:
352-
f.success(self)
345+
# In the common case where we ask for a single topic and get back an
346+
# error, we should fail the future
347+
if len(metadata.topics) == 1 and metadata.topics[0][0] != Errors.NoError.errno:
348+
error_code, topic = metadata.topics[0][:2]
349+
error = Errors.for_code(error_code)(topic)
350+
f.failure(error)
351+
else:
352+
f.success(self)
353+
353354
log.debug("Updated cluster metadata to %s", self)
354355

355356
for listener in self._listeners:

kafka/producer/kafka.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -960,9 +960,11 @@ def _wait_on_metadata(self, topic, max_wait_ms):
960960
future.add_both(lambda e, *args: e.set(), metadata_event)
961961
self._sender.wakeup()
962962
metadata_event.wait(timer.timeout_ms / 1000)
963-
if not metadata_event.is_set():
963+
if not future.is_done:
964964
raise Errors.KafkaTimeoutError(
965965
"Failed to update metadata after %.1f secs." % (max_wait_ms / 1000,))
966+
elif future.failed() and not future.retriable():
967+
raise future.exception
966968
elif topic in self._metadata.unauthorized_topics:
967969
raise Errors.TopicAuthorizationFailedError(set([topic]))
968970
else:

test/test_cluster.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,21 @@ def test_metadata_v7():
136136
assert cluster._partitions['topic-1'][0].leader_epoch == 0
137137

138138

139+
def test_unauthorized_topic():
140+
cluster = ClusterMetadata()
141+
assert len(cluster.brokers()) == 0
142+
143+
cluster.update_metadata(MetadataResponse[0](
144+
[(0, 'foo', 12), (1, 'bar', 34)],
145+
[(29, 'unauthorized-topic', [])])) # single topic w/ unauthorized error
146+
147+
# broker metadata should get updated
148+
assert len(cluster.brokers()) == 2
149+
150+
# topic should be added to unauthorized list
151+
assert 'unauthorized-topic' in cluster.unauthorized_topics
152+
153+
139154
def test_collect_hosts__happy_path():
140155
hosts = "127.0.0.1:1234,127.0.0.1"
141156
results = collect_hosts(hosts)

0 commit comments

Comments
 (0)