Skip to content

Avoid more cases of client wakeup when sending from sender / same thread #1765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def send(self, node_id, request, wakeup=True):
Future: resolves to Response struct or Error
"""
if not self._can_send_request(node_id):
self.maybe_connect(node_id)
self.maybe_connect(node_id, wakeup=wakeup)
return Future().failure(Errors.NodeNotReadyError(node_id))

# conn.send will queue the request internally
Expand All @@ -533,7 +533,7 @@ def send(self, node_id, request, wakeup=True):

return future

def poll(self, timeout_ms=None, future=None):
def poll(self, timeout_ms=None, future=None, wakeup=True):
"""Try to read and write to sockets.

This method will also attempt to complete node connections, refresh
Expand All @@ -545,6 +545,7 @@ def poll(self, timeout_ms=None, future=None):
timeout will be the minimum of timeout, request timeout and
metadata timeout. Default: request_timeout_ms
future (Future, optional): if provided, blocks until future.is_done
wakeup (bool): optional flag to disable thread-wakeup

Returns:
list: responses received (can be empty)
Expand All @@ -568,7 +569,7 @@ def poll(self, timeout_ms=None, future=None):
self._maybe_connect(node_id)

# Send a metadata request if needed
metadata_timeout_ms = self._maybe_refresh_metadata()
metadata_timeout_ms = self._maybe_refresh_metadata(wakeup=wakeup)

# If we got a future that is already done, don't block in _poll
if future is not None and future.is_done:
Expand Down Expand Up @@ -759,9 +760,12 @@ def add_topic(self, topic):
return self.cluster.request_update()

# This method should be locked when running multi-threaded
def _maybe_refresh_metadata(self):
def _maybe_refresh_metadata(self, wakeup=True):
"""Send a metadata request if needed.

Arguments:
wakeup (bool): optional flag to disable thread-wakeup

Returns:
int: milliseconds until next refresh
"""
Expand Down Expand Up @@ -790,7 +794,7 @@ def _maybe_refresh_metadata(self):
api_version = 0 if self.config['api_version'] < (0, 10) else 1
request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future = self.send(node_id, request, wakeup=wakeup)
future.add_callback(self.cluster.update_metadata)
future.add_errback(self.cluster.failed_update)

Expand All @@ -807,7 +811,7 @@ def refresh_done(val_or_error):
if self._connecting:
return self.config['reconnect_backoff_ms']

if self.maybe_connect(node_id):
if self.maybe_connect(node_id, wakeup=wakeup):
log.debug("Initializing connection to node %s for metadata request", node_id)
return self.config['reconnect_backoff_ms']

Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def run_once(self):
# difference between now and its linger expiry time; otherwise the
# select time will be the time difference between now and the
# metadata expiry time
self._client.poll(poll_timeout_ms)
self._client.poll(poll_timeout_ms, wakeup=False)

def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
Expand Down