
Commit 1c71dfc

Always acquire client lock before coordinator lock to avoid deadlocks (#1464)
1 parent e23676d commit 1c71dfc
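
The deadlock addressed here is the classic lock-ordering problem: one thread could hold the coordinator lock while waiting for the client lock (e.g. the heartbeat thread), while another thread holding the client lock inside poll() fires a callback that needs the coordinator lock. The commit's rule is that any path needing both locks acquires the client lock first. A minimal, hypothetical sketch of that discipline (client_lock and coordinator_lock are placeholders, not kafka-python identifiers):

import threading

client_lock = threading.RLock()       # stands in for the client's internal lock
coordinator_lock = threading.RLock()  # stands in for the coordinator's internal lock

def foreground_path():
    # Any path that needs both locks takes the client lock first,
    # then the coordinator lock.
    with client_lock, coordinator_lock:
        pass  # e.g. send a request and update coordinator state

def callback_path():
    # A callback that fires while the client lock is already held by poll()
    # takes only the coordinator lock, preserving the same global order.
    with coordinator_lock:
        pass  # e.g. mark the coordinator dead

# A deadlock needs a cycle: one thread holding coordinator_lock while waiting
# for client_lock, and another holding client_lock while waiting for
# coordinator_lock. A single fixed acquisition order rules that cycle out.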

1 file changed: +64 -59 lines

kafka/coordinator/base.py

Lines changed: 64 additions & 59 deletions
@@ -231,20 +231,19 @@ def coordinator(self):
 
         Returns: the current coordinator id or None if it is unknown
         """
-        with self._lock:
-            if self.coordinator_id is None:
-                return None
-            elif self._client.is_disconnected(self.coordinator_id):
-                self.coordinator_dead('Node Disconnected')
-                return None
-            else:
-                return self.coordinator_id
+        if self.coordinator_id is None:
+            return None
+        elif self._client.is_disconnected(self.coordinator_id):
+            self.coordinator_dead('Node Disconnected')
+            return None
+        else:
+            return self.coordinator_id
 
     def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
-        with self._lock:
+        with self._client._lock, self._lock:
             while self.coordinator_unknown():
 
                 # Prior to 0.8.2 there was no group coordinator
@@ -274,17 +273,18 @@ def _reset_find_coordinator_future(self, result):
         self._find_coordinator_future = None
 
     def lookup_coordinator(self):
-        if self._find_coordinator_future is not None:
-            return self._find_coordinator_future
-
-        # If there is an error sending the group coordinator request
-        # then _reset_find_coordinator_future will immediately fire and
-        # set _find_coordinator_future = None
-        # To avoid returning None, we capture the future in a local variable
-        self._find_coordinator_future = self._send_group_coordinator_request()
-        future = self._find_coordinator_future
-        self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
-        return future
+        with self._client._lock, self._lock:
+            if self._find_coordinator_future is not None:
+                return self._find_coordinator_future
+
+            # If there is an error sending the group coordinator request
+            # then _reset_find_coordinator_future will immediately fire and
+            # set _find_coordinator_future = None
+            # To avoid returning None, we capture the future in a local variable
+            future = self._send_group_coordinator_request()
+            self._find_coordinator_future = future
+            self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+            return future
 
     def need_rejoin(self):
         """Check whether the group should be rejoined (e.g. if metadata changes)
@@ -487,7 +487,7 @@ def _handle_join_group_response(self, future, send_time, response):
             log.debug("Received successful JoinGroup response for group %s: %s",
                       self.group_id, response)
             self.sensors.join_latency.record((time.time() - send_time) * 1000)
-            with self._lock:
+            with self._client._lock, self._lock:
                 if self.state is not MemberState.REBALANCING:
                     # if the consumer was woken up before a rebalance completes,
                     # we may have already left the group. In this case, we do
@@ -663,7 +663,7 @@ def _handle_group_coordinator_response(self, future, response):
 
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            with self._lock:
+            with self._client._lock, self._lock:
                 ok = self._client.cluster.add_group_coordinator(self.group_id, response)
                 if not ok:
                     # This could happen if coordinator metadata is different
@@ -693,11 +693,10 @@ def _handle_group_coordinator_response(self, future, response):
 
     def coordinator_dead(self, error):
         """Mark the current coordinator as dead."""
-        with self._lock:
-            if self.coordinator_id is not None:
-                log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
-                            self.coordinator_id, self.group_id, error)
-                self.coordinator_id = None
+        if self.coordinator_id is not None:
+            log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+                        self.coordinator_id, self.group_id, error)
+            self.coordinator_id = None
 
     def generation(self):
         """Get the current generation state if the group is stable.
@@ -741,13 +740,13 @@ def __del__(self):
     def close(self):
         """Close the coordinator, leave the current group,
         and reset local generation / member_id"""
-        with self._lock:
+        with self._client._lock, self._lock:
             self._close_heartbeat_thread()
             self.maybe_leave_group()
 
     def maybe_leave_group(self):
         """Leave the current group and reset local generation/memberId."""
-        with self._lock:
+        with self._client._lock, self._lock:
             if (not self.coordinator_unknown()
                 and self.state is not MemberState.UNJOINED
                 and self._generation is not Generation.NO_GENERATION):
@@ -941,40 +940,46 @@ def _run_once(self):
                 self.disable()
                 return
 
-            # TODO: When consumer.wakeup() is implemented, we need to
-            # disable here to prevent propagating an exception to this
-            # heartbeat thread
-            self.coordinator._client.poll(timeout_ms=0)
-
-            if self.coordinator.coordinator_unknown():
-                future = self.coordinator.lookup_coordinator()
-                if not future.is_done or future.failed():
-                    # the immediate future check ensures that we backoff
-                    # properly in the case that no brokers are available
-                    # to connect to (and the future is automatically failed).
+        # TODO: When consumer.wakeup() is implemented, we need to
+        # disable here to prevent propagating an exception to this
+        # heartbeat thread
+        #
+        # Release coordinator lock during client poll to avoid deadlocks
+        # if/when connection errback needs coordinator lock
+        self.coordinator._client.poll(timeout_ms=0)
+
+        if self.coordinator.coordinator_unknown():
+            future = self.coordinator.lookup_coordinator()
+            if not future.is_done or future.failed():
+                # the immediate future check ensures that we backoff
+                # properly in the case that no brokers are available
+                # to connect to (and the future is automatically failed).
+                with self.coordinator._lock:
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            elif self.coordinator.heartbeat.session_timeout_expired():
-                # the session timeout has expired without seeing a
-                # successful heartbeat, so we should probably make sure
-                # the coordinator is still healthy.
-                log.warning('Heartbeat session expired, marking coordinator dead')
-                self.coordinator.coordinator_dead('Heartbeat session expired')
-
-            elif self.coordinator.heartbeat.poll_timeout_expired():
-                # the poll timeout has expired, which means that the
-                # foreground thread has stalled in between calls to
-                # poll(), so we explicitly leave the group.
-                log.warning('Heartbeat poll expired, leaving group')
-                self.coordinator.maybe_leave_group()
-
-            elif not self.coordinator.heartbeat.should_heartbeat():
-                # poll again after waiting for the retry backoff in case
-                # the heartbeat failed or the coordinator disconnected
-                log.log(0, 'Not ready to heartbeat, waiting')
+        elif self.coordinator.heartbeat.session_timeout_expired():
+            # the session timeout has expired without seeing a
+            # successful heartbeat, so we should probably make sure
+            # the coordinator is still healthy.
+            log.warning('Heartbeat session expired, marking coordinator dead')
+            self.coordinator.coordinator_dead('Heartbeat session expired')
+
+        elif self.coordinator.heartbeat.poll_timeout_expired():
+            # the poll timeout has expired, which means that the
+            # foreground thread has stalled in between calls to
+            # poll(), so we explicitly leave the group.
+            log.warning('Heartbeat poll expired, leaving group')
+            self.coordinator.maybe_leave_group()
+
+        elif not self.coordinator.heartbeat.should_heartbeat():
+            # poll again after waiting for the retry backoff in case
+            # the heartbeat failed or the coordinator disconnected
+            log.log(0, 'Not ready to heartbeat, waiting')
+            with self.coordinator._lock:
                 self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            else:
+        else:
+            with self.coordinator._client._lock, self.coordinator._lock:
                 self.coordinator.heartbeat.sent_heartbeat()
                 future = self.coordinator._send_heartbeat_request()
                 future.add_callback(self._handle_heartbeat_success)
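
The last hunk restructures the heartbeat loop around the same rule: the group-state check runs under the coordinator lock alone, the lock is released before the network poll (so connection errbacks that need the coordinator lock cannot deadlock against the heartbeat thread), and sending a heartbeat takes the client lock before the coordinator lock. A simplified, hypothetical model of that flow (the names below are illustrative stand-ins, not kafka-python APIs):

import threading

client_lock = threading.RLock()       # stands in for the client's internal lock
coordinator_lock = threading.RLock()  # stands in for the coordinator's internal lock

def heartbeat_run_once(group_is_stable, client_poll, send_heartbeat):
    # Check group state under the coordinator lock only.
    with coordinator_lock:
        if not group_is_stable():
            return

    # The coordinator lock is released before touching the network layer, so
    # a disconnect errback that needs coordinator_lock cannot block against
    # this thread while it sits inside poll().
    client_poll(timeout_ms=0)

    # Both locks are needed to record and send a heartbeat: client first,
    # coordinator second, matching the ordering used on the foreground paths.
    with client_lock, coordinator_lock:
        send_heartbeat()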

0 commit comments
