» Core Development > Code coverage > Lib/tempfile.py

Python code coverage for Lib/tempfile.py

# | count | content
1n/a"""Temporary files.
2n/a
3n/aThis module provides generic, low- and high-level interfaces for
4n/acreating temporary files and directories. All of the interfaces
5n/aprovided by this module can be used without fear of race conditions
6n/aexcept for 'mktemp'. 'mktemp' is subject to race conditions and
7n/ashould not be used; it is provided for backward compatibility only.
8n/a
9n/aThe default path names are returned as str. If you supply bytes as
10n/ainput, all return values will be in bytes. Ex:
11n/a
12n/a >>> tempfile.mkstemp()
13n/a (4, '/tmp/tmptpu9nin8')
14n/a >>> tempfile.mkdtemp(suffix=b'')
15n/a b'/tmp/tmppbi8f0hy'
16n/a
17n/aThis module also provides some data items to the user:
18n/a
19n/a TMP_MAX - maximum number of names that will be tried before
20n/a giving up.
21n/a tempdir - If this is set to a string before the first use of
22n/a any routine from this module, it will be considered as
23n/a another candidate location to store temporary files.
24n/a"""
25n/a
# Names exported by "from tempfile import *".
__all__ = [
    # High-level safe interfaces.
    "NamedTemporaryFile",
    "TemporaryFile",
    "SpooledTemporaryFile",
    "TemporaryDirectory",
    # Low-level safe interfaces.
    "mkstemp",
    "mkdtemp",
    # Deprecated unsafe interface.
    "mktemp",
    # Constants and accessors.
    "TMP_MAX",
    "gettempprefix",
    "tempdir",
    "gettempdir",
    "gettempprefixb",
    "gettempdirb",
]
35n/a
36n/a
37n/a# Imports.
38n/a
39n/aimport functools as _functools
40n/aimport warnings as _warnings
41n/aimport io as _io
42n/aimport os as _os
43n/aimport shutil as _shutil
44n/aimport errno as _errno
45n/afrom random import Random as _Random
46n/aimport weakref as _weakref
47n/a
# Use the real low-level thread module when it exists, otherwise the
# single-threaded stand-in; only allocate_lock is needed from it.
try:
    import _thread
except ImportError:
    import _dummy_thread as _thread
_allocate_lock = _thread.allocate_lock

# Flags for os.open(): read/write, create, and fail if the name already
# exists.  O_NOFOLLOW and O_BINARY are OR-ed in only on platforms whose
# os module defines them (a missing flag contributes 0).
_text_openflags = (_os.O_RDWR | _os.O_CREAT | _os.O_EXCL |
                   getattr(_os, 'O_NOFOLLOW', 0))
_bin_openflags = _text_openflags | getattr(_os, 'O_BINARY', 0)

# Upper bound on the number of candidate names tried before giving up.
TMP_MAX = getattr(_os, 'TMP_MAX', 10000)

# This variable _was_ unused for legacy reasons, see issue 10354.
# As of 3.5 it is read at runtime, so changing it does have an effect,
# but that is not a supported API.  It is undocumented on purpose.
# Do not depend on this.
template = "tmp"
72n/a
73n/a# Internal routines.
74n/a
# Lock guarding the one-time initialisation done by gettempdir() and
# _get_candidate_names().
_once_lock = _allocate_lock()

# _stat(fn) only has to raise OSError when fn does not exist.  Prefer
# os.lstat, then os.stat; if the platform offers neither, fall back to
# opening the file read-only and closing it again.
_stat = getattr(_os, "lstat", None) or getattr(_os, "stat", None)
if _stat is None:
    def _stat(fn):
        fd = _os.open(fn, _os.O_RDONLY)
        _os.close(fd)
87n/a
def _exists(fn):
    """Return True if _stat(fn) succeeds, False if it raises OSError."""
    try:
        _stat(fn)
    except OSError:
        return False
    return True
95n/a
96n/a
def _infer_return_type(*args):
    """Return the path type (str or bytes) implied by the arguments.

    None arguments are ignored.  A bytes argument implies bytes, any
    other argument implies str.  Raises TypeError if the arguments imply
    both types.  Returns str when nothing implies a type.
    """
    inferred = None
    for value in args:
        if value is None:
            continue
        implied = bytes if isinstance(value, bytes) else str
        if inferred is not None and inferred is not implied:
            raise TypeError("Can't mix bytes and non-bytes in "
                            "path components.")
        inferred = implied
    # tempfile APIs return a str by default.
    return str if inferred is None else inferred
