Skip to content

bpo-32193: Convert asyncio to async/await usage #4753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Dec 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
db463e3
Convert asyncio/tasks.py to async/await
asvetlov Dec 7, 2017
bdde875
Convert asyncio/queues.py to async/await
asvetlov Dec 7, 2017
a44c892
Convert asyncio/test_utils.py to async/await
asvetlov Dec 7, 2017
ee5ecc7
Convert asyncio/base_subprocess.py to async/await
asvetlov Dec 7, 2017
5654dcf
Convert asyncio/subprocess.py to async/await
asvetlov Dec 7, 2017
0038376
Convert asyncio/streams.py to async/await
asvetlov Dec 7, 2017
3e90af4
Fix comments
asvetlov Dec 7, 2017
ca9c418
Convert asyncio/locks.py to async/await
asvetlov Dec 7, 2017
c0ec04e
Convert asyncio.sleep to async def
asvetlov Dec 7, 2017
f6d230d
Add a comment
asvetlov Dec 8, 2017
251d990
Add missing news
asvetlov Dec 8, 2017
983552a
Convert stubs from AbstrctEventLoop to async functions
asvetlov Dec 8, 2017
618fcee
Convert subprocess_shell/subprocess_exec
asvetlov Dec 8, 2017
cfab76e
Convert connect_read_pipe/connect_write_pip to async/await syntax
asvetlov Dec 8, 2017
ee8c389
Convert create_datagram_endpoint
asvetlov Dec 8, 2017
ad64f0a
Convert create_unix_server/create_unix_connection
asvetlov Dec 8, 2017
cdd059a
Get rid of old style coroutines in unix_events.py
asvetlov Dec 8, 2017
b2c7d61
Convert selector_events.py to async/await
asvetlov Dec 8, 2017
a83deee
Convert wait_closed and create_connection
asvetlov Dec 8, 2017
0a355a5
Drop redundant line
asvetlov Dec 8, 2017
186f23a
Convert base_events.py
asvetlov Dec 8, 2017
6cc5056
Code cleanup
asvetlov Dec 8, 2017
6f15fa7
Drop redundant comments
asvetlov Dec 8, 2017
936b104
Fix indentation
asvetlov Dec 8, 2017
e339a2b
Add explicit tests for compatibility between old and new coroutines
asvetlov Dec 8, 2017
43eb968
Convert windows event loop to use async/await
asvetlov Dec 8, 2017
6ea4d75
Fix double awaiting of async function
asvetlov Dec 8, 2017
7c80c7c
Convert asyncio/locks.py
asvetlov Dec 8, 2017
1c0c2e5
Improve docstring
asvetlov Dec 8, 2017
d1a9a96
Convert tests to async/await
asvetlov Dec 8, 2017
17a5969
Convert more tests
asvetlov Dec 8, 2017
f159ca7
Convert more tests
asvetlov Dec 8, 2017
74cd8f3
Convert more tests
asvetlov Dec 8, 2017
19460e5
Convert tests
asvetlov Dec 8, 2017
61cf3eb
Improve test
asvetlov Dec 8, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 59 additions & 69 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from . import events
from . import futures
from . import tasks
from .coroutines import coroutine
from .log import logger


Expand Down Expand Up @@ -220,13 +219,12 @@ def _wakeup(self):
if not waiter.done():
waiter.set_result(waiter)

@coroutine
def wait_closed(self):
async def wait_closed(self):
if self.sockets is None or self._waiters is None:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
yield from waiter
await waiter


class BaseEventLoop(events.AbstractEventLoop):
Expand Down Expand Up @@ -330,10 +328,9 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
"""Create write pipe transport."""
raise NotImplementedError

@coroutine
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
async def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
"""Create subprocess transport."""
raise NotImplementedError

Expand Down Expand Up @@ -371,8 +368,7 @@ def _asyncgen_firstiter_hook(self, agen):

self._asyncgens.add(agen)

@coroutine
def shutdown_asyncgens(self):
async def shutdown_asyncgens(self):
"""Shutdown all active asynchronous generators."""
self._asyncgens_shutdown_called = True

Expand All @@ -384,12 +380,11 @@ def shutdown_asyncgens(self):
closing_agens = list(self._asyncgens)
self._asyncgens.clear()

