» Core Development > Code coverage > Lib/shutil.py

Python code coverage for Lib/shutil.py

# | count | content
"""Utility functions for copying and archiving files and directory trees.

XXX The functions here don't copy the resource fork or other metadata on Mac.

"""
6n/a
import os
import sys
import stat
import fnmatch
import collections
import errno
13n/a
# Probe for the optional compression modules.  Each module is imported only
# to learn whether it is available and is deleted again immediately; the
# result is recorded in a module-level flag consulted by the archive helpers.
try:
    import zlib
    del zlib
    _ZLIB_SUPPORTED = True
except ImportError:
    _ZLIB_SUPPORTED = False

try:
    import bz2
    del bz2
    _BZ2_SUPPORTED = True
except ImportError:
    _BZ2_SUPPORTED = False

try:
    import lzma
    del lzma
    _LZMA_SUPPORTED = True
except ImportError:
    _LZMA_SUPPORTED = False

# The user/group database lookups may be missing on some platforms; fall
# back to None so that callers can test for availability before using them.
try:
    from pwd import getpwnam
except ImportError:
    getpwnam = None

try:
    from grp import getgrnam
except ImportError:
    getgrnam = None
44n/a
# Names exported by "from shutil import *".
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "get_unpack_formats", "register_unpack_format",
           "unregister_unpack_format", "unpack_archive",
           "ignore_patterns", "chown", "which", "get_terminal_size",
           "SameFileError"]
           # disk_usage is added later, if available on the platform
54n/a
class Error(OSError):
    """Base class for the errors raised by the copy/move helpers.

    copytree() raises it with a list of (src, dst, reason) tuples as its
    single argument; move() raises it with a plain message.
    """

class SameFileError(Error):
    """Raised when source and destination are the same file."""

class SpecialFileError(OSError):
    """Raised when trying to do a kind of operation (e.g. copying) which is
    not supported on a special file (e.g. a named pipe)"""

class ExecError(OSError):
    """Raised when a command could not be executed"""

class ReadError(OSError):
    """Raised when an archive cannot be read"""

class RegistryError(Exception):
    """Raised when a registry operation with the archiving
    and unpacking registries fails"""
74n/a
75n/a
def copyfileobj(fsrc, fdst, length=16*1024):
    """Copy data from file-like object fsrc to file-like object fdst.

    Data is read from fsrc in chunks of at most `length` and written to
    fdst until fsrc.read() returns an empty (falsy) result.  Neither
    object is closed or repositioned by this function.
    """
    while True:
        buf = fsrc.read(length)
        if not buf:
            # An empty read signals end of input.
            break
        fdst.write(buf)
83n/a
def _samefile(src, dst):
    """Return True if `src` and `dst` refer to the same file.

    Where os.path.samefile() exists it is used, and any OSError it raises
    (for example because one of the paths does not exist) is treated as
    "not the same file".  Otherwise the normalized absolute pathnames are
    compared.
    """
    # Macintosh, Unix.
    if hasattr(os.path, 'samefile'):
        try:
            return os.path.samefile(src, dst)
        except OSError:
            return False

    # All other platforms: check for same pathname.
    return (os.path.normcase(os.path.abspath(src)) ==
            os.path.normcase(os.path.abspath(dst)))
