Skip to content

Commit 590ef93

Browse files
authored
Fix producer busy loop with no pending batches (#2616)
1 parent fef828f commit 590ef93

File tree

3 files changed

+22
-5
lines changed

3 files changed

+22
-5
lines changed

kafka/producer/record_accumulator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,9 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None):
328328
finally:
329329
self._appends_in_progress.decrement()
330330

331+
def reset_next_batch_expiry_time(self):
332+
self._next_batch_expiry_time_ms = float('inf')
333+
331334
def maybe_update_next_batch_expiry_time(self, batch):
332335
self._next_batch_expiry_time_ms = min(self._next_batch_expiry_time_ms, batch.created * 1000 + self.delivery_timeout_ms)
333336

kafka/producer/sender.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def _maybe_remove_from_inflight_batches(self, batch):
7777
queue.pop()
7878
heapq.heapify(queue)
7979

80-
def _get_expired_inflight_batches(self):
80+
def _get_expired_inflight_batches(self, now=None):
8181
"""Get the in-flight batches that has reached delivery timeout."""
8282
expired_batches = []
8383
to_remove = []
@@ -174,7 +174,7 @@ def run_once(self):
174174
def _send_producer_data(self, now=None):
175175
now = time.time() if now is None else now
176176
# get the list of partitions with data ready to send
177-
result = self._accumulator.ready(self._metadata)
177+
result = self._accumulator.ready(self._metadata, now=now)
178178
ready_nodes, next_ready_check_delay, unknown_leaders_exist = result
179179

180180
# if there are any partitions whose leaders are not known yet, force
@@ -195,7 +195,7 @@ def _send_producer_data(self, now=None):
195195

196196
# create produce requests
197197
batches_by_node = self._accumulator.drain(
198-
self._metadata, ready_nodes, self.config['max_request_size'])
198+
self._metadata, ready_nodes, self.config['max_request_size'], now=now)
199199

200200
for batch_list in six.itervalues(batches_by_node):
201201
for batch in batch_list:
@@ -209,8 +209,9 @@ def _send_producer_data(self, now=None):
209209
for batch in batch_list:
210210
self._accumulator.muted.add(batch.topic_partition)
211211

212-
expired_batches = self._accumulator.expired_batches()
213-
expired_batches.extend(self._get_expired_inflight_batches())
212+
self._accumulator.reset_next_batch_expiry_time()
213+
expired_batches = self._accumulator.expired_batches(now=now)
214+
expired_batches.extend(self._get_expired_inflight_batches(now=now))
214215

215216
if expired_batches:
216217
log.debug("%s: Expired %s batches in accumulator", str(self), len(expired_batches))

test/test_sender.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,3 +240,16 @@ def test_maybe_wait_for_producer_id():
240240

241241
def test_run_once():
242242
pass
243+
244+
245+
def test__send_producer_data_expiry_time_reset(sender, accumulator, mocker):
246+
now = time.time()
247+
tp = TopicPartition('foo', 0)
248+
mocker.patch.object(sender, '_failed_produce')
249+
result = accumulator.append(tp, 0, b'key', b'value', [], now=now)
250+
poll_timeout_ms = sender._send_producer_data(now=now)
251+
assert poll_timeout_ms == accumulator.config['delivery_timeout_ms']
252+
sender._failed_produce.assert_not_called()
253+
now += accumulator.config['delivery_timeout_ms']
254+
poll_timeout_ms = sender._send_producer_data(now=now)
255+
assert poll_timeout_ms > 0

0 commit comments

Comments
 (0)