-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
bpo-46771: Implement asyncio context managers for handling timeouts #31394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
31 commits
Select commit
Hold shift + click to select a range
dac5874
bpo-46771: Implement asyncio context managers for handling timeouts
asvetlov 374ff2a
Add reschedule tests
asvetlov 70fa59b
Add reschedule tests
asvetlov 3ae2af6
Dro breakpoint
asvetlov 1654ec4
Tune repr
asvetlov 2c9dbf8
Add tests
asvetlov baa7400
More tests
asvetlov 1ca0fb8
Rename
asvetlov 8a81dd1
Update Lib/asyncio/timeouts.py
asvetlov fae235d
Update Lib/asyncio/timeouts.py
asvetlov 663b82f
Update Lib/asyncio/timeouts.py
asvetlov 24d62d1
Update Lib/asyncio/timeouts.py
asvetlov 6a26d1b
Add a test
asvetlov 930b92b
Polish docstrings
asvetlov ac5c53d
Format
asvetlov f96ad1c
Tune tests
asvetlov 94b4b4c
Fix comment
asvetlov 388c6da
Tune comment
asvetlov cdc7f88
Tune docstrings
asvetlov 9949fe4
Tune
asvetlov c716856
Tune tests
asvetlov b4889a0
Update Lib/test/test_asyncio/test_timeouts.py
asvetlov b6504e6
Tune tests
asvetlov fd2688d
Don't clobber foreign exceptions even if timeout is expiring
gvanrossum 2ddda69
Add test from discussion
gvanrossum 493545a
Fix indent of added test
gvanrossum ff36f2a
Disable slow callback warning
asvetlov 8790e49
Reformat
asvetlov ac6f8c8
Increase delay
asvetlov e8c67ce
Don't raise TimeoutError if the CancelledError was swallowed by inner…
asvetlov e65d766
Don't duplicate py/c tests, timeout has no C accelerators
asvetlov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
import enum | ||
|
||
from types import TracebackType | ||
from typing import final, Optional, Type | ||
|
||
from . import events | ||
from . import exceptions | ||
from . import tasks | ||
|
||
|
||
__all__ = ( | ||
"Timeout", | ||
"timeout", | ||
"timeout_at", | ||
) | ||
|
||
|
||
class _State(enum.Enum): | ||
CREATED = "created" | ||
ENTERED = "active" | ||
EXPIRING = "expiring" | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
EXPIRED = "expired" | ||
EXITED = "finished" | ||
|
||
|
||
@final | ||
class Timeout: | ||
|
||
def __init__(self, when: Optional[float]) -> None: | ||
self._state = _State.CREATED | ||
|
||
self._timeout_handler: Optional[events.TimerHandle] = None | ||
self._task: Optional[tasks.Task] = None | ||
self._when = when | ||
|
||
def when(self) -> Optional[float]: | ||
return self._when | ||
|
||
def reschedule(self, when: Optional[float]) -> None: | ||
assert self._state is not _State.CREATED | ||
if self._state is not _State.ENTERED: | ||
raise RuntimeError( | ||
f"Cannot change state of {self._state.value} Timeout", | ||
) | ||
|
||
self._when = when | ||
|
||
if self._timeout_handler is not None: | ||
self._timeout_handler.cancel() | ||
|
||
if when is None: | ||
self._timeout_handler = None | ||
else: | ||
loop = events.get_running_loop() | ||
self._timeout_handler = loop.call_at( | ||
when, | ||
self._on_timeout, | ||
) | ||
|
||
def expired(self) -> bool: | ||
asvetlov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Is timeout expired during execution?""" | ||
return self._state in (_State.EXPIRING, _State.EXPIRED) | ||
|
||
def __repr__(self) -> str: | ||
info = [''] | ||
if self._state is _State.ENTERED: | ||
when = round(self._when, 3) if self._when is not None else None | ||
info.append(f"when={when}") | ||
info_str = ' '.join(info) | ||
return f"<Timeout [{self._state.value}]{info_str}>" | ||
|
||
async def __aenter__(self) -> "Timeout": | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._state = _State.ENTERED | ||
self._task = tasks.current_task() | ||
if self._task is None: | ||
raise RuntimeError("Timeout should be used inside a task") | ||
self.reschedule(self._when) | ||
return self | ||
|
||
async def __aexit__( | ||
self, | ||
exc_type: Optional[Type[BaseException]], | ||
exc_val: Optional[BaseException], | ||
exc_tb: Optional[TracebackType], | ||
) -> Optional[bool]: | ||
assert self._state in (_State.ENTERED, _State.EXPIRING) | ||
|
||
if self._timeout_handler is not None: | ||
self._timeout_handler.cancel() | ||
self._timeout_handler = None | ||
|
||
if self._state is _State.EXPIRING: | ||
self._state = _State.EXPIRED | ||
|
||
if self._task.uncancel() == 0 and exc_type is exceptions.CancelledError: | ||
# Since there are no outstanding cancel requests, we're | ||
# handling this. | ||
raise TimeoutError | ||
elif self._state is _State.ENTERED: | ||
self._state = _State.EXITED | ||
|
||
return None | ||
|
||
def _on_timeout(self) -> None: | ||
assert self._state is _State.ENTERED | ||
self._task.cancel() | ||
self._state = _State.EXPIRING | ||
# drop the reference early | ||
self._timeout_handler = None | ||
|
||
|
||
def timeout(delay: Optional[float]) -> Timeout: | ||
"""Timeout async context manager. | ||
|
||
Useful in cases when you want to apply timeout logic around block | ||
of code or in cases when asyncio.wait_for is not suitable. For example: | ||
|
||
>>> async with asyncio.timeout(10): # 10 seconds timeout | ||
... await long_running_task() | ||
|
||
|
||
delay - value in seconds or None to disable timeout logic | ||
|
||
long_running_task() is interrupted by raising asyncio.CancelledError, | ||
the top-most affected timeout() context manager converts CancelledError | ||
into TimeoutError. | ||
""" | ||
loop = events.get_running_loop() | ||
return Timeout(loop.time() + delay if delay is not None else None) | ||
|
||
|
||
def timeout_at(when: Optional[float]) -> Timeout: | ||
"""Schedule the timeout at absolute time. | ||
|
||
Like timeout() but argument gives absolute time in the same clock system | ||
as loop.time(). | ||
|
||
Please note: it is not POSIX time but a time with | ||
undefined starting base, e.g. the time of the system power on. | ||
|
||
>>> async with asyncio.timeout_at(loop.time() + 10): | ||
... await long_running_task() | ||
|
||
|
||
when - a deadline when timeout occurs or None to disable timeout logic | ||
|
||
long_running_task() is interrupted by raising asyncio.CancelledError, | ||
the top-most affected timeout() context manager converts CancelledError | ||
into TimeoutError. | ||
""" | ||
return Timeout(when) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
"""Tests for asyncio/timeouts.py""" | ||
|
||
import unittest | ||
import time | ||
|
||
import asyncio | ||
from asyncio import tasks | ||
|
||
|
||
def tearDownModule(): | ||
asyncio.set_event_loop_policy(None) | ||
|
||
|
||
class TimeoutTests(unittest.IsolatedAsyncioTestCase): | ||
|
||
async def test_timeout_basic(self): | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.01) as cm: | ||
await asyncio.sleep(10) | ||
self.assertTrue(cm.expired()) | ||
|
||
async def test_timeout_at_basic(self): | ||
loop = asyncio.get_running_loop() | ||
|
||
with self.assertRaises(TimeoutError): | ||
deadline = loop.time() + 0.01 | ||
async with asyncio.timeout_at(deadline) as cm: | ||
await asyncio.sleep(10) | ||
self.assertTrue(cm.expired()) | ||
self.assertEqual(deadline, cm.when()) | ||
|
||
async def test_nested_timeouts(self): | ||
loop = asyncio.get_running_loop() | ||
cancelled = False | ||
with self.assertRaises(TimeoutError): | ||
deadline = loop.time() + 0.01 | ||
async with asyncio.timeout_at(deadline) as cm1: | ||
# Only the topmost context manager should raise TimeoutError | ||
try: | ||
async with asyncio.timeout_at(deadline) as cm2: | ||
await asyncio.sleep(10) | ||
except asyncio.CancelledError: | ||
cancelled = True | ||
raise | ||
self.assertTrue(cancelled) | ||
self.assertTrue(cm1.expired()) | ||
self.assertTrue(cm2.expired()) | ||
|
||
async def test_waiter_cancelled(self): | ||
loop = asyncio.get_running_loop() | ||
cancelled = False | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.01): | ||
try: | ||
await asyncio.sleep(10) | ||
except asyncio.CancelledError: | ||
cancelled = True | ||
raise | ||
self.assertTrue(cancelled) | ||
|
||
async def test_timeout_not_called(self): | ||
loop = asyncio.get_running_loop() | ||
t0 = loop.time() | ||
async with asyncio.timeout(10) as cm: | ||
await asyncio.sleep(0.01) | ||
t1 = loop.time() | ||
|
||
self.assertFalse(cm.expired()) | ||
# 2 sec for slow CI boxes | ||
self.assertLess(t1-t0, 2) | ||
self.assertGreater(cm.when(), t1) | ||
|
||
async def test_timeout_disabled(self): | ||
loop = asyncio.get_running_loop() | ||
t0 = loop.time() | ||
async with asyncio.timeout(None) as cm: | ||
await asyncio.sleep(0.01) | ||
t1 = loop.time() | ||
|
||
self.assertFalse(cm.expired()) | ||
self.assertIsNone(cm.when()) | ||
# 2 sec for slow CI boxes | ||
self.assertLess(t1-t0, 2) | ||
|
||
async def test_timeout_at_disabled(self): | ||
loop = asyncio.get_running_loop() | ||
t0 = loop.time() | ||
async with asyncio.timeout_at(None) as cm: | ||
await asyncio.sleep(0.01) | ||
t1 = loop.time() | ||
|
||
self.assertFalse(cm.expired()) | ||
self.assertIsNone(cm.when()) | ||
# 2 sec for slow CI boxes | ||
self.assertLess(t1-t0, 2) | ||
|
||
async def test_timeout_zero(self): | ||
loop = asyncio.get_running_loop() | ||
t0 = loop.time() | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0) as cm: | ||
await asyncio.sleep(10) | ||
t1 = loop.time() | ||
self.assertTrue(cm.expired()) | ||
# 2 sec for slow CI boxes | ||
self.assertLess(t1-t0, 2) | ||
self.assertTrue(t0 <= cm.when() <= t1) | ||
|
||
async def test_foreign_exception_passed(self): | ||
with self.assertRaises(KeyError): | ||
async with asyncio.timeout(0.01) as cm: | ||
raise KeyError | ||
self.assertFalse(cm.expired()) | ||
|
||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
async def test_foreign_exception_on_timeout(self): | ||
async def crash(): | ||
try: | ||
await asyncio.sleep(1) | ||
finally: | ||
1/0 | ||
with self.assertRaises(ZeroDivisionError): | ||
async with asyncio.timeout(0.01): | ||
await crash() | ||
|
||
async def test_foreign_cancel_doesnt_timeout_if_not_expired(self): | ||
with self.assertRaises(asyncio.CancelledError): | ||
async with asyncio.timeout(10) as cm: | ||
asyncio.current_task().cancel() | ||
await asyncio.sleep(10) | ||
self.assertFalse(cm.expired()) | ||
|
||
async def test_outer_task_is_not_cancelled(self): | ||
async def outer() -> None: | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.001): | ||
await asyncio.sleep(10) | ||
|
||
task = asyncio.create_task(outer()) | ||
await task | ||
self.assertFalse(task.cancelled()) | ||
self.assertTrue(task.done()) | ||
|
||
async def test_nested_timeouts_concurrent(self): | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.002): | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.1): | ||
# Pretend we crunch some numbers. | ||
time.sleep(0.01) | ||
await asyncio.sleep(1) | ||
|
||
async def test_nested_timeouts_loop_busy(self): | ||
# After the inner timeout is an expensive operation which should | ||
# be stopped by the outer timeout. | ||
loop = asyncio.get_running_loop() | ||
# Disable a message about long running task | ||
loop.slow_callback_duration = 10 | ||
t0 = loop.time() | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.1): # (1) | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.01): # (2) | ||
# Pretend the loop is busy for a while. | ||
time.sleep(0.1) | ||
await asyncio.sleep(1) | ||
# TimeoutError was cought by (2) | ||
await asyncio.sleep(10) # This sleep should be interrupted by (1) | ||
t1 = loop.time() | ||
self.assertTrue(t0 <= t1 <= t0 + 1) | ||
|
||
async def test_reschedule(self): | ||
loop = asyncio.get_running_loop() | ||
fut = loop.create_future() | ||
deadline1 = loop.time() + 10 | ||
deadline2 = deadline1 + 20 | ||
|
||
async def f(): | ||
async with asyncio.timeout_at(deadline1) as cm: | ||
fut.set_result(cm) | ||
await asyncio.sleep(50) | ||
|
||
task = asyncio.create_task(f()) | ||
cm = await fut | ||
|
||
self.assertEqual(cm.when(), deadline1) | ||
cm.reschedule(deadline2) | ||
self.assertEqual(cm.when(), deadline2) | ||
cm.reschedule(None) | ||
self.assertIsNone(cm.when()) | ||
|
||
task.cancel() | ||
|
||
with self.assertRaises(asyncio.CancelledError): | ||
await task | ||
self.assertFalse(cm.expired()) | ||
|
||
async def test_repr_active(self): | ||
async with asyncio.timeout(10) as cm: | ||
self.assertRegex(repr(cm), r"<Timeout \[active\] when=\d+\.\d*>") | ||
|
||
async def test_repr_expired(self): | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.01) as cm: | ||
await asyncio.sleep(10) | ||
self.assertEqual(repr(cm), "<Timeout [expired]>") | ||
|
||
async def test_repr_finished(self): | ||
async with asyncio.timeout(10) as cm: | ||
await asyncio.sleep(0) | ||
|
||
self.assertEqual(repr(cm), "<Timeout [finished]>") | ||
|
||
async def test_repr_disabled(self): | ||
async with asyncio.timeout(None) as cm: | ||
self.assertEqual(repr(cm), r"<Timeout [active] when=None>") | ||
|
||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
async def test_nested_timeout_in_finally(self): | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.01): | ||
try: | ||
await asyncio.sleep(1) | ||
finally: | ||
with self.assertRaises(TimeoutError): | ||
async with asyncio.timeout(0.01): | ||
await asyncio.sleep(10) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.