» Core Development > Code coverage > Lib/test/test_tarfile.py

Python code coverage for Lib/test/test_tarfile.py

# count content
1n/aimport sys
2n/aimport os
3n/aimport io
4n/afrom hashlib import md5
5n/afrom contextlib import contextmanager
6n/afrom random import Random
7n/a
8n/aimport unittest
9n/aimport unittest.mock
10n/aimport tarfile
11n/a
12n/afrom test import support
13n/afrom test.support import script_helper
14n/a
15n/a# Check for our compression modules.
16n/atry:
17n/a import gzip
18n/aexcept ImportError:
19n/a gzip = None
20n/atry:
21n/a import bz2
22n/aexcept ImportError:
23n/a bz2 = None
24n/atry:
25n/a import lzma
26n/aexcept ImportError:
27n/a lzma = None
28n/a
def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes *data*."""
    digest = md5(data)
    return digest.hexdigest()
31n/a
# Scratch directories, both derived from support.TESTFN.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'

# The reference archive shipped with the test suite.
tarname = support.findfile("testtar.tar")

# Paths inside TEMPDIR for the compressed archives and scratch files.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected MD5 digests of the regular-file and sparse-file member data.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
43n/a
44n/a
class TarTest:
    """Mixin describing the uncompressed test archive.

    Subclasses provide ``prefix``; the compression mixins override the
    archive path, the mode suffix and the two opener callables.
    """

    suffix = ''
    tarname = tarname
    taropen = tarfile.TarFile.taropen
    open = io.FileIO

    @property
    def mode(self):
        # Mode string for tarfile.open(): prefix followed by suffix.
        return "".join((self.prefix, self.suffix))
54n/a
@support.requires_gzip
class GzipTest:
    """Mixin pointing the tests at the gzip archive (gzipname)."""

    suffix = 'gz'
    tarname = gzipname
    taropen = tarfile.TarFile.gzopen
    # gzip is None when the module could not be imported.
    open = gzip.GzipFile if gzip else None
61n/a
@support.requires_bz2
class Bz2Test:
    """Mixin pointing the tests at the bzip2 archive (bz2name)."""

    suffix = 'bz2'
    tarname = bz2name
    taropen = tarfile.TarFile.bz2open
    # bz2 is None when the module could not be imported.
    open = bz2.BZ2File if bz2 else None
68n/a
@support.requires_lzma
class LzmaTest:
    """Mixin pointing the tests at the xz archive (xzname)."""

    suffix = 'xz'
    tarname = xzname
    taropen = tarfile.TarFile.xzopen
    # lzma is None when the module could not be imported.
    open = lzma.LZMAFile if lzma else None
75n/a
76n/a
class ReadTest(TarTest):
    """Fixture that opens ``self.tarname`` for reading around each test."""

    prefix = "r:"

    def setUp(self):
        # The archive is opened with an explicit ISO-8859-1 encoding.
        self.tar = tarfile.open(self.tarname, encoding="iso8859-1",
                                mode=self.mode)

    def tearDown(self):
        self.tar.close()
87n/a
88n/a
class UstarReadTest(ReadTest, unittest.TestCase):
    """Tests for the file objects extractfile() returns for ustar members."""

    def test_fileobj_regular_file(self):
        # The member's data must match its recorded size and checksum.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(md5sum(data), md5_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the archive member must agree with readlines()
        # on the same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the member must yield the same lines as the file
        # extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell()/read()/readline() on the member and
        # compare against the bytes of the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        # The context manager closes the member file object even when
        # one of the assertions below fails.
        with self.tar.extractfile(tarinfo) as fobj:
            # Consume the whole member first so that the seeks below
            # start from the end of the data.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(md5sum(data), md5_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
214n/a
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """UstarReadTest with the GzipTest archive settings."""
217n/a
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """UstarReadTest with the Bz2Test archive settings."""
220n/a
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """UstarReadTest with the LzmaTest archive settings."""
223n/a
224n/a
class ListTest(ReadTest, unittest.TestCase):
    """Checks of what TarFile.list() writes to sys.stdout."""

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _captured_list(self, **list_args):
        # Call self.tar.list() with sys.stdout swapped for an ASCII text
        # wrapper and hand back the bytes that were written to it.
        sink = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', sink):
            self.tar.list(**list_args)
        return sink.detach().getvalue()

    def test_list(self):
        out = self._captured_list(verbose=False)

        for plain_name in (
                b'ustar/conttype',
                b'ustar/regtype',
                b'ustar/lnktype',
                b'ustar' + (b'/12345' * 40) + b'67/longname',
                b'./ustar/linktest2/symtype',
                b'./ustar/linktest2/lnktype',
                # Make sure it puts trailing slash for directory
                b'ustar/dirtype/',
                b'ustar/dirtype-with-size/'):
            self.assertIn(plain_name, out)

        # Make sure it is able to print unencodable characters
        def conv(b):
            s = b.decode(self.tar.encoding, 'surrogateescape')
            return s.encode('ascii', 'backslashreplace')

        for raw_name in (
                b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                b'misc/regtype-hpux-signed-chksum-'
                b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                b'misc/regtype-old-v7-signed-chksum-'
                b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                b'pax/bad-pax-\xe4\xf6\xfc',
                b'pax/hdrcharset-\xe4\xf6\xfc'):
            self.assertIn(conv(raw_name), out)

        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        two_plain_entries = (br'ustar/conttype ?\r?\n'
                             br'ustar/regtype ?\r?\n')
        self.assertRegex(out, two_plain_entries)
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._captured_list(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        verbose_entry = (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                         br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                         br'ustar/\w+type ?\r?\n')
        self.assertRegex(out, verbose_entry * 2)
        # Make sure it prints the source of link with verbose flag
        self.assertIn(b'ustar/symtype -> regtype', out)
        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype link to '
                      b'./ustar/linktest1/regtype', out)
        long_dirs = b'/123' * 125
        for subdir in (b'gnu', b'pax'):
            self.assertIn(subdir + long_dirs + b'/longlink link to ' +
                          subdir + long_dirs + b'/longname', out)

    def test_list_members(self):
        # Only members whose name contains 'reg' are passed to list().
        def members(tar):
            for candidate in tar.getmembers():
                if 'reg' in candidate.name:
                    yield candidate
        out = self._captured_list(verbose=False, members=members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
303n/a
304n/a
class GzipListTest(GzipTest, ListTest):
    """ListTest with the GzipTest archive settings."""
307n/a
308n/a
class Bz2ListTest(Bz2Test, ListTest):
    """ListTest with the Bz2Test archive settings."""
311n/a
312n/a
class LzmaListTest(LzmaTest, ListTest):
    """ListTest with the LzmaTest archive settings."""
315n/a
316n/a
class CommonReadTest(ReadTest):
    """Read tests shared by the seekable and the stream test classes."""

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Open separately from the checks below so that a failing open()
        # is reported as such instead of tripping over an unbound name
        # during cleanup.
        try:
            tar = tarfile.open(tmpname, self.mode)
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        with tar:
            try:
                tar.getnames()
            except tarfile.ReadError:
                self.fail("tarfile.open() failed on empty archive")
            self.assertListEqual(tar.getmembers(), [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).getrandbits(512*8).to_bytes(512, 'big')
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # Truncate a one-member archive at several sizes; iterating,
        # extracting and reading must each raise ReadError.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    # Close the member file object even though read()
                    # is expected to raise.
                    with tar.extractfile(t) as member:
                        member.read()
396n/a
class MiscReadTestBase(CommonReadTest):
    """Miscellaneous read tests run against each archive variant."""

    def requires_name_attribute(self):
        # Hook for subclasses: variants whose file objects lack a name
        # attribute override this to skip the calling test.
        pass

    def test_no_name_argument(self):
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Use a context manager so the archive is closed after the check.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_illegal_mode_arg(self):
        # Every invalid mode string must be rejected with a ValueError.
        with open(tmpname, 'wb'):
            pass
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member shares one mtime; ustar members also share uname.
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render an mtime for the failure message; floats also get
            # their exact hex form.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        tar = tarfile.open(tarname, encoding="iso8859-1")
        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = os.path.join(DIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777,
                                     os.stat(path).st_mode & 0o777)
                file_mtime = os.path.getmtime(path)
                errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                    format_mtime(tarinfo.mtime),
                    format_mtime(file_mtime),
                    path)
                self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            tar.close()
            support.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member must restore its mtime
        # and, except on win32, its permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Allocate the instance first so it can still be inspected
            # after __init__() raises.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
611n/a
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase against the uncompressed archive."""

    # test_fail_comp is written for the compressed variants (see its own
    # comment), so the inherited method is masked here.
    test_fail_comp = None
