Skip to content

bpo-33753: Refactor creating temporary files in test_fileinput. #7377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 101 additions & 148 deletions Lib/test/test_fileinput.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import fileinput
import collections
import builtins
import tempfile
import unittest

try:
Expand All @@ -33,20 +34,15 @@
# all the work, and a few functions (input, etc.) that use a global _state
# variable.

# Write lines (a list of lines) to temp file number i, and return the
# temp file's name.
def writeTmp(i, lines, mode='w'): # opening in text mode is the default
name = TESTFN + str(i)
f = open(name, mode)
for line in lines:
f.write(line)
f.close()
return name

def remove_tempfiles(*names):
for name in names:
if name:
safe_unlink(name)
class BaseTests:
# Write a content (str or bytes) to temp file, and return the
# temp file's name.
def writeTmp(self, content, *, mode='w'): # opening in text mode is the default
fd, name = tempfile.mkstemp()
self.addCleanup(support.unlink, name)
with open(fd, mode) as f:
f.write(content)
return name

class LineReader:

Expand Down Expand Up @@ -84,23 +80,19 @@ def readlines(self, hint=-1):
def close(self):
pass

class BufferSizesTests(unittest.TestCase):
class BufferSizesTests(BaseTests, unittest.TestCase):
def test_buffer_sizes(self):
# First, run the tests with default and teeny buffer size.
for round, bs in (0, 0), (1, 30):
t1 = t2 = t3 = t4 = None
try:
t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
if bs:
with self.assertWarns(DeprecationWarning):
self.buffer_size_test(t1, t2, t3, t4, bs, round)
else:
t1 = self.writeTmp(''.join("Line %s of file 1\n" % (i+1) for i in range(15)))
t2 = self.writeTmp(''.join("Line %s of file 2\n" % (i+1) for i in range(10)))
t3 = self.writeTmp(''.join("Line %s of file 3\n" % (i+1) for i in range(5)))
t4 = self.writeTmp(''.join("Line %s of file 4\n" % (i+1) for i in range(1)))
if bs:
with self.assertWarns(DeprecationWarning):
self.buffer_size_test(t1, t2, t3, t4, bs, round)
finally:
remove_tempfiles(t1, t2, t3, t4)
else:
self.buffer_size_test(t1, t2, t3, t4, bs, round)

def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
pat = re.compile(r'LINE (\d+) OF FILE (\d+)')
Expand Down Expand Up @@ -187,74 +179,59 @@ def __call__(self, *args, **kwargs):
self.invoked = True
raise self.exception_type()

class FileInputTests(unittest.TestCase):
class FileInputTests(BaseTests, unittest.TestCase):

def test_zero_byte_files(self):
t1 = t2 = t3 = t4 = None
try:
t1 = writeTmp(1, [""])
t2 = writeTmp(2, [""])
t3 = writeTmp(3, ["The only line there is.\n"])
t4 = writeTmp(4, [""])
fi = FileInput(files=(t1, t2, t3, t4))

line = fi.readline()
self.assertEqual(line, 'The only line there is.\n')
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), t3)

line = fi.readline()
self.assertFalse(line)
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 0)
self.assertEqual(fi.filename(), t4)
fi.close()
finally:
remove_tempfiles(t1, t2, t3, t4)
t1 = self.writeTmp("")
t2 = self.writeTmp("")
t3 = self.writeTmp("The only line there is.\n")
t4 = self.writeTmp("")
fi = FileInput(files=(t1, t2, t3, t4))

line = fi.readline()
self.assertEqual(line, 'The only line there is.\n')
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), t3)

line = fi.readline()
self.assertFalse(line)
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 0)
self.assertEqual(fi.filename(), t4)
fi.close()

def test_files_that_dont_end_with_newline(self):
t1 = t2 = None
try:
t1 = writeTmp(1, ["A\nB\nC"])
t2 = writeTmp(2, ["D\nE\nF"])
fi = FileInput(files=(t1, t2))
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
self.assertEqual(fi.filelineno(), 3)
self.assertEqual(fi.lineno(), 6)
finally:
remove_tempfiles(t1, t2)
t1 = self.writeTmp("A\nB\nC")
t2 = self.writeTmp("D\nE\nF")
fi = FileInput(files=(t1, t2))
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
self.assertEqual(fi.filelineno(), 3)
self.assertEqual(fi.lineno(), 6)

