diff --git a/Doc/library/tarfile.rst b/Doc/library/tarfile.rst index f25af8ca6a338f..313ebcc2768497 100644 --- a/Doc/library/tarfile.rst +++ b/Doc/library/tarfile.rst @@ -8,7 +8,7 @@ .. sectionauthor:: Lars Gustäbel **Source code:** :source:`Lib/tarfile.py` - + -------------- The :mod:`tarfile` module makes it possible to read and write tar @@ -512,6 +512,137 @@ be finalized; only the internally used file object will be closed. See the +.. _safetarfile-objects: + +SafeTarFile Objects +------------------- + +In general, it is no good idea to extract tar archives from sources you do not +completely trust. Archives that were created carelessly or maliciously may +contain file system objects in configurations that pose a variety of risks to +the system if they are extracted, for example overwriting existing files in +unanticipated locations. See the warning for :meth:`TarFile.extractall`. + +The :class:`SafeTarFile` class is a replacement for the :class:`TarFile` class +that can be used identically but tries to safeguard against a number of +unwanted side-effects. :class:`SafeTarFile` does this by identifying bad +archives and preventing the bad parts from being extracted. The default +behaviour of the :class:`SafeTarFile` class is to raise a :exc:`SecurityError` +exception in case of a bad archive member or a :exc:`LimitError` in case of an +exceeded limit. + +.. note:: + + There is no additional benefit in using :class:`SafeTarFile` for the + creation of tar archives. + +.. versionadded:: 3.5 + Added the :class:`SafeTarFile` class. + +.. class:: SafeTarFile(..., ignore_warnings=None, max_files=100000, max_total=1073741824) + + :class:`SafeTarFile` offers a few additional keyword arguments to the + arguments it has in common with the :class:`TarFile` class: + + *ignore_warnings* takes a list of constants one for each warning that + you like to ignore, by default no warnings are ignored. See the first part + of :ref:`safetarfile-configuration` for the constants. + + *max_files* is the maximum allowed number of files stored in the tar + archive, default is ``100000``. To disable the limit, pass :const:`None` or + ``0``. + + *max_total* is the maximum allowed size in bytes that all files together may + occupy when extracted. This defaults to 1 GiB. To disable the limit, pass + :const:`None` or ``0``. + +.. method:: SafeTarFile.analyze() + + Check the archive for possible issues, and generate a 2-tuple for each + member consisting of the member's :class:`TarInfo` object and a :class:`set` + that is either empty (good) or contains one or more warnings described in + :ref:`safetarfile-configuration` (bad). No :exc:`SecurityError` exceptions + are raised. If a limit is exceeded a :exc:`LimitError` is raised. + +.. method:: SafeTarFile.filter() + + Return a generator that only produces :class:`TarInfo` objects that are not + marked as bad, e.g. to restore the good parts of an archive. However, if a + limit is exceeded a :exc:`LimitError` is raised. + +.. method:: SafeTarFile.is_safe() + + Analyze the archive and return :const:`True` if there were no issues found + and it should be safe to extract the archive to the file system. Neither + :exc:`SecurityError` nor :exc:`LimitError` will be raised. + + + +.. _safetarfile-configuration: + +SafeTarFile configuration +~~~~~~~~~~~~~~~~~~~~~~~~~ + +There are two different types of checks built into :class:`SafeTarFile`. The +first type takes care of archive members whose configuration poses a risk to +the system when they are extracted. Each of these checks can be switched off +by passing a list of the following constants as the *ignore_warnings* argument +to the :class:`SafeTarFile` constructor. These constants are also stored in +the :attr:`warning` attribute of a :exc:`SecurityError`. + +.. data:: WARN_ABSOLUTE_NAME + + An absolute pathname (names starting with a ``"/"``). + +.. data:: WARN_RELATIVE_NAME + + A relative pathname (names starting with ``".."``) that breaks out of the + destination directory. + +.. data:: WARN_DUPLICATE_NAME + + A duplicate pathname. + +.. data:: WARN_ABSOLUTE_LINKNAME + + An absolute linkname. + +.. data:: WARN_RELATIVE_LINKNAME + + A relative linkname that breaks out of the destination directory. + +.. data:: WARN_SETUID_SET + + A regular file with a set-user-id permission bit set. + +.. data:: WARN_SETGID_SET + + A regular file with a set-group-id permission bit set. + +.. data:: WARN_CHARACTER_DEVICE + + A character device node. + +.. data:: WARN_BLOCK_DEVICE + + A block device node. + +The second type of check makes sure that the archive complies to a number of +user-defined limits, e.g. to prevent denial-of-service scenarios by excessive +use of memory or disk space. These limits can be configured using the keyword +arguments exclusive to the :class:`SafeTarFile` constructor. The following +constants are stored in the :attr:`warning` attribute of a :exc:`LimitError`. + +.. data:: LIMIT_MAX_FILES + + Maximum allowed number of files exceeded. + +.. data:: LIMIT_MAX_SIZE + + Maximum allowed total size of unpacked contents exceeded. + + + .. _tarinfo-objects: TarInfo Objects @@ -804,6 +935,21 @@ parameter in :meth:`TarFile.add`:: tar.add("foo", filter=reset) tar.close() +How to safely extract a tar archive from an untrusted source:: + + import tarfile + + with tarfile.safe_open("sample.tar", ignore_warnings={tarfile.WARN_DUPLICATE_NAME}) as tar: + # We don't care about duplicate archive members. + if not tar.is_safe(): + print("sample.tar has the following issues:") + for tarinfo, warnings in tar.analyze(): + print(tarinfo.name, ",".join(warnings)) + print("extracting the good parts") + tar.extractall(members=tar.filter()) + else: + tar.extractall() + .. _tar-formats: diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 2c06f9160c658a..66cda8dc6f1251 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -66,10 +66,14 @@ pass # from tarfile import * -__all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError", "ReadError", - "CompressionError", "StreamError", "ExtractError", "HeaderError", - "ENCODING", "USTAR_FORMAT", "GNU_FORMAT", "PAX_FORMAT", - "DEFAULT_FORMAT", "open"] +__all__ = ["TarFile", "SafeTarFile", "TarInfo", "is_tarfile", "TarError", + "ReadError", "CompressionError", "StreamError", "ExtractError", + "HeaderError", "SecurityError", "LimitError", "ENCODING", + "USTAR_FORMAT", "GNU_FORMAT", "PAX_FORMAT", "LIMIT_MAX_SIZE", + "LIMIT_MAX_FILES", "WARN_BLOCK_DEVICE", "WARN_CHARACTER_DEVICE", + "WARN_SETGID_SET", "WARN_SETUID_SET", "WARN_RELATIVE_LINKNAME", + "WARN_ABSOLUTE_LINKNAME", "WARN_DUPLICATE_NAME", "WARN_RELATIVE_NAME", + "WARN_ABSOLUTE_NAME", "DEFAULT_FORMAT", "open", "safe_open"] #--------------------------------------------------------- # tar constants @@ -143,6 +147,20 @@ "size": int } +# SafeTarFile-related string constants. +WARN_ABSOLUTE_NAME = "absolute name" +WARN_RELATIVE_NAME = "relative name" +WARN_DUPLICATE_NAME = "duplicate name" +WARN_ABSOLUTE_LINKNAME = "absolute linkname" +WARN_RELATIVE_LINKNAME = "relative linkname" +WARN_SETUID_SET = "setuid set" +WARN_SETGID_SET = "setgid set" +WARN_CHARACTER_DEVICE = "character device" +WARN_BLOCK_DEVICE = "block device" + +LIMIT_MAX_FILES = "file limit exceeded" +LIMIT_MAX_SIZE = "space limit exceeded" + #--------------------------------------------------------- # initialization #--------------------------------------------------------- @@ -296,6 +314,19 @@ class InvalidHeaderError(HeaderError): class SubsequentHeaderError(HeaderError): """Exception for missing and invalid extended headers.""" pass +class SecurityError(TarError): + """Exception for potentially dangerous contents.""" + def __init__(self, tarinfo, warning): + self.tarinfo = tarinfo + self.warning = warning + def __str__(self): + return "%s: %s" % (self.tarinfo, self.warning) +class LimitError(SecurityError): + """Exception for an exceeded limit.""" + def __init__(self, warning): + super().__init__(None, warning) + def __str__(self): + return self.warning #--------------------------- # internal stream interface @@ -2455,6 +2486,159 @@ def __exit__(self, type, value, traceback): self.fileobj.close() self.closed = True +class SafeTarFile(TarFile): + """A subclass of TarFile that safeguards against malicious data. + """ + + def __init__(self, *args, ignore_warnings=None, + max_files=100000, max_total=1024**3, **kwargs): + super().__init__(*args, **kwargs) + + if ignore_warnings: + self.ignore_warnings = set(ignore_warnings) + else: + self.ignore_warnings = set() + + self.max_files = max_files + self.max_total = max_total + self.symlink_effective_name_map = {} + + def __iter__(self): + """Safe iterator over the TarFile, that raises a SecurityError + exception on the first warning. + """ + for tarinfo, warnings in self.analyze(): + if warnings: + raise SecurityError(tarinfo, warnings.pop()) + yield tarinfo + + def analyze(self): + """Generate a list of (TarInfo, warnings) tuples. + """ + self.names = set() + self.total = 0 + + for tarinfo in super().__iter__(): + warnings = set(self._check_member(tarinfo)) + yield tarinfo, warnings - self.ignore_warnings + + def filter(self): + """Generate a list of good TarInfo objects. + """ + for tarinfo, warnings in self.analyze(): + if warnings: + continue + yield tarinfo + + def is_safe(self): + """Return True if the archive should be safe to extract. + """ + try: + for tarinfo, warnings in self.analyze(): + if warnings: + return False + else: + return True + + except LimitError: + return False + + def _check_member(self, tarinfo): + """Check a single TarInfo object for problems. Override this in a + subclass if you want to add more checks. + """ + if self.max_files and len(self.members) == self.max_files: + raise LimitError(LIMIT_MAX_FILES) + + self.total = tarinfo.size + if self.max_total and self.total > self.max_total: + raise LimitError(LIMIT_MAX_SIZE) + + effective_name = self._get_effective_name(tarinfo.name) + if effective_name in self.symlink_effective_name_map: + del self.symlink_effective_name_map[effective_name] + + yield from self._check_all(tarinfo, effective_name) + + if tarinfo.issym(): + effective_linkname = self._get_effective_name(tarinfo.linkname) + cwd = os.path.dirname(effective_name) + relative_effective_linkname = effective_linkname if (os.path.isabs(effective_linkname)) \ + else os.path.relpath(effective_linkname, cwd) + self.symlink_effective_name_map[effective_name] = relative_effective_linkname + yield from self._check_symlink(effective_name, relative_effective_linkname) + elif tarinfo.islnk(): + yield from self._check_link(tarinfo) + elif tarinfo.ischr() or tarinfo.isblk(): + yield from self._check_device(tarinfo) + + def _get_effective_name(self, given_name): + namelist = given_name.split("/") + if len(namelist) > 1: + effective_name = "" + + for i in range(len(namelist)): + name = namelist[i] + + if name == "": + effective_name += "/" + else: + effective_name += name + + effective_name = os.path.normpath(effective_name) + if effective_name in self.symlink_effective_name_map: + effective_name = self.symlink_effective_name_map[effective_name] + + if i < len(namelist) - 1 and effective_name[len(effective_name)-1] != "/": + effective_name += "/" + + return effective_name + else: + return given_name + + def _check_all(self, tarinfo, effective_name): + if os.path.isabs(effective_name): + yield WARN_ABSOLUTE_NAME + + name = os.path.normpath(effective_name) + if name.startswith(".."): + yield WARN_RELATIVE_NAME + + if effective_name in self.names: + yield WARN_DUPLICATE_NAME + else: + self.names.add(effective_name) + + if tarinfo.isreg() and tarinfo.mode & stat.S_ISUID: + yield WARN_SETUID_SET + + if tarinfo.isreg() and tarinfo.mode & stat.S_ISGID: + yield WARN_SETGID_SET + + def _check_symlink(self, effective_name, effective_linkname): + if os.path.isabs(effective_linkname): + yield WARN_ABSOLUTE_LINKNAME + + linkname = os.path.join(os.path.dirname(effective_name), effective_linkname) + linkname = os.path.normpath(linkname) + + if linkname.startswith(".."): + yield WARN_RELATIVE_LINKNAME + + def _check_link(self, tarinfo): + if os.path.isabs(tarinfo.linkname): + yield WARN_ABSOLUTE_LINKNAME + + linkname = os.path.normpath(tarinfo.linkname) + if linkname.startswith(".."): + yield WARN_RELATIVE_LINKNAME + + def _check_device(self, tarinfo): + if tarinfo.ischr(): + yield WARN_CHARACTER_DEVICE + elif tarinfo.isblk(): + yield WARN_BLOCK_DEVICE + #-------------------- # exported functions #-------------------- @@ -2470,6 +2654,7 @@ def is_tarfile(name): return False open = TarFile.open +safe_open = SafeTarFile.open def main(): diff --git a/Lib/test/tarfiletestdata/sly_absolute0.tar b/Lib/test/tarfiletestdata/sly_absolute0.tar new file mode 100644 index 00000000000000..94fabeb382dfcb Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_absolute0.tar differ diff --git a/Lib/test/tarfiletestdata/sly_absolute1.tar b/Lib/test/tarfiletestdata/sly_absolute1.tar new file mode 100644 index 00000000000000..87e4083715c707 Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_absolute1.tar differ diff --git a/Lib/test/tarfiletestdata/sly_dirsymlink0.tar b/Lib/test/tarfiletestdata/sly_dirsymlink0.tar new file mode 100644 index 00000000000000..f5c14d9b4f90e8 Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_dirsymlink0.tar differ diff --git a/Lib/test/tarfiletestdata/sly_dirsymlink1.tar b/Lib/test/tarfiletestdata/sly_dirsymlink1.tar new file mode 100644 index 00000000000000..6e9bfa32cc2036 Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_dirsymlink1.tar differ diff --git a/Lib/test/tarfiletestdata/sly_dirsymlink2.tar b/Lib/test/tarfiletestdata/sly_dirsymlink2.tar new file mode 100644 index 00000000000000..b8c488dffb9f6c Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_dirsymlink2.tar differ diff --git a/Lib/test/tarfiletestdata/sly_dirsymlink3.tar b/Lib/test/tarfiletestdata/sly_dirsymlink3.tar new file mode 100644 index 00000000000000..fafc43145f557e Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_dirsymlink3.tar differ diff --git a/Lib/test/tarfiletestdata/sly_relative0.tar b/Lib/test/tarfiletestdata/sly_relative0.tar new file mode 100644 index 00000000000000..f5f4e99fbdb41e Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_relative0.tar differ diff --git a/Lib/test/tarfiletestdata/sly_relative1.tar b/Lib/test/tarfiletestdata/sly_relative1.tar new file mode 100644 index 00000000000000..60e4982a56c029 Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_relative1.tar differ diff --git a/Lib/test/tarfiletestdata/sly_symlink.tar b/Lib/test/tarfiletestdata/sly_symlink.tar new file mode 100644 index 00000000000000..442198febbe557 Binary files /dev/null and b/Lib/test/tarfiletestdata/sly_symlink.tar differ diff --git a/Lib/test/testtar.tar b/Lib/test/tarfiletestdata/testtar.tar similarity index 100% rename from Lib/test/testtar.tar rename to Lib/test/tarfiletestdata/testtar.tar diff --git a/Lib/test/test_safetarfile.py b/Lib/test/test_safetarfile.py new file mode 100644 index 00000000000000..8c7da946c978c9 --- /dev/null +++ b/Lib/test/test_safetarfile.py @@ -0,0 +1,243 @@ +import sys +import os +import io +import unittest +import tarfile +from .test_tarfile import testtardir, setUpModule as setUpModuleTarFile, tearDownModule as tearDownModuleTarFile, GzipTest, Bz2Test, LzmaTest, UstarReadTestBase, ListTestBase + +class SafeTarFileTestBase: + tarfile_module = tarfile.SafeTarFile + tarfile_open = tarfile.safe_open + +class SafeTarFileTest(unittest.TestCase): + ANALYZE_RESULTS = "analyzeresults" + FILTER_RESULTS = "filterresults" + IS_SAFE_RESULTS = "is_saferesults" + + TEST_RESULTS = { + os.path.join(testtardir, "sly_absolute0.tar"): + { + ANALYZE_RESULTS: + { + "/tmp/moo": {tarfile.WARN_ABSOLUTE_NAME} + }, + FILTER_RESULTS: [], + IS_SAFE_RESULTS: False + }, + + os.path.join(testtardir, "sly_absolute1.tar"): + { + ANALYZE_RESULTS: + { + "//tmp/moo": {tarfile.WARN_ABSOLUTE_NAME} + }, + FILTER_RESULTS: [], + IS_SAFE_RESULTS: False + }, + + os.path.join(testtardir, "sly_dirsymlink0.tar"): + { + ANALYZE_RESULTS: + { + "tmp": {tarfile.WARN_ABSOLUTE_LINKNAME}, + "tmp/moo": {tarfile.WARN_ABSOLUTE_NAME} + }, + FILTER_RESULTS: [], + IS_SAFE_RESULTS: False + }, + + os.path.join(testtardir, "sly_dirsymlink1.tar"): + { + ANALYZE_RESULTS: + { + "cur": set(), + "par": {tarfile.WARN_RELATIVE_LINKNAME}, + "par/moo": {tarfile.WARN_RELATIVE_NAME} + }, + FILTER_RESULTS: ["cur"], + IS_SAFE_RESULTS: False + }, + + os.path.join(testtardir, "sly_dirsymlink2.tar"): + { + ANALYZE_RESULTS: + { + "cur": set(), + "cur/par": {tarfile.WARN_RELATIVE_LINKNAME}, + "par/moo": {tarfile.WARN_RELATIVE_NAME} + }, + FILTER_RESULTS: ["cur"], + IS_SAFE_RESULTS: False + }, + + os.path.join(testtardir, "sly_dirsymlink3.tar"): + { + ANALYZE_RESULTS: + { + "dirsym": set(), + "dirsym/sym": set(), + "dirsym/symsym3": {tarfile.WARN_RELATIVE_LINKNAME} + }, + FILTER_RESULTS: ["dirsym", "dirsym/sym"], + IS_SAFE_RESULTS: False + }, + + os.path.join(testtardir, "sly_relative0.tar"): + { + ANALYZE_RESULTS: + { + "../moo": {tarfile.WARN_RELATIVE_NAME} + }, + FILTER_RESULTS: [], + IS_SAFE_RESULTS: False + }, + + os.path.join(testtardir, "sly_relative1.tar"): + { + ANALYZE_RESULTS: + { + "tmp/../../moo": {tarfile.WARN_RELATIVE_NAME} + }, + FILTER_RESULTS: [], + IS_SAFE_RESULTS: False + }, + + os.path.join(testtardir, "sly_symlink.tar"): + { + ANALYZE_RESULTS: + { + "moo": {tarfile.WARN_ABSOLUTE_LINKNAME}, + "moo": {tarfile.WARN_DUPLICATE_NAME} + }, + FILTER_RESULTS: [], + IS_SAFE_RESULTS: False + } + } + + def test_analyze(self): + for entry in self.TEST_RESULTS: + analyzeresults = self._get_analyze_results(entry) + expectedvalues = self.TEST_RESULTS[entry][self.ANALYZE_RESULTS] + self.assertEqual(analyzeresults, expectedvalues, + "SafeTarFile analyze() failed for " + entry) + + def test_filter(self): + for entry in self.TEST_RESULTS: + filterresults = self._get_filter_results(entry) + expectedvalues = self.TEST_RESULTS[entry][self.FILTER_RESULTS] + self.assertEqual(filterresults, expectedvalues, + "SafeTarFile filter() failed for " + entry) + + def test_is_safe(self): + for entry in self.TEST_RESULTS: + issaferesults = self._get_is_safe_results(entry) + expectedvalues = self.TEST_RESULTS[entry][self.IS_SAFE_RESULTS] + self.assertEqual(issaferesults, expectedvalues, + "SafeTarFile is_safe() failed for " + entry) + + def _get_analyze_results(self, tarballpath): + with open(tarballpath, "r+b") as fileobj: + results = {} + tar = tarfile.safe_open(fileobj=fileobj) + for result in tar.analyze(): + results[result[0].name] = result[1] + + return results + + def _get_filter_results(self, tarballpath): + with open(tarballpath, "r+b") as fileobj: + results = [] + tar = tarfile.safe_open(fileobj=fileobj) + for result in tar.filter(): + results.append(result.name) + + return results + + def _get_is_safe_results(self, tarballpath): + with open(tarballpath, "r+b") as fileobj: + tar = tarfile.safe_open(fileobj=fileobj) + return tar.is_safe() + + +class FileUstarReadTest(UstarReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class FileGzipUstarReadTest(GzipTest, UstarReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class FileBz2UstarReadTest(Bz2Test, UstarReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class FileLzmaUstarReadTest(LzmaTest, UstarReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + + +class ListTest(ListTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +""" + +class GzipListTest(GzipTest, ListTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class Bz2ListTest(Bz2Test, ListTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class LzmaListTest(LzmaTest, ListTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + + + +class MiscReadTest(MiscReadTestBase, unittest.TestCase, SafeTarFileTestBase): + test_fail_comp = None + +class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase, SafeTarFileTestBase): + def requires_name_attribute(self): + self.skipTest("BZ2File have no name attribute") + +class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase, SafeTarFileTestBase): + def requires_name_attribute(self): + self.skipTest("LZMAFile have no name attribute") + + +class StreamReadTest(StreamReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class GzipStreamReadTest(GzipTest, StreamReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class Bz2StreamReadTest(Bz2Test, StreamReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class LzmaStreamReadTest(LzmaTest, StreamReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + + +class DetectReadTest(DetectReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class GzipDetectReadTest(GzipTest, DetectReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class Bz2DetectReadTest(Bz2DetectReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class LzmaDetectReadTest(LzmaTest, DetectReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +class MemberReadTest(MemberReadTestBase, unittest.TestCase, SafeTarFileTestBase): + pass + +""" + +def setUpModule(): + setUpModuleTarFile() + +def tearDownModule(): + tearDownModuleTarFile() + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 7e32cbccd6c56d..ef07c68a7d7d55 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -32,7 +32,8 @@ def md5sum(data): TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" tarextdir = TEMPDIR + '-extract-test' -tarname = support.findfile("testtar.tar") +tarname = support.findfile("testtar.tar", subdir="tarfiletestdata") +testtardir = os.path.dirname(tarname) gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") xzname = os.path.join(TEMPDIR, "testtar.tar.xz") @@ -42,12 +43,17 @@ def md5sum(data): md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" +class TarFileTestBase: + tarfile_module = tarfile.TarFile + tarfile_open = tarfile.open + def get_taropen(self): + return getattr(self.tarfile_module, self.taropen_name) class TarTest: tarname = tarname suffix = '' open = io.FileIO - taropen = tarfile.TarFile.taropen + taropen_name = 'taropen' @property def mode(self): @@ -58,21 +64,21 @@ class GzipTest: tarname = gzipname suffix = 'gz' open = gzip.GzipFile if gzip else None - taropen = tarfile.TarFile.gzopen + taropen_name = 'gzopen' @support.requires_bz2 class Bz2Test: tarname = bz2name suffix = 'bz2' open = bz2.BZ2File if bz2 else None - taropen = tarfile.TarFile.bz2open + taropen_name = 'bz2open' @support.requires_lzma class LzmaTest: tarname = xzname suffix = 'xz' open = lzma.LZMAFile if lzma else None - taropen = tarfile.TarFile.xzopen + taropen_name = 'xzopen' class ReadTest(TarTest): @@ -80,14 +86,14 @@ class ReadTest(TarTest): prefix = "r:" def setUp(self): - self.tar = tarfile.open(self.tarname, mode=self.mode, + self.tar = self.tarfile_open(self.tarname, mode=self.mode, encoding="iso8859-1") def tearDown(self): self.tar.close() -class UstarReadTest(ReadTest, unittest.TestCase): +class UstarReadTestBase(ReadTest): def test_fileobj_regular_file(self): tarinfo = self.tar.getmember("ustar/regtype") @@ -211,21 +217,23 @@ def test_fileobj_symlink2(self): def test_issue14160(self): self._test_fileobj_link("symtype2", "ustar/regtype") -class GzipUstarReadTest(GzipTest, UstarReadTest): +class UstarReadTest(UstarReadTestBase, unittest.TestCase, TarFileTestBase): pass -class Bz2UstarReadTest(Bz2Test, UstarReadTest): +class GzipUstarReadTest(GzipTest, UstarReadTestBase, unittest.TestCase, TarFileTestBase): pass -class LzmaUstarReadTest(LzmaTest, UstarReadTest): +class Bz2UstarReadTest(Bz2Test, UstarReadTestBase, unittest.TestCase, TarFileTestBase): pass +class LzmaUstarReadTest(LzmaTest, UstarReadTestBase, unittest.TestCase, TarFileTestBase): + pass -class ListTest(ReadTest, unittest.TestCase): +class ListTestBase(ReadTest): # Override setUp to use default encoding (UTF-8) def setUp(self): - self.tar = tarfile.open(self.tarname, mode=self.mode) + self.tar = self.tarfile_open(self.tarname, mode=self.mode) def test_list(self): tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') @@ -300,16 +308,16 @@ def members(tar): self.assertIn(b'ustar/regtype', out) self.assertNotIn(b'ustar/conttype', out) - -class GzipListTest(GzipTest, ListTest): +class ListTest(ListTestBase, unittest.TestCase, TarFileTestBase): pass - -class Bz2ListTest(Bz2Test, ListTest): +class GzipListTest(GzipTest, ListTestBase, unittest.TestCase, TarFileTestBase): pass +class Bz2ListTest(Bz2Test, ListTestBase, unittest.TestCase, TarFileTestBase): + pass -class LzmaListTest(LzmaTest, ListTest): +class LzmaListTest(LzmaTest, ListTestBase, unittest.TestCase, TarFileTestBase): pass @@ -317,16 +325,16 @@ class CommonReadTest(ReadTest): def test_empty_tarfile(self): # Test for issue6123: Allow opening empty archives. - # This test checks if tarfile.open() is able to open an empty tar + # This test checks if self.tarfile_open() is able to open an empty tar # archive successfully. Note that an empty tar archive is not the # same as an empty file! - with tarfile.open(tmpname, self.mode.replace("r", "w")): + with self.tarfile_open(tmpname, self.mode.replace("r", "w")): pass try: - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) tar.getnames() except tarfile.ReadError: - self.fail("tarfile.open() failed on empty archive") + self.fail("self.tarfile_open() failed on empty archive") else: self.assertListEqual(tar.getmembers(), []) finally: @@ -336,16 +344,16 @@ def test_non_existent_tarfile(self): # Test for issue11513: prevent non-existent gzipped tarfiles raising # multiple exceptions. with self.assertRaisesRegex(FileNotFoundError, "xxx"): - tarfile.open("xxx", self.mode) + self.tarfile_open("xxx", self.mode) def test_null_tarfile(self): # Test for issue6123: Allow opening empty archives. - # This test guarantees that tarfile.open() does not treat an empty + # This test guarantees that self.tarfile_open() does not treat an empty # file as an empty tar archive. with open(tmpname, "wb"): pass - self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) - self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) + self.assertRaises(tarfile.ReadError, self.tarfile_open, tmpname, self.mode) + self.assertRaises(tarfile.ReadError, self.tarfile_open, tmpname) def test_ignore_zeros(self): # Test TarFile's ignore_zeros option. @@ -361,7 +369,7 @@ def test_ignore_zeros(self): fobj.write(tarinfo.tobuf()) fobj.write(data) - tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) + tar = self.tarfile_open(tmpname, mode="r", ignore_zeros=True) try: self.assertListEqual(tar.getnames(), ["foo"], "ignore_zeros=True should have skipped the %r-blocks" % @@ -371,7 +379,7 @@ def test_ignore_zeros(self): def test_premature_end_of_archive(self): for size in (512, 600, 1024, 1200): - with tarfile.open(tmpname, "w:") as tar: + with self.tarfile_open(tmpname, "w:") as tar: t = tarfile.TarInfo("foo") t.size = 1024 tar.addfile(t, io.BytesIO(b"a" * 1024)) @@ -379,12 +387,12 @@ def test_premature_end_of_archive(self): with open(tmpname, "r+b") as fobj: fobj.truncate(size) - with tarfile.open(tmpname) as tar: + with self.tarfile_open(tmpname) as tar: with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): for t in tar: pass - with tarfile.open(tmpname) as tar: + with self.tarfile_open(tmpname) as tar: t = tar.next() with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): @@ -401,7 +409,7 @@ def test_no_name_argument(self): self.requires_name_attribute() with open(self.tarname, "rb") as fobj: self.assertIsInstance(fobj.name, str) - with tarfile.open(fileobj=fobj, mode=self.mode) as tar: + with self.tarfile_open(fileobj=fobj, mode=self.mode) as tar: self.assertIsInstance(tar.name, str) self.assertEqual(tar.name, os.path.abspath(fobj.name)) @@ -410,7 +418,7 @@ def test_no_name_attribute(self): data = fobj.read() fobj = io.BytesIO(data) self.assertRaises(AttributeError, getattr, fobj, "name") - tar = tarfile.open(fileobj=fobj, mode=self.mode) + tar = self.tarfile_open(fileobj=fobj, mode=self.mode) self.assertIsNone(tar.name) def test_empty_name_attribute(self): @@ -418,16 +426,16 @@ def test_empty_name_attribute(self): data = fobj.read() fobj = io.BytesIO(data) fobj.name = "" - with tarfile.open(fileobj=fobj, mode=self.mode) as tar: + with self.tarfile_open(fileobj=fobj, mode=self.mode) as tar: self.assertIsNone(tar.name) def test_int_name_attribute(self): - # Issue 21044: tarfile.open() should handle fileobj with an integer + # Issue 21044: self.tarfile_open() should handle fileobj with an integer # 'name' attribute. fd = os.open(self.tarname, os.O_RDONLY) with open(fd, 'rb') as fobj: self.assertIsInstance(fobj.name, int) - with tarfile.open(fileobj=fobj, mode=self.mode) as tar: + with self.tarfile_open(fileobj=fobj, mode=self.mode) as tar: self.assertIsNone(tar.name) def test_bytes_name_attribute(self): @@ -435,23 +443,23 @@ def test_bytes_name_attribute(self): tarname = os.fsencode(self.tarname) with open(tarname, 'rb') as fobj: self.assertIsInstance(fobj.name, bytes) - with tarfile.open(fileobj=fobj, mode=self.mode) as tar: + with self.tarfile_open(fileobj=fobj, mode=self.mode) as tar: self.assertIsInstance(tar.name, bytes) self.assertEqual(tar.name, os.path.abspath(fobj.name)) def test_pathlike_name(self): tarname = pathlib.Path(self.tarname) - with tarfile.open(tarname, mode=self.mode) as tar: + with self.tarfile_open(tarname, mode=self.mode) as tar: self.assertIsInstance(tar.name, str) self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) - with self.taropen(tarname) as tar: + with self.get_taropen()(tarname) as tar: self.assertIsInstance(tar.name, str) self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) - with tarfile.TarFile.open(tarname, mode=self.mode) as tar: + with self.tarfile_module.open(tarname, mode=self.mode) as tar: self.assertIsInstance(tar.name, str) self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) if self.suffix == '': - with tarfile.TarFile(tarname, mode='r') as tar: + with self.tarfile_module(tarname, mode='r') as tar: self.assertIsInstance(tar.name, str) self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) @@ -459,16 +467,16 @@ def test_illegal_mode_arg(self): with open(tmpname, 'wb'): pass with self.assertRaisesRegex(ValueError, 'mode must be '): - tar = self.taropen(tmpname, 'q') + tar = self.get_taropen()(tmpname, 'q') with self.assertRaisesRegex(ValueError, 'mode must be '): - tar = self.taropen(tmpname, 'rw') + tar = self.get_taropen()(tmpname, 'rw') with self.assertRaisesRegex(ValueError, 'mode must be '): - tar = self.taropen(tmpname, '') + tar = self.get_taropen()(tmpname, '') def test_fileobj_with_offset(self): # Skip the first member and store values from the second member # of the testtar. - tar = tarfile.open(self.tarname, mode=self.mode) + tar = self.tarfile_open(self.tarname, mode=self.mode) try: tar.next() t = tar.next() @@ -495,9 +503,9 @@ def test_fileobj_with_offset(self): def test_fail_comp(self): # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) + self.assertRaises(tarfile.ReadError, self.tarfile_open, tarname, self.mode) with open(tarname, "rb") as fobj: - self.assertRaises(tarfile.ReadError, tarfile.open, + self.assertRaises(tarfile.ReadError, self.tarfile_open, fileobj=fobj, mode=self.mode) def test_v7_dirtype(self): @@ -535,7 +543,7 @@ def test_find_members(self): @support.skip_unless_symlink def test_extract_hardlink(self): # Test hardlink extraction (e.g. bug #857297). - with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: + with self.tarfile_open(tarname, errorlevel=1, encoding="iso8859-1") as tar: tar.extract("ustar/regtype", TEMPDIR) self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) @@ -554,7 +562,7 @@ def test_extract_hardlink(self): def test_extractall(self): # Test if extractall() correctly restores directory permissions # and times (see issue1735). - tar = tarfile.open(tarname, encoding="iso8859-1") + tar = self.tarfile_open(tarname, encoding="iso8859-1") DIR = os.path.join(TEMPDIR, "extractall") os.mkdir(DIR) try: @@ -586,7 +594,7 @@ def test_extract_directory(self): DIR = os.path.join(TEMPDIR, "extractdir") os.mkdir(DIR) try: - with tarfile.open(tarname, encoding="iso8859-1") as tar: + with self.tarfile_open(tarname, encoding="iso8859-1") as tar: tarinfo = tar.getmember(dirtype) tar.extract(tarinfo, path=DIR) extracted = os.path.join(DIR, dirtype) @@ -599,7 +607,7 @@ def test_extract_directory(self): def test_extractall_pathlike_name(self): DIR = pathlib.Path(TEMPDIR) / "extractall" with support.temp_dir(DIR), \ - tarfile.open(tarname, encoding="iso8859-1") as tar: + self.tarfile_open(tarname, encoding="iso8859-1") as tar: directories = [t for t in tar if t.isdir()] tar.extractall(DIR, directories) for tarinfo in directories: @@ -610,7 +618,7 @@ def test_extract_pathlike_name(self): dirtype = "ustar/dirtype" DIR = pathlib.Path(TEMPDIR) / "extractall" with support.temp_dir(DIR), \ - tarfile.open(tarname, encoding="iso8859-1") as tar: + self.tarfile_open(tarname, encoding="iso8859-1") as tar: tarinfo = tar.getmember(dirtype) tar.extract(tarinfo, path=DIR) extracted = DIR / dirtype @@ -625,7 +633,7 @@ def test_init_close_fobj(self): fobj.write(b"") try: - tar = object.__new__(tarfile.TarFile) + tar = object.__new__(self.tarfile_module) try: tar.__init__(empty) except tarfile.ReadError: @@ -638,27 +646,26 @@ def test_init_close_fobj(self): def test_parallel_iteration(self): # Issue #16601: Restarting iteration over tarfile continued # from where it left off. - with tarfile.open(self.tarname) as tar: + with self.tarfile_open(self.tarname) as tar: for m1, m2 in zip(tar, tar): self.assertEqual(m1.offset, m2.offset) self.assertEqual(m1.get_info(), m2.get_info()) -class MiscReadTest(MiscReadTestBase, unittest.TestCase): +class MiscReadTest(MiscReadTestBase, unittest.TestCase, TarFileTestBase): test_fail_comp = None -class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): +class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase, TarFileTestBase): pass -class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): +class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase, TarFileTestBase): def requires_name_attribute(self): self.skipTest("BZ2File have no name attribute") -class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): +class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase, TarFileTestBase): def requires_name_attribute(self): self.skipTest("LZMAFile have no name attribute") - -class StreamReadTest(CommonReadTest, unittest.TestCase): +class StreamReadTestBase(CommonReadTest): prefix="r|" @@ -693,7 +700,7 @@ def test_provoke_stream_error(self): self.assertRaises(tarfile.StreamError, f.read) def test_compare_members(self): - tar1 = tarfile.open(tarname, encoding="iso8859-1") + tar1 = self.tarfile_open(tarname, encoding="iso8859-1") try: tar2 = self.tar @@ -719,20 +726,23 @@ def test_compare_members(self): finally: tar1.close() -class GzipStreamReadTest(GzipTest, StreamReadTest): +class StreamReadTest(StreamReadTestBase, unittest.TestCase, TarFileTestBase): + pass + +class GzipStreamReadTest(GzipTest, StreamReadTestBase, unittest.TestCase, TarFileTestBase): pass -class Bz2StreamReadTest(Bz2Test, StreamReadTest): +class Bz2StreamReadTest(Bz2Test, StreamReadTestBase, unittest.TestCase, TarFileTestBase): pass -class LzmaStreamReadTest(LzmaTest, StreamReadTest): +class LzmaStreamReadTest(LzmaTest, StreamReadTestBase, unittest.TestCase, TarFileTestBase): pass -class DetectReadTest(TarTest, unittest.TestCase): +class DetectReadTestBase(TarTest): def _testfunc_file(self, name, mode): try: - tar = tarfile.open(name, mode) + tar = self.tarfile_open(name, mode) except tarfile.ReadError as e: self.fail() else: @@ -741,7 +751,7 @@ def _testfunc_file(self, name, mode): def _testfunc_fileobj(self, name, mode): try: with open(name, "rb") as f: - tar = tarfile.open(name, mode, fileobj=f) + tar = self.tarfile_open(name, mode, fileobj=f) except tarfile.ReadError as e: self.fail() else: @@ -750,13 +760,13 @@ def _testfunc_fileobj(self, name, mode): def _test_modes(self, testfunc): if self.suffix: with self.assertRaises(tarfile.ReadError): - tarfile.open(tarname, mode="r:" + self.suffix) + self.tarfile_open(tarname, mode="r:" + self.suffix) with self.assertRaises(tarfile.ReadError): - tarfile.open(tarname, mode="r|" + self.suffix) + self.tarfile_open(tarname, mode="r|" + self.suffix) with self.assertRaises(tarfile.ReadError): - tarfile.open(self.tarname, mode="r:") + self.tarfile_open(self.tarname, mode="r:") with self.assertRaises(tarfile.ReadError): - tarfile.open(self.tarname, mode="r|") + self.tarfile_open(self.tarname, mode="r|") testfunc(self.tarname, "r") testfunc(self.tarname, "r:" + self.suffix) testfunc(self.tarname, "r:*") @@ -769,10 +779,7 @@ def test_detect_file(self): def test_detect_fileobj(self): self._test_modes(self._testfunc_fileobj) -class GzipDetectReadTest(GzipTest, DetectReadTest): - pass - -class Bz2DetectReadTest(Bz2Test, DetectReadTest): +class Bz2DetectReadTestBase(Bz2Test, DetectReadTestBase): def test_detect_stream_bz2(self): # Originally, tarfile's stream detection looked for the string # "BZh91" at the start of the file. This is incorrect because @@ -787,11 +794,7 @@ def test_detect_stream_bz2(self): self._testfunc_file(tmpname, "r|*") -class LzmaDetectReadTest(LzmaTest, DetectReadTest): - pass - - -class MemberReadTest(ReadTest, unittest.TestCase): +class MemberReadTestBase(ReadTest): def _test_member(self, tarinfo, chksum=None, **kwargs): if chksum is not None: @@ -881,12 +884,27 @@ def test_find_regtype_oldv7(self): def test_find_pax_umlauts(self): self.tar.close() - self.tar = tarfile.open(self.tarname, mode=self.mode, + self.tar = self.tarfile_open(self.tarname, mode=self.mode, encoding="iso8859-1") tarinfo = self.tar.getmember("pax/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self._test_member(tarinfo, size=7011, chksum=md5_regtype) +class DetectReadTest(DetectReadTestBase, unittest.TestCase, TarFileTestBase): + pass + +class GzipDetectReadTest(GzipTest, DetectReadTestBase, unittest.TestCase, TarFileTestBase): + pass + +class Bz2DetectReadTest(Bz2DetectReadTestBase, unittest.TestCase, TarFileTestBase): + pass + +class LzmaDetectReadTest(LzmaTest, DetectReadTestBase, unittest.TestCase, TarFileTestBase): + pass + +class MemberReadTest(MemberReadTestBase, unittest.TestCase, TarFileTestBase): + pass + class LongnameTest: @@ -916,7 +934,7 @@ def test_truncated_longname(self): self.tar.fileobj.seek(offset) fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) with self.assertRaises(tarfile.ReadError): - tarfile.open(name="foo.tar", fileobj=fobj) + self.tarfile_open(name="foo.tar", fileobj=fobj) def test_header_offset(self): # Test if the start offset of the TarInfo object includes @@ -930,7 +948,7 @@ def test_header_offset(self): self.assertEqual(tarinfo.type, self.longnametype) -class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): +class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase, TarFileTestBase): subdir = "gnu" longnametype = tarfile.GNUTYPE_LONGNAME @@ -989,13 +1007,13 @@ def _fs_supports_holes(): return False -class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): +class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase, TarFileTestBase): subdir = "pax" longnametype = tarfile.XHDTYPE def test_pax_global_headers(self): - tar = tarfile.open(tarname, encoding="iso8859-1") + tar = self.tarfile_open(tarname, encoding="iso8859-1") try: tarinfo = tar.getmember("pax/regtype1") self.assertEqual(tarinfo.uname, "foo") @@ -1019,7 +1037,7 @@ def test_pax_global_headers(self): def test_pax_number_fields(self): # All following number fields are read from the pax header. - tar = tarfile.open(tarname, encoding="iso8859-1") + tar = self.tarfile_open(tarname, encoding="iso8859-1") try: tarinfo = tar.getmember("pax/regtype4") self.assertEqual(tarinfo.size, 7011) @@ -1033,13 +1051,13 @@ def test_pax_number_fields(self): tar.close() -class WriteTestBase(TarTest): +class WriteTestBaseBase(TarTest): # Put all write tests in here that are supposed to be tested # in all possible mode combinations. def test_fileobj_no_close(self): fobj = io.BytesIO() - with tarfile.open(fileobj=fobj, mode=self.mode) as tar: + with self.tarfile_open(fileobj=fobj, mode=self.mode) as tar: tar.addfile(tarfile.TarInfo("foo")) self.assertFalse(fobj.closed, "external fileobjs must never closed") # Issue #20238: Incomplete gzip output with mode="w:gz" @@ -1054,7 +1072,7 @@ def test_eof_marker(self): # tarfile insists on aligning archives to a 20 * 512 byte recordsize. # So, we create an archive that has exactly 10240 bytes without the # marker, and has 20480 bytes once the marker is written. - with tarfile.open(tmpname, self.mode) as tar: + with self.tarfile_open(tmpname, self.mode) as tar: t = tarfile.TarInfo("foo") t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE tar.addfile(t, io.BytesIO(b"a" * t.size)) @@ -1063,7 +1081,7 @@ def test_eof_marker(self): self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) -class WriteTest(WriteTestBase, unittest.TestCase): +class WriteTestBase(WriteTestBaseBase): prefix = "w:" @@ -1073,14 +1091,14 @@ def test_100_char_name(self): # which implies that a string of exactly 100 chars is stored without # a trailing '\0'. name = "0123456789" * 10 - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: t = tarfile.TarInfo(name) tar.addfile(t) finally: tar.close() - tar = tarfile.open(tmpname) + tar = self.tarfile_open(tmpname) try: self.assertEqual(tar.getnames()[0], name, "failed to store 100 char filename") @@ -1089,7 +1107,7 @@ def test_100_char_name(self): def test_tar_size(self): # Test for bug #1013882. - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: path = os.path.join(TEMPDIR, "file") with open(path, "wb") as fobj: @@ -1102,7 +1120,7 @@ def test_tar_size(self): # The test_*_size tests test for bug #1167128. def test_file_size(self): - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: path = os.path.join(TEMPDIR, "file") with open(path, "wb"): @@ -1121,7 +1139,7 @@ def test_directory_size(self): path = os.path.join(TEMPDIR, "directory") os.mkdir(path) try: - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: tarinfo = tar.gettarinfo(path) self.assertEqual(tarinfo.size, 0) @@ -1138,7 +1156,7 @@ def test_ordered_recursion(self): open(os.path.join(path, "1"), "a").close() open(os.path.join(path, "2"), "a").close() try: - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: with unittest.mock.patch('os.listdir') as mock_listdir: mock_listdir.return_value = ["2", "1"] @@ -1155,7 +1173,7 @@ def test_ordered_recursion(self): support.rmdir(path) def test_gettarinfo_pathlike_name(self): - with tarfile.open(tmpname, self.mode) as tar: + with self.tarfile_open(tmpname, self.mode) as tar: path = pathlib.Path(TEMPDIR) / "file" with open(path, "wb") as fobj: fobj.write(b"aaa") @@ -1177,7 +1195,7 @@ def test_link_size(self): except PermissionError as e: self.skipTest('os.link(): %s' % e) try: - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: # Record the link target in the inodes list. tar.gettarinfo(target) @@ -1194,7 +1212,7 @@ def test_symlink_size(self): path = os.path.join(TEMPDIR, "symlink") os.symlink("link_target", path) try: - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: tarinfo = tar.gettarinfo(path) self.assertEqual(tarinfo.size, 0) @@ -1206,7 +1224,7 @@ def test_symlink_size(self): def test_add_self(self): # Test for #1257255. dstname = os.path.abspath(tmpname) - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: self.assertEqual(tar.name, dstname, "archive name must be absolute") @@ -1236,7 +1254,7 @@ def filter(tarinfo): tarinfo.uname = "foo" return tarinfo - tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") + tar = self.tarfile_open(tmpname, self.mode, encoding="iso8859-1") try: tar.add(tempdir, arcname="empty_dir", filter=filter) finally: @@ -1246,7 +1264,7 @@ def filter(tarinfo): with self.assertRaises(TypeError): tar.add(tempdir, "empty_dir", True, None, filter) - tar = tarfile.open(tmpname, "r") + tar = self.tarfile_open(tmpname, "r") try: for tarinfo in tar: self.assertEqual(tarinfo.uid, 123) @@ -1270,13 +1288,13 @@ def _test_pathname(self, path, cmp_path=None, dir=False): else: os.mkdir(foo) - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: tar.add(foo, arcname=path) finally: tar.close() - tar = tarfile.open(tmpname, "r") + tar = self.tarfile_open(tmpname, "r") try: t = tar.next() finally: @@ -1302,11 +1320,11 @@ def test_extractall_symlinks(self): with open(source_file,'w') as f: f.write('something\n') os.symlink(source_file, target_file) - with tarfile.open(temparchive, 'w') as tar: + with self.tarfile_open(temparchive, 'w') as tar: tar.add(source_file) tar.add(target_file) # Let's extract it to the location which contains the symlink - with tarfile.open(temparchive) as tar: + with self.tarfile_open(temparchive) as tar: # this should not raise OSError: [Errno 17] File exists try: tar.extractall(path=tempdir) @@ -1343,13 +1361,13 @@ def test_abs_pathnames(self): def test_cwd(self): # Test adding the current working directory. with support.change_cwd(TEMPDIR): - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) try: tar.add(".") finally: tar.close() - tar = tarfile.open(tmpname, "r") + tar = self.tarfile_open(tmpname, "r") try: for t in tar: if t.name != ".": @@ -1368,29 +1386,32 @@ def write(self, data): f = BadFile() with self.assertRaises(exctype): - tar = tarfile.open(tmpname, self.mode, fileobj=f, + tar = self.tarfile_open(tmpname, self.mode, fileobj=f, format=tarfile.PAX_FORMAT, pax_headers={'non': 'empty'}) self.assertFalse(f.closed) -class GzipWriteTest(GzipTest, WriteTest): +class WriteTest(WriteTestBase, unittest.TestCase, TarFileTestBase): pass -class Bz2WriteTest(Bz2Test, WriteTest): +class GzipWriteTest(GzipTest, WriteTestBase, unittest.TestCase, TarFileTestBase): pass -class LzmaWriteTest(LzmaTest, WriteTest): +class Bz2WriteTest(Bz2Test, WriteTestBase, unittest.TestCase, TarFileTestBase): pass +class LzmaWriteTest(LzmaTest, WriteTestBase, unittest.TestCase, TarFileTestBase): + pass -class StreamWriteTest(WriteTestBase, unittest.TestCase): + +class StreamWriteTestBase(WriteTestBaseBase): prefix = "w|" decompressor = None def test_stream_padding(self): # Test for bug #1543303. - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) tar.close() if self.decompressor: dec = self.decompressor() @@ -1414,24 +1435,27 @@ def test_file_mode(self): original_umask = os.umask(0o022) try: - tar = tarfile.open(tmpname, self.mode) + tar = self.tarfile_open(tmpname, self.mode) tar.close() mode = os.stat(tmpname).st_mode & 0o777 self.assertEqual(mode, 0o644, "wrong file permissions") finally: os.umask(original_umask) -class GzipStreamWriteTest(GzipTest, StreamWriteTest): +class StreamWriteTest(StreamWriteTestBase, unittest.TestCase, TarFileTestBase): + pass + +class GzipStreamWriteTest(GzipTest, StreamWriteTestBase, unittest.TestCase, TarFileTestBase): pass -class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): +class Bz2StreamWriteTest(Bz2Test, StreamWriteTestBase, unittest.TestCase, TarFileTestBase): decompressor = bz2.BZ2Decompressor if bz2 else None -class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): +class LzmaStreamWriteTest(LzmaTest, StreamWriteTestBase, unittest.TestCase, TarFileTestBase): decompressor = lzma.LZMADecompressor if lzma else None -class GNUWriteTest(unittest.TestCase): +class GNUWriteTest(unittest.TestCase, TarFileTestBase): # This testcase checks for correct creation of GNU Longname # and Longlink extended headers (cp. bug #812325). @@ -1459,7 +1483,7 @@ def _test(self, name, link=None): tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE - tar = tarfile.open(tmpname, "w") + tar = self.tarfile_open(tmpname, "w") try: tar.format = tarfile.GNU_FORMAT tar.addfile(tarinfo) @@ -1470,7 +1494,7 @@ def _test(self, name, link=None): finally: tar.close() - tar = tarfile.open(tmpname) + tar = self.tarfile_open(tmpname) try: member = tar.next() self.assertIsNotNone(member, @@ -1513,7 +1537,7 @@ def test_longnamelink_1025(self): ("longlnk/" * 127) + "longlink_") -class CreateTest(WriteTestBase, unittest.TestCase): +class CreateTestBase(WriteTestBaseBase, unittest.TestCase, TarFileTestBase): prefix = "x:" @@ -1532,50 +1556,50 @@ def tearDownClass(cls): support.unlink(cls.file_path) def test_create(self): - with tarfile.open(tmpname, self.mode) as tobj: + with self.tarfile_open(tmpname, self.mode) as tobj: tobj.add(self.file_path) - with self.taropen(tmpname) as tobj: + with self.get_taropen()(tmpname) as tobj: names = tobj.getnames() self.assertEqual(len(names), 1) self.assertIn('spameggs42', names[0]) def test_create_existing(self): - with tarfile.open(tmpname, self.mode) as tobj: + with self.tarfile_open(tmpname, self.mode) as tobj: tobj.add(self.file_path) with self.assertRaises(FileExistsError): - tobj = tarfile.open(tmpname, self.mode) + tobj = self.tarfile_open(tmpname, self.mode) - with self.taropen(tmpname) as tobj: + with self.get_taropen()(tmpname) as tobj: names = tobj.getnames() self.assertEqual(len(names), 1) self.assertIn('spameggs42', names[0]) def test_create_taropen(self): - with self.taropen(tmpname, "x") as tobj: + with self.get_taropen()(tmpname, "x") as tobj: tobj.add(self.file_path) - with self.taropen(tmpname) as tobj: + with self.get_taropen()(tmpname) as tobj: names = tobj.getnames() self.assertEqual(len(names), 1) self.assertIn('spameggs42', names[0]) def test_create_existing_taropen(self): - with self.taropen(tmpname, "x") as tobj: + with self.get_taropen()(tmpname, "x") as tobj: tobj.add(self.file_path) with self.assertRaises(FileExistsError): - with self.taropen(tmpname, "x"): + with self.get_taropen()(tmpname, "x"): pass - with self.taropen(tmpname) as tobj: + with self.get_taropen()(tmpname) as tobj: names = tobj.getnames() self.assertEqual(len(names), 1) self.assertIn("spameggs42", names[0]) def test_create_pathlike_name(self): - with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj: + with self.tarfile_open(pathlib.Path(tmpname), self.mode) as tobj: self.assertIsInstance(tobj.name, str) self.assertEqual(tobj.name, os.path.abspath(tmpname)) tobj.add(pathlib.Path(self.file_path)) @@ -1583,13 +1607,13 @@ def test_create_pathlike_name(self): self.assertEqual(len(names), 1) self.assertIn('spameggs42', names[0]) - with self.taropen(tmpname) as tobj: + with self.get_taropen()(tmpname) as tobj: names = tobj.getnames() self.assertEqual(len(names), 1) self.assertIn('spameggs42', names[0]) def test_create_taropen_pathlike_name(self): - with self.taropen(pathlib.Path(tmpname), "x") as tobj: + with self.get_taropen()(pathlib.Path(tmpname), "x") as tobj: self.assertIsInstance(tobj.name, str) self.assertEqual(tobj.name, os.path.abspath(tmpname)) tobj.add(pathlib.Path(self.file_path)) @@ -1597,25 +1621,24 @@ def test_create_taropen_pathlike_name(self): self.assertEqual(len(names), 1) self.assertIn('spameggs42', names[0]) - with self.taropen(tmpname) as tobj: + with self.get_taropen()(tmpname) as tobj: names = tobj.getnames() self.assertEqual(len(names), 1) self.assertIn('spameggs42', names[0]) - -class GzipCreateTest(GzipTest, CreateTest): +class CreateTest(CreateTestBase, unittest.TestCase, TarFileTestBase): pass - -class Bz2CreateTest(Bz2Test, CreateTest): +class GzipCreateTest(GzipTest, CreateTestBase, unittest.TestCase, TarFileTestBase): pass - -class LzmaCreateTest(LzmaTest, CreateTest): +class Bz2CreateTest(Bz2Test, CreateTestBase, unittest.TestCase, TarFileTestBase): pass +class LzmaCreateTest(LzmaTest, CreateTestBase, unittest.TestCase, TarFileTestBase): + pass -class CreateWithXModeTest(CreateTest): +class CreateWithXModeTest(CreateTestBase, unittest.TestCase, TarFileTestBase): prefix = "x" @@ -1624,7 +1647,7 @@ class CreateWithXModeTest(CreateTest): @unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") -class HardlinkTest(unittest.TestCase): +class HardlinkTest(unittest.TestCase, TarFileTestBase): # Test the creation of LNKTYPE (hardlink) members in an archive. def setUp(self): @@ -1639,7 +1662,7 @@ def setUp(self): except PermissionError as e: self.skipTest('os.link(): %s' % e) - self.tar = tarfile.open(tmpname, "w") + self.tar = self.tarfile_open(tmpname, "w") self.tar.add(self.foo) def tearDown(self): @@ -1675,13 +1698,13 @@ def _test(self, name, link=None): tarinfo.linkname = link tarinfo.type = tarfile.LNKTYPE - tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) + tar = self.tarfile_open(tmpname, "w", format=tarfile.PAX_FORMAT) try: tar.addfile(tarinfo) finally: tar.close() - tar = tarfile.open(tmpname) + tar = self.tarfile_open(tmpname) try: if link: l = tar.getmembers()[0].linkname @@ -1700,7 +1723,7 @@ def test_pax_global_header(self): "test": "\xe4\xf6\xfc", "\xe4\xf6\xfc": "test"} - tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, + tar = self.tarfile_open(tmpname, "w", format=tarfile.PAX_FORMAT, pax_headers=pax_headers) try: tar.addfile(tarfile.TarInfo("test")) @@ -1708,7 +1731,7 @@ def test_pax_global_header(self): tar.close() # Test if the global header was written correctly. - tar = tarfile.open(tmpname, encoding="iso8859-1") + tar = self.tarfile_open(tmpname, encoding="iso8859-1") try: self.assertEqual(tar.pax_headers, pax_headers) self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) @@ -1729,7 +1752,7 @@ def test_pax_extended_header(self): # TarInfo. pax_headers = {"path": "foo", "uid": "123"} - tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, + tar = self.tarfile_open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") try: t = tarfile.TarInfo() @@ -1740,7 +1763,7 @@ def test_pax_extended_header(self): finally: tar.close() - tar = tarfile.open(tmpname, encoding="iso8859-1") + tar = self.tarfile_open(tmpname, encoding="iso8859-1") try: t = tar.getmembers()[0] self.assertEqual(t.pax_headers, pax_headers) @@ -1762,7 +1785,7 @@ def test_utf8_filename(self): self._test_unicode_filename("utf-8") def _test_unicode_filename(self, encoding): - tar = tarfile.open(tmpname, "w", format=self.format, + tar = self.tarfile_open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") try: name = "\xe4\xf6\xfc" @@ -1770,14 +1793,14 @@ def _test_unicode_filename(self, encoding): finally: tar.close() - tar = tarfile.open(tmpname, encoding=encoding) + tar = self.tarfile_open(tmpname, encoding=encoding) try: self.assertEqual(tar.getmembers()[0].name, name) finally: tar.close() def test_unicode_filename_error(self): - tar = tarfile.open(tmpname, "w", format=self.format, + tar = self.tarfile_open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") try: tarinfo = tarfile.TarInfo() @@ -1792,7 +1815,7 @@ def test_unicode_filename_error(self): tar.close() def test_unicode_argument(self): - tar = tarfile.open(tarname, "r", + tar = self.tarfile_open(tarname, "r", encoding="iso8859-1", errors="strict") try: for t in tar: @@ -1808,14 +1831,14 @@ def test_uname_unicode(self): t.uname = "\xe4\xf6\xfc" t.gname = "\xe4\xf6\xfc" - tar = tarfile.open(tmpname, mode="w", format=self.format, + tar = self.tarfile_open(tmpname, mode="w", format=self.format, encoding="iso8859-1") try: tar.addfile(t) finally: tar.close() - tar = tarfile.open(tmpname, encoding="iso8859-1") + tar = self.tarfile_open(tmpname, encoding="iso8859-1") try: t = tar.getmember("foo") self.assertEqual(t.uname, "\xe4\xf6\xfc") @@ -1823,7 +1846,7 @@ def test_uname_unicode(self): if self.format != tarfile.PAX_FORMAT: tar.close() - tar = tarfile.open(tmpname, encoding="ascii") + tar = self.tarfile_open(tmpname, encoding="ascii") t = tar.getmember("foo") self.assertEqual(t.uname, "\udce4\udcf6\udcfc") self.assertEqual(t.gname, "\udce4\udcf6\udcfc") @@ -1831,7 +1854,7 @@ def test_uname_unicode(self): tar.close() -class UstarUnicodeTest(UnicodeTest, unittest.TestCase): +class UstarUnicodeTest(UnicodeTest, unittest.TestCase, TarFileTestBase): format = tarfile.USTAR_FORMAT @@ -1870,7 +1893,7 @@ def test_unicode_longname4(self): self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) def _test_ustar_name(self, name, exc=None): - with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: + with self.tarfile_open(tmpname, "w", format=self.format, encoding="utf-8") as tar: t = tarfile.TarInfo(name) if exc is None: tar.addfile(t) @@ -1878,7 +1901,7 @@ def _test_ustar_name(self, name, exc=None): self.assertRaises(exc, tar.addfile, t) if exc is None: - with tarfile.open(tmpname, "r", encoding="utf-8") as tar: + with self.tarfile_open(tmpname, "r", encoding="utf-8") as tar: for t in tar: self.assertEqual(name, t.name) break @@ -1895,7 +1918,7 @@ def test_unicode_link2(self): self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) def _test_ustar_link(self, name, exc=None): - with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: + with self.tarfile_open(tmpname, "w", format=self.format, encoding="utf-8") as tar: t = tarfile.TarInfo("foo") t.linkname = name if exc is None: @@ -1904,13 +1927,13 @@ def _test_ustar_link(self, name, exc=None): self.assertRaises(exc, tar.addfile, t) if exc is None: - with tarfile.open(tmpname, "r", encoding="utf-8") as tar: + with self.tarfile_open(tmpname, "r", encoding="utf-8") as tar: for t in tar: self.assertEqual(name, t.linkname) break -class GNUUnicodeTest(UnicodeTest, unittest.TestCase): +class GNUUnicodeTest(UnicodeTest, unittest.TestCase, TarFileTestBase): format = tarfile.GNU_FORMAT @@ -1920,7 +1943,7 @@ def test_bad_pax_header(self): for encoding, name in ( ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): - with tarfile.open(tarname, encoding=encoding, + with self.tarfile_open(tarname, encoding=encoding, errors="surrogateescape") as tar: try: t = tar.getmember(name) @@ -1928,7 +1951,7 @@ def test_bad_pax_header(self): self.fail("unable to read bad GNU tar pax header") -class PAXUnicodeTest(UnicodeTest, unittest.TestCase): +class PAXUnicodeTest(UnicodeTest, unittest.TestCase, TarFileTestBase): format = tarfile.PAX_FORMAT @@ -1940,7 +1963,7 @@ def test_binary_header(self): for encoding, name in ( ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): - with tarfile.open(tarname, encoding=encoding, + with self.tarfile_open(tarname, encoding=encoding, errors="surrogateescape") as tar: try: t = tar.getmember(name) @@ -1957,26 +1980,26 @@ def setUp(self): support.unlink(self.tarname) def _create_testtar(self, mode="w:"): - with tarfile.open(tarname, encoding="iso8859-1") as src: + with self.tarfile_open(tarname, encoding="iso8859-1") as src: t = src.getmember("ustar/regtype") t.name = "foo" with src.extractfile(t) as f: - with tarfile.open(self.tarname, mode) as tar: + with self.tarfile_open(self.tarname, mode) as tar: tar.addfile(t, f) def test_append_compressed(self): self._create_testtar("w:" + self.suffix) - self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") + self.assertRaises(tarfile.ReadError, self.tarfile_open, tmpname, "a") -class AppendTest(AppendTestBase, unittest.TestCase): +class AppendTest(AppendTestBase, unittest.TestCase, TarFileTestBase): test_append_compressed = None def _add_testfile(self, fileobj=None): - with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: + with self.tarfile_open(self.tarname, "a", fileobj=fileobj) as tar: tar.addfile(tarfile.TarInfo("bar")) def _test(self, names=["bar"], fileobj=None): - with tarfile.open(self.tarname, fileobj=fileobj) as tar: + with self.tarfile_open(self.tarname, fileobj=fileobj) as tar: self.assertEqual(tar.getnames(), names) def test_non_existing(self): @@ -1984,7 +2007,7 @@ def test_non_existing(self): self._test() def test_empty(self): - tarfile.open(self.tarname, "w:").close() + self.tarfile_open(self.tarname, "w:").close() self._add_testfile() self._test() @@ -2032,17 +2055,17 @@ def test_trailing_garbage(self): def test_invalid(self): self._test_error(b"a" * 512) -class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): +class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase, TarFileTestBase): pass -class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): +class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase, TarFileTestBase): pass -class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): +class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase, TarFileTestBase): pass -class LimitsTest(unittest.TestCase): +class LimitsTest(unittest.TestCase, TarFileTestBase): def test_ustar_limits(self): # 100 char name @@ -2101,7 +2124,7 @@ def test_pax_limits(self): tarinfo.tobuf(tarfile.PAX_FORMAT) -class MiscTest(unittest.TestCase): +class MiscTest(unittest.TestCase, TarFileTestBase): def test_char_fields(self): self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), @@ -2185,7 +2208,7 @@ def test__all__(self): support.check__all__(self, tarfile, blacklist=blacklist) -class CommandLineTest(unittest.TestCase): +class CommandLineTest(unittest.TestCase, TarFileTestBase): def tarfilecmd(self, *args, **kwargs): rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, @@ -2200,7 +2223,7 @@ def make_simple_tarfile(self, tar_name): support.findfile('tokenize_tests-no-coding-cookie-' 'and-utf8-bom-sig-only.txt')] self.addCleanup(support.unlink, tar_name) - with tarfile.open(tar_name, 'w') as tf: + with self.tarfile_open(tar_name, 'w') as tf: for tardata in files: tf.add(tardata, arcname=os.path.basename(tardata)) @@ -2249,7 +2272,7 @@ def test_test_command_invalid_file(self): def test_list_command(self): for tar_name in testtarnames: with support.captured_stdout() as t: - with tarfile.open(tar_name, 'r') as tf: + with self.tarfile_open(tar_name, 'r') as tf: tf.list(verbose=False) expected = t.getvalue().encode('ascii', 'backslashreplace') for opt in '-l', '--list': @@ -2260,7 +2283,7 @@ def test_list_command(self): def test_list_command_verbose(self): for tar_name in testtarnames: with support.captured_stdout() as t: - with tarfile.open(tar_name, 'r') as tf: + with self.tarfile_open(tar_name, 'r') as tf: tf.list(verbose=True) expected = t.getvalue().encode('ascii', 'backslashreplace') for opt in '-v', '--verbose': @@ -2283,7 +2306,7 @@ def test_create_command(self): try: out = self.tarfilecmd(opt, tmpname, *files) self.assertEqual(out, b'') - with tarfile.open(tmpname) as tar: + with self.tarfile_open(tmpname) as tar: tar.getmembers() finally: support.unlink(tmpname) @@ -2296,7 +2319,7 @@ def test_create_command_verbose(self): try: out = self.tarfilecmd(opt, '-c', tmpname, *files) self.assertIn(b' file created.', out) - with tarfile.open(tmpname) as tar: + with self.tarfile_open(tmpname) as tar: tar.getmembers() finally: support.unlink(tmpname) @@ -2306,7 +2329,7 @@ def test_create_command_dotless_filename(self): try: out = self.tarfilecmd('-c', dotlessname, *files) self.assertEqual(out, b'') - with tarfile.open(dotlessname) as tar: + with self.tarfile_open(dotlessname) as tar: tar.getmembers() finally: support.unlink(dotlessname) @@ -2317,7 +2340,7 @@ def test_create_command_dot_started_filename(self): try: out = self.tarfilecmd('-c', tar_name, *files) self.assertEqual(out, b'') - with tarfile.open(tar_name) as tar: + with self.tarfile_open(tar_name) as tar: tar.getmembers() finally: support.unlink(tar_name) @@ -2332,8 +2355,10 @@ def test_create_command_compressed(self): try: tar_name = tmpname + '.' + filetype.suffix out = self.tarfilecmd('-c', tar_name, *files) - with filetype.taropen(tar_name) as tar: + self.taropen_name = filetype.taropen_name + with self.get_taropen()(tar_name) as tar: tar.getmembers() + del self.taropen_name finally: support.unlink(tar_name) @@ -2375,17 +2400,17 @@ def test_extract_command_invalid_file(self): self.assertEqual(rc, 1) -class ContextManagerTest(unittest.TestCase): +class ContextManagerTest(unittest.TestCase, TarFileTestBase): def test_basic(self): - with tarfile.open(tarname) as tar: + with self.tarfile_open(tarname) as tar: self.assertFalse(tar.closed, "closed inside runtime context") self.assertTrue(tar.closed, "context manager failed") def test_closed(self): # The __enter__() method is supposed to raise OSError # if the TarFile object is already closed. - tar = tarfile.open(tarname) + tar = self.tarfile_open(tarname) tar.close() with self.assertRaises(OSError): with tar: @@ -2394,7 +2419,7 @@ def test_closed(self): def test_exception(self): # Test if the OSError exception is passed through properly. with self.assertRaises(Exception) as exc: - with tarfile.open(tarname) as tar: + with self.tarfile_open(tarname) as tar: raise OSError self.assertIsInstance(exc.exception, OSError, "wrong exception raised in context manager") @@ -2404,7 +2429,7 @@ def test_no_eof(self): # __exit__() must not write end-of-archive blocks if an # exception was raised. try: - with tarfile.open(tmpname, "w") as tar: + with self.tarfile_open(tmpname, "w") as tar: raise Exception except: pass @@ -2415,7 +2440,7 @@ def test_no_eof(self): def test_eof(self): # __exit__() must write end-of-archive blocks, i.e. call # TarFile.close() if there was no error. - with tarfile.open(tmpname, "w"): + with self.tarfile_open(tmpname, "w"): pass self.assertNotEqual(os.path.getsize(tmpname), 0, "context manager wrote no end-of-archive block") @@ -2425,7 +2450,7 @@ def test_fileobj(self): # object. with open(tmpname, "wb") as fobj: try: - with tarfile.open(fileobj=fobj, mode="w") as tar: + with self.tarfile_open(fileobj=fobj, mode="w") as tar: raise Exception except: pass @@ -2434,7 +2459,7 @@ def test_fileobj(self): @unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") -class LinkEmulationTest(ReadTest, unittest.TestCase): +class LinkEmulationTest(ReadTest, unittest.TestCase, TarFileTestBase): # Test for issue #8741 regression. On platforms that do not support # symbolic or hard links tarfile tries to extract these types of members @@ -2467,7 +2492,7 @@ def test_symlink_extraction2(self): self._test_link_extraction("./ustar/linktest2/symtype") -class Bz2PartialReadTest(Bz2Test, unittest.TestCase): +class Bz2PartialReadTest(Bz2Test, unittest.TestCase, TarFileTestBase): # Issue5068: The _BZ2Proxy.read() method loops forever # on an empty or partial bzipped file. @@ -2477,7 +2502,7 @@ class MyBytesIO(io.BytesIO): def read(self, n): if self.hit_eof: raise AssertionError("infinite loop detected in " - "tarfile.open()") + "self.tarfile_open()") self.hit_eof = self.tell() == len(self.getvalue()) return super(MyBytesIO, self).read(n) def seek(self, *args): @@ -2487,7 +2512,7 @@ def seek(self, *args): data = bz2.compress(tarfile.TarInfo("foo").tobuf()) for x in range(len(data) + 1): try: - tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) + self.tarfile_open(fileobj=MyBytesIO(data[:x]), mode=mode) except tarfile.ReadError: pass # we have no interest in ReadErrors @@ -2512,7 +2537,7 @@ def root_is_uid_gid_0(): @unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") @unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") -class NumericOwnerTest(unittest.TestCase): +class NumericOwnerTest(unittest.TestCase, TarFileTestBase): # mock the following: # os.chown: so we can test what's being called # os.chmod: so the modes are not actually changed. if they are, we can't @@ -2530,7 +2555,7 @@ def _make_test_archive(filename_1, dirname_1, filename_2): (dirname_1, 77, 76, tarfile.DIRTYPE, None), (filename_2, 88, 87, tarfile.REGTYPE, fobj), ] - with tarfile.open(tmpname, 'w') as tarfl: + with NumericOwnerTest.tarfile_open(tmpname, 'w') as tarfl: for name, uid, gid, typ, contents in items: t = tarfile.TarInfo(name) t.uid = uid @@ -2562,7 +2587,7 @@ def _setup_test(mock_geteuid): # open the tarfile for reading. yield it and the names of the items # we stored into the file - with tarfile.open(tar_filename) as tarfl: + with NumericOwnerTest.tarfile_open(tar_filename) as tarfl: yield tarfl, filename_1, dirname_1, filename_2 @unittest.mock.patch('os.chown')