614n/a
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase with the GzipTest archive settings."""
617n/a
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase with the Bz2Test archive settings."""

    def requires_name_attribute(self):
        # Tests that call this hook are skipped for this variant.
        reason = "BZ2File have no name attribute"
        self.skipTest(reason)
621n/a
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase with the LzmaTest archive settings."""

    def requires_name_attribute(self):
        # Tests that call this hook are skipped for this variant.
        reason = "LZMAFile have no name attribute"
        self.skipTest(reason)
625n/a
626n/a
class StreamReadTest(CommonReadTest, unittest.TestCase):
    """Read tests that open the archive in stream mode ("r|")."""

    prefix="r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for member in self.tar:
            if member.isreg():
                with self.tar.extractfile(member) as stream:
                    while True:
                        try:
                            chunk = stream.read(512)
                        except tarfile.StreamError:
                            self.fail("simple read-through using "
                                      "TarFile.extractfile() failed")
                        if not chunk:
                            break

    def test_fileobj_regular_file(self):
        member = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(member) as stream:
            payload = stream.read()
        self.assertEqual(len(payload), member.size,
                "regular file extraction failed")
        self.assertEqual(md5sum(payload), md5_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() reads through the whole stream first, so going
        # back to read the first member must raise StreamError.
        first = self.tar.getmembers()[0]
        with self.tar.extractfile(first) as stream:
            with self.assertRaises(tarfile.StreamError):
                stream.read()

    def test_compare_members(self):
        # Walk a seekable archive and the stream archive in lock-step
        # and compare the data of every member.
        with tarfile.open(tarname, encoding="iso8859-1") as seekable:
            streamed = self.tar

            while True:
                expected_info = seekable.next()
                actual_info = streamed.next()
                if expected_info is None:
                    break
                self.assertIsNotNone(actual_info, "stream.next() failed.")

                if actual_info.islnk() or actual_info.issym():
                    with self.assertRaises(tarfile.StreamError):
                        streamed.extractfile(actual_info)
                    continue

                expected_file = seekable.extractfile(expected_info)
                actual_file = streamed.extractfile(actual_info)
                if expected_file is None:
                    continue
                self.assertIsNotNone(actual_file,
                        "stream.extractfile() failed")
                self.assertEqual(expected_file.read(), actual_file.read(),
                        "stream extraction failed")
687n/a
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """StreamReadTest with the GzipTest archive settings."""
690n/a
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """StreamReadTest with the Bz2Test archive settings."""
693n/a
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """StreamReadTest with the LzmaTest archive settings."""
696n/a
697n/a
class DetectReadTest(TarTest, unittest.TestCase):
    """Checks which read modes accept or reject the archive variants."""

    def _testfunc_file(self, name, mode):
        # Opening *name* by path with *mode* must not raise ReadError.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as exc:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, exc))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same check through a file object.  The archive is closed
        # while its backing file is still open.
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as exc:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised "
                          "ReadError: %s" % (name, mode, exc))
            else:
                tar.close()

    def _test_modes(self, testfunc):
        if self.suffix:
            # Explicit compressed modes must reject the uncompressed
            # archive, and the plain modes must reject this variant's
            # compressed archive.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        # The matching explicit modes and the auto-detecting modes must
        # all open this variant's archive.
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
737n/a
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """DetectReadTest with the GzipTest archive settings."""
740n/a
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    """DetectReadTest with the Bz2Test archive settings."""

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900kB). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as source:
            raw = source.read()

        # Compress with blocksize 100kB, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as target:
            target.write(raw)

        self._testfunc_file(tmpname, "r|*")
755n/a
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """DetectReadTest with the LzmaTest archive settings."""
758n/a
759n/a
class MemberReadTest(ReadTest, unittest.TestCase):
    """Looks up individual members and checks their header fields."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Optionally verify the member's data, then compare header
        # fields: the caller's keyword arguments plus the mtime and
        # owner values checked for every member.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                digest = md5sum(f.read())
            self.assertEqual(digest, chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        expected = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive before looking up the pax member.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=md5_regtype)
