Skip to content

Commit 9fc145b

Browse files
committed
bpo-29883: Add UDP to Windows Proactor Event Loop
1 parent e796b2f commit 9fc145b

File tree

5 files changed

+483
-5
lines changed

5 files changed

+483
-5
lines changed

Doc/library/asyncio-eventloops.rst

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ Common limits of Windows event loops:
109109

110110
:class:`ProactorEventLoop` specific limits:
111111

112-
- :meth:`~AbstractEventLoop.create_datagram_endpoint` (UDP) is not supported
113112
- :meth:`~AbstractEventLoop.add_reader` and :meth:`~AbstractEventLoop.add_writer` are
114113
not supported
115114

Lib/asyncio/proactor_events.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import socket
1010
import warnings
11+
import collections
1112

1213
from . import base_events
1314
from . import constants
@@ -327,6 +328,102 @@ def _pipe_closed(self, fut):
327328
self.close()
328329

329330

331+
class _ProactorDatagramTransport(_ProactorBasePipeTransport):
332+
333+
def __init__(self, loop, sock, protocol, address=None,
334+
waiter=None, extra=None):
335+
super(_ProactorDatagramTransport, self).__init__(loop, sock, protocol,
336+
waiter=waiter,
337+
extra=extra)
338+
self._address = address
339+
# We don't need to call _protocol.connection_made() since our base
340+
# constructor does it for us.
341+
self._buffer = collections.deque()
342+
self._loop.call_soon(self._loop_reading)
343+
344+
def abort(self):
345+
self._force_close(None)
346+
347+
def sendto(self, data, addr=None):
348+
if not isinstance(data, (bytes, bytearray, memoryview)):
349+
raise TypeError('data argument must be byte-ish (%r)',
350+
type(data))
351+
352+
if not data:
353+
return
354+
355+
if self._conn_lost and self._address:
356+
# close() or force_close() has been called on the bound endpoint
357+
return
358+
359+
self._buffer.appendleft((data, addr))
360+
361+
if self._write_fut is None:
362+
# No current write operations are active, kick one off
363+
self._loop_writing()
364+
else:
365+
# A write operation is already kicked off
366+
pass
367+
368+
def _loop_writing(self, fut=None):
369+
if self._conn_lost:
370+
return
371+
372+
assert fut is self._write_fut
373+
if fut:
374+
# We are in a _loop_writing() done callback, get the result
375+
fut.result()
376+
377+
if not self._buffer or (self._conn_lost and self._address):
378+
# The connection has been closed
379+
self._write_fut = None
380+
return
381+
382+
data, addr = self._buffer.pop()
383+
384+
self._write_fut = None
385+
try:
386+
if self._address:
387+
self._write_fut = self._loop._proactor.send(self._sock, data)
388+
else:
389+
self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr)
390+
except OSError as exc:
391+
self._protocol.error_received(exc)
392+
self._fatal_error(exc, 'Fatal error sending UDP datagram')
393+
else:
394+
self._write_fut.add_done_callback(self._loop_writing)
395+
396+
def _loop_reading(self, fut=None):
397+
if self._conn_lost:
398+
return
399+
400+
assert self._read_fut is fut
401+
402+
if fut:
403+
res = fut.result()
404+
405+
if self._address:
406+
data, addr = res, self._address
407+
else:
408+
data, addr = res
409+
410+
self._protocol.datagram_received(data, addr)
411+
412+
if self._conn_lost:
413+
return
414+
415+
try:
416+
if self._address:
417+
self._read_fut = self._loop._proactor.recv(self._sock, 4096)
418+
else:
419+
self._read_fut = self._loop._proactor.recvfrom(self._sock, 4096)
420+
except OSError as exc:
421+
self._protocol.error_received(exc)
422+
self._fatal_error(exc, "Fatal error reading from UDP endpoint")
423+
else:
424+
self._read_fut.add_done_callback(self._loop_reading)
425+
426+
330427
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
331428
_ProactorBaseWritePipeTransport,
332429
transports.Transport):
@@ -390,6 +487,11 @@ def _make_socket_transport(self, sock, protocol, waiter=None,
390487
return _ProactorSocketTransport(self, sock, protocol, waiter,
391488
extra, server)
392489

490+
def _make_datagram_transport(self, sock, protocol,
491+
address=None, waiter=None, extra=None):
492+
return _ProactorDatagramTransport(self, sock, protocol, address,
493+
waiter, extra)
494+
393495
def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
394496
*, server_side=False, server_hostname=None,
395497
extra=None, server=None):

Lib/asyncio/windows_events.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,14 @@ def recv_into(self, conn, buf, flags=0):
443443
except BrokenPipeError:
444444
return self._result(b'')
445445

446+
def recvfrom(self, conn, nbytes, flags=0):
447+
self._register_with_iocp(conn)
448+
ov = _overlapped.Overlapped(NULL)
449+
try:
450+
ov.WSARecvFrom(conn.fileno(), nbytes, flags)
451+
except BrokenPipeError:
452+
return self._result((b'', (None, None)))
453+
446454
def finish_recv(trans, key, ov):
447455
try:
448456
return ov.getresult()
@@ -454,6 +462,23 @@ def finish_recv(trans, key, ov):
454462