## def test_unicode_filenames(self):
## # XXX A unicode string is always returned by writeTmp.
## # So is this needed?
## try:
## t1 = writeTmp(1, ["A\nB"])
## encoding = sys.getfilesystemencoding()
## if encoding is None:
## encoding = 'ascii'
## fi = FileInput(files=str(t1, encoding))
## lines = list(fi)
## self.assertEqual(lines, ["A\n", "B"])
## finally:
## remove_tempfiles(t1)
## t1 = self.writeTmp("A\nB")
## encoding = sys.getfilesystemencoding()
## if encoding is None:
## encoding = 'ascii'
## fi = FileInput(files=str(t1, encoding))
## lines = list(fi)
## self.assertEqual(lines, ["A\n", "B"])

def test_fileno(self):
t1 = t2 = None
try:
t1 = writeTmp(1, ["A\nB"])
t2 = writeTmp(2, ["C\nD"])
fi = FileInput(files=(t1, t2))
self.assertEqual(fi.fileno(), -1)
line =next( fi)
self.assertNotEqual(fi.fileno(), -1)
fi.nextfile()
self.assertEqual(fi.fileno(), -1)
line = list(fi)
self.assertEqual(fi.fileno(), -1)
finally:
remove_tempfiles(t1, t2)
t1 = self.writeTmp("A\nB")
t2 = self.writeTmp("C\nD")
fi = FileInput(files=(t1, t2))
self.assertEqual(fi.fileno(), -1)
line = next(fi)
self.assertNotEqual(fi.fileno(), -1)
fi.nextfile()
self.assertEqual(fi.fileno(), -1)
line = list(fi)
self.assertEqual(fi.fileno(), -1)

def test_opening_mode(self):
try:
Expand All @@ -263,17 +240,13 @@ def test_opening_mode(self):
self.fail("FileInput should reject invalid mode argument")
except ValueError:
pass
t1 = None
try:
# try opening in universal newline mode
t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
with check_warnings(('', DeprecationWarning)):
fi = FileInput(files=t1, mode="U")
with check_warnings(('', DeprecationWarning)):
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
finally:
remove_tempfiles(t1)
# try opening in universal newline mode
t1 = self.writeTmp(b"A\nB\r\nC\rD", mode="wb")
with check_warnings(('', DeprecationWarning)):
fi = FileInput(files=t1, mode="U")
with check_warnings(('', DeprecationWarning)):
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])

def test_stdin_binary_mode(self):
with mock.patch('sys.stdin') as m_stdin:
Expand Down Expand Up @@ -314,8 +287,7 @@ def __call__(self, *args):
self.invoked = True
return open(*args)

t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
t = self.writeTmp("\n")
custom_open_hook = CustomOpenHook()
with FileInput([t], openhook=custom_open_hook) as fi:
fi.readline()
Expand Down Expand Up @@ -358,27 +330,22 @@ def test_readline_binary_mode(self):
self.assertEqual(fi.readline(), b'')

def test_context_manager(self):
try:
t1 = writeTmp(1, ["A\nB\nC"])
t2 = writeTmp(2, ["D\nE\nF"])
with FileInput(files=(t1, t2)) as fi:
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
self.assertEqual(fi.filelineno(), 3)
self.assertEqual(fi.lineno(), 6)
self.assertEqual(fi._files, ())
finally:
remove_tempfiles(t1, t2)
t1 = self.writeTmp("A\nB\nC")
t2 = self.writeTmp("D\nE\nF")
with FileInput(files=(t1, t2)) as fi:
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
self.assertEqual(fi.filelineno(), 3)
self.assertEqual(fi.lineno(), 6)
self.assertEqual(fi._files, ())

def test_close_on_exception(self):
t1 = self.writeTmp("")
try:
t1 = writeTmp(1, [""])
with FileInput(files=t1) as fi:
raise OSError
except OSError:
self.assertEqual(fi._files, ())
finally:
remove_tempfiles(t1)