855n/a
856n/a
class LongnameTest:
    """Mixin with tests for members whose names exceed the header field.

    Users of the mixin provide ``subdir`` and ``longnametype``.
    """

    def _long_path(self, leaf):
        # <subdir>/ followed by 125 "123/" components and the leaf name.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._long_path("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        target = self._long_path("longname")
        try:
            member = self.tar.getmember(self._long_path("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, target, "linkname wrong")

    def test_truncated_longname(self):
        # Opening just the first three blocks at the member's offset
        # must raise ReadError.
        member = self.tar.getmember(self._long_path("longname"))
        self.tar.fileobj.seek(member.offset)
        truncated = io.BytesIO(self.tar.fileobj.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=truncated)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._long_path("longname")).offset
        with open(tarname, "rb") as raw:
            raw.seek(start)
            header = tarfile.TarInfo.frombuf(raw.read(512),
                                             "iso8859-1", "strict")
        self.assertEqual(header.type, self.longnametype)
897n/a
898n/a
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = md5sum(fobj.read())
        self.assertEqual(digest, md5_sparse, "wrong md5sum for %s" % name)

        if not self._fs_supports_holes():
            return
        # With holes, the allocated bytes must be fewer than the file size.
        info = os.stat(extracted)
        self.assertLess(info.st_blocks * 512, info.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes in files.
        if not sys.platform.startswith("linux"):
            return False
        # Linux evidently has 512 byte st_blocks units.  Create a 4096 byte
        # file without writing any data and see whether blocks get allocated.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            fobj.seek(4096)
            fobj.truncate()
        info = os.stat(probe)
        support.unlink(probe)
        return info.st_blocks == 0
951n/a
952n/a
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # Each row is (member name, expected uname, expected gname).  Every
        # listed member must also expose the same VENDOR.umlauts pax header.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                member = tar.getmember(member_name)
                self.assertEqual(member.uname, uname)
                self.assertEqual(member.gname, gname)
                self.assertEqual(member.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        stamp = 1041808783.0
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            member = tar.getmember("pax/regtype4")
            self.assertEqual(member.size, 7011)
            self.assertEqual(member.uid, 123)
            self.assertEqual(member.gid, 123)
            self.assertEqual(member.mtime, stamp)
            self.assertEqual(type(member.mtime), float)
            headers = member.pax_headers
            self.assertEqual(float(headers["atime"]), stamp)
            self.assertEqual(float(headers["ctime"]), stamp)
995n/a
996n/a
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        # Closing the TarFile must leave a caller-supplied file object open.
        buf = io.BytesIO()
        tar = tarfile.open(fileobj=buf, mode=self.mode)
        tar.addfile(tarfile.TarInfo("foo"))
        tar.close()
        self.assertFalse(buf.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the TarFile and collecting garbage must neither close
        # the buffer nor change what was written to it.
        snapshot = buf.getvalue()
        del tar
        support.gc_collect()
        self.assertFalse(buf.closed)
        self.assertEqual(snapshot, buf.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        payload_size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        with tarfile.open(tmpname, self.mode) as tar:
            member = tarfile.TarInfo("foo")
            member.size = payload_size
            tar.addfile(member, io.BytesIO(b"a" * member.size))

        with self.open(tmpname, "rb") as fobj:
            archive_size = len(fobj.read())
        self.assertEqual(archive_size, tarfile.RECORDSIZE * 2)
1026n/a
1027n/a
class WriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for the "w:" modes.  Scratch files and directories created
    # in TEMPDIR are removed in finally blocks so that a failing assertion
    # does not leave them behind.

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                             "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        path = os.path.join(TEMPDIR, "file")
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                with open(path, "wb") as fobj:
                    fobj.write(b"aaa")
                tar.add(path)
        finally:
            # Do not leave the scratch file in the shared TEMPDIR.
            support.unlink(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                           "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        path = os.path.join(TEMPDIR, "file")
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                # An empty file must be reported with size 0 ...
                with open(path, "wb"):
                    pass
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)

                # ... and a three byte file with size 3.
                with open(path, "wb") as fobj:
                    fobj.write(b"aaa")
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 3)
        finally:
            support.unlink(path)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.rmdir(path)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        try:
            # Creating the files inside the try block makes sure the target
            # is removed even if os.link() fails.
            with open(target, "wb") as fobj:
                fobj.write(b"aaa")
            os.link(target, link)
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                             "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                             "added the archive to itself")

            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                             "added the archive to itself")

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                support.create_empty_file(os.path.join(tempdir, name))

            def filter_func(tarinfo):
                # Drop "bar"; stamp every other member with uid and uname.
                if os.path.basename(tarinfo.name) == "bar":
                    return None
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=filter_func)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, filter_func)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        # cmp_path, when given, is the expected stored name; dir selects
        # an empty directory instead of an empty file as the member.
        foo = os.path.join(TEMPDIR, "foo")
        if dir:
            os.mkdir(foo)
        else:
            support.create_empty_file(foo)

        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            # Remove the scratch entry even if writing or reading failed.
            if dir:
                support.rmdir(foo)
            else:
                support.unlink(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            # The context manager releases the archive file even when
            # add() raises.
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file)
                tar.add(target_file)
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, 'r') as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)

    def test_open_nonwritable_fileobj(self):
        # An exception raised by the first write to the file object must
        # propagate out of tarfile.open() without closing the file object.
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1302n/a
class GzipWriteTest(GzipTest, WriteTest):
    # Runs the WriteTest cases with the settings from the GzipTest mixin.
    pass
