Skip to content

bpo-32410: Implement loop.sock_sendfile() #4976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 54 commits into from
Jan 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
ee13d5b
Draft for sock_sendfile
asvetlov Dec 22, 2017
8b610ad
Add test for blocked socket
asvetlov Dec 22, 2017
8d658be
Polish tests
asvetlov Dec 22, 2017
24c5a28
Test partial file content
asvetlov Dec 22, 2017
10b0b61
Add NEWS entry
asvetlov Dec 22, 2017
5f782a5
Add test for abstract sock_sendfile
asvetlov Dec 22, 2017
3068aa9
Polishment
asvetlov Dec 22, 2017
898766e
Don't call loop.set_debug(True)
asvetlov Dec 22, 2017
6e70a62
Revert set_debug() back
asvetlov Dec 22, 2017
20b3778
Work on tests
asvetlov Dec 22, 2017
7ed0e67
Improve test cleanup
asvetlov Dec 22, 2017
f4b61b1
Make tests stable
asvetlov Dec 23, 2017
6f6b94b
Merge remote-tracking branch 'upstream/master' into sock_sendfile
asvetlov Dec 24, 2017
1c05795
Refactor _check_sendfile_params helper
asvetlov Dec 24, 2017
f670fad
Use NotImplementedError in private socket.sendfile implementation.
asvetlov Dec 24, 2017
26f6d4a
Refactoring
asvetlov Dec 24, 2017
87d4804
Polish error text
asvetlov Dec 24, 2017
76eeef5
Update docs
asvetlov Dec 24, 2017
35071ea
Fix check for SOCK_STREAM
asvetlov Dec 24, 2017
92ae10b
Accept int fd along with socket instance
asvetlov Dec 24, 2017
8c451d2
Drop support for int FD for socket
asvetlov Dec 24, 2017
921fe69
NotImplementedError -> RuntimeError
asvetlov Dec 24, 2017
0f2a48f
Switch to RuntimeError back
asvetlov Dec 24, 2017
1cc0e8f
Merge branch 'master' into sock_sendfile
asvetlov Dec 30, 2017
272029e
Add sendfile fallback
asvetlov Dec 30, 2017
fa0954e
Fix private names
asvetlov Dec 30, 2017
0ddb410
Another renaming
asvetlov Dec 30, 2017
c18c3c8
Work on
asvetlov Dec 30, 2017
a4a174b
Fix NEWS.d
asvetlov Dec 30, 2017
448e949
More tests
asvetlov Dec 30, 2017
46c92ed
Polish docs
asvetlov Dec 31, 2017
8dd45dc
Revert changes in socket.py
asvetlov Dec 31, 2017
c9112b9
Merge branch 'master' into sock_sendfile
asvetlov Dec 31, 2017
b6273e4
More tests
asvetlov Dec 31, 2017
4d88063
Tests
asvetlov Dec 31, 2017
71b9f93
Test partial file with fallback
asvetlov Dec 31, 2017
db2445e
Improve test coverage
asvetlov Dec 31, 2017
46a6b46
Switch to custom exception type
asvetlov Dec 31, 2017
2ec48f8
read -> readinto
asvetlov Dec 31, 2017
8a6ed3f
Make tests more stable
asvetlov Dec 31, 2017
44da800
Merge remote-tracking branch 'upstream/master' into sock_sendfile
asvetlov Jan 1, 2018
967408e
Change base class for _SendfileNotAvailable to RuntimeError
asvetlov Jan 2, 2018
099dc56
Better exception type when sendfile is not available
asvetlov Jan 2, 2018
f9701cb
Add a test for mixed sock_send and sock_sendfile
asvetlov Jan 2, 2018
a30acc9
Add cancellation callback
asvetlov Jan 2, 2018
f7d9bab
More tests
asvetlov Jan 2, 2018
4d25927
Support tribool for fallback
asvetlov Jan 2, 2018
e303db9
Fix signature of abstract sock_sendfile
asvetlov Jan 2, 2018
84e1057
Merge branch 'master' into sock_sendfile
asvetlov Jan 2, 2018
657aa67
Revert back tribool for fallback
asvetlov Jan 16, 2018
dd4143a
Fix tests
asvetlov Jan 16, 2018
96d0032
Merge branch 'master' into sock_sendfile
asvetlov Jan 16, 2018
5deb0e2
Add a space
asvetlov Jan 16, 2018
3c9abaf
Fix sock_sendfile callback
asvetlov Jan 16, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,36 @@ Low-level socket operations

