-
-
Notifications
You must be signed in to change notification settings - Fork 32.3k
gh-109709: Fix asyncio test_stdin_broken_pipe() #109710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
import os | ||
import signal | ||
import sys | ||
import textwrap | ||
import unittest | ||
import warnings | ||
from unittest import mock | ||
|
@@ -12,9 +13,14 @@ | |
from test import support | ||
from test.support import os_helper | ||
|
||
if sys.platform != 'win32': | ||
|
||
MS_WINDOWS = (sys.platform == 'win32') | ||
if MS_WINDOWS: | ||
import msvcrt | ||
else: | ||
from asyncio import unix_events | ||
|
||
|
||
if support.check_sanitizer(address=True): | ||
raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI") | ||
|
||
|
@@ -270,26 +276,43 @@ async def send_signal(proc): | |
finally: | ||
signal.signal(signal.SIGHUP, old_handler) | ||
|
||
def prepare_broken_pipe_test(self): | ||
def test_stdin_broken_pipe(self): | ||
# buffer large enough to feed the whole pipe buffer | ||
large_data = b'x' * support.PIPE_MAX_SIZE | ||
|
||
rfd, wfd = os.pipe() | ||
self.addCleanup(os.close, rfd) | ||
self.addCleanup(os.close, wfd) | ||
if MS_WINDOWS: | ||
handle = msvcrt.get_osfhandle(rfd) | ||
os.set_handle_inheritable(handle, True) | ||
code = textwrap.dedent(f''' | ||
import os, msvcrt | ||
handle = {handle} | ||
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY) | ||
os.read(fd, 1) | ||
''') | ||
from subprocess import STARTUPINFO | ||
startupinfo = STARTUPINFO() | ||
startupinfo.lpAttributeList = {"handle_list": [handle]} | ||
kwargs = dict(startupinfo=startupinfo) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On Windows, passing pipes as stdin, stdout and stderr is well supported. Passing an additional pipe is not supported by msvcrt (CreateProcess). Passing a handle is possible, but it requires to convert FD to handle and then handle to FD. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could the process be synchronized in a way that requires less boilerplate for Windows? Is just calling terminate() not enough? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like using a pipe as a sync primitive, but I'm unhappy by the quantity of code needed for that :-( In practice, what is needed is that the child process hangs until the parent decides to terminate it in a clean fashion. |
||
else: | ||
code = f'import os; fd = {rfd}; os.read(fd, 1)' | ||
kwargs = dict(pass_fds=(rfd,)) | ||
|
||
# the program ends before the stdin can be fed | ||
proc = self.loop.run_until_complete( | ||
asyncio.create_subprocess_exec( | ||
sys.executable, '-c', 'pass', | ||
sys.executable, '-c', code, | ||
stdin=subprocess.PIPE, | ||
**kwargs | ||
) | ||
) | ||
|
||
return (proc, large_data) | ||
|
||
def test_stdin_broken_pipe(self): | ||
proc, large_data = self.prepare_broken_pipe_test() | ||
|
||
async def write_stdin(proc, data): | ||
await asyncio.sleep(0.5) | ||
proc.stdin.write(data) | ||
# Only exit the child process once the write buffer is filled | ||
os.write(wfd, b'go') | ||
await proc.stdin.drain() | ||
|
||
coro = write_stdin(proc, large_data) | ||
|
@@ -300,7 +323,16 @@ async def write_stdin(proc, data): | |
self.loop.run_until_complete(proc.wait()) | ||
|
||
def test_communicate_ignore_broken_pipe(self): | ||
proc, large_data = self.prepare_broken_pipe_test() | ||
# buffer large enough to feed the whole pipe buffer | ||
large_data = b'x' * support.PIPE_MAX_SIZE | ||
|
||
# the program ends before the stdin can be fed | ||
proc = self.loop.run_until_complete( | ||
asyncio.create_subprocess_exec( | ||
sys.executable, '-c', 'pass', | ||
stdin=subprocess.PIPE, | ||
) | ||
) | ||
|
||
# communicate() must ignore BrokenPipeError when feeding stdin | ||
self.loop.set_exception_handler(lambda loop, msg: None) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, if I understand correctly this change is here because write() could raise BrokenPipeError when registering a writer. Is this expected behavior? _UnixWritePipeTransport.write() tries hard not to raise if the os.write() call fails.
Could it be that this was not noticed on Linux because the error is raised only by the kqueue selector?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I didn't keep the traceback. I got a BrokenPipeError on FreeBSD, yeah, it was somewhere in the kqueue selector. You can revert my change, and stress-test the test as I described in the issue / PR, to easily trigger the bug.
Maybe on Windows and Linux, write() cannot trigger these exceptions, but write() does on FreeBSD.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simple snippet to trigger the exception:
Tested on modern releases of Linux, macOS, FreeBSD, and I only see it raise on FreeBSD. See discussion here for some details: tokio-rs/mio#582 (older versions of macOS used to fail in the same case, and NetBSD/OpenBSD report different errors).
I think this should be considered a bug. _UnixWritePipeTransport.write() should not be expected to raise, otherwise exception handling code would have to be sprinkled all over user code.
In the linked issues (both Tokio and libevent) it's solved at an abstraction level similar to the selectors module in Python. If we agree it could be considered a bug, it's either a selectors bug, or an asyncio bug.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait, registering a closed FD is bad: don't do that. It wasn't the issue that I got (I hope).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you think that there is a bug, please open a new issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a closed FD! Only the other end of the pipe is closed. The write fd is still valid. On Linux/Mac you will always get an event, so you get the error when you try to write. On other BSDs you get the error preemptively, when registering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry i was confused between rfd and wfd.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opened a new issue here #109757. I'd be glad to work on it.