def test_empty_files_list_specified_to_constructor(self):
with FileInput(files=[]) as fi:
Expand All @@ -387,8 +354,7 @@ def test_empty_files_list_specified_to_constructor(self):
def test__getitem__(self):
"""Tests invoking FileInput.__getitem__() with the current
line number"""
t = writeTmp(1, ["line1\n", "line2\n"])
self.addCleanup(remove_tempfiles, t)
t = self.writeTmp("line1\nline2\n")
with FileInput(files=[t]) as fi:
retval1 = fi[0]
self.assertEqual(retval1, "line1\n")
Expand All @@ -398,8 +364,7 @@ def test__getitem__(self):
def test__getitem__invalid_key(self):
"""Tests invoking FileInput.__getitem__() with an index unequal to
the line number"""
t = writeTmp(1, ["line1\n", "line2\n"])
self.addCleanup(remove_tempfiles, t)
t = self.writeTmp("line1\nline2\n")
with FileInput(files=[t]) as fi:
with self.assertRaises(RuntimeError) as cm:
fi[1]
Expand All @@ -408,8 +373,7 @@ def test__getitem__invalid_key(self):
def test__getitem__eof(self):
"""Tests invoking FileInput.__getitem__() with the line number but at
end-of-input"""
t = writeTmp(1, [])
self.addCleanup(remove_tempfiles, t)
t = self.writeTmp('')
with FileInput(files=[t]) as fi:
with self.assertRaises(IndexError) as cm:
fi[0]
Expand All @@ -423,8 +387,8 @@ def test_nextfile_oserror_deleting_backup(self):
os_unlink_orig = os.unlink
os_unlink_replacement = UnconditionallyRaise(OSError)
try:
t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
t = self.writeTmp("\n")
self.addCleanup(support.unlink, t + '.bak')
with FileInput(files=[t], inplace=True) as fi:
next(fi) # make sure the file is opened
os.unlink = os_unlink_replacement
Expand All @@ -443,8 +407,7 @@ def test_readline_os_fstat_raises_OSError(self):
os_fstat_orig = os.fstat
os_fstat_replacement = UnconditionallyRaise(OSError)
try:
t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
t = self.writeTmp("\n")
with FileInput(files=[t], inplace=True) as fi:
os.fstat = os_fstat_replacement
fi.readline()
Expand All @@ -463,8 +426,7 @@ def test_readline_os_chmod_raises_OSError(self):
os_chmod_orig = os.chmod
os_chmod_replacement = UnconditionallyRaise(OSError)
try:
t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
t = self.writeTmp("\n")
with FileInput(files=[t], inplace=True) as fi:
os.chmod = os_chmod_replacement
fi.readline()
Expand All @@ -483,8 +445,7 @@ def fileno(self):
self.__call__()

unconditionally_raise_ValueError = FilenoRaisesValueError()
t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
t = self.writeTmp("\n")
with FileInput(files=[t]) as fi:
file_backup = fi._file
try:
Expand Down Expand Up @@ -532,30 +493,22 @@ def test_iteration_buffering(self):
self.assertEqual(src.linesread, [])

def test_pathlib_file(self):
t1 = None
try:
t1 = Path(writeTmp(1, ["Pathlib file."]))
with FileInput(t1) as fi:
line = fi.readline()
self.assertEqual(line, 'Pathlib file.')
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), os.fspath(t1))
finally:
remove_tempfiles(t1)
t1 = Path(self.writeTmp("Pathlib file."))
with FileInput(t1) as fi:
line = fi.readline()
self.assertEqual(line, 'Pathlib file.')
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), os.fspath(t1))

def test_pathlib_file_inplace(self):
t1 = None
try:
t1 = Path(writeTmp(1, ['Pathlib file.']))
with FileInput(t1, inplace=True) as fi:
line = fi.readline()
self.assertEqual(line, 'Pathlib file.')
print('Modified %s' % line)
with open(t1) as f:
self.assertEqual(f.read(), 'Modified Pathlib file.\n')
finally:
remove_tempfiles(t1)
t1 = Path(self.writeTmp('Pathlib file.'))
with FileInput(t1, inplace=True) as fi:
line = fi.readline()
self.assertEqual(line, 'Pathlib file.')
print('Modified %s' % line)
with open(t1) as f:
self.assertEqual(f.read(), 'Modified Pathlib file.\n')


class MockFileInput:
Expand Down