Skip to content

Commit 32a9285

Browse files
authored
Only disable heartbeat thread once at beginning of join-group (#2617)
1 parent bf77e9a commit 32a9285

File tree

1 file changed

+32
-30
lines changed

1 file changed

+32
-30
lines changed

kafka/coordinator/base.py

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from kafka.util import Timer
2020

2121
log = logging.getLogger('kafka.coordinator')
22+
heartbeat_log = logging.getLogger('kafka.coordinator.heartbeat')
2223

2324

2425
class MemberState(object):
@@ -449,11 +450,12 @@ def join_group(self, timeout_ms=None):
449450
timeout_ms=timer.timeout_ms)
450451
self.rejoining = True
451452

452-
# fence off the heartbeat thread explicitly so that it cannot
453-
# interfere with the join group. # Note that this must come after
454-
# the call to onJoinPrepare since we must be able to continue
455-
# sending heartbeats if that callback takes some time.
456-
self._disable_heartbeat_thread()
453+
# fence off the heartbeat thread explicitly so that it cannot
454+
# interfere with the join group. # Note that this must come after
455+
# the call to onJoinPrepare since we must be able to continue
456+
# sending heartbeats if that callback takes some time.
457+
log.debug("Disabling heartbeat thread during join-group")
458+
self._disable_heartbeat_thread()
457459

