Skip to content

Do not reset_generation after RebalanceInProgressError; improve CommitFailed error messages #2614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,18 +614,19 @@ def _send_offset_commit_request(self, offsets):
for tp, offset in six.iteritems(offsets):
offset_data[tp.topic][tp.partition] = offset

if self._subscription.partitions_auto_assigned():
generation = self.generation() or Generation.NO_GENERATION
version = self._client.api_version(OffsetCommitRequest, max_version=6)
if version > 1 and self._subscription.partitions_auto_assigned():
generation = self.generation()
else:
generation = Generation.NO_GENERATION

# if the generation is None, we are not part of an active group
# (and we expect to be). The only thing we can do is fail the commit
# and let the user rejoin the group in poll()
if self.config['api_version'] >= (0, 9) and generation is None:
return Future().failure(Errors.CommitFailedError())
if generation is None:
log.info("Failing OffsetCommit request since the consumer is not part of an active group")
return Future().failure(Errors.CommitFailedError('Group rebalance in progress'))

version = self._client.api_version(OffsetCommitRequest, max_version=6)
if version == 0:
request = OffsetCommitRequest[version](
self.group_id,
Expand Down Expand Up @@ -747,13 +748,22 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
self.coordinator_dead(error_type())
future.failure(error_type(self.group_id))
return
elif error_type is Errors.RebalanceInProgressError:
# The consumer never tries to commit offsets between join-group and sync-group,
# so the broker does not expect to see an offset commit request during the
# CompletingRebalance phase; if one does arrive, the broker returns this error.
# In that case we treat it as a fatal CommitFailed exception. However, we do not
# need to reset the generation; we only request a re-join, so that if the caller
# decides to proceed and poll, it will still re-join the group normally.
self.request_rejoin()
future.failure(Errors.CommitFailedError('Group rebalance in progress'))
return
elif error_type in (Errors.UnknownMemberIdError,
Errors.IllegalGenerationError,
Errors.RebalanceInProgressError):
# need to re-join group
Errors.IllegalGenerationError):
# need to reset generation and re-join group
error = error_type(self.group_id)
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error)
log.warning("OffsetCommit for group %s failed: %s",
self.group_id, error)
self.reset_generation()
future.failure(Errors.CommitFailedError())
return
Expand Down
24 changes: 12 additions & 12 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ class Cancelled(KafkaError):


class CommitFailedError(KafkaError):
def __init__(self, *args, **kwargs):
super(CommitFailedError, self).__init__(
"""Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records.
""", *args, **kwargs)
def __init__(self, *args):
if not args:
args = ("Commit cannot be completed since the group has already"
" rebalanced and assigned the partitions to another member."
" This means that the time between subsequent calls to poll()"
" was longer than the configured max_poll_interval_ms, which"
" typically implies that the poll loop is spending too much"
" time message processing. You can address this either by"
" increasing the rebalance timeout with max_poll_interval_ms,"
" or by reducing the maximum size of batches returned in poll()"
" with max_poll_records.",)
super(CommitFailedError, self).__init__(*args)


class IllegalArgumentError(KafkaError):
Expand Down
Loading