116n/a
117n/a
def _sanitize_params(prefix, suffix, dir):
    """Fill in defaults for prefix, suffix and dir.

    The result type is inferred from the non-None arguments with
    _infer_return_type().  A missing suffix becomes the empty value of
    that type, a missing prefix becomes 'template' and a missing dir
    becomes the default temporary directory, each as str or bytes to
    match.  Returns the tuple (prefix, suffix, dir, output_type).
    """
    output_type = _infer_return_type(prefix, suffix, dir)
    wants_str = output_type is str
    if suffix is None:
        suffix = output_type()
    if prefix is None:
        prefix = template if wants_str else _os.fsencode(template)
    if dir is None:
        dir = gettempdir() if wants_str else gettempdirb()
    return prefix, suffix, dir, output_type
134n/a
135n/a
class _RandomNameSequence:
    """An instance of _RandomNameSequence generates an endless
    sequence of unpredictable strings which can safely be incorporated
    into file names.  Each string is eight characters long.  Multiple
    threads can safely use the same instance at the same time.

    _RandomNameSequence is an iterator."""

    # Alphabet the generated names are drawn from.
    characters = "abcdefghijklmnopqrstuvwxyz0123456789_"

    @property
    def rng(self):
        """Return the Random instance belonging to the current process.

        A new generator is created on first use and again whenever
        os.getpid() differs from the pid recorded when the cached
        generator was made.
        """
        cur_pid = _os.getpid()
        if cur_pid != getattr(self, '_rng_pid', None):
            self._rng = _Random()
            self._rng_pid = cur_pid
        return self._rng

    def __iter__(self):
        return self

    def __next__(self):
        """Return a new string of eight characters from 'characters'."""
        c = self.characters
        choose = self.rng.choice
        letters = [choose(c) for dummy in range(8)]
        return ''.join(letters)
162n/a
def _candidate_tempdir_list():
    """Return the list of candidate temporary directories, in the
    order in which _get_default_tempdir will try them."""

    # First, the environment: keep only variables set to a non-empty value.
    candidates = [value
                  for value in map(_os.getenv, ('TMPDIR', 'TEMP', 'TMP'))
                  if value]

    # Then the OS-specific locations.
    if _os.name == 'nt':
        candidates += [r'c:\temp', r'c:\tmp', r'\temp', r'\tmp']
    else:
        candidates += ['/tmp', '/var/tmp', '/usr/tmp']

    # As a last resort, the current directory.
    try:
        last_resort = _os.getcwd()
    except (AttributeError, OSError):
        last_resort = _os.curdir
    candidates.append(last_resort)

    return candidates
187n/a
def _get_default_tempdir():
    """Work out the default directory for temporary files.
    This routine should be called exactly once.

    A candidate directory counts as usable if a file can be created in
    it and written to; the test file is removed again afterwards.  The
    test file gets a randomized name to prevent denial of service.

    Raises FileNotFoundError if no candidate is usable."""

    name_source = _RandomNameSequence()
    candidates = _candidate_tempdir_list()

    for candidate in candidates:
        if candidate != _os.curdir:
            candidate = _os.path.abspath(candidate)
        # Try only a few names per directory.
        for _attempt in range(100):
            probe = _os.path.join(candidate, next(name_source))
            try:
                fd = _os.open(probe, _bin_openflags, 0o600)
                try:
                    try:
                        # closefd=False: the descriptor is closed
                        # explicitly in the finally clause below.
                        with _io.open(fd, 'wb', closefd=False) as stream:
                            stream.write(b'blat')
                    finally:
                        _os.close(fd)
                finally:
                    _os.unlink(probe)
                return candidate
            except FileExistsError:
                # Name already taken; try the next one.
                continue
            except PermissionError:
                # This exception is thrown when a directory with the chosen
                # name already exists on windows.
                if (_os.name == 'nt' and _os.path.isdir(candidate) and
                        _os.access(candidate, _os.W_OK)):
                    continue
                break   # no point trying more names in this directory
            except OSError:
                break   # no point trying more names in this directory
    raise FileNotFoundError(_errno.ENOENT,
                            "No usable temporary directory found in %s" %
                            candidates)
232n/a
# Shared name generator, created on first use by _get_candidate_names().
_name_sequence = None

