Skip to content

gh-109709: Fix asyncio test_stdin_broken_pipe() #109710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions Lib/asyncio/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,17 @@ def kill(self):

async def _feed_stdin(self, input):
debug = self._loop.get_debug()
if input is not None:
self.stdin.write(input)
if debug:
logger.debug(
'%r communicate: feed stdin (%s bytes)', self, len(input))
try:
if input is not None:
self.stdin.write(input)
if debug:
logger.debug(
'%r communicate: feed stdin (%s bytes)', self, len(input))

await self.stdin.drain()
except (BrokenPipeError, ConnectionResetError) as exc:
# communicate() ignores BrokenPipeError and ConnectionResetError
# communicate() ignores BrokenPipeError and ConnectionResetError.
# write() and drain() can raise these exceptions.
Comment on lines +151 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, if I understand correctly this change is here because write() could raise BrokenPipeError when registering a writer. Is this expected behavior? _UnixWritePipeTransport.write() tries hard not to raise if the os.write() call fails.

Could it be that this was not noticed on Linux because the error is raised only by the kqueue selector?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't keep the traceback. I got a BrokenPipeError on FreeBSD, yeah, it was somewhere in the kqueue selector. You can revert my change, and stress-test the test as I described in the issue / PR, to easily trigger the bug.

Could it be that this was not noticed on Linux because the error is raised only by the kqueue selector?

Maybe on Windows and Linux, write() cannot trigger these exceptions, but write() does on FreeBSD.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simple snippet to trigger the exception:

import os, selectors
sel = selectors.DefaultSelector()
rfd, wfd = os.pipe()
os.close(rfd)
sel.register(wfd, selectors.EVENT_WRITE)

Tested on modern releases of Linux, macOS, FreeBSD, and I only see it raise on FreeBSD. See discussion here for some details: tokio-rs/mio#582 (older versions of macOS used to fail in the same case, and NetBSD/OpenBSD report different errors).

I think this should be considered a bug. _UnixWritePipeTransport.write() should not be expected to raise, otherwise exception handling code would have to be sprinkled all over user code.

In the linked issues (both Tokio and libevent) it's solved at an abstraction level similar to the selectors module in Python. If we agree it could be considered a bug, it's either a selectors bug, or an asyncio bug.

Copy link
Member Author

@vstinner vstinner Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, registering a closed FD is bad: don't do that. It wasn't the issue that I got (I hope).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be considered a bug. _UnixWritePipeTransport.write() should not be expected to raise, otherwise exception handling code would have to be sprinkled all over user code.

If you think that there is a bug, please open a new issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a closed FD! Only the other end of the pipe is closed. The write fd is still valid. On Linux/Mac you will always get an event, so you get the error when you try to write. On other BSDs you get the error preemptively, when registering.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry i was confused between rfd and wfd.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened a new issue here #109757. I'd be glad to work on it.

if debug:
logger.debug('%r communicate: stdin got %r', self, exc)

Expand Down
52 changes: 42 additions & 10 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import signal
import sys
import textwrap
import unittest
import warnings
from unittest import mock
Expand All @@ -12,9 +13,14 @@
from test import support
from test.support import os_helper

if sys.platform != 'win32':

MS_WINDOWS = (sys.platform == 'win32')
if MS_WINDOWS:
import msvcrt
else:
from asyncio import unix_events


if support.check_sanitizer(address=True):
raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")

Expand Down Expand Up @@ -270,26 +276,43 @@ async def send_signal(proc):
finally:
signal.signal(signal.SIGHUP, old_handler)

def prepare_broken_pipe_test(self):
def test_stdin_broken_pipe(self):
# buffer large enough to feed the whole pipe buffer
large_data = b'x' * support.PIPE_MAX_SIZE

rfd, wfd = os.pipe()
self.addCleanup(os.close, rfd)
self.addCleanup(os.close, wfd)
if MS_WINDOWS:
handle = msvcrt.get_osfhandle(rfd)
os.set_handle_inheritable(handle, True)
code = textwrap.dedent(f'''
import os, msvcrt
handle = {handle}
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
os.read(fd, 1)
''')
from subprocess import STARTUPINFO
startupinfo = STARTUPINFO()
startupinfo.lpAttributeList = {"handle_list": [handle]}
kwargs = dict(startupinfo=startupinfo)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On Windows, passing pipes as stdin, stdout and stderr is well supported. Passing an additional pipe is not supported by msvcrt (CreateProcess). Passing a handle is possible, but it requires to convert FD to handle and then handle to FD.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the process be synchronized in a way that requires less boilerplate for Windows? Is just calling terminate() not enough?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like using a pipe as a sync primitive, but I'm unhappy by the quantity of code needed for that :-(

In practice, what is needed is that the child process hangs until the parent decides to terminate it in a clean fashion.

else:
code = f'import os; fd = {rfd}; os.read(fd, 1)'
kwargs = dict(pass_fds=(rfd,))

# the program ends before the stdin can be fed
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(
sys.executable, '-c', 'pass',
sys.executable, '-c', code,
stdin=subprocess.PIPE,
**kwargs
)
)

return (proc, large_data)

def test_stdin_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()

async def write_stdin(proc, data):
await asyncio.sleep(0.5)
proc.stdin.write(data)
# Only exit the child process once the write buffer is filled
os.write(wfd, b'go')
await proc.stdin.drain()

coro = write_stdin(proc, large_data)
Expand All @@ -300,7 +323,16 @@ async def write_stdin(proc, data):
self.loop.run_until_complete(proc.wait())

def test_communicate_ignore_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()
# buffer large enough to feed the whole pipe buffer
large_data = b'x' * support.PIPE_MAX_SIZE

# the program ends before the stdin can be fed
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(
sys.executable, '-c', 'pass',
stdin=subprocess.PIPE,
)
)

# communicate() must ignore BrokenPipeError when feeding stdin
self.loop.set_exception_handler(lambda loop, msg: None)
Expand Down