Skip to content

Commit 153f620

Browse files
dpkp88manpreet
authored andcommitted
Improve error message when expiring batches in KafkaProducer (dpkp#1077)
1 parent c603eb2 commit 153f620

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

kafka/producer/record_accumulator.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,19 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full)
9797
since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
9898
timeout = request_timeout_ms / 1000.0
9999

100-
if ((not self.in_retry() and is_full and timeout < since_append) or
101-
(not self.in_retry() and timeout < since_ready) or
102-
(self.in_retry() and timeout < since_backoff)):
103-
100+
error = None
101+
if not self.in_retry() and is_full and timeout < since_append:
102+
error = "%d ms has passed since last append" % since_append
103+
elif not self.in_retry() and timeout < since_ready:
104+
error = "%d ms has passed since batch creation plus linger time" % since_ready
105+
elif self.in_retry() and timeout < since_backoff:
106+
error = "%d ms has passed since last attempt plus backoff time" % since_backoff
107+
108+
if error:
104109
self.records.close()
105110
self.done(-1, None, Errors.KafkaTimeoutError(
106-
"Batch containing %s record(s) expired due to timeout while"
107-
" requesting metadata from brokers for %s", self.record_count,
108-
self.topic_partition))
111+
"Batch for %s containing %s record(s) expired: %s" % (
112+
self.topic_partition, self.record_count, error)))
109113
return True
110114
return False
111115

0 commit comments

Comments
 (0)