1305n/a
class Bz2WriteTest(Bz2Test, WriteTest):
    # Runs the WriteTest cases with the settings from the Bz2Test mixin.
    pass
1308n/a
class LzmaWriteTest(LzmaTest, WriteTest):
    # Runs the WriteTest cases with the settings from the LzmaTest mixin.
    pass
1311n/a
1312n/a
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for the "w|" modes.

    prefix = "w|"
    # Callable returning a decompressor object, or None to read the archive
    # back through self.open() instead.
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()

        if self.decompressor:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                compressed = fobj.read()
            data = dec.decompress(compressed)
            self.assertFalse(dec.unused_data, "found trailing data")
        else:
            with self.open(tmpname) as fobj:
                data = fobj.read()

        # An archive without members consists of zero bytes only.
        zero_count = data.count(b"\0")
        self.assertEqual(zero_count, tarfile.RECORDSIZE,
                         "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        previous_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            # Always restore the umask that was active before the test.
            os.umask(previous_umask)
1350n/a
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    # Runs the StreamWriteTest cases with the settings from the GzipTest
    # mixin.
    pass
1353n/a
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Decompressor factory for the stream tests; None when the bz2 module
    # could not be imported.
    decompressor = bz2.BZ2Decompressor if bz2 else None
1356n/a
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Decompressor factory for the stream tests; None when the lzma module
    # could not be imported.
    decompressor = lzma.LZMADecompressor if lzma else None
1359n/a
1360n/a
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Number of bytes, in whole 512-byte blocks, that is strictly
        # larger than len(s).
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member.
        # Initial tar header
        total = 512

        # GNU longname extended header + longname
        if len(name) > tarfile.LENGTH_NAME:
            total += 512 + self._length(name)

        # GNU longlink extended header + longlink
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            total += 512 + self._length(link)

        return total

    def _test(self, name, link=None):
        # Write one member called name (a hard link to link, if given) in
        # GNU format, compare the resulting offset with the computed size
        # and read the member back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            expected_offset = self._calc_size(name, link)
            self.assertEqual(expected_offset, tar.offset,
                             "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                                 "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                             "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                             "unable to read longname member")

    def test_longname_1023(self):
        self._test("longnam/" * 127 + "longnam")

    def test_longname_1024(self):
        self._test("longnam/" * 127 + "longname")

    def test_longname_1025(self):
        self._test("longnam/" * 127 + "longname_")

    def test_longlink_1023(self):
        self._test("name", "longlnk/" * 127 + "longlnk")

    def test_longlink_1024(self):
        self._test("name", "longlnk/" * 127 + "longlink")

    def test_longlink_1025(self):
        self._test("name", "longlnk/" * 127 + "longlink_")

    def test_longnamelink_1023(self):
        self._test("longnam/" * 127 + "longnam",
                   "longlnk/" * 127 + "longlnk")

    def test_longnamelink_1024(self):
        self._test("longnam/" * 127 + "longname",
                   "longlnk/" * 127 + "longlink")

    def test_longnamelink_1025(self):
        self._test("longnam/" * 127 + "longname_",
                   "longlnk/" * 127 + "longlink_")
1441n/a
1442n/a
class CreateTest(WriteTestBase, unittest.TestCase):
    # Tests for the exclusive creation modes ("x:").

    prefix = "x:"

    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        # One small payload file is shared by all tests of the class.
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def setUp(self):
        # Exclusive creation needs the archive path to be unused.
        support.unlink(tmpname)

    def _check_single_member(self, expected):
        # Reopen the archive at tmpname and make sure it holds exactly one
        # member whose name contains the payload file name.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn(expected, names[0])

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_single_member('spameggs42')

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second exclusive open must fail and leave the archive intact.
        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        self._check_single_member('spameggs42')

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_single_member('spameggs42')

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        # A second exclusive open must fail and leave the archive intact.
        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_single_member("spameggs42")
1503n/a
1504n/a
class GzipCreateTest(GzipTest, CreateTest):
    # Runs the CreateTest cases with the settings from the GzipTest mixin.
    pass
1507n/a
1508n/a
class Bz2CreateTest(Bz2Test, CreateTest):
    # Runs the CreateTest cases with the settings from the Bz2Test mixin.
    pass
1511n/a
1512n/a
class LzmaCreateTest(LzmaTest, CreateTest):
    # Runs the CreateTest cases with the settings from the LzmaTest mixin.
    pass
1515n/a
1516n/a
class CreateWithXModeTest(CreateTest):
    # Repeats the CreateTest cases with the bare "x" mode prefix instead
    # of "x:".

    prefix = "x"

    # The taropen-based tests pass a literal "x" mode and do not read
    # self.mode; they are replaced by None in this subclass.
    test_create_taropen = None
    test_create_existing_taropen = None
1523n/a
1524n/a
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # Cleanups are registered step by step instead of relying on
        # tearDown(): unittest skips tearDown() when setUp() raises, but
        # still runs registered cleanups (in reverse order).
        self.addCleanup(support.unlink, self.foo)
        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        self.addCleanup(support.unlink, self.bar)
        os.link(self.foo, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.addCleanup(self.tar.close)
        # Adding foo first makes the archive aware of its inode, so that
        # bar can be recognized as a hard link to it.
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "add file as regular failed")

    def test_add_hardlink(self):
        # The second name of an already added file is stored as a link.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                         "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is stored as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "dereferencing hardlink failed")
1563n/a
1564n/a
class PaxWriteTest(GNUWriteTest):
    # Repeats the inherited long name/link tests with PAX_FORMAT archives
    # and adds checks for pax headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            if link:
                stored = tar.getmembers()[0].linkname
                self.assertEqual(link, stored, "PAX longlink creation failed")
            else:
                stored = tar.getmembers()[0].name
                self.assertEqual(name, stored, "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test",
        }

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                           pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Number fields must be convertible by their converter.
                converter = tarfile.PAX_NUMBER_FIELDS[key]
                try:
                    converter(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                           encoding="iso8859-1")
        try:
            written = tarfile.TarInfo()
            written.name = "\xe4\xf6\xfc"   # non-ASCII
            written.uid = 8**8              # too large
            written.pax_headers = pax_headers
            tar.addfile(written)
        finally:
            tar.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            member = tar.getmembers()[0]
            self.assertEqual(member.pax_headers, pax_headers)
            self.assertEqual(member.name, "foo")
            self.assertEqual(member.uid, 123)
1647n/a
1648n/a
class UnicodeTest:
    # Non-ASCII name handling checks.  The methods read self.format, which
    # this class does not define itself.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using the given encoding and
        # read it back with the same encoding.
        name = "\xe4\xf6\xfc"
        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding=encoding, errors="strict")
        try:
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With a strict ascii encoding, non-ASCII text in the name or in
        # the user name must be rejected with UnicodeError.
        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                tar.addfile(tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                tar.addfile(tarinfo)
        finally:
            tar.close()

    def test_unicode_argument(self):
        # All textual member attributes must be str objects.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for member in tar:
                for attr in ("name", "linkname", "uname", "gname"):
                    self.assertIs(type(getattr(member, attr)), str)

    def test_uname_unicode(self):
        owner = "\xe4\xf6\xfc"
        written = tarfile.TarInfo("foo")
        written.uname = owner
        written.gname = owner

        tar = tarfile.open(tmpname, mode="w", format=self.format,
                           encoding="iso8859-1")
        try:
            tar.addfile(written)
        finally:
            tar.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            member = tar.getmember("foo")
            self.assertEqual(member.uname, "\xe4\xf6\xfc")
            self.assertEqual(member.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Read with an ascii encoding, the undecodable bytes are
            # expected to come back as surrogate escapes.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                member = tar.getmember("foo")
                self.assertEqual(member.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(member.gname, "\udce4\udcf6\udcfc")
1728n/a
1729n/a
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    # Each case below is (name, expected exception or None).
    def test_unicode_name1(self):
        d = "0123456789"
        cases = (
            (d * 10, None),
            (d * 10 + "0", ValueError),
            (d * 9 + "01234567\xff", None),
            (d * 9 + "012345678\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_name2(self):
        d = "0123456789"
        cases = (
            (d * 9 + "012345\xff\xff", None),
            (d * 9 + "0123456\xff\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        d = "0123456789"
        cases = (
            (d * 15 + "01234/" + d * 10, None),
            (d * 15 + "0123/4" + d * 10, ValueError),
            (d * 15 + "012\xff/" + d * 10, None),
            (d * 15 + "0123\xff/" + d * 10, ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname2(self):
        d = "0123456789"
        cases = (
            (d * 15 + "01\xff/2" + d * 10, ValueError),
            (d * 15 + "01\xff\xff/" + d * 10, ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname3(self):
        d = "0123456789"
        cases = (
            (d * 15 + "01\xff\xff/2" + d * 10, ValueError),
            (d * 15 + "01234/" + d * 9 + "01234567\xff", None),
            (d * 15 + "01234/" + d * 9 + "012345678\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname4(self):
        d = "0123456789"
        cases = (
            (d * 15 + "01234/" + d * 9 + "012345\xff\xff", None),
            (d * 15 + "01234/" + d * 9 + "0123456\xff\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def _test_ustar_name(self, name, exc=None):
        # Add a member called name.  When exc is given, adding must raise
        # it; otherwise the first member read back must carry the name.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as tar:
            member = tarfile.TarInfo(name)
            if exc is not None:
                self.assertRaises(exc, tar.addfile, member)
            else:
                tar.addfile(member)

        if exc is not None:
            return
        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            for stored in tar:
                self.assertEqual(name, stored.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        d = "0123456789"
        cases = (
            (d * 10, None),
            (d * 10 + "0", ValueError),
            (d * 9 + "01234567\xff", None),
            (d * 9 + "012345678\xff", ValueError),
        )
        for linkname, exc in cases:
            self._test_ustar_link(linkname, exc)

    def test_unicode_link2(self):
        d = "0123456789"
        cases = (
            (d * 9 + "012345\xff\xff", None),
            (d * 9 + "0123456\xff\xff", ValueError),
        )
        for linkname, exc in cases:
            self._test_ustar_link(linkname, exc)

    def _test_ustar_link(self, name, exc=None):
        # Add a member "foo" whose link name is name.  When exc is given,
        # adding must raise it; otherwise the first member read back must
        # carry the link name.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as tar:
            member = tarfile.TarInfo("foo")
            member.linkname = name
            if exc is not None:
                self.assertRaises(exc, tar.addfile, member)
            else:
                tar.addfile(member)

        if exc is not None:
            return
        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            for stored in tar:
                self.assertEqual(name, stored.linkname)
                break
1807n/a
1808n/a
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        lookups = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, member_name in lookups:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                try:
                    tar.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1825n/a
1826n/a
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):
    # Runs the inherited UnicodeTest cases with the pax format.

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for codec, member_name in cases:
            with tarfile.open(tarname, encoding=codec,
                              errors="surrogateescape") as archive:
                try:
                    # Only the lookup matters; the TarInfo is not used.
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1845n/a
1846n/a
class AppendTestBase:
    # Test append mode (cp. patch #1652681).
    # Mixin: the concrete classes combine it with unittest.TestCase.

    def setUp(self):
        # Every test starts without a leftover archive at tmpname.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            support.unlink(self.tarname)

    def _create_testtar(self, mode="w:"):
        # Copy the member "ustar/regtype" of the reference archive into a
        # new archive at self.tarname, stored under the name "foo".
        with tarfile.open(tarname, encoding="iso8859-1") as reference:
            member = reference.getmember("ustar/regtype")
            member.name = "foo"
            with reference.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as target:
                target.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must raise ReadError.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
1866n/a
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append mode on uncompressed archives.

    # The compressed-archive check only makes sense in the subclasses that
    # provide a compression suffix.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member named "bar" to the archive.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly *names* (default: ["bar"]).
        # A None sentinel is used instead of a list literal so that no
        # mutable object is shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        # 1024 NUL bytes, i.e. two zero blocks of 512 bytes.
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        # Continue on an in-memory copy of the archive created above.
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
1930n/a
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Runs the AppendTestBase tests with the gzip settings from GzipTest
    # (suffix 'gz'); no test is added or overridden here.
    pass
1933n/a
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Runs the AppendTestBase tests with the bzip2 settings from Bz2Test
    # (suffix 'bz2'); no test is added or overridden here.
    pass
1936n/a
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Runs the AppendTestBase tests with the lzma settings from LzmaTest
    # (suffix 'xz'); no test is added or overridden here.
    pass
1939n/a
1940n/a
class LimitsTest(unittest.TestCase):
    # Which names, link names and uids TarInfo.tobuf() accepts per format.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        member = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        member = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 512 char name
        member = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 512 char linkname
        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # uid > 8 digits
        member = tarfile.TarInfo("name")
        member.uid = 0o10000000
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # 512 char name and 512 char linkname are both accepted.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        member.tobuf(gnu)

        # uid >= 256 ** 7
        member = tarfile.TarInfo("name")
        member.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            member.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # None of the values rejected by the other formats is rejected here.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        member.tobuf(pax)

        member = tarfile.TarInfo("name")
        member.uid = 0o4000000000000000000
        member.tobuf(pax)
1998n/a
1999n/a
class MiscTest(unittest.TestCase):
    # Tests for the low-level header field helpers and for __all__.

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the requested length;
        # nts() stops at the first NUL.
        stn, nts = tarfile.stn, tarfile.nts
        self.assertEqual(stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0")
        self.assertEqual(stn("foobar", 3, "ascii", "strict"), b"foo")
        self.assertEqual(nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo")
        self.assertEqual(nts(b"foo\0bar\0", "ascii", "strict"), "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        decoded = (
            (b"0000001\x00", 1),
            (b"7777777\x00", 0o7777777),
            (b"\x80\x00\x00\x00\x00\x20\x00\x00", 0o10000000),
            (b"\x80\x00\x00\x00\xff\xff\xff\xff", 0xffffffff),
            (b"\xff\xff\xff\xff\xff\xff\xff\xff", -1),
            (b"\xff\xff\xff\xff\xff\xff\xff\x9c", -100),
            (b"\xff\x00\x00\x00\x00\x00\x00\x00", -0x100000000000000),
            # Issue 24514: Test if empty number fields are converted to zero.
            (b"\0", 0),
            (b"       \0", 0),
        )
        for field, number in decoded:
            self.assertEqual(tarfile.nti(field), number)

    def test_write_number_fields(self):
        # Same values as in test_read_number_fields, in the other direction.
        encoded = (
            (1, b"0000001\x00"),
            (0o7777777, b"7777777\x00"),
            (0o10000000, b"\x80\x00\x00\x00\x00\x20\x00\x00"),
            (0xffffffff, b"\x80\x00\x00\x00\xff\xff\xff\xff"),
            (-1, b"\xff\xff\xff\xff\xff\xff\xff\xff"),
            (-100, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (-0x100000000000000, b"\xff\x00\x00\x00\x00\x00\x00\x00"),
        )
        for number, field in encoded:
            self.assertEqual(tarfile.itn(number), field)

    def test_number_field_limits(self):
        # (value, digits, format) combinations that itn() must refuse.
        rejected = (
            (-1, 8, tarfile.USTAR_FORMAT),
            (0o10000000, 8, tarfile.USTAR_FORMAT),
            (-0x10000000001, 6, tarfile.GNU_FORMAT),
            (0x10000000000, 6, tarfile.GNU_FORMAT),
        )
        for number, digits, fmt in rejected:
            with self.assertRaises(ValueError):
                tarfile.itn(number, digits, fmt)

    def test__all__(self):
        # Module-level names that are deliberately left out of __all__.
        blacklist = {
            'version', 'grp', 'pwd', 'symlink_exception',
            'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC',
            'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK',
            'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE',
            'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK',
            'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
            'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
            'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
            'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
            'filemode',
            'EmptyHeaderError', 'TruncatedHeaderError',
            'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject',
            'main',
        }
        support.check__all__(self, tarfile, blacklist=blacklist)
2073n/a
2074n/a
class CommandLineTest(unittest.TestCase):
    # Tests for the "python -m tarfile" command line interface.

    def tarfilecmd(self, *args, **kwargs):
        # Run "python -m tarfile *args", require success and return stdout
        # with the platform line separator replaced by b'\n'.
        _rc, stdout, _err = script_helper.assert_python_ok(
            '-m', 'tarfile', *args, **kwargs)
        return stdout.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        # Run "python -m tarfile *args", require failure and return the
        # result of assert_python_failure() (unpacked as rc, out, err below).
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def _sample_files(self):
        # The two data files that are archived by the create/extract tests.
        return [support.findfile('tokenize_tests.txt'),
                support.findfile('tokenize_tests-no-coding-cookie-'
                                 'and-utf8-bom-sig-only.txt')]

    def make_simple_tarfile(self, tar_name):
        # Create an uncompressed archive holding the sample files under
        # their base names; it is removed again when the test finishes.
        sources = self._sample_files()
        self.addCleanup(support.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as archive:
            for path in sources:
                archive.add(path, arcname=os.path.basename(path))

    def test_test_command(self):
        for tar_name in testtarnames:
            for flag in ('-t', '--test'):
                self.assertEqual(self.tarfilecmd(flag, tar_name), b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for flag in ('-v', '--verbose'):
                output = self.tarfilecmd(flag, '-t', tar_name)
                self.assertIn(b'is a tar archive.\n', output)

    def test_test_command_invalid_file(self):
        # A zip file is not a tar archive.
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # Neither is the first 511 bytes of any of the test archives.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as source:
                    content = source.read()
                try:
                    with open(tmpname, 'wb') as truncated:
                        truncated.write(content[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    support.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            # The expected text is whatever TarFile.list() prints in-process.
            with support.captured_stdout() as captured:
                with tarfile.open(tar_name, 'r') as archive:
                    archive.list(verbose=False)
            expected = captured.getvalue().encode('ascii', 'backslashreplace')
            for flag in ('-l', '--list'):
                output = self.tarfilecmd(flag, tar_name,
                                         PYTHONIOENCODING='ascii')
                self.assertEqual(output, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            # The expected text is whatever TarFile.list() prints in-process.
            with support.captured_stdout() as captured:
                with tarfile.open(tar_name, 'r') as archive:
                    archive.list(verbose=True)
            expected = captured.getvalue().encode('ascii', 'backslashreplace')
            for flag in ('-v', '--verbose'):
                output = self.tarfilecmd(flag, '-l', tar_name,
                                         PYTHONIOENCODING='ascii')
                self.assertEqual(output, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        sources = self._sample_files()
        for flag in ('-c', '--create'):
            try:
                output = self.tarfilecmd(flag, tmpname, *sources)
                self.assertEqual(output, b'')
                # The result must be readable as a tar archive.
                with tarfile.open(tmpname) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_verbose(self):
        sources = self._sample_files()
        for flag in ('-v', '--verbose'):
            try:
                output = self.tarfilecmd(flag, '-c', tmpname, *sources)
                self.assertIn(b' file created.', output)
                with tarfile.open(tmpname) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        sources = [support.findfile('tokenize_tests.txt')]
        try:
            output = self.tarfilecmd('-c', dotlessname, *sources)
            self.assertEqual(output, b'')
            with tarfile.open(dotlessname) as archive:
                archive.getmembers()
        finally:
            support.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        tar_name = os.path.join(TEMPDIR, ".testtar")
        sources = [support.findfile('tokenize_tests.txt')]
        try:
            output = self.tarfilecmd('-c', tar_name, *sources)
            self.assertEqual(output, b'')
            with tarfile.open(tar_name) as archive:
                archive.getmembers()
        finally:
            support.unlink(tar_name)

    def test_create_command_compressed(self):
        sources = self._sample_files()
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            # open is None when the compression module is not available.
            if not filetype.open:
                continue
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *sources)
                with filetype.taropen(tar_name) as archive:
                    archive.getmembers()
            finally:
                support.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for flag in ('-e', '--extract'):
            try:
                with support.temp_cwd(tarextdir):
                    output = self.tarfilecmd(flag, tmpname)
                self.assertEqual(output, b'')
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for flag in ('-v', '--verbose'):
            try:
                with support.temp_cwd(tarextdir):
                    output = self.tarfilecmd(flag, '-e', tmpname)
                self.assertIn(b' file is extracted.', output)
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with support.temp_cwd(tarextdir):
                output = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(output, b'')
        finally:
            support.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with support.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2253n/a
2254n/a
class ContextManagerTest(unittest.TestCase):
    # Tests for using a TarFile object in a "with" statement.

    def test_basic(self):
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is expected here; a bare
            # "except:" would also swallow KeyboardInterrupt and SystemExit.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                         "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                            "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # See test_no_eof(): do not hide KeyboardInterrupt or
                # SystemExit behind a bare "except:".
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2311n/a
2312n/a
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members
    # as the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract *name* into TEMPDIR and compare the checksum of the
        # resulting file with the one of the regular file member.
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            digest = md5sum(extracted.read())
        self.assertEqual(digest, md5_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2345n/a
2346n/a
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        class EOFGuardBytesIO(io.BytesIO):
            # A BytesIO that raises AssertionError when read() is called
            # again after a read() that started at the end of the data,
            # unless seek() was called in between.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        # Try every prefix of the compressed data, from empty to complete.
        for cut in range(len(compressed) + 1):
            stream = EOFGuardBytesIO(compressed[:cut])
            try:
                tarfile.open(fileobj=stream, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2376n/a
2377n/a
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 both exist and are named 'root'.

    Return False if the pwd or grp module cannot be imported, if either
    database has no entry for id 0, or if an entry has a different name.
    """
    try:
        import grp
        import pwd
    except ImportError:
        return False
    try:
        return (pwd.getpwuid(0)[0] == 'root' and
                grp.getgrgid(0)[0] == 'root')
    except KeyError:
        # getpwuid()/getgrgid() raise KeyError for an unknown id.  This
        # function is evaluated in a decorator while the module is being
        # imported, so the error must not escape.
        return False
2388n/a
2389n/a
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # mock the following:
    #  os.chown: so we can test what's being called
    #  os.chmod: so the modes are not actually changed. if they are, we can't
    #            delete the files/directories
    #  os.geteuid: so we can lie and say we're root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # the file contents to write
        payload = io.BytesIO(b"content")

        # create a tar file with a file, a directory, and a file within that
        # directory. Assign various .uid/.gid values to them
        entries = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for entry_name, uid, gid, entry_type, contents in entries:
                member = tarfile.TarInfo(entry_name)
                member.uid = uid
                member.gid = gid
                member.uname = 'root'
                member.gname = 'root'
                member.type = entry_type
                archive.addfile(member, contents)

        # return the full pathname to the tar file
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        mock_geteuid.return_value = 0  # lie and say we're root

        # the names we want stored in the tarfile
        filename_1 = 'numeric-owner-testfile'
        dirname_1 = 'dir'
        filename_2 = os.path.join(dirname_1, filename_1)

        # create the tarfile with the contents we're after
        tar_filename = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        # open the tarfile for reading. yield it and the names of the items
        # we stored into the file
        with tarfile.open(tar_filename) as archive:
            yield archive, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (archive, filename_1, _,
                                                filename_2):
            archive.extract(filename_1, TEMPDIR, numeric_owner=True)
            archive.extract(filename_2, TEMPDIR, numeric_owner=True)

        # convert to filesystem paths
        path_1 = os.path.join(TEMPDIR, filename_1)
        path_2 = os.path.join(TEMPDIR, filename_2)

        # the uid/gid values stored in the archive must be passed on as is
        expected_calls = [
            unittest.mock.call(path_1, 99, 98),
            unittest.mock.call(path_2, 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (archive, filename_1,
                                                dirname_1, filename_2):
            archive.extractall(TEMPDIR, numeric_owner=True)

        # convert to filesystem paths
        path_1 = os.path.join(TEMPDIR, filename_1)
        dir_path = os.path.join(TEMPDIR, dirname_1)
        path_2 = os.path.join(TEMPDIR, filename_2)

        expected_calls = [
            unittest.mock.call(path_1, 99, 98),
            unittest.mock.call(dir_path, 77, 76),
            unittest.mock.call(path_2, 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # this test requires that uid=0 and gid=0 really be named 'root'. that's
    # because the uname and gname in the test file are 'root', and extract()
    # will look them up using pwd and grp to find their uid and gid, which we
    # test here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (archive, filename_1, _, _):
            archive.extract(filename_1, TEMPDIR, numeric_owner=False)

        # convert to filesystem paths
        path_1 = os.path.join(TEMPDIR, filename_1)

        mock_chown.assert_called_with(path_1, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # numeric_owner must not be accepted as a positional argument
        with self._setup_test(mock_geteuid) as (archive, filename_1, _, _):
            with self.assertRaises(TypeError):
                archive.extract(filename_1, TEMPDIR, False, True)
2508n/a
2509n/a
def setUpModule():
    """Create TEMPDIR and the compressed copies of the test archive.

    Also sets the global list ``testtarnames`` to the reference archive
    followed by every compressed copy that could be created.
    """
    global testtarnames

    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as source:
        raw = source.read()

    # Create compressed tarfiles.
    for flavour in (GzipTest, Bz2Test, LzmaTest):
        # open is None when the compression module could not be imported.
        if not flavour.open:
            continue
        support.unlink(flavour.tarname)
        testtarnames.append(flavour.tarname)
        with flavour.open(flavour.tarname, "wb") as compressed:
            compressed.write(raw)
2526n/a
def tearDownModule():
    """Remove TEMPDIR and everything in it, if it exists."""
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2530n/a
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()