Skip to content

bpo-29883: Add UDP support to Windows Proactor Event Loop #1067

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -482,15 +482,16 @@ Opening network connections
transport. If specified, *local_addr* and *remote_addr* should be omitted
(must be :const:`None`).

On Windows, with :class:`ProactorEventLoop`, this method is not supported.

See :ref:`UDP echo client protocol <asyncio-udp-echo-client-protocol>` and
:ref:`UDP echo server protocol <asyncio-udp-echo-server-protocol>` examples.

.. versionchanged:: 3.4.4
The *family*, *proto*, *flags*, *reuse_address*, *reuse_port,
*allow_broadcast*, and *sock* parameters were added.

.. versionchanged:: 3.8
Added support for Windows.

.. coroutinemethod:: loop.create_unix_connection(protocol_factory, \
path=None, \*, ssl=None, sock=None, \
server_hostname=None, ssl_handshake_timeout=None)
Expand Down
3 changes: 0 additions & 3 deletions Doc/library/asyncio-platforms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ All event loops on Windows do not support the following methods:

:class:`ProactorEventLoop` has the following limitations:

* The :meth:`loop.create_datagram_endpoint` method
is not supported.

* The :meth:`loop.add_reader` and :meth:`loop.add_writer`
methods are not supported.

Expand Down
157 changes: 141 additions & 16 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import socket
import warnings
import signal
import collections

from . import base_events
from . import constants
Expand All @@ -22,6 +23,24 @@
from .log import logger


def _set_socket_extra(transport, sock):
transport._extra['socket'] = sock

try:
transport._extra['sockname'] = sock.getsockname()
except socket.error:
if transport._loop.get_debug():
logger.warning(
"getsockname() failed on %r", sock, exc_info=True)

if 'peername' not in transport._extra:
try:
transport._extra['peername'] = sock.getpeername()
except socket.error:
# UDP sockets may not have a peer name
transport._extra['peername'] = None


class _ProactorBasePipeTransport(transports._FlowControlMixin,
transports.BaseTransport):
"""Base class for pipe and socket transports."""
Expand Down Expand Up @@ -425,6 +444,122 @@ def _pipe_closed(self, fut):
self.close()


class _ProactorDatagramTransport(_ProactorBasePipeTransport):

def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
self._address = address
self._empty_waiter = None
# We don't need to call _protocol.connection_made() since our base
# constructor does it for us.
super().__init__(loop, sock, protocol, waiter=waiter, extra=extra)

# The base constructor sets _buffer = None, so we set it here
self._buffer = collections.deque()
self._loop.call_soon(self._loop_reading)

def _set_extra(self, sock):
_set_socket_extra(self, sock)

def abort(self):
self._force_close(None)

def sendto(self, data, addr=None):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)',
type(data))

if not data:
return

if self._address and addr not in (None, self._address):
raise ValueError(
f'Invalid address: must be None or {self._address}')

if self._conn_lost and self._address:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
logger.warning('socket.sendto() raised exception.')
self._conn_lost += 1
return

self._buffer.appendleft((data, addr))

if self._write_fut is None:
# No current write operations are active, kick one off
self._loop_writing()
# else: A write operation is already kicked off

def _loop_writing(self, fut=None):
try:
if self._conn_lost:
return

assert fut is self._write_fut
self._write_fut = None
if fut:
# We are in a _loop_writing() done callback, get the result
fut.result()

if not self._buffer or (self._conn_lost and self._address):
# The connection has been closed
if self._closing:
self._loop.call_soon(self._call_connection_lost, None)
return

data, addr = self._buffer.pop()
if self._address:
self._write_fut = self._loop._proactor.send(self._sock, data)
else:
self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr)
except OSError as exc:
self._protocol.error_received(exc)
except Exception as exc:
self._fatal_error(exc, 'Fatal write error on datagram transport')
else:
self._write_fut.add_done_callback(self._loop_writing)

def _loop_reading(self, fut=None):
data = None
try:
if self._conn_lost:
return

assert self._read_fut is fut or (self._read_fut is None and
self._closing)

self._read_fut = None
if fut is not None:
res = fut.result()

if self._closing:
# since close() has been called we ignore any read data
data = None
return

if self._address:
data, addr = res, self._address
else:
data, addr = res

if self._conn_lost:
return
if self._address:
self._read_fut = self._loop._proactor.recv(self._sock, 4096)
else:
self._read_fut = self._loop._proactor.recvfrom(self._sock, 4096)
except OSError as exc:
self._protocol.error_received(exc)
except exceptions.CancelledError:
if not self._closing:
raise
else:
if self._read_fut is not None:
self._read_fut.add_done_callback(self._loop_reading)
finally:
if data:
self._protocol.datagram_received(data, addr)


class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
_ProactorBaseWritePipeTransport,
transports.Transport):
Expand All @@ -450,22 +585,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
base_events._set_nodelay(sock)

def _set_extra(self, sock):
self._extra['socket'] = sock

try:
self._extra['sockname'] = sock.getsockname()
except (socket.error, AttributeError):
if self._loop.get_debug():
logger.warning(
"getsockname() failed on %r", sock, exc_info=True)

if 'peername' not in self._extra:
try:
self._extra['peername'] = sock.getpeername()
except (socket.error, AttributeError):
if self._loop.get_debug():
logger.warning("getpeername() failed on %r",
sock, exc_info=True)
_set_socket_extra(self, sock)

def can_write_eof(self):
return True
Expand Down Expand Up @@ -510,6 +630,11 @@ def _make_ssl_transport(
extra=extra, server=server)
return ssl_protocol._app_transport

