ยปCore Development>Code coverage>Lib/test/test_os.py

Python code coverage for Lib/test/test_os.py

#countcontent
1n/a# As a test suite for the os module, this is woefully inadequate, but this
2n/a# does add tests for a few functions which have been determined to be more
3n/a# portable than they had been thought to be.
4n/a
5n/aimport asynchat
6n/aimport asyncore
7n/aimport codecs
8n/aimport contextlib
9n/aimport decimal
10n/aimport errno
11n/aimport fractions
12n/aimport getpass
13n/aimport itertools
14n/aimport locale
15n/aimport mmap
16n/aimport os
17n/aimport pickle
18n/aimport shutil
19n/aimport signal
20n/aimport socket
21n/aimport stat
22n/aimport subprocess
23n/aimport sys
24n/aimport sysconfig
25n/aimport time
26n/aimport unittest
27n/aimport uuid
28n/aimport warnings
29n/afrom test import support
30n/atry:
31n/a import threading
32n/aexcept ImportError:
33n/a threading = None
34n/atry:
35n/a import resource
36n/aexcept ImportError:
37n/a resource = None
38n/atry:
39n/a import fcntl
40n/aexcept ImportError:
41n/a fcntl = None
42n/atry:
43n/a import _winapi
44n/aexcept ImportError:
45n/a _winapi = None
46n/atry:
47n/a import grp
48n/a groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
49n/a if hasattr(os, 'getgid'):
50n/a process_gid = os.getgid()
51n/a if process_gid not in groups:
52n/a groups.append(process_gid)
53n/aexcept ImportError:
54n/a groups = []
55n/atry:
56n/a import pwd
57n/a all_users = [u.pw_uid for u in pwd.getpwall()]
58n/aexcept (ImportError, AttributeError):
59n/a all_users = []
60n/atry:
61n/a from _testcapi import INT_MAX, PY_SSIZE_T_MAX
62n/aexcept ImportError:
63n/a INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
64n/a
65n/afrom test.support.script_helper import assert_python_ok
66n/afrom test.support import unix_shell
67n/a
68n/a
69n/aroot_in_posix = False
70n/aif hasattr(os, 'geteuid'):
71n/a root_in_posix = (os.geteuid() == 0)
72n/a
73n/a# Detect whether we're on a Linux system that uses the (now outdated
74n/a# and unmaintained) linuxthreads threading library. There's an issue
75n/a# when combining linuxthreads with a failed execv call: see
76n/a# http://bugs.python.org/issue4970.
77n/aif hasattr(sys, 'thread_info') and sys.thread_info.version:
78n/a USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
79n/aelse:
80n/a USING_LINUXTHREADS = False
81n/a
82n/a# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
83n/aHAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84n/a
85n/a
86n/a@contextlib.contextmanager
87n/adef ignore_deprecation_warnings(msg_regex, quiet=False):
88n/a with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
89n/a yield
90n/a
91n/a
92n/adef requires_os_func(name):
93n/a return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
94n/a
95n/a
96n/aclass _PathLike(os.PathLike):
97n/a
98n/a def __init__(self, path=""):
99n/a self.path = path
100n/a
101n/a def __str__(self):
102n/a return str(self.path)
103n/a
104n/a def __fspath__(self):
105n/a if isinstance(self.path, BaseException):
106n/a raise self.path
107n/a else:
108n/a return self.path
109n/a
110n/a
111n/adef create_file(filename, content=b'content'):
112n/a with open(filename, "xb", 0) as fp:
113n/a fp.write(content)
114n/a
115n/a
116n/a# Tests creating TESTFN
117n/aclass FileTests(unittest.TestCase):
118n/a def setUp(self):
119n/a if os.path.lexists(support.TESTFN):
120n/a os.unlink(support.TESTFN)
121n/a tearDown = setUp
122n/a
123n/a def test_access(self):
124n/a f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
125n/a os.close(f)
126n/a self.assertTrue(os.access(support.TESTFN, os.W_OK))
127n/a
128n/a def test_closerange(self):
129n/a first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
130n/a # We must allocate two consecutive file descriptors, otherwise
131n/a # it will mess up other file descriptors (perhaps even the three
132n/a # standard ones).
133n/a second = os.dup(first)
134n/a try:
135n/a retries = 0
136n/a while second != first + 1:
137n/a os.close(first)
138n/a retries += 1
139n/a if retries > 10:
140n/a # XXX test skipped
141n/a self.skipTest("couldn't allocate two consecutive fds")
142n/a first, second = second, os.dup(second)
143n/a finally:
144n/a os.close(second)
145n/a # close a fd that is open, and one that isn't
146n/a os.closerange(first, first + 2)
147n/a self.assertRaises(OSError, os.write, first, b"a")
148n/a
149n/a @support.cpython_only
150n/a def test_rename(self):
151n/a path = support.TESTFN
152n/a old = sys.getrefcount(path)
153n/a self.assertRaises(TypeError, os.rename, path, 0)
154n/a new = sys.getrefcount(path)
155n/a self.assertEqual(old, new)
156n/a
157n/a def test_read(self):
158n/a with open(support.TESTFN, "w+b") as fobj:
159n/a fobj.write(b"spam")
160n/a fobj.flush()
161n/a fd = fobj.fileno()
162n/a os.lseek(fd, 0, 0)
163n/a s = os.read(fd, 4)
164n/a self.assertEqual(type(s), bytes)
165n/a self.assertEqual(s, b"spam")
166n/a
167n/a @support.cpython_only
168n/a # Skip the test on 32-bit platforms: the number of bytes must fit in a
169n/a # Py_ssize_t type
170n/a @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
171n/a "needs INT_MAX < PY_SSIZE_T_MAX")
172n/a @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
173n/a def test_large_read(self, size):
174n/a self.addCleanup(support.unlink, support.TESTFN)
175n/a create_file(support.TESTFN, b'test')
176n/a
177n/a # Issue #21932: Make sure that os.read() does not raise an
178n/a # OverflowError for size larger than INT_MAX
179n/a with open(support.TESTFN, "rb") as fp:
180n/a data = os.read(fp.fileno(), size)
181n/a
182n/a # The test does not try to read more than 2 GB at once because the
183n/a # operating system is free to return less bytes than requested.
184n/a self.assertEqual(data, b'test')
185n/a
186n/a def test_write(self):
187n/a # os.write() accepts bytes- and buffer-like objects but not strings
188n/a fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
189n/a self.assertRaises(TypeError, os.write, fd, "beans")
190n/a os.write(fd, b"bacon\n")
191n/a os.write(fd, bytearray(b"eggs\n"))
192n/a os.write(fd, memoryview(b"spam\n"))
193n/a os.close(fd)
194n/a with open(support.TESTFN, "rb") as fobj:
195n/a self.assertEqual(fobj.read().splitlines(),
196n/a [b"bacon", b"eggs", b"spam"])
197n/a
198n/a def write_windows_console(self, *args):
199n/a retcode = subprocess.call(args,
200n/a # use a new console to not flood the test output
201n/a creationflags=subprocess.CREATE_NEW_CONSOLE,
202n/a # use a shell to hide the console window (SW_HIDE)
203n/a shell=True)
204n/a self.assertEqual(retcode, 0)
205n/a
206n/a @unittest.skipUnless(sys.platform == 'win32',
207n/a 'test specific to the Windows console')
208n/a def test_write_windows_console(self):
209n/a # Issue #11395: the Windows console returns an error (12: not enough
210n/a # space error) on writing into stdout if stdout mode is binary and the
211n/a # length is greater than 66,000 bytes (or less, depending on heap
212n/a # usage).
213n/a code = "print('x' * 100000)"
214n/a self.write_windows_console(sys.executable, "-c", code)
215n/a self.write_windows_console(sys.executable, "-u", "-c", code)
216n/a
217n/a def fdopen_helper(self, *args):
218n/a fd = os.open(support.TESTFN, os.O_RDONLY)
219n/a f = os.fdopen(fd, *args)
220n/a f.close()
221n/a
222n/a def test_fdopen(self):
223n/a fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
224n/a os.close(fd)
225n/a
226n/a self.fdopen_helper()
227n/a self.fdopen_helper('r')
228n/a self.fdopen_helper('r', 100)
229n/a
230n/a def test_replace(self):
231n/a TESTFN2 = support.TESTFN + ".2"
232n/a self.addCleanup(support.unlink, support.TESTFN)
233n/a self.addCleanup(support.unlink, TESTFN2)
234n/a
235n/a create_file(support.TESTFN, b"1")
236n/a create_file(TESTFN2, b"2")
237n/a
238n/a os.replace(support.TESTFN, TESTFN2)
239n/a self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
240n/a with open(TESTFN2, 'r') as f:
241n/a self.assertEqual(f.read(), "1")
242n/a
243n/a def test_open_keywords(self):
244n/a f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
245n/a dir_fd=None)
246n/a os.close(f)
247n/a
248n/a def test_symlink_keywords(self):
249n/a symlink = support.get_attribute(os, "symlink")
250n/a try:
251n/a symlink(src='target', dst=support.TESTFN,
252n/a target_is_directory=False, dir_fd=None)
253n/a except (NotImplementedError, OSError):
254n/a pass # No OS support or unprivileged user
255n/a
256n/a
257n/a# Test attributes on return values from os.*stat* family.
258n/aclass StatAttributeTests(unittest.TestCase):
259n/a def setUp(self):
260n/a self.fname = support.TESTFN
261n/a self.addCleanup(support.unlink, self.fname)
262n/a create_file(self.fname, b"ABC")
263n/a
264n/a @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
265n/a def check_stat_attributes(self, fname):
266n/a result = os.stat(fname)
267n/a
268n/a # Make sure direct access works
269n/a self.assertEqual(result[stat.ST_SIZE], 3)
270n/a self.assertEqual(result.st_size, 3)
271n/a
272n/a # Make sure all the attributes are there
273n/a members = dir(result)
274n/a for name in dir(stat):
275n/a if name[:3] == 'ST_':
276n/a attr = name.lower()
277n/a if name.endswith("TIME"):
278n/a def trunc(x): return int(x)
279n/a else:
280n/a def trunc(x): return x
281n/a self.assertEqual(trunc(getattr(result, attr)),
282n/a result[getattr(stat, name)])
283n/a self.assertIn(attr, members)
284n/a
285n/a # Make sure that the st_?time and st_?time_ns fields roughly agree
286n/a # (they should always agree up to around tens-of-microseconds)
287n/a for name in 'st_atime st_mtime st_ctime'.split():
288n/a floaty = int(getattr(result, name) * 100000)
289n/a nanosecondy = getattr(result, name + "_ns") // 10000
290n/a self.assertAlmostEqual(floaty, nanosecondy, delta=2)
291n/a
292n/a try:
293n/a result[200]
294n/a self.fail("No exception raised")
295n/a except IndexError:
296n/a pass
297n/a
298n/a # Make sure that assignment fails
299n/a try:
300n/a result.st_mode = 1
301n/a self.fail("No exception raised")
302n/a except AttributeError:
303n/a pass
304n/a
305n/a try:
306n/a result.st_rdev = 1
307n/a self.fail("No exception raised")
308n/a except (AttributeError, TypeError):
309n/a pass
310n/a
311n/a try:
312n/a result.parrot = 1
313n/a self.fail("No exception raised")
314n/a except AttributeError:
315n/a pass
316n/a
317n/a # Use the stat_result constructor with a too-short tuple.
318n/a try:
319n/a result2 = os.stat_result((10,))
320n/a self.fail("No exception raised")
321n/a except TypeError:
322n/a pass
323n/a
324n/a # Use the constructor with a too-long tuple.
325n/a try:
326n/a result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
327n/a except TypeError:
328n/a pass
329n/a
330n/a def test_stat_attributes(self):
331n/a self.check_stat_attributes(self.fname)
332n/a
333n/a def test_stat_attributes_bytes(self):
334n/a try:
335n/a fname = self.fname.encode(sys.getfilesystemencoding())
336n/a except UnicodeEncodeError:
337n/a self.skipTest("cannot encode %a for the filesystem" % self.fname)
338n/a self.check_stat_attributes(fname)
339n/a
340n/a def test_stat_result_pickle(self):
341n/a result = os.stat(self.fname)
342n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
343n/a p = pickle.dumps(result, proto)
344n/a self.assertIn(b'stat_result', p)
345n/a if proto < 4:
346n/a self.assertIn(b'cos\nstat_result\n', p)
347n/a unpickled = pickle.loads(p)
348n/a self.assertEqual(result, unpickled)
349n/a
350n/a @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
351n/a def test_statvfs_attributes(self):
352n/a try:
353n/a result = os.statvfs(self.fname)
354n/a except OSError as e:
355n/a # On AtheOS, glibc always returns ENOSYS
356n/a if e.errno == errno.ENOSYS:
357n/a self.skipTest('os.statvfs() failed with ENOSYS')
358n/a
359n/a # Make sure direct access works
360n/a self.assertEqual(result.f_bfree, result[3])
361n/a
362n/a # Make sure all the attributes are there.
363n/a members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
364n/a 'ffree', 'favail', 'flag', 'namemax')
365n/a for value, member in enumerate(members):
366n/a self.assertEqual(getattr(result, 'f_' + member), result[value])
367n/a
368n/a # Make sure that assignment really fails
369n/a try:
370n/a result.f_bfree = 1
371n/a self.fail("No exception raised")
372n/a except AttributeError:
373n/a pass
374n/a
375n/a try:
376n/a result.parrot = 1
377n/a self.fail("No exception raised")
378n/a except AttributeError:
379n/a pass
380n/a
381n/a # Use the constructor with a too-short tuple.
382n/a try:
383n/a result2 = os.statvfs_result((10,))
384n/a self.fail("No exception raised")
385n/a except TypeError:
386n/a pass
387n/a
388n/a # Use the constructor with a too-long tuple.
389n/a try:
390n/a result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
391n/a except TypeError:
392n/a pass
393n/a
394n/a @unittest.skipUnless(hasattr(os, 'statvfs'),
395n/a "need os.statvfs()")
396n/a def test_statvfs_result_pickle(self):
397n/a try:
398n/a result = os.statvfs(self.fname)
399n/a except OSError as e:
400n/a # On AtheOS, glibc always returns ENOSYS
401n/a if e.errno == errno.ENOSYS:
402n/a self.skipTest('os.statvfs() failed with ENOSYS')
403n/a
404n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
405n/a p = pickle.dumps(result, proto)
406n/a self.assertIn(b'statvfs_result', p)
407n/a if proto < 4:
408n/a self.assertIn(b'cos\nstatvfs_result\n', p)
409n/a unpickled = pickle.loads(p)
410n/a self.assertEqual(result, unpickled)
411n/a
412n/a @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
413n/a def test_1686475(self):
414n/a # Verify that an open file can be stat'ed
415n/a try:
416n/a os.stat(r"c:\pagefile.sys")
417n/a except FileNotFoundError:
418n/a self.skipTest(r'c:\pagefile.sys does not exist')
419n/a except OSError as e:
420n/a self.fail("Could not stat pagefile.sys")
421n/a
422n/a @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
423n/a @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
424n/a def test_15261(self):
425n/a # Verify that stat'ing a closed fd does not cause crash
426n/a r, w = os.pipe()
427n/a try:
428n/a os.stat(r) # should not raise error
429n/a finally:
430n/a os.close(r)
431n/a os.close(w)
432n/a with self.assertRaises(OSError) as ctx:
433n/a os.stat(r)
434n/a self.assertEqual(ctx.exception.errno, errno.EBADF)
435n/a
436n/a def check_file_attributes(self, result):
437n/a self.assertTrue(hasattr(result, 'st_file_attributes'))
438n/a self.assertTrue(isinstance(result.st_file_attributes, int))
439n/a self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
440n/a
441n/a @unittest.skipUnless(sys.platform == "win32",
442n/a "st_file_attributes is Win32 specific")
443n/a def test_file_attributes(self):
444n/a # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
445n/a result = os.stat(self.fname)
446n/a self.check_file_attributes(result)
447n/a self.assertEqual(
448n/a result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
449n/a 0)
450n/a
451n/a # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
452n/a dirname = support.TESTFN + "dir"
453n/a os.mkdir(dirname)
454n/a self.addCleanup(os.rmdir, dirname)
455n/a
456n/a result = os.stat(dirname)
457n/a self.check_file_attributes(result)
458n/a self.assertEqual(
459n/a result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
460n/a stat.FILE_ATTRIBUTE_DIRECTORY)
461n/a
462n/a @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
463n/a def test_access_denied(self):
464n/a # Default to FindFirstFile WIN32_FIND_DATA when access is
465n/a # denied. See issue 28075.
466n/a # os.environ['TEMP'] should be located on a volume that
467n/a # supports file ACLs.
468n/a fname = os.path.join(os.environ['TEMP'], self.fname)
469n/a self.addCleanup(support.unlink, fname)
470n/a create_file(fname, b'ABC')
471n/a # Deny the right to [S]YNCHRONIZE on the file to
472n/a # force CreateFile to fail with ERROR_ACCESS_DENIED.
473n/a DETACHED_PROCESS = 8
474n/a subprocess.check_call(
475n/a ['icacls.exe', fname, '/deny', 'Users:(S)'],
476n/a creationflags=DETACHED_PROCESS
477n/a )
478n/a result = os.stat(fname)
479n/a self.assertNotEqual(result.st_size, 0)
480n/a
481n/a
482n/aclass UtimeTests(unittest.TestCase):
483n/a def setUp(self):
484n/a self.dirname = support.TESTFN
485n/a self.fname = os.path.join(self.dirname, "f1")
486n/a
487n/a self.addCleanup(support.rmtree, self.dirname)
488n/a os.mkdir(self.dirname)
489n/a create_file(self.fname)
490n/a
491n/a def restore_float_times(state):
492n/a with ignore_deprecation_warnings('stat_float_times'):
493n/a os.stat_float_times(state)
494n/a
495n/a # ensure that st_atime and st_mtime are float
496n/a with ignore_deprecation_warnings('stat_float_times'):
497n/a old_float_times = os.stat_float_times(-1)
498n/a self.addCleanup(restore_float_times, old_float_times)
499n/a
500n/a os.stat_float_times(True)
501n/a
502n/a def support_subsecond(self, filename):
503n/a # Heuristic to check if the filesystem supports timestamp with
504n/a # subsecond resolution: check if float and int timestamps are different
505n/a st = os.stat(filename)
506n/a return ((st.st_atime != st[7])
507n/a or (st.st_mtime != st[8])
508n/a or (st.st_ctime != st[9]))
509n/a
510n/a def _test_utime(self, set_time, filename=None):
511n/a if not filename:
512n/a filename = self.fname
513n/a
514n/a support_subsecond = self.support_subsecond(filename)
515n/a if support_subsecond:
516n/a # Timestamp with a resolution of 1 microsecond (10^-6).
517n/a #
518n/a # The resolution of the C internal function used by os.utime()
519n/a # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
520n/a # test with a resolution of 1 ns requires more work:
521n/a # see the issue #15745.
522n/a atime_ns = 1002003000 # 1.002003 seconds
523n/a mtime_ns = 4005006000 # 4.005006 seconds
524n/a else:
525n/a # use a resolution of 1 second
526n/a atime_ns = 5 * 10**9
527n/a mtime_ns = 8 * 10**9
528n/a
529n/a set_time(filename, (atime_ns, mtime_ns))
530n/a st = os.stat(filename)
531n/a
532n/a if support_subsecond:
533n/a self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
534n/a self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
535n/a else:
536n/a self.assertEqual(st.st_atime, atime_ns * 1e-9)
537n/a self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
538n/a self.assertEqual(st.st_atime_ns, atime_ns)
539n/a self.assertEqual(st.st_mtime_ns, mtime_ns)
540n/a
541n/a def test_utime(self):
542n/a def set_time(filename, ns):
543n/a # test the ns keyword parameter
544n/a os.utime(filename, ns=ns)
545n/a self._test_utime(set_time)
546n/a
547n/a @staticmethod
548n/a def ns_to_sec(ns):
549n/a # Convert a number of nanosecond (int) to a number of seconds (float).
550n/a # Round towards infinity by adding 0.5 nanosecond to avoid rounding
551n/a # issue, os.utime() rounds towards minus infinity.
552n/a return (ns * 1e-9) + 0.5e-9
553n/a
554n/a def test_utime_by_indexed(self):
555n/a # pass times as floating point seconds as the second indexed parameter
556n/a def set_time(filename, ns):
557n/a atime_ns, mtime_ns = ns
558n/a atime = self.ns_to_sec(atime_ns)
559n/a mtime = self.ns_to_sec(mtime_ns)
560n/a # test utimensat(timespec), utimes(timeval), utime(utimbuf)
561n/a # or utime(time_t)
562n/a os.utime(filename, (atime, mtime))
563n/a self._test_utime(set_time)
564n/a
565n/a def test_utime_by_times(self):
566n/a def set_time(filename, ns):
567n/a atime_ns, mtime_ns = ns
568n/a atime = self.ns_to_sec(atime_ns)
569n/a mtime = self.ns_to_sec(mtime_ns)
570n/a # test the times keyword parameter
571n/a os.utime(filename, times=(atime, mtime))
572n/a self._test_utime(set_time)
573n/a
574n/a @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
575n/a "follow_symlinks support for utime required "
576n/a "for this test.")
577n/a def test_utime_nofollow_symlinks(self):
578n/a def set_time(filename, ns):
579n/a # use follow_symlinks=False to test utimensat(timespec)
580n/a # or lutimes(timeval)
581n/a os.utime(filename, ns=ns, follow_symlinks=False)
582n/a self._test_utime(set_time)
583n/a
584n/a @unittest.skipUnless(os.utime in os.supports_fd,
585n/a "fd support for utime required for this test.")
586n/a def test_utime_fd(self):
587n/a def set_time(filename, ns):
588n/a with open(filename, 'wb', 0) as fp:
589n/a # use a file descriptor to test futimens(timespec)
590n/a # or futimes(timeval)
591n/a os.utime(fp.fileno(), ns=ns)
592n/a self._test_utime(set_time)
593n/a
594n/a @unittest.skipUnless(os.utime in os.supports_dir_fd,
595n/a "dir_fd support for utime required for this test.")
596n/a def test_utime_dir_fd(self):
597n/a def set_time(filename, ns):
598n/a dirname, name = os.path.split(filename)
599n/a dirfd = os.open(dirname, os.O_RDONLY)
600n/a try:
601n/a # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
602n/a os.utime(name, dir_fd=dirfd, ns=ns)
603n/a finally:
604n/a os.close(dirfd)
605n/a self._test_utime(set_time)
606n/a
607n/a def test_utime_directory(self):
608n/a def set_time(filename, ns):
609n/a # test calling os.utime() on a directory
610n/a os.utime(filename, ns=ns)
611n/a self._test_utime(set_time, filename=self.dirname)
612n/a
613n/a def _test_utime_current(self, set_time):
614n/a # Get the system clock
615n/a current = time.time()
616n/a
617n/a # Call os.utime() to set the timestamp to the current system clock
618n/a set_time(self.fname)
619n/a
620n/a if not self.support_subsecond(self.fname):
621n/a delta = 1.0
622n/a else:
623n/a # On Windows, the usual resolution of time.time() is 15.6 ms
624n/a delta = 0.020
625n/a st = os.stat(self.fname)
626n/a msg = ("st_time=%r, current=%r, dt=%r"
627n/a % (st.st_mtime, current, st.st_mtime - current))
628n/a self.assertAlmostEqual(st.st_mtime, current,
629n/a delta=delta, msg=msg)
630n/a
631n/a def test_utime_current(self):
632n/a def set_time(filename):
633n/a # Set to the current time in the new way
634n/a os.utime(self.fname)
635n/a self._test_utime_current(set_time)
636n/a
637n/a def test_utime_current_old(self):
638n/a def set_time(filename):
639n/a # Set to the current time in the old explicit way.
640n/a os.utime(self.fname, None)
641n/a self._test_utime_current(set_time)
642n/a
643n/a def get_file_system(self, path):
644n/a if sys.platform == 'win32':
645n/a root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
646n/a import ctypes
647n/a kernel32 = ctypes.windll.kernel32
648n/a buf = ctypes.create_unicode_buffer("", 100)
649n/a ok = kernel32.GetVolumeInformationW(root, None, 0,
650n/a None, None, None,
651n/a buf, len(buf))
652n/a if ok:
653n/a return buf.value
654n/a # return None if the filesystem is unknown
655n/a
656n/a def test_large_time(self):
657n/a # Many filesystems are limited to the year 2038. At least, the test
658n/a # pass with NTFS filesystem.
659n/a if self.get_file_system(self.dirname) != "NTFS":
660n/a self.skipTest("requires NTFS")
661n/a
662n/a large = 5000000000 # some day in 2128
663n/a os.utime(self.fname, (large, large))
664n/a self.assertEqual(os.stat(self.fname).st_mtime, large)
665n/a
666n/a def test_utime_invalid_arguments(self):
667n/a # seconds and nanoseconds parameters are mutually exclusive
668n/a with self.assertRaises(ValueError):
669n/a os.utime(self.fname, (5, 5), ns=(5, 5))
670n/a
671n/a
672n/afrom test import mapping_tests
673n/a
674n/aclass EnvironTests(mapping_tests.BasicTestMappingProtocol):
675n/a """check that os.environ object conform to mapping protocol"""
676n/a type2test = None
677n/a
678n/a def setUp(self):
679n/a self.__save = dict(os.environ)
680n/a if os.supports_bytes_environ:
681n/a self.__saveb = dict(os.environb)
682n/a for key, value in self._reference().items():
683n/a os.environ[key] = value
684n/a
685n/a def tearDown(self):
686n/a os.environ.clear()
687n/a os.environ.update(self.__save)
688n/a if os.supports_bytes_environ:
689n/a os.environb.clear()
690n/a os.environb.update(self.__saveb)
691n/a
692n/a def _reference(self):
693n/a return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
694n/a
695n/a def _empty_mapping(self):
696n/a os.environ.clear()
697n/a return os.environ
698n/a
699n/a # Bug 1110478
700n/a @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
701n/a 'requires a shell')
702n/a def test_update2(self):
703n/a os.environ.clear()
704n/a os.environ.update(HELLO="World")
705n/a with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
706n/a value = popen.read().strip()
707n/a self.assertEqual(value, "World")
708n/a
709n/a @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
710n/a 'requires a shell')
711n/a def test_os_popen_iter(self):
712n/a with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
713n/a % unix_shell) as popen:
714n/a it = iter(popen)
715n/a self.assertEqual(next(it), "line1\n")
716n/a self.assertEqual(next(it), "line2\n")
717n/a self.assertEqual(next(it), "line3\n")
718n/a self.assertRaises(StopIteration, next, it)
719n/a
720n/a # Verify environ keys and values from the OS are of the
721n/a # correct str type.
722n/a def test_keyvalue_types(self):
723n/a for key, val in os.environ.items():
724n/a self.assertEqual(type(key), str)
725n/a self.assertEqual(type(val), str)
726n/a
727n/a def test_items(self):
728n/a for key, value in self._reference().items():
729n/a self.assertEqual(os.environ.get(key), value)
730n/a
731n/a # Issue 7310
732n/a def test___repr__(self):
733n/a """Check that the repr() of os.environ looks like environ({...})."""
734n/a env = os.environ
735n/a self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
736n/a '{!r}: {!r}'.format(key, value)
737n/a for key, value in env.items())))
738n/a
739n/a def test_get_exec_path(self):
740n/a defpath_list = os.defpath.split(os.pathsep)
741n/a test_path = ['/monty', '/python', '', '/flying/circus']
742n/a test_env = {'PATH': os.pathsep.join(test_path)}
743n/a
744n/a saved_environ = os.environ
745n/a try:
746n/a os.environ = dict(test_env)
747n/a # Test that defaulting to os.environ works.
748n/a self.assertSequenceEqual(test_path, os.get_exec_path())
749n/a self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
750n/a finally:
751n/a os.environ = saved_environ
752n/a
753n/a # No PATH environment variable
754n/a self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
755n/a # Empty PATH environment variable
756n/a self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
757n/a # Supplied PATH environment variable
758n/a self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
759n/a
760n/a if os.supports_bytes_environ:
761n/a # env cannot contain 'PATH' and b'PATH' keys
762n/a try:
763n/a # ignore BytesWarning warning
764n/a with warnings.catch_warnings(record=True):
765n/a mixed_env = {'PATH': '1', b'PATH': b'2'}
766n/a except BytesWarning:
767n/a # mixed_env cannot be created with python -bb
768n/a pass
769n/a else:
770n/a self.assertRaises(ValueError, os.get_exec_path, mixed_env)
771n/a
772n/a # bytes key and/or value
773n/a self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
774n/a ['abc'])
775n/a self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
776n/a ['abc'])
777n/a self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
778n/a ['abc'])
779n/a
780n/a @unittest.skipUnless(os.supports_bytes_environ,
781n/a "os.environb required for this test.")
782n/a def test_environb(self):
783n/a # os.environ -> os.environb
784n/a value = 'euro\u20ac'
785n/a try:
786n/a value_bytes = value.encode(sys.getfilesystemencoding(),
787n/a 'surrogateescape')
788n/a except UnicodeEncodeError:
789n/a msg = "U+20AC character is not encodable to %s" % (
790n/a sys.getfilesystemencoding(),)
791n/a self.skipTest(msg)
792n/a os.environ['unicode'] = value
793n/a self.assertEqual(os.environ['unicode'], value)
794n/a self.assertEqual(os.environb[b'unicode'], value_bytes)
795n/a
796n/a # os.environb -> os.environ
797n/a value = b'\xff'
798n/a os.environb[b'bytes'] = value
799n/a self.assertEqual(os.environb[b'bytes'], value)
800n/a value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
801n/a self.assertEqual(os.environ['bytes'], value_str)
802n/a
803n/a # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
804n/a # #13415).
805n/a @support.requires_freebsd_version(7)
806n/a @support.requires_mac_ver(10, 6)
807n/a def test_unset_error(self):
808n/a if sys.platform == "win32":
809n/a # an environment variable is limited to 32,767 characters
810n/a key = 'x' * 50000
811n/a self.assertRaises(ValueError, os.environ.__delitem__, key)
812n/a else:
813n/a # "=" is not allowed in a variable name
814n/a key = 'key='
815n/a self.assertRaises(OSError, os.environ.__delitem__, key)
816n/a
817n/a def test_key_type(self):
818n/a missing = 'missingkey'
819n/a self.assertNotIn(missing, os.environ)
820n/a
821n/a with self.assertRaises(KeyError) as cm:
822n/a os.environ[missing]
823n/a self.assertIs(cm.exception.args[0], missing)
824n/a self.assertTrue(cm.exception.__suppress_context__)
825n/a
826n/a with self.assertRaises(KeyError) as cm:
827n/a del os.environ[missing]
828n/a self.assertIs(cm.exception.args[0], missing)
829n/a self.assertTrue(cm.exception.__suppress_context__)
830n/a
831n/a
832n/aclass WalkTests(unittest.TestCase):
833n/a """Tests for os.walk()."""
834n/a
835n/a # Wrapper to hide minor differences between os.walk and os.fwalk
836n/a # to tests both functions with the same code base
837n/a def walk(self, top, **kwargs):
838n/a if 'follow_symlinks' in kwargs:
839n/a kwargs['followlinks'] = kwargs.pop('follow_symlinks')
840n/a return os.walk(top, **kwargs)
841n/a
842n/a def setUp(self):
843n/a join = os.path.join
844n/a self.addCleanup(support.rmtree, support.TESTFN)
845n/a
846n/a # Build:
847n/a # TESTFN/
848n/a # TEST1/ a file kid and two directory kids
849n/a # tmp1
850n/a # SUB1/ a file kid and a directory kid
851n/a # tmp2
852n/a # SUB11/ no kids
853n/a # SUB2/ a file kid and a dirsymlink kid
854n/a # tmp3
855n/a # SUB21/ not readable
856n/a # tmp5
857n/a # link/ a symlink to TESTFN.2
858n/a # broken_link
859n/a # broken_link2
860n/a # broken_link3
861n/a # TEST2/
862n/a # tmp4 a lone file
863n/a self.walk_path = join(support.TESTFN, "TEST1")
864n/a self.sub1_path = join(self.walk_path, "SUB1")
865n/a self.sub11_path = join(self.sub1_path, "SUB11")
866n/a sub2_path = join(self.walk_path, "SUB2")
867n/a sub21_path = join(sub2_path, "SUB21")
868n/a tmp1_path = join(self.walk_path, "tmp1")
869n/a tmp2_path = join(self.sub1_path, "tmp2")
870n/a tmp3_path = join(sub2_path, "tmp3")
871n/a tmp5_path = join(sub21_path, "tmp3")
872n/a self.link_path = join(sub2_path, "link")
873n/a t2_path = join(support.TESTFN, "TEST2")
874n/a tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
875n/a broken_link_path = join(sub2_path, "broken_link")
876n/a broken_link2_path = join(sub2_path, "broken_link2")
877n/a broken_link3_path = join(sub2_path, "broken_link3")
878n/a
879n/a # Create stuff.
880n/a os.makedirs(self.sub11_path)
881n/a os.makedirs(sub2_path)
882n/a os.makedirs(sub21_path)
883n/a os.makedirs(t2_path)
884n/a
885n/a for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
886n/a with open(path, "x") as f:
887n/a f.write("I'm " + path + " and proud of it. Blame test_os.\n")
888n/a
889n/a if support.can_symlink():
890n/a os.symlink(os.path.abspath(t2_path), self.link_path)
891n/a os.symlink('broken', broken_link_path, True)
892n/a os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
893n/a os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
894n/a self.sub2_tree = (sub2_path, ["SUB21", "link"],
895n/a ["broken_link", "broken_link2", "broken_link3",
896n/a "tmp3"])
897n/a else:
898n/a self.sub2_tree = (sub2_path, [], ["tmp3"])
899n/a
900n/a os.chmod(sub21_path, 0)
901n/a try:
902n/a os.listdir(sub21_path)
903n/a except PermissionError:
904n/a self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
905n/a else:
906n/a os.chmod(sub21_path, stat.S_IRWXU)
907n/a os.unlink(tmp5_path)
908n/a os.rmdir(sub21_path)
909n/a del self.sub2_tree[1][:1]
910n/a
911n/a def test_walk_topdown(self):
912n/a # Walk top-down.
913n/a all = list(self.walk(self.walk_path))
914n/a
915n/a self.assertEqual(len(all), 4)
916n/a # We can't know which order SUB1 and SUB2 will appear in.
917n/a # Not flipped: TESTFN, SUB1, SUB11, SUB2
918n/a # flipped: TESTFN, SUB2, SUB1, SUB11
919n/a flipped = all[0][1][0] != "SUB1"
920n/a all[0][1].sort()
921n/a all[3 - 2 * flipped][-1].sort()
922n/a all[3 - 2 * flipped][1].sort()
923n/a self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
924n/a self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
925n/a self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
926n/a self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
927n/a
928n/a def test_walk_prune(self, walk_path=None):
929n/a if walk_path is None:
930n/a walk_path = self.walk_path
931n/a # Prune the search.
932n/a all = []
933n/a for root, dirs, files in self.walk(walk_path):
934n/a all.append((root, dirs, files))
935n/a # Don't descend into SUB1.
936n/a if 'SUB1' in dirs:
937n/a # Note that this also mutates the dirs we appended to all!
938n/a dirs.remove('SUB1')
939n/a
940n/a self.assertEqual(len(all), 2)
941n/a self.assertEqual(all[0],
942n/a (str(walk_path), ["SUB2"], ["tmp1"]))
943n/a
944n/a all[1][-1].sort()
945n/a all[1][1].sort()
946n/a self.assertEqual(all[1], self.sub2_tree)
947n/a
948n/a def test_file_like_path(self):
949n/a self.test_walk_prune(_PathLike(self.walk_path))
950n/a
951n/a def test_walk_bottom_up(self):
952n/a # Walk bottom-up.
953n/a all = list(self.walk(self.walk_path, topdown=False))
954n/a
955n/a self.assertEqual(len(all), 4, all)
956n/a # We can't know which order SUB1 and SUB2 will appear in.
957n/a # Not flipped: SUB11, SUB1, SUB2, TESTFN
958n/a # flipped: SUB2, SUB11, SUB1, TESTFN
959n/a flipped = all[3][1][0] != "SUB1"
960n/a all[3][1].sort()
961n/a all[2 - 2 * flipped][-1].sort()
962n/a all[2 - 2 * flipped][1].sort()
963n/a self.assertEqual(all[3],
964n/a (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
965n/a self.assertEqual(all[flipped],
966n/a (self.sub11_path, [], []))
967n/a self.assertEqual(all[flipped + 1],
968n/a (self.sub1_path, ["SUB11"], ["tmp2"]))
969n/a self.assertEqual(all[2 - 2 * flipped],
970n/a self.sub2_tree)
971n/a
972n/a def test_walk_symlink(self):
973n/a if not support.can_symlink():
974n/a self.skipTest("need symlink support")
975n/a
976n/a # Walk, following symlinks.
977n/a walk_it = self.walk(self.walk_path, follow_symlinks=True)
978n/a for root, dirs, files in walk_it:
979n/a if root == self.link_path:
980n/a self.assertEqual(dirs, [])
981n/a self.assertEqual(files, ["tmp4"])
982n/a break
983n/a else:
984n/a self.fail("Didn't follow symlink with followlinks=True")
985n/a
986n/a def test_walk_bad_dir(self):
987n/a # Walk top-down.
988n/a errors = []
989n/a walk_it = self.walk(self.walk_path, onerror=errors.append)
990n/a root, dirs, files = next(walk_it)
991n/a self.assertEqual(errors, [])
992n/a dir1 = 'SUB1'
993n/a path1 = os.path.join(root, dir1)
994n/a path1new = os.path.join(root, dir1 + '.new')
995n/a os.rename(path1, path1new)
996n/a try:
997n/a roots = [r for r, d, f in walk_it]
998n/a self.assertTrue(errors)
999n/a self.assertNotIn(path1, roots)
1000n/a self.assertNotIn(path1new, roots)
1001n/a for dir2 in dirs:
1002n/a if dir2 != dir1:
1003n/a self.assertIn(os.path.join(root, dir2), roots)
1004n/a finally:
1005n/a os.rename(path1new, path1)
1006n/a
1007n/a
1008n/a@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1009n/aclass FwalkTests(WalkTests):
1010n/a """Tests for os.fwalk()."""
1011n/a
1012n/a def walk(self, top, **kwargs):
1013n/a for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
1014n/a yield (root, dirs, files)
1015n/a
1016n/a def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1017n/a """
1018n/a compare with walk() results.
1019n/a """
1020n/a walk_kwargs = walk_kwargs.copy()
1021n/a fwalk_kwargs = fwalk_kwargs.copy()
1022n/a for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1023n/a walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1024n/a fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
1025n/a
1026n/a expected = {}
1027n/a for root, dirs, files in os.walk(**walk_kwargs):
1028n/a expected[root] = (set(dirs), set(files))
1029n/a
1030n/a for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
1031n/a self.assertIn(root, expected)
1032n/a self.assertEqual(expected[root], (set(dirs), set(files)))
1033n/a
1034n/a def test_compare_to_walk(self):
1035n/a kwargs = {'top': support.TESTFN}
1036n/a self._compare_to_walk(kwargs, kwargs)
1037n/a
1038n/a def test_dir_fd(self):
1039n/a try:
1040n/a fd = os.open(".", os.O_RDONLY)
1041n/a walk_kwargs = {'top': support.TESTFN}
1042n/a fwalk_kwargs = walk_kwargs.copy()
1043n/a fwalk_kwargs['dir_fd'] = fd
1044n/a self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1045n/a finally:
1046n/a os.close(fd)
1047n/a
1048n/a def test_yields_correct_dir_fd(self):
1049n/a # check returned file descriptors
1050n/a for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1051n/a args = support.TESTFN, topdown, None
1052n/a for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
1053n/a # check that the FD is valid
1054n/a os.fstat(rootfd)
1055n/a # redundant check
1056n/a os.stat(rootfd)
1057n/a # check that listdir() returns consistent information
1058n/a self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
1059n/a
1060n/a def test_fd_leak(self):
1061n/a # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1062n/a # we both check that calling fwalk() a large number of times doesn't
1063n/a # yield EMFILE, and that the minimum allocated FD hasn't changed.
1064n/a minfd = os.dup(1)
1065n/a os.close(minfd)
1066n/a for i in range(256):
1067n/a for x in os.fwalk(support.TESTFN):
1068n/a pass
1069n/a newfd = os.dup(1)
1070n/a self.addCleanup(os.close, newfd)
1071n/a self.assertEqual(newfd, minfd)
1072n/a
1073n/aclass BytesWalkTests(WalkTests):
1074n/a """Tests for os.walk() with bytes."""
1075n/a def setUp(self):
1076n/a super().setUp()
1077n/a self.stack = contextlib.ExitStack()
1078n/a
1079n/a def tearDown(self):
1080n/a self.stack.close()
1081n/a super().tearDown()
1082n/a
1083n/a def walk(self, top, **kwargs):
1084n/a if 'follow_symlinks' in kwargs:
1085n/a kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1086n/a for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1087n/a root = os.fsdecode(broot)
1088n/a dirs = list(map(os.fsdecode, bdirs))
1089n/a files = list(map(os.fsdecode, bfiles))
1090n/a yield (root, dirs, files)
1091n/a bdirs[:] = list(map(os.fsencode, dirs))
1092n/a bfiles[:] = list(map(os.fsencode, files))
1093n/a
1094n/a
1095n/aclass MakedirTests(unittest.TestCase):
1096n/a def setUp(self):
1097n/a os.mkdir(support.TESTFN)
1098n/a
1099n/a def test_makedir(self):
1100n/a base = support.TESTFN
1101n/a path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1102n/a os.makedirs(path) # Should work
1103n/a path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1104n/a os.makedirs(path)
1105n/a
1106n/a # Try paths with a '.' in them
1107n/a self.assertRaises(OSError, os.makedirs, os.curdir)
1108n/a path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1109n/a os.makedirs(path)
1110n/a path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1111n/a 'dir5', 'dir6')
1112n/a os.makedirs(path)
1113n/a
1114n/a def test_exist_ok_existing_directory(self):
1115n/a path = os.path.join(support.TESTFN, 'dir1')
1116n/a mode = 0o777
1117n/a old_mask = os.umask(0o022)
1118n/a os.makedirs(path, mode)
1119n/a self.assertRaises(OSError, os.makedirs, path, mode)
1120n/a self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
1121n/a os.makedirs(path, 0o776, exist_ok=True)
1122n/a os.makedirs(path, mode=mode, exist_ok=True)
1123n/a os.umask(old_mask)
1124n/a
1125n/a # Issue #25583: A drive root could raise PermissionError on Windows
1126n/a os.makedirs(os.path.abspath('/'), exist_ok=True)
1127n/a
1128n/a def test_exist_ok_s_isgid_directory(self):
1129n/a path = os.path.join(support.TESTFN, 'dir1')
1130n/a S_ISGID = stat.S_ISGID
1131n/a mode = 0o777
1132n/a old_mask = os.umask(0o022)
1133n/a try:
1134n/a existing_testfn_mode = stat.S_IMODE(
1135n/a os.lstat(support.TESTFN).st_mode)
1136n/a try:
1137n/a os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
1138n/a except PermissionError:
1139n/a raise unittest.SkipTest('Cannot set S_ISGID for dir.')
1140n/a if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1141n/a raise unittest.SkipTest('No support for S_ISGID dir mode.')
1142n/a # The os should apply S_ISGID from the parent dir for us, but
1143n/a # this test need not depend on that behavior. Be explicit.
1144n/a os.makedirs(path, mode | S_ISGID)
1145n/a # http://bugs.python.org/issue14992
1146n/a # Should not fail when the bit is already set.
1147n/a os.makedirs(path, mode, exist_ok=True)
1148n/a # remove the bit.
1149n/a os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
1150n/a # May work even when the bit is not already set when demanded.
1151n/a os.makedirs(path, mode | S_ISGID, exist_ok=True)
1152n/a finally:
1153n/a os.umask(old_mask)
1154n/a
1155n/a def test_exist_ok_existing_regular_file(self):
1156n/a base = support.TESTFN
1157n/a path = os.path.join(support.TESTFN, 'dir1')
1158n/a f = open(path, 'w')
1159n/a f.write('abc')
1160n/a f.close()
1161n/a self.assertRaises(OSError, os.makedirs, path)
1162n/a self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1163n/a self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1164n/a os.remove(path)
1165n/a
1166n/a def tearDown(self):
1167n/a path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
1168n/a 'dir4', 'dir5', 'dir6')
1169n/a # If the tests failed, the bottom-most directory ('../dir6')
1170n/a # may not have been created, so we look for the outermost directory
1171n/a # that exists.
1172n/a while not os.path.exists(path) and path != support.TESTFN:
1173n/a path = os.path.dirname(path)
1174n/a
1175n/a os.removedirs(path)
1176n/a
1177n/a
1178n/a@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1179n/aclass ChownFileTests(unittest.TestCase):
1180n/a
1181n/a @classmethod
1182n/a def setUpClass(cls):
1183n/a os.mkdir(support.TESTFN)
1184n/a
1185n/a def test_chown_uid_gid_arguments_must_be_index(self):
1186n/a stat = os.stat(support.TESTFN)
1187n/a uid = stat.st_uid
1188n/a gid = stat.st_gid
1189n/a for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1190n/a self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1191n/a self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1192n/a self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1193n/a self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1194n/a
1195n/a @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1196n/a def test_chown(self):
1197n/a gid_1, gid_2 = groups[:2]
1198n/a uid = os.stat(support.TESTFN).st_uid
1199n/a os.chown(support.TESTFN, uid, gid_1)
1200n/a gid = os.stat(support.TESTFN).st_gid
1201n/a self.assertEqual(gid, gid_1)
1202n/a os.chown(support.TESTFN, uid, gid_2)
1203n/a gid = os.stat(support.TESTFN).st_gid
1204n/a self.assertEqual(gid, gid_2)
1205n/a
1206n/a @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1207n/a "test needs root privilege and more than one user")
1208n/a def test_chown_with_root(self):
1209n/a uid_1, uid_2 = all_users[:2]
1210n/a gid = os.stat(support.TESTFN).st_gid
1211n/a os.chown(support.TESTFN, uid_1, gid)
1212n/a uid = os.stat(support.TESTFN).st_uid
1213n/a self.assertEqual(uid, uid_1)
1214n/a os.chown(support.TESTFN, uid_2, gid)
1215n/a uid = os.stat(support.TESTFN).st_uid
1216n/a self.assertEqual(uid, uid_2)
1217n/a
1218n/a @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1219n/a "test needs non-root account and more than one user")
1220n/a def test_chown_without_permission(self):
1221n/a uid_1, uid_2 = all_users[:2]
1222n/a gid = os.stat(support.TESTFN).st_gid
1223n/a with self.assertRaises(PermissionError):
1224n/a os.chown(support.TESTFN, uid_1, gid)
1225n/a os.chown(support.TESTFN, uid_2, gid)
1226n/a
1227n/a @classmethod
1228n/a def tearDownClass(cls):
1229n/a os.rmdir(support.TESTFN)
1230n/a
1231n/a
1232n/aclass RemoveDirsTests(unittest.TestCase):
1233n/a def setUp(self):
1234n/a os.makedirs(support.TESTFN)
1235n/a
1236n/a def tearDown(self):
1237n/a support.rmtree(support.TESTFN)
1238n/a
1239n/a def test_remove_all(self):
1240n/a dira = os.path.join(support.TESTFN, 'dira')
1241n/a os.mkdir(dira)
1242n/a dirb = os.path.join(dira, 'dirb')
1243n/a os.mkdir(dirb)
1244n/a os.removedirs(dirb)
1245n/a self.assertFalse(os.path.exists(dirb))
1246n/a self.assertFalse(os.path.exists(dira))
1247n/a self.assertFalse(os.path.exists(support.TESTFN))
1248n/a
1249n/a def test_remove_partial(self):
1250n/a dira = os.path.join(support.TESTFN, 'dira')
1251n/a os.mkdir(dira)
1252n/a dirb = os.path.join(dira, 'dirb')
1253n/a os.mkdir(dirb)
1254n/a create_file(os.path.join(dira, 'file.txt'))
1255n/a os.removedirs(dirb)
1256n/a self.assertFalse(os.path.exists(dirb))
1257n/a self.assertTrue(os.path.exists(dira))
1258n/a self.assertTrue(os.path.exists(support.TESTFN))
1259n/a
1260n/a def test_remove_nothing(self):
1261n/a dira = os.path.join(support.TESTFN, 'dira')
1262n/a os.mkdir(dira)
1263n/a dirb = os.path.join(dira, 'dirb')
1264n/a os.mkdir(dirb)
1265n/a create_file(os.path.join(dirb, 'file.txt'))
1266n/a with self.assertRaises(OSError):
1267n/a os.removedirs(dirb)
1268n/a self.assertTrue(os.path.exists(dirb))
1269n/a self.assertTrue(os.path.exists(dira))
1270n/a self.assertTrue(os.path.exists(support.TESTFN))
1271n/a
1272n/a
1273n/aclass DevNullTests(unittest.TestCase):
1274n/a def test_devnull(self):
1275n/a with open(os.devnull, 'wb', 0) as f:
1276n/a f.write(b'hello')
1277n/a f.close()
1278n/a with open(os.devnull, 'rb') as f:
1279n/a self.assertEqual(f.read(), b'')
1280n/a
1281n/a
1282n/aclass URandomTests(unittest.TestCase):
1283n/a def test_urandom_length(self):
1284n/a self.assertEqual(len(os.urandom(0)), 0)
1285n/a self.assertEqual(len(os.urandom(1)), 1)
1286n/a self.assertEqual(len(os.urandom(10)), 10)
1287n/a self.assertEqual(len(os.urandom(100)), 100)
1288n/a self.assertEqual(len(os.urandom(1000)), 1000)
1289n/a
1290n/a def test_urandom_value(self):
1291n/a data1 = os.urandom(16)
1292n/a self.assertIsInstance(data1, bytes)
1293n/a data2 = os.urandom(16)
1294n/a self.assertNotEqual(data1, data2)
1295n/a
1296n/a def get_urandom_subprocess(self, count):
1297n/a code = '\n'.join((
1298n/a 'import os, sys',
1299n/a 'data = os.urandom(%s)' % count,
1300n/a 'sys.stdout.buffer.write(data)',
1301n/a 'sys.stdout.buffer.flush()'))
1302n/a out = assert_python_ok('-c', code)
1303n/a stdout = out[1]
1304n/a self.assertEqual(len(stdout), 16)
1305n/a return stdout
1306n/a
1307n/a def test_urandom_subprocess(self):
1308n/a data1 = self.get_urandom_subprocess(16)
1309n/a data2 = self.get_urandom_subprocess(16)
1310n/a self.assertNotEqual(data1, data2)
1311n/a
1312n/a
1313n/a@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1314n/aclass GetRandomTests(unittest.TestCase):
1315n/a @classmethod
1316n/a def setUpClass(cls):
1317n/a try:
1318n/a os.getrandom(1)
1319n/a except OSError as exc:
1320n/a if exc.errno == errno.ENOSYS:
1321n/a # Python compiled on a more recent Linux version
1322n/a # than the current Linux kernel
1323n/a raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1324n/a else:
1325n/a raise
1326n/a
1327n/a def test_getrandom_type(self):
1328n/a data = os.getrandom(16)
1329n/a self.assertIsInstance(data, bytes)
1330n/a self.assertEqual(len(data), 16)
1331n/a
1332n/a def test_getrandom0(self):
1333n/a empty = os.getrandom(0)
1334n/a self.assertEqual(empty, b'')
1335n/a
1336n/a def test_getrandom_random(self):
1337n/a self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1338n/a
1339n/a # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1340n/a # resource /dev/random
1341n/a
1342n/a def test_getrandom_nonblock(self):
1343n/a # The call must not fail. Check also that the flag exists
1344n/a try:
1345n/a os.getrandom(1, os.GRND_NONBLOCK)
1346n/a except BlockingIOError:
1347n/a # System urandom is not initialized yet
1348n/a pass
1349n/a
1350n/a def test_getrandom_value(self):
1351n/a data1 = os.getrandom(16)
1352n/a data2 = os.getrandom(16)
1353n/a self.assertNotEqual(data1, data2)
1354n/a
1355n/a
1356n/a# os.urandom() doesn't use a file descriptor when it is implemented with the
1357n/a# getentropy() function, the getrandom() function or the getrandom() syscall
1358n/aOS_URANDOM_DONT_USE_FD = (
1359n/a sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1360n/a or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1361n/a or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
1362n/a
1363n/a@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1364n/a "os.random() does not use a file descriptor")
1365n/aclass URandomFDTests(unittest.TestCase):
1366n/a @unittest.skipUnless(resource, "test requires the resource module")
1367n/a def test_urandom_failure(self):
1368n/a # Check urandom() failing when it is not able to open /dev/random.
1369n/a # We spawn a new process to make the test more robust (if getrlimit()
1370n/a # failed to restore the file descriptor limit after this, the whole
1371n/a # test suite would crash; this actually happened on the OS X Tiger
1372n/a # buildbot).
1373n/a code = """if 1:
1374n/a import errno
1375n/a import os
1376n/a import resource
1377n/a
1378n/a soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1379n/a resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1380n/a try:
1381n/a os.urandom(16)
1382n/a except OSError as e:
1383n/a assert e.errno == errno.EMFILE, e.errno
1384n/a else:
1385n/a raise AssertionError("OSError not raised")
1386n/a """
1387n/a assert_python_ok('-c', code)
1388n/a
1389n/a def test_urandom_fd_closed(self):
1390n/a # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1391n/a # closed.
1392n/a code = """if 1:
1393n/a import os
1394n/a import sys
1395n/a import test.support
1396n/a os.urandom(4)
1397n/a with test.support.SuppressCrashReport():
1398n/a os.closerange(3, 256)
1399n/a sys.stdout.buffer.write(os.urandom(4))
1400n/a """
1401n/a rc, out, err = assert_python_ok('-Sc', code)
1402n/a
1403n/a def test_urandom_fd_reopened(self):
1404n/a # Issue #21207: urandom() should detect its fd to /dev/urandom
1405n/a # changed to something else, and reopen it.
1406n/a self.addCleanup(support.unlink, support.TESTFN)
1407n/a create_file(support.TESTFN, b"x" * 256)
1408n/a
1409n/a code = """if 1:
1410n/a import os
1411n/a import sys
1412n/a import test.support
1413n/a os.urandom(4)
1414n/a with test.support.SuppressCrashReport():
1415n/a for fd in range(3, 256):
1416n/a try:
1417n/a os.close(fd)
1418n/a except OSError:
1419n/a pass
1420n/a else:
1421n/a # Found the urandom fd (XXX hopefully)
1422n/a break
1423n/a os.closerange(3, 256)
1424n/a with open({TESTFN!r}, 'rb') as f:
1425n/a new_fd = f.fileno()
1426n/a # Issue #26935: posix allows new_fd and fd to be equal but
1427n/a # some libc implementations have dup2 return an error in this
1428n/a # case.
1429n/a if new_fd != fd:
1430n/a os.dup2(new_fd, fd)
1431n/a sys.stdout.buffer.write(os.urandom(4))
1432n/a sys.stdout.buffer.write(os.urandom(4))
1433n/a """.format(TESTFN=support.TESTFN)
1434n/a rc, out, err = assert_python_ok('-Sc', code)
1435n/a self.assertEqual(len(out), 8)
1436n/a self.assertNotEqual(out[0:4], out[4:8])
1437n/a rc, out2, err2 = assert_python_ok('-Sc', code)
1438n/a self.assertEqual(len(out2), 8)
1439n/a self.assertNotEqual(out2, out)
1440n/a
1441n/a
1442n/a@contextlib.contextmanager
1443n/adef _execvpe_mockup(defpath=None):
1444n/a """
1445n/a Stubs out execv and execve functions when used as context manager.
1446n/a Records exec calls. The mock execv and execve functions always raise an
1447n/a exception as they would normally never return.
1448n/a """
1449n/a # A list of tuples containing (function name, first arg, args)
1450n/a # of calls to execv or execve that have been made.
1451n/a calls = []
1452n/a
1453n/a def mock_execv(name, *args):
1454n/a calls.append(('execv', name, args))
1455n/a raise RuntimeError("execv called")
1456n/a
1457n/a def mock_execve(name, *args):
1458n/a calls.append(('execve', name, args))
1459n/a raise OSError(errno.ENOTDIR, "execve called")
1460n/a
1461n/a try:
1462n/a orig_execv = os.execv
1463n/a orig_execve = os.execve
1464n/a orig_defpath = os.defpath
1465n/a os.execv = mock_execv
1466n/a os.execve = mock_execve
1467n/a if defpath is not None:
1468n/a os.defpath = defpath
1469n/a yield calls
1470n/a finally:
1471n/a os.execv = orig_execv
1472n/a os.execve = orig_execve
1473n/a os.defpath = orig_defpath
1474n/a
1475n/a
1476n/aclass ExecTests(unittest.TestCase):
1477n/a @unittest.skipIf(USING_LINUXTHREADS,
1478n/a "avoid triggering a linuxthreads bug: see issue #4970")
1479n/a def test_execvpe_with_bad_program(self):
1480n/a self.assertRaises(OSError, os.execvpe, 'no such app-',
1481n/a ['no such app-'], None)
1482n/a
1483n/a def test_execv_with_bad_arglist(self):
1484n/a self.assertRaises(ValueError, os.execv, 'notepad', ())
1485n/a self.assertRaises(ValueError, os.execv, 'notepad', [])
1486n/a self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1487n/a self.assertRaises(ValueError, os.execv, 'notepad', [''])
1488n/a
1489n/a def test_execvpe_with_bad_arglist(self):
1490n/a self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1491n/a self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1492n/a self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
1493n/a
1494n/a @unittest.skipUnless(hasattr(os, '_execvpe'),
1495n/a "No internal os._execvpe function to test.")
1496n/a def _test_internal_execvpe(self, test_type):
1497n/a program_path = os.sep + 'absolutepath'
1498n/a if test_type is bytes:
1499n/a program = b'executable'
1500n/a fullpath = os.path.join(os.fsencode(program_path), program)
1501n/a native_fullpath = fullpath
1502n/a arguments = [b'progname', 'arg1', 'arg2']
1503n/a else:
1504n/a program = 'executable'
1505n/a arguments = ['progname', 'arg1', 'arg2']
1506n/a fullpath = os.path.join(program_path, program)
1507n/a if os.name != "nt":
1508n/a native_fullpath = os.fsencode(fullpath)
1509n/a else:
1510n/a native_fullpath = fullpath
1511n/a env = {'spam': 'beans'}
1512n/a
1513n/a # test os._execvpe() with an absolute path
1514n/a with _execvpe_mockup() as calls:
1515n/a self.assertRaises(RuntimeError,
1516n/a os._execvpe, fullpath, arguments)
1517n/a self.assertEqual(len(calls), 1)
1518n/a self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1519n/a
1520n/a # test os._execvpe() with a relative path:
1521n/a # os.get_exec_path() returns defpath
1522n/a with _execvpe_mockup(defpath=program_path) as calls:
1523n/a self.assertRaises(OSError,
1524n/a os._execvpe, program, arguments, env=env)
1525n/a self.assertEqual(len(calls), 1)
1526n/a self.assertSequenceEqual(calls[0],
1527n/a ('execve', native_fullpath, (arguments, env)))
1528n/a
1529n/a # test os._execvpe() with a relative path:
1530n/a # os.get_exec_path() reads the 'PATH' variable
1531n/a with _execvpe_mockup() as calls:
1532n/a env_path = env.copy()
1533n/a if test_type is bytes:
1534n/a env_path[b'PATH'] = program_path
1535n/a else:
1536n/a env_path['PATH'] = program_path
1537n/a self.assertRaises(OSError,
1538n/a os._execvpe, program, arguments, env=env_path)
1539n/a self.assertEqual(len(calls), 1)
1540n/a self.assertSequenceEqual(calls[0],
1541n/a ('execve', native_fullpath, (arguments, env_path)))
1542n/a
1543n/a def test_internal_execvpe_str(self):
1544n/a self._test_internal_execvpe(str)
1545n/a if os.name != "nt":
1546n/a self._test_internal_execvpe(bytes)
1547n/a
1548n/a
1549n/a@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1550n/aclass Win32ErrorTests(unittest.TestCase):
1551n/a def setUp(self):
1552n/a try:
1553n/a os.stat(support.TESTFN)
1554n/a except FileNotFoundError:
1555n/a exists = False
1556n/a except OSError as exc:
1557n/a exists = True
1558n/a self.fail("file %s must not exist; os.stat failed with %s"
1559n/a % (support.TESTFN, exc))
1560n/a else:
1561n/a self.fail("file %s must not exist" % support.TESTFN)
1562n/a
1563n/a def test_rename(self):
1564n/a self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
1565n/a
1566n/a def test_remove(self):
1567n/a self.assertRaises(OSError, os.remove, support.TESTFN)
1568n/a
1569n/a def test_chdir(self):
1570n/a self.assertRaises(OSError, os.chdir, support.TESTFN)
1571n/a
1572n/a def test_mkdir(self):
1573n/a self.addCleanup(support.unlink, support.TESTFN)
1574n/a
1575n/a with open(support.TESTFN, "x") as f:
1576n/a self.assertRaises(OSError, os.mkdir, support.TESTFN)
1577n/a
1578n/a def test_utime(self):
1579n/a self.assertRaises(OSError, os.utime, support.TESTFN, None)
1580n/a
1581n/a def test_chmod(self):
1582n/a self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
1583n/a
1584n/a
1585n/aclass TestInvalidFD(unittest.TestCase):
1586n/a singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
1587n/a "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1588n/a #singles.append("close")
1589n/a #We omit close because it doesn't raise an exception on some platforms
1590n/a def get_single(f):
1591n/a def helper(self):
1592n/a if hasattr(os, f):
1593n/a self.check(getattr(os, f))
1594n/a return helper
1595n/a for f in singles:
1596n/a locals()["test_"+f] = get_single(f)
1597n/a
1598n/a def check(self, f, *args):
1599n/a try:
1600n/a f(support.make_bad_fd(), *args)
1601n/a except OSError as e:
1602n/a self.assertEqual(e.errno, errno.EBADF)
1603n/a else:
1604n/a self.fail("%r didn't raise an OSError with a bad file descriptor"
1605n/a % f)
1606n/a
1607n/a @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
1608n/a def test_isatty(self):
1609n/a self.assertEqual(os.isatty(support.make_bad_fd()), False)
1610n/a
1611n/a @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
1612n/a def test_closerange(self):
1613n/a fd = support.make_bad_fd()
1614n/a # Make sure none of the descriptors we are about to close are
1615n/a # currently valid (issue 6542).
1616n/a for i in range(10):
1617n/a try: os.fstat(fd+i)
1618n/a except OSError:
1619n/a pass
1620n/a else:
1621n/a break
1622n/a if i < 2:
1623n/a raise unittest.SkipTest(
1624n/a "Unable to acquire a range of invalid file descriptors")
1625n/a self.assertEqual(os.closerange(fd, fd + i-1), None)
1626n/a
1627n/a @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
1628n/a def test_dup2(self):
1629n/a self.check(os.dup2, 20)
1630n/a
1631n/a @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
1632n/a def test_fchmod(self):
1633n/a self.check(os.fchmod, 0)
1634n/a
1635n/a @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
1636n/a def test_fchown(self):
1637n/a self.check(os.fchown, -1, -1)
1638n/a
1639n/a @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
1640n/a def test_fpathconf(self):
1641n/a self.check(os.pathconf, "PC_NAME_MAX")
1642n/a self.check(os.fpathconf, "PC_NAME_MAX")
1643n/a
1644n/a @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
1645n/a def test_ftruncate(self):
1646n/a self.check(os.truncate, 0)
1647n/a self.check(os.ftruncate, 0)
1648n/a
1649n/a @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
1650n/a def test_lseek(self):
1651n/a self.check(os.lseek, 0, 0)
1652n/a
1653n/a @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
1654n/a def test_read(self):
1655n/a self.check(os.read, 1)
1656n/a
1657n/a @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1658n/a def test_readv(self):
1659n/a buf = bytearray(10)
1660n/a self.check(os.readv, [buf])
1661n/a
1662n/a @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
1663n/a def test_tcsetpgrpt(self):
1664n/a self.check(os.tcsetpgrp, 0)
1665n/a
1666n/a @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
1667n/a def test_write(self):
1668n/a self.check(os.write, b" ")
1669n/a
1670n/a @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1671n/a def test_writev(self):
1672n/a self.check(os.writev, [b'abc'])
1673n/a
1674n/a def test_inheritable(self):
1675n/a self.check(os.get_inheritable)
1676n/a self.check(os.set_inheritable, True)
1677n/a
1678n/a @unittest.skipUnless(hasattr(os, 'get_blocking'),
1679n/a 'needs os.get_blocking() and os.set_blocking()')
1680n/a def test_blocking(self):
1681n/a self.check(os.get_blocking)
1682n/a self.check(os.set_blocking, True)
1683n/a
1684n/a
1685n/aclass LinkTests(unittest.TestCase):
1686n/a def setUp(self):
1687n/a self.file1 = support.TESTFN
1688n/a self.file2 = os.path.join(support.TESTFN + "2")
1689n/a
1690n/a def tearDown(self):
1691n/a for file in (self.file1, self.file2):
1692n/a if os.path.exists(file):
1693n/a os.unlink(file)
1694n/a
1695n/a def _test_link(self, file1, file2):
1696n/a create_file(file1)
1697n/a
1698n/a os.link(file1, file2)
1699n/a with open(file1, "r") as f1, open(file2, "r") as f2:
1700n/a self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1701n/a
1702n/a def test_link(self):
1703n/a self._test_link(self.file1, self.file2)
1704n/a
1705n/a def test_link_bytes(self):
1706n/a self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1707n/a bytes(self.file2, sys.getfilesystemencoding()))
1708n/a
1709n/a def test_unicode_name(self):
1710n/a try:
1711n/a os.fsencode("\xf1")
1712n/a except UnicodeError:
1713n/a raise unittest.SkipTest("Unable to encode for this platform.")
1714n/a
1715n/a self.file1 += "\xf1"
1716n/a self.file2 = self.file1 + "2"
1717n/a self._test_link(self.file1, self.file2)
1718n/a
1719n/a@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1720n/aclass PosixUidGidTests(unittest.TestCase):
1721n/a @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1722n/a def test_setuid(self):
1723n/a if os.getuid() != 0:
1724n/a self.assertRaises(OSError, os.setuid, 0)
1725n/a self.assertRaises(OverflowError, os.setuid, 1<<32)
1726n/a
1727n/a @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1728n/a def test_setgid(self):
1729n/a if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1730n/a self.assertRaises(OSError, os.setgid, 0)
1731n/a self.assertRaises(OverflowError, os.setgid, 1<<32)
1732n/a
1733n/a @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1734n/a def test_seteuid(self):
1735n/a if os.getuid() != 0:
1736n/a self.assertRaises(OSError, os.seteuid, 0)
1737n/a self.assertRaises(OverflowError, os.seteuid, 1<<32)
1738n/a
1739n/a @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1740n/a def test_setegid(self):
1741n/a if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1742n/a self.assertRaises(OSError, os.setegid, 0)
1743n/a self.assertRaises(OverflowError, os.setegid, 1<<32)
1744n/a
1745n/a @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1746n/a def test_setreuid(self):
1747n/a if os.getuid() != 0:
1748n/a self.assertRaises(OSError, os.setreuid, 0, 0)
1749n/a self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1750n/a self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
1751n/a
1752n/a @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1753n/a def test_setreuid_neg1(self):
1754n/a # Needs to accept -1. We run this in a subprocess to avoid
1755n/a # altering the test runner's process state (issue8045).
1756n/a subprocess.check_call([
1757n/a sys.executable, '-c',
1758n/a 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
1759n/a
1760n/a @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1761n/a def test_setregid(self):
1762n/a if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1763n/a self.assertRaises(OSError, os.setregid, 0, 0)
1764n/a self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1765n/a self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
1766n/a
1767n/a @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1768n/a def test_setregid_neg1(self):
1769n/a # Needs to accept -1. We run this in a subprocess to avoid
1770n/a # altering the test runner's process state (issue8045).
1771n/a subprocess.check_call([
1772n/a sys.executable, '-c',
1773n/a 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
1774n/a
1775n/a@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1776n/aclass Pep383Tests(unittest.TestCase):
1777n/a def setUp(self):
1778n/a if support.TESTFN_UNENCODABLE:
1779n/a self.dir = support.TESTFN_UNENCODABLE
1780n/a elif support.TESTFN_NONASCII:
1781n/a self.dir = support.TESTFN_NONASCII
1782n/a else:
1783n/a self.dir = support.TESTFN
1784n/a self.bdir = os.fsencode(self.dir)
1785n/a
1786n/a bytesfn = []
1787n/a def add_filename(fn):
1788n/a try:
1789n/a fn = os.fsencode(fn)
1790n/a except UnicodeEncodeError:
1791n/a return
1792n/a bytesfn.append(fn)
1793n/a add_filename(support.TESTFN_UNICODE)
1794n/a if support.TESTFN_UNENCODABLE:
1795n/a add_filename(support.TESTFN_UNENCODABLE)
1796n/a if support.TESTFN_NONASCII:
1797n/a add_filename(support.TESTFN_NONASCII)
1798n/a if not bytesfn:
1799n/a self.skipTest("couldn't create any non-ascii filename")
1800n/a
1801n/a self.unicodefn = set()
1802n/a os.mkdir(self.dir)
1803n/a try:
1804n/a for fn in bytesfn:
1805n/a support.create_empty_file(os.path.join(self.bdir, fn))
1806n/a fn = os.fsdecode(fn)
1807n/a if fn in self.unicodefn:
1808n/a raise ValueError("duplicate filename")
1809n/a self.unicodefn.add(fn)
1810n/a except:
1811n/a shutil.rmtree(self.dir)
1812n/a raise
1813n/a
1814n/a def tearDown(self):
1815n/a shutil.rmtree(self.dir)
1816n/a
1817n/a def test_listdir(self):
1818n/a expected = self.unicodefn
1819n/a found = set(os.listdir(self.dir))
1820n/a self.assertEqual(found, expected)
1821n/a # test listdir without arguments
1822n/a current_directory = os.getcwd()
1823n/a try:
1824n/a os.chdir(os.sep)
1825n/a self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1826n/a finally:
1827n/a os.chdir(current_directory)
1828n/a
1829n/a def test_open(self):
1830n/a for fn in self.unicodefn:
1831n/a f = open(os.path.join(self.dir, fn), 'rb')
1832n/a f.close()
1833n/a
1834n/a @unittest.skipUnless(hasattr(os, 'statvfs'),
1835n/a "need os.statvfs()")
1836n/a def test_statvfs(self):
1837n/a # issue #9645
1838n/a for fn in self.unicodefn:
1839n/a # should not fail with file not found error
1840n/a fullname = os.path.join(self.dir, fn)
1841n/a os.statvfs(fullname)
1842n/a
1843n/a def test_stat(self):
1844n/a for fn in self.unicodefn:
1845n/a os.stat(os.path.join(self.dir, fn))
1846n/a
1847n/a@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1848n/aclass Win32KillTests(unittest.TestCase):
1849n/a def _kill(self, sig):
1850n/a # Start sys.executable as a subprocess and communicate from the
1851n/a # subprocess to the parent that the interpreter is ready. When it
1852n/a # becomes ready, send *sig* via os.kill to the subprocess and check
1853n/a # that the return code is equal to *sig*.
1854n/a import ctypes
1855n/a from ctypes import wintypes
1856n/a import msvcrt
1857n/a
1858n/a # Since we can't access the contents of the process' stdout until the
1859n/a # process has exited, use PeekNamedPipe to see what's inside stdout
1860n/a # without waiting. This is done so we can tell that the interpreter
1861n/a # is started and running at a point where it could handle a signal.
1862n/a PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1863n/a PeekNamedPipe.restype = wintypes.BOOL
1864n/a PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1865n/a ctypes.POINTER(ctypes.c_char), # stdout buf
1866n/a wintypes.DWORD, # Buffer size
1867n/a ctypes.POINTER(wintypes.DWORD), # bytes read
1868n/a ctypes.POINTER(wintypes.DWORD), # bytes avail
1869n/a ctypes.POINTER(wintypes.DWORD)) # bytes left
1870n/a msg = "running"
1871n/a proc = subprocess.Popen([sys.executable, "-c",
1872n/a "import sys;"
1873n/a "sys.stdout.write('{}');"
1874n/a "sys.stdout.flush();"
1875n/a "input()".format(msg)],
1876n/a stdout=subprocess.PIPE,
1877n/a stderr=subprocess.PIPE,
1878n/a stdin=subprocess.PIPE)
1879n/a self.addCleanup(proc.stdout.close)
1880n/a self.addCleanup(proc.stderr.close)
1881n/a self.addCleanup(proc.stdin.close)
1882n/a
1883n/a count, max = 0, 100
1884n/a while count < max and proc.poll() is None:
1885n/a # Create a string buffer to store the result of stdout from the pipe
1886n/a buf = ctypes.create_string_buffer(len(msg))
1887n/a # Obtain the text currently in proc.stdout
1888n/a # Bytes read/avail/left are left as NULL and unused
1889n/a rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1890n/a buf, ctypes.sizeof(buf), None, None, None)
1891n/a self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1892n/a if buf.value:
1893n/a self.assertEqual(msg, buf.value.decode())
1894n/a break
1895n/a time.sleep(0.1)
1896n/a count += 1
1897n/a else:
1898n/a self.fail("Did not receive communication from the subprocess")
1899n/a
1900n/a os.kill(proc.pid, sig)
1901n/a self.assertEqual(proc.wait(), sig)
1902n/a
1903n/a def test_kill_sigterm(self):
1904n/a # SIGTERM doesn't mean anything special, but make sure it works
1905n/a self._kill(signal.SIGTERM)
1906n/a
1907n/a def test_kill_int(self):
1908n/a # os.kill on Windows can take an int which gets set as the exit code
1909n/a self._kill(100)
1910n/a
1911n/a def _kill_with_event(self, event, name):
1912n/a tagname = "test_os_%s" % uuid.uuid1()
1913n/a m = mmap.mmap(-1, 1, tagname)
1914n/a m[0] = 0
1915n/a # Run a script which has console control handling enabled.
1916n/a proc = subprocess.Popen([sys.executable,
1917n/a os.path.join(os.path.dirname(__file__),
1918n/a "win_console_handler.py"), tagname],
1919n/a creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1920n/a # Let the interpreter startup before we send signals. See #3137.
1921n/a count, max = 0, 100
1922n/a while count < max and proc.poll() is None:
1923n/a if m[0] == 1:
1924n/a break
1925n/a time.sleep(0.1)
1926n/a count += 1
1927n/a else:
1928n/a # Forcefully kill the process if we weren't able to signal it.
1929n/a os.kill(proc.pid, signal.SIGINT)
1930n/a self.fail("Subprocess didn't finish initialization")
1931n/a os.kill(proc.pid, event)
1932n/a # proc.send_signal(event) could also be done here.
1933n/a # Allow time for the signal to be passed and the process to exit.
1934n/a time.sleep(0.5)
1935n/a if not proc.poll():
1936n/a # Forcefully kill the process if we weren't able to signal it.
1937n/a os.kill(proc.pid, signal.SIGINT)
1938n/a self.fail("subprocess did not stop on {}".format(name))
1939n/a
1940n/a @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
1941n/a def test_CTRL_C_EVENT(self):
1942n/a from ctypes import wintypes
1943n/a import ctypes
1944n/a
1945n/a # Make a NULL value by creating a pointer with no argument.
1946n/a NULL = ctypes.POINTER(ctypes.c_int)()
1947n/a SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1948n/a SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1949n/a wintypes.BOOL)
1950n/a SetConsoleCtrlHandler.restype = wintypes.BOOL
1951n/a
1952n/a # Calling this with NULL and FALSE causes the calling process to
1953n/a # handle Ctrl+C, rather than ignore it. This property is inherited
1954n/a # by subprocesses.
1955n/a SetConsoleCtrlHandler(NULL, 0)
1956n/a
1957n/a self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1958n/a
1959n/a def test_CTRL_BREAK_EVENT(self):
1960n/a self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1961n/a
1962n/a
1963n/a@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1964n/aclass Win32ListdirTests(unittest.TestCase):
1965n/a """Test listdir on Windows."""
1966n/a
1967n/a def setUp(self):
1968n/a self.created_paths = []
1969n/a for i in range(2):
1970n/a dir_name = 'SUB%d' % i
1971n/a dir_path = os.path.join(support.TESTFN, dir_name)
1972n/a file_name = 'FILE%d' % i
1973n/a file_path = os.path.join(support.TESTFN, file_name)
1974n/a os.makedirs(dir_path)
1975n/a with open(file_path, 'w') as f:
1976n/a f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1977n/a self.created_paths.extend([dir_name, file_name])
1978n/a self.created_paths.sort()
1979n/a
1980n/a def tearDown(self):
1981n/a shutil.rmtree(support.TESTFN)
1982n/a
1983n/a def test_listdir_no_extended_path(self):
1984n/a """Test when the path is not an "extended" path."""
1985n/a # unicode
1986n/a self.assertEqual(
1987n/a sorted(os.listdir(support.TESTFN)),
1988n/a self.created_paths)
1989n/a
1990n/a # bytes
1991n/a self.assertEqual(
1992n/a sorted(os.listdir(os.fsencode(support.TESTFN))),
1993n/a [os.fsencode(path) for path in self.created_paths])
1994n/a
1995n/a def test_listdir_extended_path(self):
1996n/a """Test when the path starts with '\\\\?\\'."""
1997n/a # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
1998n/a # unicode
1999n/a path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2000n/a self.assertEqual(
2001n/a sorted(os.listdir(path)),
2002n/a self.created_paths)
2003n/a
2004n/a # bytes
2005n/a path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2006n/a self.assertEqual(
2007n/a sorted(os.listdir(path)),
2008n/a [os.fsencode(path) for path in self.created_paths])
2009n/a
2010n/a
2011n/a@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2012n/a@support.skip_unless_symlink
2013n/aclass Win32SymlinkTests(unittest.TestCase):
2014n/a filelink = 'filelinktest'
2015n/a filelink_target = os.path.abspath(__file__)
2016n/a dirlink = 'dirlinktest'
2017n/a dirlink_target = os.path.dirname(filelink_target)
2018n/a missing_link = 'missing link'
2019n/a
2020n/a def setUp(self):
2021n/a assert os.path.exists(self.dirlink_target)
2022n/a assert os.path.exists(self.filelink_target)
2023n/a assert not os.path.exists(self.dirlink)
2024n/a assert not os.path.exists(self.filelink)
2025n/a assert not os.path.exists(self.missing_link)
2026n/a
2027n/a def tearDown(self):
2028n/a if os.path.exists(self.filelink):
2029n/a os.remove(self.filelink)
2030n/a if os.path.exists(self.dirlink):
2031n/a os.rmdir(self.dirlink)
2032n/a if os.path.lexists(self.missing_link):
2033n/a os.remove(self.missing_link)
2034n/a
2035n/a def test_directory_link(self):
2036n/a os.symlink(self.dirlink_target, self.dirlink)
2037n/a self.assertTrue(os.path.exists(self.dirlink))
2038n/a self.assertTrue(os.path.isdir(self.dirlink))
2039n/a self.assertTrue(os.path.islink(self.dirlink))
2040n/a self.check_stat(self.dirlink, self.dirlink_target)
2041n/a
2042n/a def test_file_link(self):
2043n/a os.symlink(self.filelink_target, self.filelink)
2044n/a self.assertTrue(os.path.exists(self.filelink))
2045n/a self.assertTrue(os.path.isfile(self.filelink))
2046n/a self.assertTrue(os.path.islink(self.filelink))
2047n/a self.check_stat(self.filelink, self.filelink_target)
2048n/a
2049n/a def _create_missing_dir_link(self):
2050n/a 'Create a "directory" link to a non-existent target'
2051n/a linkname = self.missing_link
2052n/a if os.path.lexists(linkname):
2053n/a os.remove(linkname)
2054n/a target = r'c:\\target does not exist.29r3c740'
2055n/a assert not os.path.exists(target)
2056n/a target_is_dir = True
2057n/a os.symlink(target, linkname, target_is_dir)
2058n/a
2059n/a def test_remove_directory_link_to_missing_target(self):
2060n/a self._create_missing_dir_link()
2061n/a # For compatibility with Unix, os.remove will check the
2062n/a # directory status and call RemoveDirectory if the symlink
2063n/a # was created with target_is_dir==True.
2064n/a os.remove(self.missing_link)
2065n/a
2066n/a @unittest.skip("currently fails; consider for improvement")
2067n/a def test_isdir_on_directory_link_to_missing_target(self):
2068n/a self._create_missing_dir_link()
2069n/a # consider having isdir return true for directory links
2070n/a self.assertTrue(os.path.isdir(self.missing_link))
2071n/a
2072n/a @unittest.skip("currently fails; consider for improvement")
2073n/a def test_rmdir_on_directory_link_to_missing_target(self):
2074n/a self._create_missing_dir_link()
2075n/a # consider allowing rmdir to remove directory links
2076n/a os.rmdir(self.missing_link)
2077n/a
2078n/a def check_stat(self, link, target):
2079n/a self.assertEqual(os.stat(link), os.stat(target))
2080n/a self.assertNotEqual(os.lstat(link), os.stat(link))
2081n/a
2082n/a bytes_link = os.fsencode(link)
2083n/a self.assertEqual(os.stat(bytes_link), os.stat(target))
2084n/a self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
2085n/a
2086n/a def test_12084(self):
2087n/a level1 = os.path.abspath(support.TESTFN)
2088n/a level2 = os.path.join(level1, "level2")
2089n/a level3 = os.path.join(level2, "level3")
2090n/a self.addCleanup(support.rmtree, level1)
2091n/a
2092n/a os.mkdir(level1)
2093n/a os.mkdir(level2)
2094n/a os.mkdir(level3)
2095n/a
2096n/a file1 = os.path.abspath(os.path.join(level1, "file1"))
2097n/a create_file(file1)
2098n/a
2099n/a orig_dir = os.getcwd()
2100n/a try:
2101n/a os.chdir(level2)
2102n/a link = os.path.join(level2, "link")
2103n/a os.symlink(os.path.relpath(file1), "link")
2104n/a self.assertIn("link", os.listdir(os.getcwd()))
2105n/a
2106n/a # Check os.stat calls from the same dir as the link
2107n/a self.assertEqual(os.stat(file1), os.stat("link"))
2108n/a
2109n/a # Check os.stat calls from a dir below the link
2110n/a os.chdir(level1)
2111n/a self.assertEqual(os.stat(file1),
2112n/a os.stat(os.path.relpath(link)))
2113n/a
2114n/a # Check os.stat calls from a dir above the link
2115n/a os.chdir(level3)
2116n/a self.assertEqual(os.stat(file1),
2117n/a os.stat(os.path.relpath(link)))
2118n/a finally:
2119n/a os.chdir(orig_dir)
2120n/a
2121n/a
2122n/a@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2123n/aclass Win32JunctionTests(unittest.TestCase):
2124n/a junction = 'junctiontest'
2125n/a junction_target = os.path.dirname(os.path.abspath(__file__))
2126n/a
2127n/a def setUp(self):
2128n/a assert os.path.exists(self.junction_target)
2129n/a assert not os.path.exists(self.junction)
2130n/a
2131n/a def tearDown(self):
2132n/a if os.path.exists(self.junction):
2133n/a # os.rmdir delegates to Windows' RemoveDirectoryW,
2134n/a # which removes junction points safely.
2135n/a os.rmdir(self.junction)
2136n/a
2137n/a def test_create_junction(self):
2138n/a _winapi.CreateJunction(self.junction_target, self.junction)
2139n/a self.assertTrue(os.path.exists(self.junction))
2140n/a self.assertTrue(os.path.isdir(self.junction))
2141n/a
2142n/a # Junctions are not recognized as links.
2143n/a self.assertFalse(os.path.islink(self.junction))
2144n/a
2145n/a def test_unlink_removes_junction(self):
2146n/a _winapi.CreateJunction(self.junction_target, self.junction)
2147n/a self.assertTrue(os.path.exists(self.junction))
2148n/a
2149n/a os.unlink(self.junction)
2150n/a self.assertFalse(os.path.exists(self.junction))
2151n/a
2152n/a
2153n/a@support.skip_unless_symlink
2154n/aclass NonLocalSymlinkTests(unittest.TestCase):
2155n/a
2156n/a def setUp(self):
2157n/a r"""
2158n/a Create this structure:
2159n/a
2160n/a base
2161n/a \___ some_dir
2162n/a """
2163n/a os.makedirs('base/some_dir')
2164n/a
2165n/a def tearDown(self):
2166n/a shutil.rmtree('base')
2167n/a
2168n/a def test_directory_link_nonlocal(self):
2169n/a """
2170n/a The symlink target should resolve relative to the link, not relative
2171n/a to the current directory.
2172n/a
2173n/a Then, link base/some_link -> base/some_dir and ensure that some_link
2174n/a is resolved as a directory.
2175n/a
2176n/a In issue13772, it was discovered that directory detection failed if
2177n/a the symlink target was not specified relative to the current
2178n/a directory, which was a defect in the implementation.
2179n/a """
2180n/a src = os.path.join('base', 'some_link')
2181n/a os.symlink('some_dir', src)
2182n/a assert os.path.isdir(src)
2183n/a
2184n/a
2185n/aclass FSEncodingTests(unittest.TestCase):
2186n/a def test_nop(self):
2187n/a self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2188n/a self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
2189n/a
2190n/a def test_identity(self):
2191n/a # assert fsdecode(fsencode(x)) == x
2192n/a for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2193n/a try:
2194n/a bytesfn = os.fsencode(fn)
2195n/a except UnicodeEncodeError:
2196n/a continue
2197n/a self.assertEqual(os.fsdecode(bytesfn), fn)
2198n/a
2199n/a
2200n/a
2201n/aclass DeviceEncodingTests(unittest.TestCase):
2202n/a
2203n/a def test_bad_fd(self):
2204n/a # Return None when an fd doesn't actually exist.
2205n/a self.assertIsNone(os.device_encoding(123456))
2206n/a
2207n/a @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2208n/a (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
2209n/a 'test requires a tty and either Windows or nl_langinfo(CODESET)')
2210n/a def test_device_encoding(self):
2211n/a encoding = os.device_encoding(0)
2212n/a self.assertIsNotNone(encoding)
2213n/a self.assertTrue(codecs.lookup(encoding))
2214n/a
2215n/a
2216n/aclass PidTests(unittest.TestCase):
2217n/a @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2218n/a def test_getppid(self):
2219n/a p = subprocess.Popen([sys.executable, '-c',
2220n/a 'import os; print(os.getppid())'],
2221n/a stdout=subprocess.PIPE)
2222n/a stdout, _ = p.communicate()
2223n/a # We are the parent of our subprocess
2224n/a self.assertEqual(int(stdout), os.getpid())
2225n/a
2226n/a def test_waitpid(self):
2227n/a args = [sys.executable, '-c', 'pass']
2228n/a # Add an implicit test for PyUnicode_FSConverter().
2229n/a pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
2230n/a status = os.waitpid(pid, 0)
2231n/a self.assertEqual(status, (pid, 0))
2232n/a
2233n/a
2234n/aclass SpawnTests(unittest.TestCase):
2235n/a def create_args(self, *, with_env=False, use_bytes=False):
2236n/a self.exitcode = 17
2237n/a
2238n/a filename = support.TESTFN
2239n/a self.addCleanup(support.unlink, filename)
2240n/a
2241n/a if not with_env:
2242n/a code = 'import sys; sys.exit(%s)' % self.exitcode
2243n/a else:
2244n/a self.env = dict(os.environ)
2245n/a # create an unique key
2246n/a self.key = str(uuid.uuid4())
2247n/a self.env[self.key] = self.key
2248n/a # read the variable from os.environ to check that it exists
2249n/a code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2250n/a % (self.key, self.exitcode))
2251n/a
2252n/a with open(filename, "w") as fp:
2253n/a fp.write(code)
2254n/a
2255n/a args = [sys.executable, filename]
2256n/a if use_bytes:
2257n/a args = [os.fsencode(a) for a in args]
2258n/a self.env = {os.fsencode(k): os.fsencode(v)
2259n/a for k, v in self.env.items()}
2260n/a
2261n/a return args
2262n/a
2263n/a @requires_os_func('spawnl')
2264n/a def test_spawnl(self):
2265n/a args = self.create_args()
2266n/a exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2267n/a self.assertEqual(exitcode, self.exitcode)
2268n/a
2269n/a @requires_os_func('spawnle')
2270n/a def test_spawnle(self):
2271n/a args = self.create_args(with_env=True)
2272n/a exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2273n/a self.assertEqual(exitcode, self.exitcode)
2274n/a
2275n/a @requires_os_func('spawnlp')
2276n/a def test_spawnlp(self):
2277n/a args = self.create_args()
2278n/a exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2279n/a self.assertEqual(exitcode, self.exitcode)
2280n/a
2281n/a @requires_os_func('spawnlpe')
2282n/a def test_spawnlpe(self):
2283n/a args = self.create_args(with_env=True)
2284n/a exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2285n/a self.assertEqual(exitcode, self.exitcode)
2286n/a
2287n/a @requires_os_func('spawnv')
2288n/a def test_spawnv(self):
2289n/a args = self.create_args()
2290n/a exitcode = os.spawnv(os.P_WAIT, args[0], args)
2291n/a self.assertEqual(exitcode, self.exitcode)
2292n/a
2293n/a @requires_os_func('spawnve')
2294n/a def test_spawnve(self):
2295n/a args = self.create_args(with_env=True)
2296n/a exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2297n/a self.assertEqual(exitcode, self.exitcode)
2298n/a
2299n/a @requires_os_func('spawnvp')
2300n/a def test_spawnvp(self):
2301n/a args = self.create_args()
2302n/a exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2303n/a self.assertEqual(exitcode, self.exitcode)
2304n/a
2305n/a @requires_os_func('spawnvpe')
2306n/a def test_spawnvpe(self):
2307n/a args = self.create_args(with_env=True)
2308n/a exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2309n/a self.assertEqual(exitcode, self.exitcode)
2310n/a
2311n/a @requires_os_func('spawnv')
2312n/a def test_nowait(self):
2313n/a args = self.create_args()
2314n/a pid = os.spawnv(os.P_NOWAIT, args[0], args)
2315n/a result = os.waitpid(pid, 0)
2316n/a self.assertEqual(result[0], pid)
2317n/a status = result[1]
2318n/a if hasattr(os, 'WIFEXITED'):
2319n/a self.assertTrue(os.WIFEXITED(status))
2320n/a self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2321n/a else:
2322n/a self.assertEqual(status, self.exitcode << 8)
2323n/a
2324n/a @requires_os_func('spawnve')
2325n/a def test_spawnve_bytes(self):
2326n/a # Test bytes handling in parse_arglist and parse_envlist (#28114)
2327n/a args = self.create_args(with_env=True, use_bytes=True)
2328n/a exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2329n/a self.assertEqual(exitcode, self.exitcode)
2330n/a
2331n/a @requires_os_func('spawnl')
2332n/a def test_spawnl_noargs(self):
2333n/a args = self.create_args()
2334n/a self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
2335n/a self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
2336n/a
2337n/a @requires_os_func('spawnle')
2338n/a def test_spawnle_noargs(self):
2339n/a args = self.create_args()
2340n/a self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
2341n/a self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
2342n/a
2343n/a @requires_os_func('spawnv')
2344n/a def test_spawnv_noargs(self):
2345n/a args = self.create_args()
2346n/a self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2347n/a self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
2348n/a self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2349n/a self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
2350n/a
2351n/a @requires_os_func('spawnve')
2352n/a def test_spawnve_noargs(self):
2353n/a args = self.create_args()
2354n/a self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2355n/a self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
2356n/a self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2357n/a self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
2358n/a
2359n/a# The introduction of this TestCase caused at least two different errors on
2360n/a# *nix buildbots. Temporarily skip this to let the buildbots move along.
2361n/a@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
2362n/a@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2363n/aclass LoginTests(unittest.TestCase):
2364n/a def test_getlogin(self):
2365n/a user_name = os.getlogin()
2366n/a self.assertNotEqual(len(user_name), 0)
2367n/a
2368n/a
2369n/a@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2370n/a "needs os.getpriority and os.setpriority")
2371n/aclass ProgramPriorityTests(unittest.TestCase):
2372n/a """Tests for os.getpriority() and os.setpriority()."""
2373n/a
2374n/a def test_set_get_priority(self):
2375n/a
2376n/a base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2377n/a os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2378n/a try:
2379n/a new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2380n/a if base >= 19 and new_prio <= 19:
2381n/a raise unittest.SkipTest("unable to reliably test setpriority "
2382n/a "at current nice level of %s" % base)
2383n/a else:
2384n/a self.assertEqual(new_prio, base + 1)
2385n/a finally:
2386n/a try:
2387n/a os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2388n/a except OSError as err:
2389n/a if err.errno != errno.EACCES:
2390n/a raise
2391n/a
2392n/a
2393n/aif threading is not None:
2394n/a class SendfileTestServer(asyncore.dispatcher, threading.Thread):
2395n/a
2396n/a class Handler(asynchat.async_chat):
2397n/a
2398n/a def __init__(self, conn):
2399n/a asynchat.async_chat.__init__(self, conn)
2400n/a self.in_buffer = []
2401n/a self.closed = False
2402n/a self.push(b"220 ready\r\n")
2403n/a
2404n/a def handle_read(self):
2405n/a data = self.recv(4096)
2406n/a self.in_buffer.append(data)
2407n/a
2408n/a def get_data(self):
2409n/a return b''.join(self.in_buffer)
2410n/a
2411n/a def handle_close(self):
2412n/a self.close()
2413n/a self.closed = True
2414n/a
2415n/a def handle_error(self):
2416n/a raise
2417n/a
2418n/a def __init__(self, address):
2419n/a threading.Thread.__init__(self)
2420n/a asyncore.dispatcher.__init__(self)
2421n/a self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2422n/a self.bind(address)
2423n/a self.listen(5)
2424n/a self.host, self.port = self.socket.getsockname()[:2]
2425n/a self.handler_instance = None
2426n/a self._active = False
2427n/a self._active_lock = threading.Lock()
2428n/a
2429n/a # --- public API
2430n/a
2431n/a @property
2432n/a def running(self):
2433n/a return self._active
2434n/a
2435n/a def start(self):
2436n/a assert not self.running
2437n/a self.__flag = threading.Event()
2438n/a threading.Thread.start(self)
2439n/a self.__flag.wait()
2440n/a
2441n/a def stop(self):
2442n/a assert self.running
2443n/a self._active = False
2444n/a self.join()
2445n/a
2446n/a def wait(self):
2447n/a # wait for handler connection to be closed, then stop the server
2448n/a while not getattr(self.handler_instance, "closed", False):
2449n/a time.sleep(0.001)
2450n/a self.stop()
2451n/a
2452n/a # --- internals
2453n/a
2454n/a def run(self):
2455n/a self._active = True
2456n/a self.__flag.set()
2457n/a while self._active and asyncore.socket_map:
2458n/a self._active_lock.acquire()
2459n/a asyncore.loop(timeout=0.001, count=1)
2460n/a self._active_lock.release()
2461n/a asyncore.close_all()
2462n/a
2463n/a def handle_accept(self):
2464n/a conn, addr = self.accept()
2465n/a self.handler_instance = self.Handler(conn)
2466n/a
2467n/a def handle_connect(self):
2468n/a self.close()
2469n/a handle_read = handle_connect
2470n/a
2471n/a def writable(self):
2472n/a return 0
2473n/a
2474n/a def handle_error(self):
2475n/a raise
2476n/a
2477n/a
2478n/a@unittest.skipUnless(threading is not None, "test needs threading module")
2479n/a@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2480n/aclass TestSendfile(unittest.TestCase):
2481n/a
2482n/a DATA = b"12345abcde" * 16 * 1024 # 160 KB
2483n/a SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
2484n/a not sys.platform.startswith("solaris") and \
2485n/a not sys.platform.startswith("sunos")
2486n/a requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2487n/a 'requires headers and trailers support')
2488n/a
2489n/a @classmethod
2490n/a def setUpClass(cls):
2491n/a cls.key = support.threading_setup()
2492n/a create_file(support.TESTFN, cls.DATA)
2493n/a
2494n/a @classmethod
2495n/a def tearDownClass(cls):
2496n/a support.threading_cleanup(*cls.key)
2497n/a support.unlink(support.TESTFN)
2498n/a
2499n/a def setUp(self):
2500n/a self.server = SendfileTestServer((support.HOST, 0))
2501n/a self.server.start()
2502n/a self.client = socket.socket()
2503n/a self.client.connect((self.server.host, self.server.port))
2504n/a self.client.settimeout(1)
2505n/a # synchronize by waiting for "220 ready" response
2506n/a self.client.recv(1024)
2507n/a self.sockno = self.client.fileno()
2508n/a self.file = open(support.TESTFN, 'rb')
2509n/a self.fileno = self.file.fileno()
2510n/a
2511n/a def tearDown(self):
2512n/a self.file.close()
2513n/a self.client.close()
2514n/a if self.server.running:
2515n/a self.server.stop()
2516n/a
2517n/a def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2518n/a """A higher level wrapper representing how an application is
2519n/a supposed to use sendfile().
2520n/a """
2521n/a while 1:
2522n/a try:
2523n/a if self.SUPPORT_HEADERS_TRAILERS:
2524n/a return os.sendfile(sock, file, offset, nbytes, headers,
2525n/a trailers)
2526n/a else:
2527n/a return os.sendfile(sock, file, offset, nbytes)
2528n/a except OSError as err:
2529n/a if err.errno == errno.ECONNRESET:
2530n/a # disconnected
2531n/a raise
2532n/a elif err.errno in (errno.EAGAIN, errno.EBUSY):
2533n/a # we have to retry send data
2534n/a continue
2535n/a else:
2536n/a raise
2537n/a
2538n/a def test_send_whole_file(self):
2539n/a # normal send
2540n/a total_sent = 0
2541n/a offset = 0
2542n/a nbytes = 4096
2543n/a while total_sent < len(self.DATA):
2544n/a sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2545n/a if sent == 0:
2546n/a break
2547n/a offset += sent
2548n/a total_sent += sent
2549n/a self.assertTrue(sent <= nbytes)
2550n/a self.assertEqual(offset, total_sent)
2551n/a
2552n/a self.assertEqual(total_sent, len(self.DATA))
2553n/a self.client.shutdown(socket.SHUT_RDWR)
2554n/a self.client.close()
2555n/a self.server.wait()
2556n/a data = self.server.handler_instance.get_data()
2557n/a self.assertEqual(len(data), len(self.DATA))
2558n/a self.assertEqual(data, self.DATA)
2559n/a
2560n/a def test_send_at_certain_offset(self):
2561n/a # start sending a file at a certain offset
2562n/a total_sent = 0
2563n/a offset = len(self.DATA) // 2
2564n/a must_send = len(self.DATA) - offset
2565n/a nbytes = 4096
2566n/a while total_sent < must_send:
2567n/a sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2568n/a if sent == 0:
2569n/a break
2570n/a offset += sent
2571n/a total_sent += sent
2572n/a self.assertTrue(sent <= nbytes)
2573n/a
2574n/a self.client.shutdown(socket.SHUT_RDWR)
2575n/a self.client.close()
2576n/a self.server.wait()
2577n/a data = self.server.handler_instance.get_data()
2578n/a expected = self.DATA[len(self.DATA) // 2:]
2579n/a self.assertEqual(total_sent, len(expected))
2580n/a self.assertEqual(len(data), len(expected))
2581n/a self.assertEqual(data, expected)
2582n/a
2583n/a def test_offset_overflow(self):
2584n/a # specify an offset > file size
2585n/a offset = len(self.DATA) + 4096
2586n/a try:
2587n/a sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2588n/a except OSError as e:
2589n/a # Solaris can raise EINVAL if offset >= file length, ignore.
2590n/a if e.errno != errno.EINVAL:
2591n/a raise
2592n/a else:
2593n/a self.assertEqual(sent, 0)
2594n/a self.client.shutdown(socket.SHUT_RDWR)
2595n/a self.client.close()
2596n/a self.server.wait()
2597n/a data = self.server.handler_instance.get_data()
2598n/a self.assertEqual(data, b'')
2599n/a
2600n/a def test_invalid_offset(self):
2601n/a with self.assertRaises(OSError) as cm:
2602n/a os.sendfile(self.sockno, self.fileno, -1, 4096)
2603n/a self.assertEqual(cm.exception.errno, errno.EINVAL)
2604n/a
2605n/a def test_keywords(self):
2606n/a # Keyword arguments should be supported
2607n/a os.sendfile(out=self.sockno, offset=0, count=4096,
2608n/a **{'in': self.fileno})
2609n/a if self.SUPPORT_HEADERS_TRAILERS:
2610n/a os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
2611n/a headers=(), trailers=(), flags=0)
2612n/a
2613n/a # --- headers / trailers tests
2614n/a
2615n/a @requires_headers_trailers
2616n/a def test_headers(self):
2617n/a total_sent = 0
2618n/a sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2619n/a headers=[b"x" * 512])
2620n/a total_sent += sent
2621n/a offset = 4096
2622n/a nbytes = 4096
2623n/a while 1:
2624n/a sent = self.sendfile_wrapper(self.sockno, self.fileno,
2625n/a offset, nbytes)
2626n/a if sent == 0:
2627n/a break
2628n/a total_sent += sent
2629n/a offset += sent
2630n/a
2631n/a expected_data = b"x" * 512 + self.DATA
2632n/a self.assertEqual(total_sent, len(expected_data))
2633n/a self.client.close()
2634n/a self.server.wait()
2635n/a data = self.server.handler_instance.get_data()
2636n/a self.assertEqual(hash(data), hash(expected_data))
2637n/a
2638n/a @requires_headers_trailers
2639n/a def test_trailers(self):
2640n/a TESTFN2 = support.TESTFN + "2"
2641n/a file_data = b"abcdef"
2642n/a
2643n/a self.addCleanup(support.unlink, TESTFN2)
2644n/a create_file(TESTFN2, file_data)
2645n/a
2646n/a with open(TESTFN2, 'rb') as f:
2647n/a os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2648n/a trailers=[b"1234"])
2649n/a self.client.close()
2650n/a self.server.wait()
2651n/a data = self.server.handler_instance.get_data()
2652n/a self.assertEqual(data, b"abcdef1234")
2653n/a
2654n/a @requires_headers_trailers
2655n/a @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2656n/a 'test needs os.SF_NODISKIO')
2657n/a def test_flags(self):
2658n/a try:
2659n/a os.sendfile(self.sockno, self.fileno, 0, 4096,
2660n/a flags=os.SF_NODISKIO)
2661n/a except OSError as err:
2662n/a if err.errno not in (errno.EBUSY, errno.EAGAIN):
2663n/a raise
2664n/a
2665n/a
2666n/adef supports_extended_attributes():
2667n/a if not hasattr(os, "setxattr"):
2668n/a return False
2669n/a
2670n/a try:
2671n/a with open(support.TESTFN, "xb", 0) as fp:
2672n/a try:
2673n/a os.setxattr(fp.fileno(), b"user.test", b"")
2674n/a except OSError:
2675n/a return False
2676n/a finally:
2677n/a support.unlink(support.TESTFN)
2678n/a
2679n/a return True
2680n/a
2681n/a
2682n/a@unittest.skipUnless(supports_extended_attributes(),
2683n/a "no non-broken extended attribute support")
2684n/a# Kernels < 2.6.39 don't respect setxattr flags.
2685n/a@support.requires_linux_version(2, 6, 39)
2686n/aclass ExtendedAttributeTests(unittest.TestCase):
2687n/a
2688n/a def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
2689n/a fn = support.TESTFN
2690n/a self.addCleanup(support.unlink, fn)
2691n/a create_file(fn)
2692n/a
2693n/a with self.assertRaises(OSError) as cm:
2694n/a getxattr(fn, s("user.test"), **kwargs)
2695n/a self.assertEqual(cm.exception.errno, errno.ENODATA)
2696n/a
2697n/a init_xattr = listxattr(fn)
2698n/a self.assertIsInstance(init_xattr, list)
2699n/a
2700n/a setxattr(fn, s("user.test"), b"", **kwargs)
2701n/a xattr = set(init_xattr)
2702n/a xattr.add("user.test")
2703n/a self.assertEqual(set(listxattr(fn)), xattr)
2704n/a self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2705n/a setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2706n/a self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
2707n/a
2708n/a with self.assertRaises(OSError) as cm:
2709n/a setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
2710n/a self.assertEqual(cm.exception.errno, errno.EEXIST)
2711n/a
2712n/a with self.assertRaises(OSError) as cm:
2713n/a setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
2714n/a self.assertEqual(cm.exception.errno, errno.ENODATA)
2715n/a
2716n/a setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
2717n/a xattr.add("user.test2")
2718n/a self.assertEqual(set(listxattr(fn)), xattr)
2719n/a removexattr(fn, s("user.test"), **kwargs)
2720n/a
2721n/a with self.assertRaises(OSError) as cm:
2722n/a getxattr(fn, s("user.test"), **kwargs)
2723n/a self.assertEqual(cm.exception.errno, errno.ENODATA)
2724n/a
2725n/a xattr.remove("user.test")
2726n/a self.assertEqual(set(listxattr(fn)), xattr)
2727n/a self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2728n/a setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2729n/a self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2730n/a removexattr(fn, s("user.test"), **kwargs)
2731n/a many = sorted("user.test{}".format(i) for i in range(100))
2732n/a for thing in many:
2733n/a setxattr(fn, thing, b"x", **kwargs)
2734n/a self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
2735n/a
2736n/a def _check_xattrs(self, *args, **kwargs):
2737n/a self._check_xattrs_str(str, *args, **kwargs)
2738n/a support.unlink(support.TESTFN)
2739n/a
2740n/a self._check_xattrs_str(os.fsencode, *args, **kwargs)
2741n/a support.unlink(support.TESTFN)
2742n/a
2743n/a def test_simple(self):
2744n/a self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2745n/a os.listxattr)
2746n/a
2747n/a def test_lpath(self):
2748n/a self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2749n/a os.listxattr, follow_symlinks=False)
2750n/a
2751n/a def test_fds(self):
2752n/a def getxattr(path, *args):
2753n/a with open(path, "rb") as fp:
2754n/a return os.getxattr(fp.fileno(), *args)
2755n/a def setxattr(path, *args):
2756n/a with open(path, "wb", 0) as fp:
2757n/a os.setxattr(fp.fileno(), *args)
2758n/a def removexattr(path, *args):
2759n/a with open(path, "wb", 0) as fp:
2760n/a os.removexattr(fp.fileno(), *args)
2761n/a def listxattr(path, *args):
2762n/a with open(path, "rb") as fp:
2763n/a return os.listxattr(fp.fileno(), *args)
2764n/a self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2765n/a
2766n/a
2767n/a@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2768n/aclass TermsizeTests(unittest.TestCase):
2769n/a def test_does_not_crash(self):
2770n/a """Check if get_terminal_size() returns a meaningful value.
2771n/a
2772n/a There's no easy portable way to actually check the size of the
2773n/a terminal, so let's check if it returns something sensible instead.
2774n/a """
2775n/a try:
2776n/a size = os.get_terminal_size()
2777n/a except OSError as e:
2778n/a if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2779n/a # Under win32 a generic OSError can be thrown if the
2780n/a # handle cannot be retrieved
2781n/a self.skipTest("failed to query terminal size")
2782n/a raise
2783n/a
2784n/a self.assertGreaterEqual(size.columns, 0)
2785n/a self.assertGreaterEqual(size.lines, 0)
2786n/a
2787n/a def test_stty_match(self):
2788n/a """Check if stty returns the same results
2789n/a
2790n/a stty actually tests stdin, so get_terminal_size is invoked on
2791n/a stdin explicitly. If stty succeeded, then get_terminal_size()
2792n/a should work too.
2793n/a """
2794n/a try:
2795n/a size = subprocess.check_output(['stty', 'size']).decode().split()
2796n/a except (FileNotFoundError, subprocess.CalledProcessError):
2797n/a self.skipTest("stty invocation failed")
2798n/a expected = (int(size[1]), int(size[0])) # reversed order
2799n/a
2800n/a try:
2801n/a actual = os.get_terminal_size(sys.__stdin__.fileno())
2802n/a except OSError as e:
2803n/a if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2804n/a # Under win32 a generic OSError can be thrown if the
2805n/a # handle cannot be retrieved
2806n/a self.skipTest("failed to query terminal size")
2807n/a raise
2808n/a self.assertEqual(expected, actual)
2809n/a
2810n/a
2811n/aclass OSErrorTests(unittest.TestCase):
2812n/a def setUp(self):
2813n/a class Str(str):
2814n/a pass
2815n/a
2816n/a self.bytes_filenames = []
2817n/a self.unicode_filenames = []
2818n/a if support.TESTFN_UNENCODABLE is not None:
2819n/a decoded = support.TESTFN_UNENCODABLE
2820n/a else:
2821n/a decoded = support.TESTFN
2822n/a self.unicode_filenames.append(decoded)
2823n/a self.unicode_filenames.append(Str(decoded))
2824n/a if support.TESTFN_UNDECODABLE is not None:
2825n/a encoded = support.TESTFN_UNDECODABLE
2826n/a else:
2827n/a encoded = os.fsencode(support.TESTFN)
2828n/a self.bytes_filenames.append(encoded)
2829n/a self.bytes_filenames.append(bytearray(encoded))
2830n/a self.bytes_filenames.append(memoryview(encoded))
2831n/a
2832n/a self.filenames = self.bytes_filenames + self.unicode_filenames
2833n/a
2834n/a def test_oserror_filename(self):
2835n/a funcs = [
2836n/a (self.filenames, os.chdir,),
2837n/a (self.filenames, os.chmod, 0o777),
2838n/a (self.filenames, os.lstat,),
2839n/a (self.filenames, os.open, os.O_RDONLY),
2840n/a (self.filenames, os.rmdir,),
2841n/a (self.filenames, os.stat,),
2842n/a (self.filenames, os.unlink,),
2843n/a ]
2844n/a if sys.platform == "win32":
2845n/a funcs.extend((
2846n/a (self.bytes_filenames, os.rename, b"dst"),
2847n/a (self.bytes_filenames, os.replace, b"dst"),
2848n/a (self.unicode_filenames, os.rename, "dst"),
2849n/a (self.unicode_filenames, os.replace, "dst"),
2850n/a (self.unicode_filenames, os.listdir, ),
2851n/a ))
2852n/a else:
2853n/a funcs.extend((
2854n/a (self.filenames, os.listdir,),
2855n/a (self.filenames, os.rename, "dst"),
2856n/a (self.filenames, os.replace, "dst"),
2857n/a ))
2858n/a if hasattr(os, "chown"):
2859n/a funcs.append((self.filenames, os.chown, 0, 0))
2860n/a if hasattr(os, "lchown"):
2861n/a funcs.append((self.filenames, os.lchown, 0, 0))
2862n/a if hasattr(os, "truncate"):
2863n/a funcs.append((self.filenames, os.truncate, 0))
2864n/a if hasattr(os, "chflags"):
2865n/a funcs.append((self.filenames, os.chflags, 0))
2866n/a if hasattr(os, "lchflags"):
2867n/a funcs.append((self.filenames, os.lchflags, 0))
2868n/a if hasattr(os, "chroot"):
2869n/a funcs.append((self.filenames, os.chroot,))
2870n/a if hasattr(os, "link"):
2871n/a if sys.platform == "win32":
2872n/a funcs.append((self.bytes_filenames, os.link, b"dst"))
2873n/a funcs.append((self.unicode_filenames, os.link, "dst"))
2874n/a else:
2875n/a funcs.append((self.filenames, os.link, "dst"))
2876n/a if hasattr(os, "listxattr"):
2877n/a funcs.extend((
2878n/a (self.filenames, os.listxattr,),
2879n/a (self.filenames, os.getxattr, "user.test"),
2880n/a (self.filenames, os.setxattr, "user.test", b'user'),
2881n/a (self.filenames, os.removexattr, "user.test"),
2882n/a ))
2883n/a if hasattr(os, "lchmod"):
2884n/a funcs.append((self.filenames, os.lchmod, 0o777))
2885n/a if hasattr(os, "readlink"):
2886n/a if sys.platform == "win32":
2887n/a funcs.append((self.unicode_filenames, os.readlink,))
2888n/a else:
2889n/a funcs.append((self.filenames, os.readlink,))
2890n/a
2891n/a
2892n/a for filenames, func, *func_args in funcs:
2893n/a for name in filenames:
2894n/a try:
2895n/a if isinstance(name, (str, bytes)):
2896n/a func(name, *func_args)
2897n/a else:
2898n/a with self.assertWarnsRegex(DeprecationWarning, 'should be'):
2899n/a func(name, *func_args)
2900n/a except OSError as err:
2901n/a self.assertIs(err.filename, name, str(func))
2902n/a except UnicodeDecodeError:
2903n/a pass
2904n/a else:
2905n/a self.fail("No exception thrown by {}".format(func))
2906n/a
2907n/aclass CPUCountTests(unittest.TestCase):
2908n/a def test_cpu_count(self):
2909n/a cpus = os.cpu_count()
2910n/a if cpus is not None:
2911n/a self.assertIsInstance(cpus, int)
2912n/a self.assertGreater(cpus, 0)
2913n/a else:
2914n/a self.skipTest("Could not determine the number of CPUs")
2915n/a
2916n/a
2917n/aclass FDInheritanceTests(unittest.TestCase):
2918n/a def test_get_set_inheritable(self):
2919n/a fd = os.open(__file__, os.O_RDONLY)
2920n/a self.addCleanup(os.close, fd)
2921n/a self.assertEqual(os.get_inheritable(fd), False)
2922n/a
2923n/a os.set_inheritable(fd, True)
2924n/a self.assertEqual(os.get_inheritable(fd), True)
2925n/a
2926n/a @unittest.skipIf(fcntl is None, "need fcntl")
2927n/a def test_get_inheritable_cloexec(self):
2928n/a fd = os.open(__file__, os.O_RDONLY)
2929n/a self.addCleanup(os.close, fd)
2930n/a self.assertEqual(os.get_inheritable(fd), False)
2931n/a
2932n/a # clear FD_CLOEXEC flag
2933n/a flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2934n/a flags &= ~fcntl.FD_CLOEXEC
2935n/a fcntl.fcntl(fd, fcntl.F_SETFD, flags)
2936n/a
2937n/a self.assertEqual(os.get_inheritable(fd), True)
2938n/a
2939n/a @unittest.skipIf(fcntl is None, "need fcntl")
2940n/a def test_set_inheritable_cloexec(self):
2941n/a fd = os.open(__file__, os.O_RDONLY)
2942n/a self.addCleanup(os.close, fd)
2943n/a self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2944n/a fcntl.FD_CLOEXEC)
2945n/a
2946n/a os.set_inheritable(fd, True)
2947n/a self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2948n/a 0)
2949n/a
2950n/a def test_open(self):
2951n/a fd = os.open(__file__, os.O_RDONLY)
2952n/a self.addCleanup(os.close, fd)
2953n/a self.assertEqual(os.get_inheritable(fd), False)
2954n/a
2955n/a @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2956n/a def test_pipe(self):
2957n/a rfd, wfd = os.pipe()
2958n/a self.addCleanup(os.close, rfd)
2959n/a self.addCleanup(os.close, wfd)
2960n/a self.assertEqual(os.get_inheritable(rfd), False)
2961n/a self.assertEqual(os.get_inheritable(wfd), False)
2962n/a
2963n/a def test_dup(self):
2964n/a fd1 = os.open(__file__, os.O_RDONLY)
2965n/a self.addCleanup(os.close, fd1)
2966n/a
2967n/a fd2 = os.dup(fd1)
2968n/a self.addCleanup(os.close, fd2)
2969n/a self.assertEqual(os.get_inheritable(fd2), False)
2970n/a
2971n/a @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2972n/a def test_dup2(self):
2973n/a fd = os.open(__file__, os.O_RDONLY)
2974n/a self.addCleanup(os.close, fd)
2975n/a
2976n/a # inheritable by default
2977n/a fd2 = os.open(__file__, os.O_RDONLY)
2978n/a try:
2979n/a os.dup2(fd, fd2)
2980n/a self.assertEqual(os.get_inheritable(fd2), True)
2981n/a finally:
2982n/a os.close(fd2)
2983n/a
2984n/a # force non-inheritable
2985n/a fd3 = os.open(__file__, os.O_RDONLY)
2986n/a try:
2987n/a os.dup2(fd, fd3, inheritable=False)
2988n/a self.assertEqual(os.get_inheritable(fd3), False)
2989n/a finally:
2990n/a os.close(fd3)
2991n/a
2992n/a @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2993n/a def test_openpty(self):
2994n/a master_fd, slave_fd = os.openpty()
2995n/a self.addCleanup(os.close, master_fd)
2996n/a self.addCleanup(os.close, slave_fd)
2997n/a self.assertEqual(os.get_inheritable(master_fd), False)
2998n/a self.assertEqual(os.get_inheritable(slave_fd), False)
2999n/a
3000n/a
3001n/aclass PathTConverterTests(unittest.TestCase):
3002n/a # tuples of (function name, allows fd arguments, additional arguments to
3003n/a # function, cleanup function)
3004n/a functions = [
3005n/a ('stat', True, (), None),
3006n/a ('lstat', False, (), None),
3007n/a ('access', False, (os.F_OK,), None),
3008n/a ('chflags', False, (0,), None),
3009n/a ('lchflags', False, (0,), None),
3010n/a ('open', False, (0,), getattr(os, 'close', None)),
3011n/a ]
3012n/a
3013n/a def test_path_t_converter(self):
3014n/a str_filename = support.TESTFN
3015n/a if os.name == 'nt':
3016n/a bytes_fspath = bytes_filename = None
3017n/a else:
3018n/a bytes_filename = support.TESTFN.encode('ascii')
3019n/a bytes_fspath = _PathLike(bytes_filename)
3020n/a fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
3021n/a self.addCleanup(support.unlink, support.TESTFN)
3022n/a self.addCleanup(os.close, fd)
3023n/a
3024n/a int_fspath = _PathLike(fd)
3025n/a str_fspath = _PathLike(str_filename)
3026n/a
3027n/a for name, allow_fd, extra_args, cleanup_fn in self.functions:
3028n/a with self.subTest(name=name):
3029n/a try:
3030n/a fn = getattr(os, name)
3031n/a except AttributeError:
3032n/a continue
3033n/a
3034n/a for path in (str_filename, bytes_filename, str_fspath,
3035n/a bytes_fspath):
3036n/a if path is None:
3037n/a continue
3038n/a with self.subTest(name=name, path=path):
3039n/a result = fn(path, *extra_args)
3040n/a if cleanup_fn is not None:
3041n/a cleanup_fn(result)
3042n/a
3043n/a with self.assertRaisesRegex(
3044n/a TypeError, 'should be string, bytes'):
3045n/a fn(int_fspath, *extra_args)
3046n/a
3047n/a if allow_fd:
3048n/a result = fn(fd, *extra_args) # should not fail
3049n/a if cleanup_fn is not None:
3050n/a cleanup_fn(result)
3051n/a else:
3052n/a with self.assertRaisesRegex(
3053n/a TypeError,
3054n/a 'os.PathLike'):
3055n/a fn(fd, *extra_args)
3056n/a
3057n/a
3058n/a@unittest.skipUnless(hasattr(os, 'get_blocking'),
3059n/a 'needs os.get_blocking() and os.set_blocking()')
3060n/aclass BlockingTests(unittest.TestCase):
3061n/a def test_blocking(self):
3062n/a fd = os.open(__file__, os.O_RDONLY)
3063n/a self.addCleanup(os.close, fd)
3064n/a self.assertEqual(os.get_blocking(fd), True)
3065n/a
3066n/a os.set_blocking(fd, False)
3067n/a self.assertEqual(os.get_blocking(fd), False)
3068n/a
3069n/a os.set_blocking(fd, True)
3070n/a self.assertEqual(os.get_blocking(fd), True)
3071n/a
3072n/a
3073n/a
3074n/aclass ExportsTests(unittest.TestCase):
3075n/a def test_os_all(self):
3076n/a self.assertIn('open', os.__all__)
3077n/a self.assertIn('walk', os.__all__)
3078n/a
3079n/a
3080n/aclass TestScandir(unittest.TestCase):
3081n/a check_no_resource_warning = support.check_no_resource_warning
3082n/a
3083n/a def setUp(self):
3084n/a self.path = os.path.realpath(support.TESTFN)
3085n/a self.bytes_path = os.fsencode(self.path)
3086n/a self.addCleanup(support.rmtree, self.path)
3087n/a os.mkdir(self.path)
3088n/a
3089n/a def create_file(self, name="file.txt"):
3090n/a path = self.bytes_path if isinstance(name, bytes) else self.path
3091n/a filename = os.path.join(path, name)
3092n/a create_file(filename, b'python')
3093n/a return filename
3094n/a
3095n/a def get_entries(self, names):
3096n/a entries = dict((entry.name, entry)
3097n/a for entry in os.scandir(self.path))
3098n/a self.assertEqual(sorted(entries.keys()), names)
3099n/a return entries
3100n/a
3101n/a def assert_stat_equal(self, stat1, stat2, skip_fields):
3102n/a if skip_fields:
3103n/a for attr in dir(stat1):
3104n/a if not attr.startswith("st_"):
3105n/a continue
3106n/a if attr in ("st_dev", "st_ino", "st_nlink"):
3107n/a continue
3108n/a self.assertEqual(getattr(stat1, attr),
3109n/a getattr(stat2, attr),
3110n/a (stat1, stat2, attr))
3111n/a else:
3112n/a self.assertEqual(stat1, stat2)
3113n/a
3114n/a def check_entry(self, entry, name, is_dir, is_file, is_symlink):
3115n/a self.assertIsInstance(entry, os.DirEntry)
3116n/a self.assertEqual(entry.name, name)
3117n/a self.assertEqual(entry.path, os.path.join(self.path, name))
3118n/a self.assertEqual(entry.inode(),
3119n/a os.stat(entry.path, follow_symlinks=False).st_ino)
3120n/a
3121n/a entry_stat = os.stat(entry.path)
3122n/a self.assertEqual(entry.is_dir(),
3123n/a stat.S_ISDIR(entry_stat.st_mode))
3124n/a self.assertEqual(entry.is_file(),
3125n/a stat.S_ISREG(entry_stat.st_mode))
3126n/a self.assertEqual(entry.is_symlink(),
3127n/a os.path.islink(entry.path))
3128n/a
3129n/a entry_lstat = os.stat(entry.path, follow_symlinks=False)
3130n/a self.assertEqual(entry.is_dir(follow_symlinks=False),
3131n/a stat.S_ISDIR(entry_lstat.st_mode))
3132n/a self.assertEqual(entry.is_file(follow_symlinks=False),
3133n/a stat.S_ISREG(entry_lstat.st_mode))
3134n/a
3135n/a self.assert_stat_equal(entry.stat(),
3136n/a entry_stat,
3137n/a os.name == 'nt' and not is_symlink)
3138n/a self.assert_stat_equal(entry.stat(follow_symlinks=False),
3139n/a entry_lstat,
3140n/a os.name == 'nt')
3141n/a
3142n/a def test_attributes(self):
3143n/a link = hasattr(os, 'link')
3144n/a symlink = support.can_symlink()
3145n/a
3146n/a dirname = os.path.join(self.path, "dir")
3147n/a os.mkdir(dirname)
3148n/a filename = self.create_file("file.txt")
3149n/a if link:
3150n/a os.link(filename, os.path.join(self.path, "link_file.txt"))
3151n/a if symlink:
3152n/a os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3153n/a target_is_directory=True)
3154n/a os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3155n/a
3156n/a names = ['dir', 'file.txt']
3157n/a if link:
3158n/a names.append('link_file.txt')
3159n/a if symlink:
3160n/a names.extend(('symlink_dir', 'symlink_file.txt'))
3161n/a entries = self.get_entries(names)
3162n/a
3163n/a entry = entries['dir']
3164n/a self.check_entry(entry, 'dir', True, False, False)
3165n/a
3166n/a entry = entries['file.txt']
3167n/a self.check_entry(entry, 'file.txt', False, True, False)
3168n/a
3169n/a if link:
3170n/a entry = entries['link_file.txt']
3171n/a self.check_entry(entry, 'link_file.txt', False, True, False)
3172n/a
3173n/a if symlink:
3174n/a entry = entries['symlink_dir']
3175n/a self.check_entry(entry, 'symlink_dir', True, False, True)
3176n/a
3177n/a entry = entries['symlink_file.txt']
3178n/a self.check_entry(entry, 'symlink_file.txt', False, True, True)
3179n/a
3180n/a def get_entry(self, name):
3181n/a path = self.bytes_path if isinstance(name, bytes) else self.path
3182n/a entries = list(os.scandir(path))
3183n/a self.assertEqual(len(entries), 1)
3184n/a
3185n/a entry = entries[0]
3186n/a self.assertEqual(entry.name, name)
3187n/a return entry
3188n/a
3189n/a def create_file_entry(self, name='file.txt'):
3190n/a filename = self.create_file(name=name)
3191n/a return self.get_entry(os.path.basename(filename))
3192n/a
3193n/a def test_current_directory(self):
3194n/a filename = self.create_file()
3195n/a old_dir = os.getcwd()
3196n/a try:
3197n/a os.chdir(self.path)
3198n/a
3199n/a # call scandir() without parameter: it must list the content
3200n/a # of the current directory
3201n/a entries = dict((entry.name, entry) for entry in os.scandir())
3202n/a self.assertEqual(sorted(entries.keys()),
3203n/a [os.path.basename(filename)])
3204n/a finally:
3205n/a os.chdir(old_dir)
3206n/a
3207n/a def test_repr(self):
3208n/a entry = self.create_file_entry()
3209n/a self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3210n/a
3211n/a def test_fspath_protocol(self):
3212n/a entry = self.create_file_entry()
3213n/a self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3214n/a
3215n/a def test_fspath_protocol_bytes(self):
3216n/a bytes_filename = os.fsencode('bytesfile.txt')
3217n/a bytes_entry = self.create_file_entry(name=bytes_filename)
3218n/a fspath = os.fspath(bytes_entry)
3219n/a self.assertIsInstance(fspath, bytes)
3220n/a self.assertEqual(fspath,
3221n/a os.path.join(os.fsencode(self.path),bytes_filename))
3222n/a
3223n/a def test_removed_dir(self):
3224n/a path = os.path.join(self.path, 'dir')
3225n/a
3226n/a os.mkdir(path)
3227n/a entry = self.get_entry('dir')
3228n/a os.rmdir(path)
3229n/a
3230n/a # On POSIX, is_dir() result depends if scandir() filled d_type or not
3231n/a if os.name == 'nt':
3232n/a self.assertTrue(entry.is_dir())
3233n/a self.assertFalse(entry.is_file())
3234n/a self.assertFalse(entry.is_symlink())
3235n/a if os.name == 'nt':
3236n/a self.assertRaises(FileNotFoundError, entry.inode)
3237n/a # don't fail
3238n/a entry.stat()
3239n/a entry.stat(follow_symlinks=False)
3240n/a else:
3241n/a self.assertGreater(entry.inode(), 0)
3242n/a self.assertRaises(FileNotFoundError, entry.stat)
3243n/a self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3244n/a
3245n/a def test_removed_file(self):
3246n/a entry = self.create_file_entry()
3247n/a os.unlink(entry.path)
3248n/a
3249n/a self.assertFalse(entry.is_dir())
3250n/a # On POSIX, is_dir() result depends if scandir() filled d_type or not
3251n/a if os.name == 'nt':
3252n/a self.assertTrue(entry.is_file())
3253n/a self.assertFalse(entry.is_symlink())
3254n/a if os.name == 'nt':
3255n/a self.assertRaises(FileNotFoundError, entry.inode)
3256n/a # don't fail
3257n/a entry.stat()
3258n/a entry.stat(follow_symlinks=False)
3259n/a else:
3260n/a self.assertGreater(entry.inode(), 0)
3261n/a self.assertRaises(FileNotFoundError, entry.stat)
3262n/a self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3263n/a
3264n/a def test_broken_symlink(self):
3265n/a if not support.can_symlink():
3266n/a return self.skipTest('cannot create symbolic link')
3267n/a
3268n/a filename = self.create_file("file.txt")
3269n/a os.symlink(filename,
3270n/a os.path.join(self.path, "symlink.txt"))
3271n/a entries = self.get_entries(['file.txt', 'symlink.txt'])
3272n/a entry = entries['symlink.txt']
3273n/a os.unlink(filename)
3274n/a
3275n/a self.assertGreater(entry.inode(), 0)
3276n/a self.assertFalse(entry.is_dir())
3277n/a self.assertFalse(entry.is_file()) # broken symlink returns False
3278n/a self.assertFalse(entry.is_dir(follow_symlinks=False))
3279n/a self.assertFalse(entry.is_file(follow_symlinks=False))
3280n/a self.assertTrue(entry.is_symlink())
3281n/a self.assertRaises(FileNotFoundError, entry.stat)
3282n/a # don't fail
3283n/a entry.stat(follow_symlinks=False)
3284n/a
3285n/a def test_bytes(self):
3286n/a self.create_file("file.txt")
3287n/a
3288n/a path_bytes = os.fsencode(self.path)
3289n/a entries = list(os.scandir(path_bytes))
3290n/a self.assertEqual(len(entries), 1, entries)
3291n/a entry = entries[0]
3292n/a
3293n/a self.assertEqual(entry.name, b'file.txt')
3294n/a self.assertEqual(entry.path,
3295n/a os.fsencode(os.path.join(self.path, 'file.txt')))
3296n/a
3297n/a def test_empty_path(self):
3298n/a self.assertRaises(FileNotFoundError, os.scandir, '')
3299n/a
3300n/a def test_consume_iterator_twice(self):
3301n/a self.create_file("file.txt")
3302n/a iterator = os.scandir(self.path)
3303n/a
3304n/a entries = list(iterator)
3305n/a self.assertEqual(len(entries), 1, entries)
3306n/a
3307n/a # check than consuming the iterator twice doesn't raise exception
3308n/a entries2 = list(iterator)
3309n/a self.assertEqual(len(entries2), 0, entries2)
3310n/a
3311n/a def test_bad_path_type(self):
3312n/a for obj in [1234, 1.234, {}, []]:
3313n/a self.assertRaises(TypeError, os.scandir, obj)
3314n/a
3315n/a def test_close(self):
3316n/a self.create_file("file.txt")
3317n/a self.create_file("file2.txt")
3318n/a iterator = os.scandir(self.path)
3319n/a next(iterator)
3320n/a iterator.close()
3321n/a # multiple closes
3322n/a iterator.close()
3323n/a with self.check_no_resource_warning():
3324n/a del iterator
3325n/a
3326n/a def test_context_manager(self):
3327n/a self.create_file("file.txt")
3328n/a self.create_file("file2.txt")
3329n/a with os.scandir(self.path) as iterator:
3330n/a next(iterator)
3331n/a with self.check_no_resource_warning():
3332n/a del iterator
3333n/a
3334n/a def test_context_manager_close(self):
3335n/a self.create_file("file.txt")
3336n/a self.create_file("file2.txt")
3337n/a with os.scandir(self.path) as iterator:
3338n/a next(iterator)
3339n/a iterator.close()
3340n/a
3341n/a def test_context_manager_exception(self):
3342n/a self.create_file("file.txt")
3343n/a self.create_file("file2.txt")
3344n/a with self.assertRaises(ZeroDivisionError):
3345n/a with os.scandir(self.path) as iterator:
3346n/a next(iterator)
3347n/a 1/0
3348n/a with self.check_no_resource_warning():
3349n/a del iterator
3350n/a
3351n/a def test_resource_warning(self):
3352n/a self.create_file("file.txt")
3353n/a self.create_file("file2.txt")
3354n/a iterator = os.scandir(self.path)
3355n/a next(iterator)
3356n/a with self.assertWarns(ResourceWarning):
3357n/a del iterator
3358n/a support.gc_collect()
3359n/a # exhausted iterator
3360n/a iterator = os.scandir(self.path)
3361n/a list(iterator)
3362n/a with self.check_no_resource_warning():
3363n/a del iterator
3364n/a
3365n/a
3366n/aclass TestPEP519(unittest.TestCase):
3367n/a
3368n/a # Abstracted so it can be overridden to test pure Python implementation
3369n/a # if a C version is provided.
3370n/a fspath = staticmethod(os.fspath)
3371n/a
3372n/a def test_return_bytes(self):
3373n/a for b in b'hello', b'goodbye', b'some/path/and/file':
3374n/a self.assertEqual(b, self.fspath(b))
3375n/a
3376n/a def test_return_string(self):
3377n/a for s in 'hello', 'goodbye', 'some/path/and/file':
3378n/a self.assertEqual(s, self.fspath(s))
3379n/a
3380n/a def test_fsencode_fsdecode(self):
3381n/a for p in "path/like/object", b"path/like/object":
3382n/a pathlike = _PathLike(p)
3383n/a
3384n/a self.assertEqual(p, self.fspath(pathlike))
3385n/a self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3386n/a self.assertEqual("path/like/object", os.fsdecode(pathlike))
3387n/a
3388n/a def test_pathlike(self):
3389n/a self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3390n/a self.assertTrue(issubclass(_PathLike, os.PathLike))
3391n/a self.assertTrue(isinstance(_PathLike(), os.PathLike))
3392n/a
3393n/a def test_garbage_in_exception_out(self):
3394n/a vapor = type('blah', (), {})
3395n/a for o in int, type, os, vapor():
3396n/a self.assertRaises(TypeError, self.fspath, o)
3397n/a
3398n/a def test_argument_required(self):
3399n/a self.assertRaises(TypeError, self.fspath)
3400n/a
3401n/a def test_bad_pathlike(self):
3402n/a # __fspath__ returns a value other than str or bytes.
3403n/a self.assertRaises(TypeError, self.fspath, _PathLike(42))
3404n/a # __fspath__ attribute that is not callable.
3405n/a c = type('foo', (), {})
3406n/a c.__fspath__ = 1
3407n/a self.assertRaises(TypeError, self.fspath, c())
3408n/a # __fspath__ raises an exception.
3409n/a self.assertRaises(ZeroDivisionError, self.fspath,
3410n/a _PathLike(ZeroDivisionError()))
3411n/a
3412n/a# Only test if the C version is provided, otherwise TestPEP519 already tested
3413n/a# the pure Python implementation.
3414n/aif hasattr(os, "_fspath"):
3415n/a class TestPEP519PurePython(TestPEP519):
3416n/a
3417n/a """Explicitly test the pure Python implementation of os.fspath()."""
3418n/a
3419n/a fspath = staticmethod(os._fspath)
3420n/a
3421n/a
3422n/aif __name__ == "__main__":
3423n/a unittest.main()