Commit 921c553

dpkp authored and jeffwidman committed
Attempt to join heartbeat thread during close() (#1735)
The underlying issue is a race in consumer.close() between the client, its connections/sockets, and the heartbeat thread. Although the heartbeat thread is signaled to close, we do not block waiting for it. So when we go on to close the client and its underlying connections, a heartbeat thread that is still doing work can cause errors or crashes if it tries to access the now-closed objects (primarily selectors and/or sockets).

This commit therefore adds a blocking thread join to the heartbeat close. This may add some blocking time to consumer.close() while the heartbeat thread finishes, but it should be small in the average case and, in the worst case, no longer than heartbeat_interval_ms, which is the join timeout used in the change. If the join times out, race errors may still occur.

Fix #1666
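The signal-then-join pattern described in the message can be sketched in isolation as follows. This is a minimal illustration, not kafka-python code: the `Worker` class, its `interval_s` parameter, and the `cond` attribute are hypothetical stand-ins for the heartbeat thread, its interval, and the coordinator lock.

```python
import logging
import threading

log = logging.getLogger(__name__)


class Worker(threading.Thread):
    """Hypothetical stand-in for a heartbeat thread (not kafka-python's class)."""

    def __init__(self, interval_s=1.0):
        super().__init__(daemon=True)
        self.interval_s = interval_s
        self.closed = False
        self.cond = threading.Condition()

    def run(self):
        with self.cond:
            while not self.closed:
                # Sleep until the next beat is due or close() notifies us.
                self.cond.wait(self.interval_s)

    def close(self):
        self.closed = True
        with self.cond:
            self.cond.notify()  # wake the thread so it re-checks `closed`
        if self.is_alive():
            # Bounded wait (seconds); returns early once the thread exits.
            self.join(self.interval_s)
        if self.is_alive():
            log.warning("Worker did not fully terminate during close")
        return not self.is_alive()


worker = Worker()
worker.start()
stopped = worker.close()
```

Because `join()` returns as soon as the thread exits, the timeout only bounds the worst case. If it expires, the caller continues with the thread still running, which is the residual race the message warns about.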
1 parent 7965460 commit 921c553

1 file changed: +6 -3 lines changed

kafka/coordinator/base.py

Lines changed: 6 additions & 3 deletions
@@ -752,9 +752,8 @@ def __del__(self):
     def close(self):
         """Close the coordinator, leave the current group,
         and reset local generation / member_id"""
-        with self._client._lock, self._lock:
-            self._close_heartbeat_thread()
-            self.maybe_leave_group()
+        self._close_heartbeat_thread()
+        self.maybe_leave_group()
 
     def maybe_leave_group(self):
         """Leave the current group and reset local generation/memberId."""
@@ -918,6 +917,10 @@ def close(self):
         self.closed = True
         with self.coordinator._lock:
             self.coordinator._lock.notify()
+        if self.is_alive():
+            self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
+        if self.is_alive():
+            log.warning("Heartbeat thread did not fully terminate during close")
 
     def run(self):
         try:
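
The first hunk stops holding `self._client._lock` and `self._lock` while closing the heartbeat thread. The commit message does not say why. One plausible reading, offered here as an assumption, is that a blocking join must not run while the caller holds a lock the joined thread still needs, or the join can only time out. The sketch below illustrates that general hazard with hypothetical names (`run_worker`, `close_worker`); it is not taken from kafka-python.

```python
import threading
import time


def run_worker(lock, stop):
    # Hypothetical loop: every iteration needs the shared lock,
    # and the stop flag is only checked while holding it.
    while True:
        with lock:
            if stop.is_set():
                return
        time.sleep(0.01)


def close_worker(hold_lock, timeout):
    """Signal the worker and join it; return True if the join timed out."""
    lock, stop = threading.Lock(), threading.Event()
    t = threading.Thread(target=run_worker, args=(lock, stop), daemon=True)
    t.start()
    if hold_lock:
        with lock:
            stop.set()
            t.join(timeout)  # worker cannot take the lock, so it cannot exit
            still_alive = t.is_alive()
    else:
        stop.set()
        t.join(timeout)      # worker is free to observe the flag and exit
        still_alive = t.is_alive()
    t.join()  # the lock is released by now, so the worker always finishes
    return still_alive


timed_out_with_lock = close_worker(hold_lock=True, timeout=0.2)
timed_out_without_lock = close_worker(hold_lock=False, timeout=2.0)
```

In this sketch the join under the lock always exhausts its timeout, while the join without the lock returns once the worker sees the stop flag.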
