Skip to content

Commit 16c88db

Browse files
Andrew Kowalik88manpreet
authored andcommitted
raise KafkaTimeoutException when flush times out
1 parent 97f25b8 commit 16c88db

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

kafka/producer/kafka.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,10 @@ def flush(self, timeout=None):
548548
549549
Arguments:
550550
timeout (float, optional): timeout in seconds to wait for completion.
551+
552+
Raises:
553+
KafkaTimeoutError: failure to flush buffered records within the
554+
provided timeout
551555
"""
552556
log.debug("Flushing accumulated records in producer.") # trace
553557
self._accumulator.begin_flush()

kafka/producer/record_accumulator.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -522,8 +522,11 @@ def await_flush_completion(self, timeout=None):
522522
for batch in self._incomplete.all():
523523
log.debug('Waiting on produce to %s',
524524
batch.produce_future.topic_partition)
525-
assert batch.produce_future.wait(timeout=timeout), 'Timeout waiting for future'
526-
assert batch.produce_future.is_done, 'Future not done?'
525+
if not batch.produce_future.wait(timeout=timeout):
526+
raise Errors.KafkaTimeoutError('Timeout waiting for future')
527+
if not batch.produce_future.is_done:
528+
raise Errors.UnknownError('Future not done')
529+
527530
if batch.produce_future.failed():
528531
log.warning(batch.produce_future.exception)
529532
finally:

0 commit comments

Comments
 (0)