Skip to content

Commit 545cdb1

Browse files
committed
Avoid call to wakeup from sender thread for maybe_connect
1 parent 3c5dfef commit 545cdb1

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

kafka/client_async.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,14 +321,15 @@ def _conn_state_change(self, node_id, conn):
321321
log.warning("Node %s connection failed -- refreshing metadata", node_id)
322322
self.cluster.request_update()
323323

324-
def maybe_connect(self, node_id):
324+
def maybe_connect(self, node_id, wakeup=True):
325325
"""Queues a node for asynchronous connection during the next .poll()"""
326326
if self._can_connect(node_id):
327327
self._connecting.add(node_id)
328328
# Wakeup signal is useful in case another thread is
329329
# blocked waiting for incoming network traffic while holding
330330
# the client lock in poll().
331-
self.wakeup()
331+
if wakeup:
332+
self.wakeup()
332333
return True
333334
return False
334335

kafka/producer/sender.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,9 @@ def run_once(self):
105105
# remove any nodes we aren't ready to send to
106106
not_ready_timeout = float('inf')
107107
for node in list(ready_nodes):
108-
if not self._client.ready(node):
108+
if not self._client.is_ready(node):
109109
log.debug('Node %s not ready; delaying produce of accumulated batch', node)
110+
self._client.maybe_connect(node, wakeup=False)
110111
ready_nodes.remove(node)
111112
not_ready_timeout = min(not_ready_timeout,
112113
self._client.connection_delay(node))

0 commit comments

Comments
 (0)