Skip to content

Commit 3c5dfef

Browse files
committed
Dont do client wakeup when sending from sender / same thread
1 parent 8e2ed3e commit 3c5dfef

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

kafka/client_async.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -499,14 +499,15 @@ def _can_send_request(self, node_id):
499499
return False
500500
return conn.connected() and conn.can_send_more()
501501

502-
def send(self, node_id, request):
502+
def send(self, node_id, request, wakeup=True):
503503
"""Send a request to a specific node. Bytes are placed on an
504504
internal per-connection send-queue. Actual network I/O will be
505505
triggered in a subsequent call to .poll()
506506
507507
Arguments:
508508
node_id (int): destination node
509509
request (Struct): request object (not-encoded)
510+
wakeup (bool): optional flag to disable thread-wakeup
510511
511512
Raises:
512513
AssertionError: if node_id is not in current cluster metadata
@@ -526,7 +527,8 @@ def send(self, node_id, request):
526527
# Wakeup signal is useful in case another thread is
527528
# blocked waiting for incoming network traffic while holding
528529
# the client lock in poll().
529-
self.wakeup()
530+
if wakeup:
531+
self.wakeup()
530532

531533
return future
532534

kafka/producer/sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def run_once(self):
144144
for node_id, request in six.iteritems(requests):
145145
batches = batches_by_node[node_id]
146146
log.debug('Sending Produce Request: %r', request)
147-
(self._client.send(node_id, request)
147+
(self._client.send(node_id, request, wakeup=False)
148148
.add_callback(
149149
self._handle_produce_response, node_id, time.time(), batches)
150150
.add_errback(

0 commit comments

Comments
 (0)