def _get_candidate_names():
    """Return the shared _RandomNameSequence, creating it on first call."""

    global _name_sequence
    if _name_sequence is None:
        with _once_lock:
            # Check again now that the lock is held: the sequence may
            # have been created while we were waiting for it.
            if _name_sequence is None:
                _name_sequence = _RandomNameSequence()
    return _name_sequence
247n/a
248n/a
def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to TMP_MAX candidate names of the form pre + name + suf in
    'dir', opening each with os.open(path, flags, 0o600).  Returns the
    pair (fd, absolute path) for the first name that could be opened.
    Raises FileExistsError if every attempt failed."""

    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _attempt in range(TMP_MAX):
        path = _os.path.join(dir, pre + next(candidates) + suf)
        try:
            fd = _os.open(path, flags, 0o600)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            retry = (_os.name == 'nt' and _os.path.isdir(dir) and
                     _os.access(dir, _os.W_OK))
            if not retry:
                raise
            continue
        return (fd, _os.path.abspath(path))

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")
275n/a
276n/a
277n/a# User visible interfaces.
278n/a
def gettempprefix():
    """Return the default name prefix, i.e. the module-level 'template'."""
    default_prefix = template
    return default_prefix
282n/a
def gettempprefixb():
    """Return gettempprefix() encoded to bytes with os.fsencode()."""
    text_prefix = gettempprefix()
    return _os.fsencode(text_prefix)
286n/a
# Default directory for temporary files; computed on the first call to
# gettempdir() unless it has been set beforehand.
tempdir = None

def gettempdir():
    """Accessor for tempfile.tempdir.

    If tempdir is still None it is first set to the result of
    _get_default_tempdir()."""
    global tempdir
    if tempdir is None:
        with _once_lock:
            # Check again now that the lock is held: the value may have
            # been set while we were waiting for it.
            if tempdir is None:
                tempdir = _get_default_tempdir()
    return tempdir
300n/a
def gettempdirb():
    """Return gettempdir() encoded to bytes with os.fsencode()."""
    text_dir = gettempdir()
    return _os.fsencode(text_dir)
304n/a
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """Create a unique temporary file and return the pair (fd, name).

    'fd' is the file descriptor returned by os.open and 'name' is the
    file name.

    'suffix', if not None, is what the file name ends with; by default
    there is no suffix.

    'prefix', if not None, is what the file name begins with; by
    default a default prefix is used.

    'dir', if not None, is the directory the file is created in; by
    default a default directory is used.

    'text', if specified and true, opens the file in text mode; by
    default the file is opened in binary mode.  On some operating
    systems this makes no difference.

    Those of 'suffix', 'prefix' and 'dir' that are not None must all be
    of the same type.  If they are bytes the returned name is bytes,
    otherwise it is str.

    The file is readable and writable only by the creating user ID.
    If the operating system uses permission bits to indicate whether a
    file is executable, the file is executable by no one.  The file
    descriptor is not inherited by children of this process.

    Caller is responsible for deleting the file when done with it.
    """

    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
    flags = _text_openflags if text else _bin_openflags
    return _mkstemp_inner(dir, prefix, suffix, flags, output_type)
343n/a
344n/a
def mkdtemp(suffix=None, prefix=None, dir=None):
    """Create a unique temporary directory and return its pathname.

    Arguments are as for mkstemp, except that the 'text' argument is
    not accepted.

    The directory is readable, writable, and searchable only by the
    creating user.

    Caller is responsible for deleting the directory when done with it.

    Raises FileExistsError if TMP_MAX candidate names were all unusable.
    """

    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _attempt in range(TMP_MAX):
        path = _os.path.join(dir, prefix + next(candidates) + suffix)
        try:
            _os.mkdir(path, 0o700)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            retry = (_os.name == 'nt' and _os.path.isdir(dir) and
                     _os.access(dir, _os.W_OK))
            if not retry:
                raise
            continue
        return path

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary directory name found")
383n/a
def mktemp(suffix="", prefix=template, dir=None):
    """Return a unique temporary file name without creating the file.

    Arguments are similar to mkstemp, except that the 'text' argument is
    not accepted, and suffix=None, prefix=None and bytes file names are not
    supported.

    THIS FUNCTION IS UNSAFE AND SHOULD NOT BE USED.  The file name may
    refer to a file that did not exist at some point, but by the time
    you get around to creating it, someone else may have beaten you to
    the punch.
    """

##    from warnings import warn as _warn
##    _warn("mktemp is a potential security risk to your program",
##          RuntimeWarning, stacklevel=2)

    if dir is None:
        dir = gettempdir()

    candidates = _get_candidate_names()
    for _attempt in range(TMP_MAX):
        path = _os.path.join(dir, prefix + next(candidates) + suffix)
        if _exists(path):
            continue
        return path

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary filename found")
414n/a
415n/a
class _TemporaryFileCloser:
    """Helper that closes a temporary file's underlying file object
    (and removes the file where needed), so that the temporary file
    itself does not need a __del__ method."""

    file = None  # Set here since __del__ checks it
    close_called = False

    def __init__(self, file, name, delete=True):
        self.file = file
        self.name = name
        self.delete = delete

    # NT provides delete-on-close as a primitive, so there the closer
    # only has to close the file.  It is still used so that file.name is
    # useful (i.e. not "(fdopen)") with NamedTemporaryFile.
    if _os.name == 'nt':
        def close(self):
            """Close the file; calls after the first do nothing."""
            if self.close_called:
                return
            self.close_called = True
            self.file.close()

    else:
        # os.unlink is bound as a default argument so that it is still
        # reachable at shutdown, when the module-level "os" may already
        # have been None'd out.
        def close(self, unlink=_os.unlink):
            """Close the file and, if 'delete' is set, unlink it.

            Calls after the first do nothing."""
            if self.close_called or self.file is None:
                return
            self.close_called = True
            try:
                self.file.close()
            finally:
                if self.delete:
                    unlink(self.name)

        # Need to ensure the file is deleted on __del__
        def __del__(self):
            self.close()
457n/a
458n/a
class _TemporaryFileWrapper:
    """Temporary file wrapper

    Wraps a file opened for temporary use and forwards attribute access
    to it.  In particular, it seeks to automatically remove the file
    when it is no longer needed.
    """

    def __init__(self, file, name, delete=True):
        self.file = file
        self.name = name
        self.delete = delete
        self._closer = _TemporaryFileCloser(file, name, delete)

    def __getattr__(self, name):
        # Attribute lookups are delegated to the underlying file
        # and cached for non-numeric results
        # (i.e. methods are cached, closed and friends are not)
        target = self.__dict__['file']
        attr = getattr(target, name)
        if hasattr(attr, '__call__'):
            bound = attr
            @_functools.wraps(bound)
            def func_wrapper(*args, **kwargs):
                return bound(*args, **kwargs)
            # Avoid closing the file as long as the wrapper is alive,
            # see issue #18879.
            func_wrapper._closer = self._closer
            attr = func_wrapper
        if not isinstance(attr, int):
            setattr(self, name, attr)
        return attr

    # The underlying __enter__ method returns the wrong object
    # (self.file) so override it to return the wrapper
    def __enter__(self):
        self.file.__enter__()
        return self

    # Need to trap __exit__ as well to ensure the file gets
    # deleted when used in a with statement
    def __exit__(self, exc, value, tb):
        outcome = self.file.__exit__(exc, value, tb)
        self.close()
        return outcome

    def close(self):
        """
        Close the temporary file, possibly deleting it.
        """
        self._closer.close()

    # iter() doesn't use __getattr__ to find the __iter__ method
    def __iter__(self):
        # Don't return iter(self.file), but yield from it to avoid closing
        # file as long as it's being used as iterator (see issue #23700).  We
        # can't use 'yield from' here because iter(file) returns the file
        # object itself, which has a close method, and thus the file would get
        # closed when the generator is finalized, due to PEP380 semantics.
        for line in self.file:
            yield line
520n/a
521n/a
def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
                       newline=None, suffix=None, prefix=None,
                       dir=None, delete=True):
    """Create and return a temporary file.
    Arguments:
    'prefix', 'suffix', 'dir' -- as for mkstemp.
    'mode' -- the mode argument to io.open (default "w+b").
    'buffering' -- the buffer size argument to io.open (default -1).
    'encoding' -- the encoding argument to io.open (default None)
    'newline' -- the newline argument to io.open (default None)
    'delete' -- whether the file is deleted on close (default True).
    The file is created as mkstemp() would do it.

    The returned object has a file-like interface, and the file's name
    is available as its 'name' attribute.  Unless 'delete' is False the
    file is deleted automatically when it is closed.
    """

    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    # Setting O_TEMPORARY in the flags causes the OS to delete
    # the file when it is closed.  This is only supported by Windows.
    flags = _bin_openflags
    if delete and _os.name == 'nt':
        flags |= _os.O_TEMPORARY

    fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
    try:
        file = _io.open(fd, mode, buffering=buffering,
                        newline=newline, encoding=encoding)
        return _TemporaryFileWrapper(file, name, delete)
    except BaseException:
        # Wrapping failed: remove the file and close the descriptor
        # before re-raising.
        _os.unlink(name)
        _os.close(fd)
        raise
559n/a
if _os.name == 'posix' and _os.sys.platform != 'cygwin':
    # Is the O_TMPFILE flag available and does it work?
    # The flag is set to False if os.open(dir, os.O_TMPFILE) raises an
    # IsADirectoryError exception
    _O_TMPFILE_WORKS = hasattr(_os, 'O_TMPFILE')

    def TemporaryFile(mode='w+b', buffering=-1, encoding=None,
                      newline=None, suffix=None, prefix=None,
                      dir=None):
        """Create and return a temporary file.
        Arguments:
        'prefix', 'suffix', 'dir' -- as for mkstemp.
        'mode' -- the mode argument to io.open (default "w+b").
        'buffering' -- the buffer size argument to io.open (default -1).
        'encoding' -- the encoding argument to io.open (default None)
        'newline' -- the newline argument to io.open (default None)
        The file is created as mkstemp() would do it.

        The returned object has a file-like interface.  The file has no
        name, and will cease to exist when it is closed.
        """
        global _O_TMPFILE_WORKS

        prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

        flags = _bin_openflags
        if _O_TMPFILE_WORKS:
            try:
                tmpfile_flags = (flags | _os.O_TMPFILE) & ~_os.O_CREAT
                fd = _os.open(dir, tmpfile_flags, 0o600)
            except IsADirectoryError:
                # Linux kernel older than 3.11 ignores the O_TMPFILE flag:
                # O_TMPFILE is read as O_DIRECTORY. Trying to open a directory
                # with O_RDWR|O_DIRECTORY fails with IsADirectoryError, a
                # directory cannot be open to write. Set flag to False to not
                # try again.
                _O_TMPFILE_WORKS = False
            except OSError:
                # The filesystem of the directory does not support O_TMPFILE.
                # For example, OSError(95, 'Operation not supported').
                #
                # On Linux kernel older than 3.11, trying to open a regular
                # file (or a symbolic link to a regular file) with O_TMPFILE
                # fails with NotADirectoryError, because O_TMPFILE is read as
                # O_DIRECTORY.
                pass
            else:
                try:
                    return _io.open(fd, mode, buffering=buffering,
                                    newline=newline, encoding=encoding)
                except BaseException:
                    _os.close(fd)
                    raise
            # Fallback to _mkstemp_inner().

        fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
        try:
            # Remove the name straight away; the open descriptor keeps
            # the file usable.
            _os.unlink(name)
            return _io.open(fd, mode, buffering=buffering,
                            newline=newline, encoding=encoding)
        except BaseException:
            _os.close(fd)
            raise

else:
    # On non-POSIX and Cygwin systems, assume that we cannot unlink a file
    # while it is open.
    TemporaryFile = NamedTemporaryFile
628n/a
class SpooledTemporaryFile:
    """Temporary file wrapper, specialized to switch from BytesIO
    or StringIO to a real file when it exceeds a certain size or
    when a fileno is needed.

    Data is kept in an in-memory buffer until a write leaves the
    position beyond 'max_size' (a max_size of 0 disables this check),
    until truncate() is asked for a size larger than 'max_size', or
    until fileno() or rollover() is called.  At that point the contents
    are copied into a file made by TemporaryFile() with the remaining
    constructor arguments.
    """
    _rolled = False

    def __init__(self, max_size=0, mode='w+b', buffering=-1,
                 encoding=None, newline=None,
                 suffix=None, prefix=None, dir=None):
        if 'b' in mode:
            self._file = _io.BytesIO()
        else:
            # Setting newline="\n" avoids newline translation;
            # this is important because otherwise on Windows we'd
            # get double newline translation upon rollover().
            self._file = _io.StringIO(newline="\n")
        self._max_size = max_size
        self._rolled = False
        # Kept until rollover(), which passes them to TemporaryFile().
        self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
                                   'suffix': suffix, 'prefix': prefix,
                                   'encoding': encoding, 'newline': newline,
                                   'dir': dir}

    def _check(self, file):
        """Roll over if the position of 'file' is beyond max_size."""
        if self._rolled: return
        max_size = self._max_size
        if max_size and file.tell() > max_size:
            self.rollover()

    def rollover(self):
        """Move the contents to a real temporary file.

        Does nothing if that has already happened.  The file position
        is preserved."""
        if self._rolled: return
        file = self._file
        newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
        del self._TemporaryFileArgs

        newfile.write(file.getvalue())
        newfile.seek(file.tell(), 0)

        self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # BytesIO/StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
    def __enter__(self):
        if self._file.closed:
            raise ValueError("Cannot enter context with closed file")
        return self

    def __exit__(self, exc, value, tb):
        self._file.close()

    # file protocol
    def __iter__(self):
        return self._file.__iter__()

    def close(self):
        self._file.close()

    @property
    def closed(self):
        return self._file.closed

    @property
    def encoding(self):
        try:
            return self._file.encoding
        except AttributeError:
            # The current file has no 'encoding'; for a text mode fall
            # back to the constructor argument.
            if 'b' in self._TemporaryFileArgs['mode']:
                raise
            return self._TemporaryFileArgs['encoding']

    def fileno(self):
        # A file descriptor needs a real file, so roll over first.
        self.rollover()
        return self._file.fileno()

    def flush(self):
        self._file.flush()

    def isatty(self):
        return self._file.isatty()

    @property
    def mode(self):
        try:
            return self._file.mode
        except AttributeError:
            return self._TemporaryFileArgs['mode']

    @property
    def name(self):
        try:
            return self._file.name
        except AttributeError:
            return None

    @property
    def newlines(self):
        try:
            return self._file.newlines
        except AttributeError:
            if 'b' in self._TemporaryFileArgs['mode']:
                raise
            return self._TemporaryFileArgs['newline']

    def read(self, *args):
        return self._file.read(*args)

    def readline(self, *args):
        return self._file.readline(*args)

    def readlines(self, *args):
        return self._file.readlines(*args)

    def seek(self, *args):
        """Seek in the underlying file and return what its seek() returns
        (for io objects, the new absolute position)."""
        return self._file.seek(*args)

    @property
    def softspace(self):
        return self._file.softspace

    def tell(self):
        return self._file.tell()

    def truncate(self, size=None):
        """Truncate the underlying file and return what its truncate()
        returns (for io objects, the new size).

        Without 'size' the file is truncated at the current position.
        A 'size' larger than max_size rolls the data over to a real
        file first."""
        if size is None:
            return self._file.truncate()
        if size > self._max_size:
            self.rollover()
        return self._file.truncate(size)

    def write(self, s):
        file = self._file
        rv = file.write(s)
        # The write may have moved the position past max_size.
        self._check(file)
        return rv

    def writelines(self, iterable):
        file = self._file
        rv = file.writelines(iterable)
        self._check(file)
        return rv
775n/a
776n/a
class TemporaryDirectory(object):
    """Create and return a temporary directory.  This has the same
    behavior as mkdtemp but can be used as a context manager.  For
    example:

        with TemporaryDirectory() as tmpdir:
            ...

    Upon exiting the context, the directory and everything contained
    in it are removed.
    """

    def __init__(self, suffix=None, prefix=None, dir=None):
        self.name = mkdtemp(suffix, prefix, dir)
        # If the object goes away without cleanup() having been called,
        # the finalizer removes the directory and emits a ResourceWarning.
        message = "Implicitly cleaning up {!r}".format(self)
        self._finalizer = _weakref.finalize(
            self, self._cleanup, self.name, warn_message=message)

    @classmethod
    def _cleanup(cls, name, warn_message):
        """Finalizer callback: remove the tree at 'name', then warn."""
        _shutil.rmtree(name)
        _warnings.warn(warn_message, ResourceWarning)

    def __repr__(self):
        class_name = self.__class__.__name__
        return "<{} {!r}>".format(class_name, self.name)

    def __enter__(self):
        return self.name

    def __exit__(self, exc, value, tb):
        self.cleanup()

    def cleanup(self):
        """Remove the directory now; later calls do nothing."""
        # detach() returns a false value once the finalizer has already
        # been detached or run, so the tree is removed at most once here.
        if not self._finalizer.detach():
            return
        _shutil.rmtree(self.name)
807n/a self.cleanup()
808n/a
809n/a def cleanup(self):
810n/a if self._finalizer.detach():
811n/a _shutil.rmtree(self.name)