Skip to content

Commit 6985761

Browse files
hackaugustorushidave
authored andcommitted
bugfix: infinite loop when send msgs to controller (dpkp#2194)
If the value `_controller_id` is out-of-date and the node was removed from the cluster, `_send_request_to_node` would enter an infinite loop.
1 parent a927ff2 commit 6985761

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

kafka/admin/client.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,10 +412,23 @@ def _send_request_to_controller(self, request):
412412
tries = 2 # in case our cached self._controller_id is outdated
413413
while tries:
414414
tries -= 1
415-
future = self._send_request_to_node(self._controller_id, request)
415+
future = self._client.send(self._controller_id, request)
416416

417417
self._wait_for_futures([future])
418418

419+
if future.exception is not None:
420+
log.error(
421+
"Sending request to controller_id %s failed with %s",
422+
self._controller_id,
423+
future.exception,
424+
)
425+
is_outdated_controler = (
426+
self._client.cluster.broker_metadata(self._controller_id) is None
427+
)
428+
if is_outdated_controler:
429+
self._refresh_controller_id()
430+
continue
431+
419432
response = future.value
420433
# In Java, the error field name is inconsistent:
421434
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors

0 commit comments

Comments
 (0)