Skip to content

Wait for next heartbeat in thread loop; check for connected coordinator #2622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ def coordinator(self):
else:
return self.coordinator_id

def connected(self):
"""Return True iff the coordinator node is connected"""
with self._lock:
return self.coordinator_id is not None and self._client.connected(self.coordinator_id)

def ensure_coordinator_ready(self, timeout_ms=None):
"""Block until the coordinator for this group is known.

Expand Down Expand Up @@ -1058,28 +1063,28 @@ def _run_once(self):
self.coordinator._client._lock.acquire()
self.coordinator._lock.acquire()
try:
if self.enabled and self.coordinator.state is MemberState.STABLE:
# TODO: When consumer.wakeup() is implemented, we need to
# disable here to prevent propagating an exception to this
# heartbeat thread
# must get client._lock, or maybe deadlock at heartbeat
# failure callback in consumer poll
self.coordinator._client.poll(timeout_ms=0)

if not self.enabled:
heartbeat_log.debug('Heartbeat disabled. Waiting')
self.coordinator._client._lock.release()
self.coordinator._lock.wait()
heartbeat_log.debug('Heartbeat re-enabled.')
if self.enabled:
heartbeat_log.debug('Heartbeat re-enabled.')
return

elif self.coordinator.state is not MemberState.STABLE:
if self.coordinator.state is not MemberState.STABLE:
# the group is not stable (perhaps because we left the
# group or because the coordinator kicked us out), so
# disable heartbeats and wait for the main thread to rejoin.
heartbeat_log.debug('Group state is not stable, disabling heartbeats')
self.disable()
return

# TODO: When consumer.wakeup() is implemented, we need to
# disable here to prevent propagating an exception to this
# heartbeat thread
self.coordinator._client.poll(timeout_ms=0)

elif self.coordinator.coordinator_unknown():
if self.coordinator.coordinator_unknown():
future = self.coordinator.lookup_coordinator()
if not future.is_done or future.failed():
# the immediate future check ensures that we backoff
Expand All @@ -1088,6 +1093,10 @@ def _run_once(self):
self.coordinator._client._lock.release()
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)

elif not self.coordinator.connected():
self.coordinator._client._lock.release()
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)

elif self.coordinator.heartbeat.session_timeout_expired():
# the session timeout has expired without seeing a
# successful heartbeat, so we should probably make sure
Expand All @@ -1103,11 +1112,10 @@ def _run_once(self):
self.coordinator.maybe_leave_group()

elif not self.coordinator.heartbeat.should_heartbeat():
# poll again after waiting for the retry backoff in case
# the heartbeat failed or the coordinator disconnected
heartbeat_log.log(0, 'Not ready to heartbeat, waiting')
next_hb = self.coordinator.heartbeat.time_to_next_heartbeat()
heartbeat_log.debug('Waiting %0.1f secs to send next heartbeat', next_hb)
self.coordinator._client._lock.release()
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
self.coordinator._lock.wait(next_hb)

else:
self.coordinator.heartbeat.sent_heartbeat()
Expand Down
1 change: 1 addition & 0 deletions test/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ def test_heartbeat(mocker, patched_coord):
heartbeat.enable()
patched_coord.state = MemberState.STABLE
mocker.spy(patched_coord, '_send_heartbeat_request')
mocker.patch.object(patched_coord, 'connected', return_value=True)
mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True)
heartbeat._run_once()
assert patched_coord._send_heartbeat_request.call_count == 1
Expand Down
Loading