
Commit 5de666a

More coordinator / heartbeat logging (#2621)
1 parent 8f77cc8 commit 5de666a

File tree

1 file changed: +45, -20 lines

kafka/coordinator/base.py

Lines changed: 45 additions & 20 deletions
@@ -44,6 +44,9 @@ def __eq__(self, other):
                 self.member_id == other.member_id and
                 self.protocol == other.protocol)
 
+    def __str__(self):
+        return "<Generation %s (member_id: %s, protocol: %s)>" % (self.generation_id, self.member_id, self.protocol)
+
 
 Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None)
 
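Note: this new __str__ is what lets the join and heartbeat log lines added below print the whole generation as one readable token. A minimal standalone sketch (not the library class; values are made up) of what it renders:

# Standalone sketch mirroring the Generation fields from this diff.
class Generation(object):
    def __init__(self, generation_id, member_id, protocol):
        self.generation_id = generation_id
        self.member_id = member_id
        self.protocol = protocol

    def __str__(self):
        return "<Generation %s (member_id: %s, protocol: %s)>" % (
            self.generation_id, self.member_id, self.protocol)

print(Generation(5, 'kafka-python-1-2f3a', 'range'))
# -> <Generation 5 (member_id: kafka-python-1-2f3a, protocol: range)>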
@@ -404,17 +407,16 @@ def _handle_join_success(self, member_assignment_bytes):
         # will be invoked even if the consumer is woken up before
         # finishing the rebalance
         with self._lock:
-            log.info("Successfully joined group %s with generation %s",
-                     self.group_id, self._generation.generation_id)
             self.state = MemberState.STABLE
             if self._heartbeat_thread:
                 self._heartbeat_thread.enable()
 
-    def _handle_join_failure(self, _):
+    def _handle_join_failure(self, exception):
         # we handle failures below after the request finishes.
         # if the join completes after having been woken up,
         # the exception is ignored and we will rejoin
         with self._lock:
+            log.info("Failed to join group %s: %s", self.group_id, exception)
             self.state = MemberState.UNJOINED
 
     def ensure_active_group(self, timeout_ms=None):
@@ -572,10 +574,9 @@ def _failed_request(self, node_id, request, future, error):
         future.failure(error)
 
     def _handle_join_group_response(self, future, send_time, response):
+        log.debug("Received JoinGroup response: %s", response)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.debug("Received successful JoinGroup response for group %s: %s",
-                      self.group_id, response)
             if self._sensors:
                 self._sensors.join_latency.record((time.time() - send_time) * 1000)
             with self._lock:
@@ -589,6 +590,7 @@ def _handle_join_group_response(self, future, send_time, response):
                                                   response.member_id,
                                                   response.group_protocol)
 
+                    log.info("Successfully joined group %s %s", self.group_id, self._generation)
                     if response.leader_id == response.member_id:
                         log.info("Elected group leader -- performing partition"
                                  " assignments using %s", self._generation.protocol)
@@ -597,24 +599,24 @@ def _handle_join_group_response(self, future, send_time, response):
                         self._on_join_follower().chain(future)
 
         elif error_type is Errors.CoordinatorLoadInProgressError:
-            log.debug("Attempt to join group %s rejected since coordinator %s"
-                      " is loading the group.", self.group_id, self.coordinator_id)
+            log.info("Attempt to join group %s rejected since coordinator %s"
+                     " is loading the group.", self.group_id, self.coordinator_id)
             # backoff and retry
             future.failure(error_type(response))
         elif error_type is Errors.UnknownMemberIdError:
             # reset the member id and retry immediately
             error = error_type(self._generation.member_id)
             self.reset_generation()
-            log.debug("Attempt to join group %s failed due to unknown member id",
-                      self.group_id)
+            log.info("Attempt to join group %s failed due to unknown member id",
+                     self.group_id)
             future.failure(error)
         elif error_type in (Errors.CoordinatorNotAvailableError,
                             Errors.NotCoordinatorError):
             # re-discover the coordinator and retry with backoff
             self.coordinator_dead(error_type())
-            log.debug("Attempt to join group %s failed due to obsolete "
-                      "coordinator information: %s", self.group_id,
-                      error_type.__name__)
+            log.info("Attempt to join group %s failed due to obsolete "
+                     "coordinator information: %s", self.group_id,
+                     error_type.__name__)
             future.failure(error_type())
         elif error_type in (Errors.InconsistentGroupProtocolError,
                             Errors.InvalidSessionTimeoutError,
@@ -625,12 +627,21 @@ def _handle_join_group_response(self, future, send_time, response):
                       self.group_id, error)
             future.failure(error)
         elif error_type is Errors.GroupAuthorizationFailedError:
+            log.error("Attempt to join group %s failed due to group authorization error",
+                      self.group_id)
             future.failure(error_type(self.group_id))
         elif error_type is Errors.MemberIdRequiredError:
             # Broker requires a concrete member id to be allowed to join the group. Update member id
             # and send another join group request in next cycle.
+            log.info("Received member id %s for group %s; will retry join-group",
+                     response.member_id, self.group_id)
             self.reset_generation(response.member_id)
             future.failure(error_type())
+        elif error_type is Errors.RebalanceInProgressError:
+            log.info("Attempt to join group %s failed due to RebalanceInProgressError,"
+                     " which could indicate a replication timeout on the broker. Will retry.",
+                     self.group_id)
+            future.failure(error_type())
         else:
             # unexpected error, throw the exception
             error = error_type()
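Note: every branch above keys off Errors.for_code, which maps the numeric error_code in the broker response to an exception class that the handler then compares by identity. A rough sketch of that mapping (79 and 27 are the standard Kafka protocol codes for MEMBER_ID_REQUIRED and REBALANCE_IN_PROGRESS):

import kafka.errors as Errors

# for_code returns the exception class itself, so identity checks work.
assert Errors.for_code(79) is Errors.MemberIdRequiredError
assert Errors.for_code(27) is Errors.RebalanceInProgressError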
@@ -699,6 +710,7 @@ def _send_sync_group_request(self, request):
         return future
 
     def _handle_sync_group_response(self, future, send_time, response):
+        log.debug("Received SyncGroup response: %s", response)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             if self._sensors:
@@ -745,13 +757,13 @@ def _send_group_coordinator_request(self):
             e = Errors.NodeNotReadyError(node_id)
             return Future().failure(e)
 
-        log.debug("Sending group coordinator request for group %s to broker %s",
-                  self.group_id, node_id)
         version = self._client.api_version(FindCoordinatorRequest, max_version=2)
         if version == 0:
             request = FindCoordinatorRequest[version](self.group_id)
         else:
             request = FindCoordinatorRequest[version](self.group_id, 0)
+        log.debug("Sending group coordinator request for group %s to broker %s: %s",
+                  self.group_id, node_id, request)
         future = Future()
         _f = self._client.send(node_id, request)
         _f.add_callback(self._handle_group_coordinator_response, future)
@@ -880,6 +892,7 @@ def maybe_leave_group(self, timeout_ms=None):
                 log.info('Leaving consumer group (%s).', self.group_id)
                 version = self._client.api_version(LeaveGroupRequest, max_version=2)
                 request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
+                log.debug('Sending LeaveGroupRequest to %s: %s', self.coordinator_id, request)
                 future = self._client.send(self.coordinator_id, request)
                 future.add_callback(self._handle_leave_group_response)
                 future.add_errback(log.error, "LeaveGroup request failed: %s")
@@ -888,10 +901,11 @@ def maybe_leave_group(self, timeout_ms=None):
             self.reset_generation()
 
     def _handle_leave_group_response(self, response):
+        log.debug("Received LeaveGroupResponse: %s", response)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.debug("LeaveGroup request for group %s returned successfully",
-                      self.group_id)
+            log.info("LeaveGroup request for group %s returned successfully",
+                     self.group_id)
         else:
             log.error("LeaveGroup request for group %s failed with error: %s",
                       self.group_id, error_type())
@@ -911,7 +925,7 @@ def _send_heartbeat_request(self):
         request = HeartbeatRequest[version](self.group_id,
                                             self._generation.generation_id,
                                             self._generation.member_id)
-        heartbeat_log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id)  # pylint: disable-msg=no-member
+        heartbeat_log.debug("Sending HeartbeatRequest to %s: %s", self.coordinator_id, request)
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
         _f.add_callback(self._handle_heartbeat_response, future, time.time())
@@ -922,10 +936,10 @@ def _send_heartbeat_request(self):
     def _handle_heartbeat_response(self, future, send_time, response):
         if self._sensors:
             self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
+        heartbeat_log.debug("Received heartbeat response for group %s: %s",
+                            self.group_id, response)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            heartbeat_log.debug("Received successful heartbeat response for group %s",
-                                self.group_id)
             future.success(None)
         elif error_type in (Errors.CoordinatorNotAvailableError,
                             Errors.NotCoordinatorError):
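Note: most of the new send/receive messages are logged at DEBUG and are invisible under a default logging setup. Something like the following surfaces them; kafka-python loggers live under the 'kafka' namespace, though the exact child logger used by heartbeat_log is an assumption here:

import logging

logging.basicConfig(level=logging.INFO)
# base.py lives in kafka/coordinator/, so its loggers fall under this prefix.
logging.getLogger('kafka.coordinator').setLevel(logging.DEBUG)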
@@ -1118,7 +1132,13 @@ def _run_once(self):
                 # the poll timeout has expired, which means that the
                 # foreground thread has stalled in between calls to
                 # poll(), so we explicitly leave the group.
-                heartbeat_log.warning('Heartbeat poll expired, leaving group')
+                heartbeat_log.warning(
+                    "Consumer poll timeout has expired. This means the time between subsequent calls to poll()"
+                    " was longer than the configured max_poll_interval_ms, which typically implies that"
+                    " the poll loop is spending too much time processing messages. You can address this"
+                    " either by increasing max_poll_interval_ms or by reducing the maximum size of batches"
+                    " returned in poll() with max_poll_records."
+                )
                 self.coordinator.maybe_leave_group()
 
             elif not self.coordinator.heartbeat.should_heartbeat():
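Note: the expanded warning names the two consumer settings that govern this path. A sketch of tuning them on a KafkaConsumer (topic, broker address, and values are illustrative):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                      # illustrative topic name
    bootstrap_servers='localhost:9092',
    max_poll_interval_ms=600000,     # allow up to 10 minutes between poll() calls
    max_poll_records=100,            # smaller batches keep each poll iteration short
)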
@@ -1128,10 +1148,12 @@ def _run_once(self):
                 self.coordinator._lock.wait(next_hb)
 
             else:
+                heartbeat_log.debug('Sending heartbeat for group %s %s', self.coordinator.group_id, self.coordinator._generation)
                 self.coordinator.heartbeat.sent_heartbeat()
                 future = self.coordinator._send_heartbeat_request()
                 future.add_callback(self._handle_heartbeat_success)
                 future.add_errback(self._handle_heartbeat_failure)
+
         finally:
             self.coordinator._lock.release()
         try:
@@ -1142,6 +1164,7 @@ def _run_once(self):
 
     def _handle_heartbeat_success(self, result):
         with self.coordinator._lock:
+            heartbeat_log.debug('Heartbeat success')
             self.coordinator.heartbeat.received_heartbeat()
 
     def _handle_heartbeat_failure(self, exception):
@@ -1152,8 +1175,10 @@ def _handle_heartbeat_failure(self, exception):
                 # member in the group for as long as the duration of the
                 # rebalance timeout. If we stop sending heartbeats, however,
                 # then the session timeout may expire before we can rejoin.
+                heartbeat_log.debug('Treating RebalanceInProgressError as successful heartbeat')
                 self.coordinator.heartbeat.received_heartbeat()
             else:
+                heartbeat_log.debug('Heartbeat failure: %s', exception)
                 self.coordinator.heartbeat.fail_heartbeat()
                 # wake up the thread if it's sleeping to reschedule the heartbeat
                 self.coordinator._lock.notify()
