Skip to content

Commit 592ada9

Browse files
committed
asyncio: Improve canceled timer handles cleanup. Closes issue #22448.
Patch by Joshua Moore-Oliva.
1 parent 8c0e0ab commit 592ada9

File tree

5 files changed

+159
-26
lines changed

5 files changed

+159
-26
lines changed

Lib/asyncio/base_events.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@
4040
# Argument for default thread pool executor creation.
4141
_MAX_WORKERS = 5
4242

43+
# Minimum number of _scheduled timer handles before cleanup of
44+
# cancelled handles is performed.
45+
_MIN_SCHEDULED_TIMER_HANDLES = 100
46+
47+
# Minimum fraction of _scheduled timer handles that are cancelled
48+
# before cleanup of cancelled handles is performed.
49+
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
4350

4451
def _format_handle(handle):
4552
cb = handle._callback
@@ -145,6 +152,7 @@ def wait_closed(self):
145152
class BaseEventLoop(events.AbstractEventLoop):
146153

147154
def __init__(self):
155+
self._timer_cancelled_count = 0
148156
self._closed = False
149157
self._ready = collections.deque()
150158
self._scheduled = []
@@ -349,6 +357,7 @@ def call_at(self, when, callback, *args):
349357
if timer._source_traceback:
350358
del timer._source_traceback[-1]
351359
heapq.heappush(self._scheduled, timer)
360+
timer._scheduled = True
352361
return timer
353362

354363
def call_soon(self, callback, *args):
@@ -964,26 +973,46 @@ def _add_callback(self, handle):
964973
assert isinstance(handle, events.Handle), 'A Handle is required here'
965974
if handle._cancelled:
966975
return
967-
if isinstance(handle, events.TimerHandle):
968-
heapq.heappush(self._scheduled, handle)
969-
else:
970-
self._ready.append(handle)
976+
assert not isinstance(handle, events.TimerHandle)
977+
self._ready.append(handle)
971978

972979
def _add_callback_signalsafe(self, handle):
973980
"""Like _add_callback() but called from a signal handler."""
974981
self._add_callback(handle)
975982
self._write_to_self()
976983

984+
def _timer_handle_cancelled(self, handle):
985+
"""Notification that a TimerHandle has been cancelled."""
986+
if handle._scheduled:
987+
self._timer_cancelled_count += 1
988+
977989
def _run_once(self):
978990
"""Run one full iteration of the event loop.
979991
980992
This calls all currently ready callbacks, polls for I/O,
981993
schedules the resulting callbacks, and finally schedules
982994
'call_later' callbacks.
983995
"""
984-
# Remove delayed calls that were cancelled from head of queue.
985-
while self._scheduled and self._scheduled[0]._cancelled:
986-
heapq.heappop(self._scheduled)
996+
997+
# Remove delayed calls that were cancelled if their number is too high
998+
sched_count = len(self._scheduled)
999+
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1000+
self._timer_cancelled_count / sched_count >
1001+
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1002+
for handle in self._scheduled:
1003+
if handle._cancelled:
1004+
handle._scheduled = False
1005+
1006+
self._scheduled = [x for x in self._scheduled if not x._cancelled]
1007+
self._timer_cancelled_count = 0
1008+
1009+
heapq.heapify(self._scheduled)
1010+
else:
1011+
# Remove delayed calls that were cancelled from head of queue.
1012+
while self._scheduled and self._scheduled[0]._cancelled:
1013+
self._timer_cancelled_count -= 1
1014+
handle = heapq.heappop(self._scheduled)
1015+
handle._scheduled = False
9871016

9881017
timeout = None
9891018
if self._ready:
@@ -1024,6 +1053,7 @@ def _run_once(self):
10241053
if handle._when >= end_time:
10251054
break
10261055
handle = heapq.heappop(self._scheduled)
1056+
handle._scheduled = False
10271057
self._ready.append(handle)
10281058

10291059
# This is the only place where callbacks are actually *called*.

Lib/asyncio/events.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,15 @@ def __repr__(self):
105105
return '<%s>' % ' '.join(info)
106106

107107
def cancel(self):
108-
self._cancelled = True
109-
if self._loop.get_debug():
110-
# Keep a representation in debug mode to keep callback and
111-
# parameters. For example, to log the warning "Executing <Handle
112-
# ...> took 2.5 second"
113-
self._repr = repr(self)
114-
self._callback = None
115-
self._args = None
108+
if not self._cancelled:
109+
self._cancelled = True
110+
if self._loop.get_debug():
111+
# Keep a representation in debug mode to keep callback and
112+
# parameters. For example, to log the warning
113+
# "Executing <Handle...> took 2.5 second"
114+
self._repr = repr(self)
115+
self._callback = None
116+
self._args = None
116117

117118
def _run(self):
118119
try:
@@ -134,14 +135,15 @@ def _run(self):
134135
class TimerHandle(Handle):
135136
"""Object returned by timed callback registration methods."""
136137

137-
__slots__ = ['_when']
138+
__slots__ = ['_scheduled', '_when']
138139

139140
def __init__(self, when, callback, args, loop):
140141
assert when is not None
141142
super().__init__(callback, args, loop)
142143
if self._source_traceback:
143144
del self._source_traceback[-1]
144145
self._when = when
146+
self._scheduled = False
145147

146148
def _repr_info(self):
147149
info = super()._repr_info()
@@ -180,6 +182,11 @@ def __ne__(self, other):
180182
equal = self.__eq__(other)
181183
return NotImplemented if equal is NotImplemented else not equal
182184

