|
1 | 1 | """Test script for the gzip module.
|
2 | 2 | """
|
3 | 3 |
|
4 |
| -import unittest |
5 |
| -from test import support |
6 |
| -from test.support import bigmemtest, _4G |
| 4 | +import array |
| 5 | +import functools |
| 6 | +import io |
7 | 7 | import os
|
8 | 8 | import pathlib
|
9 |
| -import io |
10 | 9 | import struct
|
11 |
| -import array |
| 10 | +import sys |
| 11 | +import unittest |
| 12 | +from subprocess import PIPE, Popen |
| 13 | +from test import support |
| 14 | +from test.support import _4G, bigmemtest |
| 15 | +from test.support.script_helper import assert_python_ok |
| 16 | + |
12 | 17 | gzip = support.import_module('gzip')
|
13 | 18 |
|
14 | 19 | data1 = b""" int length=DEFAULTALLOC, err = Z_OK;
|
|
24 | 29 | """
|
25 | 30 |
|
26 | 31 |
|
| 32 | +TEMPDIR = os.path.abspath(support.TESTFN) + '-gzdir' |
| 33 | + |
| 34 | + |
27 | 35 | class UnseekableIO(io.BytesIO):
|
28 | 36 | def seekable(self):
|
29 | 37 | return False
|
@@ -665,8 +673,87 @@ def test_newline(self):
|
665 | 673 | with gzip.open(self.filename, "rt", newline="\r") as f:
|
666 | 674 | self.assertEqual(f.readlines(), [uncompressed])
|
667 | 675 |
|
| 676 | + |
| 677 | +def create_and_remove_directory(directory): |
| 678 | + def decorator(function): |
| 679 | + @functools.wraps(function) |
| 680 | + def wrapper(*args, **kwargs): |
| 681 | + os.makedirs(directory) |
| 682 | + try: |
| 683 | + return function(*args, **kwargs) |
| 684 | + finally: |
| 685 | + support.rmtree(directory) |
| 686 | + return wrapper |
| 687 | + return decorator |
| 688 | + |
| 689 | + |
| 690 | +class TestCommandLine(unittest.TestCase): |
| 691 | + data = b'This is a simple test with gzip' |
| 692 | + |
| 693 | + def test_decompress_stdin_stdout(self): |
| 694 | + with io.BytesIO() as bytes_io: |
| 695 | + with gzip.GzipFile(fileobj=bytes_io, mode='wb') as gzip_file: |
| 696 | + gzip_file.write(self.data) |
| 697 | + |
| 698 | + args = sys.executable, '-m', 'gzip', '-d' |
| 699 | + with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc: |
| 700 | + out, err = proc.communicate(bytes_io.getvalue()) |
| 701 | + |
| 702 | + self.assertEqual(err, b'') |
| 703 | + self.assertEqual(out, self.data) |
| 704 | + |
| 705 | + @create_and_remove_directory(TEMPDIR) |
| 706 | + def test_decompress_infile_outfile(self): |
| 707 | + gzipname = os.path.join(TEMPDIR, 'testgzip.gz') |
| 708 | + self.assertFalse(os.path.exists(gzipname)) |
| 709 | + |
| 710 | + with gzip.open(gzipname, mode='wb') as fp: |
| 711 | + fp.write(self.data) |
| 712 | + rc, out, err = assert_python_ok('-m', 'gzip', '-d', gzipname) |
| 713 | + |
| 714 | + with open(os.path.join(TEMPDIR, "testgzip"), "rb") as gunziped: |
| 715 | + self.assertEqual(gunziped.read(), self.data) |
| 716 | + |
| 717 | + self.assertTrue(os.path.exists(gzipname)) |
| 718 | + self.assertEqual(rc, 0) |
| 719 | + self.assertEqual(out, b'') |
| 720 | + self.assertEqual(err, b'') |
| 721 | + |
| 722 | + def test_decompress_infile_outfile_error(self): |
| 723 | + rc, out, err = assert_python_ok('-m', 'gzip', '-d', 'thisisatest.out') |
| 724 | + self.assertIn(b"filename doesn't end in .gz:", out) |
| 725 | + self.assertEqual(rc, 0) |
| 726 | + self.assertEqual(err, b'') |
| 727 | + |
| 728 | + @create_and_remove_directory(TEMPDIR) |
| 729 | + def test_compress_stdin_outfile(self): |
| 730 | + args = sys.executable, '-m', 'gzip' |
| 731 | + with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc: |
| 732 | + out, err = proc.communicate(self.data) |
| 733 | + |
| 734 | + self.assertEqual(err, b'') |
| 735 | + self.assertEqual(out[:2], b"\x1f\x8b") |
| 736 | + |
| 737 | + @create_and_remove_directory(TEMPDIR) |
| 738 | + def test_compress_infile_outfile(self): |
| 739 | + local_testgzip = os.path.join(TEMPDIR, 'testgzip') |
| 740 | + gzipname = local_testgzip + '.gz' |
| 741 | + self.assertFalse(os.path.exists(gzipname)) |
| 742 | + |
| 743 | + with open(local_testgzip, 'wb') as fp: |
| 744 | + fp.write(self.data) |
| 745 | + |
| 746 | + rc, out, err = assert_python_ok('-m', 'gzip', local_testgzip) |
| 747 | + |
| 748 | + self.assertTrue(os.path.exists(gzipname)) |
| 749 | + self.assertEqual(rc, 0) |
| 750 | + self.assertEqual(out, b'') |
| 751 | + self.assertEqual(err, b'') |
| 752 | + |
| 753 | + |
668 | 754 | def test_main(verbose=None):
|
669 |
| - support.run_unittest(TestGzip, TestOpen) |
| 755 | + support.run_unittest(TestGzip, TestOpen, TestCommandLine) |
| 756 | + |
670 | 757 |
|
671 | 758 | if __name__ == "__main__":
|
672 | 759 | test_main(verbose=True)
|
0 commit comments