» Core Development > Code coverage > Lib/test/test_tempfile.py

Python code coverage for Lib/test/test_tempfile.py

# | count | content
1n/a# tempfile.py unit tests.
2n/aimport tempfile
3n/aimport errno
4n/aimport io
5n/aimport os
6n/aimport signal
7n/aimport sys
8n/aimport re
9n/aimport warnings
10n/aimport contextlib
11n/aimport weakref
12n/afrom unittest import mock
13n/a
14n/aimport unittest
15n/afrom test import support
16n/afrom test.support import script_helper
17n/a
18n/a
# Feature flags consulted by the skip decorators further down.
has_stat = 1 if hasattr(os, 'stat') else 0
if has_stat:
    import stat

# Text mode is only testable when tempfile uses distinct text/binary flags.
has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
has_spawnl = hasattr(os, 'spawnl')

# TEST_FILES may need to be tweaked for systems depending on the maximum
# number of files that can be opened at one time (see ulimit -n)
TEST_FILES = 48 if sys.platform.startswith('openbsd') else 100
34n/a
35n/a# This is organized as one test for each chunk of code in tempfile.py,
36n/a# in order of their appearance in the file. Testing which requires
37n/a# threads is not done here.
38n/a
class TestLowLevelInternals(unittest.TestCase):
    """Checks for tempfile._infer_return_type()."""

    def _assert_infers(self, expected, *args):
        # The inferred type must be exactly `expected` (identity check).
        self.assertIs(expected, tempfile._infer_return_type(*args))

    def _assert_rejects(self, *args):
        # Mixing str and bytes arguments must raise TypeError.
        with self.assertRaises(TypeError):
            tempfile._infer_return_type(*args)

    def test_infer_return_type_singles(self):
        self._assert_infers(str, '')
        self._assert_infers(bytes, b'')
        self._assert_infers(str, None)

    def test_infer_return_type_multiples(self):
        self._assert_infers(str, '', '')
        self._assert_infers(bytes, b'', b'')
        self._assert_rejects('', b'')
        self._assert_rejects(b'', '')

    def test_infer_return_type_multiples_and_none(self):
        self._assert_infers(str, None, '')
        self._assert_infers(str, '', None)
        self._assert_infers(str, None, None)
        self._assert_infers(bytes, b'', None)
        self._assert_infers(bytes, None, b'')
        self._assert_rejects('', None, b'')
        self._assert_rejects(b'', None, '')
63n/a
64n/a
65n/a# Common functionality.
66n/a
class BaseTestCase(unittest.TestCase):
    """Shared fixture and file-name validation helper for the tests below."""

    # Pattern for the generated middle part of a name: exactly eight
    # characters from lowercase letters, digits, underscore and hyphen.
    str_check = re.compile(r"^[a-z0-9_-]{8}$")
    b_check = re.compile(br"^[a-z0-9_-]{8}$")

    def setUp(self):
        # Enter the warnings context by hand; tearDown() leaves it again.
        self._warnings_manager = support.check_warnings()
        self._warnings_manager.__enter__()
        warnings.filterwarnings("ignore", category=RuntimeWarning,
                                message="mktemp", module=__name__)

    def tearDown(self):
        self._warnings_manager.__exit__(None, None, None)

    def nameCheck(self, name, dir, pre, suf):
        """Assert that `name` is `dir`/`pre` + 8 generated chars + `suf`.

        Also asserts that the type of `name` (str or bytes) agrees with the
        type of every component that is not None.
        """
        ndir, nbase = os.path.split(name)
        npre = nbase[:len(pre)]
        stem_end = len(nbase) - len(suf)
        nsuf = nbase[stem_end:]

        for part in (dir, pre, suf):
            if part is not None:
                wanted = str if type(part) is str else bytes
                self.assertIs(type(name), wanted, "unexpected return type")
        if dir is None and pre is None and suf is None:
            self.assertIs(type(name), str, "default return type must be str")

        # check for equality of the absolute paths!
        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
                         "file %r not in directory %r" % (name, dir))
        self.assertEqual(npre, pre,
                         "file %r does not begin with %r" % (nbase, pre))
        self.assertEqual(nsuf, suf,
                         "file %r does not end with %r" % (nbase, suf))

        # What remains between prefix and suffix is the generated part.
        nbase = nbase[len(pre):stem_end]
        if isinstance(nbase, str):
            check = self.str_check
        else:
            check = self.b_check
        self.assertTrue(check.match(nbase),
                        "random characters %r do not match %r"
                        % (nbase, check.pattern))
112n/a
113n/a
class TestExports(BaseTestCase):
    """Guard the public namespace of the tempfile module."""

    def test_exports(self):
        # There are no surprising symbols in the tempfile module
        expected = {
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempprefixb",
            "gettempdir",
            "gettempdirb",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        }

        # Names starting with an underscore are private and not checked.
        unexp = [key for key in tempfile.__dict__
                 if key[0] != '_' and key not in expected]
        self.assertTrue(len(unexp) == 0,
                        "unexpected keys: %s" % unexp)
142n/a
143n/a
class TestRandomNameSequence(BaseTestCase):
    """Test the internal iterator object _RandomNameSequence."""

    def setUp(self):
        self.r = tempfile._RandomNameSequence()
        super().setUp()

    def test_get_six_char_str(self):
        # _RandomNameSequence returns a random name string.  The method
        # name is historical: nameCheck() validates against an
        # eight-character pattern.
        s = next(self.r)
        self.nameCheck(s, '', '', '')

    def test_many(self):
        # _RandomNameSequence returns no duplicate strings (stochastic)
        seen = set()
        r = self.r
        for _ in range(TEST_FILES):
            s = next(r)
            self.nameCheck(s, '', '', '')
            self.assertNotIn(s, seen)
            seen.add(s)

    def test_supports_iter(self):
        # _RandomNameSequence supports the iterator protocol.
        # This was previously named supports_iter, so the test loader never
        # collected it; it now runs and asserts the loop really iterated.
        count = 0
        for s in self.r:
            count += 1
            if count == 20:
                break
        self.assertEqual(count, 20)

    @unittest.skipUnless(hasattr(os, 'fork'),
        "os.fork is required for this test")
    def test_process_awareness(self):
        # ensure that the random source differs between
        # child and parent.
        read_fd, write_fd = os.pipe()
        pid = None
        try:
            pid = os.fork()
            if not pid:
                # Child: send one generated name to the parent and exit.
                os.close(read_fd)
                os.write(write_fd, next(self.r).encode("ascii"))
                os.close(write_fd)
                # bypass the normal exit handlers- leave those to
                # the parent.
                os._exit(0)
            parent_value = next(self.r)
            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
        finally:
            if pid:
                # best effort to ensure the process can't bleed out
                # via any bugs above
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    pass
                # Reap the child so it does not linger as a zombie.
                try:
                    os.waitpid(pid, 0)
                except OSError:
                    pass
            os.close(read_fd)
            os.close(write_fd)
        self.assertNotEqual(child_value, parent_value)
