» Core Development > Code coverage > Lib/asyncio/subprocess.py

Python code coverage for Lib/asyncio/subprocess.py

# | count | content
# Names exported by ``from <this module> import *``.
__all__ = [
    'create_subprocess_exec',
    'create_subprocess_shell',
]
2n/a
3n/aimport subprocess
4n/a
5n/afrom . import events
6n/afrom . import protocols
7n/afrom . import streams
8n/afrom . import tasks
9n/afrom .coroutines import coroutine
10n/afrom .log import logger
11n/a
12n/a
# Module-level aliases for the subprocess module's special
# PIPE / STDOUT / DEVNULL constants.
PIPE, STDOUT, DEVNULL = (
    subprocess.PIPE,
    subprocess.STDOUT,
    subprocess.DEVNULL,
)
16n/a
17n/a
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    After connection_made() has run, ``stdin`` is a StreamWriter and
    ``stdout``/``stderr`` are StreamReaders for whichever of the child's
    pipes the transport provides; the attributes for missing pipes stay
    None.
    """

    def __init__(self, limit, loop):
        super().__init__(loop=loop)
        # Passed as ``limit`` to every StreamReader built in
        # connection_made().
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        self._transport = None

    def __repr__(self):
        # Only streams that have actually been created are listed.
        info = [self.__class__.__name__]
        if self.stdin is not None:
            info.append('stdin=%r' % self.stdin)
        if self.stdout is not None:
            info.append('stdout=%r' % self.stdout)
        if self.stderr is not None:
            info.append('stderr=%r' % self.stderr)
        return '<%s>' % ' '.join(info)

    def connection_made(self, transport):
        """Remember *transport* and wrap each available pipe in a stream."""
        self._transport = transport

        stdout_transport = transport.get_pipe_transport(1)
        if stdout_transport is not None:
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stdout.set_transport(stdout_transport)

        stderr_transport = transport.get_pipe_transport(2)
        if stderr_transport is not None:
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stderr.set_transport(stderr_transport)

        stdin_transport = transport.get_pipe_transport(0)
        if stdin_transport is not None:
            self.stdin = streams.StreamWriter(stdin_transport,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)

    def pipe_data_received(self, fd, data):
        """Feed *data* to the reader for *fd* (1 or 2), if one exists.

        Data for any other descriptor, or for a pipe that has no reader,
        is dropped.
        """
        if fd == 1:
            reader = self.stdout
        elif fd == 2:
            reader = self.stderr
        else:
            reader = None
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the loss of the pipe identified by *fd*.

        For fd 0 the stdin writer (if any) is closed and connection_lost()
        is invoked with *exc*.  For fd 1 or 2 the matching reader, if any,
        receives EOF when *exc* is None and the exception otherwise.
        """
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            self.connection_lost(exc)
            return
        if fd == 1:
            reader = self.stdout
        elif fd == 2:
            reader = self.stderr
        else:
            reader = None
        # Identity test: None is a singleton, and "!=" would dispatch to
        # the reader's comparison methods instead of checking identity.
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

    def process_exited(self):
        """Close the transport and drop this protocol's reference to it."""
        self._transport.close()
        self._transport = None
92n/a
93n/a
class Process:
    """Stream-oriented handle on a subprocess.

    Exposes the protocol's ``stdin``/``stdout``/``stderr`` streams and
    the transport's pid, and forwards signalling and waiting to the
    transport.
    """

    def __init__(self, transport, protocol, loop):
        self._loop = loop
        self._protocol = protocol
        self._transport = transport
        # Snapshot of the streams the protocol holds at construction time.
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    def __repr__(self):
        cls_name = self.__class__.__name__
        return '<%s %s>' % (cls_name, self.pid)

    @property
    def returncode(self):
        """Return code as reported by the transport."""
        return self._transport.get_returncode()

    @coroutine
    def wait(self):
        """Wait until the process exits and return its return code.

        This method is a coroutine.
        """
        code = yield from self._transport._wait()
        return code

    def send_signal(self, signal):
        """Forward *signal* to the transport's send_signal()."""
        self._transport.send_signal(signal)

    def terminate(self):
        """Forward to the transport's terminate()."""
        self._transport.terminate()

    def kill(self):
        """Forward to the transport's kill()."""
        self._transport.kill()

    @coroutine
    def _feed_stdin(self, input):
        """Write *input* to stdin, drain it, then close stdin."""
        verbose = self._loop.get_debug()
        self.stdin.write(input)
        if verbose:
            logger.debug('%r communicate: feed stdin (%s bytes)',
                         self, len(input))
        try:
            yield from self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError) as err:
            # Deliberately swallowed: communicate() ignores
            # BrokenPipeError and ConnectionResetError.
            if verbose:
                logger.debug('%r communicate: stdin got %r', self, err)

        if verbose:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    @coroutine
    def _noop(self):
        """Placeholder coroutine; its result is None."""
        return None

    @coroutine
    def _read_stream(self, fd):
        """Read the stream for *fd* (1 or 2) to EOF and close its pipe."""
        pipe = self._transport.get_pipe_transport(fd)
        if fd != 2:
            assert fd == 1
            reader = self.stdout
        else:
            reader = self.stderr
        label = 'stdout' if fd == 1 else 'stderr'
        if self._loop.get_debug():
            logger.debug('%r communicate: read %s', self, label)
        data = yield from reader.read()
        if self._loop.get_debug():
            logger.debug('%r communicate: close %s', self, label)
        pipe.close()
        return data

    @coroutine
    def communicate(self, input=None):
        """Feed *input* (if given), read stdout/stderr, wait for exit.

        Returns a ``(stdout, stderr)`` tuple; an entry is None when the
        corresponding stream does not exist.
        """
        feeder = self._noop() if input is None else self._feed_stdin(input)
        out_job = (self._read_stream(1) if self.stdout is not None
                   else self._noop())
        err_job = (self._read_stream(2) if self.stderr is not None
                   else self._noop())
        # All three run concurrently; the stdin result is not used.
        _, out_data, err_data = yield from tasks.gather(
            feeder, out_job, err_job, loop=self._loop)
        yield from self.wait()
        return (out_data, err_data)
185n/a
186n/a
@coroutine
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *cmd* via loop.subprocess_shell() and return a Process.

    When *loop* is None the loop from events.get_event_loop() is used.
    *limit* is handed to the SubprocessStreamProtocol; *stdin*, *stdout*,
    *stderr* and any extra keyword arguments go to the loop unchanged.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = yield from loop.subprocess_shell(
        make_protocol,
        cmd,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        **kwds)
    return Process(transport, protocol, loop)
199n/a
@coroutine
def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                           stderr=None, loop=None,
                           limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *program* with *args* via loop.subprocess_exec().

    Returns a Process.  When *loop* is None the loop from
    events.get_event_loop() is used.  *limit* is handed to the
    SubprocessStreamProtocol; *stdin*, *stdout*, *stderr* and any extra
    keyword arguments go to the loop unchanged.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = yield from loop.subprocess_exec(
        make_protocol,
        program,
        *args,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        **kwds)
    return Process(transport, protocol, loop)