455463
return self._register(ov, conn, finish_recv)
456464

465+
def sendto(self, conn, buf, flags=0, addr=None):
466+
self._register_with_iocp(conn)
467+
ov = _overlapped.Overlapped(NULL)
468+
469+
ov.WSASendTo(conn.fileno(), buf, flags, addr)
470+
471+
def finish_send(trans, key, ov):
472+
try:
473+
return ov.getresult()
474+
except OSError as exc:
475+
if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
476+
raise ConnectionResetError(*exc.args)
477+
else:
478+
raise
479+
480+
return self._register(ov, conn, finish_send)
481+
457482
def send(self, conn, buf, flags=0):
458483
self._register_with_iocp(conn)
459484
ov = _overlapped.Overlapped(NULL)
@@ -502,6 +527,15 @@ async def accept_coro(future, conn):
502527
return future
503528

504529
def connect(self, conn, address):
530+
if conn.type == socket.SOCK_DGRAM:
531+
# WSAConnect will complete immediately for UDP sockets so we don't
532+
# need to register any IOCP operation
533+
_overlapped.WSAConnect(conn.fileno(), address)
534+
fut = self._loop.create_future()
535+
536+
fut.set_result(None)
537+
return fut
538+
505539
self._register_with_iocp(conn)
506540
# The socket needs to be locally bound before we call ConnectEx().
507541
try:

Lib/test/test_asyncio/test_proactor_events.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from asyncio.proactor_events import _ProactorSocketTransport
1010
from asyncio.proactor_events import _ProactorWritePipeTransport
1111
from asyncio.proactor_events import _ProactorDuplexPipeTransport
12+
from asyncio.proactor_events import _ProactorDatagramTransport
1213
from test.test_asyncio import utils as test_utils
1314

1415

@@ -562,6 +563,81 @@ def test_stop_serving(self):
562563
self.assertTrue(sock.close.called)
563564
self.proactor._stop_serving.assert_called_with(sock)
564565

566+
def datagram_transport(self):
567+
self.protocol = make_test_protocol(asyncio.DatagramProtocol)
568+
return self.loop._make_datagram_transport(self.sock, self.protocol)
569+
570+
def test_make_datagram_transport(self):
571+
tr = self.datagram_transport()
572+
self.assertIsInstance(tr, _ProactorDatagramTransport)
573+
close_transport(tr)
574+
575+
def test_datagram_loop_writing(self):
576+
tr = self.datagram_transport()
577+
tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
578+
tr._loop_writing()
579+
self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068))
580+
self.loop._proactor.sendto.return_value.add_done_callback.\
581+
assert_called_with(tr._loop_writing)
582+
583+
close_transport(tr)
584+
585+
def test_datagram_loop_reading(self):
586+
tr = self.datagram_transport()
587+
tr._loop_reading()
588+
self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096)
589+
self.assertFalse(self.protocol.datagram_received.called)
590+
self.assertFalse(self.protocol.error_received.called)
591+
592+
def test_datagram_loop_reading_data(self):
593+
res = asyncio.Future(loop=self.loop)
594+
res.set_result((b'data', ('127.0.0.1', 12068)))
595+
596+
tr = self.datagram_transport()
597+
tr._read_fut = res
598+
tr._loop_reading(res)
599+
self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096)
600+
self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068))
601+
602+
def test_datagram_loop_reading_no_data(self):
603+
res = asyncio.Future(loop=self.loop)
604+
res.set_result((b'', ('127.0.0.1', 12068)))
605+
606+
tr = self.datagram_transport()
607+
self.assertRaises(AssertionError, tr._loop_reading, res)
608+
609+
tr.close = mock.Mock()
610+
tr._read_fut = res
611+
tr._loop_reading(res)
612+
self.assertFalse(self.loop._proactor.recvfrom.called)
613+
self.assertFalse(self.protocol.error_received.called)
614+
self.assertFalse(tr.close.called)
615+
616+
def test_datagram_loop_reading_aborted(self):
617+
err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError()
618+
619+
tr = self.datagram_transport()
620+
tr._fatal_error = mock.Mock()
621+
tr._protocol.error_received = mock.Mock()
622+
tr._loop_reading()
623+
tr._protocol.error_received.assert_called_with(err)
624+
tr._fatal_error.assert_called_with(
625+
err,
626+
'Fatal error reading from UDP endpoint')
627+
628+
def test_datagram_loop_writing_aborted(self):
629+
err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError()
630+
631+
tr = self.datagram_transport()
632+
tr._fatal_error = mock.Mock()
633+
tr._protocol.error_received = mock.Mock()
634+
tr._protocol.error_received.assert_called_with(err)
635+
tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
636+
tr._loop_writing()
637+
tr._fatal_error.assert_called_with(
638+
err,
639+
'Fatal error sending UDP datagram')
640+
565641

566642
if __name__ == '__main__':
567643
unittest.main()

0 commit comments

Comments
 (0)