mirror of
https://github.com/python/cpython.git
synced 2026-06-20 18:11:54 -04:00
gh-51067: Add remove() and repack() to ZipFile (GH-134627)
The docs included in the commit do the best job of describing this. Much discussion on the PR and issue. thank you to to core team folks jaraco, emmatyping, gpshead, and all others who added their constructive comments along the way. --------- Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Gregory P. Smith <greg@krypto.org>
This commit is contained in:
@@ -550,6 +550,94 @@ ZipFile objects
|
||||
.. versionadded:: 3.11
|
||||
|
||||
|
||||
.. method:: ZipFile.remove(zinfo_or_arcname)
|
||||
|
||||
Removes a member entry from the archive's central directory.
|
||||
*zinfo_or_arcname* may be the full path of the member or a :class:`ZipInfo`
|
||||
instance. If multiple members share the same full path and the path is
|
||||
given as a string, only one of them is removed and which one is unspecified;
|
||||
it should not be relied upon. Pass the specific :class:`ZipInfo` instance to
|
||||
remove a particular member.
|
||||
|
||||
The archive must be opened with mode ``'w'``, ``'x'`` or ``'a'``.
|
||||
|
||||
Returns the removed :class:`ZipInfo` instance.
|
||||
|
||||
Calling :meth:`remove` on a closed ZipFile will raise a :exc:`ValueError`.
|
||||
|
||||
.. note::
|
||||
This method only removes the member's entry from the central directory,
|
||||
making it inaccessible to most tools. The member's local file entry,
|
||||
including content and metadata, remains in the archive and is still
|
||||
recoverable using forensic tools. Call :meth:`repack` afterwards to
|
||||
remove the local file entry and reclaim space; pass the returned
|
||||
:class:`ZipInfo` to :meth:`repack` to ensure the data is removed
|
||||
regardless of how the entry was written.
|
||||
|
||||
.. versionadded:: next
|
||||
|
||||
|
||||
.. method:: ZipFile.repack(removed=None, *, \
|
||||
strict_descriptor=True[, chunk_size])
|
||||
|
||||
Rewrites the archive to remove unreferenced local file entries, shrinking
|
||||
its file size. The archive must be opened with mode ``'a'``.
|
||||
|
||||
If *removed* is provided, it must be a sequence of :class:`ZipInfo` objects
|
||||
representing the recently removed members, and only their corresponding
|
||||
local file entries will be removed. Otherwise, the archive is scanned to
|
||||
locate and remove local file entries that are no longer referenced in the
|
||||
central directory.
|
||||
|
||||
Passing *removed* is the most reliable way to reclaim space: the
|
||||
corresponding local file entries are located directly from the central
|
||||
directory and removed regardless of how they were written, whereas the scan
|
||||
used when *removed* is omitted may leave some entries in place (see
|
||||
*strict_descriptor* below). To remove members and reclaim their space in a
|
||||
single step::
|
||||
|
||||
with ZipFile('spam.zip', 'a') as myzip:
|
||||
removed = [myzip.remove(name) for name in ('ham.txt', 'eggs.txt')]
|
||||
myzip.repack(removed)
|
||||
|
||||
When scanning, *strict_descriptor* controls how entries written with an
|
||||
unsigned *data descriptor* are handled. A data descriptor is an optional
|
||||
record holding an entry's CRC and sizes, stored just after the entry's data;
|
||||
it is used when the archive is written to a non-seekable stream, and is
|
||||
*signed* when it begins with a marker signature or *unsigned* otherwise.
|
||||
Unsigned descriptors have been deprecated by the `PKZIP Application Note`_
|
||||
since version 6.3.0 (released in 2006) and are written only by some legacy
|
||||
tools; signed descriptors—written by Python and other modern tools—are always
|
||||
detected. When *strict_descriptor* is true (the default), only signed data
|
||||
descriptors are detected, so an unreferenced entry written with an unsigned
|
||||
descriptor is not located and its space is not reclaimed by the scan.
|
||||
Setting ``strict_descriptor=False`` additionally detects unsigned
|
||||
descriptors, at the cost of a significantly slower scan—around 100 to 1000
|
||||
times in the worst case—which may be exploitable as a denial-of-service
|
||||
vector on untrusted input. This does not affect entries without a data
|
||||
descriptor, and is not needed when *removed* is provided.
|
||||
|
||||
*chunk_size* may be specified to control the buffer size when moving
|
||||
entry data (default is 1 MiB).
|
||||
|
||||
Calling :meth:`repack` on a closed ZipFile will raise a :exc:`ValueError`.
|
||||
|
||||
.. note::
|
||||
The scanning algorithm is heuristic-based and assumes that the ZIP file
|
||||
is normally structured—for example, with local file entries stored
|
||||
consecutively, without overlap or interleaved binary data. Prepended
|
||||
binary data, such as a self-extractor stub, is recognized and preserved
|
||||
unless it happens to contain bytes that coincidentally resemble a valid
|
||||
local file entry in multiple respects—an extremely rare case. Embedded
|
||||
ZIP payloads are also handled correctly, as long as they follow normal
|
||||
structure. However, the algorithm does not guarantee correctness or
|
||||
safety on untrusted or intentionally crafted input. It is generally
|
||||
recommended to provide the *removed* argument for better reliability and
|
||||
performance.
|
||||
|
||||
.. versionadded:: next
|
||||
|
||||
|
||||
The following data attributes are also available:
|
||||
|
||||
.. attribute:: ZipFile.filename
|
||||
|
||||
@@ -172,6 +172,15 @@ xml
|
||||
instead of failing later, when encounter non-ASCII data.
|
||||
(Contributed by Serhiy Storchaka in :gh:`62259`.)
|
||||
|
||||
zipfile
|
||||
-------
|
||||
|
||||
* Add :meth:`ZipFile.remove() <zipfile.ZipFile.remove>` to remove a member
|
||||
from an archive's central directory, and
|
||||
:meth:`ZipFile.repack() <zipfile.ZipFile.repack>` to reclaim the space used
|
||||
by the local file entries of removed members.
|
||||
(Contributed by Danny Lin in :gh:`51067`.)
|
||||
|
||||
.. Add improved modules above alphabetically, not here at the end.
|
||||
|
||||
Optimizations
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -13,12 +13,16 @@ support.requires(
|
||||
|
||||
import zipfile, unittest
|
||||
import time
|
||||
import tracemalloc
|
||||
import sys
|
||||
import unittest.mock as mock
|
||||
|
||||
from tempfile import TemporaryFile
|
||||
|
||||
from test.support import os_helper
|
||||
from test.support import requires_zlib
|
||||
from test.test_zipfile.test_core import Unseekable
|
||||
from test.test_zipfile.test_core import struct_pack_no_dd_sig
|
||||
|
||||
TESTFN = os_helper.TESTFN
|
||||
TESTFN2 = TESTFN + "2"
|
||||
@@ -87,6 +91,178 @@ class TestsWithSourceFile(unittest.TestCase):
|
||||
os_helper.unlink(TESTFN2)
|
||||
|
||||
|
||||
class TestRepack(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Create test data.
|
||||
line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
|
||||
self.data = '\n'.join(line_gen).encode('ascii')
|
||||
|
||||
# It will contain enough copies of self.data to reach about 8 GiB.
|
||||
self.datacount = 8*1024**3 // len(self.data)
|
||||
|
||||
# memory usage should not exceed 10 MiB
|
||||
self.allowed_memory = 10*1024**2
|
||||
|
||||
def _write_large_file(self, fh):
|
||||
next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
|
||||
for num in range(self.datacount):
|
||||
fh.write(self.data)
|
||||
# Print still working message since this test can be really slow
|
||||
if next_time <= time.monotonic():
|
||||
next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
|
||||
print((
|
||||
' writing %d of %d, be patient...' %
|
||||
(num, self.datacount)), file=sys.__stdout__)
|
||||
sys.__stdout__.flush()
|
||||
|
||||
def test_strip_removed_large_file(self):
|
||||
"""Should move the physical data of a file positioned after a large
|
||||
removed file without causing a memory issue."""
|
||||
# Try the temp file. If we do TESTFN2, then it hogs
|
||||
# gigabytes of disk space for the duration of the test.
|
||||
with TemporaryFile() as f:
|
||||
tracemalloc.start()
|
||||
self._test_strip_removed_large_file(f)
|
||||
self.assertFalse(f.closed)
|
||||
current, peak = tracemalloc.get_traced_memory()
|
||||
tracemalloc.stop()
|
||||
self.assertLess(peak, self.allowed_memory)
|
||||
|
||||
def _test_strip_removed_large_file(self, f):
|
||||
file = 'file.txt'
|
||||
file1 = 'largefile.txt'
|
||||
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
|
||||
with zipfile.ZipFile(f, 'w') as zh:
|
||||
with zh.open(file1, 'w', force_zip64=True) as fh:
|
||||
self._write_large_file(fh)
|
||||
zh.writestr(file, data)
|
||||
|
||||
with zipfile.ZipFile(f, 'a') as zh:
|
||||
zh.remove(file1)
|
||||
zh.repack()
|
||||
self.assertIsNone(zh.testzip())
|
||||
|
||||
def test_strip_removed_file_before_large_file(self):
|
||||
"""Should move the physical data of a large file positioned after a
|
||||
removed file without causing a memory issue."""
|
||||
# Try the temp file. If we do TESTFN2, then it hogs
|
||||
# gigabytes of disk space for the duration of the test.
|
||||
with TemporaryFile() as f:
|
||||
tracemalloc.start()
|
||||
self._test_strip_removed_file_before_large_file(f)
|
||||
self.assertFalse(f.closed)
|
||||
current, peak = tracemalloc.get_traced_memory()
|
||||
tracemalloc.stop()
|
||||
self.assertLess(peak, self.allowed_memory)
|
||||
|
||||
def _test_strip_removed_file_before_large_file(self, f):
|
||||
file = 'file.txt'
|
||||
file1 = 'largefile.txt'
|
||||
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
|
||||
with zipfile.ZipFile(f, 'w') as zh:
|
||||
zh.writestr(file, data)
|
||||
with zh.open(file1, 'w', force_zip64=True) as fh:
|
||||
self._write_large_file(fh)
|
||||
|
||||
with zipfile.ZipFile(f, 'a') as zh:
|
||||
zh.remove(file)
|
||||
zh.repack()
|
||||
self.assertIsNone(zh.testzip())
|
||||
|
||||
def test_strip_removed_large_file_with_dd(self):
|
||||
"""Should scan for the data descriptor of a removed large file without
|
||||
causing a memory issue."""
|
||||
# Try the temp file. If we do TESTFN2, then it hogs
|
||||
# gigabytes of disk space for the duration of the test.
|
||||
with TemporaryFile() as f:
|
||||
tracemalloc.start()
|
||||
self._test_strip_removed_large_file_with_dd(f)
|
||||
self.assertFalse(f.closed)
|
||||
current, peak = tracemalloc.get_traced_memory()
|
||||
tracemalloc.stop()
|
||||
self.assertLess(peak, self.allowed_memory)
|
||||
|
||||
def _test_strip_removed_large_file_with_dd(self, f):
|
||||
file = 'file.txt'
|
||||
file1 = 'largefile.txt'
|
||||
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
|
||||
with zipfile.ZipFile(Unseekable(f), 'w') as zh:
|
||||
with zh.open(file1, 'w', force_zip64=True) as fh:
|
||||
self._write_large_file(fh)
|
||||
zh.writestr(file, data)
|
||||
|
||||
with zipfile.ZipFile(f, 'a') as zh:
|
||||
zh.remove(file1)
|
||||
zh.repack()
|
||||
self.assertIsNone(zh.testzip())
|
||||
|
||||
def test_strip_removed_large_file_with_dd_no_sig(self):
|
||||
"""Should scan for the data descriptor (without signature) of a removed
|
||||
large file without causing a memory issue."""
|
||||
# Reduce data scale for this test, as it's especially slow...
|
||||
self.datacount = 30*1024**2 // len(self.data)
|
||||
self.allowed_memory = 200*1024
|
||||
|
||||
# Try the temp file. If we do TESTFN2, then it hogs
|
||||
# gigabytes of disk space for the duration of the test.
|
||||
with TemporaryFile() as f:
|
||||
tracemalloc.start()
|
||||
self._test_strip_removed_large_file_with_dd_no_sig(f)
|
||||
self.assertFalse(f.closed)
|
||||
current, peak = tracemalloc.get_traced_memory()
|
||||
tracemalloc.stop()
|
||||
self.assertLess(peak, self.allowed_memory)
|
||||
|
||||
def _test_strip_removed_large_file_with_dd_no_sig(self, f):
|
||||
file = 'file.txt'
|
||||
file1 = 'largefile.txt'
|
||||
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
|
||||
with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig):
|
||||
with zipfile.ZipFile(Unseekable(f), 'w') as zh:
|
||||
with zh.open(file1, 'w', force_zip64=True) as fh:
|
||||
self._write_large_file(fh)
|
||||
zh.writestr(file, data)
|
||||
|
||||
with zipfile.ZipFile(f, 'a') as zh:
|
||||
zh.remove(file1)
|
||||
# strict_descriptor=False to scan the unsigned data descriptor
|
||||
# (scanning is disabled under the strict_descriptor=True default)
|
||||
zh.repack(strict_descriptor=False)
|
||||
self.assertIsNone(zh.testzip())
|
||||
|
||||
@requires_zlib()
|
||||
def test_strip_removed_large_file_with_dd_no_sig_by_decompression(self):
|
||||
"""Should scan for the data descriptor (without signature) of a removed
|
||||
large file without causing a memory issue."""
|
||||
# Try the temp file. If we do TESTFN2, then it hogs
|
||||
# gigabytes of disk space for the duration of the test.
|
||||
with TemporaryFile() as f:
|
||||
tracemalloc.start()
|
||||
self._test_strip_removed_large_file_with_dd_no_sig_by_decompression(
|
||||
f, zipfile.ZIP_DEFLATED)
|
||||
self.assertFalse(f.closed)
|
||||
current, peak = tracemalloc.get_traced_memory()
|
||||
tracemalloc.stop()
|
||||
self.assertLess(peak, self.allowed_memory)
|
||||
|
||||
def _test_strip_removed_large_file_with_dd_no_sig_by_decompression(self, f, method):
|
||||
file = 'file.txt'
|
||||
file1 = 'largefile.txt'
|
||||
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
|
||||
with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig):
|
||||
with zipfile.ZipFile(Unseekable(f), 'w', compression=method) as zh:
|
||||
with zh.open(file1, 'w', force_zip64=True) as fh:
|
||||
self._write_large_file(fh)
|
||||
zh.writestr(file, data)
|
||||
|
||||
with zipfile.ZipFile(f, 'a') as zh:
|
||||
zh.remove(file1)
|
||||
# strict_descriptor=False to detect the unsigned data descriptor
|
||||
# (scanning is disabled under the strict_descriptor=True default)
|
||||
zh.repack(strict_descriptor=False)
|
||||
self.assertIsNone(zh.testzip())
|
||||
|
||||
|
||||
class OtherTests(unittest.TestCase):
|
||||
def testMoreThan64kFiles(self):
|
||||
# This test checks that more than 64k files can be added to an archive,
|
||||
|
||||
@@ -785,6 +785,13 @@ class LZMADecompressor:
|
||||
self._unconsumed = b''
|
||||
self.eof = False
|
||||
|
||||
@property
|
||||
def unused_data(self):
|
||||
try:
|
||||
return self._decomp.unused_data
|
||||
except AttributeError:
|
||||
return b''
|
||||
|
||||
def decompress(self, data):
|
||||
if self._decomp is None:
|
||||
self._unconsumed += data
|
||||
@@ -1387,6 +1394,475 @@ class _ZipWriteFile(io.BufferedIOBase):
|
||||
self._zipfile._writing = False
|
||||
|
||||
|
||||
_REPACK_CHUNK_SIZE = 2**20
|
||||
|
||||
|
||||
class _ZipRepacker:
|
||||
"""Class for ZipFile repacking."""
|
||||
def __init__(self, *, strict_descriptor=True,
|
||||
chunk_size=_REPACK_CHUNK_SIZE, debug=0):
|
||||
self.debug = debug # Level of printing: 0 through 3
|
||||
self.chunk_size = chunk_size
|
||||
self.strict_descriptor = strict_descriptor
|
||||
|
||||
def _debug(self, level, *msg):
|
||||
if self.debug >= level:
|
||||
print(*msg)
|
||||
|
||||
def repack(self, zfile, removed=None):
|
||||
"""
|
||||
Repack the ZIP file, stripping unreferenced local file entries.
|
||||
|
||||
Assumes that local file entries (and the central directory, which is
|
||||
mostly treated as the "last entry") are stored consecutively, with no
|
||||
gaps or overlaps:
|
||||
|
||||
1. If any referenced entry overlaps with another, a `BadZipFile` error
|
||||
is raised since safe repacking cannot be guaranteed.
|
||||
|
||||
2. Data before the first referenced entry is stripped only when it
|
||||
appears to be a sequence of consecutive entries with no extra
|
||||
following bytes; extra preceding bytes are preserved.
|
||||
|
||||
3. Data between referenced entries is stripped only when it appears to
|
||||
be a sequence of consecutive entries with no extra preceding bytes;
|
||||
extra following bytes are preserved.
|
||||
|
||||
This is to prevent an unexpected data removal (false positive), though
|
||||
a false negative may happen in certain rare cases.
|
||||
|
||||
Examples:
|
||||
|
||||
Stripping before the first referenced entry:
|
||||
|
||||
[random bytes]
|
||||
[unreferenced local file entry]
|
||||
[random bytes]
|
||||
<-- stripping start
|
||||
[unreferenced local file entry]
|
||||
[unreferenced local file entry]
|
||||
<-- stripping end
|
||||
[local file entry 1] (or central directory)
|
||||
...
|
||||
|
||||
Stripping between referenced entries:
|
||||
|
||||
...
|
||||
[local file entry]
|
||||
<-- stripping start
|
||||
[unreferenced local file entry]
|
||||
[unreferenced local file entry]
|
||||
<-- stripping end
|
||||
[random bytes]
|
||||
[unreferenced local file entry]
|
||||
[random bytes]
|
||||
[local file entry] (or central directory)
|
||||
...
|
||||
|
||||
No stripping:
|
||||
|
||||
[unreferenced local file entry]
|
||||
[random bytes]
|
||||
[local file entry 1] (or central directory)
|
||||
...
|
||||
|
||||
No stripping:
|
||||
|
||||
...
|
||||
[local file entry]
|
||||
[random bytes]
|
||||
[unreferenced local file entry]
|
||||
[local file entry] (or central directory)
|
||||
...
|
||||
|
||||
Side effects:
|
||||
- Modifies the ZIP file in place.
|
||||
- Updates zfile.start_dir to account for removed data.
|
||||
- Sets zfile._didModify to True.
|
||||
- Updates header_offset and clears _end_offset of referenced
|
||||
ZipInfo instances.
|
||||
|
||||
Parameters:
|
||||
zfile: A ZipFile object representing the archive to repack.
|
||||
removed: Optional. A sequence of ZipInfo instances representing
|
||||
the previously removed entries. When provided, only their
|
||||
corresponding local file entries are stripped.
|
||||
"""
|
||||
removed_zinfos = set(removed or ())
|
||||
|
||||
fp = zfile.fp
|
||||
|
||||
# get a sorted filelist by header offset, in case the dir order
|
||||
# doesn't match the actual entry order
|
||||
filelist = (*zfile.filelist, *removed_zinfos)
|
||||
filelist = sorted(filelist, key=lambda x: x.header_offset)
|
||||
|
||||
# calculate each entry size and validate
|
||||
entry_size_list = []
|
||||
used_entry_size_list = []
|
||||
for i, zinfo in enumerate(filelist):
|
||||
try:
|
||||
offset = filelist[i + 1].header_offset
|
||||
except IndexError:
|
||||
offset = zfile.start_dir
|
||||
entry_size = offset - zinfo.header_offset
|
||||
|
||||
# may raise on an invalid local file header
|
||||
used_entry_size = self._calc_local_file_entry_size(fp, zinfo)
|
||||
|
||||
self._debug(3, 'entry:', i, zinfo.orig_filename,
|
||||
zinfo.header_offset, entry_size, used_entry_size)
|
||||
if used_entry_size > entry_size:
|
||||
raise BadZipFile(
|
||||
f"Overlapped entries: {zinfo.orig_filename!r} ")
|
||||
|
||||
if removed is not None and zinfo not in removed_zinfos:
|
||||
used_entry_size = entry_size
|
||||
|
||||
entry_size_list.append(entry_size)
|
||||
used_entry_size_list.append(used_entry_size)
|
||||
|
||||
# calculate the starting entry offset (bytes to skip)
|
||||
if removed is None:
|
||||
try:
|
||||
offset = filelist[0].header_offset
|
||||
except IndexError:
|
||||
offset = zfile.start_dir
|
||||
entry_offset = self._calc_initial_entry_offset(fp, offset)
|
||||
else:
|
||||
entry_offset = 0
|
||||
|
||||
# move file entries
|
||||
for i, zinfo in enumerate(filelist):
|
||||
entry_size = entry_size_list[i]
|
||||
used_entry_size = used_entry_size_list[i]
|
||||
|
||||
# update the header and move entry data to the new position
|
||||
old_header_offset = zinfo.header_offset
|
||||
zinfo.header_offset -= entry_offset
|
||||
|
||||
if zinfo in removed_zinfos:
|
||||
self._copy_bytes(
|
||||
fp,
|
||||
old_header_offset + used_entry_size,
|
||||
zinfo.header_offset,
|
||||
entry_size - used_entry_size,
|
||||
)
|
||||
|
||||
# update entry_offset for subsequent files to follow
|
||||
entry_offset += used_entry_size
|
||||
|
||||
else:
|
||||
if entry_offset > 0:
|
||||
self._copy_bytes(
|
||||
fp,
|
||||
old_header_offset,
|
||||
zinfo.header_offset,
|
||||
used_entry_size,
|
||||
)
|
||||
|
||||
stale_entry_size = self._validate_local_file_entry_sequence(
|
||||
fp,
|
||||
old_header_offset + used_entry_size,
|
||||
old_header_offset + entry_size,
|
||||
)
|
||||
|
||||
if stale_entry_size > 0:
|
||||
self._copy_bytes(
|
||||
fp,
|
||||
old_header_offset + used_entry_size + stale_entry_size,
|
||||
zinfo.header_offset + used_entry_size,
|
||||
entry_size - used_entry_size - stale_entry_size,
|
||||
)
|
||||
|
||||
# update entry_offset for subsequent files to follow
|
||||
entry_offset += stale_entry_size
|
||||
|
||||
# update state
|
||||
zfile.start_dir -= entry_offset
|
||||
zfile._didModify = True
|
||||
|
||||
for zinfo in filelist:
|
||||
zinfo._end_offset = None
|
||||
|
||||
def _calc_initial_entry_offset(self, fp, data_offset):
|
||||
checked_offsets = {}
|
||||
if data_offset > 0:
|
||||
self._debug(3, 'scanning file signatures before:', data_offset)
|
||||
for pos in self._iter_scan_signature(fp, stringFileHeader, 0, data_offset):
|
||||
self._debug(3, 'checking file signature at:', pos)
|
||||
entry_size = self._validate_local_file_entry_sequence(
|
||||
fp, pos, data_offset, checked_offsets)
|
||||
if entry_size == data_offset - pos:
|
||||
return entry_size
|
||||
return 0
|
||||
|
||||
def _iter_scan_signature(self, fp, signature, start_offset, end_offset,
|
||||
chunk_size=io.DEFAULT_BUFFER_SIZE):
|
||||
sig_len = len(signature)
|
||||
remainder = b''
|
||||
pos = start_offset
|
||||
|
||||
while pos < end_offset:
|
||||
# required for each loop since fp may be changed during each yield
|
||||
fp.seek(pos)
|
||||
|
||||
chunk = remainder + fp.read(min(chunk_size, end_offset - pos))
|
||||
|
||||
delta = pos - len(remainder)
|
||||
idx = 0
|
||||
while True:
|
||||
idx = chunk.find(signature, idx)
|
||||
if idx == -1:
|
||||
break
|
||||
|
||||
yield delta + idx
|
||||
idx += 1
|
||||
|
||||
remainder = chunk[-(sig_len - 1):]
|
||||
pos += chunk_size
|
||||
|
||||
def _validate_local_file_entry_sequence(self, fp, start_offset, end_offset, checked_offsets=None):
|
||||
offset = start_offset
|
||||
|
||||
while offset < end_offset:
|
||||
self._debug(3, 'checking local file entry at:', offset)
|
||||
|
||||
# Cache checked offsets to improve performance.
|
||||
try:
|
||||
entry_size = checked_offsets[offset]
|
||||
except (KeyError, TypeError):
|
||||
entry_size = self._validate_local_file_entry(fp, offset, end_offset)
|
||||
if checked_offsets is not None:
|
||||
checked_offsets[offset] = entry_size
|
||||
else:
|
||||
self._debug(3, 'read from checked cache:', offset)
|
||||
|
||||
if entry_size is None:
|
||||
break
|
||||
|
||||
offset += entry_size
|
||||
|
||||
return offset - start_offset
|
||||
|
||||
def _validate_local_file_entry(self, fp, offset, end_offset):
|
||||
fp.seek(offset)
|
||||
try:
|
||||
fheader = self._read_local_file_header(fp)
|
||||
except BadZipFile:
|
||||
return None
|
||||
|
||||
# Create a dummy ZipInfo to utilize parsing.
|
||||
# Flush only the required information.
|
||||
zinfo = ZipInfo()
|
||||
zinfo.header_offset = offset
|
||||
zinfo.flag_bits = fheader[_FH_GENERAL_PURPOSE_FLAG_BITS]
|
||||
zinfo.compress_size = fheader[_FH_COMPRESSED_SIZE]
|
||||
zinfo.file_size = fheader[_FH_UNCOMPRESSED_SIZE]
|
||||
zinfo.CRC = fheader[_FH_CRC]
|
||||
|
||||
filename = fp.read(fheader[_FH_FILENAME_LENGTH])
|
||||
zinfo.extra = fp.read(fheader[_FH_EXTRA_FIELD_LENGTH])
|
||||
pos = fp.tell()
|
||||
|
||||
if pos > end_offset:
|
||||
return None
|
||||
|
||||
# parse zip64
|
||||
try:
|
||||
zinfo._decodeExtra(crc32(filename))
|
||||
except BadZipFile:
|
||||
return None
|
||||
|
||||
dd_size = 0
|
||||
|
||||
if zinfo.flag_bits & _MASK_USE_DATA_DESCRIPTOR:
|
||||
# According to the spec, these fields should be zero when data
|
||||
# descriptor is used. Otherwise treat as a false positive on
|
||||
# random bytes to return early, as scanning for data descriptor
|
||||
# is rather expensive.
|
||||
if not (zinfo.CRC == zinfo.compress_size == zinfo.file_size == 0):
|
||||
return None
|
||||
|
||||
zip64 = fheader[_FH_UNCOMPRESSED_SIZE] == 0xffffffff
|
||||
|
||||
dd = self._scan_data_descriptor(fp, pos, end_offset, zip64)
|
||||
if dd is None and not self.strict_descriptor:
|
||||
if zinfo.flag_bits & _MASK_ENCRYPTED:
|
||||
dd = False
|
||||
else:
|
||||
dd = self._scan_data_descriptor_no_sig_by_decompression(
|
||||
fp, pos, end_offset, zip64, fheader[_FH_COMPRESSION_METHOD])
|
||||
if dd is False:
|
||||
dd = self._scan_data_descriptor_no_sig(fp, pos, end_offset, zip64)
|
||||
if dd is None:
|
||||
return None
|
||||
|
||||
zinfo.CRC, zinfo.compress_size, zinfo.file_size, dd_size = dd
|
||||
|
||||
entry_size = (
|
||||
sizeFileHeader +
|
||||
fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH] +
|
||||
zinfo.compress_size +
|
||||
dd_size
|
||||
)
|
||||
|
||||
# Treat as a false positive if the entry would extend past end_offset,
|
||||
# so callers never strip more bytes than the gap actually holds.
|
||||
if offset + entry_size > end_offset:
|
||||
return None
|
||||
|
||||
return entry_size
|
||||
|
||||
def _read_local_file_header(self, fp):
|
||||
fheader = fp.read(sizeFileHeader)
|
||||
if len(fheader) != sizeFileHeader:
|
||||
raise BadZipFile("Truncated file header")
|
||||
fheader = struct.unpack(structFileHeader, fheader)
|
||||
if fheader[_FH_SIGNATURE] != stringFileHeader:
|
||||
raise BadZipFile("Bad magic number for file header")
|
||||
return fheader
|
||||
|
||||
def _scan_data_descriptor(self, fp, offset, end_offset, zip64):
|
||||
dd_fmt = '<LLQQ' if zip64 else '<LLLL'
|
||||
dd_size = struct.calcsize(dd_fmt)
|
||||
|
||||
# scan for signature and take the first valid descriptor
|
||||
for pos in self._iter_scan_signature(
|
||||
fp, struct.pack('<L', _DD_SIGNATURE), offset, end_offset
|
||||
):
|
||||
fp.seek(pos)
|
||||
dd = fp.read(min(dd_size, end_offset - pos))
|
||||
try:
|
||||
_, crc, compress_size, file_size = struct.unpack(dd_fmt, dd)
|
||||
except struct.error:
|
||||
continue
|
||||
|
||||
# @TODO: also check CRC to better guard from a false positive?
|
||||
if pos - offset != compress_size:
|
||||
continue
|
||||
|
||||
return crc, compress_size, file_size, dd_size
|
||||
|
||||
return None
|
||||
|
||||
def _scan_data_descriptor_no_sig(self, fp, offset, end_offset, zip64,
|
||||
chunk_size=io.DEFAULT_BUFFER_SIZE):
|
||||
dd_fmt = '<LQQ' if zip64 else '<LLL'
|
||||
dd_size = struct.calcsize(dd_fmt)
|
||||
|
||||
pos = offset
|
||||
remainder = b''
|
||||
|
||||
fp.seek(offset)
|
||||
while pos < end_offset:
|
||||
chunk = remainder + fp.read(min(chunk_size, end_offset - pos))
|
||||
|
||||
delta = pos - len(remainder) - offset
|
||||
mv = memoryview(chunk)
|
||||
for i in range(len(chunk) - dd_size + 1):
|
||||
dd = mv[i:i + dd_size]
|
||||
crc, compress_size, file_size = struct.unpack(dd_fmt, dd)
|
||||
if delta + i != compress_size:
|
||||
continue
|
||||
|
||||
return crc, compress_size, file_size, dd_size
|
||||
|
||||
remainder = chunk[-(dd_size - 1):]
|
||||
pos += chunk_size
|
||||
|
||||
return None
|
||||
|
||||
def _scan_data_descriptor_no_sig_by_decompression(self, fp, offset, end_offset, zip64, method):
|
||||
try:
|
||||
decompressor = _get_decompressor(method)
|
||||
except RuntimeError:
|
||||
return False
|
||||
|
||||
if decompressor is None:
|
||||
return False
|
||||
|
||||
dd_fmt = '<LQQ' if zip64 else '<LLL'
|
||||
dd_size = struct.calcsize(dd_fmt)
|
||||
|
||||
# early return and prevent potential `fp.read(-1)`
|
||||
if end_offset - dd_size < offset:
|
||||
return None
|
||||
|
||||
try:
|
||||
pos = self._trace_compressed_block_end(fp, offset, end_offset - dd_size, decompressor)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
fp.seek(pos)
|
||||
dd = fp.read(dd_size)
|
||||
try:
|
||||
crc, compress_size, file_size = struct.unpack(dd_fmt, dd)
|
||||
except struct.error:
|
||||
return None
|
||||
if pos - offset != compress_size:
|
||||
return None
|
||||
|
||||
return crc, compress_size, file_size, dd_size
|
||||
|
||||
def _trace_compressed_block_end(self, fp, offset, end_offset, decompressor,
|
||||
chunk_size=io.DEFAULT_BUFFER_SIZE):
|
||||
fp.seek(offset)
|
||||
read_size = 0
|
||||
while True:
|
||||
chunk = fp.read(min(chunk_size, end_offset - offset - read_size))
|
||||
if not chunk:
|
||||
raise EOFError('Unexpected EOF while decompressing')
|
||||
|
||||
# may raise on error
|
||||
decompressor.decompress(chunk)
|
||||
|
||||
read_size += len(chunk)
|
||||
|
||||
if decompressor.eof:
|
||||
unused_len = len(decompressor.unused_data)
|
||||
return offset + read_size - unused_len
|
||||
|
||||
def _calc_local_file_entry_size(self, fp, zinfo):
|
||||
fp.seek(zinfo.header_offset)
|
||||
fheader = self._read_local_file_header(fp)
|
||||
|
||||
if zinfo.flag_bits & _MASK_USE_DATA_DESCRIPTOR:
|
||||
zip64 = fheader[_FH_UNCOMPRESSED_SIZE] == 0xffffffff
|
||||
dd_fmt = '<LLQQ' if zip64 else '<LLLL'
|
||||
fp.seek(
|
||||
fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH] +
|
||||
zinfo.compress_size,
|
||||
os.SEEK_CUR,
|
||||
)
|
||||
if fp.read(struct.calcsize('<L')) != struct.pack('<L', _DD_SIGNATURE):
|
||||
dd_fmt = '<LQQ' if zip64 else '<LLL'
|
||||
dd_size = struct.calcsize(dd_fmt)
|
||||
else:
|
||||
dd_size = 0
|
||||
|
||||
return (
|
||||
sizeFileHeader +
|
||||
fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH] +
|
||||
zinfo.compress_size +
|
||||
dd_size
|
||||
)
|
||||
|
||||
def _copy_bytes(self, fp, old_offset, new_offset, size):
|
||||
read_size = 0
|
||||
while read_size < size:
|
||||
fp.seek(old_offset + read_size)
|
||||
data = fp.read(min(size - read_size, self.chunk_size))
|
||||
if not data:
|
||||
raise BadZipFile(
|
||||
"Truncated data while repacking "
|
||||
f"(expected {size} bytes at offset {old_offset})"
|
||||
)
|
||||
fp.seek(new_offset + read_size)
|
||||
fp.write(data)
|
||||
fp.flush()
|
||||
read_size += len(data)
|
||||
|
||||
|
||||
class ZipFile:
|
||||
""" Class with methods to open, read, write, close, list zip files.
|
||||
@@ -1874,6 +2350,77 @@ class ZipFile:
|
||||
for zipinfo in members:
|
||||
self._extract_member(zipinfo, path, pwd)
|
||||
|
||||
def remove(self, zinfo_or_arcname):
|
||||
"""Remove a member from the archive."""
|
||||
if self.mode not in ('w', 'x', 'a'):
|
||||
raise ValueError("remove() requires mode 'w', 'x', or 'a'")
|
||||
if not self.fp:
|
||||
raise ValueError(
|
||||
"Attempt to write to ZIP archive that was already closed")
|
||||
if self._writing:
|
||||
raise ValueError(
|
||||
"Can't write to ZIP archive while an open writing handle exists."
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
# get the zinfo
|
||||
if isinstance(zinfo_or_arcname, ZipInfo):
|
||||
zinfo = zinfo_or_arcname
|
||||
else:
|
||||
# raise KeyError if arcname does not exist
|
||||
zinfo = self.getinfo(zinfo_or_arcname)
|
||||
|
||||
try:
|
||||
self.filelist.remove(zinfo)
|
||||
except ValueError:
|
||||
raise KeyError('There is no item %r in the archive' % zinfo) from None
|
||||
|
||||
try:
|
||||
del self.NameToInfo[zinfo.filename]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Avoid missing entry if there is another entry having the same name,
|
||||
# to prevent an error on `testzip()`.
|
||||
# Reverse the order as NameToInfo normally stores the last added one.
|
||||
for zi in reversed(self.filelist):
|
||||
if zi.filename == zinfo.filename:
|
||||
self.NameToInfo.setdefault(zi.filename, zi)
|
||||
break
|
||||
|
||||
self._didModify = True
|
||||
|
||||
return zinfo
|
||||
|
||||
def repack(self, removed=None, *, strict_descriptor=True,
|
||||
chunk_size=_REPACK_CHUNK_SIZE):
|
||||
"""Repack a zip file, removing non-referenced file entries.
|
||||
|
||||
The archive must be opened with mode 'a', as mode 'w'/'x' do not
|
||||
truncate the file when closed. This cannot be simply changed as
|
||||
they may be used on an unseekable file buffer, which disallows
|
||||
truncation."""
|
||||
if self.mode != 'a':
|
||||
raise ValueError("repack() requires mode 'a'")
|
||||
if not self.fp:
|
||||
raise ValueError(
|
||||
"Attempt to write to ZIP archive that was already closed")
|
||||
if self._writing:
|
||||
raise ValueError(
|
||||
"Can't write to ZIP archive while an open writing handle exists"
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
self._writing = True
|
||||
try:
|
||||
repacker = _ZipRepacker(
|
||||
strict_descriptor=strict_descriptor,
|
||||
chunk_size=chunk_size,
|
||||
)
|
||||
repacker.repack(self, removed)
|
||||
finally:
|
||||
self._writing = False
|
||||
|
||||
@classmethod
|
||||
def _sanitize_windows_name(cls, arcname, pathsep):
|
||||
"""Replace bad characters and remove trailing dots from parts."""
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
Add :meth:`~zipfile.ZipFile.remove` and :meth:`~zipfile.ZipFile.repack` to :class:`~zipfile.ZipFile`.
|
||||
Reference in New Issue
Block a user