ยปCore Development>Code coverage>Lib/test/test_shutil.py

Python code coverage for Lib/test/test_shutil.py

#countcontent
1n/a# Copyright (C) 2003 Python Software Foundation
2n/a
3n/aimport unittest
4n/aimport unittest.mock
5n/aimport shutil
6n/aimport tempfile
7n/aimport sys
8n/aimport stat
9n/aimport os
10n/aimport os.path
11n/aimport errno
12n/aimport functools
13n/aimport subprocess
14n/afrom contextlib import ExitStack
15n/afrom shutil import (make_archive,
16n/a register_archive_format, unregister_archive_format,
17n/a get_archive_formats, Error, unpack_archive,
18n/a register_unpack_format, RegistryError,
19n/a unregister_unpack_format, get_unpack_formats,
20n/a SameFileError)
21n/aimport tarfile
22n/aimport zipfile
23n/a
24n/afrom test import support
25n/afrom test.support import (TESTFN, check_warnings, captured_stdout,
26n/a android_not_root)
27n/a
28n/aTESTFN2 = TESTFN + "2"
29n/a
30n/atry:
31n/a import grp
32n/a import pwd
33n/a UID_GID_SUPPORT = True
34n/aexcept ImportError:
35n/a UID_GID_SUPPORT = False
36n/a
37n/adef _fake_rename(*args, **kwargs):
38n/a # Pretend the destination path is on a different filesystem.
39n/a raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
40n/a
41n/adef mock_rename(func):
42n/a @functools.wraps(func)
43n/a def wrap(*args, **kwargs):
44n/a try:
45n/a builtin_rename = os.rename
46n/a os.rename = _fake_rename
47n/a return func(*args, **kwargs)
48n/a finally:
49n/a os.rename = builtin_rename
50n/a return wrap
51n/a
52n/adef write_file(path, content, binary=False):
53n/a """Write *content* to a file located at *path*.
54n/a
55n/a If *path* is a tuple instead of a string, os.path.join will be used to
56n/a make a path. If *binary* is true, the file will be opened in binary
57n/a mode.
58n/a """
59n/a if isinstance(path, tuple):
60n/a path = os.path.join(*path)
61n/a with open(path, 'wb' if binary else 'w') as fp:
62n/a fp.write(content)
63n/a
64n/adef read_file(path, binary=False):
65n/a """Return contents from a file located at *path*.
66n/a
67n/a If *path* is a tuple instead of a string, os.path.join will be used to
68n/a make a path. If *binary* is true, the file will be opened in binary
69n/a mode.
70n/a """
71n/a if isinstance(path, tuple):
72n/a path = os.path.join(*path)
73n/a with open(path, 'rb' if binary else 'r') as fp:
74n/a return fp.read()
75n/a
76n/adef rlistdir(path):
77n/a res = []
78n/a for name in sorted(os.listdir(path)):
79n/a p = os.path.join(path, name)
80n/a if os.path.isdir(p) and not os.path.islink(p):
81n/a res.append(name + '/')
82n/a for n in rlistdir(p):
83n/a res.append(name + '/' + n)
84n/a else:
85n/a res.append(name)
86n/a return res
87n/a
88n/a
89n/aclass TestShutil(unittest.TestCase):
90n/a
91n/a def setUp(self):
92n/a super(TestShutil, self).setUp()
93n/a self.tempdirs = []
94n/a
95n/a def tearDown(self):
96n/a super(TestShutil, self).tearDown()
97n/a while self.tempdirs:
98n/a d = self.tempdirs.pop()
99n/a shutil.rmtree(d, os.name in ('nt', 'cygwin'))
100n/a
101n/a
102n/a def mkdtemp(self):
103n/a """Create a temporary directory that will be cleaned up.
104n/a
105n/a Returns the path of the directory.
106n/a """
107n/a d = tempfile.mkdtemp()
108n/a self.tempdirs.append(d)
109n/a return d
110n/a
111n/a def test_rmtree_works_on_bytes(self):
112n/a tmp = self.mkdtemp()
113n/a victim = os.path.join(tmp, 'killme')
114n/a os.mkdir(victim)
115n/a write_file(os.path.join(victim, 'somefile'), 'foo')
116n/a victim = os.fsencode(victim)
117n/a self.assertIsInstance(victim, bytes)
118n/a shutil.rmtree(victim)
119n/a
120n/a @support.skip_unless_symlink
121n/a def test_rmtree_fails_on_symlink(self):
122n/a tmp = self.mkdtemp()
123n/a dir_ = os.path.join(tmp, 'dir')
124n/a os.mkdir(dir_)
125n/a link = os.path.join(tmp, 'link')
126n/a os.symlink(dir_, link)
127n/a self.assertRaises(OSError, shutil.rmtree, link)
128n/a self.assertTrue(os.path.exists(dir_))
129n/a self.assertTrue(os.path.lexists(link))
130n/a errors = []
131n/a def onerror(*args):
132n/a errors.append(args)
133n/a shutil.rmtree(link, onerror=onerror)
134n/a self.assertEqual(len(errors), 1)
135n/a self.assertIs(errors[0][0], os.path.islink)
136n/a self.assertEqual(errors[0][1], link)
137n/a self.assertIsInstance(errors[0][2][1], OSError)
138n/a
139n/a @support.skip_unless_symlink
140n/a def test_rmtree_works_on_symlinks(self):
141n/a tmp = self.mkdtemp()
142n/a dir1 = os.path.join(tmp, 'dir1')
143n/a dir2 = os.path.join(dir1, 'dir2')
144n/a dir3 = os.path.join(tmp, 'dir3')
145n/a for d in dir1, dir2, dir3:
146n/a os.mkdir(d)
147n/a file1 = os.path.join(tmp, 'file1')
148n/a write_file(file1, 'foo')
149n/a link1 = os.path.join(dir1, 'link1')
150n/a os.symlink(dir2, link1)
151n/a link2 = os.path.join(dir1, 'link2')
152n/a os.symlink(dir3, link2)
153n/a link3 = os.path.join(dir1, 'link3')
154n/a os.symlink(file1, link3)
155n/a # make sure symlinks are removed but not followed
156n/a shutil.rmtree(dir1)
157n/a self.assertFalse(os.path.exists(dir1))
158n/a self.assertTrue(os.path.exists(dir3))
159n/a self.assertTrue(os.path.exists(file1))
160n/a
161n/a def test_rmtree_errors(self):
162n/a # filename is guaranteed not to exist
163n/a filename = tempfile.mktemp()
164n/a self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
165n/a # test that ignore_errors option is honored
166n/a shutil.rmtree(filename, ignore_errors=True)
167n/a
168n/a # existing file
169n/a tmpdir = self.mkdtemp()
170n/a write_file((tmpdir, "tstfile"), "")
171n/a filename = os.path.join(tmpdir, "tstfile")
172n/a with self.assertRaises(NotADirectoryError) as cm:
173n/a shutil.rmtree(filename)
174n/a # The reason for this rather odd construct is that Windows sprinkles
175n/a # a \*.* at the end of file names. But only sometimes on some buildbots
176n/a possible_args = [filename, os.path.join(filename, '*.*')]
177n/a self.assertIn(cm.exception.filename, possible_args)
178n/a self.assertTrue(os.path.exists(filename))
179n/a # test that ignore_errors option is honored
180n/a shutil.rmtree(filename, ignore_errors=True)
181n/a self.assertTrue(os.path.exists(filename))
182n/a errors = []
183n/a def onerror(*args):
184n/a errors.append(args)
185n/a shutil.rmtree(filename, onerror=onerror)
186n/a self.assertEqual(len(errors), 2)
187n/a self.assertIs(errors[0][0], os.listdir)
188n/a self.assertEqual(errors[0][1], filename)
189n/a self.assertIsInstance(errors[0][2][1], NotADirectoryError)
190n/a self.assertIn(errors[0][2][1].filename, possible_args)
191n/a self.assertIs(errors[1][0], os.rmdir)
192n/a self.assertEqual(errors[1][1], filename)
193n/a self.assertIsInstance(errors[1][2][1], NotADirectoryError)
194n/a self.assertIn(errors[1][2][1].filename, possible_args)
195n/a
196n/a
197n/a @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
198n/a @unittest.skipIf(sys.platform[:6] == 'cygwin',
199n/a "This test can't be run on Cygwin (issue #1071513).")
200n/a @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
201n/a "This test can't be run reliably as root (issue #1076467).")
202n/a def test_on_error(self):
203n/a self.errorState = 0
204n/a os.mkdir(TESTFN)
205n/a self.addCleanup(shutil.rmtree, TESTFN)
206n/a
207n/a self.child_file_path = os.path.join(TESTFN, 'a')
208n/a self.child_dir_path = os.path.join(TESTFN, 'b')
209n/a support.create_empty_file(self.child_file_path)
210n/a os.mkdir(self.child_dir_path)
211n/a old_dir_mode = os.stat(TESTFN).st_mode
212n/a old_child_file_mode = os.stat(self.child_file_path).st_mode
213n/a old_child_dir_mode = os.stat(self.child_dir_path).st_mode
214n/a # Make unwritable.
215n/a new_mode = stat.S_IREAD|stat.S_IEXEC
216n/a os.chmod(self.child_file_path, new_mode)
217n/a os.chmod(self.child_dir_path, new_mode)
218n/a os.chmod(TESTFN, new_mode)
219n/a
220n/a self.addCleanup(os.chmod, TESTFN, old_dir_mode)
221n/a self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
222n/a self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
223n/a
224n/a shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
225n/a # Test whether onerror has actually been called.
226n/a self.assertEqual(self.errorState, 3,
227n/a "Expected call to onerror function did not happen.")
228n/a
229n/a def check_args_to_onerror(self, func, arg, exc):
230n/a # test_rmtree_errors deliberately runs rmtree
231n/a # on a directory that is chmod 500, which will fail.
232n/a # This function is run when shutil.rmtree fails.
233n/a # 99.9% of the time it initially fails to remove
234n/a # a file in the directory, so the first time through
235n/a # func is os.remove.
236n/a # However, some Linux machines running ZFS on
237n/a # FUSE experienced a failure earlier in the process
238n/a # at os.listdir. The first failure may legally
239n/a # be either.
240n/a if self.errorState < 2:
241n/a if func is os.unlink:
242n/a self.assertEqual(arg, self.child_file_path)
243n/a elif func is os.rmdir:
244n/a self.assertEqual(arg, self.child_dir_path)
245n/a else:
246n/a self.assertIs(func, os.listdir)
247n/a self.assertIn(arg, [TESTFN, self.child_dir_path])
248n/a self.assertTrue(issubclass(exc[0], OSError))
249n/a self.errorState += 1
250n/a else:
251n/a self.assertEqual(func, os.rmdir)
252n/a self.assertEqual(arg, TESTFN)
253n/a self.assertTrue(issubclass(exc[0], OSError))
254n/a self.errorState = 3
255n/a
256n/a def test_rmtree_does_not_choke_on_failing_lstat(self):
257n/a try:
258n/a orig_lstat = os.lstat
259n/a def raiser(fn, *args, **kwargs):
260n/a if fn != TESTFN:
261n/a raise OSError()
262n/a else:
263n/a return orig_lstat(fn)
264n/a os.lstat = raiser
265n/a
266n/a os.mkdir(TESTFN)
267n/a write_file((TESTFN, 'foo'), 'foo')
268n/a shutil.rmtree(TESTFN)
269n/a finally:
270n/a os.lstat = orig_lstat
271n/a
272n/a @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
273n/a @support.skip_unless_symlink
274n/a def test_copymode_follow_symlinks(self):
275n/a tmp_dir = self.mkdtemp()
276n/a src = os.path.join(tmp_dir, 'foo')
277n/a dst = os.path.join(tmp_dir, 'bar')
278n/a src_link = os.path.join(tmp_dir, 'baz')
279n/a dst_link = os.path.join(tmp_dir, 'quux')
280n/a write_file(src, 'foo')
281n/a write_file(dst, 'foo')
282n/a os.symlink(src, src_link)
283n/a os.symlink(dst, dst_link)
284n/a os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
285n/a # file to file
286n/a os.chmod(dst, stat.S_IRWXO)
287n/a self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
288n/a shutil.copymode(src, dst)
289n/a self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
290n/a # On Windows, os.chmod does not follow symlinks (issue #15411)
291n/a if os.name != 'nt':
292n/a # follow src link
293n/a os.chmod(dst, stat.S_IRWXO)
294n/a shutil.copymode(src_link, dst)
295n/a self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
296n/a # follow dst link
297n/a os.chmod(dst, stat.S_IRWXO)
298n/a shutil.copymode(src, dst_link)
299n/a self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
300n/a # follow both links
301n/a os.chmod(dst, stat.S_IRWXO)
302n/a shutil.copymode(src_link, dst_link)
303n/a self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
304n/a
305n/a @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
306n/a @support.skip_unless_symlink
307n/a def test_copymode_symlink_to_symlink(self):
308n/a tmp_dir = self.mkdtemp()
309n/a src = os.path.join(tmp_dir, 'foo')
310n/a dst = os.path.join(tmp_dir, 'bar')
311n/a src_link = os.path.join(tmp_dir, 'baz')
312n/a dst_link = os.path.join(tmp_dir, 'quux')
313n/a write_file(src, 'foo')
314n/a write_file(dst, 'foo')
315n/a os.symlink(src, src_link)
316n/a os.symlink(dst, dst_link)
317n/a os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
318n/a os.chmod(dst, stat.S_IRWXU)
319n/a os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
320n/a # link to link
321n/a os.lchmod(dst_link, stat.S_IRWXO)
322n/a shutil.copymode(src_link, dst_link, follow_symlinks=False)
323n/a self.assertEqual(os.lstat(src_link).st_mode,
324n/a os.lstat(dst_link).st_mode)
325n/a self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
326n/a # src link - use chmod
327n/a os.lchmod(dst_link, stat.S_IRWXO)
328n/a shutil.copymode(src_link, dst, follow_symlinks=False)
329n/a self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
330n/a # dst link - use chmod
331n/a os.lchmod(dst_link, stat.S_IRWXO)
332n/a shutil.copymode(src, dst_link, follow_symlinks=False)
333n/a self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
334n/a
335n/a @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
336n/a @support.skip_unless_symlink
337n/a def test_copymode_symlink_to_symlink_wo_lchmod(self):
338n/a tmp_dir = self.mkdtemp()
339n/a src = os.path.join(tmp_dir, 'foo')
340n/a dst = os.path.join(tmp_dir, 'bar')
341n/a src_link = os.path.join(tmp_dir, 'baz')
342n/a dst_link = os.path.join(tmp_dir, 'quux')
343n/a write_file(src, 'foo')
344n/a write_file(dst, 'foo')
345n/a os.symlink(src, src_link)
346n/a os.symlink(dst, dst_link)
347n/a shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
348n/a
349n/a @support.skip_unless_symlink
350n/a def test_copystat_symlinks(self):
351n/a tmp_dir = self.mkdtemp()
352n/a src = os.path.join(tmp_dir, 'foo')
353n/a dst = os.path.join(tmp_dir, 'bar')
354n/a src_link = os.path.join(tmp_dir, 'baz')
355n/a dst_link = os.path.join(tmp_dir, 'qux')
356n/a write_file(src, 'foo')
357n/a src_stat = os.stat(src)
358n/a os.utime(src, (src_stat.st_atime,
359n/a src_stat.st_mtime - 42.0)) # ensure different mtimes
360n/a write_file(dst, 'bar')
361n/a self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
362n/a os.symlink(src, src_link)
363n/a os.symlink(dst, dst_link)
364n/a if hasattr(os, 'lchmod'):
365n/a os.lchmod(src_link, stat.S_IRWXO)
366n/a if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
367n/a os.lchflags(src_link, stat.UF_NODUMP)
368n/a src_link_stat = os.lstat(src_link)
369n/a # follow
370n/a if hasattr(os, 'lchmod'):
371n/a shutil.copystat(src_link, dst_link, follow_symlinks=True)
372n/a self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
373n/a # don't follow
374n/a shutil.copystat(src_link, dst_link, follow_symlinks=False)
375n/a dst_link_stat = os.lstat(dst_link)
376n/a if os.utime in os.supports_follow_symlinks:
377n/a for attr in 'st_atime', 'st_mtime':
378n/a # The modification times may be truncated in the new file.
379n/a self.assertLessEqual(getattr(src_link_stat, attr),
380n/a getattr(dst_link_stat, attr) + 1)
381n/a if hasattr(os, 'lchmod'):
382n/a self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
383n/a if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
384n/a self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
385n/a # tell to follow but dst is not a link
386n/a shutil.copystat(src_link, dst, follow_symlinks=False)
387n/a self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
388n/a 00000.1)
389n/a
390n/a @unittest.skipUnless(hasattr(os, 'chflags') and
391n/a hasattr(errno, 'EOPNOTSUPP') and
392n/a hasattr(errno, 'ENOTSUP'),
393n/a "requires os.chflags, EOPNOTSUPP & ENOTSUP")
394n/a def test_copystat_handles_harmless_chflags_errors(self):
395n/a tmpdir = self.mkdtemp()
396n/a file1 = os.path.join(tmpdir, 'file1')
397n/a file2 = os.path.join(tmpdir, 'file2')
398n/a write_file(file1, 'xxx')
399n/a write_file(file2, 'xxx')
400n/a
401n/a def make_chflags_raiser(err):
402n/a ex = OSError()
403n/a
404n/a def _chflags_raiser(path, flags, *, follow_symlinks=True):
405n/a ex.errno = err
406n/a raise ex
407n/a return _chflags_raiser
408n/a old_chflags = os.chflags
409n/a try:
410n/a for err in errno.EOPNOTSUPP, errno.ENOTSUP:
411n/a os.chflags = make_chflags_raiser(err)
412n/a shutil.copystat(file1, file2)
413n/a # assert others errors break it
414n/a os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
415n/a self.assertRaises(OSError, shutil.copystat, file1, file2)
416n/a finally:
417n/a os.chflags = old_chflags
418n/a
419n/a @support.skip_unless_xattr
420n/a def test_copyxattr(self):
421n/a tmp_dir = self.mkdtemp()
422n/a src = os.path.join(tmp_dir, 'foo')
423n/a write_file(src, 'foo')
424n/a dst = os.path.join(tmp_dir, 'bar')
425n/a write_file(dst, 'bar')
426n/a
427n/a # no xattr == no problem
428n/a shutil._copyxattr(src, dst)
429n/a # common case
430n/a os.setxattr(src, 'user.foo', b'42')
431n/a os.setxattr(src, 'user.bar', b'43')
432n/a shutil._copyxattr(src, dst)
433n/a self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
434n/a self.assertEqual(
435n/a os.getxattr(src, 'user.foo'),
436n/a os.getxattr(dst, 'user.foo'))
437n/a # check errors don't affect other attrs
438n/a os.remove(dst)
439n/a write_file(dst, 'bar')
440n/a os_error = OSError(errno.EPERM, 'EPERM')
441n/a
442n/a def _raise_on_user_foo(fname, attr, val, **kwargs):
443n/a if attr == 'user.foo':
444n/a raise os_error
445n/a else:
446n/a orig_setxattr(fname, attr, val, **kwargs)
447n/a try:
448n/a orig_setxattr = os.setxattr
449n/a os.setxattr = _raise_on_user_foo
450n/a shutil._copyxattr(src, dst)
451n/a self.assertIn('user.bar', os.listxattr(dst))
452n/a finally:
453n/a os.setxattr = orig_setxattr
454n/a # the source filesystem not supporting xattrs should be ok, too.
455n/a def _raise_on_src(fname, *, follow_symlinks=True):
456n/a if fname == src:
457n/a raise OSError(errno.ENOTSUP, 'Operation not supported')
458n/a return orig_listxattr(fname, follow_symlinks=follow_symlinks)
459n/a try:
460n/a orig_listxattr = os.listxattr
461n/a os.listxattr = _raise_on_src
462n/a shutil._copyxattr(src, dst)
463n/a finally:
464n/a os.listxattr = orig_listxattr
465n/a
466n/a # test that shutil.copystat copies xattrs
467n/a src = os.path.join(tmp_dir, 'the_original')
468n/a write_file(src, src)
469n/a os.setxattr(src, 'user.the_value', b'fiddly')
470n/a dst = os.path.join(tmp_dir, 'the_copy')
471n/a write_file(dst, dst)
472n/a shutil.copystat(src, dst)
473n/a self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
474n/a
475n/a @support.skip_unless_symlink
476n/a @support.skip_unless_xattr
477n/a @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
478n/a 'root privileges required')
479n/a def test_copyxattr_symlinks(self):
480n/a # On Linux, it's only possible to access non-user xattr for symlinks;
481n/a # which in turn require root privileges. This test should be expanded
482n/a # as soon as other platforms gain support for extended attributes.
483n/a tmp_dir = self.mkdtemp()
484n/a src = os.path.join(tmp_dir, 'foo')
485n/a src_link = os.path.join(tmp_dir, 'baz')
486n/a write_file(src, 'foo')
487n/a os.symlink(src, src_link)
488n/a os.setxattr(src, 'trusted.foo', b'42')
489n/a os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
490n/a dst = os.path.join(tmp_dir, 'bar')
491n/a dst_link = os.path.join(tmp_dir, 'qux')
492n/a write_file(dst, 'bar')
493n/a os.symlink(dst, dst_link)
494n/a shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
495n/a self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
496n/a self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
497n/a shutil._copyxattr(src_link, dst, follow_symlinks=False)
498n/a self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
499n/a
500n/a @support.skip_unless_symlink
501n/a def test_copy_symlinks(self):
502n/a tmp_dir = self.mkdtemp()
503n/a src = os.path.join(tmp_dir, 'foo')
504n/a dst = os.path.join(tmp_dir, 'bar')
505n/a src_link = os.path.join(tmp_dir, 'baz')
506n/a write_file(src, 'foo')
507n/a os.symlink(src, src_link)
508n/a if hasattr(os, 'lchmod'):
509n/a os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
510n/a # don't follow
511n/a shutil.copy(src_link, dst, follow_symlinks=True)
512n/a self.assertFalse(os.path.islink(dst))
513n/a self.assertEqual(read_file(src), read_file(dst))
514n/a os.remove(dst)
515n/a # follow
516n/a shutil.copy(src_link, dst, follow_symlinks=False)
517n/a self.assertTrue(os.path.islink(dst))
518n/a self.assertEqual(os.readlink(dst), os.readlink(src_link))
519n/a if hasattr(os, 'lchmod'):
520n/a self.assertEqual(os.lstat(src_link).st_mode,
521n/a os.lstat(dst).st_mode)
522n/a
523n/a @support.skip_unless_symlink
524n/a def test_copy2_symlinks(self):
525n/a tmp_dir = self.mkdtemp()
526n/a src = os.path.join(tmp_dir, 'foo')
527n/a dst = os.path.join(tmp_dir, 'bar')
528n/a src_link = os.path.join(tmp_dir, 'baz')
529n/a write_file(src, 'foo')
530n/a os.symlink(src, src_link)
531n/a if hasattr(os, 'lchmod'):
532n/a os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
533n/a if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
534n/a os.lchflags(src_link, stat.UF_NODUMP)
535n/a src_stat = os.stat(src)
536n/a src_link_stat = os.lstat(src_link)
537n/a # follow
538n/a shutil.copy2(src_link, dst, follow_symlinks=True)
539n/a self.assertFalse(os.path.islink(dst))
540n/a self.assertEqual(read_file(src), read_file(dst))
541n/a os.remove(dst)
542n/a # don't follow
543n/a shutil.copy2(src_link, dst, follow_symlinks=False)
544n/a self.assertTrue(os.path.islink(dst))
545n/a self.assertEqual(os.readlink(dst), os.readlink(src_link))
546n/a dst_stat = os.lstat(dst)
547n/a if os.utime in os.supports_follow_symlinks:
548n/a for attr in 'st_atime', 'st_mtime':
549n/a # The modification times may be truncated in the new file.
550n/a self.assertLessEqual(getattr(src_link_stat, attr),
551n/a getattr(dst_stat, attr) + 1)
552n/a if hasattr(os, 'lchmod'):
553n/a self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
554n/a self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
555n/a if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
556n/a self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
557n/a
558n/a @support.skip_unless_xattr
559n/a def test_copy2_xattr(self):
560n/a tmp_dir = self.mkdtemp()
561n/a src = os.path.join(tmp_dir, 'foo')
562n/a dst = os.path.join(tmp_dir, 'bar')
563n/a write_file(src, 'foo')
564n/a os.setxattr(src, 'user.foo', b'42')
565n/a shutil.copy2(src, dst)
566n/a self.assertEqual(
567n/a os.getxattr(src, 'user.foo'),
568n/a os.getxattr(dst, 'user.foo'))
569n/a os.remove(dst)
570n/a
571n/a @support.skip_unless_symlink
572n/a def test_copyfile_symlinks(self):
573n/a tmp_dir = self.mkdtemp()
574n/a src = os.path.join(tmp_dir, 'src')
575n/a dst = os.path.join(tmp_dir, 'dst')
576n/a dst_link = os.path.join(tmp_dir, 'dst_link')
577n/a link = os.path.join(tmp_dir, 'link')
578n/a write_file(src, 'foo')
579n/a os.symlink(src, link)
580n/a # don't follow
581n/a shutil.copyfile(link, dst_link, follow_symlinks=False)
582n/a self.assertTrue(os.path.islink(dst_link))
583n/a self.assertEqual(os.readlink(link), os.readlink(dst_link))
584n/a # follow
585n/a shutil.copyfile(link, dst)
586n/a self.assertFalse(os.path.islink(dst))
587n/a
588n/a def test_rmtree_uses_safe_fd_version_if_available(self):
589n/a _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
590n/a os.supports_dir_fd and
591n/a os.listdir in os.supports_fd and
592n/a os.stat in os.supports_follow_symlinks)
593n/a if _use_fd_functions:
594n/a self.assertTrue(shutil._use_fd_functions)
595n/a self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
596n/a tmp_dir = self.mkdtemp()
597n/a d = os.path.join(tmp_dir, 'a')
598n/a os.mkdir(d)
599n/a try:
600n/a real_rmtree = shutil._rmtree_safe_fd
601n/a class Called(Exception): pass
602n/a def _raiser(*args, **kwargs):
603n/a raise Called
604n/a shutil._rmtree_safe_fd = _raiser
605n/a self.assertRaises(Called, shutil.rmtree, d)
606n/a finally:
607n/a shutil._rmtree_safe_fd = real_rmtree
608n/a else:
609n/a self.assertFalse(shutil._use_fd_functions)
610n/a self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
611n/a
612n/a def test_rmtree_dont_delete_file(self):
613n/a # When called on a file instead of a directory, don't delete it.
614n/a handle, path = tempfile.mkstemp()
615n/a os.close(handle)
616n/a self.assertRaises(NotADirectoryError, shutil.rmtree, path)
617n/a os.remove(path)
618n/a
619n/a def test_copytree_simple(self):
620n/a src_dir = tempfile.mkdtemp()
621n/a dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
622n/a self.addCleanup(shutil.rmtree, src_dir)
623n/a self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
624n/a write_file((src_dir, 'test.txt'), '123')
625n/a os.mkdir(os.path.join(src_dir, 'test_dir'))
626n/a write_file((src_dir, 'test_dir', 'test.txt'), '456')
627n/a
628n/a shutil.copytree(src_dir, dst_dir)
629n/a self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
630n/a self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
631n/a self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
632n/a 'test.txt')))
633n/a actual = read_file((dst_dir, 'test.txt'))
634n/a self.assertEqual(actual, '123')
635n/a actual = read_file((dst_dir, 'test_dir', 'test.txt'))
636n/a self.assertEqual(actual, '456')
637n/a
638n/a @support.skip_unless_symlink
639n/a def test_copytree_symlinks(self):
640n/a tmp_dir = self.mkdtemp()
641n/a src_dir = os.path.join(tmp_dir, 'src')
642n/a dst_dir = os.path.join(tmp_dir, 'dst')
643n/a sub_dir = os.path.join(src_dir, 'sub')
644n/a os.mkdir(src_dir)
645n/a os.mkdir(sub_dir)
646n/a write_file((src_dir, 'file.txt'), 'foo')
647n/a src_link = os.path.join(sub_dir, 'link')
648n/a dst_link = os.path.join(dst_dir, 'sub/link')
649n/a os.symlink(os.path.join(src_dir, 'file.txt'),
650n/a src_link)
651n/a if hasattr(os, 'lchmod'):
652n/a os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
653n/a if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
654n/a os.lchflags(src_link, stat.UF_NODUMP)
655n/a src_stat = os.lstat(src_link)
656n/a shutil.copytree(src_dir, dst_dir, symlinks=True)
657n/a self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
658n/a self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')),
659n/a os.path.join(src_dir, 'file.txt'))
660n/a dst_stat = os.lstat(dst_link)
661n/a if hasattr(os, 'lchmod'):
662n/a self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
663n/a if hasattr(os, 'lchflags'):
664n/a self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
665n/a
666n/a def test_copytree_with_exclude(self):
667n/a # creating data
668n/a join = os.path.join
669n/a exists = os.path.exists
670n/a src_dir = tempfile.mkdtemp()
671n/a try:
672n/a dst_dir = join(tempfile.mkdtemp(), 'destination')
673n/a write_file((src_dir, 'test.txt'), '123')
674n/a write_file((src_dir, 'test.tmp'), '123')
675n/a os.mkdir(join(src_dir, 'test_dir'))
676n/a write_file((src_dir, 'test_dir', 'test.txt'), '456')
677n/a os.mkdir(join(src_dir, 'test_dir2'))
678n/a write_file((src_dir, 'test_dir2', 'test.txt'), '456')
679n/a os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
680n/a os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
681n/a write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
682n/a write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
683n/a
684n/a # testing glob-like patterns
685n/a try:
686n/a patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
687n/a shutil.copytree(src_dir, dst_dir, ignore=patterns)
688n/a # checking the result: some elements should not be copied
689n/a self.assertTrue(exists(join(dst_dir, 'test.txt')))
690n/a self.assertFalse(exists(join(dst_dir, 'test.tmp')))
691n/a self.assertFalse(exists(join(dst_dir, 'test_dir2')))
692n/a finally:
693n/a shutil.rmtree(dst_dir)
694n/a try:
695n/a patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
696n/a shutil.copytree(src_dir, dst_dir, ignore=patterns)
697n/a # checking the result: some elements should not be copied
698n/a self.assertFalse(exists(join(dst_dir, 'test.tmp')))
699n/a self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
700n/a self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
701n/a finally:
702n/a shutil.rmtree(dst_dir)
703n/a
704n/a # testing callable-style
705n/a try:
706n/a def _filter(src, names):
707n/a res = []
708n/a for name in names:
709n/a path = os.path.join(src, name)
710n/a
711n/a if (os.path.isdir(path) and
712n/a path.split()[-1] == 'subdir'):
713n/a res.append(name)
714n/a elif os.path.splitext(path)[-1] in ('.py'):
715n/a res.append(name)
716n/a return res
717n/a
718n/a shutil.copytree(src_dir, dst_dir, ignore=_filter)
719n/a
720n/a # checking the result: some elements should not be copied
721n/a self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
722n/a 'test.py')))
723n/a self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
724n/a
725n/a finally:
726n/a shutil.rmtree(dst_dir)
727n/a finally:
728n/a shutil.rmtree(src_dir)
729n/a shutil.rmtree(os.path.dirname(dst_dir))
730n/a
731n/a def test_copytree_retains_permissions(self):
732n/a tmp_dir = tempfile.mkdtemp()
733n/a src_dir = os.path.join(tmp_dir, 'source')
734n/a os.mkdir(src_dir)
735n/a dst_dir = os.path.join(tmp_dir, 'destination')
736n/a self.addCleanup(shutil.rmtree, tmp_dir)
737n/a
738n/a os.chmod(src_dir, 0o777)
739n/a write_file((src_dir, 'permissive.txt'), '123')
740n/a os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
741n/a write_file((src_dir, 'restrictive.txt'), '456')
742n/a os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
743n/a restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
744n/a os.chmod(restrictive_subdir, 0o600)
745n/a
746n/a shutil.copytree(src_dir, dst_dir)
747n/a self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
748n/a self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
749n/a os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
750n/a self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
751n/a os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
752n/a restrictive_subdir_dst = os.path.join(dst_dir,
753n/a os.path.split(restrictive_subdir)[1])
754n/a self.assertEqual(os.stat(restrictive_subdir).st_mode,
755n/a os.stat(restrictive_subdir_dst).st_mode)
756n/a
757n/a @unittest.mock.patch('os.chmod')
758n/a def test_copytree_winerror(self, mock_patch):
759n/a # When copying to VFAT, copystat() raises OSError. On Windows, the
760n/a # exception object has a meaningful 'winerror' attribute, but not
761n/a # on other operating systems. Do not assume 'winerror' is set.
762n/a src_dir = tempfile.mkdtemp()
763n/a dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
764n/a self.addCleanup(shutil.rmtree, src_dir)
765n/a self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
766n/a
767n/a mock_patch.side_effect = PermissionError('ka-boom')
768n/a with self.assertRaises(shutil.Error):
769n/a shutil.copytree(src_dir, dst_dir)
770n/a
771n/a @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
772n/a @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
773n/a @unittest.skipIf(android_not_root, "hard links not allowed, non root user")
774n/a def test_dont_copy_file_onto_link_to_itself(self):
775n/a # bug 851123.
776n/a os.mkdir(TESTFN)
777n/a src = os.path.join(TESTFN, 'cheese')
778n/a dst = os.path.join(TESTFN, 'shop')
779n/a try:
780n/a with open(src, 'w') as f:
781n/a f.write('cheddar')
782n/a os.link(src, dst)
783n/a self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
784n/a with open(src, 'r') as f:
785n/a self.assertEqual(f.read(), 'cheddar')
786n/a os.remove(dst)
787n/a finally:
788n/a shutil.rmtree(TESTFN, ignore_errors=True)
789n/a
790n/a @support.skip_unless_symlink
791n/a def test_dont_copy_file_onto_symlink_to_itself(self):
792n/a # bug 851123.
793n/a os.mkdir(TESTFN)
794n/a src = os.path.join(TESTFN, 'cheese')
795n/a dst = os.path.join(TESTFN, 'shop')
796n/a try:
797n/a with open(src, 'w') as f:
798n/a f.write('cheddar')
799n/a # Using `src` here would mean we end up with a symlink pointing
800n/a # to TESTFN/TESTFN/cheese, while it should point at
801n/a # TESTFN/cheese.
802n/a os.symlink('cheese', dst)
803n/a self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
804n/a with open(src, 'r') as f:
805n/a self.assertEqual(f.read(), 'cheddar')
806n/a os.remove(dst)
807n/a finally:
808n/a shutil.rmtree(TESTFN, ignore_errors=True)
809n/a
810n/a @support.skip_unless_symlink
811n/a def test_rmtree_on_symlink(self):
812n/a # bug 1669.
813n/a os.mkdir(TESTFN)
814n/a try:
815n/a src = os.path.join(TESTFN, 'cheese')
816n/a dst = os.path.join(TESTFN, 'shop')
817n/a os.mkdir(src)
818n/a os.symlink(src, dst)
819n/a self.assertRaises(OSError, shutil.rmtree, dst)
820n/a shutil.rmtree(dst, ignore_errors=True)
821n/a finally:
822n/a shutil.rmtree(TESTFN, ignore_errors=True)
823n/a
824n/a # Issue #3002: copyfile and copytree block indefinitely on named pipes
825n/a @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
826n/a @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
827n/a def test_copyfile_named_pipe(self):
828n/a os.mkfifo(TESTFN)
829n/a try:
830n/a self.assertRaises(shutil.SpecialFileError,
831n/a shutil.copyfile, TESTFN, TESTFN2)
832n/a self.assertRaises(shutil.SpecialFileError,
833n/a shutil.copyfile, __file__, TESTFN)
834n/a finally:
835n/a os.remove(TESTFN)
836n/a
837n/a @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
838n/a @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
839n/a @support.skip_unless_symlink
840n/a def test_copytree_named_pipe(self):
841n/a os.mkdir(TESTFN)
842n/a try:
843n/a subdir = os.path.join(TESTFN, "subdir")
844n/a os.mkdir(subdir)
845n/a pipe = os.path.join(subdir, "mypipe")
846n/a os.mkfifo(pipe)
847n/a try:
848n/a shutil.copytree(TESTFN, TESTFN2)
849n/a except shutil.Error as e:
850n/a errors = e.args[0]
851n/a self.assertEqual(len(errors), 1)
852n/a src, dst, error_msg = errors[0]
853n/a self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
854n/a else:
855n/a self.fail("shutil.Error should have been raised")
856n/a finally:
857n/a shutil.rmtree(TESTFN, ignore_errors=True)
858n/a shutil.rmtree(TESTFN2, ignore_errors=True)
859n/a
860n/a def test_copytree_special_func(self):
861n/a
862n/a src_dir = self.mkdtemp()
863n/a dst_dir = os.path.join(self.mkdtemp(), 'destination')
864n/a write_file((src_dir, 'test.txt'), '123')
865n/a os.mkdir(os.path.join(src_dir, 'test_dir'))
866n/a write_file((src_dir, 'test_dir', 'test.txt'), '456')
867n/a
868n/a copied = []
869n/a def _copy(src, dst):
870n/a copied.append((src, dst))
871n/a
872n/a shutil.copytree(src_dir, dst_dir, copy_function=_copy)
873n/a self.assertEqual(len(copied), 2)
874n/a
875n/a @support.skip_unless_symlink
876n/a def test_copytree_dangling_symlinks(self):
877n/a
878n/a # a dangling symlink raises an error at the end
879n/a src_dir = self.mkdtemp()
880n/a dst_dir = os.path.join(self.mkdtemp(), 'destination')
881n/a os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
882n/a os.mkdir(os.path.join(src_dir, 'test_dir'))
883n/a write_file((src_dir, 'test_dir', 'test.txt'), '456')
884n/a self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
885n/a
886n/a # a dangling symlink is ignored with the proper flag
887n/a dst_dir = os.path.join(self.mkdtemp(), 'destination2')
888n/a shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
889n/a self.assertNotIn('test.txt', os.listdir(dst_dir))
890n/a
891n/a # a dangling symlink is copied if symlinks=True
892n/a dst_dir = os.path.join(self.mkdtemp(), 'destination3')
893n/a shutil.copytree(src_dir, dst_dir, symlinks=True)
894n/a self.assertIn('test.txt', os.listdir(dst_dir))
895n/a
896n/a @support.skip_unless_symlink
897n/a def test_copytree_symlink_dir(self):
898n/a src_dir = self.mkdtemp()
899n/a dst_dir = os.path.join(self.mkdtemp(), 'destination')
900n/a os.mkdir(os.path.join(src_dir, 'real_dir'))
901n/a with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
902n/a pass
903n/a os.symlink(os.path.join(src_dir, 'real_dir'),
904n/a os.path.join(src_dir, 'link_to_dir'),
905n/a target_is_directory=True)
906n/a
907n/a shutil.copytree(src_dir, dst_dir, symlinks=False)
908n/a self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
909n/a self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
910n/a
911n/a dst_dir = os.path.join(self.mkdtemp(), 'destination2')
912n/a shutil.copytree(src_dir, dst_dir, symlinks=True)
913n/a self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
914n/a self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
915n/a
916n/a def _copy_file(self, method):
917n/a fname = 'test.txt'
918n/a tmpdir = self.mkdtemp()
919n/a write_file((tmpdir, fname), 'xxx')
920n/a file1 = os.path.join(tmpdir, fname)
921n/a tmpdir2 = self.mkdtemp()
922n/a method(file1, tmpdir2)
923n/a file2 = os.path.join(tmpdir2, fname)
924n/a return (file1, file2)
925n/a
926n/a @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
927n/a def test_copy(self):
928n/a # Ensure that the copied file exists and has the same mode bits.
929n/a file1, file2 = self._copy_file(shutil.copy)
930n/a self.assertTrue(os.path.exists(file2))
931n/a self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
932n/a
933n/a @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
934n/a @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
935n/a def test_copy2(self):
936n/a # Ensure that the copied file exists and has the same mode and
937n/a # modification time bits.
938n/a file1, file2 = self._copy_file(shutil.copy2)
939n/a self.assertTrue(os.path.exists(file2))
940n/a file1_stat = os.stat(file1)
941n/a file2_stat = os.stat(file2)
942n/a self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
943n/a for attr in 'st_atime', 'st_mtime':
944n/a # The modification times may be truncated in the new file.
945n/a self.assertLessEqual(getattr(file1_stat, attr),
946n/a getattr(file2_stat, attr) + 1)
947n/a if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
948n/a self.assertEqual(getattr(file1_stat, 'st_flags'),
949n/a getattr(file2_stat, 'st_flags'))
950n/a
951n/a @support.requires_zlib
952n/a def test_make_tarball(self):
953n/a # creating something to tar
954n/a root_dir, base_dir = self._create_files('')
955n/a
956n/a tmpdir2 = self.mkdtemp()
957n/a # force shutil to create the directory
958n/a os.rmdir(tmpdir2)
959n/a # working with relative paths
960n/a work_dir = os.path.dirname(tmpdir2)
961n/a rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
962n/a
963n/a with support.change_cwd(work_dir):
964n/a base_name = os.path.abspath(rel_base_name)
965n/a tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
966n/a
967n/a # check if the compressed tarball was created
968n/a self.assertEqual(tarball, base_name + '.tar.gz')
969n/a self.assertTrue(os.path.isfile(tarball))
970n/a self.assertTrue(tarfile.is_tarfile(tarball))
971n/a with tarfile.open(tarball, 'r:gz') as tf:
972n/a self.assertCountEqual(tf.getnames(),
973n/a ['.', './sub', './sub2',
974n/a './file1', './file2', './sub/file3'])
975n/a
976n/a # trying an uncompressed one
977n/a with support.change_cwd(work_dir):
978n/a tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
979n/a self.assertEqual(tarball, base_name + '.tar')
980n/a self.assertTrue(os.path.isfile(tarball))
981n/a self.assertTrue(tarfile.is_tarfile(tarball))
982n/a with tarfile.open(tarball, 'r') as tf:
983n/a self.assertCountEqual(tf.getnames(),
984n/a ['.', './sub', './sub2',
985n/a './file1', './file2', './sub/file3'])
986n/a
987n/a def _tarinfo(self, path):
988n/a with tarfile.open(path) as tar:
989n/a names = tar.getnames()
990n/a names.sort()
991n/a return tuple(names)
992n/a
993n/a def _create_files(self, base_dir='dist'):
994n/a # creating something to tar
995n/a root_dir = self.mkdtemp()
996n/a dist = os.path.join(root_dir, base_dir)
997n/a os.makedirs(dist, exist_ok=True)
998n/a write_file((dist, 'file1'), 'xxx')
999n/a write_file((dist, 'file2'), 'xxx')
1000n/a os.mkdir(os.path.join(dist, 'sub'))
1001n/a write_file((dist, 'sub', 'file3'), 'xxx')
1002n/a os.mkdir(os.path.join(dist, 'sub2'))
1003n/a if base_dir:
1004n/a write_file((root_dir, 'outer'), 'xxx')
1005n/a return root_dir, base_dir
1006n/a
1007n/a @support.requires_zlib
1008n/a @unittest.skipUnless(shutil.which('tar'),
1009n/a 'Need the tar command to run')
1010n/a def test_tarfile_vs_tar(self):
1011n/a root_dir, base_dir = self._create_files()
1012n/a base_name = os.path.join(self.mkdtemp(), 'archive')
1013n/a tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
1014n/a
1015n/a # check if the compressed tarball was created
1016n/a self.assertEqual(tarball, base_name + '.tar.gz')
1017n/a self.assertTrue(os.path.isfile(tarball))
1018n/a
1019n/a # now create another tarball using `tar`
1020n/a tarball2 = os.path.join(root_dir, 'archive2.tar')
1021n/a tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
1022n/a subprocess.check_call(tar_cmd, cwd=root_dir,
1023n/a stdout=subprocess.DEVNULL)
1024n/a
1025n/a self.assertTrue(os.path.isfile(tarball2))
1026n/a # let's compare both tarballs
1027n/a self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
1028n/a
1029n/a # trying an uncompressed one
1030n/a tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1031n/a self.assertEqual(tarball, base_name + '.tar')
1032n/a self.assertTrue(os.path.isfile(tarball))
1033n/a
1034n/a # now for a dry_run
1035n/a tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1036n/a dry_run=True)
1037n/a self.assertEqual(tarball, base_name + '.tar')
1038n/a self.assertTrue(os.path.isfile(tarball))
1039n/a
1040n/a @support.requires_zlib
1041n/a def test_make_zipfile(self):
1042n/a # creating something to zip
1043n/a root_dir, base_dir = self._create_files()
1044n/a
1045n/a tmpdir2 = self.mkdtemp()
1046n/a # force shutil to create the directory
1047n/a os.rmdir(tmpdir2)
1048n/a # working with relative paths
1049n/a work_dir = os.path.dirname(tmpdir2)
1050n/a rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1051n/a
1052n/a with support.change_cwd(work_dir):
1053n/a base_name = os.path.abspath(rel_base_name)
1054n/a res = make_archive(rel_base_name, 'zip', root_dir)
1055n/a
1056n/a self.assertEqual(res, base_name + '.zip')
1057n/a self.assertTrue(os.path.isfile(res))
1058n/a self.assertTrue(zipfile.is_zipfile(res))
1059n/a with zipfile.ZipFile(res) as zf:
1060n/a self.assertCountEqual(zf.namelist(),
1061n/a ['dist/', 'dist/sub/', 'dist/sub2/',
1062n/a 'dist/file1', 'dist/file2', 'dist/sub/file3',
1063n/a 'outer'])
1064n/a
1065n/a with support.change_cwd(work_dir):
1066n/a base_name = os.path.abspath(rel_base_name)
1067n/a res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
1068n/a
1069n/a self.assertEqual(res, base_name + '.zip')
1070n/a self.assertTrue(os.path.isfile(res))
1071n/a self.assertTrue(zipfile.is_zipfile(res))
1072n/a with zipfile.ZipFile(res) as zf:
1073n/a self.assertCountEqual(zf.namelist(),
1074n/a ['dist/', 'dist/sub/', 'dist/sub2/',
1075n/a 'dist/file1', 'dist/file2', 'dist/sub/file3'])
1076n/a
1077n/a @support.requires_zlib
1078n/a @unittest.skipUnless(shutil.which('zip'),
1079n/a 'Need the zip command to run')
1080n/a def test_zipfile_vs_zip(self):
1081n/a root_dir, base_dir = self._create_files()
1082n/a base_name = os.path.join(self.mkdtemp(), 'archive')
1083n/a archive = make_archive(base_name, 'zip', root_dir, base_dir)
1084n/a
1085n/a # check if ZIP file was created
1086n/a self.assertEqual(archive, base_name + '.zip')
1087n/a self.assertTrue(os.path.isfile(archive))
1088n/a
1089n/a # now create another ZIP file using `zip`
1090n/a archive2 = os.path.join(root_dir, 'archive2.zip')
1091n/a zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
1092n/a subprocess.check_call(zip_cmd, cwd=root_dir,
1093n/a stdout=subprocess.DEVNULL)
1094n/a
1095n/a self.assertTrue(os.path.isfile(archive2))
1096n/a # let's compare both ZIP files
1097n/a with zipfile.ZipFile(archive) as zf:
1098n/a names = zf.namelist()
1099n/a with zipfile.ZipFile(archive2) as zf:
1100n/a names2 = zf.namelist()
1101n/a self.assertEqual(sorted(names), sorted(names2))
1102n/a
1103n/a @support.requires_zlib
1104n/a @unittest.skipUnless(shutil.which('unzip'),
1105n/a 'Need the unzip command to run')
1106n/a def test_unzip_zipfile(self):
1107n/a root_dir, base_dir = self._create_files()
1108n/a base_name = os.path.join(self.mkdtemp(), 'archive')
1109n/a archive = make_archive(base_name, 'zip', root_dir, base_dir)
1110n/a
1111n/a # check if ZIP file was created
1112n/a self.assertEqual(archive, base_name + '.zip')
1113n/a self.assertTrue(os.path.isfile(archive))
1114n/a
1115n/a # now check the ZIP file using `unzip -t`
1116n/a zip_cmd = ['unzip', '-t', archive]
1117n/a with support.change_cwd(root_dir):
1118n/a try:
1119n/a subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1120n/a except subprocess.CalledProcessError as exc:
1121n/a details = exc.output.decode(errors="replace")
1122n/a msg = "{}\n\n**Unzip Output**\n{}"
1123n/a self.fail(msg.format(exc, details))
1124n/a
1125n/a def test_make_archive(self):
1126n/a tmpdir = self.mkdtemp()
1127n/a base_name = os.path.join(tmpdir, 'archive')
1128n/a self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1129n/a
1130n/a @support.requires_zlib
1131n/a def test_make_archive_owner_group(self):
1132n/a # testing make_archive with owner and group, with various combinations
1133n/a # this works even if there's not gid/uid support
1134n/a if UID_GID_SUPPORT:
1135n/a group = grp.getgrgid(0)[0]
1136n/a owner = pwd.getpwuid(0)[0]
1137n/a else:
1138n/a group = owner = 'root'
1139n/a
1140n/a root_dir, base_dir = self._create_files()
1141n/a base_name = os.path.join(self.mkdtemp(), 'archive')
1142n/a res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1143n/a group=group)
1144n/a self.assertTrue(os.path.isfile(res))
1145n/a
1146n/a res = make_archive(base_name, 'zip', root_dir, base_dir)
1147n/a self.assertTrue(os.path.isfile(res))
1148n/a
1149n/a res = make_archive(base_name, 'tar', root_dir, base_dir,
1150n/a owner=owner, group=group)
1151n/a self.assertTrue(os.path.isfile(res))
1152n/a
1153n/a res = make_archive(base_name, 'tar', root_dir, base_dir,
1154n/a owner='kjhkjhkjg', group='oihohoh')
1155n/a self.assertTrue(os.path.isfile(res))
1156n/a
1157n/a
1158n/a @support.requires_zlib
1159n/a @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1160n/a def test_tarfile_root_owner(self):
1161n/a root_dir, base_dir = self._create_files()
1162n/a base_name = os.path.join(self.mkdtemp(), 'archive')
1163n/a group = grp.getgrgid(0)[0]
1164n/a owner = pwd.getpwuid(0)[0]
1165n/a with support.change_cwd(root_dir):
1166n/a archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1167n/a owner=owner, group=group)
1168n/a
1169n/a # check if the compressed tarball was created
1170n/a self.assertTrue(os.path.isfile(archive_name))
1171n/a
1172n/a # now checks the rights
1173n/a archive = tarfile.open(archive_name)
1174n/a try:
1175n/a for member in archive.getmembers():
1176n/a self.assertEqual(member.uid, 0)
1177n/a self.assertEqual(member.gid, 0)
1178n/a finally:
1179n/a archive.close()
1180n/a
1181n/a def test_make_archive_cwd(self):
1182n/a current_dir = os.getcwd()
1183n/a def _breaks(*args, **kw):
1184n/a raise RuntimeError()
1185n/a
1186n/a register_archive_format('xxx', _breaks, [], 'xxx file')
1187n/a try:
1188n/a try:
1189n/a make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1190n/a except Exception:
1191n/a pass
1192n/a self.assertEqual(os.getcwd(), current_dir)
1193n/a finally:
1194n/a unregister_archive_format('xxx')
1195n/a
1196n/a def test_make_tarfile_in_curdir(self):
1197n/a # Issue #21280
1198n/a root_dir = self.mkdtemp()
1199n/a with support.change_cwd(root_dir):
1200n/a self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1201n/a self.assertTrue(os.path.isfile('test.tar'))
1202n/a
1203n/a @support.requires_zlib
1204n/a def test_make_zipfile_in_curdir(self):
1205n/a # Issue #21280
1206n/a root_dir = self.mkdtemp()
1207n/a with support.change_cwd(root_dir):
1208n/a self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1209n/a self.assertTrue(os.path.isfile('test.zip'))
1210n/a
1211n/a def test_register_archive_format(self):
1212n/a
1213n/a self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1214n/a self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1215n/a 1)
1216n/a self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1217n/a [(1, 2), (1, 2, 3)])
1218n/a
1219n/a register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1220n/a formats = [name for name, params in get_archive_formats()]
1221n/a self.assertIn('xxx', formats)
1222n/a
1223n/a unregister_archive_format('xxx')
1224n/a formats = [name for name, params in get_archive_formats()]
1225n/a self.assertNotIn('xxx', formats)
1226n/a
1227n/a def check_unpack_archive(self, format):
1228n/a root_dir, base_dir = self._create_files()
1229n/a expected = rlistdir(root_dir)
1230n/a expected.remove('outer')
1231n/a
1232n/a base_name = os.path.join(self.mkdtemp(), 'archive')
1233n/a filename = make_archive(base_name, format, root_dir, base_dir)
1234n/a
1235n/a # let's try to unpack it now
1236n/a tmpdir2 = self.mkdtemp()
1237n/a unpack_archive(filename, tmpdir2)
1238n/a self.assertEqual(rlistdir(tmpdir2), expected)
1239n/a
1240n/a # and again, this time with the format specified
1241n/a tmpdir3 = self.mkdtemp()
1242n/a unpack_archive(filename, tmpdir3, format=format)
1243n/a self.assertEqual(rlistdir(tmpdir3), expected)
1244n/a
1245n/a self.assertRaises(shutil.ReadError, unpack_archive, TESTFN)
1246n/a self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
1247n/a
1248n/a def test_unpack_archive_tar(self):
1249n/a self.check_unpack_archive('tar')
1250n/a
1251n/a @support.requires_zlib
1252n/a def test_unpack_archive_gztar(self):
1253n/a self.check_unpack_archive('gztar')
1254n/a
1255n/a @support.requires_bz2
1256n/a def test_unpack_archive_bztar(self):
1257n/a self.check_unpack_archive('bztar')
1258n/a
1259n/a @support.requires_lzma
1260n/a def test_unpack_archive_xztar(self):
1261n/a self.check_unpack_archive('xztar')
1262n/a
1263n/a @support.requires_zlib
1264n/a def test_unpack_archive_zip(self):
1265n/a self.check_unpack_archive('zip')
1266n/a
1267n/a def test_unpack_registry(self):
1268n/a
1269n/a formats = get_unpack_formats()
1270n/a
1271n/a def _boo(filename, extract_dir, extra):
1272n/a self.assertEqual(extra, 1)
1273n/a self.assertEqual(filename, 'stuff.boo')
1274n/a self.assertEqual(extract_dir, 'xx')
1275n/a
1276n/a register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1277n/a unpack_archive('stuff.boo', 'xx')
1278n/a
1279n/a # trying to register a .boo unpacker again
1280n/a self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1281n/a ['.boo'], _boo)
1282n/a
1283n/a # should work now
1284n/a unregister_unpack_format('Boo')
1285n/a register_unpack_format('Boo2', ['.boo'], _boo)
1286n/a self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1287n/a self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1288n/a
1289n/a # let's leave a clean state
1290n/a unregister_unpack_format('Boo2')
1291n/a self.assertEqual(get_unpack_formats(), formats)
1292n/a
1293n/a @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1294n/a "disk_usage not available on this platform")
1295n/a def test_disk_usage(self):
1296n/a usage = shutil.disk_usage(os.getcwd())
1297n/a self.assertGreater(usage.total, 0)
1298n/a self.assertGreater(usage.used, 0)
1299n/a self.assertGreaterEqual(usage.free, 0)
1300n/a self.assertGreaterEqual(usage.total, usage.used)
1301n/a self.assertGreater(usage.total, usage.free)
1302n/a
1303n/a @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1304n/a @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1305n/a def test_chown(self):
1306n/a
1307n/a # cleaned-up automatically by TestShutil.tearDown method
1308n/a dirname = self.mkdtemp()
1309n/a filename = tempfile.mktemp(dir=dirname)
1310n/a write_file(filename, 'testing chown function')
1311n/a
1312n/a with self.assertRaises(ValueError):
1313n/a shutil.chown(filename)
1314n/a
1315n/a with self.assertRaises(LookupError):
1316n/a shutil.chown(filename, user='non-existing username')
1317n/a
1318n/a with self.assertRaises(LookupError):
1319n/a shutil.chown(filename, group='non-existing groupname')
1320n/a
1321n/a with self.assertRaises(TypeError):
1322n/a shutil.chown(filename, b'spam')
1323n/a
1324n/a with self.assertRaises(TypeError):
1325n/a shutil.chown(filename, 3.14)
1326n/a
1327n/a uid = os.getuid()
1328n/a gid = os.getgid()
1329n/a
1330n/a def check_chown(path, uid=None, gid=None):
1331n/a s = os.stat(filename)
1332n/a if uid is not None:
1333n/a self.assertEqual(uid, s.st_uid)
1334n/a if gid is not None:
1335n/a self.assertEqual(gid, s.st_gid)
1336n/a
1337n/a shutil.chown(filename, uid, gid)
1338n/a check_chown(filename, uid, gid)
1339n/a shutil.chown(filename, uid)
1340n/a check_chown(filename, uid)
1341n/a shutil.chown(filename, user=uid)
1342n/a check_chown(filename, uid)
1343n/a shutil.chown(filename, group=gid)
1344n/a check_chown(filename, gid=gid)
1345n/a
1346n/a shutil.chown(dirname, uid, gid)
1347n/a check_chown(dirname, uid, gid)
1348n/a shutil.chown(dirname, uid)
1349n/a check_chown(dirname, uid)
1350n/a shutil.chown(dirname, user=uid)
1351n/a check_chown(dirname, uid)
1352n/a shutil.chown(dirname, group=gid)
1353n/a check_chown(dirname, gid=gid)
1354n/a
1355n/a user = pwd.getpwuid(uid)[0]
1356n/a group = grp.getgrgid(gid)[0]
1357n/a shutil.chown(filename, user, group)
1358n/a check_chown(filename, uid, gid)
1359n/a shutil.chown(dirname, user, group)
1360n/a check_chown(dirname, uid, gid)
1361n/a
1362n/a def test_copy_return_value(self):
1363n/a # copy and copy2 both return their destination path.
1364n/a for fn in (shutil.copy, shutil.copy2):
1365n/a src_dir = self.mkdtemp()
1366n/a dst_dir = self.mkdtemp()
1367n/a src = os.path.join(src_dir, 'foo')
1368n/a write_file(src, 'foo')
1369n/a rv = fn(src, dst_dir)
1370n/a self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1371n/a rv = fn(src, os.path.join(dst_dir, 'bar'))
1372n/a self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1373n/a
1374n/a def test_copyfile_return_value(self):
1375n/a # copytree returns its destination path.
1376n/a src_dir = self.mkdtemp()
1377n/a dst_dir = self.mkdtemp()
1378n/a dst_file = os.path.join(dst_dir, 'bar')
1379n/a src_file = os.path.join(src_dir, 'foo')
1380n/a write_file(src_file, 'foo')
1381n/a rv = shutil.copyfile(src_file, dst_file)
1382n/a self.assertTrue(os.path.exists(rv))
1383n/a self.assertEqual(read_file(src_file), read_file(dst_file))
1384n/a
1385n/a def test_copyfile_same_file(self):
1386n/a # copyfile() should raise SameFileError if the source and destination
1387n/a # are the same.
1388n/a src_dir = self.mkdtemp()
1389n/a src_file = os.path.join(src_dir, 'foo')
1390n/a write_file(src_file, 'foo')
1391n/a self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
1392n/a # But Error should work too, to stay backward compatible.
1393n/a self.assertRaises(Error, shutil.copyfile, src_file, src_file)
1394n/a
1395n/a def test_copytree_return_value(self):
1396n/a # copytree returns its destination path.
1397n/a src_dir = self.mkdtemp()
1398n/a dst_dir = src_dir + "dest"
1399n/a self.addCleanup(shutil.rmtree, dst_dir, True)
1400n/a src = os.path.join(src_dir, 'foo')
1401n/a write_file(src, 'foo')
1402n/a rv = shutil.copytree(src_dir, dst_dir)
1403n/a self.assertEqual(['foo'], os.listdir(rv))
1404n/a
1405n/a
1406n/aclass TestWhich(unittest.TestCase):
1407n/a
1408n/a def setUp(self):
1409n/a self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
1410n/a self.addCleanup(shutil.rmtree, self.temp_dir, True)
1411n/a # Give the temp_file an ".exe" suffix for all.
1412n/a # It's needed on Windows and not harmful on other platforms.
1413n/a self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1414n/a prefix="Tmp",
1415n/a suffix=".Exe")
1416n/a os.chmod(self.temp_file.name, stat.S_IXUSR)
1417n/a self.addCleanup(self.temp_file.close)
1418n/a self.dir, self.file = os.path.split(self.temp_file.name)
1419n/a
1420n/a def test_basic(self):
1421n/a # Given an EXE in a directory, it should be returned.
1422n/a rv = shutil.which(self.file, path=self.dir)
1423n/a self.assertEqual(rv, self.temp_file.name)
1424n/a
1425n/a def test_absolute_cmd(self):
1426n/a # When given the fully qualified path to an executable that exists,
1427n/a # it should be returned.
1428n/a rv = shutil.which(self.temp_file.name, path=self.temp_dir)
1429n/a self.assertEqual(rv, self.temp_file.name)
1430n/a
1431n/a def test_relative_cmd(self):
1432n/a # When given the relative path with a directory part to an executable
1433n/a # that exists, it should be returned.
1434n/a base_dir, tail_dir = os.path.split(self.dir)
1435n/a relpath = os.path.join(tail_dir, self.file)
1436n/a with support.change_cwd(path=base_dir):
1437n/a rv = shutil.which(relpath, path=self.temp_dir)
1438n/a self.assertEqual(rv, relpath)
1439n/a # But it shouldn't be searched in PATH directories (issue #16957).
1440n/a with support.change_cwd(path=self.dir):
1441n/a rv = shutil.which(relpath, path=base_dir)
1442n/a self.assertIsNone(rv)
1443n/a
1444n/a def test_cwd(self):
1445n/a # Issue #16957
1446n/a base_dir = os.path.dirname(self.dir)
1447n/a with support.change_cwd(path=self.dir):
1448n/a rv = shutil.which(self.file, path=base_dir)
1449n/a if sys.platform == "win32":
1450n/a # Windows: current directory implicitly on PATH
1451n/a self.assertEqual(rv, os.path.join(os.curdir, self.file))
1452n/a else:
1453n/a # Other platforms: shouldn't match in the current directory.
1454n/a self.assertIsNone(rv)
1455n/a
1456n/a @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1457n/a 'non-root user required')
1458n/a def test_non_matching_mode(self):
1459n/a # Set the file read-only and ask for writeable files.
1460n/a os.chmod(self.temp_file.name, stat.S_IREAD)
1461n/a if os.access(self.temp_file.name, os.W_OK):
1462n/a self.skipTest("can't set the file read-only")
1463n/a rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1464n/a self.assertIsNone(rv)
1465n/a
1466n/a def test_relative_path(self):
1467n/a base_dir, tail_dir = os.path.split(self.dir)
1468n/a with support.change_cwd(path=base_dir):
1469n/a rv = shutil.which(self.file, path=tail_dir)
1470n/a self.assertEqual(rv, os.path.join(tail_dir, self.file))
1471n/a
1472n/a def test_nonexistent_file(self):
1473n/a # Return None when no matching executable file is found on the path.
1474n/a rv = shutil.which("foo.exe", path=self.dir)
1475n/a self.assertIsNone(rv)
1476n/a
1477n/a @unittest.skipUnless(sys.platform == "win32",
1478n/a "pathext check is Windows-only")
1479n/a def test_pathext_checking(self):
1480n/a # Ask for the file without the ".exe" extension, then ensure that
1481n/a # it gets found properly with the extension.
1482n/a rv = shutil.which(self.file[:-4], path=self.dir)
1483n/a self.assertEqual(rv, self.temp_file.name[:-4] + ".EXE")
1484n/a
1485n/a def test_environ_path(self):
1486n/a with support.EnvironmentVarGuard() as env:
1487n/a env['PATH'] = self.dir
1488n/a rv = shutil.which(self.file)
1489n/a self.assertEqual(rv, self.temp_file.name)
1490n/a
1491n/a def test_empty_path(self):
1492n/a base_dir = os.path.dirname(self.dir)
1493n/a with support.change_cwd(path=self.dir), \
1494n/a support.EnvironmentVarGuard() as env:
1495n/a env['PATH'] = self.dir
1496n/a rv = shutil.which(self.file, path='')
1497n/a self.assertIsNone(rv)
1498n/a
1499n/a def test_empty_path_no_PATH(self):
1500n/a with support.EnvironmentVarGuard() as env:
1501n/a env.pop('PATH', None)
1502n/a rv = shutil.which(self.file)
1503n/a self.assertIsNone(rv)
1504n/a
1505n/a
1506n/aclass TestMove(unittest.TestCase):
1507n/a
1508n/a def setUp(self):
1509n/a filename = "foo"
1510n/a self.src_dir = tempfile.mkdtemp()
1511n/a self.dst_dir = tempfile.mkdtemp()
1512n/a self.src_file = os.path.join(self.src_dir, filename)
1513n/a self.dst_file = os.path.join(self.dst_dir, filename)
1514n/a with open(self.src_file, "wb") as f:
1515n/a f.write(b"spam")
1516n/a
1517n/a def tearDown(self):
1518n/a for d in (self.src_dir, self.dst_dir):
1519n/a try:
1520n/a if d:
1521n/a shutil.rmtree(d)
1522n/a except:
1523n/a pass
1524n/a
1525n/a def _check_move_file(self, src, dst, real_dst):
1526n/a with open(src, "rb") as f:
1527n/a contents = f.read()
1528n/a shutil.move(src, dst)
1529n/a with open(real_dst, "rb") as f:
1530n/a self.assertEqual(contents, f.read())
1531n/a self.assertFalse(os.path.exists(src))
1532n/a
1533n/a def _check_move_dir(self, src, dst, real_dst):
1534n/a contents = sorted(os.listdir(src))
1535n/a shutil.move(src, dst)
1536n/a self.assertEqual(contents, sorted(os.listdir(real_dst)))
1537n/a self.assertFalse(os.path.exists(src))
1538n/a
1539n/a def test_move_file(self):
1540n/a # Move a file to another location on the same filesystem.
1541n/a self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1542n/a
1543n/a def test_move_file_to_dir(self):
1544n/a # Move a file inside an existing dir on the same filesystem.
1545n/a self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1546n/a
1547n/a @mock_rename
1548n/a def test_move_file_other_fs(self):
1549n/a # Move a file to an existing dir on another filesystem.
1550n/a self.test_move_file()
1551n/a
1552n/a @mock_rename
1553n/a def test_move_file_to_dir_other_fs(self):
1554n/a # Move a file to another location on another filesystem.
1555n/a self.test_move_file_to_dir()
1556n/a
1557n/a def test_move_dir(self):
1558n/a # Move a dir to another location on the same filesystem.
1559n/a dst_dir = tempfile.mktemp()
1560n/a try:
1561n/a self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1562n/a finally:
1563n/a try:
1564n/a shutil.rmtree(dst_dir)
1565n/a except:
1566n/a pass
1567n/a
1568n/a @mock_rename
1569n/a def test_move_dir_other_fs(self):
1570n/a # Move a dir to another location on another filesystem.
1571n/a self.test_move_dir()
1572n/a
1573n/a def test_move_dir_to_dir(self):
1574n/a # Move a dir inside an existing dir on the same filesystem.
1575n/a self._check_move_dir(self.src_dir, self.dst_dir,
1576n/a os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1577n/a
1578n/a @mock_rename
1579n/a def test_move_dir_to_dir_other_fs(self):
1580n/a # Move a dir inside an existing dir on another filesystem.
1581n/a self.test_move_dir_to_dir()
1582n/a
1583n/a def test_move_dir_sep_to_dir(self):
1584n/a self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1585n/a os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1586n/a
1587n/a @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1588n/a def test_move_dir_altsep_to_dir(self):
1589n/a self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1590n/a os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1591n/a
1592n/a def test_existing_file_inside_dest_dir(self):
1593n/a # A file with the same name inside the destination dir already exists.
1594n/a with open(self.dst_file, "wb"):
1595n/a pass
1596n/a self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1597n/a
1598n/a def test_dont_move_dir_in_itself(self):
1599n/a # Moving a dir inside itself raises an Error.
1600n/a dst = os.path.join(self.src_dir, "bar")
1601n/a self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1602n/a
1603n/a def test_destinsrc_false_negative(self):
1604n/a os.mkdir(TESTFN)
1605n/a try:
1606n/a for src, dst in [('srcdir', 'srcdir/dest')]:
1607n/a src = os.path.join(TESTFN, src)
1608n/a dst = os.path.join(TESTFN, dst)
1609n/a self.assertTrue(shutil._destinsrc(src, dst),
1610n/a msg='_destinsrc() wrongly concluded that '
1611n/a 'dst (%s) is not in src (%s)' % (dst, src))
1612n/a finally:
1613n/a shutil.rmtree(TESTFN, ignore_errors=True)
1614n/a
1615n/a def test_destinsrc_false_positive(self):
1616n/a os.mkdir(TESTFN)
1617n/a try:
1618n/a for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1619n/a src = os.path.join(TESTFN, src)
1620n/a dst = os.path.join(TESTFN, dst)
1621n/a self.assertFalse(shutil._destinsrc(src, dst),
1622n/a msg='_destinsrc() wrongly concluded that '
1623n/a 'dst (%s) is in src (%s)' % (dst, src))
1624n/a finally:
1625n/a shutil.rmtree(TESTFN, ignore_errors=True)
1626n/a
1627n/a @support.skip_unless_symlink
1628n/a @mock_rename
1629n/a def test_move_file_symlink(self):
1630n/a dst = os.path.join(self.src_dir, 'bar')
1631n/a os.symlink(self.src_file, dst)
1632n/a shutil.move(dst, self.dst_file)
1633n/a self.assertTrue(os.path.islink(self.dst_file))
1634n/a self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1635n/a
1636n/a @support.skip_unless_symlink
1637n/a @mock_rename
1638n/a def test_move_file_symlink_to_dir(self):
1639n/a filename = "bar"
1640n/a dst = os.path.join(self.src_dir, filename)
1641n/a os.symlink(self.src_file, dst)
1642n/a shutil.move(dst, self.dst_dir)
1643n/a final_link = os.path.join(self.dst_dir, filename)
1644n/a self.assertTrue(os.path.islink(final_link))
1645n/a self.assertTrue(os.path.samefile(self.src_file, final_link))
1646n/a
1647n/a @support.skip_unless_symlink
1648n/a @mock_rename
1649n/a def test_move_dangling_symlink(self):
1650n/a src = os.path.join(self.src_dir, 'baz')
1651n/a dst = os.path.join(self.src_dir, 'bar')
1652n/a os.symlink(src, dst)
1653n/a dst_link = os.path.join(self.dst_dir, 'quux')
1654n/a shutil.move(dst, dst_link)
1655n/a self.assertTrue(os.path.islink(dst_link))
1656n/a # On Windows, os.path.realpath does not follow symlinks (issue #9949)
1657n/a if os.name == 'nt':
1658n/a self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
1659n/a else:
1660n/a self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
1661n/a
1662n/a @support.skip_unless_symlink
1663n/a @mock_rename
1664n/a def test_move_dir_symlink(self):
1665n/a src = os.path.join(self.src_dir, 'baz')
1666n/a dst = os.path.join(self.src_dir, 'bar')
1667n/a os.mkdir(src)
1668n/a os.symlink(src, dst)
1669n/a dst_link = os.path.join(self.dst_dir, 'quux')
1670n/a shutil.move(dst, dst_link)
1671n/a self.assertTrue(os.path.islink(dst_link))
1672n/a self.assertTrue(os.path.samefile(src, dst_link))
1673n/a
1674n/a def test_move_return_value(self):
1675n/a rv = shutil.move(self.src_file, self.dst_dir)
1676n/a self.assertEqual(rv,
1677n/a os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1678n/a
1679n/a def test_move_as_rename_return_value(self):
1680n/a rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1681n/a self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1682n/a
1683n/a @mock_rename
1684n/a def test_move_file_special_function(self):
1685n/a moved = []
1686n/a def _copy(src, dst):
1687n/a moved.append((src, dst))
1688n/a shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1689n/a self.assertEqual(len(moved), 1)
1690n/a
1691n/a @mock_rename
1692n/a def test_move_dir_special_function(self):
1693n/a moved = []
1694n/a def _copy(src, dst):
1695n/a moved.append((src, dst))
1696n/a support.create_empty_file(os.path.join(self.src_dir, 'child'))
1697n/a support.create_empty_file(os.path.join(self.src_dir, 'child1'))
1698n/a shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
1699n/a self.assertEqual(len(moved), 3)
1700n/a
1701n/a
1702n/aclass TestCopyFile(unittest.TestCase):
1703n/a
1704n/a _delete = False
1705n/a
1706n/a class Faux(object):
1707n/a _entered = False
1708n/a _exited_with = None
1709n/a _raised = False
1710n/a def __init__(self, raise_in_exit=False, suppress_at_exit=True):
1711n/a self._raise_in_exit = raise_in_exit
1712n/a self._suppress_at_exit = suppress_at_exit
1713n/a def read(self, *args):
1714n/a return ''
1715n/a def __enter__(self):
1716n/a self._entered = True
1717n/a def __exit__(self, exc_type, exc_val, exc_tb):
1718n/a self._exited_with = exc_type, exc_val, exc_tb
1719n/a if self._raise_in_exit:
1720n/a self._raised = True
1721n/a raise OSError("Cannot close")
1722n/a return self._suppress_at_exit
1723n/a
1724n/a def tearDown(self):
1725n/a if self._delete:
1726n/a del shutil.open
1727n/a
1728n/a def _set_shutil_open(self, func):
1729n/a shutil.open = func
1730n/a self._delete = True
1731n/a
1732n/a def test_w_source_open_fails(self):
1733n/a def _open(filename, mode='r'):
1734n/a if filename == 'srcfile':
1735n/a raise OSError('Cannot open "srcfile"')
1736n/a assert 0 # shouldn't reach here.
1737n/a
1738n/a self._set_shutil_open(_open)
1739n/a
1740n/a self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
1741n/a
1742n/a def test_w_dest_open_fails(self):
1743n/a
1744n/a srcfile = self.Faux()
1745n/a
1746n/a def _open(filename, mode='r'):
1747n/a if filename == 'srcfile':
1748n/a return srcfile
1749n/a if filename == 'destfile':
1750n/a raise OSError('Cannot open "destfile"')
1751n/a assert 0 # shouldn't reach here.
1752n/a
1753n/a self._set_shutil_open(_open)
1754n/a
1755n/a shutil.copyfile('srcfile', 'destfile')
1756n/a self.assertTrue(srcfile._entered)
1757n/a self.assertTrue(srcfile._exited_with[0] is OSError)
1758n/a self.assertEqual(srcfile._exited_with[1].args,
1759n/a ('Cannot open "destfile"',))
1760n/a
1761n/a def test_w_dest_close_fails(self):
1762n/a
1763n/a srcfile = self.Faux()
1764n/a destfile = self.Faux(True)
1765n/a
1766n/a def _open(filename, mode='r'):
1767n/a if filename == 'srcfile':
1768n/a return srcfile
1769n/a if filename == 'destfile':
1770n/a return destfile
1771n/a assert 0 # shouldn't reach here.
1772n/a
1773n/a self._set_shutil_open(_open)
1774n/a
1775n/a shutil.copyfile('srcfile', 'destfile')
1776n/a self.assertTrue(srcfile._entered)
1777n/a self.assertTrue(destfile._entered)
1778n/a self.assertTrue(destfile._raised)
1779n/a self.assertTrue(srcfile._exited_with[0] is OSError)
1780n/a self.assertEqual(srcfile._exited_with[1].args,
1781n/a ('Cannot close',))
1782n/a
1783n/a def test_w_source_close_fails(self):
1784n/a
1785n/a srcfile = self.Faux(True)
1786n/a destfile = self.Faux()
1787n/a
1788n/a def _open(filename, mode='r'):
1789n/a if filename == 'srcfile':
1790n/a return srcfile
1791n/a if filename == 'destfile':
1792n/a return destfile
1793n/a assert 0 # shouldn't reach here.
1794n/a
1795n/a self._set_shutil_open(_open)
1796n/a
1797n/a self.assertRaises(OSError,
1798n/a shutil.copyfile, 'srcfile', 'destfile')
1799n/a self.assertTrue(srcfile._entered)
1800n/a self.assertTrue(destfile._entered)
1801n/a self.assertFalse(destfile._raised)
1802n/a self.assertTrue(srcfile._exited_with[0] is None)
1803n/a self.assertTrue(srcfile._raised)
1804n/a
1805n/a def test_move_dir_caseinsensitive(self):
1806n/a # Renames a folder to the same name
1807n/a # but a different case.
1808n/a
1809n/a self.src_dir = tempfile.mkdtemp()
1810n/a self.addCleanup(shutil.rmtree, self.src_dir, True)
1811n/a dst_dir = os.path.join(
1812n/a os.path.dirname(self.src_dir),
1813n/a os.path.basename(self.src_dir).upper())
1814n/a self.assertNotEqual(self.src_dir, dst_dir)
1815n/a
1816n/a try:
1817n/a shutil.move(self.src_dir, dst_dir)
1818n/a self.assertTrue(os.path.isdir(dst_dir))
1819n/a finally:
1820n/a os.rmdir(dst_dir)
1821n/a
1822n/aclass TermsizeTests(unittest.TestCase):
1823n/a def test_does_not_crash(self):
1824n/a """Check if get_terminal_size() returns a meaningful value.
1825n/a
1826n/a There's no easy portable way to actually check the size of the
1827n/a terminal, so let's check if it returns something sensible instead.
1828n/a """
1829n/a size = shutil.get_terminal_size()
1830n/a self.assertGreaterEqual(size.columns, 0)
1831n/a self.assertGreaterEqual(size.lines, 0)
1832n/a
1833n/a def test_os_environ_first(self):
1834n/a "Check if environment variables have precedence"
1835n/a
1836n/a with support.EnvironmentVarGuard() as env:
1837n/a env['COLUMNS'] = '777'
1838n/a del env['LINES']
1839n/a size = shutil.get_terminal_size()
1840n/a self.assertEqual(size.columns, 777)
1841n/a
1842n/a with support.EnvironmentVarGuard() as env:
1843n/a del env['COLUMNS']
1844n/a env['LINES'] = '888'
1845n/a size = shutil.get_terminal_size()
1846n/a self.assertEqual(size.lines, 888)
1847n/a
1848n/a def test_bad_environ(self):
1849n/a with support.EnvironmentVarGuard() as env:
1850n/a env['COLUMNS'] = 'xxx'
1851n/a env['LINES'] = 'yyy'
1852n/a size = shutil.get_terminal_size()
1853n/a self.assertGreaterEqual(size.columns, 0)
1854n/a self.assertGreaterEqual(size.lines, 0)
1855n/a
1856n/a @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
1857n/a @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
1858n/a 'need os.get_terminal_size()')
1859n/a def test_stty_match(self):
1860n/a """Check if stty returns the same results ignoring env
1861n/a
1862n/a This test will fail if stdin and stdout are connected to
1863n/a different terminals with different sizes. Nevertheless, such
1864n/a situations should be pretty rare.
1865n/a """
1866n/a try:
1867n/a size = subprocess.check_output(['stty', 'size']).decode().split()
1868n/a except (FileNotFoundError, PermissionError,
1869n/a subprocess.CalledProcessError):
1870n/a self.skipTest("stty invocation failed")
1871n/a expected = (int(size[1]), int(size[0])) # reversed order
1872n/a
1873n/a with support.EnvironmentVarGuard() as env:
1874n/a del env['LINES']
1875n/a del env['COLUMNS']
1876n/a actual = shutil.get_terminal_size()
1877n/a
1878n/a self.assertEqual(expected, actual)
1879n/a
1880n/a def test_fallback(self):
1881n/a with support.EnvironmentVarGuard() as env:
1882n/a del env['LINES']
1883n/a del env['COLUMNS']
1884n/a
1885n/a # sys.__stdout__ has no fileno()
1886n/a with support.swap_attr(sys, '__stdout__', None):
1887n/a size = shutil.get_terminal_size(fallback=(10, 20))
1888n/a self.assertEqual(size.columns, 10)
1889n/a self.assertEqual(size.lines, 20)
1890n/a
1891n/a # sys.__stdout__ is not a terminal on Unix
1892n/a # or fileno() not in (0, 1, 2) on Windows
1893n/a with open(os.devnull, 'w') as f, \
1894n/a support.swap_attr(sys, '__stdout__', f):
1895n/a size = shutil.get_terminal_size(fallback=(30, 40))
1896n/a self.assertEqual(size.columns, 30)
1897n/a self.assertEqual(size.lines, 40)
1898n/a
1899n/a
1900n/aclass PublicAPITests(unittest.TestCase):
1901n/a """Ensures that the correct values are exposed in the public API."""
1902n/a
1903n/a def test_module_all_attribute(self):
1904n/a self.assertTrue(hasattr(shutil, '__all__'))
1905n/a target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
1906n/a 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
1907n/a 'SpecialFileError', 'ExecError', 'make_archive',
1908n/a 'get_archive_formats', 'register_archive_format',
1909n/a 'unregister_archive_format', 'get_unpack_formats',
1910n/a 'register_unpack_format', 'unregister_unpack_format',
1911n/a 'unpack_archive', 'ignore_patterns', 'chown', 'which',
1912n/a 'get_terminal_size', 'SameFileError']
1913n/a if hasattr(os, 'statvfs') or os.name == 'nt':
1914n/a target_api.append('disk_usage')
1915n/a self.assertEqual(set(shutil.__all__), set(target_api))
1916n/a
1917n/a
1918n/aif __name__ == '__main__':
1919n/a unittest.main()