206n/a
207n/a
208n/a
class TestCandidateTempdirList(BaseTestCase):
    """Test the internal function _candidate_tempdir_list."""

    def test_nonempty_list(self):
        # _candidate_tempdir_list returns a nonempty list of strings
        candidates = tempfile._candidate_tempdir_list()

        self.assertFalse(len(candidates) == 0)
        for entry in candidates:
            self.assertIsInstance(entry, str)

    def test_wanted_dirs(self):
        # _candidate_tempdir_list contains the expected directories
        env_names = ('TMPDIR', 'TEMP', 'TMP')

        # Make sure the interesting environment variables are all set.
        with support.EnvironmentVarGuard() as env:
            for envname in env_names:
                if not os.getenv(envname):
                    env[envname] = os.path.abspath(envname)

            cand = tempfile._candidate_tempdir_list()

            for envname in env_names:
                dirname = os.getenv(envname)
                if not dirname:
                    raise ValueError
                self.assertIn(dirname, cand)

            # The current directory (or os.curdir as a fallback) must be
            # listed as well.
            try:
                dirname = os.getcwd()
            except (AttributeError, OSError):
                dirname = os.curdir

            self.assertIn(dirname, cand)

            # Not practical to try to verify the presence of OS-specific
            # paths in this list.
247n/a
248n/a
249n/a# We test _get_default_tempdir some more by testing gettempdir.
250n/a
class TestGetDefaultTempdir(BaseTestCase):
    """Test _get_default_tempdir()."""

    def test_no_files_left_behind(self):
        # use a private empty directory
        with tempfile.TemporaryDirectory() as our_temp_directory:

            def our_candidate_list():
                # force _get_default_tempdir() to consider our empty directory
                return [our_temp_directory]

            def raise_OSError(*args, **kwargs):
                raise OSError()

            def assert_directory_empty():
                self.assertEqual(os.listdir(our_temp_directory), [])

            with support.swap_attr(tempfile, "_candidate_tempdir_list",
                                   our_candidate_list):
                # verify our directory is empty after _get_default_tempdir()
                tempfile._get_default_tempdir()
                assert_directory_empty()

                with support.swap_attr(io, "open", raise_OSError):
                    # test again with failing io.open()
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    assert_directory_empty()

                # Capture the real io.open so the wrapper can still open
                # files while io.open itself is swapped out.
                real_open = io.open

                def bad_writer(*args, **kwargs):
                    stream = real_open(*args, **kwargs)
                    stream.write = raise_OSError
                    return stream

                with support.swap_attr(io, "open", bad_writer):
                    # test again with failing write()
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    assert_directory_empty()
287n/a
288n/a
class TestGetCandidateNames(BaseTestCase):
    """Test the internal function _get_candidate_names."""

    def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        self.assertIsInstance(tempfile._get_candidate_names(),
                              tempfile._RandomNameSequence)

    def test_same_thing(self):
        # _get_candidate_names always returns the same object
        first = tempfile._get_candidate_names()
        second = tempfile._get_candidate_names()

        self.assertTrue(first is second)
303n/a
304n/a
@contextlib.contextmanager
def _inside_empty_temp_dir():
    """Point tempfile.tempdir at a fresh directory for the with-block.

    The directory is created with mkdtemp() and removed with
    support.rmtree() when the block exits, whether or not it raised.
    """
    scratch = tempfile.mkdtemp()
    try:
        with support.swap_attr(tempfile, 'tempdir', scratch):
            yield
    finally:
        support.rmtree(scratch)
