Skip to content

bpo-38456: Use /bin/true in test_subprocess #16736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 12, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 59 additions & 49 deletions Lib/test/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@
# Ignore errors that indicate the command was not found
NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)

ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')


def setUpModule():
shell_true = shutil.which('true')
if (os.access(shell_true, os.X_OK) and
subprocess.run([shell_true]).returncode == 0):
global ZERO_RETURN_CMD
ZERO_RETURN_CMD = (shell_true,) # Faster than Python startup.


class BaseTestCase(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -98,7 +108,7 @@ def _execute_child(self, *args, **kwargs):
class ProcessTestCase(BaseTestCase):

def test_io_buffered_by_default(self):
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
p = subprocess.Popen(ZERO_RETURN_CMD,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
Expand All @@ -112,7 +122,7 @@ def test_io_buffered_by_default(self):
p.wait()

def test_io_unbuffered_works(self):
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
p = subprocess.Popen(ZERO_RETURN_CMD,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=0)
try:
Expand Down Expand Up @@ -142,8 +152,7 @@ def test_call_timeout(self):

def test_check_call_zero(self):
# check_call() function with zero return code
rc = subprocess.check_call([sys.executable, "-c",
"import sys; sys.exit(0)"])
rc = subprocess.check_call(ZERO_RETURN_CMD)
self.assertEqual(rc, 0)

def test_check_call_nonzero(self):
Expand Down Expand Up @@ -709,19 +718,19 @@ def test_invalid_env(self):
newenv = os.environ.copy()
newenv["FRUIT\0VEGETABLE"] = "cabbage"
with self.assertRaises(ValueError):
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
subprocess.Popen(ZERO_RETURN_CMD, env=newenv)

# null character in the environment variable value
newenv = os.environ.copy()
newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
with self.assertRaises(ValueError):
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
subprocess.Popen(ZERO_RETURN_CMD, env=newenv)

# equal character in the environment variable name
newenv = os.environ.copy()
newenv["FRUIT=ORANGE"] = "lemon"
with self.assertRaises(ValueError):
subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
subprocess.Popen(ZERO_RETURN_CMD, env=newenv)

# equal character in the environment variable value
newenv = os.environ.copy()
Expand Down Expand Up @@ -822,7 +831,7 @@ def test_communicate_pipe_fd_leak(self):
options['stderr'] = subprocess.PIPE
if not options:
continue
p = subprocess.Popen((sys.executable, "-c", "pass"), **options)
p = subprocess.Popen(ZERO_RETURN_CMD, **options)
p.communicate()
if p.stdin is not None:
self.assertTrue(p.stdin.closed)
Expand Down Expand Up @@ -961,7 +970,7 @@ def test_universal_newlines_communicate_input_none(self):
#
# We set stdout to PIPE because, as of this writing, a different
# code path is tested when the number of pipes is zero or one.
p = subprocess.Popen([sys.executable, "-c", "pass"],
p = subprocess.Popen(ZERO_RETURN_CMD,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True)
Expand Down Expand Up @@ -1109,7 +1118,7 @@ def test_poll(self):
self.assertEqual(p.poll(), 0)

def test_wait(self):
p = subprocess.Popen([sys.executable, "-c", "pass"])
p = subprocess.Popen(ZERO_RETURN_CMD)
self.assertEqual(p.wait(), 0)
# Subsequent invocations should just return the returncode
self.assertEqual(p.wait(), 0)
Expand All @@ -1128,14 +1137,14 @@ def test_invalid_bufsize(self):
# an invalid type of the bufsize argument should raise
# TypeError.
with self.assertRaises(TypeError):
subprocess.Popen([sys.executable, "-c", "pass"], "orange")
subprocess.Popen(ZERO_RETURN_CMD, "orange")

def test_bufsize_is_none(self):
# bufsize=None should be the same as bufsize=0.
p = subprocess.Popen([sys.executable, "-c", "pass"], None)
p = subprocess.Popen(ZERO_RETURN_CMD, None)
self.assertEqual(p.wait(), 0)
# Again with keyword arg
p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None)
p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None)
self.assertEqual(p.wait(), 0)

def _test_bufsize_equal_one(self, line, expected, universal_newlines):
Expand Down Expand Up @@ -1340,7 +1349,7 @@ def test_handles_closed_on_exception(self):

def test_communicate_epipe(self):
# Issue 10963: communicate() should hide EPIPE
p = subprocess.Popen([sys.executable, "-c", 'pass'],
p = subprocess.Popen(ZERO_RETURN_CMD,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
Expand All @@ -1351,7 +1360,7 @@ def test_communicate_epipe(self):

def test_communicate_epipe_only_stdin(self):
# Issue 10963: communicate() should hide EPIPE
p = subprocess.Popen([sys.executable, "-c", 'pass'],
p = subprocess.Popen(ZERO_RETURN_CMD,
stdin=subprocess.PIPE)
self.addCleanup(p.stdin.close)
p.wait()
Expand Down Expand Up @@ -1390,7 +1399,7 @@ def test_failed_child_execute_fd_leak(self):
fds_before_popen = os.listdir(fd_directory)
with self.assertRaises(PopenTestException):
PopenExecuteChildRaises(
[sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
ZERO_RETURN_CMD, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# NOTE: This test doesn't verify that the real _execute_child
Expand Down Expand Up @@ -1433,7 +1442,7 @@ def test_check(self):

def test_check_zero(self):
# check_returncode shouldn't raise when returncode is zero
cp = self.run_python("import sys; sys.exit(0)", check=True)
cp = subprocess.run(ZERO_RETURN_CMD, check=True)
self.assertEqual(cp.returncode, 0)

def test_timeout(self):
Expand Down Expand Up @@ -1791,16 +1800,16 @@ def test_user(self):
self.assertEqual(child_user, user_uid)

with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"], user=-1)
subprocess.check_call(ZERO_RETURN_CMD, user=-1)

if pwd is None:
with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"], user=name_uid)
subprocess.check_call(ZERO_RETURN_CMD, user=name_uid)

@unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform')
def test_user_error(self):
with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"], user=65535)
subprocess.check_call(ZERO_RETURN_CMD, user=65535)

@unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform')
def test_group(self):
Expand Down Expand Up @@ -1834,16 +1843,16 @@ def test_group(self):

# make sure we bomb on negative values
with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"], group=-1)
subprocess.check_call(ZERO_RETURN_CMD, group=-1)

if grp is None:
with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"], group=name_group)
subprocess.check_call(ZERO_RETURN_CMD, group=name_group)

@unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform')
def test_group_error(self):
with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"], group=65535)
subprocess.check_call(ZERO_RETURN_CMD, group=65535)

@unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
def test_extra_groups(self):
Expand Down Expand Up @@ -1882,17 +1891,17 @@ def test_extra_groups(self):

# make sure we bomb on negative values
with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[-1])
subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1])

if grp is None:
with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"],
subprocess.check_call(ZERO_RETURN_CMD,
extra_groups=[name_group])

@unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform')
def test_extra_groups_error(self):
with self.assertRaises(ValueError):
subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[])
subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[])

@unittest.skipIf(mswindows or not hasattr(os, 'umask'),
'POSIX umask() is not available.')
Expand All @@ -1904,7 +1913,7 @@ def test_umask(self):
# We set an unusual umask in the child so as a unique mode
# for us to test the child's touched file for.
subprocess.check_call(
[sys.executable, "-c", f"open({name!r}, 'w')"], # touch
[sys.executable, "-c", f"open({name!r}, 'w').close()"],
umask=0o053)
# Ignore execute permissions entirely in our test,
# filesystems could be mounted to ignore or force that.
Expand Down Expand Up @@ -2007,7 +2016,7 @@ def raise_it():

with self.assertRaises(subprocess.SubprocessError):
self._TestExecuteChildPopen(
self, [sys.executable, "-c", "pass"],
self, ZERO_RETURN_CMD,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, preexec_fn=raise_it)

Expand Down Expand Up @@ -2464,7 +2473,7 @@ def prepare():

try:
subprocess.call(
[sys.executable, "-c", "pass"],
ZERO_RETURN_CMD,
preexec_fn=prepare)
except ValueError as err:
# Pure Python implementations keeps the message
Expand Down Expand Up @@ -2507,29 +2516,30 @@ def test_undecodable_env(self):
self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))

