Skip to content

bpo-38334: Fix seeking backward on an encrypted zipfile.ZipExtFile. #16937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions Lib/test/test_zipfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1934,6 +1934,44 @@ def test_unicode_password(self):
self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")

def test_seek_tell(self):
self.zip.setpassword(b"python")
txt = self.plain
test_word = b'encryption'
bloc = txt.find(test_word)
bloc_len = len(test_word)
with self.zip.open("test.txt", "r") as fp:
fp.seek(bloc, os.SEEK_SET)
self.assertEqual(fp.tell(), bloc)
fp.seek(-bloc, os.SEEK_CUR)
self.assertEqual(fp.tell(), 0)
fp.seek(bloc, os.SEEK_CUR)
self.assertEqual(fp.tell(), bloc)
self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])

# Make sure that the second read after seeking back beyond
# _readbuffer returns the same content (ie. rewind to the start of
# the file to read forward to the required position).
old_read_size = fp.MIN_READ_SIZE
fp.MIN_READ_SIZE = 1
fp._readbuffer = b''
fp._offset = 0
fp.seek(0, os.SEEK_SET)
self.assertEqual(fp.tell(), 0)
fp.seek(bloc, os.SEEK_CUR)
self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
fp.MIN_READ_SIZE = old_read_size

fp.seek(0, os.SEEK_END)
self.assertEqual(fp.tell(), len(txt))
fp.seek(0, os.SEEK_SET)
self.assertEqual(fp.tell(), 0)

# Read the file completely to definitely call any eof integrity
# checks (crc) and make sure they still pass.
fp.read()


class AbstractTestsWithRandomBinaryFiles:
@classmethod
def setUpClass(cls):
Expand Down
57 changes: 31 additions & 26 deletions Lib/zipfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,10 @@ class ZipExtFile(io.BufferedIOBase):
# Chunk size to read during seek
MAX_SEEK_READ = 1 << 24

def __init__(self, fileobj, mode, zipinfo, decrypter=None,
def __init__(self, fileobj, mode, zipinfo, pwd=None,
close_fileobj=False):
self._fileobj = fileobj
self._decrypter = decrypter
self._pwd = pwd
self._close_fileobj = close_fileobj

self._compress_type = zipinfo.compress_type
Expand All @@ -810,11 +810,6 @@ def __init__(self, fileobj, mode, zipinfo, decrypter=None,

self.newlines = None

# Adjust read size for encrypted files since the first 12 bytes
# are for the encryption/password information.
if self._decrypter is not None:
self._compress_left -= 12

self.mode = mode
self.name = zipinfo.filename

Expand All @@ -835,6 +830,30 @@ def __init__(self, fileobj, mode, zipinfo, decrypter=None,
except AttributeError:
pass

self._decrypter = None
if pwd:
if zipinfo.flag_bits & 0x8:
# compare against the file type from extended local headers
check_byte = (zipinfo._raw_time >> 8) & 0xff
else:
# compare against the CRC otherwise
check_byte = (zipinfo.CRC >> 24) & 0xff
h = self._init_decrypter()
if h != check_byte:
raise RuntimeError("Bad password for file %r" % zipinfo.orig_filename)


def _init_decrypter(self):
    """Install a fresh decrypter and consume the encryption header.

    A new ``_ZipDecrypter`` is built from the stored password, the
    12-byte encryption header is read from the underlying file object,
    and the count of remaining compressed bytes is reduced by 12.

    Returns the decrypted 12th header byte, which is compared against a
    check byte to validate the password.
    """
    decrypter = _ZipDecrypter(self._pwd)
    self._decrypter = decrypter
    # The cypher stream begins with a 12-byte encryption header used to
    # strengthen the algorithm: 11 completely random bytes followed by
    # one byte holding the MSB of the CRC, or the MSB of the file time
    # depending on the header type.  That last byte is what lets the
    # correctness of the password be checked.
    encryption_header = self._fileobj.read(12)
    self._compress_left -= 12
    return decrypter(encryption_header)[11]

def __repr__(self):
result = ['<%s.%s' % (self.__class__.__module__,
self.__class__.__qualname__)]
Expand Down Expand Up @@ -1061,6 +1080,8 @@ def seek(self, offset, whence=0):
self._decompressor = _get_decompressor(self._compress_type)
self._eof = False
read_offset = new_pos
if self._decrypter is not None:
self._init_decrypter()

while read_offset > 0:
read_len = min(self.MAX_SEEK_READ, read_offset)
Expand Down Expand Up @@ -1524,32 +1545,16 @@ def open(self, name, mode="r", pwd=None, *, force_zip64=False):

# check for encrypted flag & handle password
is_encrypted = zinfo.flag_bits & 0x1
zd = None
if is_encrypted:
if not pwd:
pwd = self.pwd
if not pwd:
raise RuntimeError("File %r is encrypted, password "
"required for extraction" % name)
else:
pwd = None

zd = _ZipDecrypter(pwd)
# The first 12 bytes in the cypher stream are an encryption header
# used to strengthen the algorithm. The first 11 bytes are
# completely random, while the 12th contains the MSB of the CRC,
# or the MSB of the file time depending on the header type,
# and is used to check the correctness of the password.
header = zef_file.read(12)
h = zd(header[0:12])
if zinfo.flag_bits & 0x8:
# compare against the file type from extended local headers
check_byte = (zinfo._raw_time >> 8) & 0xff
else:
# compare against the CRC otherwise
check_byte = (zinfo.CRC >> 24) & 0xff
if h[11] != check_byte:
raise RuntimeError("Bad password for file %r" % name)

return ZipExtFile(zef_file, mode, zinfo, zd, True)
return ZipExtFile(zef_file, mode, zinfo, pwd, True)
except:
zef_file.close()
raise
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed seeking backward on an encrypted :class:`zipfile.ZipExtFile`.