:meth:`AbstractEventLoop.create_server` and :func:`start_server`.

.. coroutinemethod:: AbstractEventLoop.sock_sendfile(sock, file, \
offset=0, count=None, \
*, fallback=True)

Send a file using high-performance :mod:`os.sendfile` if possible
and return the total number of bytes which were sent.

Asynchronous version of :meth:`socket.socket.sendfile`.

*sock* must be non-blocking :class:`~socket.socket` of
:const:`socket.SOCK_STREAM` type.

*file* must be a regular file object opened in binary mode.

*offset* tells from where to start reading the file. If specified,
*count* is the total number of bytes to transmit as opposed to
sending the file until EOF is reached. File position is updated on
return or also in case of error in which case :meth:`file.tell()
<io.IOBase.tell>` can be used to figure out the number of bytes
which were sent.

*fallback* set to ``True`` makes asyncio to manually read and send
the file when the platform does not support the sendfile syscall
(e.g. Windows or SSL socket on Unix).

Raise :exc:`RuntimeError` if the system does not support
*sendfile* syscall and *fallback* is ``False``.

.. versionadded:: 3.7


Resolve host name
-----------------
Expand Down
70 changes: 70 additions & 0 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ def _run_until_complete_cb(fut):
futures._get_loop(fut).stop()


class _SendfileNotAvailable(RuntimeError):
pass


class Server(events.AbstractServer):

def __init__(self, loop, sockets):
Expand Down Expand Up @@ -647,6 +651,72 @@ async def getnameinfo(self, sockaddr, flags=0):
return await self.run_in_executor(
None, socket.getnameinfo, sockaddr, flags)

async def sock_sendfile(self, sock, file, offset=0, count=None,
*, fallback=True):
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
self._check_sendfile_params(sock, file, offset, count)
try:
return await self._sock_sendfile_native(sock, file,
offset, count)
except _SendfileNotAvailable as exc:
if fallback:
return await self._sock_sendfile_fallback(sock, file,
offset, count)
else:
raise RuntimeError(exc.args[0]) from None

async def _sock_sendfile_native(self, sock, file, offset, count):
# NB: sendfile syscall is not supported for SSL sockets and
# non-mmap files even if sendfile is supported by OS
raise _SendfileNotAvailable(
f"syscall sendfile is not available for socket {sock!r} "
"and file {file!r} combination")

async def _sock_sendfile_fallback(self, sock, file, offset, count):
if offset:
file.seek(offset)
blocksize = min(count, 16384) if count else 16384
buf = bytearray(blocksize)
total_sent = 0
try:
while True:
if count:
blocksize = min(count - total_sent, blocksize)
if blocksize <= 0:
break
view = memoryview(buf)[:blocksize]
read = file.readinto(view)
if not read:
break # EOF
await self.sock_sendall(sock, view)
total_sent += read
return total_sent
finally:
if total_sent > 0 and hasattr(file, 'seek'):
file.seek(offset + total_sent)

def _check_sendfile_params(self, sock, file, offset, count):
if 'b' not in getattr(file, 'mode', 'b'):
raise ValueError("file should be opened in binary mode")
if not sock.type == socket.SOCK_STREAM:
raise ValueError("only SOCK_STREAM type sockets are supported")
if count is not None:
if not isinstance(count, int):
raise TypeError(
"count must be a positive integer (got {!r})".format(count))
if count <= 0:
raise ValueError(
"count must be a positive integer (got {!r})".format(count))
if not isinstance(offset, int):
raise TypeError(
"offset must be a non-negative integer (got {!r})".format(
offset))
if offset < 0:
raise ValueError(
"offset must be a non-negative integer (got {!r})".format(
offset))