313n/a
314n/a
def _mock_candidate_names(*names):
    """Swap tempfile._get_candidate_names for one yielding `names` in order.

    Returns the object produced by support.swap_attr().  Each call to the
    replacement returns a new iterator over the same names.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
319n/a
320n/a
class TestBadTempdir:
    """Mixin of tests run against an unusable tempfile.tempdir.

    The class using this mixin must provide make_temp(), which attempts to
    create a temporary object in the current tempfile.tempdir.
    """

    def test_read_only_directory(self):
        with _inside_empty_temp_dir():
            target = tempfile.tempdir
            oldmode = os.stat(target).st_mode
            write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
            os.chmod(target, oldmode & ~write_bits)
            try:
                # If the directory is still writable the premise of the
                # test does not hold, so skip it.
                if os.access(tempfile.tempdir, os.W_OK):
                    self.skipTest("can't set the directory read-only")
                with self.assertRaises(PermissionError):
                    self.make_temp()
                self.assertEqual(os.listdir(tempfile.tempdir), [])
            finally:
                os.chmod(tempfile.tempdir, oldmode)

    def test_nonexisting_directory(self):
        with _inside_empty_temp_dir():
            missing = os.path.join(tempfile.tempdir, 'nonexistent')
            with support.swap_attr(tempfile, 'tempdir', missing):
                with self.assertRaises(FileNotFoundError):
                    self.make_temp()

    def test_non_directory(self):
        with _inside_empty_temp_dir():
            plain_file = os.path.join(tempfile.tempdir, 'file')
            # Create an empty regular file where a directory is expected.
            with open(plain_file, 'wb'):
                pass
            with support.swap_attr(tempfile, 'tempdir', plain_file):
                with self.assertRaises((NotADirectoryError, FileNotFoundError)):
                    self.make_temp()
351n/a
352n/a
class TestMkstempInner(TestBadTempdir, BaseTestCase):
    """Test the internal function _mkstemp_inner."""

    class mkstemped:
        """A file made by _mkstemp_inner; closed and unlinked in __del__."""

        # Kept as class attributes so __del__ reaches them through self
        # rather than through the os / tempfile module globals.
        _bflags = tempfile._bin_openflags
        _tflags = tempfile._text_openflags
        _close = os.close
        _unlink = os.unlink

        def __init__(self, dir, pre, suf, bin):
            flags = self._bflags if bin else self._tflags

            output_type = tempfile._infer_return_type(dir, pre, suf)
            created = tempfile._mkstemp_inner(dir, pre, suf, flags,
                                              output_type)
            (self.fd, self.name) = created

        def write(self, str):
            os.write(self.fd, str)

        def __del__(self):
            self._close(self.fd)
            self._unlink(self.name)

    def do_create(self, dir=None, pre=None, suf=None, bin=1):
        """Create a mkstemped file, validate its name, and return it.

        Name parts left as None are replaced by defaults of the type that
        tempfile._infer_return_type() reports for the given arguments.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            dir = (tempfile.gettempdir() if output_type is str
                   else tempfile.gettempdirb())
        pre = output_type() if pre is None else pre
        suf = output_type() if suf is None else suf
        handle = self.mkstemped(dir, pre, suf, bin)

        self.nameCheck(handle.name, dir, pre, suf)
        return handle

    def test_basic(self):
        # _mkstemp_inner can create files
        variants = (
            {},
            {"pre": "a"},
            {"suf": "b"},
            {"pre": "a", "suf": "b"},
            {"pre": "aa", "suf": ".txt"},
        )
        for name_parts in variants:
            self.do_create(**name_parts).write(b"blat")

    def test_basic_with_bytes_names(self):
        # _mkstemp_inner can create files when given name parts all
        # specified as bytes.
        dir_b = tempfile.gettempdirb()
        variants = (
            {"suf": b""},
            {"pre": b"a"},
            {"suf": b"b"},
            {"pre": b"a", "suf": b"b"},
            {"pre": b"aa", "suf": b".txt"},
        )
        for name_parts in variants:
            self.do_create(dir=dir_b, **name_parts).write(b"blat")
        # Can't mix str & binary types in the args.
        mixed = (
            {"dir": "", "suf": b""},
            {"dir": dir_b, "pre": ""},
            {"dir": dir_b, "pre": b"", "suf": ""},
        )
        for name_parts in mixed:
            with self.assertRaises(TypeError):
                self.do_create(**name_parts).write(b"blat")

    def test_basic_many(self):
        # _mkstemp_inner can create many files (stochastic)
        # All files stay referenced, and therefore open, until the end.
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]

    def test_choose_directory(self):
        # _mkstemp_inner can create files in a user-selected directory
        target = tempfile.mkdtemp()
        try:
            self.do_create(dir=target).write(b"blat")
        finally:
            os.rmdir(target)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_file_mode(self):
        # _mkstemp_inner creates files with the proper mode

        file = self.do_create()
        mode = stat.S_IMODE(os.stat(file.name).st_mode)
        expected = 0o600
        if sys.platform == 'win32':
            # There's no distinction among 'user', 'group' and 'world';
            # replicate the 'user' bits.
            user = expected >> 6
            expected = user * (1 + 8 + 64)
        self.assertEqual(mode, expected)

    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
    def test_noinherit(self):
        # _mkstemp_inner file handles are not inherited by child processes

        v = "v" if support.verbose else "q"

        file = self.do_create()
        self.assertEqual(os.get_inheritable(file.fd), False)
        fd = "%d" % file.fd

        try:
            me = __file__
        except NameError:
            me = sys.argv[0]

        # We have to exec something, so that FD_CLOEXEC will take
        # effect.  The core of this test is therefore in
        # tf_inherit_check.py, which see.
        here = os.path.dirname(os.path.abspath(me))
        tester = os.path.join(here, "tf_inherit_check.py")

        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
        # but an arg with embedded spaces should be decorated with double
        # quotes on each end
        decorated = sys.executable
        if sys.platform == 'win32':
            decorated = '"%s"' % sys.executable
            tester = '"%s"' % tester

        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
        self.assertFalse(retval < 0,
                         "child process caught fatal signal %d" % -retval)
        self.assertFalse(retval > 0, "child process reports failure %d"%retval)

    @unittest.skipUnless(has_textmode, "text mode not available")
    def test_textmode(self):
        # _mkstemp_inner can create files in text mode

        # A text file is truncated at the first Ctrl+Z byte
        f = self.do_create(bin=0)
        for chunk in (b"blat\x1a", b"extra\n"):
            f.write(chunk)
        os.lseek(f.fd, 0, os.SEEK_SET)
        self.assertEqual(os.read(f.fd, 20), b"blat")

    def make_temp(self):
        # Hook used by the TestBadTempdir tests.
        directory = tempfile.gettempdir()
        prefix = tempfile.gettempprefix()
        return tempfile._mkstemp_inner(directory, prefix, '',
                                       tempfile._bin_openflags, str)

    def test_collision_with_existing_file(self):
        # _mkstemp_inner tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            first_fd, first_name = self.make_temp()
            os.close(first_fd)
            self.assertTrue(first_name.endswith('aaa'))

            second_fd, second_name = self.make_temp()
            os.close(second_fd)
            self.assertTrue(second_name.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # _mkstemp_inner tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            existing = tempfile.mkdtemp()
            self.assertTrue(existing.endswith('aaa'))

            fd, name = self.make_temp()
            os.close(fd)
            self.assertTrue(name.endswith('bbb'))
525n/a
526n/a
class TestGetTempPrefix(BaseTestCase):
    """Test gettempprefix()."""

    def test_sane_template(self):
        # gettempprefix returns a nonempty prefix string
        checks = (
            (tempfile.gettempprefix, str),
            (tempfile.gettempprefixb, bytes),
        )
        for getter, wanted_type in checks:
            prefix = getter()
            self.assertIsInstance(prefix, wanted_type)
            self.assertGreater(len(prefix), 0)

    def test_usable_template(self):
        # gettempprefix returns a usable prefix string

        # Create a temp directory, avoiding use of the prefix.
        # Then attempt to create a file whose name is
        # prefix + 'xxxxxx.xxx' in that directory.
        basename = tempfile.gettempprefix() + "xxxxxx.xxx"
        d = tempfile.mkdtemp(prefix="")
        try:
            path = os.path.join(d, basename)
            os.close(os.open(path, os.O_RDWR | os.O_CREAT))
            os.unlink(path)
        finally:
            os.rmdir(d)
557n/a
558n/a
class TestGetTempDir(BaseTestCase):
    """Test gettempdir()."""

    def test_directory_exists(self):
        # gettempdir returns a directory which exists

        for getter in (tempfile.gettempdir, tempfile.gettempdirb):
            d = getter()
            self.assertTrue(os.path.isabs(d) or d == os.curdir,
                            "%r is not an absolute path" % d)
            self.assertTrue(os.path.isdir(d),
                            "%r is not a directory" % d)

    def test_directory_writable(self):
        # gettempdir returns a directory writable by the user

        # sneaky: just instantiate a NamedTemporaryFile, which
        # defaults to writing into the directory returned by
        # gettempdir.
        probe = tempfile.NamedTemporaryFile()
        probe.write(b"blat")
        probe.close()

    def test_same_thing(self):
        # gettempdir always returns the same object
        first = tempfile.gettempdir()
        second = tempfile.gettempdir()
        as_bytes = tempfile.gettempdirb()

        self.assertTrue(first is second)
        self.assertNotEqual(type(first), type(as_bytes))
        self.assertEqual(first, os.fsdecode(as_bytes))

    def test_case_sensitive(self):
        # gettempdir should not flatten its case
        # even on a case-insensitive file system
        case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
        saved_tempdir = tempfile.tempdir
        # Clear the module-level tempdir before asking gettempdir() again.
        tempfile.tempdir = None
        try:
            with support.EnvironmentVarGuard() as env:
                # Fake the first env var which is checked as a candidate
                env["TMPDIR"] = case_sensitive_tempdir
                self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
        finally:
            tempfile.tempdir = saved_tempdir
            support.rmdir(case_sensitive_tempdir)
604n/a
605n/a
class TestMkstemp(BaseTestCase):
    """Test mkstemp()."""

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a file with mkstemp(), validate its path, and remove it.

        Name parts left as None are replaced by defaults of the type that
        tempfile._infer_return_type() reports for the given arguments.
        Raises TypeError (from _infer_return_type) when str and bytes
        arguments are mixed.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)

        # Every check runs inside the try block so that a failing
        # assertion can no longer leak the descriptor or leave the file
        # behind.
        try:
            (ndir, nbase) = os.path.split(name)
            adir = os.path.abspath(dir)
            self.assertEqual(adir, ndir,
                "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
            self.nameCheck(name, dir, pre, suf)
        finally:
            os.close(fd)
            os.unlink(name)

    def test_basic(self):
        # mkstemp can create files
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")
        self.do_create(dir=".")

    def test_basic_with_bytes_names(self):
        # mkstemp can create files when given name parts all
        # specified as bytes.
        d = tempfile.gettempdirb()
        self.do_create(dir=d, suf=b"")
        self.do_create(dir=d, pre=b"a")
        self.do_create(dir=d, suf=b"b")
        self.do_create(dir=d, pre=b"a", suf=b"b")
        self.do_create(dir=d, pre=b"aa", suf=b".txt")
        self.do_create(dir=b".")
        # Mixing str and bytes name parts is rejected.
        with self.assertRaises(TypeError):
            self.do_create(dir=".", pre=b"aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre="aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre=b"aa", suf=".txt")

    def test_choose_directory(self):
        # mkstemp can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir)
        finally:
            os.rmdir(dir)
666n/a
667n/a
class TestMkdtemp(TestBadTempdir, BaseTestCase):
    """Test mkdtemp()."""

    def make_temp(self):
        # Hook used by the TestBadTempdir tests.
        return tempfile.mkdtemp()

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a directory with mkdtemp(), validate and return its name.

        The caller removes the directory.  If name validation fails, the
        directory is removed here before the error propagates.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            dir = (tempfile.gettempdir() if output_type is str
                   else tempfile.gettempdirb())
        pre = output_type() if pre is None else pre
        suf = output_type() if suf is None else suf
        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)

        try:
            self.nameCheck(name, dir, pre, suf)
        except BaseException:
            os.rmdir(name)
            raise
        return name

    def test_basic(self):
        # mkdtemp can create directories
        variants = (
            {},
            {"pre": "a"},
            {"suf": "b"},
            {"pre": "a", "suf": "b"},
            {"pre": "aa", "suf": ".txt"},
        )
        for name_parts in variants:
            os.rmdir(self.do_create(**name_parts))

    def test_basic_with_bytes_names(self):
        # mkdtemp can create directories when given all binary parts
        d = tempfile.gettempdirb()
        variants = (
            {},
            {"pre": b"a"},
            {"suf": b"b"},
            {"pre": b"a", "suf": b"b"},
            {"pre": b"aa", "suf": b".txt"},
        )
        for name_parts in variants:
            os.rmdir(self.do_create(dir=d, **name_parts))
        # Mixing str and bytes name parts is rejected.
        mixed = (
            {"dir": d, "pre": "aa", "suf": b".txt"},
            {"dir": d, "pre": b"aa", "suf": ".txt"},
            {"dir": "", "pre": b"aa", "suf": b".txt"},
        )
        for name_parts in mixed:
            with self.assertRaises(TypeError):
                os.rmdir(self.do_create(**name_parts))

    def test_basic_many(self):
        # mkdtemp can create many directories (stochastic)
        created = []
        try:
            for _ in range(TEST_FILES):
                created.append(self.do_create(pre="aa"))
        finally:
            # Remove whatever was created, even after a failure part-way.
            for path in created:
                os.rmdir(path)

    def test_choose_directory(self):
        # mkdtemp can create directories in a user-selected directory
        parent = tempfile.mkdtemp()
        try:
            os.rmdir(self.do_create(dir=parent))
        finally:
            os.rmdir(parent)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_mode(self):
        # mkdtemp creates directories with the proper mode

        path = self.do_create()
        try:
            mode = stat.S_IMODE(os.stat(path).st_mode)
            mode &= 0o777 # Mask off sticky bits inherited from /tmp
            expected = 0o700
            if sys.platform == 'win32':
                # There's no distinction among 'user', 'group' and 'world';
                # replicate the 'user' bits.
                user = expected >> 6
                expected = user * (1 + 8 + 64)
            self.assertEqual(mode, expected)
        finally:
            os.rmdir(path)

    def test_collision_with_existing_file(self):
        # mkdtemp tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            existing = tempfile.NamedTemporaryFile(delete=False)
            existing.close()
            self.assertTrue(existing.name.endswith('aaa'))
            made = tempfile.mkdtemp()
            self.assertTrue(made.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # mkdtemp tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            first = tempfile.mkdtemp()
            self.assertTrue(first.endswith('aaa'))
            second = tempfile.mkdtemp()
            self.assertTrue(second.endswith('bbb'))
774n/a
775n/a
class TestMktemp(BaseTestCase):
    """Test mktemp()."""

    # For safety, all use of mktemp must occur in a private directory.
    # We must also suppress the RuntimeWarning it generates.
    def setUp(self):
        self.dir = tempfile.mkdtemp()
        super().setUp()

    def tearDown(self):
        if self.dir:
            os.rmdir(self.dir)
            self.dir = None
        super().tearDown()

    class mktemped:
        """A file created at a mktemp() name; unlinked in __del__."""

        # Kept as class attributes so they are reached through self
        # rather than through the os / tempfile module globals.
        _unlink = os.unlink
        _bflags = tempfile._bin_openflags

        def __init__(self, dir, pre, suf):
            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
            # Create the file.  This will raise an exception if it's
            # mysteriously appeared in the meanwhile.
            fd = os.open(self.name, self._bflags, 0o600)
            os.close(fd)

        def __del__(self):
            self._unlink(self.name)

    def do_create(self, pre="", suf=""):
        """Create a mktemped file in self.dir, validate its name, return it."""
        made = self.mktemped(self.dir, pre, suf)

        self.nameCheck(made.name, self.dir, pre, suf)
        return made

    def test_basic(self):
        # mktemp can choose usable file names
        variants = (
            {},
            {"pre": "a"},
            {"suf": "b"},
            {"pre": "a", "suf": "b"},
            {"pre": "aa", "suf": ".txt"},
        )
        for name_parts in variants:
            self.do_create(**name_parts)

    def test_many(self):
        # mktemp can choose many usable file names (stochastic)
        # All files stay referenced until the end of the test.
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]
823n/a
824n/a## def test_warning(self):
825n/a## # mktemp issues a warning when used
826n/a## warnings.filterwarnings("error",
827n/a## category=RuntimeWarning,
828n/a## message="mktemp")
829n/a## self.assertRaises(RuntimeWarning,
830n/a## tempfile.mktemp, dir=self.dir)
831n/a
832n/a
833n/a# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
834n/a
835n/a
836n/aclass TestNamedTemporaryFile(BaseTestCase):
837n/a """Test NamedTemporaryFile()."""
838n/a
839n/a def do_create(self, dir=None, pre="", suf="", delete=True):
840n/a if dir is None:
841n/a dir = tempfile.gettempdir()
842n/a file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
843n/a delete=delete)
844n/a
845n/a self.nameCheck(file.name, dir, pre, suf)
846n/a return file
847n/a
848n/a
849n/a def test_basic(self):
850n/a # NamedTemporaryFile can create files
851n/a self.do_create()
852n/a self.do_create(pre="a")
853n/a self.do_create(suf="b")
854n/a self.do_create(pre="a", suf="b")
855n/a self.do_create(pre="aa", suf=".txt")
856n/a
857n/a def test_method_lookup(self):
858n/a # Issue #18879: Looking up a temporary file method should keep it
859n/a # alive long enough.
860n/a f = self.do_create()
861n/a wr = weakref.ref(f)
862n/a write = f.write
863n/a write2 = f.write
864n/a del f
865n/a write(b'foo')
866n/a del write
867n/a write2(b'bar')
868n/a del write2
869n/a if support.check_impl_detail(cpython=True):
870n/a # No reference cycle was created.
871n/a self.assertIsNone(wr())
872n/a
873n/a def test_iter(self):
874n/a # Issue #23700: getting iterator from a temporary file should keep
875n/a # it alive as long as it's being iterated over
876n/a lines = [b'spam\n', b'eggs\n', b'beans\n']
877n/a def make_file():
878n/a f = tempfile.NamedTemporaryFile(mode='w+b')
879n/a f.write(b''.join(lines))
880n/a f.seek(0)
881n/a return f
882n/a for i, l in enumerate(make_file()):
883n/a self.assertEqual(l, lines[i])
884n/a self.assertEqual(i, len(lines) - 1)
885n/a
886n/a def test_creates_named(self):
887n/a # NamedTemporaryFile creates files with names
888n/a f = tempfile.NamedTemporaryFile()
889n/a self.assertTrue(os.path.exists(f.name),
890n/a "NamedTemporaryFile %s does not exist" % f.name)
891n/a
892n/a def test_del_on_close(self):
893n/a # A NamedTemporaryFile is deleted when closed
894n/a dir = tempfile.mkdtemp()
895n/a try:
896n/a f = tempfile.NamedTemporaryFile(dir=dir)
897n/a f.write(b'blat')
898n/a f.close()
899n/a self.assertFalse(os.path.exists(f.name),
900n/a "NamedTemporaryFile %s exists after close" % f.name)
901n/a finally:
902n/a os.rmdir(dir)
903n/a
904n/a def test_dis_del_on_close(self):
905n/a # Tests that delete-on-close can be disabled
906n/a dir = tempfile.mkdtemp()
907n/a tmp = None
908n/a try:
909n/a f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
910n/a tmp = f.name
911n/a f.write(b'blat')
912n/a f.close()
913n/a self.assertTrue(os.path.exists(f.name),
914n/a "NamedTemporaryFile %s missing after close" % f.name)
915n/a finally:
916n/a if tmp is not None:
917n/a os.unlink(tmp)
918n/a os.rmdir(dir)
919n/a
920n/a def test_multiple_close(self):
921n/a # A NamedTemporaryFile can be closed many times without error
922n/a f = tempfile.NamedTemporaryFile()
923n/a f.write(b'abc\n')
924n/a f.close()
925n/a f.close()
926n/a f.close()
927n/a
928n/a def test_context_manager(self):
929n/a # A NamedTemporaryFile can be used as a context manager
930n/a with tempfile.NamedTemporaryFile() as f:
931n/a self.assertTrue(os.path.exists(f.name))
932n/a self.assertFalse(os.path.exists(f.name))
933n/a def use_closed():
934n/a with f:
935n/a pass
936n/a self.assertRaises(ValueError, use_closed)
937n/a
938n/a def test_no_leak_fd(self):
939n/a # Issue #21058: don't leak file descriptor when io.open() fails
940n/a closed = []
941n/a os_close = os.close
942n/a def close(fd):
943n/a closed.append(fd)
944n/a os_close(fd)
945n/a
946n/a with mock.patch('os.close', side_effect=close):
947n/a with mock.patch('io.open', side_effect=ValueError):
948n/a self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
949n/a self.assertEqual(len(closed), 1)
950n/a
def test_bad_mode(self):
    # Invalid modes must raise and leave nothing behind in the directory
    scratch = tempfile.mkdtemp()
    self.addCleanup(support.rmtree, scratch)
    for bad_mode, expected in (('wr', ValueError), (2, TypeError)):
        with self.assertRaises(expected):
            tempfile.NamedTemporaryFile(mode=bad_mode, dir=scratch)
    self.assertEqual(os.listdir(scratch), [])
