Skip to content

Commit 03ef066

Browse files
dpkp authored and 88manpreet committed
Always acquire client lock before coordinator lock to avoid deadlocks (dpkp#1464)
1 parent e650572 commit 03ef066

File tree

1 file changed

+34
-29
lines changed

1 file changed

+34
-29
lines changed

kafka/coordinator/base.py

Lines changed: 34 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -940,37 +940,42 @@ def _run_once(self):
940940
self.disable()
941941
return
942942

943-
# TODO: When consumer.wakeup() is implemented, we need to
944-
# disable here to prevent propagating an exception to this
945-
# heartbeat thread
946-
self.coordinator._client.poll(timeout_ms=0)
947-
948-
if self.coordinator.coordinator_unknown():
949-
future = self.coordinator.lookup_coordinator()
950-
if not future.is_done or future.failed():
951-
# the immediate future check ensures that we backoff
952-
# properly in the case that no brokers are available
953-
# to connect to (and the future is automatically failed).
943+
# TODO: When consumer.wakeup() is implemented, we need to
944+
# disable here to prevent propagating an exception to this
945+
# heartbeat thread
946+
#
947+
# Release coordinator lock during client poll to avoid deadlocks
948+
# if/when connection errback needs coordinator lock
949+
self.coordinator._client.poll(timeout_ms=0)
950+
951+
if self.coordinator.coordinator_unknown():
952+
future = self.coordinator.lookup_coordinator()
953+
if not future.is_done or future.failed():
954+
# the immediate future check ensures that we backoff
955+
# properly in the case that no brokers are available
956+
# to connect to (and the future is automatically failed).
957+
with self.coordinator._lock:
954958
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
955959

956-
elif self.coordinator.heartbeat.session_timeout_expired():
957-
# the session timeout has expired without seeing a
958-
# successful heartbeat, so we should probably make sure
959-
# the coordinator is still healthy.
960-
log.warning('Heartbeat session expired, marking coordinator dead')
961-
self.coordinator.coordinator_dead('Heartbeat session expired')
962-
963-
elif self.coordinator.heartbeat.poll_timeout_expired():
964-
# the poll timeout has expired, which means that the
965-
# foreground thread has stalled in between calls to
966-
# poll(), so we explicitly leave the group.
967-
log.warning('Heartbeat poll expired, leaving group')
968-
self.coordinator.maybe_leave_group()
969-
970-
elif not self.coordinator.heartbeat.should_heartbeat():
971-
# poll again after waiting for the retry backoff in case
972-
# the heartbeat failed or the coordinator disconnected
973-
log.log(0, 'Not ready to heartbeat, waiting')
960+
elif self.coordinator.heartbeat.session_timeout_expired():
961+
# the session timeout has expired without seeing a
962+
# successful heartbeat, so we should probably make sure
963+
# the coordinator is still healthy.
964+
log.warning('Heartbeat session expired, marking coordinator dead')
965+
self.coordinator.coordinator_dead('Heartbeat session expired')
966+
967+
elif self.coordinator.heartbeat.poll_timeout_expired():
968+
# the poll timeout has expired, which means that the
969+
# foreground thread has stalled in between calls to
970+
# poll(), so we explicitly leave the group.
971+
log.warning('Heartbeat poll expired, leaving group')
972+
self.coordinator.maybe_leave_group()
973+
974+
elif not self.coordinator.heartbeat.should_heartbeat():
975+
# poll again after waiting for the retry backoff in case
976+
# the heartbeat failed or the coordinator disconnected
977+
log.log(0, 'Not ready to heartbeat, waiting')
978+
with self.coordinator._lock:
974979
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
975980

976981
else:

0 commit comments

Comments
 (0)