8
8
import fileinput
9
9
import collections
10
10
import builtins
11
+ import tempfile
11
12
import unittest
12
13
13
14
try :
32
33
# all the work, and a few functions (input, etc.) that use a global _state
33
34
# variable.
34
35
35
- # Write lines (a list of lines) to temp file number i, and return the
36
- # temp file's name.
37
- def writeTmp (i , lines , mode = 'w' ): # opening in text mode is the default
38
- name = TESTFN + str (i )
39
- f = open (name , mode )
40
- for line in lines :
41
- f .write (line )
42
- f .close ()
43
- return name
44
-
45
- def remove_tempfiles (* names ):
46
- for name in names :
47
- if name :
48
- safe_unlink (name )
36
+ class BaseTests :
37
+ # Write a content (str or bytes) to temp file, and return the
38
+ # temp file's name.
39
+ def writeTmp (self , content , * , mode = 'w' ): # opening in text mode is the default
40
+ fd , name = tempfile .mkstemp ()
41
+ self .addCleanup (support .unlink , name )
42
+ with open (fd , mode ) as f :
43
+ f .write (content )
44
+ return name
49
45
50
46
class LineReader :
51
47
@@ -83,23 +79,19 @@ def readlines(self, hint=-1):
83
79
def close (self ):
84
80
pass
85
81
86
- class BufferSizesTests (unittest .TestCase ):
82
+ class BufferSizesTests (BaseTests , unittest .TestCase ):
87
83
def test_buffer_sizes (self ):
88
84
# First, run the tests with default and teeny buffer size.
89
85
for round , bs in (0 , 0 ), (1 , 30 ):
90
- t1 = t2 = t3 = t4 = None
91
- try :
92
- t1 = writeTmp (1 , ["Line %s of file 1\n " % (i + 1 ) for i in range (15 )])
93
- t2 = writeTmp (2 , ["Line %s of file 2\n " % (i + 1 ) for i in range (10 )])
94
- t3 = writeTmp (3 , ["Line %s of file 3\n " % (i + 1 ) for i in range (5 )])
95
- t4 = writeTmp (4 , ["Line %s of file 4\n " % (i + 1 ) for i in range (1 )])
96
- if bs :
97
- with self .assertWarns (DeprecationWarning ):
98
- self .buffer_size_test (t1 , t2 , t3 , t4 , bs , round )
99
- else :
86
+ t1 = self .writeTmp ('' .join ("Line %s of file 1\n " % (i + 1 ) for i in range (15 )))
87
+ t2 = self .writeTmp ('' .join ("Line %s of file 2\n " % (i + 1 ) for i in range (10 )))
88
+ t3 = self .writeTmp ('' .join ("Line %s of file 3\n " % (i + 1 ) for i in range (5 )))
89
+ t4 = self .writeTmp ('' .join ("Line %s of file 4\n " % (i + 1 ) for i in range (1 )))
90
+ if bs :
91
+ with self .assertWarns (DeprecationWarning ):
100
92
self .buffer_size_test (t1 , t2 , t3 , t4 , bs , round )
101
- finally :
102
- remove_tempfiles (t1 , t2 , t3 , t4 )
93
+ else :
94
+ self . buffer_size_test (t1 , t2 , t3 , t4 , bs , round )
103
95
104
96
def buffer_size_test (self , t1 , t2 , t3 , t4 , bs = 0 , round = 0 ):
105
97
pat = re .compile (r'LINE (\d+) OF FILE (\d+)' )
@@ -186,74 +178,59 @@ def __call__(self, *args, **kwargs):
186
178
self .invoked = True
187
179
raise self .exception_type ()
188
180
189
- class FileInputTests (unittest .TestCase ):
181
+ class FileInputTests (BaseTests , unittest .TestCase ):
190
182
191
183
def test_zero_byte_files (self ):
192
- t1 = t2 = t3 = t4 = None
193
- try :
194
- t1 = writeTmp (1 , ["" ])
195
- t2 = writeTmp (2 , ["" ])
196
- t3 = writeTmp (3 , ["The only line there is.\n " ])
197
- t4 = writeTmp (4 , ["" ])
198
- fi = FileInput (files = (t1 , t2 , t3 , t4 ))
199
-
200
- line = fi .readline ()
201
- self .assertEqual (line , 'The only line there is.\n ' )
202
- self .assertEqual (fi .lineno (), 1 )
203
- self .assertEqual (fi .filelineno (), 1 )
204
- self .assertEqual (fi .filename (), t3 )
205
-
206
- line = fi .readline ()
207
- self .assertFalse (line )
208
- self .assertEqual (fi .lineno (), 1 )
209
- self .assertEqual (fi .filelineno (), 0 )
210
- self .assertEqual (fi .filename (), t4 )
211
- fi .close ()
212
- finally :
213
- remove_tempfiles (t1 , t2 , t3 , t4 )
184
+ t1 = self .writeTmp ("" )
185
+ t2 = self .writeTmp ("" )
186
+ t3 = self .writeTmp ("The only line there is.\n " )
187
+ t4 = self .writeTmp ("" )
188
+ fi = FileInput (files = (t1 , t2 , t3 , t4 ))
189
+
190
+ line = fi .readline ()
191
+ self .assertEqual (line , 'The only line there is.\n ' )
192
+ self .assertEqual (fi .lineno (), 1 )
193
+ self .assertEqual (fi .filelineno (), 1 )
194
+ self .assertEqual (fi .filename (), t3 )
195
+
196
+ line = fi .readline ()
197
+ self .assertFalse (line )
198
+ self .assertEqual (fi .lineno (), 1 )
199
+ self .assertEqual (fi .filelineno (), 0 )
200
+ self .assertEqual (fi .filename (), t4 )
201
+ fi .close ()
214
202
215
203
def test_files_that_dont_end_with_newline (self ):
216
- t1 = t2 = None
217
- try :
218
- t1 = writeTmp (1 , ["A\n B\n C" ])
219
- t2 = writeTmp (2 , ["D\n E\n F" ])
220
- fi = FileInput (files = (t1 , t2 ))
221
- lines = list (fi )
222
- self .assertEqual (lines , ["A\n " , "B\n " , "C" , "D\n " , "E\n " , "F" ])
223
- self .assertEqual (fi .filelineno (), 3 )
224
- self .assertEqual (fi .lineno (), 6 )
225
- finally :
226
- remove_tempfiles (t1 , t2 )
204
+ t1 = self .writeTmp ("A\n B\n C" )
205
+ t2 = self .writeTmp ("D\n E\n F" )
206
+ fi = FileInput (files = (t1 , t2 ))
207
+ lines = list (fi )
208
+ self .assertEqual (lines , ["A\n " , "B\n " , "C" , "D\n " , "E\n " , "F" ])
209
+ self .assertEqual (fi .filelineno (), 3 )
210
+ self .assertEqual (fi .lineno (), 6 )
227
211
228
212
## def test_unicode_filenames(self):
229
213
## # XXX A unicode string is always returned by writeTmp.
230
214
## # So is this needed?
231
- ## try:
232
- ## t1 = writeTmp(1, ["A\nB"])
233
- ## encoding = sys.getfilesystemencoding()
234
- ## if encoding is None:
235
- ## encoding = 'ascii'
236
- ## fi = FileInput(files=str(t1, encoding))
237
- ## lines = list(fi)
238
- ## self.assertEqual(lines, ["A\n", "B"])
239
- ## finally:
240
- ## remove_tempfiles(t1)
215
+ ## t1 = self.writeTmp("A\nB")
216
+ ## encoding = sys.getfilesystemencoding()
217
+ ## if encoding is None:
218
+ ## encoding = 'ascii'
219
+ ## fi = FileInput(files=str(t1, encoding))
220
+ ## lines = list(fi)
221
+ ## self.assertEqual(lines, ["A\n", "B"])
241
222
242
223
def test_fileno (self ):
243
- t1 = t2 = None
244
- try :
245
- t1 = writeTmp (1 , ["A\n B" ])
246
- t2 = writeTmp (2 , ["C\n D" ])
247
- fi = FileInput (files = (t1 , t2 ))
248
- self .assertEqual (fi .fileno (), - 1 )
249
- line = next ( fi )
250
- self .assertNotEqual (fi .fileno (), - 1 )
251
- fi .nextfile ()
252
- self .assertEqual (fi .fileno (), - 1 )
253
- line = list (fi )
254
- self .assertEqual (fi .fileno (), - 1 )
255
- finally :
256
- remove_tempfiles (t1 , t2 )
224
+ t1 = self .writeTmp ("A\n B" )
225
+ t2 = self .writeTmp ("C\n D" )
226
+ fi = FileInput (files = (t1 , t2 ))
227
+ self .assertEqual (fi .fileno (), - 1 )
228
+ line = next (fi )
229
+ self .assertNotEqual (fi .fileno (), - 1 )
230
+ fi .nextfile ()
231
+ self .assertEqual (fi .fileno (), - 1 )
232
+ line = list (fi )
233
+ self .assertEqual (fi .fileno (), - 1 )
257
234
258
235
def test_opening_mode (self ):
259
236
try :
@@ -262,17 +239,13 @@ def test_opening_mode(self):
262
239
self .fail ("FileInput should reject invalid mode argument" )
263
240
except ValueError :
264
241
pass
265
- t1 = None
266
- try :
267
- # try opening in universal newline mode
268
- t1 = writeTmp (1 , [b"A\n B\r \n C\r D" ], mode = "wb" )
269
- with check_warnings (('' , DeprecationWarning )):
270
- fi = FileInput (files = t1 , mode = "U" )
271
- with check_warnings (('' , DeprecationWarning )):
272
- lines = list (fi )
273
- self .assertEqual (lines , ["A\n " , "B\n " , "C\n " , "D" ])
274
- finally :
275
- remove_tempfiles (t1 )
242
+ # try opening in universal newline mode
243
+ t1 = self .writeTmp (b"A\n B\r \n C\r D" , mode = "wb" )
244
+ with check_warnings (('' , DeprecationWarning )):
245
+ fi = FileInput (files = t1 , mode = "U" )
246
+ with check_warnings (('' , DeprecationWarning )):
247
+ lines = list (fi )
248
+ self .assertEqual (lines , ["A\n " , "B\n " , "C\n " , "D" ])
276
249
277
250
def test_stdin_binary_mode (self ):
278
251
with mock .patch ('sys.stdin' ) as m_stdin :
@@ -313,8 +286,7 @@ def __call__(self, *args):
313
286
self .invoked = True
314
287
return open (* args )
315
288
316
- t = writeTmp (1 , ["\n " ])
317
- self .addCleanup (remove_tempfiles , t )
289
+ t = self .writeTmp ("\n " )
318
290
custom_open_hook = CustomOpenHook ()
319
291
with FileInput ([t ], openhook = custom_open_hook ) as fi :
320
292
fi .readline ()
@@ -357,27 +329,22 @@ def test_readline_binary_mode(self):
357
329
self .assertEqual (fi .readline (), b'' )
358
330
359
331
def test_context_manager (self ):
360
- try :
361
- t1 = writeTmp (1 , ["A\n B\n C" ])
362
- t2 = writeTmp (2 , ["D\n E\n F" ])
363
- with FileInput (files = (t1 , t2 )) as fi :
364
- lines = list (fi )
365
- self .assertEqual (lines , ["A\n " , "B\n " , "C" , "D\n " , "E\n " , "F" ])
366
- self .assertEqual (fi .filelineno (), 3 )
367
- self .assertEqual (fi .lineno (), 6 )
368
- self .assertEqual (fi ._files , ())
369
- finally :
370
- remove_tempfiles (t1 , t2 )
332
+ t1 = self .writeTmp ("A\n B\n C" )
333
+ t2 = self .writeTmp ("D\n E\n F" )
334
+ with FileInput (files = (t1 , t2 )) as fi :
335
+ lines = list (fi )
336
+ self .assertEqual (lines , ["A\n " , "B\n " , "C" , "D\n " , "E\n " , "F" ])
337
+ self .assertEqual (fi .filelineno (), 3 )
338
+ self .assertEqual (fi .lineno (), 6 )
339
+ self .assertEqual (fi ._files , ())
371
340
372
341
def test_close_on_exception (self ):
342
+ t1 = self .writeTmp ("" )
373
343
try :
374
- t1 = writeTmp (1 , ["" ])
375
344
with FileInput (files = t1 ) as fi :
376
345
raise OSError
377
346
except OSError :
378
347
self .assertEqual (fi ._files , ())
379
- finally :
380
- remove_tempfiles (t1 )
381
348
382
349
def test_empty_files_list_specified_to_constructor (self ):
383
350
with FileInput (files = []) as fi :
@@ -386,8 +353,7 @@ def test_empty_files_list_specified_to_constructor(self):
386
353
def test__getitem__ (self ):
387
354
"""Tests invoking FileInput.__getitem__() with the current
388
355
line number"""
389
- t = writeTmp (1 , ["line1\n " , "line2\n " ])
390
- self .addCleanup (remove_tempfiles , t )
356
+ t = self .writeTmp ("line1\n line2\n " )
391
357
with FileInput (files = [t ]) as fi :
392
358
retval1 = fi [0 ]
393
359
self .assertEqual (retval1 , "line1\n " )
@@ -397,8 +363,7 @@ def test__getitem__(self):
397
363
def test__getitem__invalid_key (self ):
398
364
"""Tests invoking FileInput.__getitem__() with an index unequal to
399
365
the line number"""
400
- t = writeTmp (1 , ["line1\n " , "line2\n " ])
401
- self .addCleanup (remove_tempfiles , t )
366
+ t = self .writeTmp ("line1\n line2\n " )
402
367
with FileInput (files = [t ]) as fi :
403
368
with self .assertRaises (RuntimeError ) as cm :
404
369
fi [1 ]
@@ -407,8 +372,7 @@ def test__getitem__invalid_key(self):
407
372
def test__getitem__eof (self ):
408
373
"""Tests invoking FileInput.__getitem__() with the line number but at
409
374
end-of-input"""
410
- t = writeTmp (1 , [])
411
- self .addCleanup (remove_tempfiles , t )
375
+ t = self .writeTmp ('' )
412
376
with FileInput (files = [t ]) as fi :
413
377
with self .assertRaises (IndexError ) as cm :
414
378
fi [0 ]
@@ -422,8 +386,8 @@ def test_nextfile_oserror_deleting_backup(self):
422
386
os_unlink_orig = os .unlink
423
387
os_unlink_replacement = UnconditionallyRaise (OSError )
424
388
try :
425
- t = writeTmp (1 , [ "\n " ] )
426
- self .addCleanup (remove_tempfiles , t )
389
+ t = self . writeTmp ("\n " )
390
+ self .addCleanup (support . unlink , t + '.bak' )
427
391
with FileInput (files = [t ], inplace = True ) as fi :
428
392
next (fi ) # make sure the file is opened
429
393
os .unlink = os_unlink_replacement
@@ -442,8 +406,7 @@ def test_readline_os_fstat_raises_OSError(self):
442
406
os_fstat_orig = os .fstat
443
407
os_fstat_replacement = UnconditionallyRaise (OSError )
444
408
try :
445
- t = writeTmp (1 , ["\n " ])
446
- self .addCleanup (remove_tempfiles , t )
409
+ t = self .writeTmp ("\n " )
447
410
with FileInput (files = [t ], inplace = True ) as fi :
448
411
os .fstat = os_fstat_replacement
449
412
fi .readline ()
@@ -462,8 +425,7 @@ def test_readline_os_chmod_raises_OSError(self):
462
425
os_chmod_orig = os .chmod
463
426
os_chmod_replacement = UnconditionallyRaise (OSError )
464
427
try :
465
- t = writeTmp (1 , ["\n " ])
466
- self .addCleanup (remove_tempfiles , t )
428
+ t = self .writeTmp ("\n " )
467
429
with FileInput (files = [t ], inplace = True ) as fi :
468
430
os .chmod = os_chmod_replacement
469
431
fi .readline ()
@@ -482,8 +444,7 @@ def fileno(self):
482
444
self .__call__ ()
483
445
484
446
unconditionally_raise_ValueError = FilenoRaisesValueError ()
485
- t = writeTmp (1 , ["\n " ])
486
- self .addCleanup (remove_tempfiles , t )
447
+ t = self .writeTmp ("\n " )
487
448
with FileInput (files = [t ]) as fi :
488
449
file_backup = fi ._file
489
450
try :
@@ -530,6 +491,7 @@ def test_iteration_buffering(self):
530
491
self .assertRaises (StopIteration , next , fi )
531
492
self .assertEqual (src .linesread , [])
532
493
494
+
533
495
class MockFileInput :
534
496
"""A class that mocks out fileinput.FileInput for use during unit tests"""
535
497
0 commit comments