8
8
import fileinput
9
9
import collections
10
10
import builtins
11
+ import tempfile
11
12
import unittest
12
13
13
14
try :
33
34
# all the work, and a few functions (input, etc.) that use a global _state
34
35
# variable.
35
36
36
- # Write lines (a list of lines) to temp file number i, and return the
37
- # temp file's name.
38
- def writeTmp (i , lines , mode = 'w' ): # opening in text mode is the default
39
- name = TESTFN + str (i )
40
- f = open (name , mode )
41
- for line in lines :
42
- f .write (line )
43
- f .close ()
44
- return name
45
-
46
- def remove_tempfiles (* names ):
47
- for name in names :
48
- if name :
49
- safe_unlink (name )
37
+ class BaseTests :
38
+ # Write a content (str or bytes) to temp file, and return the
39
+ # temp file's name.
40
+ def writeTmp (self , content , * , mode = 'w' ): # opening in text mode is the default
41
+ fd , name = tempfile .mkstemp ()
42
+ self .addCleanup (support .unlink , name )
43
+ with open (fd , mode ) as f :
44
+ f .write (content )
45
+ return name
50
46
51
47
class LineReader :
52
48
@@ -84,23 +80,19 @@ def readlines(self, hint=-1):
84
80
def close (self ):
85
81
pass
86
82
87
- class BufferSizesTests (unittest .TestCase ):
83
+ class BufferSizesTests (BaseTests , unittest .TestCase ):
88
84
def test_buffer_sizes (self ):
89
85
# First, run the tests with default and teeny buffer size.
90
86
for round , bs in (0 , 0 ), (1 , 30 ):
91
- t1 = t2 = t3 = t4 = None
92
- try :
93
- t1 = writeTmp (1 , ["Line %s of file 1\n " % (i + 1 ) for i in range (15 )])
94
- t2 = writeTmp (2 , ["Line %s of file 2\n " % (i + 1 ) for i in range (10 )])
95
- t3 = writeTmp (3 , ["Line %s of file 3\n " % (i + 1 ) for i in range (5 )])
96
- t4 = writeTmp (4 , ["Line %s of file 4\n " % (i + 1 ) for i in range (1 )])
97
- if bs :
98
- with self .assertWarns (DeprecationWarning ):
99
- self .buffer_size_test (t1 , t2 , t3 , t4 , bs , round )
100
- else :
87
+ t1 = self .writeTmp ('' .join ("Line %s of file 1\n " % (i + 1 ) for i in range (15 )))
88
+ t2 = self .writeTmp ('' .join ("Line %s of file 2\n " % (i + 1 ) for i in range (10 )))
89
+ t3 = self .writeTmp ('' .join ("Line %s of file 3\n " % (i + 1 ) for i in range (5 )))
90
+ t4 = self .writeTmp ('' .join ("Line %s of file 4\n " % (i + 1 ) for i in range (1 )))
91
+ if bs :
92
+ with self .assertWarns (DeprecationWarning ):
101
93
self .buffer_size_test (t1 , t2 , t3 , t4 , bs , round )
102
- finally :
103
- remove_tempfiles (t1 , t2 , t3 , t4 )
94
+ else :
95
+ self . buffer_size_test (t1 , t2 , t3 , t4 , bs , round )
104
96
105
97
def buffer_size_test (self , t1 , t2 , t3 , t4 , bs = 0 , round = 0 ):
106
98
pat = re .compile (r'LINE (\d+) OF FILE (\d+)' )
@@ -187,74 +179,59 @@ def __call__(self, *args, **kwargs):
187
179
self .invoked = True
188
180
raise self .exception_type ()
189
181
190
- class FileInputTests (unittest .TestCase ):
182
+ class FileInputTests (BaseTests , unittest .TestCase ):
191
183
192
184
def test_zero_byte_files (self ):
193
- t1 = t2 = t3 = t4 = None
194
- try :
195
- t1 = writeTmp (1 , ["" ])
196
- t2 = writeTmp (2 , ["" ])
197
- t3 = writeTmp (3 , ["The only line there is.\n " ])
198
- t4 = writeTmp (4 , ["" ])
199
- fi = FileInput (files = (t1 , t2 , t3 , t4 ))
200
-
201
- line = fi .readline ()
202
- self .assertEqual (line , 'The only line there is.\n ' )
203
- self .assertEqual (fi .lineno (), 1 )
204
- self .assertEqual (fi .filelineno (), 1 )
205
- self .assertEqual (fi .filename (), t3 )
206
-
207
- line = fi .readline ()
208
- self .assertFalse (line )
209
- self .assertEqual (fi .lineno (), 1 )
210
- self .assertEqual (fi .filelineno (), 0 )
211
- self .assertEqual (fi .filename (), t4 )
212
- fi .close ()
213
- finally :
214
- remove_tempfiles (t1 , t2 , t3 , t4 )
185
+ t1 = self .writeTmp ("" )
186
+ t2 = self .writeTmp ("" )
187
+ t3 = self .writeTmp ("The only line there is.\n " )
188
+ t4 = self .writeTmp ("" )
189
+ fi = FileInput (files = (t1 , t2 , t3 , t4 ))
190
+
191
+ line = fi .readline ()
192
+ self .assertEqual (line , 'The only line there is.\n ' )
193
+ self .assertEqual (fi .lineno (), 1 )
194
+ self .assertEqual (fi .filelineno (), 1 )
195
+ self .assertEqual (fi .filename (), t3 )
196
+
197
+ line = fi .readline ()
198
+ self .assertFalse (line )
199
+ self .assertEqual (fi .lineno (), 1 )
200
+ self .assertEqual (fi .filelineno (), 0 )
201
+ self .assertEqual (fi .filename (), t4 )
202
+ fi .close ()
215
203
216
204
def test_files_that_dont_end_with_newline (self ):
217
- t1 = t2 = None
218
- try :
219
- t1 = writeTmp (1 , ["A\n B\n C" ])
220
- t2 = writeTmp (2 , ["D\n E\n F" ])
221
- fi = FileInput (files = (t1 , t2 ))
222
- lines = list (fi )
223
- self .assertEqual (lines , ["A\n " , "B\n " , "C" , "D\n " , "E\n " , "F" ])
224
- self .assertEqual (fi .filelineno (), 3 )
225
- self .assertEqual (fi .lineno (), 6 )
226
- finally :
227
- remove_tempfiles (t1 , t2 )
205
+ t1 = self .writeTmp ("A\n B\n C" )
206
+ t2 = self .writeTmp ("D\n E\n F" )
207
+ fi = FileInput (files = (t1 , t2 ))
208
+ lines = list (fi )
209
+ self .assertEqual (lines , ["A\n " , "B\n " , "C" , "D\n " , "E\n " , "F" ])
210
+ self .assertEqual (fi .filelineno (), 3 )
211
+ self .assertEqual (fi .lineno (), 6 )
228
212
229
213
## def test_unicode_filenames(self):
230
214
## # XXX A unicode string is always returned by writeTmp.
231
215
## # So is this needed?
232
- ## try:
233
- ## t1 = writeTmp(1, ["A\nB"])
234
- ## encoding = sys.getfilesystemencoding()
235
- ## if encoding is None:
236
- ## encoding = 'ascii'
237
- ## fi = FileInput(files=str(t1, encoding))
238
- ## lines = list(fi)
239
- ## self.assertEqual(lines, ["A\n", "B"])
240
- ## finally:
241
- ## remove_tempfiles(t1)
216
+ ## t1 = self.writeTmp("A\nB")
217
+ ## encoding = sys.getfilesystemencoding()
218
+ ## if encoding is None:
219
+ ## encoding = 'ascii'
220
+ ## fi = FileInput(files=str(t1, encoding))
221
+ ## lines = list(fi)
222
+ ## self.assertEqual(lines, ["A\n", "B"])
242
223
243
224
def test_fileno (self ):
244
- t1 = t2 = None
245
- try :
246
- t1 = writeTmp (1 , ["A\n B" ])
247
- t2 = writeTmp (2 , ["C\n D" ])
248
- fi = FileInput (files = (t1 , t2 ))
249
- self .assertEqual (fi .fileno (), - 1 )
250
- line = next ( fi )
251
- self .assertNotEqual (fi .fileno (), - 1 )
252
- fi .nextfile ()
253
- self .assertEqual (fi .fileno (), - 1 )
254
- line = list (fi )
255
- self .assertEqual (fi .fileno (), - 1 )
256
- finally :
257
- remove_tempfiles (t1 , t2 )
225
+ t1 = self .writeTmp ("A\n B" )
226
+ t2 = self .writeTmp ("C\n D" )
227
+ fi = FileInput (files = (t1 , t2 ))
228
+ self .assertEqual (fi .fileno (), - 1 )
229
+ line = next (fi )
230
+ self .assertNotEqual (fi .fileno (), - 1 )
231
+ fi .nextfile ()
232
+ self .assertEqual (fi .fileno (), - 1 )
233
+ line = list (fi )
234
+ self .assertEqual (fi .fileno (), - 1 )
258
235
259
236
def test_opening_mode (self ):
260
237
try :
@@ -263,17 +240,13 @@ def test_opening_mode(self):
263
240
self .fail ("FileInput should reject invalid mode argument" )
264
241
except ValueError :
265
242
pass
266
- t1 = None
267
- try :
268
- # try opening in universal newline mode
269
- t1 = writeTmp (1 , [b"A\n B\r \n C\r D" ], mode = "wb" )
270
- with check_warnings (('' , DeprecationWarning )):
271
- fi = FileInput (files = t1 , mode = "U" )
272
- with check_warnings (('' , DeprecationWarning )):
273
- lines = list (fi )
274
- self .assertEqual (lines , ["A\n " , "B\n " , "C\n " , "D" ])
275
- finally :
276
- remove_tempfiles (t1 )
243
+ # try opening in universal newline mode
244
+ t1 = self .writeTmp (b"A\n B\r \n C\r D" , mode = "wb" )
245
+ with check_warnings (('' , DeprecationWarning )):
246
+ fi = FileInput (files = t1 , mode = "U" )
247
+ with check_warnings (('' , DeprecationWarning )):
248
+ lines = list (fi )
249
+ self .assertEqual (lines , ["A\n " , "B\n " , "C\n " , "D" ])
277
250
278
251
def test_stdin_binary_mode (self ):
279
252
with mock .patch ('sys.stdin' ) as m_stdin :
@@ -314,8 +287,7 @@ def __call__(self, *args):
314
287
self .invoked = True
315
288
return open (* args )
316
289
317
- t = writeTmp (1 , ["\n " ])
318
- self .addCleanup (remove_tempfiles , t )
290
+ t = self .writeTmp ("\n " )
319
291
custom_open_hook = CustomOpenHook ()
320
292
with FileInput ([t ], openhook = custom_open_hook ) as fi :
321
293
fi .readline ()
@@ -358,27 +330,22 @@ def test_readline_binary_mode(self):
358
330
self .assertEqual (fi .readline (), b'' )
359
331
360
332
def test_context_manager (self ):
361
- try :
362
- t1 = writeTmp (1 , ["A\n B\n C" ])
363
- t2 = writeTmp (2 , ["D\n E\n F" ])
364
- with FileInput (files = (t1 , t2 )) as fi :
365
- lines = list (fi )
366
- self .assertEqual (lines , ["A\n " , "B\n " , "C" , "D\n " , "E\n " , "F" ])
367
- self .assertEqual (fi .filelineno (), 3 )
368
- self .assertEqual (fi .lineno (), 6 )
369
- self .assertEqual (fi ._files , ())
370
- finally :
371
- remove_tempfiles (t1 , t2 )
333
+ t1 = self .writeTmp ("A\n B\n C" )
334
+ t2 = self .writeTmp ("D\n E\n F" )
335
+ with FileInput (files = (t1 , t2 )) as fi :
336
+ lines = list (fi )
337
+ self .assertEqual (lines , ["A\n " , "B\n " , "C" , "D\n " , "E\n " , "F" ])
338
+ self .assertEqual (fi .filelineno (), 3 )
339
+ self .assertEqual (fi .lineno (), 6 )
340
+ self .assertEqual (fi ._files , ())
372
341
373
342
def test_close_on_exception (self ):
343
+ t1 = self .writeTmp ("" )
374
344
try :
375
- t1 = writeTmp (1 , ["" ])
376
345
with FileInput (files = t1 ) as fi :
377
346
raise OSError
378
347
except OSError :
379
348
self .assertEqual (fi ._files , ())
380
- finally :
381
- remove_tempfiles (t1 )
382
349
383
350
def test_empty_files_list_specified_to_constructor (self ):
384
351
with FileInput (files = []) as fi :
@@ -387,8 +354,7 @@ def test_empty_files_list_specified_to_constructor(self):
387
354
def test__getitem__ (self ):
388
355
"""Tests invoking FileInput.__getitem__() with the current
389
356
line number"""
390
- t = writeTmp (1 , ["line1\n " , "line2\n " ])
391
- self .addCleanup (remove_tempfiles , t )
357
+ t = self .writeTmp ("line1\n line2\n " )
392
358
with FileInput (files = [t ]) as fi :
393
359
retval1 = fi [0 ]
394
360
self .assertEqual (retval1 , "line1\n " )
@@ -398,8 +364,7 @@ def test__getitem__(self):
398
364
def test__getitem__invalid_key (self ):
399
365
"""Tests invoking FileInput.__getitem__() with an index unequal to
400
366
the line number"""
401
- t = writeTmp (1 , ["line1\n " , "line2\n " ])
402
- self .addCleanup (remove_tempfiles , t )
367
+ t = self .writeTmp ("line1\n line2\n " )
403
368
with FileInput (files = [t ]) as fi :
404
369
with self .assertRaises (RuntimeError ) as cm :
405
370
fi [1 ]
@@ -408,8 +373,7 @@ def test__getitem__invalid_key(self):
408
373
def test__getitem__eof (self ):
409
374
"""Tests invoking FileInput.__getitem__() with the line number but at
410
375
end-of-input"""
411
- t = writeTmp (1 , [])
412
- self .addCleanup (remove_tempfiles , t )
376
+ t = self .writeTmp ('' )
413
377
with FileInput (files = [t ]) as fi :
414
378
with self .assertRaises (IndexError ) as cm :
415
379
fi [0 ]
@@ -423,8 +387,8 @@ def test_nextfile_oserror_deleting_backup(self):
423
387
os_unlink_orig = os .unlink
424
388
os_unlink_replacement = UnconditionallyRaise (OSError )
425
389
try :
426
- t = writeTmp (1 , [ "\n " ] )
427
- self .addCleanup (remove_tempfiles , t )
390
+ t = self . writeTmp ("\n " )
391
+ self .addCleanup (support . unlink , t + '.bak' )
428
392
with FileInput (files = [t ], inplace = True ) as fi :
429
393
next (fi ) # make sure the file is opened
430
394
os .unlink = os_unlink_replacement
@@ -443,8 +407,7 @@ def test_readline_os_fstat_raises_OSError(self):
443
407
os_fstat_orig = os .fstat
444
408
os_fstat_replacement = UnconditionallyRaise (OSError )
445
409
try :
446
- t = writeTmp (1 , ["\n " ])
447
- self .addCleanup (remove_tempfiles , t )
410
+ t = self .writeTmp ("\n " )
448
411
with FileInput (files = [t ], inplace = True ) as fi :
449
412
os .fstat = os_fstat_replacement
450
413
fi .readline ()
@@ -463,8 +426,7 @@ def test_readline_os_chmod_raises_OSError(self):
463
426
os_chmod_orig = os .chmod
464
427
os_chmod_replacement = UnconditionallyRaise (OSError )
465
428
try :
466
- t = writeTmp (1 , ["\n " ])
467
- self .addCleanup (remove_tempfiles , t )
429
+ t = self .writeTmp ("\n " )
468
430
with FileInput (files = [t ], inplace = True ) as fi :
469
431
os .chmod = os_chmod_replacement
470
432
fi .readline ()
@@ -483,8 +445,7 @@ def fileno(self):
483
445
self .__call__ ()
484
446
485
447
unconditionally_raise_ValueError = FilenoRaisesValueError ()
486
- t = writeTmp (1 , ["\n " ])
487
- self .addCleanup (remove_tempfiles , t )
448
+ t = self .writeTmp ("\n " )
488
449
with FileInput (files = [t ]) as fi :
489
450
file_backup = fi ._file
490
451
try :
@@ -532,30 +493,22 @@ def test_iteration_buffering(self):
532
493
self .assertEqual (src .linesread , [])
533
494
534
495
def test_pathlib_file (self ):
535
- t1 = None
536
- try :
537
- t1 = Path (writeTmp (1 , ["Pathlib file." ]))
538
- with FileInput (t1 ) as fi :
539
- line = fi .readline ()
540
- self .assertEqual (line , 'Pathlib file.' )
541
- self .assertEqual (fi .lineno (), 1 )
542
- self .assertEqual (fi .filelineno (), 1 )
543
- self .assertEqual (fi .filename (), os .fspath (t1 ))
544
- finally :
545
- remove_tempfiles (t1 )
496
+ t1 = Path (self .writeTmp ("Pathlib file." ))
497
+ with FileInput (t1 ) as fi :
498
+ line = fi .readline ()
499
+ self .assertEqual (line , 'Pathlib file.' )
500
+ self .assertEqual (fi .lineno (), 1 )
501
+ self .assertEqual (fi .filelineno (), 1 )
502
+ self .assertEqual (fi .filename (), os .fspath (t1 ))
546
503
547
504
def test_pathlib_file_inplace (self ):
548
- t1 = None
549
- try :
550
- t1 = Path (writeTmp (1 , ['Pathlib file.' ]))
551
- with FileInput (t1 , inplace = True ) as fi :
552
- line = fi .readline ()
553
- self .assertEqual (line , 'Pathlib file.' )
554
- print ('Modified %s' % line )
555
- with open (t1 ) as f :
556
- self .assertEqual (f .read (), 'Modified Pathlib file.\n ' )
557
- finally :
558
- remove_tempfiles (t1 )
505
+ t1 = Path (self .writeTmp ('Pathlib file.' ))
506
+ with FileInput (t1 , inplace = True ) as fi :
507
+ line = fi .readline ()
508
+ self .assertEqual (line , 'Pathlib file.' )
509
+ print ('Modified %s' % line )
510
+ with open (t1 ) as f :
511
+ self .assertEqual (f .read (), 'Modified Pathlib file.\n ' )
559
512
560
513
561
514
class MockFileInput :
0 commit comments