Skip to content

Commit 59ac410

Browse files
committed
Dont do client wakeup when sending from sender / same thread
1 parent d032844 commit 59ac410

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

kafka/client_async.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -496,14 +496,15 @@ def _can_send_request(self, node_id):
496496
return False
497497
return conn.connected() and conn.can_send_more()
498498

499-
def send(self, node_id, request):
499+
def send(self, node_id, request, wakeup=True):
500500
"""Send a request to a specific node. Bytes are placed on an
501501
internal per-connection send-queue. Actual network I/O will be
502502
triggered in a subsequent call to .poll()
503503
504504
Arguments:
505505
node_id (int): destination node
506506
request (Struct): request object (not-encoded)
507+
wakeup (bool): optional flag to disable thread-wakeup
507508
508509
Raises:
509510
AssertionError: if node_id is not in current cluster metadata
@@ -523,7 +524,8 @@ def send(self, node_id, request):
523524
# Wakeup signal is useful in case another thread is
524525
# blocked waiting for incoming network traffic while holding
525526
# the client lock in poll().
526-
self.wakeup()
527+
if wakeup:
528+
self.wakeup()
527529

528530
return future
529531

kafka/producer/sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def run_once(self):
144144
for node_id, request in six.iteritems(requests):
145145
batches = batches_by_node[node_id]
146146
log.debug('Sending Produce Request: %r', request)
147-
(self._client.send(node_id, request)
147+
(self._client.send(node_id, request, wakeup=False)
148148
.add_callback(
149149
self._handle_produce_response, node_id, time.time(), batches)
150150
.add_errback(

0 commit comments

Comments
 (0)