shutdown_coro = tasks.gather(
results = await tasks.gather(
*[ag.aclose() for ag in closing_agens],
return_exceptions=True,
loop=self)

results = yield from shutdown_coro
for result, agen in zip(results, closing_agens):
if isinstance(result, Exception):
self.call_exception_handler({
Expand Down Expand Up @@ -671,10 +666,10 @@ def getaddrinfo(self, host, port, *,
def getnameinfo(self, sockaddr, flags=0):
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)

@coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
ssl=None, family=0, proto=0, flags=0, sock=None,
local_addr=None, server_hostname=None):
async def create_connection(self, protocol_factory, host=None, port=None,
*, ssl=None, family=0,
proto=0, flags=0, sock=None,
local_addr=None, server_hostname=None):
"""Connect to a TCP server.

Create a streaming transport connection to a given Internet host and
Expand Down Expand Up @@ -722,7 +717,7 @@ def create_connection(self, protocol_factory, host=None, port=None, *,
else:
f2 = None

yield from tasks.wait(fs, loop=self)
await tasks.wait(fs, loop=self)

infos = f1.result()
if not infos:
Expand Down Expand Up @@ -755,7 +750,7 @@ def create_connection(self, protocol_factory, host=None, port=None, *,
continue
if self._debug:
logger.debug("connect %r to %r", sock, address)
yield from self.sock_connect(sock, address)
await self.sock_connect(sock, address)
except OSError as exc:
if sock is not None:
sock.close()
Expand Down Expand Up @@ -793,7 +788,7 @@ def create_connection(self, protocol_factory, host=None, port=None, *,
raise ValueError(
'A Stream Socket was expected, got {!r}'.format(sock))

transport, protocol = yield from self._create_connection_transport(
transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
if self._debug:
# Get the socket from the transport because SSL transport closes
Expand All @@ -803,9 +798,8 @@ def create_connection(self, protocol_factory, host=None, port=None, *,
sock, host, port, transport, protocol)
return transport, protocol

@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
server_hostname, server_side=False):
async def _create_connection_transport(self, sock, protocol_factory, ssl,
server_hostname, server_side=False):

sock.setblocking(False)

Expand All @@ -820,19 +814,18 @@ def _create_connection_transport(self, sock, protocol_factory, ssl,
transport = self._make_socket_transport(sock, protocol, waiter)

try:
yield from waiter
await waiter
except:
transport.close()
raise

return transport, protocol

@coroutine
def create_datagram_endpoint(self, protocol_factory,
local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0,
reuse_address=None, reuse_port=None,
allow_broadcast=None, sock=None):
async def create_datagram_endpoint(self, protocol_factory,
local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0,
reuse_address=None, reuse_port=None,
allow_broadcast=None, sock=None):
"""Create datagram connection."""
if sock is not None:
if not _is_dgram_socket(sock):
Expand Down Expand Up @@ -872,7 +865,7 @@ def create_datagram_endpoint(self, protocol_factory,
assert isinstance(addr, tuple) and len(addr) == 2, (
'2-tuple is expected')

infos = yield from _ensure_resolved(
infos = await _ensure_resolved(
addr, family=family, type=socket.SOCK_DGRAM,
proto=proto, flags=flags, loop=self)
if not infos:
Expand Down Expand Up @@ -918,7 +911,7 @@ def create_datagram_endpoint(self, protocol_factory,
if local_addr:
sock.bind(local_address)
if remote_addr:
yield from self.sock_connect(sock, remote_address)
await self.sock_connect(sock, remote_address)
r_addr = remote_address
except OSError as exc:
if sock is not None:
Expand Down Expand Up @@ -948,32 +941,30 @@ def create_datagram_endpoint(self, protocol_factory,
remote_addr, transport, protocol)

try:
yield from waiter
await waiter
except:
transport.close()
raise

return transport, protocol

@coroutine
def _create_server_getaddrinfo(self, host, port, family, flags):
infos = yield from _ensure_resolved((host, port), family=family,
type=socket.SOCK_STREAM,
flags=flags, loop=self)
async def _create_server_getaddrinfo(self, host, port, family, flags):
infos = await _ensure_resolved((host, port), family=family,
type=socket.SOCK_STREAM,
flags=flags, loop=self)
if not infos:
raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
return infos

@coroutine
def create_server(self, protocol_factory, host=None, port=None,
*,
family=socket.AF_UNSPEC,
flags=socket.AI_PASSIVE,
sock=None,
backlog=100,
ssl=None,
reuse_address=None,
reuse_port=None):
async def create_server(self, protocol_factory, host=None, port=None,
*,
family=socket.AF_UNSPEC,
flags=socket.AI_PASSIVE,
sock=None,
backlog=100,
ssl=None,
reuse_address=None,
reuse_port=None):
"""Create a TCP server.

The host parameter can be a string, in that case the TCP server is bound
Expand Down Expand Up @@ -1011,7 +1002,7 @@ def create_server(self, protocol_factory, host=None, port=None,
fs = [self._create_server_getaddrinfo(host, port, family=family,
flags=flags)
for host in hosts]
infos = yield from tasks.gather(*fs, loop=self)
infos = await tasks.gather(*fs, loop=self)
infos = set(itertools.chain.from_iterable(infos))

completed = False
Expand Down Expand Up @@ -1068,8 +1059,8 @@ def create_server(self, protocol_factory, host=None, port=None,
logger.info("%r is serving", server)
return server

@coroutine
def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
async def connect_accepted_socket(self, protocol_factory, sock,
*, ssl=None):
"""Handle an accepted connection.

This is used by servers that accept connections outside of
Expand All @@ -1082,7 +1073,7 @@ def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
raise ValueError(
'A Stream Socket was expected, got {!r}'.format(sock))

transport, protocol = yield from self._create_connection_transport(
transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, '', server_side=True)
if self._debug:
# Get the socket from the transport because SSL transport closes
Expand All @@ -1091,14 +1082,13 @@ def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
return transport, protocol

@coroutine
def connect_read_pipe(self, protocol_factory, pipe):
async def connect_read_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = self.create_future()
transport = self._make_read_pipe_transport(pipe, protocol, waiter)

try:
yield from waiter
await waiter
except:
transport.close()
raise
Expand All @@ -1108,14 +1098,13 @@ def connect_read_pipe(self, protocol_factory, pipe):
pipe.fileno(), transport, protocol)
return transport, protocol

@coroutine
def connect_write_pipe(self, protocol_factory, pipe):
async def connect_write_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = self.create_future()
transport = self._make_write_pipe_transport(pipe, protocol, waiter)

try:
yield from waiter
await waiter
except:
transport.close()
raise
Expand All @@ -1138,11 +1127,13 @@ def _log_subprocess(self, msg, stdin, stdout, stderr):
info.append('stderr=%s' % _format_pipe(stderr))
logger.debug(' '.join(info))

@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=False, shell=True, bufsize=0,
**kwargs):
async def subprocess_shell(self, protocol_factory, cmd, *,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=False,
shell=True, bufsize=0,
**kwargs):
if not isinstance(cmd, (bytes, str)):
raise ValueError("cmd must be a string")
if universal_newlines:
Expand All @@ -1157,17 +1148,16 @@ def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
# (password) and may be too long
debug_log = 'run shell command %r' % cmd
self._log_subprocess(debug_log, stdin, stdout, stderr)
transport = yield from self._make_subprocess_transport(
transport = await self._make_subprocess_transport(
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
if self._debug:
logger.info('%s: %r', debug_log, transport)
return transport, protocol

@coroutine
def subprocess_exec(self, protocol_factory, program, *args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=False,
shell=False, bufsize=0, **kwargs):
async def subprocess_exec(self, protocol_factory, program, *args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=False,
shell=False, bufsize=0, **kwargs):
if universal_newlines:
raise ValueError("universal_newlines must be False")
if shell:
Expand All @@ -1186,7 +1176,7 @@ def subprocess_exec(self, protocol_factory, program, *args,
# (password) and may be too long
debug_log = 'execute program %r' % program
self._log_subprocess(debug_log, stdin, stdout, stderr)
transport = yield from self._make_subprocess_transport(
transport = await self._make_subprocess_transport(
protocol, popen_args, False, stdin, stdout, stderr,
bufsize, **kwargs)
if self._debug:
Expand Down
15 changes: 6 additions & 9 deletions Lib/asyncio/base_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from . import protocols
from . import transports
from .coroutines import coroutine
from .log import logger


Expand Down Expand Up @@ -154,26 +153,25 @@ def kill(self):
self._check_proc()
self._proc.kill()

@coroutine
def _connect_pipes(self, waiter):
async def _connect_pipes(self, waiter):
try:
proc = self._proc
loop = self._loop

if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
_, pipe = await loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe

if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
_, pipe = await loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe

if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
_, pipe = await loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
proc.stderr)
self._pipes[2] = pipe
Expand Down Expand Up @@ -224,8 +222,7 @@ def _process_exited(self, returncode):
waiter.set_result(returncode)
self._exit_waiters = None

@coroutine
def _wait(self):
async def _wait(self):
"""Wait until the process exit and return the process return code.

This method is a coroutine."""
Expand All @@ -234,7 +231,7 @@ def _wait(self):

waiter = self._loop.create_future()
self._exit_waiters.append(waiter)
return (yield from waiter)
return await waiter

def _try_finish(self):
assert not self._finished
Expand Down
Loading