async def create_connection(
self, protocol_factory, host=None, port=None,
*, ssl=None, family=0,
Expand Down
4 changes: 4 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ async def sock_connect(self, sock, address):
async def sock_accept(self, sock):
raise NotImplementedError

async def sock_sendfile(self, sock, file, offset=0, count=None,
*, fallback=None):
raise NotImplementedError

# Signal handling.

def add_signal_handler(self, sig, callback, *args):
Expand Down
93 changes: 93 additions & 0 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Selector event loop for Unix with signal handling."""

import errno
import io
import os
import selectors
import signal
Expand Down Expand Up @@ -308,6 +309,98 @@ async def create_unix_server(
ssl_handshake_timeout=ssl_handshake_timeout)
return server

async def _sock_sendfile_native(self, sock, file, offset, count):
try:
os.sendfile
except AttributeError as exc:
raise base_events._SendfileNotAvailable(
"os.sendfile() is not available")
try:
fileno = file.fileno()
except (AttributeError, io.UnsupportedOperation) as err:
raise base_events._SendfileNotAvailable("not a regular file")
try:
fsize = os.fstat(fileno).st_size
except OSError as err:
raise base_events._SendfileNotAvailable("not a regular file")
blocksize = count if count else fsize
if not blocksize:
return 0 # empty file

fut = self.create_future()
self._sock_sendfile_native_impl(fut, None, sock, fileno,
offset, count, blocksize, 0)
return await fut

def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
offset, count, blocksize, total_sent):
fd = sock.fileno()
if registered_fd is not None:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
self.remove_writer(registered_fd)
if fut.cancelled():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You copied this check from other loop.sock_* methods, but in fact it's incorrect. It will only handle a case when a Future was cancelled right away.

The correct way of doing this (and that's btw what I'm doing in uvloop and planned to fix in asyncio) is to register a special callback on fut. That callback should check if the future is cancelled, and if it is, cancel the sendfile operation (remove writer).

Otherwise, we can't cancel sendfile while fd is in selector.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this check?

self._sock_sendfile_update_filepos(fileno, offset, total_sent)
return
if count:
blocksize = count - total_sent
if blocksize <= 0:
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_result(total_sent)
return

try:
sent = os.sendfile(fd, fileno, offset, blocksize)
except (BlockingIOError, InterruptedError):
if registered_fd is None:
self._sock_add_cancellation_callback(fut, sock)
self.add_writer(fd, self._sock_sendfile_native_impl, fut,
fd, sock, fileno,
offset, count, blocksize, total_sent)
except OSError as exc:
if total_sent == 0:
# We can get here for different reasons, the main
# one being 'file' is not a regular mmap(2)-like
# file, in which case we'll fall back on using
# plain send().
err = base_events._SendfileNotAvailable(
"os.sendfile call failed")
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(err)
else:
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(exc)
except Exception as exc:
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(exc)
else:
if sent == 0:
# EOF
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_result(total_sent)
else:
offset += sent
total_sent += sent
if registered_fd is None:
self._sock_add_cancellation_callback(fut, sock)
self.add_writer(fd, self._sock_sendfile_native_impl, fut,
fd, sock, fileno,
offset, count, blocksize, total_sent)

def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
if total_sent > 0:
os.lseek(fileno, offset, os.SEEK_SET)

def _sock_add_cancellation_callback(self, fut, sock):
def cb(fut):
if fut.cancelled():
fd = sock.fileno()
if fd != -1:
self.remove_writer(fd)
fut.add_done_callback(cb)


class _UnixReadPipeTransport(transports.ReadTransport):

Expand Down
Loading