458460
# ensure that there are no pending requests to the coordinator.
459461
# This is important in particular to avoid resending a pending
@@ -779,7 +781,7 @@ def _handle_group_coordinator_response(self, future, response):
779781
future.failure(error)
780782
else:
781783
error = error_type()
782-
log.error("Group coordinator lookup for group %s failed: %s",
784+
log.error("Group Coordinator lookup for group %s failed: %s",
783785
self.group_id, error)
784786
future.failure(error)
785787

@@ -815,11 +817,11 @@ def _start_heartbeat_thread(self):
815817
raise Errors.UnsupportedVersionError('Heartbeat APIs require 0.9+ broker')
816818
with self._lock:
817819
if self._heartbeat_thread is None:
818-
log.info('Starting new heartbeat thread')
820+
heartbeat_log.info('Starting new heartbeat thread')
819821
self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
820822
self._heartbeat_thread.daemon = True
821823
self._heartbeat_thread.start()
822-
log.debug("Started heartbeat thread %s", self._heartbeat_thread.ident)
824+
heartbeat_log.debug("Started heartbeat thread %s", self._heartbeat_thread.ident)
823825

824826
def _disable_heartbeat_thread(self):
825827
with self._lock:
@@ -829,7 +831,7 @@ def _disable_heartbeat_thread(self):
829831
def _close_heartbeat_thread(self, timeout_ms=None):
830832
with self._lock:
831833
if self._heartbeat_thread is not None:
832-
log.info('Stopping heartbeat thread')
834+
heartbeat_log.info('Stopping heartbeat thread')
833835
try:
834836
self._heartbeat_thread.close(timeout_ms=timeout_ms)
835837
except ReferenceError:
@@ -893,7 +895,7 @@ def _send_heartbeat_request(self):
893895
request = HeartbeatRequest[version](self.group_id,
894896
self._generation.generation_id,
895897
self._generation.member_id)
896-
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
898+
heartbeat_log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
897899
future = Future()
898900
_f = self._client.send(self.coordinator_id, request)
899901
_f.add_callback(self._handle_heartbeat_response, future, time.time())
@@ -906,38 +908,38 @@ def _handle_heartbeat_response(self, future, send_time, response):
906908
self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
907909
error_type = Errors.for_code(response.error_code)
908910
if error_type is Errors.NoError:
909-
log.debug("Received successful heartbeat response for group %s",
911+
heartbeat_log.debug("Received successful heartbeat response for group %s",
910912
self.group_id)
911913
future.success(None)
912914
elif error_type in (Errors.CoordinatorNotAvailableError,
913915
Errors.NotCoordinatorError):
914-
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
916+
heartbeat_log.warning("Heartbeat failed for group %s: coordinator (node %s)"
915917
" is either not started or not valid", self.group_id,
916918
self.coordinator())
917919
self.coordinator_dead(error_type())
918920
future.failure(error_type())
919921
elif error_type is Errors.RebalanceInProgressError:
920-
log.warning("Heartbeat failed for group %s because it is"
922+
heartbeat_log.warning("Heartbeat failed for group %s because it is"
921923
" rebalancing", self.group_id)
922924
self.request_rejoin()
923925
future.failure(error_type())
924926
elif error_type is Errors.IllegalGenerationError:
925-
log.warning("Heartbeat failed for group %s: generation id is not "
927+
heartbeat_log.warning("Heartbeat failed for group %s: generation id is not "
926928
" current.", self.group_id)
927929
self.reset_generation()
928930
future.failure(error_type())
929931
elif error_type is Errors.UnknownMemberIdError:
930-
log.warning("Heartbeat: local member_id was not recognized;"
932+
heartbeat_log.warning("Heartbeat: local member_id was not recognized;"
931933
" this consumer needs to re-join")
932934
self.reset_generation()
933935
future.failure(error_type)
934936
elif error_type is Errors.GroupAuthorizationFailedError:
935937
error = error_type(self.group_id)
936-
log.error("Heartbeat failed: authorization error: %s", error)
938+
heartbeat_log.error("Heartbeat failed: authorization error: %s", error)
937939
future.failure(error)
938940
else:
939941
error = error_type()
940-
log.error("Heartbeat failed: Unhandled error: %s", error)
942+
heartbeat_log.error("Heartbeat failed: Unhandled error: %s", error)
941943
future.failure(error)
942944

943945

@@ -1003,14 +1005,14 @@ def __init__(self, coordinator):
10031005

10041006
def enable(self):
10051007
with self.coordinator._lock:
1006-
log.debug('Enabling heartbeat thread')
1008+
heartbeat_log.debug('Enabling heartbeat thread')
10071009
self.enabled = True
10081010
self.coordinator.heartbeat.reset_timeouts()
10091011
self.coordinator._lock.notify()
10101012

10111013
def disable(self):
10121014
with self.coordinator._lock:
1013-
log.debug('Disabling heartbeat thread')
1015+
heartbeat_log.debug('Disabling heartbeat thread')
10141016
self.enabled = False
10151017

10161018
def close(self, timeout_ms=None):
@@ -1032,24 +1034,24 @@ def close(self, timeout_ms=None):
10321034
timeout_ms = self.coordinator.config['heartbeat_interval_ms']
10331035
self.join(timeout_ms / 1000)
10341036
if self.is_alive():
1035-
log.warning("Heartbeat thread did not fully terminate during close")
1037+
heartbeat_log.warning("Heartbeat thread did not fully terminate during close")
10361038

10371039
def run(self):
10381040
try:
1039-
log.debug('Heartbeat thread started')
1041+
heartbeat_log.debug('Heartbeat thread started')
10401042
while not self.closed:
10411043
self._run_once()
10421044

10431045
except ReferenceError:
1044-
log.debug('Heartbeat thread closed due to coordinator gc')
1046+
heartbeat_log.debug('Heartbeat thread closed due to coordinator gc')
10451047

10461048
except RuntimeError as e:
1047-
log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
1049+
heartbeat_log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
10481050
self.coordinator.group_id, e)
10491051
self.failed = e
10501052

10511053
finally:
1052-
log.debug('Heartbeat thread closed')
1054+
heartbeat_log.debug('Heartbeat thread closed')
10531055

10541056
def _run_once(self):
10551057
with self.coordinator._client._lock, self.coordinator._lock:
@@ -1063,16 +1065,16 @@ def _run_once(self):
10631065

10641066
with self.coordinator._lock:
10651067
if not self.enabled:
1066-
log.debug('Heartbeat disabled. Waiting')
1068+
heartbeat_log.debug('Heartbeat disabled. Waiting')
10671069
self.coordinator._lock.wait()
1068-
log.debug('Heartbeat re-enabled.')
1070+
heartbeat_log.debug('Heartbeat re-enabled.')
10691071
return
10701072

10711073
if self.coordinator.state is not MemberState.STABLE:
10721074
# the group is not stable (perhaps because we left the
10731075
# group or because the coordinator kicked us out), so
10741076
# disable heartbeats and wait for the main thread to rejoin.
1075-
log.debug('Group state is not stable, disabling heartbeats')
1077+
heartbeat_log.debug('Group state is not stable, disabling heartbeats')
10761078
self.disable()
10771079
return
10781080

@@ -1088,14 +1090,14 @@ def _run_once(self):
10881090
# the session timeout has expired without seeing a
10891091
# successful heartbeat, so we should probably make sure
10901092
# the coordinator is still healthy.
1091-
log.warning('Heartbeat session expired, marking coordinator dead')
1093+
heartbeat_log.warning('Heartbeat session expired, marking coordinator dead')
10921094
self.coordinator.coordinator_dead('Heartbeat session expired')
10931095

10941096
elif self.coordinator.heartbeat.poll_timeout_expired():
10951097
# the poll timeout has expired, which means that the
10961098
# foreground thread has stalled in between calls to
10971099
# poll(), so we explicitly leave the group.
1098-
log.warning('Heartbeat poll expired, leaving group')
1100+
heartbeat_log.warning('Heartbeat poll expired, leaving group')
10991101
### XXX
11001102
# maybe_leave_group acquires client + coordinator lock;
11011103
# if we hold coordinator lock before calling, we risk deadlock
@@ -1106,7 +1108,7 @@ def _run_once(self):
11061108
elif not self.coordinator.heartbeat.should_heartbeat():
11071109
# poll again after waiting for the retry backoff in case
11081110
# the heartbeat failed or the coordinator disconnected
1109-
log.log(0, 'Not ready to heartbeat, waiting')
1111+
heartbeat_log.log(0, 'Not ready to heartbeat, waiting')
11101112
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
11111113

11121114
else:

0 commit comments

Comments
 (0)