Commit 17fcdb1

88manpreet authored and dpkp committed
Always acquire client lock before coordinator lock to avoid deadlocks (dpkp#1464)
1 parent ee8b943 commit 17fcdb1
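The deadlock this change removes is the classic lock-ordering problem: the heartbeat thread could take the coordinator lock and then need the client lock (to poll the network), while a connection errback could take the client lock and then need the coordinator lock. Below is a minimal, self-contained sketch of that hazard and of the ordering rule the commit adopts; the lock and function names are illustrative stand-ins, not the actual kafka-python internals.

    import threading

    client_lock = threading.RLock()       # stands in for the client's internal lock
    coordinator_lock = threading.RLock()  # stands in for the coordinator's internal lock

    def thread_a_unsafe():
        # Takes the coordinator lock first, then the client lock.
        with coordinator_lock:
            with client_lock:
                pass  # e.g. poll the network, send a heartbeat

    def thread_b_unsafe():
        # Takes the client lock first, then the coordinator lock.
        with client_lock:
            with coordinator_lock:
                pass  # e.g. a connection errback that marks the coordinator dead

    # Run concurrently, each thread can grab its first lock and then block
    # forever waiting for the other's second lock: a deadlock.

    def thread_a_safe():
        # The fix: every thread agrees on one order, client lock before
        # coordinator lock. `with a, b:` acquires left to right and releases
        # in reverse, so the nesting order stays consistent everywhere.
        with client_lock, coordinator_lock:
            pass  # send the heartbeat

This is the rule the diff applies mechanically: each `with self._lock:` that may also touch the client becomes `with self._client._lock, self._lock:`.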

File tree: 1 file changed (+64, -59 lines changed)

kafka/coordinator/base.py

Lines changed: 64 additions & 59 deletions
@@ -231,20 +231,19 @@ def coordinator(self):
 
         Returns: the current coordinator id or None if it is unknown
         """
-        with self._lock:
-            if self.coordinator_id is None:
-                return None
-            elif self._client.is_disconnected(self.coordinator_id):
-                self.coordinator_dead('Node Disconnected')
-                return None
-            else:
-                return self.coordinator_id
+        if self.coordinator_id is None:
+            return None
+        elif self._client.is_disconnected(self.coordinator_id):
+            self.coordinator_dead('Node Disconnected')
+            return None
+        else:
+            return self.coordinator_id
 
     def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
-        with self._lock:
+        with self._client._lock, self._lock:
             while self.coordinator_unknown():
 
                 # Prior to 0.8.2 there was no group coordinator
@@ -274,17 +273,18 @@ def _reset_find_coordinator_future(self, result):
         self._find_coordinator_future = None
 
     def lookup_coordinator(self):
-        if self._find_coordinator_future is not None:
-            return self._find_coordinator_future
-
-        # If there is an error sending the group coordinator request
-        # then _reset_find_coordinator_future will immediately fire and
-        # set _find_coordinator_future = None
-        # To avoid returning None, we capture the future in a local variable
-        self._find_coordinator_future = self._send_group_coordinator_request()
-        future = self._find_coordinator_future
-        self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
-        return future
+        with self._client._lock, self._lock:
+            if self._find_coordinator_future is not None:
+                return self._find_coordinator_future
+
+            # If there is an error sending the group coordinator request
+            # then _reset_find_coordinator_future will immediately fire and
+            # set _find_coordinator_future = None
+            # To avoid returning None, we capture the future in a local variable
+            future = self._send_group_coordinator_request()
+            self._find_coordinator_future = future
+            self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+            return future
 
     def need_rejoin(self):
         """Check whether the group should be rejoined (e.g. if metadata changes)
@@ -487,7 +487,7 @@ def _handle_join_group_response(self, future, send_time, response):
             log.debug("Received successful JoinGroup response for group %s: %s",
                       self.group_id, response)
             self.sensors.join_latency.record((time.time() - send_time) * 1000)
-            with self._lock:
+            with self._client._lock, self._lock:
                 if self.state is not MemberState.REBALANCING:
                     # if the consumer was woken up before a rebalance completes,
                     # we may have already left the group. In this case, we do
@@ -663,7 +663,7 @@ def _handle_group_coordinator_response(self, future, response):
 
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            with self._lock:
+            with self._client._lock, self._lock:
                 ok = self._client.cluster.add_group_coordinator(self.group_id, response)
                 if not ok:
                     # This could happen if coordinator metadata is different
@@ -693,11 +693,10 @@ def _handle_group_coordinator_response(self, future, response):
 
     def coordinator_dead(self, error):
         """Mark the current coordinator as dead."""
-        with self._lock:
-            if self.coordinator_id is not None:
-                log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
-                            self.coordinator_id, self.group_id, error)
-                self.coordinator_id = None
+        if self.coordinator_id is not None:
+            log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+                        self.coordinator_id, self.group_id, error)
+            self.coordinator_id = None
 
     def generation(self):
         """Get the current generation state if the group is stable.
@@ -741,13 +740,13 @@ def __del__(self):
     def close(self):
         """Close the coordinator, leave the current group,
        and reset local generation / member_id"""
-        with self._lock:
+        with self._client._lock, self._lock:
             self._close_heartbeat_thread()
             self.maybe_leave_group()
 
     def maybe_leave_group(self):
         """Leave the current group and reset local generation/memberId."""
-        with self._lock:
+        with self._client._lock, self._lock:
             if (not self.coordinator_unknown()
                 and self.state is not MemberState.UNJOINED
                 and self._generation is not Generation.NO_GENERATION):
@@ -939,40 +938,46 @@ def _run_once(self):
                 self.disable()
                 return
 
-            # TODO: When consumer.wakeup() is implemented, we need to
-            # disable here to prevent propagating an exception to this
-            # heartbeat thread
-            self.coordinator._client.poll(timeout_ms=0)
-
-            if self.coordinator.coordinator_unknown():
-                future = self.coordinator.lookup_coordinator()
-                if not future.is_done or future.failed():
-                    # the immediate future check ensures that we backoff
-                    # properly in the case that no brokers are available
-                    # to connect to (and the future is automatically failed).
+        # TODO: When consumer.wakeup() is implemented, we need to
+        # disable here to prevent propagating an exception to this
+        # heartbeat thread
+        #
+        # Release coordinator lock during client poll to avoid deadlocks
+        # if/when connection errback needs coordinator lock
+        self.coordinator._client.poll(timeout_ms=0)
+
+        if self.coordinator.coordinator_unknown():
+            future = self.coordinator.lookup_coordinator()
+            if not future.is_done or future.failed():
+                # the immediate future check ensures that we backoff
+                # properly in the case that no brokers are available
+                # to connect to (and the future is automatically failed).
+                with self.coordinator._lock:
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            elif self.coordinator.heartbeat.session_timeout_expired():
-                # the session timeout has expired without seeing a
-                # successful heartbeat, so we should probably make sure
-                # the coordinator is still healthy.
-                log.debug('Heartbeat session expired, marking coordinator dead')
-                self.coordinator.coordinator_dead('Heartbeat session expired')
-
-            elif self.coordinator.heartbeat.poll_timeout_expired():
-                # the poll timeout has expired, which means that the
-                # foreground thread has stalled in between calls to
-                # poll(), so we explicitly leave the group.
-                log.debug('Heartbeat poll expired, leaving group')
-                self.coordinator.maybe_leave_group()
-
-            elif not self.coordinator.heartbeat.should_heartbeat():
-                # poll again after waiting for the retry backoff in case
-                # the heartbeat failed or the coordinator disconnected
-                log.debug('Not ready to heartbeat, waiting')
+        elif self.coordinator.heartbeat.session_timeout_expired():
+            # the session timeout has expired without seeing a
+            # successful heartbeat, so we should probably make sure
+            # the coordinator is still healthy.
+            log.warning('Heartbeat session expired, marking coordinator dead')
+            self.coordinator.coordinator_dead('Heartbeat session expired')
+
+        elif self.coordinator.heartbeat.poll_timeout_expired():
+            # the poll timeout has expired, which means that the
+            # foreground thread has stalled in between calls to
+            # poll(), so we explicitly leave the group.
+            log.warning('Heartbeat poll expired, leaving group')
+            self.coordinator.maybe_leave_group()
+
+        elif not self.coordinator.heartbeat.should_heartbeat():
+            # poll again after waiting for the retry backoff in case
+            # the heartbeat failed or the coordinator disconnected
+            log.log(0, 'Not ready to heartbeat, waiting')
+            with self.coordinator._lock:
                 self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            else:
+        else:
+            with self.coordinator._client._lock, self.coordinator._lock:
                 self.coordinator.heartbeat.sent_heartbeat()
                 future = self.coordinator._send_heartbeat_request()
                 future.add_callback(self._handle_heartbeat_success)

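The heartbeat-thread hunk applies the same rule in a slightly different shape: the network poll now runs with no coordinator lock held, waiting happens on the coordinator lock alone, and both locks are taken, client first, only around the small section that actually sends a heartbeat. Below is a condensed sketch of that structure, with illustrative names and the real state machine and error handling omitted; the coordinator lock is modeled as a threading.Condition because the diff calls .wait() on it.

    import threading

    client_lock = threading.RLock()           # stands in for coordinator._client._lock
    coordinator_lock = threading.Condition()  # stands in for coordinator._lock

    def heartbeat_run_once(poll, ready_to_send, send_heartbeat, backoff_s):
        # Poll the network with no coordinator lock held, so a connection
        # errback that needs the coordinator lock cannot deadlock against us.
        poll()

        if not ready_to_send():
            # Nothing to send yet: back off on the coordinator condition.
            with coordinator_lock:
                coordinator_lock.wait(backoff_s)
            return

        # Send path: take both locks, client lock first, matching the new order.
        with client_lock, coordinator_lock:
            send_heartbeat()

Paths that only wait or back off still take just the coordinator lock; only the paths that also need the network client take both locks, and always in the same client-then-coordinator order.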