Skip to content

Commit 71c10f4

Browse files
authored
Minor Heartbeat updates: catch more exceptions / log configuration / raise KafkaConfigurationError (#2618)
1 parent be22ee5 commit 71c10f4

File tree

2 files changed

+28
-12
lines changed

2 files changed

+28
-12
lines changed

kafka/coordinator/base.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -909,28 +909,28 @@ def _handle_heartbeat_response(self, future, send_time, response):
909909
error_type = Errors.for_code(response.error_code)
910910
if error_type is Errors.NoError:
911911
heartbeat_log.debug("Received successful heartbeat response for group %s",
912-
self.group_id)
912+
self.group_id)
913913
future.success(None)
914914
elif error_type in (Errors.CoordinatorNotAvailableError,
915915
Errors.NotCoordinatorError):
916916
heartbeat_log.warning("Heartbeat failed for group %s: coordinator (node %s)"
917-
" is either not started or not valid", self.group_id,
917+
" is either not started or not valid", self.group_id,
918918
self.coordinator())
919919
self.coordinator_dead(error_type())
920920
future.failure(error_type())
921921
elif error_type is Errors.RebalanceInProgressError:
922922
heartbeat_log.warning("Heartbeat failed for group %s because it is"
923-
" rebalancing", self.group_id)
923+
" rebalancing", self.group_id)
924924
self.request_rejoin()
925925
future.failure(error_type())
926926
elif error_type is Errors.IllegalGenerationError:
927927
heartbeat_log.warning("Heartbeat failed for group %s: generation id is not "
928-
" current.", self.group_id)
928+
" current.", self.group_id)
929929
self.reset_generation()
930930
future.failure(error_type())
931931
elif error_type is Errors.UnknownMemberIdError:
932932
heartbeat_log.warning("Heartbeat: local member_id was not recognized;"
933-
" this consumer needs to re-join")
933+
" this consumer needs to re-join")
934934
self.reset_generation()
935935
future.failure(error_type)
936936
elif error_type is Errors.GroupAuthorizationFailedError:
@@ -1038,16 +1038,16 @@ def close(self, timeout_ms=None):
10381038

10391039
def run(self):
10401040
try:
1041-
heartbeat_log.debug('Heartbeat thread started')
1041+
heartbeat_log.debug('Heartbeat thread started: %s', self.coordinator.heartbeat)
10421042
while not self.closed:
10431043
self._run_once()
10441044

10451045
except ReferenceError:
10461046
heartbeat_log.debug('Heartbeat thread closed due to coordinator gc')
10471047

1048-
except RuntimeError as e:
1049-
heartbeat_log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
1050-
self.coordinator.group_id, e)
1048+
except Exception as e:
1049+
heartbeat_log.exception("Heartbeat thread for group %s failed due to unexpected error: %s",
1050+
self.coordinator.group_id, e)
10511051
self.failed = e
10521052

10531053
finally:

kafka/coordinator/heartbeat.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
from __future__ import absolute_import, division
22

33
import copy
4+
import logging
45
import time
56

7+
from kafka.errors import KafkaConfigurationError
8+
9+
log = logging.getLogger(__name__)
10+
611

712
class Heartbeat(object):
813
DEFAULT_CONFIG = {
@@ -20,9 +25,13 @@ def __init__(self, **configs):
2025
self.config[key] = configs[key]
2126

2227
if self.config['group_id'] is not None:
23-
assert (self.config['heartbeat_interval_ms']
24-
<= self.config['session_timeout_ms']), (
25-
'Heartbeat interval must be lower than the session timeout')
28+
if self.config['heartbeat_interval_ms'] >= self.config['session_timeout_ms']:
29+
raise KafkaConfigurationError('Heartbeat interval must be lower than the session timeout (%s v %s)' % (
30+
self.config['heartbeat_interval_ms'], self.config['session_timeout_ms']))
31+
if self.config['heartbeat_interval_ms'] > (self.config['session_timeout_ms'] / 3):
32+
log.warning('heartbeat_interval_ms is high relative to session_timeout_ms (%s v %s).'
33+
' Recommend heartbeat interval less than 1/3rd of session timeout',
34+
self.config['heartbeat_interval_ms'], self.config['session_timeout_ms'])
2635

2736
self.last_send = -1 * float('inf')
2837
self.last_receive = -1 * float('inf')
@@ -66,3 +75,10 @@ def reset_timeouts(self):
6675

6776
def poll_timeout_expired(self):
6877
return (time.time() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000)
78+
79+
def __str__(self):
80+
return ("<Heartbeat group_id={group_id}"
81+
" heartbeat_interval_ms={heartbeat_interval_ms}"
82+
" session_timeout_ms={session_timeout_ms}"
83+
" max_poll_interval_ms={max_poll_interval_ms}"
84+
" retry_backoff_ms={retry_backoff_ms}>").format(**self.config)

0 commit comments

Comments
 (0)