ยปCore Development>Code coverage>Lib/test/test_subprocess.py

Python code coverage for Lib/test/test_subprocess.py

#countcontent
1n/aimport unittest
2n/afrom unittest import mock
3n/afrom test import support
4n/aimport subprocess
5n/aimport sys
6n/aimport platform
7n/aimport signal
8n/aimport io
9n/aimport os
10n/aimport errno
11n/aimport tempfile
12n/aimport time
13n/aimport selectors
14n/aimport sysconfig
15n/aimport select
16n/aimport shutil
17n/aimport gc
18n/aimport textwrap
19n/a
20n/atry:
21n/a import ctypes
22n/aexcept ImportError:
23n/a ctypes = None
24n/a
25n/atry:
26n/a import threading
27n/aexcept ImportError:
28n/a threading = None
29n/a
30n/aif support.PGO:
31n/a raise unittest.SkipTest("test is not helpful for PGO")
32n/a
33n/amswindows = (sys.platform == "win32")
34n/a
35n/a#
36n/a# Depends on the following external programs: Python
37n/a#
38n/a
39n/aif mswindows:
40n/a SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
41n/a 'os.O_BINARY);')
42n/aelse:
43n/a SETBINARY = ''
44n/a
45n/a
46n/aclass BaseTestCase(unittest.TestCase):
47n/a def setUp(self):
48n/a # Try to minimize the number of children we have so this test
49n/a # doesn't crash on some buildbots (Alphas in particular).
50n/a support.reap_children()
51n/a
52n/a def tearDown(self):
53n/a for inst in subprocess._active:
54n/a inst.wait()
55n/a subprocess._cleanup()
56n/a self.assertFalse(subprocess._active, "subprocess._active not empty")
57n/a
58n/a def assertStderrEqual(self, stderr, expected, msg=None):
59n/a # In a debug build, stuff like "[6580 refs]" is printed to stderr at
60n/a # shutdown time. That frustrates tests trying to check stderr produced
61n/a # from a spawned Python process.
62n/a actual = support.strip_python_stderr(stderr)
63n/a # strip_python_stderr also strips whitespace, so we do too.
64n/a expected = expected.strip()
65n/a self.assertEqual(actual, expected, msg)
66n/a
67n/a
68n/aclass PopenTestException(Exception):
69n/a pass
70n/a
71n/a
72n/aclass PopenExecuteChildRaises(subprocess.Popen):
73n/a """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
74n/a _execute_child fails.
75n/a """
76n/a def _execute_child(self, *args, **kwargs):
77n/a raise PopenTestException("Forced Exception for Test")
78n/a
79n/a
80n/aclass ProcessTestCase(BaseTestCase):
81n/a
82n/a def test_io_buffered_by_default(self):
83n/a p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
84n/a stdin=subprocess.PIPE, stdout=subprocess.PIPE,
85n/a stderr=subprocess.PIPE)
86n/a try:
87n/a self.assertIsInstance(p.stdin, io.BufferedIOBase)
88n/a self.assertIsInstance(p.stdout, io.BufferedIOBase)
89n/a self.assertIsInstance(p.stderr, io.BufferedIOBase)
90n/a finally:
91n/a p.stdin.close()
92n/a p.stdout.close()
93n/a p.stderr.close()
94n/a p.wait()
95n/a
96n/a def test_io_unbuffered_works(self):
97n/a p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
98n/a stdin=subprocess.PIPE, stdout=subprocess.PIPE,
99n/a stderr=subprocess.PIPE, bufsize=0)
100n/a try:
101n/a self.assertIsInstance(p.stdin, io.RawIOBase)
102n/a self.assertIsInstance(p.stdout, io.RawIOBase)
103n/a self.assertIsInstance(p.stderr, io.RawIOBase)
104n/a finally:
105n/a p.stdin.close()
106n/a p.stdout.close()
107n/a p.stderr.close()
108n/a p.wait()
109n/a
110n/a def test_call_seq(self):
111n/a # call() function with sequence argument
112n/a rc = subprocess.call([sys.executable, "-c",
113n/a "import sys; sys.exit(47)"])
114n/a self.assertEqual(rc, 47)
115n/a
116n/a def test_call_timeout(self):
117n/a # call() function with timeout argument; we want to test that the child
118n/a # process gets killed when the timeout expires. If the child isn't
119n/a # killed, this call will deadlock since subprocess.call waits for the
120n/a # child.
121n/a self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
122n/a [sys.executable, "-c", "while True: pass"],
123n/a timeout=0.1)
124n/a
125n/a def test_check_call_zero(self):
126n/a # check_call() function with zero return code
127n/a rc = subprocess.check_call([sys.executable, "-c",
128n/a "import sys; sys.exit(0)"])
129n/a self.assertEqual(rc, 0)
130n/a
131n/a def test_check_call_nonzero(self):
132n/a # check_call() function with non-zero return code
133n/a with self.assertRaises(subprocess.CalledProcessError) as c:
134n/a subprocess.check_call([sys.executable, "-c",
135n/a "import sys; sys.exit(47)"])
136n/a self.assertEqual(c.exception.returncode, 47)
137n/a
138n/a def test_check_output(self):
139n/a # check_output() function with zero return code
140n/a output = subprocess.check_output(
141n/a [sys.executable, "-c", "print('BDFL')"])
142n/a self.assertIn(b'BDFL', output)
143n/a
144n/a def test_check_output_nonzero(self):
145n/a # check_call() function with non-zero return code
146n/a with self.assertRaises(subprocess.CalledProcessError) as c:
147n/a subprocess.check_output(
148n/a [sys.executable, "-c", "import sys; sys.exit(5)"])
149n/a self.assertEqual(c.exception.returncode, 5)
150n/a
151n/a def test_check_output_stderr(self):
152n/a # check_output() function stderr redirected to stdout
153n/a output = subprocess.check_output(
154n/a [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
155n/a stderr=subprocess.STDOUT)
156n/a self.assertIn(b'BDFL', output)
157n/a
158n/a def test_check_output_stdin_arg(self):
159n/a # check_output() can be called with stdin set to a file
160n/a tf = tempfile.TemporaryFile()
161n/a self.addCleanup(tf.close)
162n/a tf.write(b'pear')
163n/a tf.seek(0)
164n/a output = subprocess.check_output(
165n/a [sys.executable, "-c",
166n/a "import sys; sys.stdout.write(sys.stdin.read().upper())"],
167n/a stdin=tf)
168n/a self.assertIn(b'PEAR', output)
169n/a
170n/a def test_check_output_input_arg(self):
171n/a # check_output() can be called with input set to a string
172n/a output = subprocess.check_output(
173n/a [sys.executable, "-c",
174n/a "import sys; sys.stdout.write(sys.stdin.read().upper())"],
175n/a input=b'pear')
176n/a self.assertIn(b'PEAR', output)
177n/a
178n/a def test_check_output_stdout_arg(self):
179n/a # check_output() refuses to accept 'stdout' argument
180n/a with self.assertRaises(ValueError) as c:
181n/a output = subprocess.check_output(
182n/a [sys.executable, "-c", "print('will not be run')"],
183n/a stdout=sys.stdout)
184n/a self.fail("Expected ValueError when stdout arg supplied.")
185n/a self.assertIn('stdout', c.exception.args[0])
186n/a
187n/a def test_check_output_stdin_with_input_arg(self):
188n/a # check_output() refuses to accept 'stdin' with 'input'
189n/a tf = tempfile.TemporaryFile()
190n/a self.addCleanup(tf.close)
191n/a tf.write(b'pear')
192n/a tf.seek(0)
193n/a with self.assertRaises(ValueError) as c:
194n/a output = subprocess.check_output(
195n/a [sys.executable, "-c", "print('will not be run')"],
196n/a stdin=tf, input=b'hare')
197n/a self.fail("Expected ValueError when stdin and input args supplied.")
198n/a self.assertIn('stdin', c.exception.args[0])
199n/a self.assertIn('input', c.exception.args[0])
200n/a
201n/a def test_check_output_timeout(self):
202n/a # check_output() function with timeout arg
203n/a with self.assertRaises(subprocess.TimeoutExpired) as c:
204n/a output = subprocess.check_output(
205n/a [sys.executable, "-c",
206n/a "import sys, time\n"
207n/a "sys.stdout.write('BDFL')\n"
208n/a "sys.stdout.flush()\n"
209n/a "time.sleep(3600)"],
210n/a # Some heavily loaded buildbots (sparc Debian 3.x) require
211n/a # this much time to start and print.
212n/a timeout=3)
213n/a self.fail("Expected TimeoutExpired.")
214n/a self.assertEqual(c.exception.output, b'BDFL')
215n/a
216n/a def test_call_kwargs(self):
217n/a # call() function with keyword args
218n/a newenv = os.environ.copy()
219n/a newenv["FRUIT"] = "banana"
220n/a rc = subprocess.call([sys.executable, "-c",
221n/a 'import sys, os;'
222n/a 'sys.exit(os.getenv("FRUIT")=="banana")'],
223n/a env=newenv)
224n/a self.assertEqual(rc, 1)
225n/a
226n/a def test_invalid_args(self):
227n/a # Popen() called with invalid arguments should raise TypeError
228n/a # but Popen.__del__ should not complain (issue #12085)
229n/a with support.captured_stderr() as s:
230n/a self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
231n/a argcount = subprocess.Popen.__init__.__code__.co_argcount
232n/a too_many_args = [0] * (argcount + 1)
233n/a self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
234n/a self.assertEqual(s.getvalue(), '')
235n/a
236n/a def test_stdin_none(self):
237n/a # .stdin is None when not redirected
238n/a p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
239n/a stdout=subprocess.PIPE, stderr=subprocess.PIPE)
240n/a self.addCleanup(p.stdout.close)
241n/a self.addCleanup(p.stderr.close)
242n/a p.wait()
243n/a self.assertEqual(p.stdin, None)
244n/a
245n/a def test_stdout_none(self):
246n/a # .stdout is None when not redirected, and the child's stdout will
247n/a # be inherited from the parent. In order to test this we run a
248n/a # subprocess in a subprocess:
249n/a # this_test
250n/a # \-- subprocess created by this test (parent)
251n/a # \-- subprocess created by the parent subprocess (child)
252n/a # The parent doesn't specify stdout, so the child will use the
253n/a # parent's stdout. This test checks that the message printed by the
254n/a # child goes to the parent stdout. The parent also checks that the
255n/a # child's stdout is None. See #11963.
256n/a code = ('import sys; from subprocess import Popen, PIPE;'
257n/a 'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
258n/a ' stdin=PIPE, stderr=PIPE);'
259n/a 'p.wait(); assert p.stdout is None;')
260n/a p = subprocess.Popen([sys.executable, "-c", code],
261n/a stdout=subprocess.PIPE, stderr=subprocess.PIPE)
262n/a self.addCleanup(p.stdout.close)
263n/a self.addCleanup(p.stderr.close)
264n/a out, err = p.communicate()
265n/a self.assertEqual(p.returncode, 0, err)
266n/a self.assertEqual(out.rstrip(), b'test_stdout_none')
267n/a
268n/a def test_stderr_none(self):
269n/a # .stderr is None when not redirected
270n/a p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
271n/a stdin=subprocess.PIPE, stdout=subprocess.PIPE)
272n/a self.addCleanup(p.stdout.close)
273n/a self.addCleanup(p.stdin.close)
274n/a p.wait()
275n/a self.assertEqual(p.stderr, None)
276n/a
277n/a def _assert_python(self, pre_args, **kwargs):
278n/a # We include sys.exit() to prevent the test runner from hanging
279n/a # whenever python is found.
280n/a args = pre_args + ["import sys; sys.exit(47)"]
281n/a p = subprocess.Popen(args, **kwargs)
282n/a p.wait()
283n/a self.assertEqual(47, p.returncode)
284n/a
285n/a def test_executable(self):
286n/a # Check that the executable argument works.
287n/a #
288n/a # On Unix (non-Mac and non-Windows), Python looks at args[0] to
289n/a # determine where its standard library is, so we need the directory
290n/a # of args[0] to be valid for the Popen() call to Python to succeed.
291n/a # See also issue #16170 and issue #7774.
292n/a doesnotexist = os.path.join(os.path.dirname(sys.executable),
293n/a "doesnotexist")
294n/a self._assert_python([doesnotexist, "-c"], executable=sys.executable)
295n/a
296n/a def test_executable_takes_precedence(self):
297n/a # Check that the executable argument takes precedence over args[0].
298n/a #
299n/a # Verify first that the call succeeds without the executable arg.
300n/a pre_args = [sys.executable, "-c"]
301n/a self._assert_python(pre_args)
302n/a self.assertRaises((FileNotFoundError, PermissionError),
303n/a self._assert_python, pre_args,
304n/a executable="doesnotexist")
305n/a
306n/a @unittest.skipIf(mswindows, "executable argument replaces shell")
307n/a def test_executable_replaces_shell(self):
308n/a # Check that the executable argument replaces the default shell
309n/a # when shell=True.
310n/a self._assert_python([], executable=sys.executable, shell=True)
311n/a
312n/a # For use in the test_cwd* tests below.
313n/a def _normalize_cwd(self, cwd):
314n/a # Normalize an expected cwd (for Tru64 support).
315n/a # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
316n/a # strings. See bug #1063571.
317n/a with support.change_cwd(cwd):
318n/a return os.getcwd()
319n/a
320n/a # For use in the test_cwd* tests below.
321n/a def _split_python_path(self):
322n/a # Return normalized (python_dir, python_base).
323n/a python_path = os.path.realpath(sys.executable)
324n/a return os.path.split(python_path)
325n/a
326n/a # For use in the test_cwd* tests below.
327n/a def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
328n/a # Invoke Python via Popen, and assert that (1) the call succeeds,
329n/a # and that (2) the current working directory of the child process
330n/a # matches *expected_cwd*.
331n/a p = subprocess.Popen([python_arg, "-c",
332n/a "import os, sys; "
333n/a "sys.stdout.write(os.getcwd()); "
334n/a "sys.exit(47)"],
335n/a stdout=subprocess.PIPE,
336n/a **kwargs)
337n/a self.addCleanup(p.stdout.close)
338n/a p.wait()
339n/a self.assertEqual(47, p.returncode)
340n/a normcase = os.path.normcase
341n/a self.assertEqual(normcase(expected_cwd),
342n/a normcase(p.stdout.read().decode("utf-8")))
343n/a
344n/a def test_cwd(self):
345n/a # Check that cwd changes the cwd for the child process.
346n/a temp_dir = tempfile.gettempdir()
347n/a temp_dir = self._normalize_cwd(temp_dir)
348n/a self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
349n/a
350n/a @unittest.skipIf(mswindows, "pending resolution of issue #15533")
351n/a def test_cwd_with_relative_arg(self):
352n/a # Check that Popen looks for args[0] relative to cwd if args[0]
353n/a # is relative.
354n/a python_dir, python_base = self._split_python_path()
355n/a rel_python = os.path.join(os.curdir, python_base)
356n/a with support.temp_cwd() as wrong_dir:
357n/a # Before calling with the correct cwd, confirm that the call fails
358n/a # without cwd and with the wrong cwd.
359n/a self.assertRaises(FileNotFoundError, subprocess.Popen,
360n/a [rel_python])
361n/a self.assertRaises(FileNotFoundError, subprocess.Popen,
362n/a [rel_python], cwd=wrong_dir)
363n/a python_dir = self._normalize_cwd(python_dir)
364n/a self._assert_cwd(python_dir, rel_python, cwd=python_dir)
365n/a
366n/a @unittest.skipIf(mswindows, "pending resolution of issue #15533")
367n/a def test_cwd_with_relative_executable(self):
368n/a # Check that Popen looks for executable relative to cwd if executable
369n/a # is relative (and that executable takes precedence over args[0]).
370n/a python_dir, python_base = self._split_python_path()
371n/a rel_python = os.path.join(os.curdir, python_base)
372n/a doesntexist = "somethingyoudonthave"
373n/a with support.temp_cwd() as wrong_dir:
374n/a # Before calling with the correct cwd, confirm that the call fails
375n/a # without cwd and with the wrong cwd.
376n/a self.assertRaises(FileNotFoundError, subprocess.Popen,
377n/a [doesntexist], executable=rel_python)
378n/a self.assertRaises(FileNotFoundError, subprocess.Popen,
379n/a [doesntexist], executable=rel_python,
380n/a cwd=wrong_dir)
381n/a python_dir = self._normalize_cwd(python_dir)
382n/a self._assert_cwd(python_dir, doesntexist, executable=rel_python,
383n/a cwd=python_dir)
384n/a
385n/a def test_cwd_with_absolute_arg(self):
386n/a # Check that Popen can find the executable when the cwd is wrong
387n/a # if args[0] is an absolute path.
388n/a python_dir, python_base = self._split_python_path()
389n/a abs_python = os.path.join(python_dir, python_base)
390n/a rel_python = os.path.join(os.curdir, python_base)
391n/a with support.temp_dir() as wrong_dir:
392n/a # Before calling with an absolute path, confirm that using a
393n/a # relative path fails.
394n/a self.assertRaises(FileNotFoundError, subprocess.Popen,
395n/a [rel_python], cwd=wrong_dir)
396n/a wrong_dir = self._normalize_cwd(wrong_dir)
397n/a self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
398n/a
399n/a @unittest.skipIf(sys.base_prefix != sys.prefix,
400n/a 'Test is not venv-compatible')
401n/a def test_executable_with_cwd(self):
402n/a python_dir, python_base = self._split_python_path()
403n/a python_dir = self._normalize_cwd(python_dir)
404n/a self._assert_cwd(python_dir, "somethingyoudonthave",
405n/a executable=sys.executable, cwd=python_dir)
406n/a
407n/a @unittest.skipIf(sys.base_prefix != sys.prefix,
408n/a 'Test is not venv-compatible')
409n/a @unittest.skipIf(sysconfig.is_python_build(),
410n/a "need an installed Python. See #7774")
411n/a def test_executable_without_cwd(self):
412n/a # For a normal installation, it should work without 'cwd'
413n/a # argument. For test runs in the build directory, see #7774.
414n/a self._assert_cwd(os.getcwd(), "somethingyoudonthave",
415n/a executable=sys.executable)
416n/a
417n/a def test_stdin_pipe(self):
418n/a # stdin redirection
419n/a p = subprocess.Popen([sys.executable, "-c",
420n/a 'import sys; sys.exit(sys.stdin.read() == "pear")'],
421n/a stdin=subprocess.PIPE)
422n/a p.stdin.write(b"pear")
423n/a p.stdin.close()
424n/a p.wait()
425n/a self.assertEqual(p.returncode, 1)
426n/a
427n/a def test_stdin_filedes(self):
428n/a # stdin is set to open file descriptor
429n/a tf = tempfile.TemporaryFile()
430n/a self.addCleanup(tf.close)
431n/a d = tf.fileno()
432n/a os.write(d, b"pear")
433n/a os.lseek(d, 0, 0)
434n/a p = subprocess.Popen([sys.executable, "-c",
435n/a 'import sys; sys.exit(sys.stdin.read() == "pear")'],
436n/a stdin=d)
437n/a p.wait()
438n/a self.assertEqual(p.returncode, 1)
439n/a
440n/a def test_stdin_fileobj(self):
441n/a # stdin is set to open file object
442n/a tf = tempfile.TemporaryFile()
443n/a self.addCleanup(tf.close)
444n/a tf.write(b"pear")
445n/a tf.seek(0)
446n/a p = subprocess.Popen([sys.executable, "-c",
447n/a 'import sys; sys.exit(sys.stdin.read() == "pear")'],
448n/a stdin=tf)
449n/a p.wait()
450n/a self.assertEqual(p.returncode, 1)
451n/a
452n/a def test_stdout_pipe(self):
453n/a # stdout redirection
454n/a p = subprocess.Popen([sys.executable, "-c",
455n/a 'import sys; sys.stdout.write("orange")'],
456n/a stdout=subprocess.PIPE)
457n/a with p:
458n/a self.assertEqual(p.stdout.read(), b"orange")
459n/a
460n/a def test_stdout_filedes(self):
461n/a # stdout is set to open file descriptor
462n/a tf = tempfile.TemporaryFile()
463n/a self.addCleanup(tf.close)
464n/a d = tf.fileno()
465n/a p = subprocess.Popen([sys.executable, "-c",
466n/a 'import sys; sys.stdout.write("orange")'],
467n/a stdout=d)
468n/a p.wait()
469n/a os.lseek(d, 0, 0)
470n/a self.assertEqual(os.read(d, 1024), b"orange")
471n/a
472n/a def test_stdout_fileobj(self):
473n/a # stdout is set to open file object
474n/a tf = tempfile.TemporaryFile()
475n/a self.addCleanup(tf.close)
476n/a p = subprocess.Popen([sys.executable, "-c",
477n/a 'import sys; sys.stdout.write("orange")'],
478n/a stdout=tf)
479n/a p.wait()
480n/a tf.seek(0)
481n/a self.assertEqual(tf.read(), b"orange")
482n/a
483n/a def test_stderr_pipe(self):
484n/a # stderr redirection
485n/a p = subprocess.Popen([sys.executable, "-c",
486n/a 'import sys; sys.stderr.write("strawberry")'],
487n/a stderr=subprocess.PIPE)
488n/a with p:
489n/a self.assertStderrEqual(p.stderr.read(), b"strawberry")
490n/a
491n/a def test_stderr_filedes(self):
492n/a # stderr is set to open file descriptor
493n/a tf = tempfile.TemporaryFile()
494n/a self.addCleanup(tf.close)
495n/a d = tf.fileno()
496n/a p = subprocess.Popen([sys.executable, "-c",
497n/a 'import sys; sys.stderr.write("strawberry")'],
498n/a stderr=d)
499n/a p.wait()
500n/a os.lseek(d, 0, 0)
501n/a self.assertStderrEqual(os.read(d, 1024), b"strawberry")
502n/a
503n/a def test_stderr_fileobj(self):
504n/a # stderr is set to open file object
505n/a tf = tempfile.TemporaryFile()
506n/a self.addCleanup(tf.close)
507n/a p = subprocess.Popen([sys.executable, "-c",
508n/a 'import sys; sys.stderr.write("strawberry")'],
509n/a stderr=tf)
510n/a p.wait()
511n/a tf.seek(0)
512n/a self.assertStderrEqual(tf.read(), b"strawberry")
513n/a
514n/a def test_stderr_redirect_with_no_stdout_redirect(self):
515n/a # test stderr=STDOUT while stdout=None (not set)
516n/a
517n/a # - grandchild prints to stderr
518n/a # - child redirects grandchild's stderr to its stdout
519n/a # - the parent should get grandchild's stderr in child's stdout
520n/a p = subprocess.Popen([sys.executable, "-c",
521n/a 'import sys, subprocess;'
522n/a 'rc = subprocess.call([sys.executable, "-c",'
523n/a ' "import sys;"'
524n/a ' "sys.stderr.write(\'42\')"],'
525n/a ' stderr=subprocess.STDOUT);'
526n/a 'sys.exit(rc)'],
527n/a stdout=subprocess.PIPE,
528n/a stderr=subprocess.PIPE)
529n/a stdout, stderr = p.communicate()
530n/a #NOTE: stdout should get stderr from grandchild
531n/a self.assertStderrEqual(stdout, b'42')
532n/a self.assertStderrEqual(stderr, b'') # should be empty
533n/a self.assertEqual(p.returncode, 0)
534n/a
535n/a def test_stdout_stderr_pipe(self):
536n/a # capture stdout and stderr to the same pipe
537n/a p = subprocess.Popen([sys.executable, "-c",
538n/a 'import sys;'
539n/a 'sys.stdout.write("apple");'
540n/a 'sys.stdout.flush();'
541n/a 'sys.stderr.write("orange")'],
542n/a stdout=subprocess.PIPE,
543n/a stderr=subprocess.STDOUT)
544n/a with p:
545n/a self.assertStderrEqual(p.stdout.read(), b"appleorange")
546n/a
547n/a def test_stdout_stderr_file(self):
548n/a # capture stdout and stderr to the same open file
549n/a tf = tempfile.TemporaryFile()
550n/a self.addCleanup(tf.close)
551n/a p = subprocess.Popen([sys.executable, "-c",
552n/a 'import sys;'
553n/a 'sys.stdout.write("apple");'
554n/a 'sys.stdout.flush();'
555n/a 'sys.stderr.write("orange")'],
556n/a stdout=tf,
557n/a stderr=tf)
558n/a p.wait()
559n/a tf.seek(0)
560n/a self.assertStderrEqual(tf.read(), b"appleorange")
561n/a
562n/a def test_stdout_filedes_of_stdout(self):
563n/a # stdout is set to 1 (#1531862).
564n/a # To avoid printing the text on stdout, we do something similar to
565n/a # test_stdout_none (see above). The parent subprocess calls the child
566n/a # subprocess passing stdout=1, and this test uses stdout=PIPE in
567n/a # order to capture and check the output of the parent. See #11963.
568n/a code = ('import sys, subprocess; '
569n/a 'rc = subprocess.call([sys.executable, "-c", '
570n/a ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
571n/a 'b\'test with stdout=1\'))"], stdout=1); '
572n/a 'assert rc == 18')
573n/a p = subprocess.Popen([sys.executable, "-c", code],
574n/a stdout=subprocess.PIPE, stderr=subprocess.PIPE)
575n/a self.addCleanup(p.stdout.close)
576n/a self.addCleanup(p.stderr.close)
577n/a out, err = p.communicate()
578n/a self.assertEqual(p.returncode, 0, err)
579n/a self.assertEqual(out.rstrip(), b'test with stdout=1')
580n/a
581n/a def test_stdout_devnull(self):
582n/a p = subprocess.Popen([sys.executable, "-c",
583n/a 'for i in range(10240):'
584n/a 'print("x" * 1024)'],
585n/a stdout=subprocess.DEVNULL)
586n/a p.wait()
587n/a self.assertEqual(p.stdout, None)
588n/a
589n/a def test_stderr_devnull(self):
590n/a p = subprocess.Popen([sys.executable, "-c",
591n/a 'import sys\n'
592n/a 'for i in range(10240):'
593n/a 'sys.stderr.write("x" * 1024)'],
594n/a stderr=subprocess.DEVNULL)
595n/a p.wait()
596n/a self.assertEqual(p.stderr, None)
597n/a
598n/a def test_stdin_devnull(self):
599n/a p = subprocess.Popen([sys.executable, "-c",
600n/a 'import sys;'
601n/a 'sys.stdin.read(1)'],
602n/a stdin=subprocess.DEVNULL)
603n/a p.wait()
604n/a self.assertEqual(p.stdin, None)
605n/a
606n/a def test_env(self):
607n/a newenv = os.environ.copy()
608n/a newenv["FRUIT"] = "orange"
609n/a with subprocess.Popen([sys.executable, "-c",
610n/a 'import sys,os;'
611n/a 'sys.stdout.write(os.getenv("FRUIT"))'],
612n/a stdout=subprocess.PIPE,
613n/a env=newenv) as p:
614n/a stdout, stderr = p.communicate()
615n/a self.assertEqual(stdout, b"orange")
616n/a
617n/a # Windows requires at least the SYSTEMROOT environment variable to start
618n/a # Python
619n/a @unittest.skipIf(sys.platform == 'win32',
620n/a 'cannot test an empty env on Windows')
621n/a @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') is not None,
622n/a 'the python library cannot be loaded '
623n/a 'with an empty environment')
624n/a def test_empty_env(self):
625n/a with subprocess.Popen([sys.executable, "-c",
626n/a 'import os; '
627n/a 'print(list(os.environ.keys()))'],
628n/a stdout=subprocess.PIPE,
629n/a env={}) as p:
630n/a stdout, stderr = p.communicate()
631n/a self.assertIn(stdout.strip(),
632n/a (b"[]",
633n/a # Mac OS X adds __CF_USER_TEXT_ENCODING variable to an empty
634n/a # environment
635n/a b"['__CF_USER_TEXT_ENCODING']"))
636n/a
637n/a def test_communicate_stdin(self):
638n/a p = subprocess.Popen([sys.executable, "-c",
639n/a 'import sys;'
640n/a 'sys.exit(sys.stdin.read() == "pear")'],
641n/a stdin=subprocess.PIPE)
642n/a p.communicate(b"pear")
643n/a self.assertEqual(p.returncode, 1)
644n/a
645n/a def test_communicate_stdout(self):
646n/a p = subprocess.Popen([sys.executable, "-c",
647n/a 'import sys; sys.stdout.write("pineapple")'],
648n/a stdout=subprocess.PIPE)
649n/a (stdout, stderr) = p.communicate()
650n/a self.assertEqual(stdout, b"pineapple")
651n/a self.assertEqual(stderr, None)
652n/a
653n/a def test_communicate_stderr(self):
654n/a p = subprocess.Popen([sys.executable, "-c",
655n/a 'import sys; sys.stderr.write("pineapple")'],
656n/a stderr=subprocess.PIPE)
657n/a (stdout, stderr) = p.communicate()
658n/a self.assertEqual(stdout, None)
659n/a self.assertStderrEqual(stderr, b"pineapple")
660n/a
661n/a def test_communicate(self):
662n/a p = subprocess.Popen([sys.executable, "-c",
663n/a 'import sys,os;'
664n/a 'sys.stderr.write("pineapple");'
665n/a 'sys.stdout.write(sys.stdin.read())'],
666n/a stdin=subprocess.PIPE,
667n/a stdout=subprocess.PIPE,
668n/a stderr=subprocess.PIPE)
669n/a self.addCleanup(p.stdout.close)
670n/a self.addCleanup(p.stderr.close)
671n/a self.addCleanup(p.stdin.close)
672n/a (stdout, stderr) = p.communicate(b"banana")
673n/a self.assertEqual(stdout, b"banana")
674n/a self.assertStderrEqual(stderr, b"pineapple")
675n/a
676n/a def test_communicate_timeout(self):
677n/a p = subprocess.Popen([sys.executable, "-c",
678n/a 'import sys,os,time;'
679n/a 'sys.stderr.write("pineapple\\n");'
680n/a 'time.sleep(1);'
681n/a 'sys.stderr.write("pear\\n");'
682n/a 'sys.stdout.write(sys.stdin.read())'],
683n/a universal_newlines=True,
684n/a stdin=subprocess.PIPE,
685n/a stdout=subprocess.PIPE,
686n/a stderr=subprocess.PIPE)
687n/a self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
688n/a timeout=0.3)
689n/a # Make sure we can keep waiting for it, and that we get the whole output
690n/a # after it completes.
691n/a (stdout, stderr) = p.communicate()
692n/a self.assertEqual(stdout, "banana")
693n/a self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
694n/a
695n/a def test_communicate_timeout_large_output(self):
696n/a # Test an expiring timeout while the child is outputting lots of data.
697n/a p = subprocess.Popen([sys.executable, "-c",
698n/a 'import sys,os,time;'
699n/a 'sys.stdout.write("a" * (64 * 1024));'
700n/a 'time.sleep(0.2);'
701n/a 'sys.stdout.write("a" * (64 * 1024));'
702n/a 'time.sleep(0.2);'
703n/a 'sys.stdout.write("a" * (64 * 1024));'
704n/a 'time.sleep(0.2);'
705n/a 'sys.stdout.write("a" * (64 * 1024));'],
706n/a stdout=subprocess.PIPE)
707n/a self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
708n/a (stdout, _) = p.communicate()
709n/a self.assertEqual(len(stdout), 4 * 64 * 1024)
710n/a
711n/a # Test for the fd leak reported in http://bugs.python.org/issue2791.
712n/a def test_communicate_pipe_fd_leak(self):
713n/a for stdin_pipe in (False, True):
714n/a for stdout_pipe in (False, True):
715n/a for stderr_pipe in (False, True):
716n/a options = {}
717n/a if stdin_pipe:
718n/a options['stdin'] = subprocess.PIPE
719n/a if stdout_pipe:
720n/a options['stdout'] = subprocess.PIPE
721n/a if stderr_pipe:
722n/a options['stderr'] = subprocess.PIPE
723n/a if not options:
724n/a continue
725n/a p = subprocess.Popen((sys.executable, "-c", "pass"), **options)
726n/a p.communicate()
727n/a if p.stdin is not None:
728n/a self.assertTrue(p.stdin.closed)
729n/a if p.stdout is not None:
730n/a self.assertTrue(p.stdout.closed)
731n/a if p.stderr is not None:
732n/a self.assertTrue(p.stderr.closed)
733n/a
734n/a def test_communicate_returns(self):
735n/a # communicate() should return None if no redirection is active
736n/a p = subprocess.Popen([sys.executable, "-c",
737n/a "import sys; sys.exit(47)"])
738n/a (stdout, stderr) = p.communicate()
739n/a self.assertEqual(stdout, None)
740n/a self.assertEqual(stderr, None)
741n/a
742n/a def test_communicate_pipe_buf(self):
743n/a # communicate() with writes larger than pipe_buf
744n/a # This test will probably deadlock rather than fail, if
745n/a # communicate() does not work properly.
746n/a x, y = os.pipe()
747n/a os.close(x)
748n/a os.close(y)
749n/a p = subprocess.Popen([sys.executable, "-c",
750n/a 'import sys,os;'
751n/a 'sys.stdout.write(sys.stdin.read(47));'
752n/a 'sys.stderr.write("x" * %d);'
753n/a 'sys.stdout.write(sys.stdin.read())' %
754n/a support.PIPE_MAX_SIZE],
755n/a stdin=subprocess.PIPE,
756n/a stdout=subprocess.PIPE,
757n/a stderr=subprocess.PIPE)
758n/a self.addCleanup(p.stdout.close)
759n/a self.addCleanup(p.stderr.close)
760n/a self.addCleanup(p.stdin.close)
761n/a string_to_write = b"a" * support.PIPE_MAX_SIZE
762n/a (stdout, stderr) = p.communicate(string_to_write)
763n/a self.assertEqual(stdout, string_to_write)
764n/a
765n/a def test_writes_before_communicate(self):
766n/a # stdin.write before communicate()
767n/a p = subprocess.Popen([sys.executable, "-c",
768n/a 'import sys,os;'
769n/a 'sys.stdout.write(sys.stdin.read())'],
770n/a stdin=subprocess.PIPE,
771n/a stdout=subprocess.PIPE,
772n/a stderr=subprocess.PIPE)
773n/a self.addCleanup(p.stdout.close)
774n/a self.addCleanup(p.stderr.close)
775n/a self.addCleanup(p.stdin.close)
776n/a p.stdin.write(b"banana")
777n/a (stdout, stderr) = p.communicate(b"split")
778n/a self.assertEqual(stdout, b"bananasplit")
779n/a self.assertStderrEqual(stderr, b"")
780n/a
781n/a def test_universal_newlines(self):
782n/a p = subprocess.Popen([sys.executable, "-c",
783n/a 'import sys,os;' + SETBINARY +
784n/a 'buf = sys.stdout.buffer;'
785n/a 'buf.write(sys.stdin.readline().encode());'
786n/a 'buf.flush();'
787n/a 'buf.write(b"line2\\n");'
788n/a 'buf.flush();'
789n/a 'buf.write(sys.stdin.read().encode());'
790n/a 'buf.flush();'
791n/a 'buf.write(b"line4\\n");'
792n/a 'buf.flush();'
793n/a 'buf.write(b"line5\\r\\n");'
794n/a 'buf.flush();'
795n/a 'buf.write(b"line6\\r");'
796n/a 'buf.flush();'
797n/a 'buf.write(b"\\nline7");'
798n/a 'buf.flush();'
799n/a 'buf.write(b"\\nline8");'],
800n/a stdin=subprocess.PIPE,
801n/a stdout=subprocess.PIPE,
802n/a universal_newlines=1)
803n/a with p:
804n/a p.stdin.write("line1\n")
805n/a p.stdin.flush()
806n/a self.assertEqual(p.stdout.readline(), "line1\n")
807n/a p.stdin.write("line3\n")
808n/a p.stdin.close()
809n/a self.addCleanup(p.stdout.close)
810n/a self.assertEqual(p.stdout.readline(),
811n/a "line2\n")
812n/a self.assertEqual(p.stdout.read(6),
813n/a "line3\n")
814n/a self.assertEqual(p.stdout.read(),
815n/a "line4\nline5\nline6\nline7\nline8")
816n/a
817n/a def test_universal_newlines_communicate(self):
818n/a # universal newlines through communicate()
819n/a p = subprocess.Popen([sys.executable, "-c",
820n/a 'import sys,os;' + SETBINARY +
821n/a 'buf = sys.stdout.buffer;'
822n/a 'buf.write(b"line2\\n");'
823n/a 'buf.flush();'
824n/a 'buf.write(b"line4\\n");'
825n/a 'buf.flush();'
826n/a 'buf.write(b"line5\\r\\n");'
827n/a 'buf.flush();'
828n/a 'buf.write(b"line6\\r");'
829n/a 'buf.flush();'
830n/a 'buf.write(b"\\nline7");'
831n/a 'buf.flush();'
832n/a 'buf.write(b"\\nline8");'],
833n/a stderr=subprocess.PIPE,
834n/a stdout=subprocess.PIPE,
835n/a universal_newlines=1)
836n/a self.addCleanup(p.stdout.close)
837n/a self.addCleanup(p.stderr.close)
838n/a (stdout, stderr) = p.communicate()
839n/a self.assertEqual(stdout,
840n/a "line2\nline4\nline5\nline6\nline7\nline8")
841n/a
842n/a def test_universal_newlines_communicate_stdin(self):
843n/a # universal newlines through communicate(), with only stdin
844n/a p = subprocess.Popen([sys.executable, "-c",
845n/a 'import sys,os;' + SETBINARY + textwrap.dedent('''
846n/a s = sys.stdin.readline()
847n/a assert s == "line1\\n", repr(s)
848n/a s = sys.stdin.read()
849n/a assert s == "line3\\n", repr(s)
850n/a ''')],
851n/a stdin=subprocess.PIPE,
852n/a universal_newlines=1)
853n/a (stdout, stderr) = p.communicate("line1\nline3\n")
854n/a self.assertEqual(p.returncode, 0)
855n/a
856n/a def test_universal_newlines_communicate_input_none(self):
857n/a # Test communicate(input=None) with universal newlines.
858n/a #
859n/a # We set stdout to PIPE because, as of this writing, a different
860n/a # code path is tested when the number of pipes is zero or one.
861n/a p = subprocess.Popen([sys.executable, "-c", "pass"],
862n/a stdin=subprocess.PIPE,
863n/a stdout=subprocess.PIPE,
864n/a universal_newlines=True)
865n/a p.communicate()
866n/a self.assertEqual(p.returncode, 0)
867n/a
868n/a def test_universal_newlines_communicate_stdin_stdout_stderr(self):
869n/a # universal newlines through communicate(), with stdin, stdout, stderr
870n/a p = subprocess.Popen([sys.executable, "-c",
871n/a 'import sys,os;' + SETBINARY + textwrap.dedent('''
872n/a s = sys.stdin.buffer.readline()
873n/a sys.stdout.buffer.write(s)
874n/a sys.stdout.buffer.write(b"line2\\r")
875n/a sys.stderr.buffer.write(b"eline2\\n")
876n/a s = sys.stdin.buffer.read()
877n/a sys.stdout.buffer.write(s)
878n/a sys.stdout.buffer.write(b"line4\\n")
879n/a sys.stdout.buffer.write(b"line5\\r\\n")
880n/a sys.stderr.buffer.write(b"eline6\\r")
881n/a sys.stderr.buffer.write(b"eline7\\r\\nz")
882n/a ''')],
883n/a stdin=subprocess.PIPE,
884n/a stderr=subprocess.PIPE,
885n/a stdout=subprocess.PIPE,
886n/a universal_newlines=True)
887n/a self.addCleanup(p.stdout.close)
888n/a self.addCleanup(p.stderr.close)
889n/a (stdout, stderr) = p.communicate("line1\nline3\n")
890n/a self.assertEqual(p.returncode, 0)
891n/a self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
892n/a # Python debug build push something like "[42442 refs]\n"
893n/a # to stderr at exit of subprocess.
894n/a # Don't use assertStderrEqual because it strips CR and LF from output.
895n/a self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
896n/a
897n/a def test_universal_newlines_communicate_encodings(self):
898n/a # Check that universal newlines mode works for various encodings,
899n/a # in particular for encodings in the UTF-16 and UTF-32 families.
900n/a # See issue #15595.
901n/a #
902n/a # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
903n/a # without, and UTF-16 and UTF-32.
904n/a for encoding in ['utf-16', 'utf-32-be']:
905n/a code = ("import sys; "
906n/a r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
907n/a encoding)
908n/a args = [sys.executable, '-c', code]
909n/a # We set stdin to be non-None because, as of this writing,
910n/a # a different code path is used when the number of pipes is
911n/a # zero or one.
912n/a popen = subprocess.Popen(args,
913n/a stdin=subprocess.PIPE,
914n/a stdout=subprocess.PIPE,
915n/a encoding=encoding)
916n/a stdout, stderr = popen.communicate(input='')
917n/a self.assertEqual(stdout, '1\n2\n3\n4')
918n/a
919n/a def test_communicate_errors(self):
920n/a for errors, expected in [
921n/a ('ignore', ''),
922n/a ('replace', '\ufffd\ufffd'),
923n/a ('surrogateescape', '\udc80\udc80'),
924n/a ('backslashreplace', '\\x80\\x80'),
925n/a ]:
926n/a code = ("import sys; "
927n/a r"sys.stdout.buffer.write(b'[\x80\x80]')")
928n/a args = [sys.executable, '-c', code]
929n/a # We set stdin to be non-None because, as of this writing,
930n/a # a different code path is used when the number of pipes is
931n/a # zero or one.
932n/a popen = subprocess.Popen(args,
933n/a stdin=subprocess.PIPE,
934n/a stdout=subprocess.PIPE,
935n/a encoding='utf-8',
936n/a errors=errors)
937n/a stdout, stderr = popen.communicate(input='')
938n/a self.assertEqual(stdout, '[{}]'.format(expected))
939n/a
940n/a def test_no_leaking(self):
941n/a # Make sure we leak no resources
942n/a if not mswindows:
943n/a max_handles = 1026 # too much for most UNIX systems
944n/a else:
945n/a max_handles = 2050 # too much for (at least some) Windows setups
946n/a handles = []
947n/a tmpdir = tempfile.mkdtemp()
948n/a try:
949n/a for i in range(max_handles):
950n/a try:
951n/a tmpfile = os.path.join(tmpdir, support.TESTFN)
952n/a handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
953n/a except OSError as e:
954n/a if e.errno != errno.EMFILE:
955n/a raise
956n/a break
957n/a else:
958n/a self.skipTest("failed to reach the file descriptor limit "
959n/a "(tried %d)" % max_handles)
960n/a # Close a couple of them (should be enough for a subprocess)
961n/a for i in range(10):
962n/a os.close(handles.pop())
963n/a # Loop creating some subprocesses. If one of them leaks some fds,
964n/a # the next loop iteration will fail by reaching the max fd limit.
965n/a for i in range(15):
966n/a p = subprocess.Popen([sys.executable, "-c",
967n/a "import sys;"
968n/a "sys.stdout.write(sys.stdin.read())"],
969n/a stdin=subprocess.PIPE,
970n/a stdout=subprocess.PIPE,
971n/a stderr=subprocess.PIPE)
972n/a data = p.communicate(b"lime")[0]
973n/a self.assertEqual(data, b"lime")
974n/a finally:
975n/a for h in handles:
976n/a os.close(h)
977n/a shutil.rmtree(tmpdir)
978n/a
979n/a def test_list2cmdline(self):
980n/a self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
981n/a '"a b c" d e')
982n/a self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
983n/a 'ab\\"c \\ d')
984n/a self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
985n/a 'ab\\"c " \\\\" d')
986n/a self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
987n/a 'a\\\\\\b "de fg" h')
988n/a self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
989n/a 'a\\\\\\"b c d')
990n/a self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
991n/a '"a\\\\b c" d e')
992n/a self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
993n/a '"a\\\\b\\ c" d e')
994n/a self.assertEqual(subprocess.list2cmdline(['ab', '']),
995n/a 'ab ""')
996n/a
997n/a def test_poll(self):
998n/a p = subprocess.Popen([sys.executable, "-c",
999n/a "import os; os.read(0, 1)"],
1000n/a stdin=subprocess.PIPE)
1001n/a self.addCleanup(p.stdin.close)
1002n/a self.assertIsNone(p.poll())
1003n/a os.write(p.stdin.fileno(), b'A')
1004n/a p.wait()
1005n/a # Subsequent invocations should just return the returncode
1006n/a self.assertEqual(p.poll(), 0)
1007n/a
1008n/a def test_wait(self):
1009n/a p = subprocess.Popen([sys.executable, "-c", "pass"])
1010n/a self.assertEqual(p.wait(), 0)
1011n/a # Subsequent invocations should just return the returncode
1012n/a self.assertEqual(p.wait(), 0)
1013n/a
1014n/a def test_wait_timeout(self):
1015n/a p = subprocess.Popen([sys.executable,
1016n/a "-c", "import time; time.sleep(0.3)"])
1017n/a with self.assertRaises(subprocess.TimeoutExpired) as c:
1018n/a p.wait(timeout=0.0001)
1019n/a self.assertIn("0.0001", str(c.exception)) # For coverage of __str__.
1020n/a # Some heavily loaded buildbots (sparc Debian 3.x) require this much
1021n/a # time to start.
1022n/a self.assertEqual(p.wait(timeout=3), 0)
1023n/a
1024n/a def test_invalid_bufsize(self):
1025n/a # an invalid type of the bufsize argument should raise
1026n/a # TypeError.
1027n/a with self.assertRaises(TypeError):
1028n/a subprocess.Popen([sys.executable, "-c", "pass"], "orange")
1029n/a
1030n/a def test_bufsize_is_none(self):
1031n/a # bufsize=None should be the same as bufsize=0.
1032n/a p = subprocess.Popen([sys.executable, "-c", "pass"], None)
1033n/a self.assertEqual(p.wait(), 0)
1034n/a # Again with keyword arg
1035n/a p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None)
1036n/a self.assertEqual(p.wait(), 0)
1037n/a
1038n/a def _test_bufsize_equal_one(self, line, expected, universal_newlines):
1039n/a # subprocess may deadlock with bufsize=1, see issue #21332
1040n/a with subprocess.Popen([sys.executable, "-c", "import sys;"
1041n/a "sys.stdout.write(sys.stdin.readline());"
1042n/a "sys.stdout.flush()"],
1043n/a stdin=subprocess.PIPE,
1044n/a stdout=subprocess.PIPE,
1045n/a stderr=subprocess.DEVNULL,
1046n/a bufsize=1,
1047n/a universal_newlines=universal_newlines) as p:
1048n/a p.stdin.write(line) # expect that it flushes the line in text mode
1049n/a os.close(p.stdin.fileno()) # close it without flushing the buffer
1050n/a read_line = p.stdout.readline()
1051n/a try:
1052n/a p.stdin.close()
1053n/a except OSError:
1054n/a pass
1055n/a p.stdin = None
1056n/a self.assertEqual(p.returncode, 0)
1057n/a self.assertEqual(read_line, expected)
1058n/a
1059n/a def test_bufsize_equal_one_text_mode(self):
1060n/a # line is flushed in text mode with bufsize=1.
1061n/a # we should get the full line in return
1062n/a line = "line\n"
1063n/a self._test_bufsize_equal_one(line, line, universal_newlines=True)
1064n/a
1065n/a def test_bufsize_equal_one_binary_mode(self):
1066n/a # line is not flushed in binary mode with bufsize=1.
1067n/a # we should get empty response
1068n/a line = b'line' + os.linesep.encode() # assume ascii-based locale
1069n/a self._test_bufsize_equal_one(line, b'', universal_newlines=False)
1070n/a
1071n/a def test_leaking_fds_on_error(self):
1072n/a # see bug #5179: Popen leaks file descriptors to PIPEs if
1073n/a # the child fails to execute; this will eventually exhaust
1074n/a # the maximum number of open fds. 1024 seems a very common
1075n/a # value for that limit, but Windows has 2048, so we loop
1076n/a # 1024 times (each call leaked two fds).
1077n/a for i in range(1024):
1078n/a with self.assertRaises(OSError) as c:
1079n/a subprocess.Popen(['nonexisting_i_hope'],
1080n/a stdout=subprocess.PIPE,
1081n/a stderr=subprocess.PIPE)
1082n/a # ignore errors that indicate the command was not found
1083n/a if c.exception.errno not in (errno.ENOENT, errno.EACCES):
1084n/a raise c.exception
1085n/a
1086n/a @unittest.skipIf(threading is None, "threading required")
1087n/a def test_double_close_on_error(self):
1088n/a # Issue #18851
1089n/a fds = []
1090n/a def open_fds():
1091n/a for i in range(20):
1092n/a fds.extend(os.pipe())
1093n/a time.sleep(0.001)
1094n/a t = threading.Thread(target=open_fds)
1095n/a t.start()
1096n/a try:
1097n/a with self.assertRaises(EnvironmentError):
1098n/a subprocess.Popen(['nonexisting_i_hope'],
1099n/a stdin=subprocess.PIPE,
1100n/a stdout=subprocess.PIPE,
1101n/a stderr=subprocess.PIPE)
1102n/a finally:
1103n/a t.join()
1104n/a exc = None
1105n/a for fd in fds:
1106n/a # If a double close occurred, some of those fds will
1107n/a # already have been closed by mistake, and os.close()
1108n/a # here will raise.
1109n/a try:
1110n/a os.close(fd)
1111n/a except OSError as e:
1112n/a exc = e
1113n/a if exc is not None:
1114n/a raise exc
1115n/a
1116n/a @unittest.skipIf(threading is None, "threading required")
1117n/a def test_threadsafe_wait(self):
1118n/a """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
1119n/a proc = subprocess.Popen([sys.executable, '-c',
1120n/a 'import time; time.sleep(12)'])
1121n/a self.assertEqual(proc.returncode, None)
1122n/a results = []
1123n/a
1124n/a def kill_proc_timer_thread():
1125n/a results.append(('thread-start-poll-result', proc.poll()))
1126n/a # terminate it from the thread and wait for the result.
1127n/a proc.kill()
1128n/a proc.wait()
1129n/a results.append(('thread-after-kill-and-wait', proc.returncode))
1130n/a # this wait should be a no-op given the above.
1131n/a proc.wait()
1132n/a results.append(('thread-after-second-wait', proc.returncode))
1133n/a
1134n/a # This is a timing sensitive test, the failure mode is
1135n/a # triggered when both the main thread and this thread are in
1136n/a # the wait() call at once. The delay here is to allow the
1137n/a # main thread to most likely be blocked in its wait() call.
1138n/a t = threading.Timer(0.2, kill_proc_timer_thread)
1139n/a t.start()
1140n/a
1141n/a if mswindows:
1142n/a expected_errorcode = 1
1143n/a else:
1144n/a # Should be -9 because of the proc.kill() from the thread.
1145n/a expected_errorcode = -9
1146n/a
1147n/a # Wait for the process to finish; the thread should kill it
1148n/a # long before it finishes on its own. Supplying a timeout
1149n/a # triggers a different code path for better coverage.
1150n/a proc.wait(timeout=20)
1151n/a self.assertEqual(proc.returncode, expected_errorcode,
1152n/a msg="unexpected result in wait from main thread")
1153n/a
1154n/a # This should be a no-op with no change in returncode.
1155n/a proc.wait()
1156n/a self.assertEqual(proc.returncode, expected_errorcode,
1157n/a msg="unexpected result in second main wait.")
1158n/a
1159n/a t.join()
1160n/a # Ensure that all of the thread results are as expected.
1161n/a # When a race condition occurs in wait(), the returncode could
1162n/a # be set by the wrong thread that doesn't actually have it
1163n/a # leading to an incorrect value.
1164n/a self.assertEqual([('thread-start-poll-result', None),
1165n/a ('thread-after-kill-and-wait', expected_errorcode),
1166n/a ('thread-after-second-wait', expected_errorcode)],
1167n/a results)
1168n/a
1169n/a def test_issue8780(self):
1170n/a # Ensure that stdout is inherited from the parent
1171n/a # if stdout=PIPE is not used
1172n/a code = ';'.join((
1173n/a 'import subprocess, sys',
1174n/a 'retcode = subprocess.call('
1175n/a "[sys.executable, '-c', 'print(\"Hello World!\")'])",
1176n/a 'assert retcode == 0'))
1177n/a output = subprocess.check_output([sys.executable, '-c', code])
1178n/a self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
1179n/a
1180n/a def test_handles_closed_on_exception(self):
1181n/a # If CreateProcess exits with an error, ensure the
1182n/a # duplicate output handles are released
1183n/a ifhandle, ifname = tempfile.mkstemp()
1184n/a ofhandle, ofname = tempfile.mkstemp()
1185n/a efhandle, efname = tempfile.mkstemp()
1186n/a try:
1187n/a subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
1188n/a stderr=efhandle)
1189n/a except OSError:
1190n/a os.close(ifhandle)
1191n/a os.remove(ifname)
1192n/a os.close(ofhandle)
1193n/a os.remove(ofname)
1194n/a os.close(efhandle)
1195n/a os.remove(efname)
1196n/a self.assertFalse(os.path.exists(ifname))
1197n/a self.assertFalse(os.path.exists(ofname))
1198n/a self.assertFalse(os.path.exists(efname))
1199n/a
1200n/a def test_communicate_epipe(self):
1201n/a # Issue 10963: communicate() should hide EPIPE
1202n/a p = subprocess.Popen([sys.executable, "-c", 'pass'],
1203n/a stdin=subprocess.PIPE,
1204n/a stdout=subprocess.PIPE,
1205n/a stderr=subprocess.PIPE)
1206n/a self.addCleanup(p.stdout.close)
1207n/a self.addCleanup(p.stderr.close)
1208n/a self.addCleanup(p.stdin.close)
1209n/a p.communicate(b"x" * 2**20)
1210n/a
1211n/a def test_communicate_epipe_only_stdin(self):
1212n/a # Issue 10963: communicate() should hide EPIPE
1213n/a p = subprocess.Popen([sys.executable, "-c", 'pass'],
1214n/a stdin=subprocess.PIPE)
1215n/a self.addCleanup(p.stdin.close)
1216n/a p.wait()
1217n/a p.communicate(b"x" * 2**20)
1218n/a
1219n/a @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
1220n/a "Requires signal.SIGUSR1")
1221n/a @unittest.skipUnless(hasattr(os, 'kill'),
1222n/a "Requires os.kill")
1223n/a @unittest.skipUnless(hasattr(os, 'getppid'),
1224n/a "Requires os.getppid")
1225n/a def test_communicate_eintr(self):
1226n/a # Issue #12493: communicate() should handle EINTR
1227n/a def handler(signum, frame):
1228n/a pass
1229n/a old_handler = signal.signal(signal.SIGUSR1, handler)
1230n/a self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
1231n/a
1232n/a args = [sys.executable, "-c",
1233n/a 'import os, signal;'
1234n/a 'os.kill(os.getppid(), signal.SIGUSR1)']
1235n/a for stream in ('stdout', 'stderr'):
1236n/a kw = {stream: subprocess.PIPE}
1237n/a with subprocess.Popen(args, **kw) as process:
1238n/a # communicate() will be interrupted by SIGUSR1
1239n/a process.communicate()
1240n/a
1241n/a
1242n/a # This test is Linux-ish specific for simplicity to at least have
1243n/a # some coverage. It is not a platform specific bug.
1244n/a @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
1245n/a "Linux specific")
1246n/a def test_failed_child_execute_fd_leak(self):
1247n/a """Test for the fork() failure fd leak reported in issue16327."""
1248n/a fd_directory = '/proc/%d/fd' % os.getpid()
1249n/a fds_before_popen = os.listdir(fd_directory)
1250n/a with self.assertRaises(PopenTestException):
1251n/a PopenExecuteChildRaises(
1252n/a [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
1253n/a stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1254n/a
1255n/a # NOTE: This test doesn't verify that the real _execute_child
1256n/a # does not close the file descriptors itself on the way out
1257n/a # during an exception. Code inspection has confirmed that.
1258n/a
1259n/a fds_after_exception = os.listdir(fd_directory)
1260n/a self.assertEqual(fds_before_popen, fds_after_exception)
1261n/a
1262n/a
1263n/aclass RunFuncTestCase(BaseTestCase):
1264n/a def run_python(self, code, **kwargs):
1265n/a """Run Python code in a subprocess using subprocess.run"""
1266n/a argv = [sys.executable, "-c", code]
1267n/a return subprocess.run(argv, **kwargs)
1268n/a
1269n/a def test_returncode(self):
1270n/a # call() function with sequence argument
1271n/a cp = self.run_python("import sys; sys.exit(47)")
1272n/a self.assertEqual(cp.returncode, 47)
1273n/a with self.assertRaises(subprocess.CalledProcessError):
1274n/a cp.check_returncode()
1275n/a
1276n/a def test_check(self):
1277n/a with self.assertRaises(subprocess.CalledProcessError) as c:
1278n/a self.run_python("import sys; sys.exit(47)", check=True)
1279n/a self.assertEqual(c.exception.returncode, 47)
1280n/a
1281n/a def test_check_zero(self):
1282n/a # check_returncode shouldn't raise when returncode is zero
1283n/a cp = self.run_python("import sys; sys.exit(0)", check=True)
1284n/a self.assertEqual(cp.returncode, 0)
1285n/a
1286n/a def test_timeout(self):
1287n/a # run() function with timeout argument; we want to test that the child
1288n/a # process gets killed when the timeout expires. If the child isn't
1289n/a # killed, this call will deadlock since subprocess.run waits for the
1290n/a # child.
1291n/a with self.assertRaises(subprocess.TimeoutExpired):
1292n/a self.run_python("while True: pass", timeout=0.0001)
1293n/a
1294n/a def test_capture_stdout(self):
1295n/a # capture stdout with zero return code
1296n/a cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
1297n/a self.assertIn(b'BDFL', cp.stdout)
1298n/a
1299n/a def test_capture_stderr(self):
1300n/a cp = self.run_python("import sys; sys.stderr.write('BDFL')",
1301n/a stderr=subprocess.PIPE)
1302n/a self.assertIn(b'BDFL', cp.stderr)
1303n/a
1304n/a def test_check_output_stdin_arg(self):
1305n/a # run() can be called with stdin set to a file
1306n/a tf = tempfile.TemporaryFile()
1307n/a self.addCleanup(tf.close)
1308n/a tf.write(b'pear')
1309n/a tf.seek(0)
1310n/a cp = self.run_python(
1311n/a "import sys; sys.stdout.write(sys.stdin.read().upper())",
1312n/a stdin=tf, stdout=subprocess.PIPE)
1313n/a self.assertIn(b'PEAR', cp.stdout)
1314n/a
1315n/a def test_check_output_input_arg(self):
1316n/a # check_output() can be called with input set to a string
1317n/a cp = self.run_python(
1318n/a "import sys; sys.stdout.write(sys.stdin.read().upper())",
1319n/a input=b'pear', stdout=subprocess.PIPE)
1320n/a self.assertIn(b'PEAR', cp.stdout)
1321n/a
1322n/a def test_check_output_stdin_with_input_arg(self):
1323n/a # run() refuses to accept 'stdin' with 'input'
1324n/a tf = tempfile.TemporaryFile()
1325n/a self.addCleanup(tf.close)
1326n/a tf.write(b'pear')
1327n/a tf.seek(0)
1328n/a with self.assertRaises(ValueError,
1329n/a msg="Expected ValueError when stdin and input args supplied.") as c:
1330n/a output = self.run_python("print('will not be run')",
1331n/a stdin=tf, input=b'hare')
1332n/a self.assertIn('stdin', c.exception.args[0])
1333n/a self.assertIn('input', c.exception.args[0])
1334n/a
1335n/a def test_check_output_timeout(self):
1336n/a with self.assertRaises(subprocess.TimeoutExpired) as c:
1337n/a cp = self.run_python((
1338n/a "import sys, time\n"
1339n/a "sys.stdout.write('BDFL')\n"
1340n/a "sys.stdout.flush()\n"
1341n/a "time.sleep(3600)"),
1342n/a # Some heavily loaded buildbots (sparc Debian 3.x) require
1343n/a # this much time to start and print.
1344n/a timeout=3, stdout=subprocess.PIPE)
1345n/a self.assertEqual(c.exception.output, b'BDFL')
1346n/a # output is aliased to stdout
1347n/a self.assertEqual(c.exception.stdout, b'BDFL')
1348n/a
1349n/a def test_run_kwargs(self):
1350n/a newenv = os.environ.copy()
1351n/a newenv["FRUIT"] = "banana"
1352n/a cp = self.run_python(('import sys, os;'
1353n/a 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'),
1354n/a env=newenv)
1355n/a self.assertEqual(cp.returncode, 33)
1356n/a
1357n/a
1358n/a@unittest.skipIf(mswindows, "POSIX specific tests")
1359n/aclass POSIXProcessTestCase(BaseTestCase):
1360n/a
1361n/a def setUp(self):
1362n/a super().setUp()
1363n/a self._nonexistent_dir = "/_this/pa.th/does/not/exist"
1364n/a
1365n/a def _get_chdir_exception(self):
1366n/a try:
1367n/a os.chdir(self._nonexistent_dir)
1368n/a except OSError as e:
1369n/a # This avoids hard coding the errno value or the OS perror()
1370n/a # string and instead capture the exception that we want to see
1371n/a # below for comparison.
1372n/a desired_exception = e
1373n/a desired_exception.strerror += ': ' + repr(self._nonexistent_dir)
1374n/a else:
1375n/a self.fail("chdir to nonexistent directory %s succeeded." %
1376n/a self._nonexistent_dir)
1377n/a return desired_exception
1378n/a
1379n/a def test_exception_cwd(self):
1380n/a """Test error in the child raised in the parent for a bad cwd."""
1381n/a desired_exception = self._get_chdir_exception()
1382n/a try:
1383n/a p = subprocess.Popen([sys.executable, "-c", ""],
1384n/a cwd=self._nonexistent_dir)
1385n/a except OSError as e:
1386n/a # Test that the child process chdir failure actually makes
1387n/a # it up to the parent process as the correct exception.
1388n/a self.assertEqual(desired_exception.errno, e.errno)
1389n/a self.assertEqual(desired_exception.strerror, e.strerror)
1390n/a else:
1391n/a self.fail("Expected OSError: %s" % desired_exception)
1392n/a
1393n/a def test_exception_bad_executable(self):
1394n/a """Test error in the child raised in the parent for a bad executable."""
1395n/a desired_exception = self._get_chdir_exception()
1396n/a try:
1397n/a p = subprocess.Popen([sys.executable, "-c", ""],
1398n/a executable=self._nonexistent_dir)
1399n/a except OSError as e:
1400n/a # Test that the child process exec failure actually makes
1401n/a # it up to the parent process as the correct exception.
1402n/a self.assertEqual(desired_exception.errno, e.errno)
1403n/a self.assertEqual(desired_exception.strerror, e.strerror)
1404n/a else:
1405n/a self.fail("Expected OSError: %s" % desired_exception)
1406n/a
1407n/a def test_exception_bad_args_0(self):
1408n/a """Test error in the child raised in the parent for a bad args[0]."""
1409n/a desired_exception = self._get_chdir_exception()
1410n/a try:
1411n/a p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
1412n/a except OSError as e:
1413n/a # Test that the child process exec failure actually makes
1414n/a # it up to the parent process as the correct exception.
1415n/a self.assertEqual(desired_exception.errno, e.errno)
1416n/a self.assertEqual(desired_exception.strerror, e.strerror)
1417n/a else:
1418n/a self.fail("Expected OSError: %s" % desired_exception)
1419n/a
1420n/a def test_restore_signals(self):
1421n/a # Code coverage for both values of restore_signals to make sure it
1422n/a # at least does not blow up.
1423n/a # A test for behavior would be complex. Contributions welcome.
1424n/a subprocess.call([sys.executable, "-c", ""], restore_signals=True)
1425n/a subprocess.call([sys.executable, "-c", ""], restore_signals=False)
1426n/a
1427n/a def test_start_new_session(self):
1428n/a # For code coverage of calling setsid(). We don't care if we get an
1429n/a # EPERM error from it depending on the test execution environment, that
1430n/a # still indicates that it was called.
1431n/a try:
1432n/a output = subprocess.check_output(
1433n/a [sys.executable, "-c",
1434n/a "import os; print(os.getpgid(os.getpid()))"],
1435n/a start_new_session=True)
1436n/a except OSError as e:
1437n/a if e.errno != errno.EPERM:
1438n/a raise
1439n/a else:
1440n/a parent_pgid = os.getpgid(os.getpid())
1441n/a child_pgid = int(output)
1442n/a self.assertNotEqual(parent_pgid, child_pgid)
1443n/a
1444n/a def test_run_abort(self):
1445n/a # returncode handles signal termination
1446n/a with support.SuppressCrashReport():
1447n/a p = subprocess.Popen([sys.executable, "-c",
1448n/a 'import os; os.abort()'])
1449n/a p.wait()
1450n/a self.assertEqual(-p.returncode, signal.SIGABRT)
1451n/a
1452n/a def test_CalledProcessError_str_signal(self):
1453n/a err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
1454n/a error_string = str(err)
1455n/a # We're relying on the repr() of the signal.Signals intenum to provide
1456n/a # the word signal, the signal name and the numeric value.
1457n/a self.assertIn("signal", error_string.lower())
1458n/a # We're not being specific about the signal name as some signals have
1459n/a # multiple names and which name is revealed can vary.
1460n/a self.assertIn("SIG", error_string)
1461n/a self.assertIn(str(signal.SIGABRT), error_string)
1462n/a
1463n/a def test_CalledProcessError_str_unknown_signal(self):
1464n/a err = subprocess.CalledProcessError(-9876543, "fake cmd")
1465n/a error_string = str(err)
1466n/a self.assertIn("unknown signal 9876543.", error_string)
1467n/a
1468n/a def test_CalledProcessError_str_non_zero(self):
1469n/a err = subprocess.CalledProcessError(2, "fake cmd")
1470n/a error_string = str(err)
1471n/a self.assertIn("non-zero exit status 2.", error_string)
1472n/a
1473n/a def test_preexec(self):
1474n/a # DISCLAIMER: Setting environment variables is *not* a good use
1475n/a # of a preexec_fn. This is merely a test.
1476n/a p = subprocess.Popen([sys.executable, "-c",
1477n/a 'import sys,os;'
1478n/a 'sys.stdout.write(os.getenv("FRUIT"))'],
1479n/a stdout=subprocess.PIPE,
1480n/a preexec_fn=lambda: os.putenv("FRUIT", "apple"))
1481n/a with p:
1482n/a self.assertEqual(p.stdout.read(), b"apple")
1483n/a
1484n/a def test_preexec_exception(self):
1485n/a def raise_it():
1486n/a raise ValueError("What if two swallows carried a coconut?")
1487n/a try:
1488n/a p = subprocess.Popen([sys.executable, "-c", ""],
1489n/a preexec_fn=raise_it)
1490n/a except subprocess.SubprocessError as e:
1491n/a self.assertTrue(
1492n/a subprocess._posixsubprocess,
1493n/a "Expected a ValueError from the preexec_fn")
1494n/a except ValueError as e:
1495n/a self.assertIn("coconut", e.args[0])
1496n/a else:
1497n/a self.fail("Exception raised by preexec_fn did not make it "
1498n/a "to the parent process.")
1499n/a
1500n/a class _TestExecuteChildPopen(subprocess.Popen):
1501n/a """Used to test behavior at the end of _execute_child."""
1502n/a def __init__(self, testcase, *args, **kwargs):
1503n/a self._testcase = testcase
1504n/a subprocess.Popen.__init__(self, *args, **kwargs)
1505n/a
1506n/a def _execute_child(self, *args, **kwargs):
1507n/a try:
1508n/a subprocess.Popen._execute_child(self, *args, **kwargs)
1509n/a finally:
1510n/a # Open a bunch of file descriptors and verify that
1511n/a # none of them are the same as the ones the Popen
1512n/a # instance is using for stdin/stdout/stderr.
1513n/a devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
1514n/a for _ in range(8)]
1515n/a try:
1516n/a for fd in devzero_fds:
1517n/a self._testcase.assertNotIn(
1518n/a fd, (self.stdin.fileno(), self.stdout.fileno(),
1519n/a self.stderr.fileno()),
1520n/a msg="At least one fd was closed early.")
1521n/a finally:
1522n/a for fd in devzero_fds:
1523n/a os.close(fd)
1524n/a
1525n/a @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
1526n/a def test_preexec_errpipe_does_not_double_close_pipes(self):
1527n/a """Issue16140: Don't double close pipes on preexec error."""
1528n/a
1529n/a def raise_it():
1530n/a raise subprocess.SubprocessError(
1531n/a "force the _execute_child() errpipe_data path.")
1532n/a
1533n/a with self.assertRaises(subprocess.SubprocessError):
1534n/a self._TestExecuteChildPopen(
1535n/a self, [sys.executable, "-c", "pass"],
1536n/a stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1537n/a stderr=subprocess.PIPE, preexec_fn=raise_it)
1538n/a
1539n/a def test_preexec_gc_module_failure(self):
1540n/a # This tests the code that disables garbage collection if the child
1541n/a # process will execute any Python.
1542n/a def raise_runtime_error():
1543n/a raise RuntimeError("this shouldn't escape")
1544n/a enabled = gc.isenabled()
1545n/a orig_gc_disable = gc.disable
1546n/a orig_gc_isenabled = gc.isenabled
1547n/a try:
1548n/a gc.disable()
1549n/a self.assertFalse(gc.isenabled())
1550n/a subprocess.call([sys.executable, '-c', ''],
1551n/a preexec_fn=lambda: None)
1552n/a self.assertFalse(gc.isenabled(),
1553n/a "Popen enabled gc when it shouldn't.")
1554n/a
1555n/a gc.enable()
1556n/a self.assertTrue(gc.isenabled())
1557n/a subprocess.call([sys.executable, '-c', ''],
1558n/a preexec_fn=lambda: None)
1559n/a self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
1560n/a
1561n/a gc.disable = raise_runtime_error
1562n/a self.assertRaises(RuntimeError, subprocess.Popen,
1563n/a [sys.executable, '-c', ''],
1564n/a preexec_fn=lambda: None)
1565n/a
1566n/a del gc.isenabled # force an AttributeError
1567n/a self.assertRaises(AttributeError, subprocess.Popen,
1568n/a [sys.executable, '-c', ''],
1569n/a preexec_fn=lambda: None)
1570n/a finally:
1571n/a gc.disable = orig_gc_disable
1572n/a gc.isenabled = orig_gc_isenabled
1573n/a if not enabled:
1574n/a gc.disable()
1575n/a
1576n/a @unittest.skipIf(
1577n/a sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
1578n/a def test_preexec_fork_failure(self):
1579n/a # The internal code did not preserve the previous exception when
1580n/a # re-enabling garbage collection
1581n/a try:
1582n/a from resource import getrlimit, setrlimit, RLIMIT_NPROC
1583n/a except ImportError as err:
1584n/a self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD
1585n/a limits = getrlimit(RLIMIT_NPROC)
1586n/a [_, hard] = limits
1587n/a setrlimit(RLIMIT_NPROC, (0, hard))
1588n/a self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
1589n/a try:
1590n/a subprocess.call([sys.executable, '-c', ''],
1591n/a preexec_fn=lambda: None)
1592n/a except BlockingIOError:
1593n/a # Forking should raise EAGAIN, translated to BlockingIOError
1594n/a pass
1595n/a else:
1596n/a self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
1597n/a
1598n/a def test_args_string(self):
1599n/a # args is a string
1600n/a fd, fname = tempfile.mkstemp()
1601n/a # reopen in text mode
1602n/a with open(fd, "w", errors="surrogateescape") as fobj:
1603n/a fobj.write("#!%s\n" % support.unix_shell)
1604n/a fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
1605n/a sys.executable)
1606n/a os.chmod(fname, 0o700)
1607n/a p = subprocess.Popen(fname)
1608n/a p.wait()
1609n/a os.remove(fname)
1610n/a self.assertEqual(p.returncode, 47)
1611n/a
1612n/a def test_invalid_args(self):
1613n/a # invalid arguments should raise ValueError
1614n/a self.assertRaises(ValueError, subprocess.call,
1615n/a [sys.executable, "-c",
1616n/a "import sys; sys.exit(47)"],
1617n/a startupinfo=47)
1618n/a self.assertRaises(ValueError, subprocess.call,
1619n/a [sys.executable, "-c",
1620n/a "import sys; sys.exit(47)"],
1621n/a creationflags=47)
1622n/a
1623n/a def test_shell_sequence(self):
1624n/a # Run command through the shell (sequence)
1625n/a newenv = os.environ.copy()
1626n/a newenv["FRUIT"] = "apple"
1627n/a p = subprocess.Popen(["echo $FRUIT"], shell=1,
1628n/a stdout=subprocess.PIPE,
1629n/a env=newenv)
1630n/a with p:
1631n/a self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
1632n/a
1633n/a def test_shell_string(self):
1634n/a # Run command through the shell (string)
1635n/a newenv = os.environ.copy()
1636n/a newenv["FRUIT"] = "apple"
1637n/a p = subprocess.Popen("echo $FRUIT", shell=1,
1638n/a stdout=subprocess.PIPE,
1639n/a env=newenv)
1640n/a with p:
1641n/a self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
1642n/a
1643n/a def test_call_string(self):
1644n/a # call() function with string argument on UNIX
1645n/a fd, fname = tempfile.mkstemp()
1646n/a # reopen in text mode
1647n/a with open(fd, "w", errors="surrogateescape") as fobj:
1648n/a fobj.write("#!%s\n" % support.unix_shell)
1649n/a fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
1650n/a sys.executable)
1651n/a os.chmod(fname, 0o700)
1652n/a rc = subprocess.call(fname)
1653n/a os.remove(fname)
1654n/a self.assertEqual(rc, 47)
1655n/a
1656n/a def test_specific_shell(self):
1657n/a # Issue #9265: Incorrect name passed as arg[0].
1658n/a shells = []
1659n/a for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
1660n/a for name in ['bash', 'ksh']:
1661n/a sh = os.path.join(prefix, name)
1662n/a if os.path.isfile(sh):
1663n/a shells.append(sh)
1664n/a if not shells: # Will probably work for any shell but csh.
1665n/a self.skipTest("bash or ksh required for this test")
1666n/a sh = '/bin/sh'
1667n/a if os.path.isfile(sh) and not os.path.islink(sh):
1668n/a # Test will fail if /bin/sh is a symlink to csh.
1669n/a shells.append(sh)
1670n/a for sh in shells:
1671n/a p = subprocess.Popen("echo $0", executable=sh, shell=True,
1672n/a stdout=subprocess.PIPE)
1673n/a with p:
1674n/a self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
1675n/a
1676n/a def _kill_process(self, method, *args):
1677n/a # Do not inherit file handles from the parent.
1678n/a # It should fix failures on some platforms.
1679n/a # Also set the SIGINT handler to the default to make sure it's not
1680n/a # being ignored (some tests rely on that.)
1681n/a old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
1682n/a try:
1683n/a p = subprocess.Popen([sys.executable, "-c", """if 1:
1684n/a import sys, time
1685n/a sys.stdout.write('x\\n')
1686n/a sys.stdout.flush()
1687n/a time.sleep(30)
1688n/a """],
1689n/a close_fds=True,
1690n/a stdin=subprocess.PIPE,
1691n/a stdout=subprocess.PIPE,
1692n/a stderr=subprocess.PIPE)
1693n/a finally:
1694n/a signal.signal(signal.SIGINT, old_handler)
1695n/a # Wait for the interpreter to be completely initialized before
1696n/a # sending any signal.
1697n/a p.stdout.read(1)
1698n/a getattr(p, method)(*args)
1699n/a return p
1700n/a
1701n/a @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
1702n/a "Due to known OS bug (issue #16762)")
1703n/a def _kill_dead_process(self, method, *args):
1704n/a # Do not inherit file handles from the parent.
1705n/a # It should fix failures on some platforms.
1706n/a p = subprocess.Popen([sys.executable, "-c", """if 1:
1707n/a import sys, time
1708n/a sys.stdout.write('x\\n')
1709n/a sys.stdout.flush()
1710n/a """],
1711n/a close_fds=True,
1712n/a stdin=subprocess.PIPE,
1713n/a stdout=subprocess.PIPE,
1714n/a stderr=subprocess.PIPE)
1715n/a # Wait for the interpreter to be completely initialized before
1716n/a # sending any signal.
1717n/a p.stdout.read(1)
1718n/a # The process should end after this
1719n/a time.sleep(1)
1720n/a # This shouldn't raise even though the child is now dead
1721n/a getattr(p, method)(*args)
1722n/a p.communicate()
1723n/a
1724n/a def test_send_signal(self):
1725n/a p = self._kill_process('send_signal', signal.SIGINT)
1726n/a _, stderr = p.communicate()
1727n/a self.assertIn(b'KeyboardInterrupt', stderr)
1728n/a self.assertNotEqual(p.wait(), 0)
1729n/a
1730n/a def test_kill(self):
1731n/a p = self._kill_process('kill')
1732n/a _, stderr = p.communicate()
1733n/a self.assertStderrEqual(stderr, b'')
1734n/a self.assertEqual(p.wait(), -signal.SIGKILL)
1735n/a
1736n/a def test_terminate(self):
1737n/a p = self._kill_process('terminate')
1738n/a _, stderr = p.communicate()
1739n/a self.assertStderrEqual(stderr, b'')
1740n/a self.assertEqual(p.wait(), -signal.SIGTERM)
1741n/a
1742n/a def test_send_signal_dead(self):
1743n/a # Sending a signal to a dead process
1744n/a self._kill_dead_process('send_signal', signal.SIGINT)
1745n/a
1746n/a def test_kill_dead(self):
1747n/a # Killing a dead process
1748n/a self._kill_dead_process('kill')
1749n/a
1750n/a def test_terminate_dead(self):
1751n/a # Terminating a dead process
1752n/a self._kill_dead_process('terminate')
1753n/a
1754n/a def _save_fds(self, save_fds):
1755n/a fds = []
1756n/a for fd in save_fds:
1757n/a inheritable = os.get_inheritable(fd)
1758n/a saved = os.dup(fd)
1759n/a fds.append((fd, saved, inheritable))
1760n/a return fds
1761n/a
1762n/a def _restore_fds(self, fds):
1763n/a for fd, saved, inheritable in fds:
1764n/a os.dup2(saved, fd, inheritable=inheritable)
1765n/a os.close(saved)
1766n/a
1767n/a def check_close_std_fds(self, fds):
1768n/a # Issue #9905: test that subprocess pipes still work properly with
1769n/a # some standard fds closed
1770n/a stdin = 0
1771n/a saved_fds = self._save_fds(fds)
1772n/a for fd, saved, inheritable in saved_fds:
1773n/a if fd == 0:
1774n/a stdin = saved
1775n/a break
1776n/a try:
1777n/a for fd in fds:
1778n/a os.close(fd)
1779n/a out, err = subprocess.Popen([sys.executable, "-c",
1780n/a 'import sys;'
1781n/a 'sys.stdout.write("apple");'
1782n/a 'sys.stdout.flush();'
1783n/a 'sys.stderr.write("orange")'],
1784n/a stdin=stdin,
1785n/a stdout=subprocess.PIPE,
1786n/a stderr=subprocess.PIPE).communicate()
1787n/a err = support.strip_python_stderr(err)
1788n/a self.assertEqual((out, err), (b'apple', b'orange'))
1789n/a finally:
1790n/a self._restore_fds(saved_fds)
1791n/a
1792n/a def test_close_fd_0(self):
1793n/a self.check_close_std_fds([0])
1794n/a
1795n/a def test_close_fd_1(self):
1796n/a self.check_close_std_fds([1])
1797n/a
1798n/a def test_close_fd_2(self):
1799n/a self.check_close_std_fds([2])
1800n/a
1801n/a def test_close_fds_0_1(self):
1802n/a self.check_close_std_fds([0, 1])
1803n/a
1804n/a def test_close_fds_0_2(self):
1805n/a self.check_close_std_fds([0, 2])
1806n/a
1807n/a def test_close_fds_1_2(self):
1808n/a self.check_close_std_fds([1, 2])
1809n/a
1810n/a def test_close_fds_0_1_2(self):
1811n/a # Issue #10806: test that subprocess pipes still work properly with
1812n/a # all standard fds closed.
1813n/a self.check_close_std_fds([0, 1, 2])
1814n/a
1815n/a def test_small_errpipe_write_fd(self):
1816n/a """Issue #15798: Popen should work when stdio fds are available."""
1817n/a new_stdin = os.dup(0)
1818n/a new_stdout = os.dup(1)
1819n/a try:
1820n/a os.close(0)
1821n/a os.close(1)
1822n/a
1823n/a # Side test: if errpipe_write fails to have its CLOEXEC
1824n/a # flag set this should cause the parent to think the exec
1825n/a # failed. Extremely unlikely: everyone supports CLOEXEC.
1826n/a subprocess.Popen([
1827n/a sys.executable, "-c",
1828n/a "print('AssertionError:0:CLOEXEC failure.')"]).wait()
1829n/a finally:
1830n/a # Restore original stdin and stdout
1831n/a os.dup2(new_stdin, 0)
1832n/a os.dup2(new_stdout, 1)
1833n/a os.close(new_stdin)
1834n/a os.close(new_stdout)
1835n/a
1836n/a def test_remapping_std_fds(self):
1837n/a # open up some temporary files
1838n/a temps = [tempfile.mkstemp() for i in range(3)]
1839n/a try:
1840n/a temp_fds = [fd for fd, fname in temps]
1841n/a
1842n/a # unlink the files -- we won't need to reopen them
1843n/a for fd, fname in temps:
1844n/a os.unlink(fname)
1845n/a
1846n/a # write some data to what will become stdin, and rewind
1847n/a os.write(temp_fds[1], b"STDIN")
1848n/a os.lseek(temp_fds[1], 0, 0)
1849n/a
1850n/a # move the standard file descriptors out of the way
1851n/a saved_fds = self._save_fds(range(3))
1852n/a try:
1853n/a # duplicate the file objects over the standard fd's
1854n/a for fd, temp_fd in enumerate(temp_fds):
1855n/a os.dup2(temp_fd, fd)
1856n/a
1857n/a # now use those files in the "wrong" order, so that subprocess
1858n/a # has to rearrange them in the child
1859n/a p = subprocess.Popen([sys.executable, "-c",
1860n/a 'import sys; got = sys.stdin.read();'
1861n/a 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
1862n/a stdin=temp_fds[1],
1863n/a stdout=temp_fds[2],
1864n/a stderr=temp_fds[0])
1865n/a p.wait()
1866n/a finally:
1867n/a self._restore_fds(saved_fds)
1868n/a
1869n/a for fd in temp_fds:
1870n/a os.lseek(fd, 0, 0)
1871n/a
1872n/a out = os.read(temp_fds[2], 1024)
1873n/a err = support.strip_python_stderr(os.read(temp_fds[0], 1024))
1874n/a self.assertEqual(out, b"got STDIN")
1875n/a self.assertEqual(err, b"err")
1876n/a
1877n/a finally:
1878n/a for fd in temp_fds:
1879n/a os.close(fd)
1880n/a
1881n/a def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
1882n/a # open up some temporary files
1883n/a temps = [tempfile.mkstemp() for i in range(3)]
1884n/a temp_fds = [fd for fd, fname in temps]
1885n/a try:
1886n/a # unlink the files -- we won't need to reopen them
1887n/a for fd, fname in temps:
1888n/a os.unlink(fname)
1889n/a
1890n/a # save a copy of the standard file descriptors
1891n/a saved_fds = self._save_fds(range(3))
1892n/a try:
1893n/a # duplicate the temp files over the standard fd's 0, 1, 2
1894n/a for fd, temp_fd in enumerate(temp_fds):
1895n/a os.dup2(temp_fd, fd)
1896n/a
1897n/a # write some data to what will become stdin, and rewind
1898n/a os.write(stdin_no, b"STDIN")
1899n/a os.lseek(stdin_no, 0, 0)
1900n/a
1901n/a # now use those files in the given order, so that subprocess
1902n/a # has to rearrange them in the child
1903n/a p = subprocess.Popen([sys.executable, "-c",
1904n/a 'import sys; got = sys.stdin.read();'
1905n/a 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
1906n/a stdin=stdin_no,
1907n/a stdout=stdout_no,
1908n/a stderr=stderr_no)
1909n/a p.wait()
1910n/a
1911n/a for fd in temp_fds:
1912n/a os.lseek(fd, 0, 0)
1913n/a
1914n/a out = os.read(stdout_no, 1024)
1915n/a err = support.strip_python_stderr(os.read(stderr_no, 1024))
1916n/a finally:
1917n/a self._restore_fds(saved_fds)
1918n/a
1919n/a self.assertEqual(out, b"got STDIN")
1920n/a self.assertEqual(err, b"err")
1921n/a
1922n/a finally:
1923n/a for fd in temp_fds:
1924n/a os.close(fd)
1925n/a
1926n/a # When duping fds, if there arises a situation where one of the fds is
1927n/a # either 0, 1 or 2, it is possible that it is overwritten (#12607).
1928n/a # This tests all combinations of this.
1929n/a def test_swap_fds(self):
1930n/a self.check_swap_fds(0, 1, 2)
1931n/a self.check_swap_fds(0, 2, 1)
1932n/a self.check_swap_fds(1, 0, 2)
1933n/a self.check_swap_fds(1, 2, 0)
1934n/a self.check_swap_fds(2, 0, 1)
1935n/a self.check_swap_fds(2, 1, 0)
1936n/a
1937n/a def test_surrogates_error_message(self):
1938n/a def prepare():
1939n/a raise ValueError("surrogate:\uDCff")
1940n/a
1941n/a try:
1942n/a subprocess.call(
1943n/a [sys.executable, "-c", "pass"],
1944n/a preexec_fn=prepare)
1945n/a except ValueError as err:
1946n/a # Pure Python implementations keeps the message
1947n/a self.assertIsNone(subprocess._posixsubprocess)
1948n/a self.assertEqual(str(err), "surrogate:\uDCff")
1949n/a except subprocess.SubprocessError as err:
1950n/a # _posixsubprocess uses a default message
1951n/a self.assertIsNotNone(subprocess._posixsubprocess)
1952n/a self.assertEqual(str(err), "Exception occurred in preexec_fn.")
1953n/a else:
1954n/a self.fail("Expected ValueError or subprocess.SubprocessError")
1955n/a
1956n/a def test_undecodable_env(self):
1957n/a for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
1958n/a encoded_value = value.encode("ascii", "surrogateescape")
1959n/a
1960n/a # test str with surrogates
1961n/a script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
1962n/a env = os.environ.copy()
1963n/a env[key] = value
1964n/a # Use C locale to get ASCII for the locale encoding to force
1965n/a # surrogate-escaping of \xFF in the child process; otherwise it can
1966n/a # be decoded as-is if the default locale is latin-1.
1967n/a env['LC_ALL'] = 'C'
1968n/a if sys.platform.startswith("aix"):
1969n/a # On AIX, the C locale uses the Latin1 encoding
1970n/a decoded_value = encoded_value.decode("latin1", "surrogateescape")
1971n/a else:
1972n/a # On other UNIXes, the C locale uses the ASCII encoding
1973n/a decoded_value = value
1974n/a stdout = subprocess.check_output(
1975n/a [sys.executable, "-c", script],
1976n/a env=env)
1977n/a stdout = stdout.rstrip(b'\n\r')
1978n/a self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
1979n/a
1980n/a # test bytes
1981n/a key = key.encode("ascii", "surrogateescape")
1982n/a script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
1983n/a env = os.environ.copy()
1984n/a env[key] = encoded_value
1985n/a stdout = subprocess.check_output(
1986n/a [sys.executable, "-c", script],
1987n/a env=env)
1988n/a stdout = stdout.rstrip(b'\n\r')
1989n/a self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
1990n/a
1991n/a def test_bytes_program(self):
1992n/a abs_program = os.fsencode(sys.executable)
1993n/a path, program = os.path.split(sys.executable)
1994n/a program = os.fsencode(program)
1995n/a
1996n/a # absolute bytes path
1997n/a exitcode = subprocess.call([abs_program, "-c", "pass"])
1998n/a self.assertEqual(exitcode, 0)
1999n/a
2000n/a # absolute bytes path as a string
2001n/a cmd = b"'" + abs_program + b"' -c pass"
2002n/a exitcode = subprocess.call(cmd, shell=True)
2003n/a self.assertEqual(exitcode, 0)
2004n/a
2005n/a # bytes program, unicode PATH
2006n/a env = os.environ.copy()
2007n/a env["PATH"] = path
2008n/a exitcode = subprocess.call([program, "-c", "pass"], env=env)
2009n/a self.assertEqual(exitcode, 0)
2010n/a
2011n/a # bytes program, bytes PATH
2012n/a envb = os.environb.copy()
2013n/a envb[b"PATH"] = os.fsencode(path)
2014n/a exitcode = subprocess.call([program, "-c", "pass"], env=envb)
2015n/a self.assertEqual(exitcode, 0)
2016n/a
2017n/a def test_pipe_cloexec(self):
2018n/a sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
2019n/a fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2020n/a
2021n/a p1 = subprocess.Popen([sys.executable, sleeper],
2022n/a stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2023n/a stderr=subprocess.PIPE, close_fds=False)
2024n/a
2025n/a self.addCleanup(p1.communicate, b'')
2026n/a
2027n/a p2 = subprocess.Popen([sys.executable, fd_status],
2028n/a stdout=subprocess.PIPE, close_fds=False)
2029n/a
2030n/a output, error = p2.communicate()
2031n/a result_fds = set(map(int, output.split(b',')))
2032n/a unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
2033n/a p1.stderr.fileno()])
2034n/a
2035n/a self.assertFalse(result_fds & unwanted_fds,
2036n/a "Expected no fds from %r to be open in child, "
2037n/a "found %r" %
2038n/a (unwanted_fds, result_fds & unwanted_fds))
2039n/a
2040n/a def test_pipe_cloexec_real_tools(self):
2041n/a qcat = support.findfile("qcat.py", subdir="subprocessdata")
2042n/a qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
2043n/a
2044n/a subdata = b'zxcvbn'
2045n/a data = subdata * 4 + b'\n'
2046n/a
2047n/a p1 = subprocess.Popen([sys.executable, qcat],
2048n/a stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2049n/a close_fds=False)
2050n/a
2051n/a p2 = subprocess.Popen([sys.executable, qgrep, subdata],
2052n/a stdin=p1.stdout, stdout=subprocess.PIPE,
2053n/a close_fds=False)
2054n/a
2055n/a self.addCleanup(p1.wait)
2056n/a self.addCleanup(p2.wait)
2057n/a def kill_p1():
2058n/a try:
2059n/a p1.terminate()
2060n/a except ProcessLookupError:
2061n/a pass
2062n/a def kill_p2():
2063n/a try:
2064n/a p2.terminate()
2065n/a except ProcessLookupError:
2066n/a pass
2067n/a self.addCleanup(kill_p1)
2068n/a self.addCleanup(kill_p2)
2069n/a
2070n/a p1.stdin.write(data)
2071n/a p1.stdin.close()
2072n/a
2073n/a readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
2074n/a
2075n/a self.assertTrue(readfiles, "The child hung")
2076n/a self.assertEqual(p2.stdout.read(), data)
2077n/a
2078n/a p1.stdout.close()
2079n/a p2.stdout.close()
2080n/a
2081n/a def test_close_fds(self):
2082n/a fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2083n/a
2084n/a fds = os.pipe()
2085n/a self.addCleanup(os.close, fds[0])
2086n/a self.addCleanup(os.close, fds[1])
2087n/a
2088n/a open_fds = set(fds)
2089n/a # add a bunch more fds
2090n/a for _ in range(9):
2091n/a fd = os.open(os.devnull, os.O_RDONLY)
2092n/a self.addCleanup(os.close, fd)
2093n/a open_fds.add(fd)
2094n/a
2095n/a for fd in open_fds:
2096n/a os.set_inheritable(fd, True)
2097n/a
2098n/a p = subprocess.Popen([sys.executable, fd_status],
2099n/a stdout=subprocess.PIPE, close_fds=False)
2100n/a output, ignored = p.communicate()
2101n/a remaining_fds = set(map(int, output.split(b',')))
2102n/a
2103n/a self.assertEqual(remaining_fds & open_fds, open_fds,
2104n/a "Some fds were closed")
2105n/a
2106n/a p = subprocess.Popen([sys.executable, fd_status],
2107n/a stdout=subprocess.PIPE, close_fds=True)
2108n/a output, ignored = p.communicate()
2109n/a remaining_fds = set(map(int, output.split(b',')))
2110n/a
2111n/a self.assertFalse(remaining_fds & open_fds,
2112n/a "Some fds were left open")
2113n/a self.assertIn(1, remaining_fds, "Subprocess failed")
2114n/a
2115n/a # Keep some of the fd's we opened open in the subprocess.
2116n/a # This tests _posixsubprocess.c's proper handling of fds_to_keep.
2117n/a fds_to_keep = set(open_fds.pop() for _ in range(8))
2118n/a p = subprocess.Popen([sys.executable, fd_status],
2119n/a stdout=subprocess.PIPE, close_fds=True,
2120n/a pass_fds=())
2121n/a output, ignored = p.communicate()
2122n/a remaining_fds = set(map(int, output.split(b',')))
2123n/a
2124n/a self.assertFalse(remaining_fds & fds_to_keep & open_fds,
2125n/a "Some fds not in pass_fds were left open")
2126n/a self.assertIn(1, remaining_fds, "Subprocess failed")
2127n/a
2128n/a
2129n/a @unittest.skipIf(sys.platform.startswith("freebsd") and
2130n/a os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
2131n/a "Requires fdescfs mounted on /dev/fd on FreeBSD.")
2132n/a def test_close_fds_when_max_fd_is_lowered(self):
2133n/a """Confirm that issue21618 is fixed (may fail under valgrind)."""
2134n/a fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2135n/a
2136n/a # This launches the meat of the test in a child process to
2137n/a # avoid messing with the larger unittest processes maximum
2138n/a # number of file descriptors.
2139n/a # This process launches:
2140n/a # +--> Process that lowers its RLIMIT_NOFILE aftr setting up
2141n/a # a bunch of high open fds above the new lower rlimit.
2142n/a # Those are reported via stdout before launching a new
2143n/a # process with close_fds=False to run the actual test:
2144n/a # +--> The TEST: This one launches a fd_status.py
2145n/a # subprocess with close_fds=True so we can find out if
2146n/a # any of the fds above the lowered rlimit are still open.
2147n/a p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
2148n/a '''
2149n/a import os, resource, subprocess, sys, textwrap
2150n/a open_fds = set()
2151n/a # Add a bunch more fds to pass down.
2152n/a for _ in range(40):
2153n/a fd = os.open(os.devnull, os.O_RDONLY)
2154n/a open_fds.add(fd)
2155n/a
2156n/a # Leave a two pairs of low ones available for use by the
2157n/a # internal child error pipe and the stdout pipe.
2158n/a # We also leave 10 more open as some Python buildbots run into
2159n/a # "too many open files" errors during the test if we do not.
2160n/a for fd in sorted(open_fds)[:14]:
2161n/a os.close(fd)
2162n/a open_fds.remove(fd)
2163n/a
2164n/a for fd in open_fds:
2165n/a #self.addCleanup(os.close, fd)
2166n/a os.set_inheritable(fd, True)
2167n/a
2168n/a max_fd_open = max(open_fds)
2169n/a
2170n/a # Communicate the open_fds to the parent unittest.TestCase process.
2171n/a print(','.join(map(str, sorted(open_fds))))
2172n/a sys.stdout.flush()
2173n/a
2174n/a rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
2175n/a try:
2176n/a # 29 is lower than the highest fds we are leaving open.
2177n/a resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
2178n/a # Launch a new Python interpreter with our low fd rlim_cur that
2179n/a # inherits open fds above that limit. It then uses subprocess
2180n/a # with close_fds=True to get a report of open fds in the child.
2181n/a # An explicit list of fds to check is passed to fd_status.py as
2182n/a # letting fd_status rely on its default logic would miss the
2183n/a # fds above rlim_cur as it normally only checks up to that limit.
2184n/a subprocess.Popen(
2185n/a [sys.executable, '-c',
2186n/a textwrap.dedent("""
2187n/a import subprocess, sys
2188n/a subprocess.Popen([sys.executable, %r] +
2189n/a [str(x) for x in range({max_fd})],
2190n/a close_fds=True).wait()
2191n/a """.format(max_fd=max_fd_open+1))],
2192n/a close_fds=False).wait()
2193n/a finally:
2194n/a resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
2195n/a ''' % fd_status)], stdout=subprocess.PIPE)
2196n/a
2197n/a output, unused_stderr = p.communicate()
2198n/a output_lines = output.splitlines()
2199n/a self.assertEqual(len(output_lines), 2,
2200n/a msg="expected exactly two lines of output:\n%r" % output)
2201n/a opened_fds = set(map(int, output_lines[0].strip().split(b',')))
2202n/a remaining_fds = set(map(int, output_lines[1].strip().split(b',')))
2203n/a
2204n/a self.assertFalse(remaining_fds & opened_fds,
2205n/a msg="Some fds were left open.")
2206n/a
2207n/a
2208n/a # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
2209n/a # descriptor of a pipe closed in the parent process is valid in the
2210n/a # child process according to fstat(), but the mode of the file
2211n/a # descriptor is invalid, and read or write raise an error.
2212n/a @support.requires_mac_ver(10, 5)
2213n/a def test_pass_fds(self):
2214n/a fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2215n/a
2216n/a open_fds = set()
2217n/a
2218n/a for x in range(5):
2219n/a fds = os.pipe()
2220n/a self.addCleanup(os.close, fds[0])
2221n/a self.addCleanup(os.close, fds[1])
2222n/a os.set_inheritable(fds[0], True)
2223n/a os.set_inheritable(fds[1], True)
2224n/a open_fds.update(fds)
2225n/a
2226n/a for fd in open_fds:
2227n/a p = subprocess.Popen([sys.executable, fd_status],
2228n/a stdout=subprocess.PIPE, close_fds=True,
2229n/a pass_fds=(fd, ))
2230n/a output, ignored = p.communicate()
2231n/a
2232n/a remaining_fds = set(map(int, output.split(b',')))
2233n/a to_be_closed = open_fds - {fd}
2234n/a
2235n/a self.assertIn(fd, remaining_fds, "fd to be passed not passed")
2236n/a self.assertFalse(remaining_fds & to_be_closed,
2237n/a "fd to be closed passed")
2238n/a
2239n/a # pass_fds overrides close_fds with a warning.
2240n/a with self.assertWarns(RuntimeWarning) as context:
2241n/a self.assertFalse(subprocess.call(
2242n/a [sys.executable, "-c", "import sys; sys.exit(0)"],
2243n/a close_fds=False, pass_fds=(fd, )))
2244n/a self.assertIn('overriding close_fds', str(context.warning))
2245n/a
2246n/a def test_pass_fds_inheritable(self):
2247n/a script = support.findfile("fd_status.py", subdir="subprocessdata")
2248n/a
2249n/a inheritable, non_inheritable = os.pipe()
2250n/a self.addCleanup(os.close, inheritable)
2251n/a self.addCleanup(os.close, non_inheritable)
2252n/a os.set_inheritable(inheritable, True)
2253n/a os.set_inheritable(non_inheritable, False)
2254n/a pass_fds = (inheritable, non_inheritable)
2255n/a args = [sys.executable, script]
2256n/a args += list(map(str, pass_fds))
2257n/a
2258n/a p = subprocess.Popen(args,
2259n/a stdout=subprocess.PIPE, close_fds=True,
2260n/a pass_fds=pass_fds)
2261n/a output, ignored = p.communicate()
2262n/a fds = set(map(int, output.split(b',')))
2263n/a
2264n/a # the inheritable file descriptor must be inherited, so its inheritable
2265n/a # flag must be set in the child process after fork() and before exec()
2266n/a self.assertEqual(fds, set(pass_fds), "output=%a" % output)
2267n/a
2268n/a # inheritable flag must not be changed in the parent process
2269n/a self.assertEqual(os.get_inheritable(inheritable), True)
2270n/a self.assertEqual(os.get_inheritable(non_inheritable), False)
2271n/a
2272n/a def test_stdout_stdin_are_single_inout_fd(self):
2273n/a with io.open(os.devnull, "r+") as inout:
2274n/a p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
2275n/a stdout=inout, stdin=inout)
2276n/a p.wait()
2277n/a
2278n/a def test_stdout_stderr_are_single_inout_fd(self):
2279n/a with io.open(os.devnull, "r+") as inout:
2280n/a p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
2281n/a stdout=inout, stderr=inout)
2282n/a p.wait()
2283n/a
2284n/a def test_stderr_stdin_are_single_inout_fd(self):
2285n/a with io.open(os.devnull, "r+") as inout:
2286n/a p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
2287n/a stderr=inout, stdin=inout)
2288n/a p.wait()
2289n/a
2290n/a def test_wait_when_sigchild_ignored(self):
2291n/a # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
2292n/a sigchild_ignore = support.findfile("sigchild_ignore.py",
2293n/a subdir="subprocessdata")
2294n/a p = subprocess.Popen([sys.executable, sigchild_ignore],
2295n/a stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2296n/a stdout, stderr = p.communicate()
2297n/a self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
2298n/a " non-zero with this error:\n%s" %
2299n/a stderr.decode('utf-8'))
2300n/a
2301n/a def test_select_unbuffered(self):
2302n/a # Issue #11459: bufsize=0 should really set the pipes as
2303n/a # unbuffered (and therefore let select() work properly).
2304n/a select = support.import_module("select")
2305n/a p = subprocess.Popen([sys.executable, "-c",
2306n/a 'import sys;'
2307n/a 'sys.stdout.write("apple")'],
2308n/a stdout=subprocess.PIPE,
2309n/a bufsize=0)
2310n/a f = p.stdout
2311n/a self.addCleanup(f.close)
2312n/a try:
2313n/a self.assertEqual(f.read(4), b"appl")
2314n/a self.assertIn(f, select.select([f], [], [], 0.0)[0])
2315n/a finally:
2316n/a p.wait()
2317n/a
2318n/a def test_zombie_fast_process_del(self):
2319n/a # Issue #12650: on Unix, if Popen.__del__() was called before the
2320n/a # process exited, it wouldn't be added to subprocess._active, and would
2321n/a # remain a zombie.
2322n/a # spawn a Popen, and delete its reference before it exits
2323n/a p = subprocess.Popen([sys.executable, "-c",
2324n/a 'import sys, time;'
2325n/a 'time.sleep(0.2)'],
2326n/a stdout=subprocess.PIPE,
2327n/a stderr=subprocess.PIPE)
2328n/a self.addCleanup(p.stdout.close)
2329n/a self.addCleanup(p.stderr.close)
2330n/a ident = id(p)
2331n/a pid = p.pid
2332n/a with support.check_warnings(('', ResourceWarning)):
2333n/a p = None
2334n/a
2335n/a # check that p is in the active processes list
2336n/a self.assertIn(ident, [id(o) for o in subprocess._active])
2337n/a
2338n/a def test_leak_fast_process_del_killed(self):
2339n/a # Issue #12650: on Unix, if Popen.__del__() was called before the
2340n/a # process exited, and the process got killed by a signal, it would never
2341n/a # be removed from subprocess._active, which triggered a FD and memory
2342n/a # leak.
2343n/a # spawn a Popen, delete its reference and kill it
2344n/a p = subprocess.Popen([sys.executable, "-c",
2345n/a 'import time;'
2346n/a 'time.sleep(3)'],
2347n/a stdout=subprocess.PIPE,
2348n/a stderr=subprocess.PIPE)
2349n/a self.addCleanup(p.stdout.close)
2350n/a self.addCleanup(p.stderr.close)
2351n/a ident = id(p)
2352n/a pid = p.pid
2353n/a with support.check_warnings(('', ResourceWarning)):
2354n/a p = None
2355n/a
2356n/a os.kill(pid, signal.SIGKILL)
2357n/a # check that p is in the active processes list
2358n/a self.assertIn(ident, [id(o) for o in subprocess._active])
2359n/a
2360n/a # let some time for the process to exit, and create a new Popen: this
2361n/a # should trigger the wait() of p
2362n/a time.sleep(0.2)
2363n/a with self.assertRaises(OSError) as c:
2364n/a with subprocess.Popen(['nonexisting_i_hope'],
2365n/a stdout=subprocess.PIPE,
2366n/a stderr=subprocess.PIPE) as proc:
2367n/a pass
2368n/a # p should have been wait()ed on, and removed from the _active list
2369n/a self.assertRaises(OSError, os.waitpid, pid, 0)
2370n/a self.assertNotIn(ident, [id(o) for o in subprocess._active])
2371n/a
2372n/a def test_close_fds_after_preexec(self):
2373n/a fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2374n/a
2375n/a # this FD is used as dup2() target by preexec_fn, and should be closed
2376n/a # in the child process
2377n/a fd = os.dup(1)
2378n/a self.addCleanup(os.close, fd)
2379n/a
2380n/a p = subprocess.Popen([sys.executable, fd_status],
2381n/a stdout=subprocess.PIPE, close_fds=True,
2382n/a preexec_fn=lambda: os.dup2(1, fd))
2383n/a output, ignored = p.communicate()
2384n/a
2385n/a remaining_fds = set(map(int, output.split(b',')))
2386n/a
2387n/a self.assertNotIn(fd, remaining_fds)
2388n/a
2389n/a @support.cpython_only
2390n/a def test_fork_exec(self):
2391n/a # Issue #22290: fork_exec() must not crash on memory allocation failure
2392n/a # or other errors
2393n/a import _posixsubprocess
2394n/a gc_enabled = gc.isenabled()
2395n/a try:
2396n/a # Use a preexec function and enable the garbage collector
2397n/a # to force fork_exec() to re-enable the garbage collector
2398n/a # on error.
2399n/a func = lambda: None
2400n/a gc.enable()
2401n/a
2402n/a for args, exe_list, cwd, env_list in (
2403n/a (123, [b"exe"], None, [b"env"]),
2404n/a ([b"arg"], 123, None, [b"env"]),
2405n/a ([b"arg"], [b"exe"], 123, [b"env"]),
2406n/a ([b"arg"], [b"exe"], None, 123),
2407n/a ):
2408n/a with self.assertRaises(TypeError):
2409n/a _posixsubprocess.fork_exec(
2410n/a args, exe_list,
2411n/a True, [], cwd, env_list,
2412n/a -1, -1, -1, -1,
2413n/a 1, 2, 3, 4,
2414n/a True, True, func)
2415n/a finally:
2416n/a if not gc_enabled:
2417n/a gc.disable()
2418n/a
2419n/a @support.cpython_only
2420n/a def test_fork_exec_sorted_fd_sanity_check(self):
2421n/a # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
2422n/a import _posixsubprocess
2423n/a gc_enabled = gc.isenabled()
2424n/a try:
2425n/a gc.enable()
2426n/a
2427n/a for fds_to_keep in (
2428n/a (-1, 2, 3, 4, 5), # Negative number.
2429n/a ('str', 4), # Not an int.
2430n/a (18, 23, 42, 2**63), # Out of range.
2431n/a (5, 4), # Not sorted.
2432n/a (6, 7, 7, 8), # Duplicate.
2433n/a ):
2434n/a with self.assertRaises(
2435n/a ValueError,
2436n/a msg='fds_to_keep={}'.format(fds_to_keep)) as c:
2437n/a _posixsubprocess.fork_exec(
2438n/a [b"false"], [b"false"],
2439n/a True, fds_to_keep, None, [b"env"],
2440n/a -1, -1, -1, -1,
2441n/a 1, 2, 3, 4,
2442n/a True, True, None)
2443n/a self.assertIn('fds_to_keep', str(c.exception))
2444n/a finally:
2445n/a if not gc_enabled:
2446n/a gc.disable()
2447n/a
2448n/a def test_communicate_BrokenPipeError_stdin_close(self):
2449n/a # By not setting stdout or stderr or a timeout we force the fast path
2450n/a # that just calls _stdin_write() internally due to our mock.
2451n/a proc = subprocess.Popen([sys.executable, '-c', 'pass'])
2452n/a with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2453n/a mock_proc_stdin.close.side_effect = BrokenPipeError
2454n/a proc.communicate() # Should swallow BrokenPipeError from close.
2455n/a mock_proc_stdin.close.assert_called_with()
2456n/a
2457n/a def test_communicate_BrokenPipeError_stdin_write(self):
2458n/a # By not setting stdout or stderr or a timeout we force the fast path
2459n/a # that just calls _stdin_write() internally due to our mock.
2460n/a proc = subprocess.Popen([sys.executable, '-c', 'pass'])
2461n/a with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2462n/a mock_proc_stdin.write.side_effect = BrokenPipeError
2463n/a proc.communicate(b'stuff') # Should swallow the BrokenPipeError.
2464n/a mock_proc_stdin.write.assert_called_once_with(b'stuff')
2465n/a mock_proc_stdin.close.assert_called_once_with()
2466n/a
2467n/a def test_communicate_BrokenPipeError_stdin_flush(self):
2468n/a # Setting stdin and stdout forces the ._communicate() code path.
2469n/a # python -h exits faster than python -c pass (but spams stdout).
2470n/a proc = subprocess.Popen([sys.executable, '-h'],
2471n/a stdin=subprocess.PIPE,
2472n/a stdout=subprocess.PIPE)
2473n/a with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
2474n/a open(os.devnull, 'wb') as dev_null:
2475n/a mock_proc_stdin.flush.side_effect = BrokenPipeError
2476n/a # because _communicate registers a selector using proc.stdin...
2477n/a mock_proc_stdin.fileno.return_value = dev_null.fileno()
2478n/a # _communicate() should swallow BrokenPipeError from flush.
2479n/a proc.communicate(b'stuff')
2480n/a mock_proc_stdin.flush.assert_called_once_with()
2481n/a
2482n/a def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
2483n/a # Setting stdin and stdout forces the ._communicate() code path.
2484n/a # python -h exits faster than python -c pass (but spams stdout).
2485n/a proc = subprocess.Popen([sys.executable, '-h'],
2486n/a stdin=subprocess.PIPE,
2487n/a stdout=subprocess.PIPE)
2488n/a with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2489n/a mock_proc_stdin.close.side_effect = BrokenPipeError
2490n/a # _communicate() should swallow BrokenPipeError from close.
2491n/a proc.communicate(timeout=999)
2492n/a mock_proc_stdin.close.assert_called_once_with()
2493n/a
2494n/a _libc_file_extensions = {
2495n/a 'Linux': 'so.6',
2496n/a 'Darwin': 'dylib',
2497n/a }
2498n/a @unittest.skipIf(not ctypes, 'ctypes module required.')
2499n/a @unittest.skipIf(platform.uname()[0] not in _libc_file_extensions,
2500n/a 'Test requires a libc this code can load with ctypes.')
2501n/a @unittest.skipIf(not sys.executable, 'Test requires sys.executable.')
2502n/a def test_child_terminated_in_stopped_state(self):
2503n/a """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
2504n/a PTRACE_TRACEME = 0 # From glibc and MacOS (PT_TRACE_ME).
2505n/a libc_name = 'libc.' + self._libc_file_extensions[platform.uname()[0]]
2506n/a libc = ctypes.CDLL(libc_name)
2507n/a if not hasattr(libc, 'ptrace'):
2508n/a raise unittest.SkipTest('ptrace() required.')
2509n/a test_ptrace = subprocess.Popen(
2510n/a [sys.executable, '-c', """if True:
2511n/a import ctypes
2512n/a libc = ctypes.CDLL({libc_name!r})
2513n/a libc.ptrace({PTRACE_TRACEME}, 0, 0)
2514n/a """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME)
2515n/a ])
2516n/a if test_ptrace.wait() != 0:
2517n/a raise unittest.SkipTest('ptrace() failed - unable to test.')
2518n/a child = subprocess.Popen(
2519n/a [sys.executable, '-c', """if True:
2520n/a import ctypes
2521n/a libc = ctypes.CDLL({libc_name!r})
2522n/a libc.ptrace({PTRACE_TRACEME}, 0, 0)
2523n/a libc.printf(ctypes.c_char_p(0xdeadbeef)) # Crash the process.
2524n/a """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME)
2525n/a ])
2526n/a try:
2527n/a returncode = child.wait()
2528n/a except Exception as e:
2529n/a child.kill() # Clean up the hung stopped process.
2530n/a raise e
2531n/a self.assertNotEqual(0, returncode)
2532n/a self.assertLess(returncode, 0) # signal death, likely SIGSEGV.
2533n/a
2534n/a
2535n/a@unittest.skipUnless(mswindows, "Windows specific tests")
2536n/aclass Win32ProcessTestCase(BaseTestCase):
2537n/a
2538n/a def test_startupinfo(self):
2539n/a # startupinfo argument
2540n/a # We uses hardcoded constants, because we do not want to
2541n/a # depend on win32all.
2542n/a STARTF_USESHOWWINDOW = 1
2543n/a SW_MAXIMIZE = 3
2544n/a startupinfo = subprocess.STARTUPINFO()
2545n/a startupinfo.dwFlags = STARTF_USESHOWWINDOW
2546n/a startupinfo.wShowWindow = SW_MAXIMIZE
2547n/a # Since Python is a console process, it won't be affected
2548n/a # by wShowWindow, but the argument should be silently
2549n/a # ignored
2550n/a subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
2551n/a startupinfo=startupinfo)
2552n/a
2553n/a def test_creationflags(self):
2554n/a # creationflags argument
2555n/a CREATE_NEW_CONSOLE = 16
2556n/a sys.stderr.write(" a DOS box should flash briefly ...\n")
2557n/a subprocess.call(sys.executable +
2558n/a ' -c "import time; time.sleep(0.25)"',
2559n/a creationflags=CREATE_NEW_CONSOLE)
2560n/a
2561n/a def test_invalid_args(self):
2562n/a # invalid arguments should raise ValueError
2563n/a self.assertRaises(ValueError, subprocess.call,
2564n/a [sys.executable, "-c",
2565n/a "import sys; sys.exit(47)"],
2566n/a preexec_fn=lambda: 1)
2567n/a self.assertRaises(ValueError, subprocess.call,
2568n/a [sys.executable, "-c",
2569n/a "import sys; sys.exit(47)"],
2570n/a stdout=subprocess.PIPE,
2571n/a close_fds=True)
2572n/a
2573n/a def test_close_fds(self):
2574n/a # close file descriptors
2575n/a rc = subprocess.call([sys.executable, "-c",
2576n/a "import sys; sys.exit(47)"],
2577n/a close_fds=True)
2578n/a self.assertEqual(rc, 47)
2579n/a
2580n/a def test_shell_sequence(self):
2581n/a # Run command through the shell (sequence)
2582n/a newenv = os.environ.copy()
2583n/a newenv["FRUIT"] = "physalis"
2584n/a p = subprocess.Popen(["set"], shell=1,
2585n/a stdout=subprocess.PIPE,
2586n/a env=newenv)
2587n/a with p:
2588n/a self.assertIn(b"physalis", p.stdout.read())
2589n/a
2590n/a def test_shell_string(self):
2591n/a # Run command through the shell (string)
2592n/a newenv = os.environ.copy()
2593n/a newenv["FRUIT"] = "physalis"
2594n/a p = subprocess.Popen("set", shell=1,
2595n/a stdout=subprocess.PIPE,
2596n/a env=newenv)
2597n/a with p:
2598n/a self.assertIn(b"physalis", p.stdout.read())
2599n/a
2600n/a def test_shell_encodings(self):
2601n/a # Run command through the shell (string)
2602n/a for enc in ['ansi', 'oem']:
2603n/a newenv = os.environ.copy()
2604n/a newenv["FRUIT"] = "physalis"
2605n/a p = subprocess.Popen("set", shell=1,
2606n/a stdout=subprocess.PIPE,
2607n/a env=newenv,
2608n/a encoding=enc)
2609n/a with p:
2610n/a self.assertIn("physalis", p.stdout.read(), enc)
2611n/a
2612n/a def test_call_string(self):
2613n/a # call() function with string argument on Windows
2614n/a rc = subprocess.call(sys.executable +
2615n/a ' -c "import sys; sys.exit(47)"')
2616n/a self.assertEqual(rc, 47)
2617n/a
2618n/a def _kill_process(self, method, *args):
2619n/a # Some win32 buildbot raises EOFError if stdin is inherited
2620n/a p = subprocess.Popen([sys.executable, "-c", """if 1:
2621n/a import sys, time
2622n/a sys.stdout.write('x\\n')
2623n/a sys.stdout.flush()
2624n/a time.sleep(30)
2625n/a """],
2626n/a stdin=subprocess.PIPE,
2627n/a stdout=subprocess.PIPE,
2628n/a stderr=subprocess.PIPE)
2629n/a with p:
2630n/a # Wait for the interpreter to be completely initialized before
2631n/a # sending any signal.
2632n/a p.stdout.read(1)
2633n/a getattr(p, method)(*args)
2634n/a _, stderr = p.communicate()
2635n/a self.assertStderrEqual(stderr, b'')
2636n/a returncode = p.wait()
2637n/a self.assertNotEqual(returncode, 0)
2638n/a
2639n/a def _kill_dead_process(self, method, *args):
2640n/a p = subprocess.Popen([sys.executable, "-c", """if 1:
2641n/a import sys, time
2642n/a sys.stdout.write('x\\n')
2643n/a sys.stdout.flush()
2644n/a sys.exit(42)
2645n/a """],
2646n/a stdin=subprocess.PIPE,
2647n/a stdout=subprocess.PIPE,
2648n/a stderr=subprocess.PIPE)
2649n/a with p:
2650n/a # Wait for the interpreter to be completely initialized before
2651n/a # sending any signal.
2652n/a p.stdout.read(1)
2653n/a # The process should end after this
2654n/a time.sleep(1)
2655n/a # This shouldn't raise even though the child is now dead
2656n/a getattr(p, method)(*args)
2657n/a _, stderr = p.communicate()
2658n/a self.assertStderrEqual(stderr, b'')
2659n/a rc = p.wait()
2660n/a self.assertEqual(rc, 42)
2661n/a
2662n/a def test_send_signal(self):
2663n/a self._kill_process('send_signal', signal.SIGTERM)
2664n/a
2665n/a def test_kill(self):
2666n/a self._kill_process('kill')
2667n/a
2668n/a def test_terminate(self):
2669n/a self._kill_process('terminate')
2670n/a
2671n/a def test_send_signal_dead(self):
2672n/a self._kill_dead_process('send_signal', signal.SIGTERM)
2673n/a
2674n/a def test_kill_dead(self):
2675n/a self._kill_dead_process('kill')
2676n/a
2677n/a def test_terminate_dead(self):
2678n/a self._kill_dead_process('terminate')
2679n/a
2680n/aclass MiscTests(unittest.TestCase):
2681n/a def test_getoutput(self):
2682n/a self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
2683n/a self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
2684n/a (0, 'xyzzy'))
2685n/a
2686n/a # we use mkdtemp in the next line to create an empty directory
2687n/a # under our exclusive control; from that, we can invent a pathname
2688n/a # that we _know_ won't exist. This is guaranteed to fail.
2689n/a dir = None
2690n/a try:
2691n/a dir = tempfile.mkdtemp()
2692n/a name = os.path.join(dir, "foo")
2693n/a status, output = subprocess.getstatusoutput(
2694n/a ("type " if mswindows else "cat ") + name)
2695n/a self.assertNotEqual(status, 0)
2696n/a finally:
2697n/a if dir is not None:
2698n/a os.rmdir(dir)
2699n/a
2700n/a def test__all__(self):
2701n/a """Ensure that __all__ is populated properly."""
2702n/a intentionally_excluded = {"list2cmdline", "Handle"}
2703n/a exported = set(subprocess.__all__)
2704n/a possible_exports = set()
2705n/a import types
2706n/a for name, value in subprocess.__dict__.items():
2707n/a if name.startswith('_'):
2708n/a continue
2709n/a if isinstance(value, (types.ModuleType,)):
2710n/a continue
2711n/a possible_exports.add(name)
2712n/a self.assertEqual(exported, possible_exports - intentionally_excluded)
2713n/a
2714n/a
2715n/a@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
2716n/a "Test needs selectors.PollSelector")
2717n/aclass ProcessTestCaseNoPoll(ProcessTestCase):
2718n/a def setUp(self):
2719n/a self.orig_selector = subprocess._PopenSelector
2720n/a subprocess._PopenSelector = selectors.SelectSelector
2721n/a ProcessTestCase.setUp(self)
2722n/a
2723n/a def tearDown(self):
2724n/a subprocess._PopenSelector = self.orig_selector
2725n/a ProcessTestCase.tearDown(self)
2726n/a
2727n/a
2728n/a@unittest.skipUnless(mswindows, "Windows-specific tests")
2729n/aclass CommandsWithSpaces (BaseTestCase):
2730n/a
2731n/a def setUp(self):
2732n/a super().setUp()
2733n/a f, fname = tempfile.mkstemp(".py", "te st")
2734n/a self.fname = fname.lower ()
2735n/a os.write(f, b"import sys;"
2736n/a b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
2737n/a )
2738n/a os.close(f)
2739n/a
2740n/a def tearDown(self):
2741n/a os.remove(self.fname)
2742n/a super().tearDown()
2743n/a
2744n/a def with_spaces(self, *args, **kwargs):
2745n/a kwargs['stdout'] = subprocess.PIPE
2746n/a p = subprocess.Popen(*args, **kwargs)
2747n/a with p:
2748n/a self.assertEqual(
2749n/a p.stdout.read ().decode("mbcs"),
2750n/a "2 [%r, 'ab cd']" % self.fname
2751n/a )
2752n/a
2753n/a def test_shell_string_with_spaces(self):
2754n/a # call() function with string argument with spaces on Windows
2755n/a self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
2756n/a "ab cd"), shell=1)
2757n/a
2758n/a def test_shell_sequence_with_spaces(self):
2759n/a # call() function with sequence argument with spaces on Windows
2760n/a self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
2761n/a
2762n/a def test_noshell_string_with_spaces(self):
2763n/a # call() function with string argument with spaces on Windows
2764n/a self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
2765n/a "ab cd"))
2766n/a
2767n/a def test_noshell_sequence_with_spaces(self):
2768n/a # call() function with sequence argument with spaces on Windows
2769n/a self.with_spaces([sys.executable, self.fname, "ab cd"])
2770n/a
2771n/a
2772n/aclass ContextManagerTests(BaseTestCase):
2773n/a
2774n/a def test_pipe(self):
2775n/a with subprocess.Popen([sys.executable, "-c",
2776n/a "import sys;"
2777n/a "sys.stdout.write('stdout');"
2778n/a "sys.stderr.write('stderr');"],
2779n/a stdout=subprocess.PIPE,
2780n/a stderr=subprocess.PIPE) as proc:
2781n/a self.assertEqual(proc.stdout.read(), b"stdout")
2782n/a self.assertStderrEqual(proc.stderr.read(), b"stderr")
2783n/a
2784n/a self.assertTrue(proc.stdout.closed)
2785n/a self.assertTrue(proc.stderr.closed)
2786n/a
2787n/a def test_returncode(self):
2788n/a with subprocess.Popen([sys.executable, "-c",
2789n/a "import sys; sys.exit(100)"]) as proc:
2790n/a pass
2791n/a # __exit__ calls wait(), so the returncode should be set
2792n/a self.assertEqual(proc.returncode, 100)
2793n/a
2794n/a def test_communicate_stdin(self):
2795n/a with subprocess.Popen([sys.executable, "-c",
2796n/a "import sys;"
2797n/a "sys.exit(sys.stdin.read() == 'context')"],
2798n/a stdin=subprocess.PIPE) as proc:
2799n/a proc.communicate(b"context")
2800n/a self.assertEqual(proc.returncode, 1)
2801n/a
2802n/a def test_invalid_args(self):
2803n/a with self.assertRaises((FileNotFoundError, PermissionError)) as c:
2804n/a with subprocess.Popen(['nonexisting_i_hope'],
2805n/a stdout=subprocess.PIPE,
2806n/a stderr=subprocess.PIPE) as proc:
2807n/a pass
2808n/a
2809n/a def test_broken_pipe_cleanup(self):
2810n/a """Broken pipe error should not prevent wait() (Issue 21619)"""
2811n/a proc = subprocess.Popen([sys.executable, '-c', 'pass'],
2812n/a stdin=subprocess.PIPE,
2813n/a bufsize=support.PIPE_MAX_SIZE*2)
2814n/a proc = proc.__enter__()
2815n/a # Prepare to send enough data to overflow any OS pipe buffering and
2816n/a # guarantee a broken pipe error. Data is held in BufferedWriter
2817n/a # buffer until closed.
2818n/a proc.stdin.write(b'x' * support.PIPE_MAX_SIZE)
2819n/a self.assertIsNone(proc.returncode)
2820n/a # EPIPE expected under POSIX; EINVAL under Windows
2821n/a self.assertRaises(OSError, proc.__exit__, None, None, None)
2822n/a self.assertEqual(proc.returncode, 0)
2823n/a self.assertTrue(proc.stdin.closed)
2824n/a
2825n/a
2826n/aif __name__ == "__main__":
2827n/a unittest.main()