Skip to content

Commit a927ff2

Browse files
hackaugustorushidave
authored andcommitted
bugfix: fix infinite loop on KafkaAdminClient (dpkp#2194)
An infinite loop may happen with the following pattern: self._send_request_to_node(self._client.least_loaded_node(), request) The problem happens when `self._client`'s cluster metadata is out-of-date, and the result of `least_loaded_node()` is a node that has been removed from the cluster but the client is unware of it. When this happens `_send_request_to_node` will enter an infinite loop waiting for the chosen node to become available, which won't happen, resulting in an infinite loop. This commit introduces a new method named `_send_request_to_least_loaded_node` which handles the case above. This is done by regularly checking if the target node is available in the cluster metadata, and if not, a new node is chosen. Notes: - This does not yet cover every call site to `_send_request_to_node`, there are some other places were similar race conditions may happen. - The code above does not guarantee that the request itself will be sucessful, since it is still possible for the target node to exit, however, it does remove the infinite loop which can render client code unusable.
1 parent 6efff52 commit a927ff2

File tree

1 file changed

+37
-15
lines changed

1 file changed

+37
-15
lines changed

kafka/admin/client.py

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ def _refresh_controller_id(self):
274274
version = self._matching_api_version(MetadataRequest)
275275
if 1 <= version <= 6:
276276
request = MetadataRequest[version]()
277-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
277+
future = self._send_request_to_least_loaded_node(request)
278278

279279
self._wait_for_futures([future])
280280

@@ -312,7 +312,7 @@ def _find_coordinator_id_send_request(self, group_id):
312312
raise NotImplementedError(
313313
"Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
314314
.format(version))
315-
return self._send_request_to_node(self._client.least_loaded_node(), request)
315+
return self._send_request_to_least_loaded_node(request)
316316

317317
def _find_coordinator_id_process_response(self, response):
318318
"""Process a FindCoordinatorResponse.
@@ -357,9 +357,36 @@ def _find_coordinator_ids(self, group_ids):
357357
}
358358
return groups_coordinators
359359

360-
def _send_request_to_node(self, node_id, request, wakeup=True):
360+
def _send_request_to_least_loaded_node(self, request):
361+
"""Send a Kafka protocol message to the least loaded broker.
362+
363+
Returns a future that may be polled for status and results.
364+
365+
:param request: The message to send.
366+
:return: A future object that may be polled for status and results.
367+
:exception: The exception if the message could not be sent.
368+
"""
369+
node_id = self._client.least_loaded_node()
370+
while not self._client.ready(node_id):
371+
# poll until the connection to broker is ready, otherwise send()
372+
# will fail with NodeNotReadyError
373+
self._client.poll()
374+
375+
# node_id is not part of the cluster anymore, choose a new broker
376+
# to connect to
377+
if self._client.cluster.broker_metadata(node_id) is None:
378+
node_id = self._client.least_loaded_node()
379+
380+
return self._client.send(node_id, request)
381+
382+
def _send_request_to_node(self, node_id, request):
361383
"""Send a Kafka protocol message to a specific broker.
362384
385+
.. note::
386+
387+
This function will enter in an infinite loop if `node_id` is
388+
removed from the cluster.
389+
363390
Returns a future that may be polled for status and results.
364391
365392
:param node_id: The broker id to which to send the message.
@@ -509,10 +536,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
509536
allow_auto_topic_creation=auto_topic_creation
510537
)
511538

512-
future = self._send_request_to_node(
513-
self._client.least_loaded_node(),
514-
request
515-
)
539+
future = self._send_request_to_least_loaded_node(request)
516540
self._wait_for_futures([future])
517541
return future.value
518542

@@ -604,7 +628,7 @@ def describe_acls(self, acl_filter):
604628
.format(version)
605629
)
606630

607-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
631+
future = self._send_request_to_least_loaded_node(request)
608632
self._wait_for_futures([future])
609633
response = future.value
610634

@@ -695,7 +719,7 @@ def create_acls(self, acls):
695719
.format(version)
696720
)
697721

698-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
722+
future = self._send_request_to_least_loaded_node(request)
699723
self._wait_for_futures([future])
700724
response = future.value
701725

@@ -789,7 +813,7 @@ def delete_acls(self, acl_filters):
789813
.format(version)
790814
)
791815

792-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
816+
future = self._send_request_to_least_loaded_node(request)
793817
self._wait_for_futures([future])
794818
response = future.value
795819

@@ -849,8 +873,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
849873
))
850874

851875
if len(topic_resources) > 0:
852-
futures.append(self._send_request_to_node(
853-
self._client.least_loaded_node(),
876+
futures.append(self._send_request_to_least_loaded_node(
854877
DescribeConfigsRequest[version](resources=topic_resources)
855878
))
856879

@@ -870,8 +893,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
870893
))
871894

872895
if len(topic_resources) > 0:
873-
futures.append(self._send_request_to_node(
874-
self._client.least_loaded_node(),
896+
futures.append(self._send_request_to_least_loaded_node(
875897
DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
876898
))
877899
else:
@@ -918,7 +940,7 @@ def alter_configs(self, config_resources):
918940
# // a single request that may be sent to any broker.
919941
#
920942
# So this is currently broken as it always sends to the least_loaded_node()
921-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
943+
future = self._send_request_to_least_loaded_node(request)
922944

923945
self._wait_for_futures([future])
924946
response = future.value

0 commit comments

Comments
 (0)