Skip to content

Commit ae25588

Browse files
authored
Merge pull request #68 from Yelp/u/baisang/STREAMINT-794
Cherry-pick fix to fetch consumer metadata
2 parents 720edb1 + faa6a2a commit ae25588

File tree

2 files changed

+34
-8
lines changed

2 files changed

+34
-8
lines changed

kafka/consumer/group.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -552,11 +552,9 @@ def committed(self, partition):
552552
committed = None
553553
return committed
554554

555-
def topics(self):
556-
"""Get all topics the user is authorized to view.
557-
558-
Returns:
559-
set: topics
555+
def _fetch_all_topic_metadata(self):
556+
"""A blocking call that fetches topic metadata for all topics in the
557+
cluster that the user is authorized to view.
560558
"""
561559
cluster = self._client.cluster
562560
if self._client._metadata_refresh_in_progress and self._client._topics:
@@ -567,18 +565,37 @@ def topics(self):
567565
future = cluster.request_update()
568566
self._client.poll(future=future)
569567
cluster.need_all_topic_metadata = stash
570-
return cluster.topics()
568+
569+
def topics(self):
570+
"""Get all topics the user is authorized to view.
571+
This will always issue a remote call to the cluster to fetch the latest
572+
information.
573+
574+
Returns:
575+
set: topics
576+
"""
577+
self._fetch_all_topic_metadata()
578+
return self._client.cluster.topics()
571579

572580
def partitions_for_topic(self, topic):
573-
"""Get metadata about the partitions for a given topic.
581+
"""This method first checks the local metadata cache for information
582+
about the topic. If the topic is not found (either because the topic
583+
does not exist, the user is not authorized to view the topic, or the
584+
metadata cache is not populated), then it will issue a metadata update
585+
call to the cluster.
574586
575587
Arguments:
576588
topic (str): Topic to check.
577589
578590
Returns:
579591
set: Partition ids
580592
"""
581-
return self._client.cluster.partitions_for_topic(topic)
593+
cluster = self._client.cluster
594+
partitions = cluster.partitions_for_topic(topic)
595+
if partitions is None:
596+
self._fetch_all_topic_metadata()
597+
partitions = cluster.partitions_for_topic(topic)
598+
return partitions
582599

583600
def poll(self, timeout_ms=0, max_records=None):
584601
"""Fetch data from assigned topics / partitions.

test/test_consumer_group.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ def test_consumer(kafka_broker, topic, version):
2929
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
3030
consumer.close()
3131

32+
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
33+
def test_consumer_topics(kafka_broker, topic, version):
34+
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
35+
# Necessary to drive the IO
36+
consumer.poll(500)
37+
consumer_topics = consumer.topics()
38+
assert topic in consumer_topics
39+
assert len(consumer.partitions_for_topic(topic)) > 0
40+
consumer.close()
3241

3342
@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
3443
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")

0 commit comments

Comments
 (0)