We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 12d2765 commit e8ccee7Copy full SHA for e8ccee7
kafka/coordinator/base.py
@@ -945,7 +945,11 @@ def _run_once(self):
945
self.coordinator._client.poll(timeout_ms=0)
946
947
if self.coordinator.coordinator_unknown():
948
- if not self.coordinator.lookup_coordinator().is_done:
+ future = self.coordinator.lookup_coordinator()
949
+ if not future.is_done or future.failed():
950
+ # the immediate future check ensures that we backoff
951
+ # properly in the case that no brokers are available
952
+ # to connect to (and the future is automatically failed).
953
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
954
955
elif self.coordinator.heartbeat.session_timeout_expired():
0 commit comments