95n/a
def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst and return dst.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Raises SameFileError if src and dst are the same file, and
    SpecialFileError if either of them is a named pipe.

    """
    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    for fn in (src, dst):
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise SpecialFileError("`%s` is a named pipe" % fn)

    if not follow_symlinks and os.path.islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        # src is opened first, so a missing source does not create or
        # truncate dst.
        with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
            copyfileobj(fsrc, fdst)
    return dst
124n/a
def copymode(src, dst, *, follow_symlinks=True):
    """Copy mode bits from src to dst.

    If follow_symlinks is not set, symlinks aren't followed if and only
    if both `src` and `dst` are symlinks.  If `lchmod` isn't available
    (e.g. Linux) this method does nothing.

    """
    if not follow_symlinks and os.path.islink(src) and os.path.islink(dst):
        # Operate on the links themselves, which needs os.lchmod.
        if hasattr(os, 'lchmod'):
            stat_func, chmod_func = os.lstat, os.lchmod
        else:
            return
    elif hasattr(os, 'chmod'):
        stat_func, chmod_func = os.stat, os.chmod
    else:
        return

    st = stat_func(src)
    # S_IMODE() keeps only the permission bits of st_mode.
    chmod_func(dst, stat.S_IMODE(st.st_mode))
145n/a
if hasattr(os, 'listxattr'):
    def _copyxattr(src, dst, *, follow_symlinks=True):
        """Copy extended filesystem attributes from `src` to `dst`.

        Overwrite existing attributes.

        If `follow_symlinks` is false, symlinks won't be followed.

        """

        try:
            names = os.listxattr(src, follow_symlinks=follow_symlinks)
        except OSError as e:
            # Listing failed with ENOTSUP/ENODATA: nothing to copy.
            if e.errno not in (errno.ENOTSUP, errno.ENODATA):
                raise
            return
        for name in names:
            try:
                value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
                os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
            except OSError as e:
                # Best effort: skip attributes that fail with one of these
                # errno values, re-raise anything else.
                if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA):
                    raise
else:
    def _copyxattr(*args, **kwargs):
        """Do nothing: os.listxattr is not available on this platform."""
        pass
172n/a
def copystat(src, dst, *, follow_symlinks=True):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Extended attributes are copied as well, via _copyxattr().

    If the optional flag `follow_symlinks` is not set, symlinks aren't followed if and
    only if both `src` and `dst` are symlinks.

    """
    def _nop(*args, ns=None, follow_symlinks=None):
        pass

    # follow symlinks (aka don't not follow symlinks)
    follow = follow_symlinks or not (os.path.islink(src) and os.path.islink(dst))
    if follow:
        # use the real function if it exists
        def lookup(name):
            return getattr(os, name, _nop)
    else:
        # use the real function only if it exists
        # *and* it supports follow_symlinks
        def lookup(name):
            fn = getattr(os, name, _nop)
            if fn in os.supports_follow_symlinks:
                return fn
            return _nop

    st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
        follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # if we got a NotImplementedError, it's because
        #   * follow_symlinks=False,
        #   * lchmod() is unavailable, and
        #   * either
        #       * fchmodat() is unavailable or
        #       * fchmodat() doesn't implement AT_SYMLINK_NOFOLLOW.
        #         (it returned ENOTSUP.)
        # therefore we're out of options--we simply cannot chmod the
        # symlink.  give up, suppress the error.
        # (which is what shutil always did in this circumstance.)
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # Ignore "operation not supported"; the errno names are looked
            # up dynamically because not every platform defines both.
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise
    _copyxattr(src, dst, follow_symlinks=follow)
226n/a
def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    The destination may be a directory, in which case the file is copied
    into it under the base name of src.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    If source and destination are the same file, a SameFileError will be
    raised.

    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copymode(src, dst, follow_symlinks=follow_symlinks)
    return dst
244n/a
def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and all stat info ("cp -p src dst"). Return the file's
    destination.

    The destination may be a directory, in which case the file is copied
    into it under the base name of src.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst
260n/a
def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files.

    Returns a callable(path, names) that yields the set of entries in
    `names` matching at least one pattern; `path` is not inspected.
    """
    def _ignore_patterns(path, names):
        ignored_names = set()
        for pattern in patterns:
            ignored_names.update(fnmatch.filter(names, pattern))
        return ignored_names
    return _ignore_patterns
272n/a
def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False):
    """Recursively copy a directory tree and return the destination.

    The destination directory must not already exist.
    If exception(s) occur, an Error is raised with a list of reasons.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied. If the file pointed by the symlink doesn't
    exist, an exception will be added in the list of errors raised in
    an Error exception at the end of the copy process.

    You can set the optional ignore_dangling_symlinks flag to true if you
    want to silence this exception. The flag applies to the whole tree,
    including sub-directories. Notice that this has no effect on
    platforms that don't support os.symlink.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    The optional copy_function argument is a callable that will be used
    to copy each file. It will be called with the source path and the
    destination path as arguments. By default, copy2() is used, but any
    function that supports the same signature (like copy()) can be used.

    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    os.makedirs(dst)
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if os.path.islink(srcname):
                if symlinks:
                    linkto = os.readlink(srcname)
                    # We can't just leave it to `copy_function` because legacy
                    # code with a custom `copy_function` may rely on copytree
                    # doing the right thing.
                    os.symlink(linkto, dstname)
                    copystat(srcname, dstname, follow_symlinks=not symlinks)
                else:
                    # ignore dangling symlink if the flag is on.  The check
                    # goes through the link itself (os.path.exists follows
                    # it) so that a relative target is resolved against the
                    # link's directory rather than the current directory.
                    if not os.path.exists(srcname) and ignore_dangling_symlinks:
                        continue
                    # otherwise let the copy occur. copy2 will raise an error
                    if os.path.isdir(srcname):
                        copytree(srcname, dstname, symlinks, ignore,
                                 copy_function, ignore_dangling_symlinks)
                    else:
                        copy_function(srcname, dstname)
            elif os.path.isdir(srcname):
                # Propagate ignore_dangling_symlinks so the flag also
                # applies below the top-level directory.
                copytree(srcname, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcname, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
    return dst
361n/a
# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Recursively delete the directory `path` using path-based calls.

    Every failure is reported through onerror(func, path, exc_info).  If
    `path` itself is a symbolic link nothing is removed.
    """
    try:
        if os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(os.path.islink, path, sys.exc_info())
        # can't continue even if onerror hook returns
        return
    names = []
    try:
        names = os.listdir(path)
    except OSError:
        onerror(os.listdir, path, sys.exc_info())
    for name in names:
        fullname = os.path.join(path, name)
        try:
            mode = os.lstat(fullname).st_mode
        except OSError:
            # Treat an entry that cannot be stat'ed as a non-directory; the
            # unlink below then reports the failure through onerror.
            mode = 0
        if stat.S_ISDIR(mode):
            _rmtree_unsafe(fullname, onerror)
        else:
            try:
                os.unlink(fullname)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())
    try:
        os.rmdir(path)
    except OSError:
        onerror(os.rmdir, path, sys.exc_info())
394n/a
# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onerror):
    """Delete everything inside the directory open as `topfd`.

    `path` is the name of that directory and is used only to build the
    paths reported through onerror(func, path, exc_info).  The directory
    itself is not removed and `topfd` is not closed; both are left to the
    caller.
    """
    names = []
    try:
        names = os.listdir(topfd)
    except OSError as err:
        err.filename = path
        onerror(os.listdir, path, sys.exc_info())
    for name in names:
        fullname = os.path.join(path, name)
        try:
            orig_st = os.stat(name, dir_fd=topfd, follow_symlinks=False)
            mode = orig_st.st_mode
        except OSError:
            # Treated as a non-directory; the unlink below reports the error.
            mode = 0
        if stat.S_ISDIR(mode):
            try:
                dirfd = os.open(name, os.O_RDONLY, dir_fd=topfd)
            except OSError:
                onerror(os.open, fullname, sys.exc_info())
            else:
                try:
                    # Descend only if the opened object is still the
                    # directory that was stat'ed above.
                    if os.path.samestat(orig_st, os.fstat(dirfd)):
                        _rmtree_safe_fd(dirfd, fullname, onerror)
                        try:
                            os.rmdir(name, dir_fd=topfd)
                        except OSError:
                            onerror(os.rmdir, fullname, sys.exc_info())
                    else:
                        try:
                            # This can only happen if someone replaces
                            # a directory with a symlink after the call to
                            # stat.S_ISDIR above.
                            raise OSError("Cannot call rmtree on a symbolic "
                                          "link")
                        except OSError:
                            onerror(os.path.islink, fullname, sys.exc_info())
                finally:
                    os.close(dirfd)
        else:
            try:
                os.unlink(name, dir_fd=topfd)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())
439n/a
# True when the platform offers every fd-based primitive that
# _rmtree_safe_fd() relies on: dir_fd support for open/stat/unlink/rmdir,
# listdir() on a file descriptor, and stat() with follow_symlinks.
_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                     os.supports_dir_fd and
                     os.listdir in os.supports_fd and
                     os.stat in os.supports_follow_symlinks)
444n/a
def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Always invoked from inside an except block, so a bare raise
            # re-raises the exception currently being handled.
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            fd = os.open(path, os.O_RDONLY)
        except Exception:
            # Report the call that actually failed (os.open, not os.lstat).
            onerror(os.open, path, sys.exc_info())
            return
        try:
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    os.rmdir(path)
                except OSError:
                    onerror(os.rmdir, path, sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, path, sys.exc_info())
        finally:
            os.close(fd)
    else:
        return _rmtree_unsafe(path, onerror)
495n/a
# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions
499n/a
def _basename(path):
    """Return the last component of `path`, ignoring trailing separators."""
    # A basename() variant which first strips the trailing slash, if present.
    # Thus we always get the last component of the path, even for directories.
    sep = os.path.sep + (os.path.altsep or '')
    return os.path.basename(path.rstrip(sep))
505n/a
def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            # NOTE(review): this path returns None rather than the
            # destination; kept as-is for compatibility.
            return

        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        # rename() failed; fall back to copying and then removing src.
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst
561n/a
def _destinsrc(src, dst):
    """Return True if `dst` is `src` itself or lies inside it.

    The comparison is purely textual, on absolute paths; symlinks are
    not resolved.
    """
    src = os.path.abspath(src)
    dst = os.path.abspath(dst)
    # Terminate both with a separator so that "/a/b" is not taken for a
    # parent of "/a/bc".
    if not src.endswith(os.path.sep):
        src += os.path.sep
    if not dst.endswith(os.path.sep):
        dst += os.path.sep
    return dst.startswith(src)
570n/a
def _get_gid(name):
    """Return a gid, given a group name.

    Returns None if `name` is None, if group lookups are unavailable
    (getgrnam is None), or if no such group exists.
    """
    if getgrnam is None or name is None:
        return None
    try:
        result = getgrnam(name)
    except KeyError:
        return None
    if result is None:
        return None
    # Index 2 of the group entry is the numeric gid.
    return result[2]
582n/a
def _get_uid(name):
    """Return a uid, given a user name.

    Returns None if `name` is None, if user lookups are unavailable
    (getpwnam is None), or if no such user exists.
    """
    if getpwnam is None or name is None:
        return None
    try:
        result = getpwnam(name)
    except KeyError:
        return None
    if result is None:
        return None
    # Index 2 of the password entry is the numeric uid.
    return result[2]
594n/a
def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None.
    A ValueError is raised for any other value, or when the module needed
    for the requested compression is not available.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    With 'dry_run' set nothing is written, but the name is still returned.
    'verbose' is accepted and not used here.

    Returns the output filename.
    """
    if compress is None:
        tar_compression = ''
    elif _ZLIB_SUPPORTED and compress == 'gzip':
        tar_compression = 'gz'
    elif _BZ2_SUPPORTED and compress == 'bzip2':
        tar_compression = 'bz2'
    elif _LZMA_SUPPORTED and compress == 'xz':
        tar_compression = 'xz'
    else:
        raise ValueError("bad value for 'compress', or compression format not "
                         "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    compress_ext = '.' + tar_compression if compress else ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        # Override ownership only for the ids that could be resolved.
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
        try:
            tar.add(base_dir, filter=_set_uid_gid)
        finally:
            tar.close()

    return archive_name
659n/a
def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
    """Create a zip file from all the files under 'base_dir'.

    The output zip file will be named 'base_name' + ".zip".  Returns the
    name of the output zip file.

    Directories are stored as entries of their own, and only regular
    files are added.  With 'dry_run' set nothing is written.  'verbose'
    is accepted and not used here.
    """
    import zipfile  # late import for breaking circular dependency

    zip_filename = base_name + ".zip"
    archive_dir = os.path.dirname(base_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    if logger is not None:
        logger.info("creating '%s' and adding '%s' to it",
                    zip_filename, base_dir)

    if not dry_run:
        with zipfile.ZipFile(zip_filename, "w",
                             compression=zipfile.ZIP_DEFLATED) as zf:
            path = os.path.normpath(base_dir)
            # The current directory itself gets no entry.
            if path != os.curdir:
                zf.write(path, path)
                if logger is not None:
                    logger.info("adding '%s'", path)
            for dirpath, dirnames, filenames in os.walk(base_dir):
                for name in sorted(dirnames):
                    path = os.path.normpath(os.path.join(dirpath, name))
                    zf.write(path, path)
                    if logger is not None:
                        logger.info("adding '%s'", path)
                for name in filenames:
                    path = os.path.normpath(os.path.join(dirpath, name))
                    if os.path.isfile(path):
                        zf.write(path, path)
                        if logger is not None:
                            logger.info("adding '%s'", path)

    return zip_filename
703n/a
# Registry of archive formats:
#   name -> (archiver function, [(extra_arg_name, value), ...], description)
_ARCHIVE_FORMATS = {
    'tar':   (_make_tarball, [('compress', None)], "uncompressed tar file"),
}

# Formats that depend on an optional compression module are registered only
# when that module could be imported.
if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
                                 "gzip'ed tar-file")
    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file")

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
                                 "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
                                 "xz'ed tar-file")
720n/a
def get_archive_formats():
    """Returns a list of supported formats for archiving.

    Each element of the returned sequence is a tuple (name, description).
    The list is sorted.
    """
    return sorted((name, registry[2])
                  for name, registry in _ARCHIVE_FORMATS.items())
730n/a
def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    Raises TypeError if function is not callable, or if extra_args is not
    a list/tuple of two-element lists/tuples.  An existing registration
    under the same name is replaced.
    """
    if extra_args is None:
        extra_args = []
    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) != 2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)
751n/a
def unregister_archive_format(name):
    """Remove the archive format `name` from the registry.

    Raises KeyError if no format is registered under that name.
    """
    del _ARCHIVE_FORMATS[name]
754n/a
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.  A ValueError is
    raised for an unknown format.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.
    """
    # Validate the format before touching the working directory, so that an
    # unknown format cannot leave the process chdir'ed into root_dir.
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    save_cwd = os.getcwd()
    if root_dir is not None:
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        # Make base_name absolute first so it survives the chdir below.
        base_name = os.path.abspath(base_name)
        if not dry_run:
            os.chdir(root_dir)

    if base_dir is None:
        base_dir = os.curdir

    kwargs = {'dry_run': dry_run, 'logger': logger}

    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    # Every format except 'zip' also receives owner/group.
    if format != 'zip':
        kwargs['owner'] = owner
        kwargs['group'] = group

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        if root_dir is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename
808n/a
809n/a
def get_unpack_formats():
    """Returns a list of supported formats for unpacking.

    Each element of the returned sequence is a tuple
    (name, extensions, description).  The list is sorted.
    """
    return sorted((name, info[0], info[3])
                  for name, info in _UNPACK_FORMATS.items())
820n/a
def _check_unpack_options(extensions, function, extra_args):
    """Checks what gets registered as an unpacker.

    Raises RegistryError if one of `extensions` already belongs to a
    registered format, and TypeError if `function` is not callable.
    `extra_args` is accepted and not validated here.
    """
    # first make sure no other unpacker is registered for this extension
    existing_extensions = {}
    for name, info in _UNPACK_FORMATS.items():
        for ext in info[0]:
            existing_extensions[ext] = name

    for extension in extensions:
        if extension in existing_extensions:
            msg = '%s is already registered for "%s"'
            raise RegistryError(msg % (extension,
                                       existing_extensions[extension]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')
837n/a
838n/a
def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Registers an unpack format.

    `name` is the name of the format. `extensions` is a list of extensions
    corresponding to the format.

    `function` is the callable that will be
    used to unpack archives. The callable will receive archives to unpack.
    If it's unable to handle an archive, it needs to raise a ReadError
    exception.

    If provided, `extra_args` is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_unpack_formats() function.
    """
    if extra_args is None:
        extra_args = []
    # Raises before anything is stored if the registration is invalid.
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = extensions, function, extra_args, description
860n/a
def unregister_unpack_format(name):
    """Removes the unpack format from the registry.

    Raises KeyError if no format is registered under `name`.
    """
    del _UNPACK_FORMATS[name]
864n/a
def _ensure_directory(path):
    """Ensure that the parent directory of `path` exists.

    Missing intermediate directories are created.  A `path` without a
    directory component lives in the current directory, so there is
    nothing to create for it.
    """
    dirname = os.path.dirname(path)
    # An empty dirname must not reach os.makedirs(), which rejects ''.
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname)
870n/a
def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`

    Members whose name is absolute or contains ".." are skipped.  Raises
    ReadError if `filename` is not a zip file.
    """
    import zipfile  # late import for breaking circular dependency

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    with zipfile.ZipFile(filename) as archive:
        for info in archive.infolist():
            name = info.filename

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name:
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            if not target:
                continue

            _ensure_directory(target)
            # A name ending in '/' denotes a directory, which the call
            # above has already created; everything else is a file.
            if not name.endswith('/'):
                data = archive.read(info.filename)
                with open(target, 'wb') as f:
                    f.write(data)
904n/a
def _unpack_tarfile(filename, extract_dir):
    """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`

    Raises ReadError if `filename` cannot be opened as a tar archive.
    """
    import tarfile  # late import for breaking circular dependency
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise ReadError(
            "%s is not a compressed or uncompressed tar file" % filename)
    try:
        # NOTE(review): members are extracted without any path check here,
        # unlike _unpack_zipfile.  Do not use on untrusted archives without
        # validating member names first.
        tarobj.extractall(extract_dir)
    finally:
        tarobj.close()
918n/a
# Registry of unpack formats:
#   name -> ([extensions], unpack function, [(extra_arg_name, value), ...],
#            description)
_UNPACK_FORMATS = {
    'tar':   (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
    'zip':   (['.zip'], _unpack_zipfile, [], "ZIP file"),
}

# Compressed tar variants are registered only when the matching
# compression module could be imported.
if _ZLIB_SUPPORTED:
    _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [],
                                "gzip'ed tar-file")

if _BZ2_SUPPORTED:
    _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
                                "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [],
                                "xz'ed tar-file")
935n/a
def _find_unpack_format(filename):
    """Return the name of the first registered unpack format one of whose
    extensions `filename` ends with, or None when no format matches.
    """
    for format_name, entry in _UNPACK_FORMATS.items():
        extensions = entry[0]
        # any() stops at the first matching extension, like an early return.
        if any(filename.endswith(suffix) for suffix in extensions):
            return format_name
    return None
942n/a
def unpack_archive(filename, extract_dir=None, format=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
    or "xztar".  Or any other registered format.  If not provided,
    unpack_archive will use the filename extension and see if an unpacker
    was registered for that extension.

    Raises ValueError if `format` is given but is not a registered format,
    and ReadError if `format` is omitted and no registered unpacker matches
    the extension of `filename`.
    """
    if extract_dir is None:
        extract_dir = os.getcwd()

    if format is None:
        # we need to look at the registered unpackers supported extensions
        detected = _find_unpack_format(filename)
        if detected is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))
        format_info = _UNPACK_FORMATS[detected]
    else:
        try:
            format_info = _UNPACK_FORMATS[format]
        except KeyError:
            # The internal KeyError is an implementation detail; hide it.
            raise ValueError(
                "Unknown unpack format '{0}'".format(format)) from None

    # Registry entries are (extensions, function, extra_args, description);
    # extra_args is a sequence of (name, value) pairs passed as keywords.
    func = format_info[1]
    kwargs = dict(format_info[2])
    func(filename, extract_dir, **kwargs)
978n/a
979n/a
if hasattr(os, 'statvfs'):

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        st = os.statvfs(path)
        # 'free' is derived from f_bavail while 'used' is derived from
        # f_bfree, so total is not necessarily equal to used + free.
        free = st.f_bavail * st.f_frsize
        total = st.f_blocks * st.f_frsize
        used = (st.f_blocks - st.f_bfree) * st.f_frsize
        return _ntuple_diskusage(total, used, free)

elif os.name == 'nt':

    import nt
    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    # Same field documentation as the statvfs-based implementation.
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        total, free = nt._getdiskusage(path)
        used = total - free
        return _ntuple_diskusage(total, used, free)
1015n/a
1016n/a
def chown(path, user=None, group=None):
    """Change the owning user and/or group of `path`.

    `user` may be a uid or a user name; `group` may be a gid or a group
    name.  Names are translated to the corresponding numeric id before
    the ownership is changed.  An argument left as None keeps that part
    of the ownership unchanged.

    Raises ValueError when neither `user` nor `group` is given, and
    LookupError when a name cannot be resolved.
    """
    if user is None and group is None:
        raise ValueError("user and/or group must be set")

    # os.chown() interprets -1 as "leave this id alone".
    if user is None:
        uid = -1
    elif isinstance(user, str):
        # A string is a user name that has to be resolved to a uid.
        uid = _get_uid(user)
        if uid is None:
            raise LookupError("no such user: {!r}".format(user))
    else:
        # Anything else is handed to os.chown() as the uid unchanged.
        uid = user

    if group is None:
        gid = -1
    elif isinstance(group, int):
        gid = group
    else:
        # Every non-int group is treated as a name to be resolved.
        gid = _get_gid(group)
        if gid is None:
            raise LookupError("no such group: {!r}".format(group))

    os.chown(path, uid, gid)
1047n/a
def get_terminal_size(fallback=(80, 24)):
    """Get the size of the terminal window.

    For each of the two dimensions, the environment variable, COLUMNS
    and LINES respectively, is checked. If the variable is defined and
    the value is a positive integer, it is used.

    When COLUMNS or LINES is not defined, which is the common case,
    the terminal connected to sys.__stdout__ is queried
    by invoking os.get_terminal_size.

    If the terminal size cannot be successfully queried, either because
    the system doesn't support querying, or because we are not
    connected to a terminal, the value given in fallback parameter
    is used. Fallback defaults to (80, 24) which is the default
    size used by many terminal emulators.  The fallback is also used for
    any dimension the terminal reports as 0.

    The value returned is a named tuple of type os.terminal_size.
    """
    def _from_environ(variable):
        # A missing or non-integer value counts as "not set" (0).
        try:
            return int(os.environ[variable])
        except (KeyError, ValueError):
            return 0

    # columns, lines are the working values
    columns = _from_environ('COLUMNS')
    lines = _from_environ('LINES')

    # only query if necessary
    if columns <= 0 or lines <= 0:
        try:
            size = os.get_terminal_size(sys.__stdout__.fileno())
        except (AttributeError, ValueError, OSError):
            # stdout is None, closed, detached, or not a terminal, or
            # os.get_terminal_size() is unsupported
            size = os.terminal_size(fallback)
        # A successful query may still report 0 for a dimension; treat
        # that as unknown and use the fallback instead of returning 0.
        if columns <= 0:
            columns = size.columns or fallback[0]
        if lines <= 0:
            lines = size.lines or fallback[1]

    return os.terminal_size((columns, lines))
1092n/a
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Given a command, mode, and a PATH string, return the path which
    conforms to the given mode on the PATH, or None if there is no such
    file.

    `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
    of os.environ.get("PATH"), or can be overridden with a custom search
    path.

    """
    # Check that a given file can be accessed with the correct mode.
    # Additionally check that `file` is not a directory, as on Windows
    # directories pass the os.access check.
    def _access_check(fn, mode):
        return (os.path.exists(fn) and os.access(fn, mode)
                and not os.path.isdir(fn))

    # If we're given a path with a directory part, look it up directly rather
    # than referring to PATH directories. This includes checking relative to
    # the current directory, e.g. ./script
    if os.path.dirname(cmd):
        return cmd if _access_check(cmd, mode) else None

    if path is None:
        path = os.environ.get("PATH", os.defpath)
    if not path:
        return None
    path = path.split(os.pathsep)

    if sys.platform == "win32":
        # The current directory takes precedence on Windows.
        if os.curdir not in path:
            path.insert(0, os.curdir)

        # PATHEXT is necessary to check on Windows.
        pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
        # See if the given file matches any of the expected path extensions.
        # This will allow us to short circuit when given "python.exe".
        # If it does match, only test that one, otherwise we have to try
        # others.
        lowered = cmd.lower()
        if any(lowered.endswith(ext.lower()) for ext in pathext):
            files = [cmd]
        else:
            files = [cmd + ext for ext in pathext]
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        files = [cmd]

    # Each distinct directory (after case normalisation) is searched once.
    seen = set()
    for directory in path:
        normdir = os.path.normcase(directory)
        if normdir in seen:
            continue
        seen.add(normdir)
        for thefile in files:
            name = os.path.join(directory, thefile)
            if _access_check(name, mode):
                return name
    return None