def _make_datagram_transport(self, sock, protocol,
address=None, waiter=None, extra=None):
return _ProactorDatagramTransport(self, sock, protocol, address,
waiter, extra)

def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
extra=None):
return _ProactorDuplexPipeTransport(self,
Expand Down
46 changes: 46 additions & 0 deletions Lib/asyncio/windows_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,44 @@ def finish_recv(trans, key, ov):

return self._register(ov, conn, finish_recv)

def recvfrom(self, conn, nbytes, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
try:
ov.WSARecvFrom(conn.fileno(), nbytes, flags)
except BrokenPipeError:
return self._result((b'', (None, None)))

def finish_recv(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
_overlapped.ERROR_OPERATION_ABORTED):
raise ConnectionResetError(*exc.args)
else:
raise

return self._register(ov, conn, finish_recv)

def sendto(self, conn, buf, flags=0, addr=None):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)

ov.WSASendTo(conn.fileno(), buf, flags, addr)

def finish_send(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
_overlapped.ERROR_OPERATION_ABORTED):
raise ConnectionResetError(*exc.args)
else:
raise

return self._register(ov, conn, finish_send)

def send(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
Expand Down Expand Up @@ -530,6 +568,14 @@ async def accept_coro(future, conn):
return future

def connect(self, conn, address):
if conn.type == socket.SOCK_DGRAM:
# WSAConnect will complete immediately for UDP sockets so we don't
# need to register any IOCP operation
_overlapped.WSAConnect(conn.fileno(), address)
fut = self._loop.create_future()
fut.set_result(None)
return fut

self._register_with_iocp(conn)
# The socket needs to be locally bound before we call ConnectEx().
try:
Expand Down
9 changes: 0 additions & 9 deletions Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1252,11 +1252,6 @@ def datagram_received(self, data, addr):
server.transport.close()

def test_create_datagram_endpoint_sock(self):
if (sys.platform == 'win32' and
isinstance(self.loop, proactor_events.BaseProactorEventLoop)):
raise unittest.SkipTest(
'UDP is not supported with proactor event loops')

sock = None
local_address = ('127.0.0.1', 0)
infos = self.loop.run_until_complete(
Expand Down Expand Up @@ -2009,10 +2004,6 @@ def test_writer_callback(self):
def test_writer_callback_cancel(self):
raise unittest.SkipTest("IocpEventLoop does not have add_writer()")

def test_create_datagram_endpoint(self):
raise unittest.SkipTest(
"IocpEventLoop does not have create_datagram_endpoint()")

def test_remove_fds_after_closing(self):
raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
else:
Expand Down
75 changes: 75 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from asyncio.proactor_events import _ProactorSocketTransport
from asyncio.proactor_events import _ProactorWritePipeTransport
from asyncio.proactor_events import _ProactorDuplexPipeTransport
from asyncio.proactor_events import _ProactorDatagramTransport
from test import support
from test.test_asyncio import utils as test_utils

Expand Down Expand Up @@ -864,6 +865,80 @@ def test_stop_serving(self):
self.assertFalse(sock2.close.called)
self.assertFalse(future2.cancel.called)

def datagram_transport(self):
self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
return self.loop._make_datagram_transport(self.sock, self.protocol)

def test_make_datagram_transport(self):
tr = self.datagram_transport()
self.assertIsInstance(tr, _ProactorDatagramTransport)
close_transport(tr)

def test_datagram_loop_writing(self):
tr = self.datagram_transport()
tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
tr._loop_writing()
self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068))
self.loop._proactor.sendto.return_value.add_done_callback.\
assert_called_with(tr._loop_writing)

close_transport(tr)

def test_datagram_loop_reading(self):
tr = self.datagram_transport()
tr._loop_reading()
self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096)
self.assertFalse(self.protocol.datagram_received.called)
self.assertFalse(self.protocol.error_received.called)
close_transport(tr)

def test_datagram_loop_reading_data(self):
res = asyncio.Future(loop=self.loop)
res.set_result((b'data', ('127.0.0.1', 12068)))

tr = self.datagram_transport()
tr._read_fut = res
tr._loop_reading(res)
self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096)
self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068))
close_transport(tr)

def test_datagram_loop_reading_no_data(self):
res = asyncio.Future(loop=self.loop)
res.set_result((b'', ('127.0.0.1', 12068)))

tr = self.datagram_transport()
self.assertRaises(AssertionError, tr._loop_reading, res)

tr.close = mock.Mock()
tr._read_fut = res
tr._loop_reading(res)
self.assertTrue(self.loop._proactor.recvfrom.called)
self.assertFalse(self.protocol.error_received.called)
self.assertFalse(tr.close.called)
close_transport(tr)

def test_datagram_loop_reading_aborted(self):
err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError()

tr = self.datagram_transport()
tr._fatal_error = mock.Mock()
tr._protocol.error_received = mock.Mock()
tr._loop_reading()
tr._protocol.error_received.assert_called_with(err)
close_transport(tr)

def test_datagram_loop_writing_aborted(self):
err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError()

tr = self.datagram_transport()
tr._fatal_error = mock.Mock()
tr._protocol.error_received = mock.Mock()
tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
tr._loop_writing()
tr._protocol.error_received.assert_called_with(err)
close_transport(tr)


@unittest.skipIf(sys.platform != 'win32',
'Proactor is supported on Windows only')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add Windows support for UDP transports for the Proactor Event Loop. Patch by
Adam Meily.
Loading