def test_bytes_program(self):
abs_program = os.fsencode(sys.executable)
path, program = os.path.split(sys.executable)
abs_program = os.fsencode(ZERO_RETURN_CMD[0])
args = list(ZERO_RETURN_CMD[1:])
path, program = os.path.split(ZERO_RETURN_CMD[0])
program = os.fsencode(program)

# absolute bytes path
exitcode = subprocess.call([abs_program, "-c", "pass"])
exitcode = subprocess.call([abs_program]+args)
self.assertEqual(exitcode, 0)

# absolute bytes path as a string
cmd = b"'" + abs_program + b"' -c pass"
cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
exitcode = subprocess.call(cmd, shell=True)
self.assertEqual(exitcode, 0)

# bytes program, unicode PATH
env = os.environ.copy()
env["PATH"] = path
exitcode = subprocess.call([program, "-c", "pass"], env=env)
exitcode = subprocess.call([program]+args, env=env)
self.assertEqual(exitcode, 0)

# bytes program, bytes PATH
envb = os.environb.copy()
envb[b"PATH"] = os.fsencode(path)
exitcode = subprocess.call([program, "-c", "pass"], env=envb)
exitcode = subprocess.call([program]+args, env=envb)
self.assertEqual(exitcode, 0)

def test_pipe_cloexec(self):
Expand Down Expand Up @@ -2757,7 +2767,7 @@ def test_pass_fds(self):
# pass_fds overrides close_fds with a warning.
with self.assertWarns(RuntimeWarning) as context:
self.assertFalse(subprocess.call(
[sys.executable, "-c", "import sys; sys.exit(0)"],
ZERO_RETURN_CMD,
close_fds=False, pass_fds=(fd, )))
self.assertIn('overriding close_fds', str(context.warning))