959n/a
960n/a # How to test the mode and bufsize parameters?
961n/a
class TestSpooledTemporaryFile(BaseTestCase):
    """Test SpooledTemporaryFile()."""

    def do_create(self, max_size=0, dir=None, pre="", suf=""):
        # Create a SpooledTemporaryFile (in the default temp directory when
        # no dir is given) and register it to be closed when the test ends,
        # so a rolled-over file is never left open.
        if dir is None:
            dir = tempfile.gettempdir()
        file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir,
                                             prefix=pre, suffix=suf)
        self.addCleanup(file.close)
        return file

    def test_basic(self):
        # SpooledTemporaryFile can create files
        f = self.do_create()
        self.assertFalse(f._rolled)
        f = self.do_create(max_size=100, pre="a", suf=".txt")
        self.assertFalse(f._rolled)

    def test_del_on_close(self):
        # A SpooledTemporaryFile is deleted when closed
        dir = tempfile.mkdtemp()
        try:
            f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
            try:
                self.assertFalse(f._rolled)
                f.write(b'blat ' * 5)
                self.assertTrue(f._rolled)
                filename = f.name
            finally:
                # Always close, so a failed assertion above is not masked
                # by rmdir() failing on a directory that is still in use.
                f.close()
            self.assertFalse(isinstance(filename, str) and os.path.exists(filename),
                             "SpooledTemporaryFile %s exists after close" % filename)
        finally:
            os.rmdir(dir)

    def test_rewrite_small(self):
        # A SpooledTemporaryFile can be written to multiple times within
        # the max_size without rolling over
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        for i in range(5):
            f.seek(0, 0)
            f.write(b'x' * 20)
        self.assertFalse(f._rolled)

    def test_write_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        f.write(b'x' * 20)
        self.assertFalse(f._rolled)
        f.write(b'x' * 10)
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_writelines(self):
        # Verify writelines with a SpooledTemporaryFile
        f = self.do_create()
        f.writelines((b'x', b'y', b'z'))
        f.seek(0)
        buf = f.read()
        self.assertEqual(buf, b'xyz')

    def test_writelines_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        f = self.do_create(max_size=35)
        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_sparse(self):
        # A SpooledTemporaryFile that is written late in the file will extend
        # when that occurs
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        f.seek(100, 0)
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_fileno(self):
        # A SpooledTemporaryFile should roll over to a real file on fileno()
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        # assertGreater reports the offending descriptor on failure
        self.assertGreater(f.fileno(), 0)
        self.assertTrue(f._rolled)

    def test_multiple_close_before_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        f = tempfile.SpooledTemporaryFile()
        f.write(b'abc\n')
        self.assertFalse(f._rolled)
        f.close()
        f.close()
        f.close()

    def test_multiple_close_after_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        f = tempfile.SpooledTemporaryFile(max_size=1)
        f.write(b'abc\n')
        self.assertTrue(f._rolled)
        f.close()
        f.close()
        f.close()

    def test_bound_methods(self):
        # It should be OK to steal a bound method from a SpooledTemporaryFile
        # and use it independently; when the file rolls over, those bound
        # methods should continue to function
        f = self.do_create(max_size=30)
        read = f.read
        write = f.write
        seek = f.seek

        write(b"a" * 35)
        write(b"b" * 35)
        seek(0, 0)
        self.assertEqual(read(70), b'a'*35 + b'b'*35)

    def test_properties(self):
        # Check mode, name, newlines and encoding of a binary file before
        # and after rollover
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'x' * 10)
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+b')
        self.assertIsNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding

        # One more byte exceeds max_size and forces the rollover
        f.write(b'x')
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'rb+')
        self.assertIsNotNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding

    def test_text_mode(self):
        # Creating a SpooledTemporaryFile with a text mode should produce
        # a file object reading and writing (Unicode) text strings.
        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10)
        self.addCleanup(f.close)
        f.write("abc\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\n")
        f.write("def\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\n")
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNone(f.name)
        self.assertIsNone(f.newlines)
        self.assertIsNone(f.encoding)

        f.write("xyzzy\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
        # Check that Ctrl+Z doesn't truncate the file
        f.write("foo\x1abar\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNotNone(f.name)
        self.assertEqual(f.newlines, os.linesep)
        self.assertIsNotNone(f.encoding)

    def test_text_newline_and_encoding(self):
        # Explicit newline and encoding arguments must survive the rollover
        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                          newline='', encoding='utf-8')
        self.addCleanup(f.close)
        f.write("\u039B\r\n")
        f.seek(0)
        self.assertEqual(f.read(), "\u039B\r\n")
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNone(f.name)
        self.assertIsNone(f.newlines)
        self.assertIsNone(f.encoding)

        f.write("\u039B" * 20 + "\r\n")
        f.seek(0)
        self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n")
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNotNone(f.name)
        self.assertIsNotNone(f.newlines)
        self.assertEqual(f.encoding, 'utf-8')

    def test_context_manager_before_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as f:
            self.assertFalse(f._rolled)
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_context_manager_during_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as f:
            self.assertFalse(f._rolled)
            f.write(b'abc\n')
            f.flush()
            self.assertTrue(f._rolled)
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_context_manager_after_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        f = tempfile.SpooledTemporaryFile(max_size=1)
        # Covers the case where an assertion fails before the with block
        self.addCleanup(f.close)
        f.write(b'abc\n')
        f.flush()
        self.assertTrue(f._rolled)
        with f:
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_truncate_with_size_parameter(self):
        # A SpooledTemporaryFile can be truncated to zero size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.seek(0)
        f.truncate()
        self.assertFalse(f._rolled)
        self.assertEqual(f._file.getvalue(), b'')
        # A SpooledTemporaryFile can be truncated to a specific size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.truncate(4)
        self.assertFalse(f._rolled)
        self.assertEqual(f._file.getvalue(), b'abcd')
        # A SpooledTemporaryFile rolls over if truncated to large size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.truncate(20)
        self.assertTrue(f._rolled)
        if has_stat:
            self.assertEqual(os.fstat(f.fileno()).st_size, 20)
1212n/a
1213n/a
# Only defined where TemporaryFile is a distinct object from
# NamedTemporaryFile.
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:

    class TestTemporaryFile(BaseTestCase):
        """Test TemporaryFile()."""

        def test_basic(self):
            # TemporaryFile can create files
            # No point in testing the name params - the file has no name.
            # Close explicitly instead of leaving the file to the
            # garbage collector.
            tempfile.TemporaryFile().close()

        def test_has_no_name(self):
            # TemporaryFile creates files with no names (on this system)
            dir = tempfile.mkdtemp()
            # The with block closes the file on the success path too.
            with tempfile.TemporaryFile(dir=dir) as f:
                f.write(b'blat')

                # Sneaky: because this file has no name, it should not prevent
                # us from removing the directory it was created in.
                try:
                    os.rmdir(dir)
                except BaseException:
                    # cleanup: close the file, retry the removal, then
                    # re-raise the original failure
                    f.close()
                    os.rmdir(dir)
                    raise

        def test_multiple_close(self):
            # A TemporaryFile can be closed many times without error
            f = tempfile.TemporaryFile()
            f.write(b'abc\n')
            f.close()
            f.close()
            f.close()

        # How to test the mode and bufsize parameters?
        def test_mode_and_encoding(self):
            # Data written through a TemporaryFile must read back unchanged
            # for binary mode, text mode, an explicit encoding and an
            # explicit newline setting.

            def roundtrip(input, *args, **kwargs):
                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
                    fileobj.write(input)
                    fileobj.seek(0)
                    self.assertEqual(input, fileobj.read())

            roundtrip(b"1234", "w+b")
            roundtrip("abdc\n", "w+")
            roundtrip("\u039B", "w+", encoding="utf-16")
            roundtrip("foo\r\n", "w+", newline="")

        def test_no_leak_fd(self):
            # Issue #21058: don't leak file descriptor when io.open() fails
            closed = []
            os_close = os.close
            def close(fd):
                # Record the descriptor, then really close it
                closed.append(fd)
                os_close(fd)

            with mock.patch('os.close', side_effect=close):
                with mock.patch('io.open', side_effect=ValueError):
                    self.assertRaises(ValueError, tempfile.TemporaryFile)
                    self.assertEqual(len(closed), 1)
1274n/a
1275n/a
1276n/a
1277n/a# Helper for test_del_on_shutdown
class NulledModules:
    """Context manager that temporarily sets every module global to None.

    On entry each name in the given modules' namespaces is rebound to
    None; on exit each namespace is restored to the snapshot taken when
    this object was constructed.
    """

    def __init__(self, *modules):
        # Live namespace dicts, plus a shallow copy of each to restore from
        self.refs = [vars(module) for module in modules]
        self.contents = [dict(namespace) for namespace in self.refs]

    def __enter__(self):
        for namespace in self.refs:
            # Keeps every key, replaces every value with None
            namespace.update(dict.fromkeys(namespace))

    def __exit__(self, *exc_info):
        for namespace, snapshot in zip(self.refs, self.contents):
            # Clearing first also drops names added inside the with block
            namespace.clear()
            namespace.update(snapshot)
1292n/a
class TestTemporaryDirectory(BaseTestCase):
    """Test TemporaryDirectory()."""

    def do_create(self, dir=None, pre="", suf="", recurse=1):
        # Build a TemporaryDirectory holding a test.txt file and, while
        # recurse is non-zero, a nested TemporaryDirectory built the same way.
        parent = tempfile.gettempdir() if dir is None else dir
        created = tempfile.TemporaryDirectory(dir=parent, prefix=pre,
                                              suffix=suf)
        self.nameCheck(created.name, parent, pre, suf)
        # Create a subdirectory and some files
        if recurse:
            nested = self.do_create(created.name, pre, suf, recurse-1)
            # NOTE(review): presumably detaches the nested object so only
            # the outer directory performs the removal - confirm against
            # tempfile.TemporaryDirectory.
            nested.name = None
        payload_path = os.path.join(created.name, "test.txt")
        with open(payload_path, "wb") as payload:
            payload.write(b"Hello world!")
        return created

    def test_mkdtemp_failure(self):
        # Check no additional exception if mkdtemp fails
        # Previously would raise AttributeError instead
        # (noted as part of Issue #10188)
        with tempfile.TemporaryDirectory() as nonexistent:
            pass
        # The directory is gone now, so creating inside it must fail
        with self.assertRaises(FileNotFoundError) as caught:
            tempfile.TemporaryDirectory(dir=nonexistent)
        self.assertEqual(caught.exception.errno, errno.ENOENT)

    def test_explicit_cleanup(self):
        # cleanup() must remove the TemporaryDirectory from disk
        parent = tempfile.mkdtemp()
        try:
            tmpdir = self.do_create(dir=parent)
            self.assertTrue(
                os.path.exists(tmpdir.name),
                "TemporaryDirectory %s does not exist" % tmpdir.name)
            tmpdir.cleanup()
            self.assertFalse(
                os.path.exists(tmpdir.name),
                "TemporaryDirectory %s exists after cleanup" % tmpdir.name)
        finally:
            os.rmdir(parent)

    @support.skip_unless_symlink
    def test_cleanup_with_symlink_to_a_directory(self):
        # cleanup() should not follow symlinks to directories (issue #12464)
        source = self.do_create()
        target = self.do_create(recurse=0)

        # Symlink source/foo -> target
        link_path = os.path.join(source.name, "foo")
        os.symlink(target.name, link_path)

        # This call to cleanup() should not follow the "foo" symlink
        source.cleanup()

        self.assertFalse(
            os.path.exists(source.name),
            "TemporaryDirectory %s exists after cleanup" % source.name)
        self.assertTrue(
            os.path.exists(target.name),
            "Directory pointed to by a symlink was deleted")
        self.assertEqual(
            os.listdir(target.name), ['test.txt'],
            "Contents of the directory pointed to by a symlink "
            "were deleted")
        target.cleanup()

    @support.cpython_only
    def test_del_on_collection(self):
        # Dropping the last reference must delete the directory
        parent = tempfile.mkdtemp()
        try:
            tmpdir = self.do_create(dir=parent)
            path = tmpdir.name
            del tmpdir  # Rely on refcounting to invoke __del__
            self.assertFalse(
                os.path.exists(path),
                "TemporaryDirectory %s exists after __del__" % path)
        finally:
            os.rmdir(parent)

    def test_del_on_shutdown(self):
        # A TemporaryDirectory may be cleaned up during shutdown
        with self.do_create() as parent:
            # The child script stores its TemporaryDirectory as an
            # attribute of each of these modules in turn.
            for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
                code = """if True:
                    import builtins
                    import os
                    import shutil
                    import sys
                    import tempfile
                    import warnings

                    tmp = tempfile.TemporaryDirectory(dir={dir!r})
                    sys.stdout.buffer.write(tmp.name.encode())

                    tmp2 = os.path.join(tmp.name, 'test_dir')
                    os.mkdir(tmp2)
                    with open(os.path.join(tmp2, "test.txt"), "w") as f:
                        f.write("Hello world!")

                    {mod}.tmp = tmp

                    warnings.filterwarnings("always", category=ResourceWarning)
                    """.format(dir=parent, mod=mod)
                rc, out, err = script_helper.assert_python_ok("-c", code)
                child_dir = out.decode().strip()
                self.assertFalse(
                    os.path.exists(child_dir),
                    "TemporaryDirectory %s exists after cleanup" % child_dir)
                stderr_text = err.decode('utf-8', 'backslashreplace')
                self.assertNotIn("Exception ", stderr_text)
                self.assertIn("ResourceWarning: Implicitly cleaning up",
                              stderr_text)

    def test_exit_on_shutdown(self):
        # Issue #22427
        with self.do_create() as parent:
            code = """if True:
                import sys
                import tempfile
                import warnings

                def generator():
                    with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
                        yield tmp
                g = generator()
                sys.stdout.buffer.write(next(g).encode())

                warnings.filterwarnings("always", category=ResourceWarning)
                """.format(dir=parent)
            rc, out, err = script_helper.assert_python_ok("-c", code)
            child_dir = out.decode().strip()
            self.assertFalse(
                os.path.exists(child_dir),
                "TemporaryDirectory %s exists after cleanup" % child_dir)
            stderr_text = err.decode('utf-8', 'backslashreplace')
            self.assertNotIn("Exception ", stderr_text)
            self.assertIn("ResourceWarning: Implicitly cleaning up",
                          stderr_text)

    def test_warnings_on_cleanup(self):
        # ResourceWarning will be triggered by __del__
        with self.do_create() as parent:
            tmpdir = self.do_create(dir=parent, recurse=3)
            path = tmpdir.name

            # Check for the resource warning
            expected = ('Implicitly', ResourceWarning)
            with support.check_warnings(expected, quiet=False):
                warnings.filterwarnings("always", category=ResourceWarning)
                del tmpdir
                support.gc_collect()
            self.assertFalse(
                os.path.exists(path),
                "TemporaryDirectory %s exists after __del__" % path)

    def test_multiple_close(self):
        # Repeated cleanup() calls must not raise
        tmpdir = self.do_create()
        for _ in range(3):
            tmpdir.cleanup()

    def test_context_manager(self):
        # Entering yields the directory name; leaving removes the directory
        tmpdir = self.do_create()
        with tmpdir as path:
            self.assertTrue(os.path.exists(path))
            self.assertEqual(path, tmpdir.name)
        self.assertFalse(os.path.exists(path))
1450n/a
1451n/a
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()