Remove unused partial KIP-467 implementation (ProduceResponse batch error details) #2524

Merged
1 commit merged on Mar 12, 2025
14 changes: 8 additions & 6 deletions kafka/producer/record_accumulator.py
@@ -68,17 +68,19 @@ def try_append(self, timestamp_ms, key, value, headers):
             sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future

-    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
-        level = logging.DEBUG if exception is None else logging.WARNING
-        log.log(level, "Produced messages to topic-partition %s with base offset"
-                " %s log start offset %s and error %s.", self.topic_partition, base_offset,
-                log_start_offset, global_error) # trace
+    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
+            log.debug("Produced messages to topic-partition %s with base offset"
+                      " %s log start offset %s.", self.topic_partition, base_offset,
+                      log_start_offset) # trace
             self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
         else:
+            log.warning("Failed to produce messages to topic-partition %s with base offset"
+                        " %s log start offset %s and error %s.", self.topic_partition, base_offset,
+                        log_start_offset, exception) # trace
             self.produce_future.failure(exception)

     def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
@@ -109,7 +111,7 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):

         if error:
             self.records.close()
-            self.done(-1, None, Errors.KafkaTimeoutError(
+            self.done(base_offset=-1, exception=Errors.KafkaTimeoutError(
                 "Batch for %s containing %s record(s) expired: %s" % (
                 self.topic_partition, self.records.next_offset(), error)))
             return True
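
With global_error gone, done() resolves the batch future from exception alone: success carries (base_offset, timestamp_ms, log_start_offset), failure carries the exception. A minimal sketch of how a caller observes either outcome through the standard kafka-python producer API (broker address, topic, and payload are illustrative):

    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    future = producer.send('my-topic', b'payload')
    try:
        # On success, done() resolved the future with (base_offset, timestamp_ms,
        # log_start_offset); get() exposes it as RecordMetadata.
        metadata = future.get(timeout=10)
        print(metadata.offset, metadata.timestamp)
    except KafkaError as exc:
        # On failure (including the KafkaTimeoutError raised by maybe_expire above),
        # done() called produce_future.failure(exception) and get() re-raises it here.
        print('produce failed:', exc)
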
20 changes: 9 additions & 11 deletions kafka/producer/sender.py
@@ -182,7 +182,7 @@ def add_topic(self, topic):
     def _failed_produce(self, batches, node_id, error):
         log.error("Error sending produce request to node %d: %s", node_id, error) # trace
         for batch in batches:
-            self._complete_batch(batch, error, -1, None)
+            self._complete_batch(batch, error, -1)

     def _handle_produce_response(self, node_id, send_time, batches, response):
         """Handle a produce response."""
@@ -194,7 +194,6 @@ def _handle_produce_response(self, node_id, send_time, batches, response):

             for topic, partitions in response.topics:
                 for partition_info in partitions:
-                    global_error = None
                     log_start_offset = None
                     if response.API_VERSION < 2:
                         partition, error_code, offset = partition_info
@@ -204,28 +203,27 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
                     elif 5 <= response.API_VERSION <= 7:
                         partition, error_code, offset, ts, log_start_offset = partition_info
                     else:
-                        # the ignored parameter is record_error of type list[(batch_index: int, error_message: str)]
-                        partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
+                        # Currently unused / TODO: KIP-467
+                        partition, error_code, offset, ts, log_start_offset, _record_errors, _global_error = partition_info
                     tp = TopicPartition(topic, partition)
                     error = Errors.for_code(error_code)
                     batch = batches_by_partition[tp]
-                    self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)
+                    self._complete_batch(batch, error, offset, timestamp_ms=ts, log_start_offset=log_start_offset)

         else:
             # this is the acks = 0 case, just complete all requests
             for batch in batches:
-                self._complete_batch(batch, None, -1, None)
+                self._complete_batch(batch, None, -1)

-    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None):
         """Complete or retry the given batch of records.

         Arguments:
             batch (RecordBatch): The record batch
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
             timestamp_ms (int, optional): The timestamp returned by the broker for this batch
-            log_start_offset (int): The start offset of the log at the time this produce response was created
-            global_error (str): The summarising error message
+            log_start_offset (int, optional): The start offset of the log at the time this produce response was created
         """
         # Standardize no-error to None
         if error is Errors.NoError:
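
For reference, the v8+ tuple that the else branch unpacks carries the per-record error details that KIP-467 added and that this PR leaves unused. A hedged sketch of its shape, following the removed comment (record_errors is a list of (batch_index, error_message) pairs; all values below are illustrative):

    # Illustrative ProduceResponse v8+ partition entry (values are made up):
    partition_info = (
        0,     # partition
        87,    # error_code (87 = INVALID_RECORD, for example)
        -1,    # base offset (none assigned when the batch is rejected)
        -1,    # log append timestamp
        0,     # log_start_offset
        [(2, 'compacted topic requires a key')],  # record_errors: (batch_index, error_message)
        'One or more records were rejected',      # global error_message
    )
    partition, error_code, offset, ts, log_start_offset, _record_errors, _global_error = partition_info
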
@@ -237,15 +235,15 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None):
                         " retrying (%d attempts left). Error: %s",
                         batch.topic_partition,
                         self.config['retries'] - batch.attempts - 1,
-                        global_error or error)
+                        error)
             self._accumulator.reenqueue(batch)
             self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
         else:
             if error is Errors.TopicAuthorizationFailedError:
                 error = error(batch.topic_partition.topic)

             # tell the user the result of their request
-            batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
+            batch.done(base_offset, timestamp_ms, error, log_start_offset)
             self._accumulator.deallocate(batch)
             if error is not None:
                 self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
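
The retry branch above only re-enqueues a batch while attempts remain, so whether a retriable error ever reaches batch.done() depends on producer configuration. A small sketch using real kafka-python options (values are arbitrary):

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        retries=5,             # _complete_batch re-enqueues retriable batches while attempts remain
        retry_backoff_ms=100,  # delay before a re-enqueued batch is sent again
    )
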