ยปCore Development>Code coverage>Lib/test/test_asyncio/test_subprocess.py

Python code coverage for Lib/test/test_asyncio/test_subprocess.py

#countcontent
1n/aimport signal
2n/aimport sys
3n/aimport unittest
4n/aimport warnings
5n/afrom unittest import mock
6n/a
7n/aimport asyncio
8n/afrom asyncio import base_subprocess
9n/afrom asyncio import subprocess
10n/afrom asyncio import test_utils
11n/atry:
12n/a from test import support
13n/aexcept ImportError:
14n/a from asyncio import test_support as support
15n/aif sys.platform != 'win32':
16n/a from asyncio import unix_events
17n/a
18n/a# Program blocking
19n/aPROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
20n/a
21n/a# Program copying input to output
22n/aPROGRAM_CAT = [
23n/a sys.executable, '-c',
24n/a ';'.join(('import sys',
25n/a 'data = sys.stdin.buffer.read()',
26n/a 'sys.stdout.buffer.write(data)'))]
27n/a
28n/aclass TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
29n/a def _start(self, *args, **kwargs):
30n/a self._proc = mock.Mock()
31n/a self._proc.stdin = None
32n/a self._proc.stdout = None
33n/a self._proc.stderr = None
34n/a
35n/a
36n/aclass SubprocessTransportTests(test_utils.TestCase):
37n/a def setUp(self):
38n/a super().setUp()
39n/a self.loop = self.new_test_loop()
40n/a self.set_event_loop(self.loop)
41n/a
42n/a
43n/a def create_transport(self, waiter=None):
44n/a protocol = mock.Mock()
45n/a protocol.connection_made._is_coroutine = False
46n/a protocol.process_exited._is_coroutine = False
47n/a transport = TestSubprocessTransport(
48n/a self.loop, protocol, ['test'], False,
49n/a None, None, None, 0, waiter=waiter)
50n/a return (transport, protocol)
51n/a
52n/a def test_proc_exited(self):
53n/a waiter = asyncio.Future(loop=self.loop)
54n/a transport, protocol = self.create_transport(waiter)
55n/a transport._process_exited(6)
56n/a self.loop.run_until_complete(waiter)
57n/a
58n/a self.assertEqual(transport.get_returncode(), 6)
59n/a
60n/a self.assertTrue(protocol.connection_made.called)
61n/a self.assertTrue(protocol.process_exited.called)
62n/a self.assertTrue(protocol.connection_lost.called)
63n/a self.assertEqual(protocol.connection_lost.call_args[0], (None,))
64n/a
65n/a self.assertFalse(transport.is_closing())
66n/a self.assertIsNone(transport._loop)
67n/a self.assertIsNone(transport._proc)
68n/a self.assertIsNone(transport._protocol)
69n/a
70n/a # methods must raise ProcessLookupError if the process exited
71n/a self.assertRaises(ProcessLookupError,
72n/a transport.send_signal, signal.SIGTERM)
73n/a self.assertRaises(ProcessLookupError, transport.terminate)
74n/a self.assertRaises(ProcessLookupError, transport.kill)
75n/a
76n/a transport.close()
77n/a
78n/a
79n/aclass SubprocessMixin:
80n/a
81n/a def test_stdin_stdout(self):
82n/a args = PROGRAM_CAT
83n/a
84n/a @asyncio.coroutine
85n/a def run(data):
86n/a proc = yield from asyncio.create_subprocess_exec(
87n/a *args,
88n/a stdin=subprocess.PIPE,
89n/a stdout=subprocess.PIPE,
90n/a loop=self.loop)
91n/a
92n/a # feed data
93n/a proc.stdin.write(data)
94n/a yield from proc.stdin.drain()
95n/a proc.stdin.close()
96n/a
97n/a # get output and exitcode
98n/a data = yield from proc.stdout.read()
99n/a exitcode = yield from proc.wait()
100n/a return (exitcode, data)
101n/a
102n/a task = run(b'some data')
103n/a task = asyncio.wait_for(task, 60.0, loop=self.loop)
104n/a exitcode, stdout = self.loop.run_until_complete(task)
105n/a self.assertEqual(exitcode, 0)
106n/a self.assertEqual(stdout, b'some data')
107n/a
108n/a def test_communicate(self):
109n/a args = PROGRAM_CAT
110n/a
111n/a @asyncio.coroutine
112n/a def run(data):
113n/a proc = yield from asyncio.create_subprocess_exec(
114n/a *args,
115n/a stdin=subprocess.PIPE,
116n/a stdout=subprocess.PIPE,
117n/a loop=self.loop)
118n/a stdout, stderr = yield from proc.communicate(data)
119n/a return proc.returncode, stdout
120n/a
121n/a task = run(b'some data')
122n/a task = asyncio.wait_for(task, 60.0, loop=self.loop)
123n/a exitcode, stdout = self.loop.run_until_complete(task)
124n/a self.assertEqual(exitcode, 0)
125n/a self.assertEqual(stdout, b'some data')
126n/a
127n/a def test_shell(self):
128n/a create = asyncio.create_subprocess_shell('exit 7',
129n/a loop=self.loop)
130n/a proc = self.loop.run_until_complete(create)
131n/a exitcode = self.loop.run_until_complete(proc.wait())
132n/a self.assertEqual(exitcode, 7)
133n/a
134n/a def test_start_new_session(self):
135n/a # start the new process in a new session
136n/a create = asyncio.create_subprocess_shell('exit 8',
137n/a start_new_session=True,
138n/a loop=self.loop)
139n/a proc = self.loop.run_until_complete(create)
140n/a exitcode = self.loop.run_until_complete(proc.wait())
141n/a self.assertEqual(exitcode, 8)
142n/a
143n/a def test_kill(self):
144n/a args = PROGRAM_BLOCKED
145n/a create = asyncio.create_subprocess_exec(*args, loop=self.loop)
146n/a proc = self.loop.run_until_complete(create)
147n/a proc.kill()
148n/a returncode = self.loop.run_until_complete(proc.wait())
149n/a if sys.platform == 'win32':
150n/a self.assertIsInstance(returncode, int)
151n/a # expect 1 but sometimes get 0
152n/a else:
153n/a self.assertEqual(-signal.SIGKILL, returncode)
154n/a
155n/a def test_terminate(self):
156n/a args = PROGRAM_BLOCKED
157n/a create = asyncio.create_subprocess_exec(*args, loop=self.loop)
158n/a proc = self.loop.run_until_complete(create)
159n/a proc.terminate()
160n/a returncode = self.loop.run_until_complete(proc.wait())
161n/a if sys.platform == 'win32':
162n/a self.assertIsInstance(returncode, int)
163n/a # expect 1 but sometimes get 0
164n/a else:
165n/a self.assertEqual(-signal.SIGTERM, returncode)
166n/a
167n/a @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
168n/a def test_send_signal(self):
169n/a code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
170n/a args = [sys.executable, '-c', code]
171n/a create = asyncio.create_subprocess_exec(*args,
172n/a stdout=subprocess.PIPE,
173n/a loop=self.loop)
174n/a proc = self.loop.run_until_complete(create)
175n/a
176n/a @asyncio.coroutine
177n/a def send_signal(proc):
178n/a # basic synchronization to wait until the program is sleeping
179n/a line = yield from proc.stdout.readline()
180n/a self.assertEqual(line, b'sleeping\n')
181n/a
182n/a proc.send_signal(signal.SIGHUP)
183n/a returncode = (yield from proc.wait())
184n/a return returncode
185n/a
186n/a returncode = self.loop.run_until_complete(send_signal(proc))
187n/a self.assertEqual(-signal.SIGHUP, returncode)
188n/a
189n/a def prepare_broken_pipe_test(self):
190n/a # buffer large enough to feed the whole pipe buffer
191n/a large_data = b'x' * support.PIPE_MAX_SIZE
192n/a
193n/a # the program ends before the stdin can be feeded
194n/a create = asyncio.create_subprocess_exec(
195n/a sys.executable, '-c', 'pass',
196n/a stdin=subprocess.PIPE,
197n/a loop=self.loop)
198n/a proc = self.loop.run_until_complete(create)
199n/a return (proc, large_data)
200n/a
201n/a def test_stdin_broken_pipe(self):
202n/a proc, large_data = self.prepare_broken_pipe_test()
203n/a
204n/a @asyncio.coroutine
205n/a def write_stdin(proc, data):
206n/a proc.stdin.write(data)
207n/a yield from proc.stdin.drain()
208n/a
209n/a coro = write_stdin(proc, large_data)
210n/a # drain() must raise BrokenPipeError or ConnectionResetError
211n/a with test_utils.disable_logger():
212n/a self.assertRaises((BrokenPipeError, ConnectionResetError),
213n/a self.loop.run_until_complete, coro)
214n/a self.loop.run_until_complete(proc.wait())
215n/a
216n/a def test_communicate_ignore_broken_pipe(self):
217n/a proc, large_data = self.prepare_broken_pipe_test()
218n/a
219n/a # communicate() must ignore BrokenPipeError when feeding stdin
220n/a with test_utils.disable_logger():
221n/a self.loop.run_until_complete(proc.communicate(large_data))
222n/a self.loop.run_until_complete(proc.wait())
223n/a
224n/a def test_pause_reading(self):
225n/a limit = 10
226n/a size = (limit * 2 + 1)
227n/a
228n/a @asyncio.coroutine
229n/a def test_pause_reading():
230n/a code = '\n'.join((
231n/a 'import sys',
232n/a 'sys.stdout.write("x" * %s)' % size,
233n/a 'sys.stdout.flush()',
234n/a ))
235n/a
236n/a connect_read_pipe = self.loop.connect_read_pipe
237n/a
238n/a @asyncio.coroutine
239n/a def connect_read_pipe_mock(*args, **kw):
240n/a transport, protocol = yield from connect_read_pipe(*args, **kw)
241n/a transport.pause_reading = mock.Mock()
242n/a transport.resume_reading = mock.Mock()
243n/a return (transport, protocol)
244n/a
245n/a self.loop.connect_read_pipe = connect_read_pipe_mock
246n/a
247n/a proc = yield from asyncio.create_subprocess_exec(
248n/a sys.executable, '-c', code,
249n/a stdin=asyncio.subprocess.PIPE,
250n/a stdout=asyncio.subprocess.PIPE,
251n/a limit=limit,
252n/a loop=self.loop)
253n/a stdout_transport = proc._transport.get_pipe_transport(1)
254n/a
255n/a stdout, stderr = yield from proc.communicate()
256n/a
257n/a # The child process produced more than limit bytes of output,
258n/a # the stream reader transport should pause the protocol to not
259n/a # allocate too much memory.
260n/a return (stdout, stdout_transport)
261n/a
262n/a # Issue #22685: Ensure that the stream reader pauses the protocol
263n/a # when the child process produces too much data
264n/a stdout, transport = self.loop.run_until_complete(test_pause_reading())
265n/a
266n/a self.assertEqual(stdout, b'x' * size)
267n/a self.assertTrue(transport.pause_reading.called)
268n/a self.assertTrue(transport.resume_reading.called)
269n/a
270n/a def test_stdin_not_inheritable(self):
271n/a # asyncio issue #209: stdin must not be inheritable, otherwise
272n/a # the Process.communicate() hangs
273n/a @asyncio.coroutine
274n/a def len_message(message):
275n/a code = 'import sys; data = sys.stdin.read(); print(len(data))'
276n/a proc = yield from asyncio.create_subprocess_exec(
277n/a sys.executable, '-c', code,
278n/a stdin=asyncio.subprocess.PIPE,
279n/a stdout=asyncio.subprocess.PIPE,
280n/a stderr=asyncio.subprocess.PIPE,
281n/a close_fds=False,
282n/a loop=self.loop)
283n/a stdout, stderr = yield from proc.communicate(message)
284n/a exitcode = yield from proc.wait()
285n/a return (stdout, exitcode)
286n/a
287n/a output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
288n/a self.assertEqual(output.rstrip(), b'3')
289n/a self.assertEqual(exitcode, 0)
290n/a
291n/a def test_empty_input(self):
292n/a @asyncio.coroutine
293n/a def empty_input():
294n/a code = 'import sys; data = sys.stdin.read(); print(len(data))'
295n/a proc = yield from asyncio.create_subprocess_exec(
296n/a sys.executable, '-c', code,
297n/a stdin=asyncio.subprocess.PIPE,
298n/a stdout=asyncio.subprocess.PIPE,
299n/a stderr=asyncio.subprocess.PIPE,
300n/a close_fds=False,
301n/a loop=self.loop)
302n/a stdout, stderr = yield from proc.communicate(b'')
303n/a exitcode = yield from proc.wait()
304n/a return (stdout, exitcode)
305n/a
306n/a output, exitcode = self.loop.run_until_complete(empty_input())
307n/a self.assertEqual(output.rstrip(), b'0')
308n/a self.assertEqual(exitcode, 0)
309n/a
310n/a def test_cancel_process_wait(self):
311n/a # Issue #23140: cancel Process.wait()
312n/a
313n/a @asyncio.coroutine
314n/a def cancel_wait():
315n/a proc = yield from asyncio.create_subprocess_exec(
316n/a *PROGRAM_BLOCKED,
317n/a loop=self.loop)
318n/a
319n/a # Create an internal future waiting on the process exit
320n/a task = self.loop.create_task(proc.wait())
321n/a self.loop.call_soon(task.cancel)
322n/a try:
323n/a yield from task
324n/a except asyncio.CancelledError:
325n/a pass
326n/a
327n/a # Cancel the future
328n/a task.cancel()
329n/a
330n/a # Kill the process and wait until it is done
331n/a proc.kill()
332n/a yield from proc.wait()
333n/a
334n/a self.loop.run_until_complete(cancel_wait())
335n/a
336n/a def test_cancel_make_subprocess_transport_exec(self):
337n/a @asyncio.coroutine
338n/a def cancel_make_transport():
339n/a coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED,
340n/a loop=self.loop)
341n/a task = self.loop.create_task(coro)
342n/a
343n/a self.loop.call_soon(task.cancel)
344n/a try:
345n/a yield from task
346n/a except asyncio.CancelledError:
347n/a pass
348n/a
349n/a # ignore the log:
350n/a # "Exception during subprocess creation, kill the subprocess"
351n/a with test_utils.disable_logger():
352n/a self.loop.run_until_complete(cancel_make_transport())
353n/a
354n/a def test_cancel_post_init(self):
355n/a @asyncio.coroutine
356n/a def cancel_make_transport():
357n/a coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
358n/a *PROGRAM_BLOCKED)
359n/a task = self.loop.create_task(coro)
360n/a
361n/a self.loop.call_soon(task.cancel)
362n/a try:
363n/a yield from task
364n/a except asyncio.CancelledError:
365n/a pass
366n/a
367n/a # ignore the log:
368n/a # "Exception during subprocess creation, kill the subprocess"
369n/a with test_utils.disable_logger():
370n/a self.loop.run_until_complete(cancel_make_transport())
371n/a test_utils.run_briefly(self.loop)
372n/a
373n/a def test_close_kill_running(self):
374n/a @asyncio.coroutine
375n/a def kill_running():
376n/a create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
377n/a *PROGRAM_BLOCKED)
378n/a transport, protocol = yield from create
379n/a
380n/a kill_called = False
381n/a def kill():
382n/a nonlocal kill_called
383n/a kill_called = True
384n/a orig_kill()
385n/a
386n/a proc = transport.get_extra_info('subprocess')
387n/a orig_kill = proc.kill
388n/a proc.kill = kill
389n/a returncode = transport.get_returncode()
390n/a transport.close()
391n/a yield from transport._wait()
392n/a return (returncode, kill_called)
393n/a
394n/a # Ignore "Close running child process: kill ..." log
395n/a with test_utils.disable_logger():
396n/a returncode, killed = self.loop.run_until_complete(kill_running())
397n/a self.assertIsNone(returncode)
398n/a
399n/a # transport.close() must kill the process if it is still running
400n/a self.assertTrue(killed)
401n/a test_utils.run_briefly(self.loop)
402n/a
403n/a def test_close_dont_kill_finished(self):
404n/a @asyncio.coroutine
405n/a def kill_running():
406n/a create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
407n/a *PROGRAM_BLOCKED)
408n/a transport, protocol = yield from create
409n/a proc = transport.get_extra_info('subprocess')
410n/a
411n/a # kill the process (but asyncio is not notified immediately)
412n/a proc.kill()
413n/a proc.wait()
414n/a
415n/a proc.kill = mock.Mock()
416n/a proc_returncode = proc.poll()
417n/a transport_returncode = transport.get_returncode()
418n/a transport.close()
419n/a return (proc_returncode, transport_returncode, proc.kill.called)
420n/a
421n/a # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
422n/a # emitted because the test already consumes the exit status:
423n/a # proc.wait()
424n/a with test_utils.disable_logger():
425n/a result = self.loop.run_until_complete(kill_running())
426n/a test_utils.run_briefly(self.loop)
427n/a
428n/a proc_returncode, transport_return_code, killed = result
429n/a
430n/a self.assertIsNotNone(proc_returncode)
431n/a self.assertIsNone(transport_return_code)
432n/a
433n/a # transport.close() must not kill the process if it finished, even if
434n/a # the transport was not notified yet
435n/a self.assertFalse(killed)
436n/a
437n/a # Unlike SafeChildWatcher, FastChildWatcher does not pop the
438n/a # callbacks if waitpid() is called elsewhere. Let's clear them
439n/a # manually to avoid a warning when the watcher is detached.
440n/a if sys.platform != 'win32' and \
441n/a isinstance(self, SubprocessFastWatcherTests):
442n/a asyncio.get_child_watcher()._callbacks.clear()
443n/a
444n/a def test_popen_error(self):
445n/a # Issue #24763: check that the subprocess transport is closed
446n/a # when BaseSubprocessTransport fails
447n/a if sys.platform == 'win32':
448n/a target = 'asyncio.windows_utils.Popen'
449n/a else:
450n/a target = 'subprocess.Popen'
451n/a with mock.patch(target) as popen:
452n/a exc = ZeroDivisionError
453n/a popen.side_effect = exc
454n/a
455n/a create = asyncio.create_subprocess_exec(sys.executable, '-c',
456n/a 'pass', loop=self.loop)
457n/a with warnings.catch_warnings(record=True) as warns:
458n/a with self.assertRaises(exc):
459n/a self.loop.run_until_complete(create)
460n/a self.assertEqual(warns, [])
461n/a
462n/a
463n/aif sys.platform != 'win32':
464n/a # Unix
465n/a class SubprocessWatcherMixin(SubprocessMixin):
466n/a
467n/a Watcher = None
468n/a
469n/a def setUp(self):
470n/a super().setUp()
471n/a policy = asyncio.get_event_loop_policy()
472n/a self.loop = policy.new_event_loop()
473n/a self.set_event_loop(self.loop)
474n/a
475n/a watcher = self.Watcher()
476n/a watcher.attach_loop(self.loop)
477n/a policy.set_child_watcher(watcher)
478n/a self.addCleanup(policy.set_child_watcher, None)
479n/a
480n/a class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
481n/a test_utils.TestCase):
482n/a
483n/a Watcher = unix_events.SafeChildWatcher
484n/a
485n/a class SubprocessFastWatcherTests(SubprocessWatcherMixin,
486n/a test_utils.TestCase):
487n/a
488n/a Watcher = unix_events.FastChildWatcher
489n/a
490n/aelse:
491n/a # Windows
492n/a class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
493n/a
494n/a def setUp(self):
495n/a super().setUp()
496n/a self.loop = asyncio.ProactorEventLoop()
497n/a self.set_event_loop(self.loop)
498n/a
499n/a
500n/aif __name__ == '__main__':
501n/a unittest.main()