|
1 |
| -from __future__ import absolute_import |
| 1 | +from __future__ import absolute_import, division |
2 | 2 |
|
3 | 3 | from collections import defaultdict
|
4 | 4 | import copy
|
5 | 5 | import logging
|
6 | 6 | import socket
|
| 7 | +import time |
7 | 8 |
|
8 | 9 | from . import ConfigResourceType
|
9 | 10 | from kafka.vendor import six
|
@@ -273,24 +274,33 @@ def _validate_timeout(self, timeout_ms):
|
273 | 274 | """
|
274 | 275 | return timeout_ms or self.config['request_timeout_ms']
|
275 | 276 |
|
276 |
| - def _refresh_controller_id(self): |
| 277 | + def _refresh_controller_id(self, timeout_ms=30000): |
277 | 278 | """Determine the Kafka cluster controller."""
|
278 | 279 | version = self._matching_api_version(MetadataRequest)
|
279 | 280 | if 1 <= version <= 6:
|
280 |
| - request = MetadataRequest[version]() |
281 |
| - future = self._send_request_to_node(self._client.least_loaded_node(), request) |
282 |
| - |
283 |
| - self._wait_for_futures([future]) |
284 |
| - |
285 |
| - response = future.value |
286 |
| - controller_id = response.controller_id |
287 |
| - # verify the controller is new enough to support our requests |
288 |
| - controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) |
289 |
| - if controller_version < (0, 10, 0): |
290 |
| - raise IncompatibleBrokerVersion( |
291 |
| - "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." |
292 |
| - .format(controller_version)) |
293 |
| - self._controller_id = controller_id |
| 281 | + timeout_at = time.time() + timeout_ms / 1000 |
| 282 | + while time.time() < timeout_at: |
| 283 | + request = MetadataRequest[version]() |
| 284 | + future = self._send_request_to_node(self._client.least_loaded_node(), request) |
| 285 | + |
| 286 | + self._wait_for_futures([future]) |
| 287 | + |
| 288 | + response = future.value |
| 289 | + controller_id = response.controller_id |
| 290 | + if controller_id == -1: |
| 291 | + log.warning("Controller ID not available, got -1") |
| 292 | + time.sleep(1) |
| 293 | + continue |
| 294 | + # verify the controller is new enough to support our requests |
| 295 | + controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) |
| 296 | + if controller_version < (0, 10, 0): |
| 297 | + raise IncompatibleBrokerVersion( |
| 298 | + "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." |
| 299 | + .format(controller_version)) |
| 300 | + self._controller_id = controller_id |
| 301 | + return |
| 302 | + else: |
| 303 | + raise Errors.NodeNotAvailableError('controller') |
294 | 304 | else:
|
295 | 305 | raise UnrecognizedBrokerVersion(
|
296 | 306 | "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
|
|
0 commit comments