Expand Down Expand Up @@ -2819,19 +2829,19 @@ def test_pass_fds_redirected(self):

def test_stdout_stdin_are_single_inout_fd(self):
with io.open(os.devnull, "r+") as inout:
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
p = subprocess.Popen(ZERO_RETURN_CMD,
stdout=inout, stdin=inout)
p.wait()

def test_stdout_stderr_are_single_inout_fd(self):
with io.open(os.devnull, "r+") as inout:
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
p = subprocess.Popen(ZERO_RETURN_CMD,
stdout=inout, stderr=inout)
p.wait()

def test_stderr_stdin_are_single_inout_fd(self):
with io.open(os.devnull, "r+") as inout:
p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
p = subprocess.Popen(ZERO_RETURN_CMD,
stderr=inout, stdin=inout)
p.wait()

Expand Down Expand Up @@ -3031,7 +3041,7 @@ def __int__(self):
def test_communicate_BrokenPipeError_stdin_close(self):
# By not setting stdout or stderr or a timeout we force the fast path
# that just calls _stdin_write() internally due to our mock.
proc = subprocess.Popen([sys.executable, '-c', 'pass'])
proc = subprocess.Popen(ZERO_RETURN_CMD)
with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
mock_proc_stdin.close.side_effect = BrokenPipeError
proc.communicate() # Should swallow BrokenPipeError from close.
Expand All @@ -3040,7 +3050,7 @@ def test_communicate_BrokenPipeError_stdin_close(self):
def test_communicate_BrokenPipeError_stdin_write(self):
# By not setting stdout or stderr or a timeout we force the fast path
# that just calls _stdin_write() internally due to our mock.
proc = subprocess.Popen([sys.executable, '-c', 'pass'])
proc = subprocess.Popen(ZERO_RETURN_CMD)
with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
mock_proc_stdin.write.side_effect = BrokenPipeError
proc.communicate(b'stuff') # Should swallow the BrokenPipeError.
Expand Down Expand Up @@ -3079,7 +3089,7 @@ def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
'need _testcapi.W_STOPCODE')
def test_stopped(self):
"""Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
args = [sys.executable, '-c', 'pass']
args = ZERO_RETURN_CMD
proc = subprocess.Popen(args)

# Wait until the real process completes to avoid zombie process
Expand Down Expand Up @@ -3109,7 +3119,7 @@ def test_startupinfo(self):
# Since Python is a console process, it won't be affected
# by wShowWindow, but the argument should be silently
# ignored
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
subprocess.call(ZERO_RETURN_CMD,
startupinfo=startupinfo)

def test_startupinfo_keywords(self):
Expand All @@ -3125,7 +3135,7 @@ def test_startupinfo_keywords(self):
# Since Python is a console process, it won't be affected
# by wShowWindow, but the argument should be silently
# ignored
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
subprocess.call(ZERO_RETURN_CMD,
startupinfo=startupinfo)

def test_startupinfo_copy(self):
Expand All @@ -3137,7 +3147,7 @@ def test_startupinfo_copy(self):
# Call Popen() twice with the same startupinfo object to make sure
# that it's not modified
for _ in range(2):
cmd = [sys.executable, "-c", "pass"]
cmd = ZERO_RETURN_CMD
with open(os.devnull, 'w') as null:
proc = subprocess.Popen(cmd,
stdout=null,
Expand Down Expand Up @@ -3177,7 +3187,7 @@ def test_issue31471(self):
class BadEnv(dict):
keys = None
with self.assertRaises(TypeError):
subprocess.Popen([sys.executable, "-c", "pass"], env=BadEnv())
subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv())

def test_close_fds(self):
# close file descriptors
Expand Down Expand Up @@ -3238,13 +3248,13 @@ def test_close_fds_with_stdio(self):
def test_empty_attribute_list(self):
startupinfo = subprocess.STARTUPINFO()
startupinfo.lpAttributeList = {}
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
subprocess.call(ZERO_RETURN_CMD,
startupinfo=startupinfo)

def test_empty_handle_list(self):
startupinfo = subprocess.STARTUPINFO()
startupinfo.lpAttributeList = {"handle_list": []}
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
subprocess.call(ZERO_RETURN_CMD,
startupinfo=startupinfo)

def test_shell_sequence(self):
Expand Down Expand Up @@ -3543,7 +3553,7 @@ def test_invalid_args(self):

def test_broken_pipe_cleanup(self):
"""Broken pipe error should not prevent wait() (Issue 21619)"""
proc = subprocess.Popen([sys.executable, '-c', 'pass'],
proc = subprocess.Popen(ZERO_RETURN_CMD,
stdin=subprocess.PIPE,
bufsize=support.PIPE_MAX_SIZE*2)
proc = proc.__enter__()
Expand Down