Skip to content

Commit b7ca376

Browse files
committed
try to reslove conflict
1 parent 3126ac3 commit b7ca376

File tree

3 files changed

+98
-53
lines changed

3 files changed

+98
-53
lines changed

Lib/test/test_zipfile/_path/test_path.py

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
import io
22
import itertools
33
import contextlib
4-
import os
54
import pathlib
65
import pickle
76
import stat
87
import sys
9-
import time
108
import unittest
119
import zipfile
1210
import zipfile._path
@@ -635,7 +633,7 @@ def test_backslash_not_separator(self):
635633
"""
636634
data = io.BytesIO()
637635
zf = zipfile.ZipFile(data, "w")
638-
zf.writestr(DirtyZipInfo.for_name("foo\\bar", zf), b"content")
636+
zf.writestr(DirtyZipInfo("foo\\bar")._for_archive(zf), b"content")
639637
zf.filename = ''
640638
root = zipfile.Path(zf)
641639
(first,) = root.iterdir()
@@ -658,22 +656,3 @@ class DirtyZipInfo(zipfile.ZipInfo):
658656
def __init__(self, filename, *args, **kwargs):
659657
super().__init__(filename, *args, **kwargs)
660658
self.filename = filename
661-
662-
@classmethod
663-
def for_name(cls, name, archive):
664-
"""
665-
Construct the same way that ZipFile.writestr does.
666-
667-
TODO: extract this functionality and re-use
668-
"""
669-
epoch = os.environ.get('SOURCE_DATE_EPOCH')
670-
get_time = int(epoch) if epoch else time.time()
671-
self = cls(filename=name, date_time=time.gmtime(get_time)[:6])
672-
self.compress_type = archive.compression
673-
self.compress_level = archive.compresslevel
674-
if self.filename.endswith('/'): # pragma: no cover
675-
self.external_attr = 0o40775 << 16 # drwxrwxr-x
676-
self.external_attr |= 0x10 # MS-DOS directory flag
677-
else:
678-
self.external_attr = 0o600 << 16 # ?rw-------
679-
return self

Lib/test/test_zipfile/test_core.py

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import itertools
66
import os
77
import posixpath
8+
import stat
89
import struct
910
import subprocess
1011
import sys
@@ -15,6 +16,7 @@
1516

1617

1718
from tempfile import TemporaryFile
19+
from test.support import os_helper
1820
from random import randint, random, randbytes
1921

2022
from test import archiver_tests
@@ -1781,19 +1783,20 @@ def test_writestr_extended_local_header_issue1202(self):
17811783
orig_zip.writestr(zinfo, data)
17821784

17831785
def test_write_with_source_date_epoch(self):
1784-
# Set the SOURCE_DATE_EPOCH environment variable to a specific timestamp
1785-
os.environ['SOURCE_DATE_EPOCH'] = "1727440508"
1786+
with os_helper.EnvironmentVarGuard() as env:
1787+
# Set the SOURCE_DATE_EPOCH environment variable to a specific timestamp
1788+
env['SOURCE_DATE_EPOCH'] = "1727440508"
17861789

1787-
with zipfile.ZipFile(TESTFN, "w") as zf:
1788-
zf.writestr("test_source_date_epoch.txt", "Testing SOURCE_DATE_EPOCH")
1790+
with zipfile.ZipFile(TESTFN, "w") as zf:
1791+
zf.writestr("test_source_date_epoch.txt", "Testing SOURCE_DATE_EPOCH")
17891792

1790-
with zipfile.ZipFile(TESTFN, "r") as zf:
1791-
zip_info = zf.getinfo("test_source_date_epoch.txt")
1792-
get_time = time.gmtime(int(os.environ['SOURCE_DATE_EPOCH']))[:6]
1793-
# Compare each element of the date_time tuple
1794-
# Allow for a 1-second difference
1795-
for z_time, g_time in zip(zip_info.date_time, get_time):
1796-
self.assertAlmostEqual(z_time, g_time, delta=1)
1793+
with zipfile.ZipFile(TESTFN, "r") as zf:
1794+
zip_info = zf.getinfo("test_source_date_epoch.txt")
1795+
get_time = time.gmtime(int(os.environ['SOURCE_DATE_EPOCH']))[:6]
1796+
# Compare each element of the date_time tuple
1797+
# Allow for a 1-second difference
1798+
for z_time, g_time in zip(zip_info.date_time, get_time):
1799+
self.assertAlmostEqual(z_time, g_time, delta=1)
17971800

17981801
def test_write_without_source_date_epoch(self):
17991802
if 'SOURCE_DATE_EPOCH' in os.environ:
@@ -1997,10 +2000,16 @@ def test_is_zip_valid_file(self):
19972000
zip_contents = fp.read()
19982001
# - passing a file-like object
19992002
fp = io.BytesIO()
2000-
fp.write(zip_contents)
2003+
end = fp.write(zip_contents)
2004+
self.assertEqual(fp.tell(), end)
2005+
mid = end // 2
2006+
fp.seek(mid, 0)
20012007
self.assertTrue(zipfile.is_zipfile(fp))
2002-
fp.seek(0, 0)
2008+
# check that the position is left unchanged after the call
2009+
# see: https://github.com/python/cpython/issues/122356
2010+
self.assertEqual(fp.tell(), mid)
20032011
self.assertTrue(zipfile.is_zipfile(fp))
2012+
self.assertEqual(fp.tell(), mid)
20042013

20052014
def test_non_existent_file_raises_OSError(self):
20062015
# make sure we don't raise an AttributeError when a partially-constructed
@@ -2233,6 +2242,34 @@ def test_create_empty_zipinfo_repr(self):
22332242
zi = zipfile.ZipInfo(filename="empty")
22342243
self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>")
22352244

2245+
def test_for_archive(self):
2246+
base_filename = TESTFN2.rstrip('/')
2247+
2248+
with zipfile.ZipFile(TESTFN, mode="w", compresslevel=1,
2249+
compression=zipfile.ZIP_STORED) as zf:
2250+
# no trailing forward slash
2251+
zi = zipfile.ZipInfo(base_filename)._for_archive(zf)
2252+
self.assertEqual(zi.compress_level, 1)
2253+
self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2254+
# ?rw- --- ---
2255+
filemode = stat.S_IRUSR | stat.S_IWUSR
2256+
# filemode is stored as the highest 16 bits of external_attr
2257+
self.assertEqual(zi.external_attr >> 16, filemode)
2258+
self.assertEqual(zi.external_attr & 0xFF, 0) # no MS-DOS flag
2259+
2260+
with zipfile.ZipFile(TESTFN, mode="w", compresslevel=1,
2261+
compression=zipfile.ZIP_STORED) as zf:
2262+
# with a trailing slash
2263+
zi = zipfile.ZipInfo(f'{base_filename}/')._for_archive(zf)
2264+
self.assertEqual(zi.compress_level, 1)
2265+
self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2266+
# d rwx rwx r-x
2267+
filemode = stat.S_IFDIR
2268+
filemode |= stat.S_IRWXU | stat.S_IRWXG
2269+
filemode |= stat.S_IROTH | stat.S_IXOTH
2270+
self.assertEqual(zi.external_attr >> 16, filemode)
2271+
self.assertEqual(zi.external_attr & 0xFF, 0x10) # MS-DOS flag
2272+
22362273
def test_create_empty_zipinfo_default_attributes(self):
22372274
"""Ensure all required attributes are set."""
22382275
zi = zipfile.ZipInfo()
@@ -2355,6 +2392,18 @@ def test_read_after_seek(self):
23552392
fp.seek(1, os.SEEK_CUR)
23562393
self.assertEqual(fp.read(-1), b'men!')
23572394

2395+
def test_uncompressed_interleaved_seek_read(self):
2396+
# gh-127847: Make sure the position in the archive is correct
2397+
# in the special case of seeking in a ZIP_STORED entry.
2398+
with zipfile.ZipFile(TESTFN, "w") as zipf:
2399+
zipf.writestr("a.txt", "123")
2400+
zipf.writestr("b.txt", "456")
2401+
with zipfile.ZipFile(TESTFN, "r") as zipf:
2402+
with zipf.open("a.txt", "r") as a, zipf.open("b.txt", "r") as b:
2403+
self.assertEqual(a.read(1), b"1")
2404+
self.assertEqual(b.seek(1), 1)
2405+
self.assertEqual(b.read(1), b"5")
2406+
23582407
@requires_bz2()
23592408
def test_decompress_without_3rd_party_library(self):
23602409
data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

Lib/zipfile/__init__.py

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import sys
1414
import threading
1515
import time
16+
from typing import Self
1617

1718
try:
1819
import zlib # We may need its compression method
@@ -241,7 +242,9 @@ def is_zipfile(filename):
241242
result = False
242243
try:
243244
if hasattr(filename, "read"):
245+
pos = filename.tell()
244246
result = _check_zipfile(fp=filename)
247+
filename.seek(pos)
245248
else:
246249
with open(filename, "rb") as fp:
247250
result = _check_zipfile(fp)
@@ -309,7 +312,7 @@ def _EndRecData(fpin):
309312
fpin.seek(-sizeEndCentDir, 2)
310313
except OSError:
311314
return None
312-
data = fpin.read()
315+
data = fpin.read(sizeEndCentDir)
313316
if (len(data) == sizeEndCentDir and
314317
data[0:4] == stringEndArchive and
315318
data[-2:] == b"\000\000"):
@@ -329,9 +332,9 @@ def _EndRecData(fpin):
329332
# record signature. The comment is the last item in the ZIP file and may be
330333
# up to 64K long. It is assumed that the "end of central directory" magic
331334
# number does not appear in the comment.
332-
maxCommentStart = max(filesize - (1 << 16) - sizeEndCentDir, 0)
335+
maxCommentStart = max(filesize - ZIP_MAX_COMMENT - sizeEndCentDir, 0)
333336
fpin.seek(maxCommentStart, 0)
334-
data = fpin.read()
337+
data = fpin.read(ZIP_MAX_COMMENT + sizeEndCentDir)
335338
start = data.rfind(stringEndArchive)
336339
if start >= 0:
337340
# found the magic number; attempt to unpack and interpret
@@ -603,6 +606,28 @@ def from_file(cls, filename, arcname=None, *, strict_timestamps=True):
603606

604607
return zinfo
605608

609+
def _for_archive(self, archive: ZipFile) -> Self:
610+
"""Resolve suitable defaults from the archive.
611+
612+
Resolve the date_time, compression attributes, and external attributes
613+
to suitable defaults as used by :method:`ZipFile.writestr`.
614+
615+
Return self.
616+
"""
617+
# gh-91279: Set the SOURCE_DATE_EPOCH to a specific timestamp
618+
self.epoch = os.environ.get('SOURCE_DATE_EPOCH')
619+
self.get_time = int(self.epoch) if self.epoch else time.time()
620+
self.date_time = time.gmtime(self.get_time)[:6]
621+
622+
self.compress_type = archive.compression
623+
self.compress_level = archive.compresslevel
624+
if self.filename.endswith('/'): # pragma: no cover
625+
self.external_attr = 0o40775 << 16 # drwxrwxr-x
626+
self.external_attr |= 0x10 # MS-DOS directory flag
627+
else:
628+
self.external_attr = 0o600 << 16 # ?rw-------
629+
return self
630+
606631
def is_dir(self):
607632
"""Return True if this archive member is a directory."""
608633
if self.filename.endswith('/'):
@@ -817,7 +842,10 @@ def seek(self, offset, whence=0):
817842
raise ValueError("Can't reposition in the ZIP file while "
818843
"there is an open writing handle on it. "
819844
"Close the writing handle before trying to read.")
820-
self._file.seek(offset, whence)
845+
if whence == os.SEEK_CUR:
846+
self._file.seek(self._pos + offset)
847+
else:
848+
self._file.seek(offset, whence)
821849
self._pos = self._file.tell()
822850
return self._pos
823851

@@ -1903,21 +1931,10 @@ def writestr(self, zinfo_or_arcname, data,
19031931
the name of the file in the archive."""
19041932
if isinstance(data, str):
19051933
data = data.encode("utf-8")
1906-
if not isinstance(zinfo_or_arcname, ZipInfo):
1907-
# gh-91279: Set the SOURCE_DATE_EPOCH to a specific timestamp
1908-
epoch = os.environ.get('SOURCE_DATE_EPOCH')
1909-
get_time = int(epoch) if epoch else time.time()
1910-
zinfo = ZipInfo(filename=zinfo_or_arcname,
1911-
date_time=time.gmtime(get_time)[:6])
1912-
zinfo.compress_type = self.compression
1913-
zinfo.compress_level = self.compresslevel
1914-
if zinfo.filename.endswith('/'):
1915-
zinfo.external_attr = 0o40775 << 16 # drwxrwxr-x
1916-
zinfo.external_attr |= 0x10 # MS-DOS directory flag
1917-
else:
1918-
zinfo.external_attr = 0o600 << 16 # ?rw-------
1919-
else:
1934+
if isinstance(zinfo_or_arcname, ZipInfo):
19201935
zinfo = zinfo_or_arcname
1936+
else:
1937+
zinfo = ZipInfo(zinfo_or_arcname)._for_archive(self)
19211938

19221939
if not self.fp:
19231940
raise ValueError(

0 commit comments

Comments
 (0)