185+
def cancel(self):
186+
if not self._cancelled:
187+
self._loop._timer_handle_cancelled(self)
188+
super().cancel()
189+
183190

184191
class AbstractServer:
185192
"""Abstract server returned by create_server()."""
@@ -238,6 +245,10 @@ def close(self):
238245

239246
# Methods scheduling callbacks. All these return Handles.
240247

248+
def _timer_handle_cancelled(self, handle):
249+
"""Notification that a TimerHandle has been cancelled."""
250+
raise NotImplementedError
251+
241252
def call_soon(self, callback, *args):
242253
return self.call_later(0, callback, *args)
243254

Lib/test/test_asyncio/test_base_events.py

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import errno
44
import logging
5+
import math
56
import socket
67
import sys
78
import time
@@ -73,13 +74,6 @@ def test__add_callback_handle(self):
7374
self.assertFalse(self.loop._scheduled)
7475
self.assertIn(h, self.loop._ready)
7576

76-
def test__add_callback_timer(self):
77-
h = asyncio.TimerHandle(time.monotonic()+10, lambda: False, (),
78-
self.loop)
79-
80-
self.loop._add_callback(h)
81-
self.assertIn(h, self.loop._scheduled)
82-
8377
def test__add_callback_cancelled_handle(self):
8478
h = asyncio.Handle(lambda: False, (), self.loop)
8579
h.cancel()
@@ -283,6 +277,82 @@ def cb(loop):
283277
self.assertTrue(processed)
284278
self.assertEqual([handle], list(self.loop._ready))
285279

280+
def test__run_once_cancelled_event_cleanup(self):
281+
self.loop._process_events = mock.Mock()
282+
283+
self.assertTrue(
284+
0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
285+
286+
def cb():
287+
pass
288+
289+
# Set up one "blocking" event that will not be cancelled to
290+
# ensure later cancelled events do not make it to the head
291+
# of the queue and get cleaned.
292+
not_cancelled_count = 1
293+
self.loop.call_later(3000, cb)
294+
295+
# Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
296+
# cancelled handles, ensure they aren't removed
297+
298+
cancelled_count = 2
299+
for x in range(2):
300+
h = self.loop.call_later(3600, cb)
301+
h.cancel()
302+
303+
# Add some cancelled events that will be at head and removed
304+
cancelled_count += 2
305+
for x in range(2):
306+
h = self.loop.call_later(100, cb)
307+
h.cancel()
308+
309+
# This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
310+
self.assertLessEqual(cancelled_count + not_cancelled_count,
311+
base_events._MIN_SCHEDULED_TIMER_HANDLES)
312+
313+
self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
314+
315+
self.loop._run_once()
316+
317+
cancelled_count -= 2
318+
319+
self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
320+
321+
self.assertEqual(len(self.loop._scheduled),
322+
cancelled_count + not_cancelled_count)
323+
324+
# Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
325+
# so that deletion of cancelled events will occur on next _run_once
326+
add_cancel_count = int(math.ceil(
327+
base_events._MIN_SCHEDULED_TIMER_HANDLES *
328+
base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
329+
330+
add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
331+
add_cancel_count, 0)
332+
333+
# Add some events that will not be cancelled
334+
not_cancelled_count += add_not_cancel_count
335+
for x in range(add_not_cancel_count):
336+
self.loop.call_later(3600, cb)
337+
338+
# Add enough cancelled events
339+
cancelled_count += add_cancel_count
340+
for x in range(add_cancel_count):
341+
h = self.loop.call_later(3600, cb)
342+
h.cancel()
343+
344+
# Ensure all handles are still scheduled
345+
self.assertEqual(len(self.loop._scheduled),
346+
cancelled_count + not_cancelled_count)
347+
348+
self.loop._run_once()
349+
350+
# Ensure cancelled events were removed
351+
self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
352+
353+
# Ensure only uncancelled events remain scheduled
354+
self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
355+
286356
def test_run_until_complete_type_error(self):
287357
self.assertRaises(TypeError,
288358
self.loop.run_until_complete, 'blah')

Lib/test/test_asyncio/test_events.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1890,9 +1890,17 @@ def test_handle_repr_debug(self):
18901890

18911891
# cancelled handle
18921892
h.cancel()
1893-
self.assertEqual(repr(h),
1894-
'<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
1895-
% (filename, lineno, create_filename, create_lineno))
1893+
self.assertEqual(
1894+
repr(h),
1895+
'<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
1896+
% (filename, lineno, create_filename, create_lineno))
1897+
1898+
# double cancellation won't overwrite _repr
1899+
h.cancel()
1900+
self.assertEqual(
1901+
repr(h),
1902+
'<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
1903+
% (filename, lineno, create_filename, create_lineno))
18961904

18971905
def test_handle_source_traceback(self):
18981906
loop = asyncio.get_event_loop_policy().new_event_loop()

Misc/NEWS

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,20 @@
22
Python News
33
+++++++++++
44

5+
6+
What's New in Python 3.4.3?
7+
===========================
8+
9+
Core and Builtins
10+
-----------------
11+
12+
Library
13+
-------
14+
15+
- Issue #22448: Improve canceled timer handles cleanup to prevent
16+
unbound memory usage. Patch by Joshua Moore-Oliva.
17+
18+
519
What's New in Python 3.4.2?
620
===========================
721

0 commit comments

Comments
 (0)