Skip to content

Commit a2b5ddc

Browse files
authored
Improve error message when expiring batches in KafkaProducer (#1077)
1 parent 83617b9 commit a2b5ddc

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

kafka/producer/record_accumulator.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,19 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full)
101101
since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
102102
timeout = request_timeout_ms / 1000.0
103103

104-
if ((not self.in_retry() and is_full and timeout < since_append) or
105-
(not self.in_retry() and timeout < since_ready) or
106-
(self.in_retry() and timeout < since_backoff)):
107-
104+
error = None
105+
if not self.in_retry() and is_full and timeout < since_append:
106+
error = "%d ms has passed since last append" % since_append
107+
elif not self.in_retry() and timeout < since_ready:
108+
error = "%d ms has passed since batch creation plus linger time" % since_ready
109+
elif self.in_retry() and timeout < since_backoff:
110+
error = "%d ms has passed since last attempt plus backoff time" % since_backoff
111+
112+
if error:
108113
self.records.close()
109114
self.done(-1, None, Errors.KafkaTimeoutError(
110-
"Batch containing %s record(s) expired due to timeout while"
111-
" requesting metadata from brokers for %s", self.record_count,
112-
self.topic_partition))
115+
"Batch for %s containing %s record(s) expired: %s" % (
116+
self.topic_partition, self.record_count, error)))
113117
return True
114118
return False
115119

0 commit comments

Comments
 (0)