Skip to content

Use the controller for topic metadata requests #1995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 6, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ def delete_topics(self, topics, timeout_ms=None):
return response


def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be cleaner if we could specify the broker ID here, and then the caller can pass in a function that identifies the controller... but looks like that's not how self._send_request_to_controller() is wired up. And it'd be hard to change because of needing to handle retries. So let's just go with this as-is (especially since it's non-public).

def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False):
"""
topics == None means "get all topics"
"""
Expand All @@ -492,6 +492,9 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
allow_auto_topic_creation=auto_topic_creation
)

if use_controller:
return self._send_request_to_controller(request)

future = self._send_request_to_node(
self._client.least_loaded_node(),
request
Expand All @@ -505,7 +508,7 @@ def list_topics(self):
return [t['topic'] for t in obj['topics']]

def describe_topics(self, topics=None):
metadata = self._get_cluster_metadata(topics=topics)
metadata = self._get_cluster_metadata(topics=topics, use_controller=True)
obj = metadata.to_object()
return obj['topics']

Expand Down