ยปCore Development>Code coverage>Lib/asyncio/unix_events.py

Python code coverage for Lib/asyncio/unix_events.py

#countcontent
1n/a"""Selector event loop for Unix with signal handling."""
2n/a
3n/aimport errno
4n/aimport os
5n/aimport signal
6n/aimport socket
7n/aimport stat
8n/aimport subprocess
9n/aimport sys
10n/aimport threading
11n/aimport warnings
12n/a
13n/a
14n/afrom . import base_events
15n/afrom . import base_subprocess
16n/afrom . import compat
17n/afrom . import constants
18n/afrom . import coroutines
19n/afrom . import events
20n/afrom . import futures
21n/afrom . import selector_events
22n/afrom . import selectors
23n/afrom . import transports
24n/afrom .coroutines import coroutine
25n/afrom .log import logger
26n/a
27n/a
28n/a__all__ = ['SelectorEventLoop',
29n/a 'AbstractChildWatcher', 'SafeChildWatcher',
30n/a 'FastChildWatcher', 'DefaultEventLoopPolicy',
31n/a ]
32n/a
33n/aif sys.platform == 'win32': # pragma: no cover
34n/a raise ImportError('Signals are not really supported on Windows')
35n/a
36n/a
37n/adef _sighandler_noop(signum, frame):
38n/a """Dummy signal handler."""
39n/a pass
40n/a
41n/a
42n/atry:
43n/a _fspath = os.fspath
44n/aexcept AttributeError:
45n/a # Python 3.5 or earlier
46n/a _fspath = lambda path: path
47n/a
48n/a
49n/aclass _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
50n/a """Unix event loop.
51n/a
52n/a Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
53n/a """
54n/a
55n/a def __init__(self, selector=None):
56n/a super().__init__(selector)
57n/a self._signal_handlers = {}
58n/a
59n/a def _socketpair(self):
60n/a return socket.socketpair()
61n/a
62n/a def close(self):
63n/a super().close()
64n/a for sig in list(self._signal_handlers):
65n/a self.remove_signal_handler(sig)
66n/a
67n/a def _process_self_data(self, data):
68n/a for signum in data:
69n/a if not signum:
70n/a # ignore null bytes written by _write_to_self()
71n/a continue
72n/a self._handle_signal(signum)
73n/a
74n/a def add_signal_handler(self, sig, callback, *args):
75n/a """Add a handler for a signal. UNIX only.
76n/a
77n/a Raise ValueError if the signal number is invalid or uncatchable.
78n/a Raise RuntimeError if there is a problem setting up the handler.
79n/a """
80n/a if (coroutines.iscoroutine(callback)
81n/a or coroutines.iscoroutinefunction(callback)):
82n/a raise TypeError("coroutines cannot be used "
83n/a "with add_signal_handler()")
84n/a self._check_signal(sig)
85n/a self._check_closed()
86n/a try:
87n/a # set_wakeup_fd() raises ValueError if this is not the
88n/a # main thread. By calling it early we ensure that an
89n/a # event loop running in another thread cannot add a signal
90n/a # handler.
91n/a signal.set_wakeup_fd(self._csock.fileno())
92n/a except (ValueError, OSError) as exc:
93n/a raise RuntimeError(str(exc))
94n/a
95n/a handle = events.Handle(callback, args, self)
96n/a self._signal_handlers[sig] = handle
97n/a
98n/a try:
99n/a # Register a dummy signal handler to ask Python to write the signal
100n/a # number in the wakup file descriptor. _process_self_data() will
101n/a # read signal numbers from this file descriptor to handle signals.
102n/a signal.signal(sig, _sighandler_noop)
103n/a
104n/a # Set SA_RESTART to limit EINTR occurrences.
105n/a signal.siginterrupt(sig, False)
106n/a except OSError as exc:
107n/a del self._signal_handlers[sig]
108n/a if not self._signal_handlers:
109n/a try:
110n/a signal.set_wakeup_fd(-1)
111n/a except (ValueError, OSError) as nexc:
112n/a logger.info('set_wakeup_fd(-1) failed: %s', nexc)
113n/a
114n/a if exc.errno == errno.EINVAL:
115n/a raise RuntimeError('sig {} cannot be caught'.format(sig))
116n/a else:
117n/a raise
118n/a
119n/a def _handle_signal(self, sig):
120n/a """Internal helper that is the actual signal handler."""
121n/a handle = self._signal_handlers.get(sig)
122n/a if handle is None:
123n/a return # Assume it's some race condition.
124n/a if handle._cancelled:
125n/a self.remove_signal_handler(sig) # Remove it properly.
126n/a else:
127n/a self._add_callback_signalsafe(handle)
128n/a
129n/a def remove_signal_handler(self, sig):
130n/a """Remove a handler for a signal. UNIX only.
131n/a
132n/a Return True if a signal handler was removed, False if not.
133n/a """
134n/a self._check_signal(sig)
135n/a try:
136n/a del self._signal_handlers[sig]
137n/a except KeyError:
138n/a return False
139n/a
140n/a if sig == signal.SIGINT:
141n/a handler = signal.default_int_handler
142n/a else:
143n/a handler = signal.SIG_DFL
144n/a
145n/a try:
146n/a signal.signal(sig, handler)
147n/a except OSError as exc:
148n/a if exc.errno == errno.EINVAL:
149n/a raise RuntimeError('sig {} cannot be caught'.format(sig))
150n/a else:
151n/a raise
152n/a
153n/a if not self._signal_handlers:
154n/a try:
155n/a signal.set_wakeup_fd(-1)
156n/a except (ValueError, OSError) as exc:
157n/a logger.info('set_wakeup_fd(-1) failed: %s', exc)
158n/a
159n/a return True
160n/a
161n/a def _check_signal(self, sig):
162n/a """Internal helper to validate a signal.
163n/a
164n/a Raise ValueError if the signal number is invalid or uncatchable.
165n/a Raise RuntimeError if there is a problem setting up the handler.
166n/a """
167n/a if not isinstance(sig, int):
168n/a raise TypeError('sig must be an int, not {!r}'.format(sig))
169n/a
170n/a if not (1 <= sig < signal.NSIG):
171n/a raise ValueError(
172n/a 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
173n/a
174n/a def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
175n/a extra=None):
176n/a return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
177n/a
178n/a def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
179n/a extra=None):
180n/a return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
181n/a
182n/a @coroutine
183n/a def _make_subprocess_transport(self, protocol, args, shell,
184n/a stdin, stdout, stderr, bufsize,
185n/a extra=None, **kwargs):
186n/a with events.get_child_watcher() as watcher:
187n/a waiter = self.create_future()
188n/a transp = _UnixSubprocessTransport(self, protocol, args, shell,
189n/a stdin, stdout, stderr, bufsize,
190n/a waiter=waiter, extra=extra,
191n/a **kwargs)
192n/a
193n/a watcher.add_child_handler(transp.get_pid(),
194n/a self._child_watcher_callback, transp)
195n/a try:
196n/a yield from waiter
197n/a except Exception as exc:
198n/a # Workaround CPython bug #23353: using yield/yield-from in an
199n/a # except block of a generator doesn't clear properly
200n/a # sys.exc_info()
201n/a err = exc
202n/a else:
203n/a err = None
204n/a
205n/a if err is not None:
206n/a transp.close()
207n/a yield from transp._wait()
208n/a raise err
209n/a
210n/a return transp
211n/a
212n/a def _child_watcher_callback(self, pid, returncode, transp):
213n/a self.call_soon_threadsafe(transp._process_exited, returncode)
214n/a
215n/a @coroutine
216n/a def create_unix_connection(self, protocol_factory, path, *,
217n/a ssl=None, sock=None,
218n/a server_hostname=None):
219n/a assert server_hostname is None or isinstance(server_hostname, str)
220n/a if ssl:
221n/a if server_hostname is None:
222n/a raise ValueError(
223n/a 'you have to pass server_hostname when using ssl')
224n/a else:
225n/a if server_hostname is not None:
226n/a raise ValueError('server_hostname is only meaningful with ssl')
227n/a
228n/a if path is not None:
229n/a if sock is not None:
230n/a raise ValueError(
231n/a 'path and sock can not be specified at the same time')
232n/a
233n/a sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
234n/a try:
235n/a sock.setblocking(False)
236n/a yield from self.sock_connect(sock, path)
237n/a except:
238n/a sock.close()
239n/a raise
240n/a
241n/a else:
242n/a if sock is None:
243n/a raise ValueError('no path and sock were specified')
244n/a if (sock.family != socket.AF_UNIX or
245n/a not base_events._is_stream_socket(sock)):
246n/a raise ValueError(
247n/a 'A UNIX Domain Stream Socket was expected, got {!r}'
248n/a .format(sock))
249n/a sock.setblocking(False)
250n/a
251n/a transport, protocol = yield from self._create_connection_transport(
252n/a sock, protocol_factory, ssl, server_hostname)
253n/a return transport, protocol
254n/a
255n/a @coroutine
256n/a def create_unix_server(self, protocol_factory, path=None, *,
257n/a sock=None, backlog=100, ssl=None):
258n/a if isinstance(ssl, bool):
259n/a raise TypeError('ssl argument must be an SSLContext or None')
260n/a
261n/a if path is not None:
262n/a if sock is not None:
263n/a raise ValueError(
264n/a 'path and sock can not be specified at the same time')
265n/a
266n/a path = _fspath(path)
267n/a sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
268n/a
269n/a # Check for abstract socket. `str` and `bytes` paths are supported.
270n/a if path[0] not in (0, '\x00'):
271n/a try:
272n/a if stat.S_ISSOCK(os.stat(path).st_mode):
273n/a os.remove(path)
274n/a except FileNotFoundError:
275n/a pass
276n/a except OSError as err:
277n/a # Directory may have permissions only to create socket.
278n/a logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err)
279n/a
280n/a try:
281n/a sock.bind(path)
282n/a except OSError as exc:
283n/a sock.close()
284n/a if exc.errno == errno.EADDRINUSE:
285n/a # Let's improve the error message by adding
286n/a # with what exact address it occurs.
287n/a msg = 'Address {!r} is already in use'.format(path)
288n/a raise OSError(errno.EADDRINUSE, msg) from None
289n/a else:
290n/a raise
291n/a except:
292n/a sock.close()
293n/a raise
294n/a else:
295n/a if sock is None:
296n/a raise ValueError(
297n/a 'path was not specified, and no sock specified')
298n/a
299n/a if (sock.family != socket.AF_UNIX or
300n/a not base_events._is_stream_socket(sock)):
301n/a raise ValueError(
302n/a 'A UNIX Domain Stream Socket was expected, got {!r}'
303n/a .format(sock))
304n/a
305n/a server = base_events.Server(self, [sock])
306n/a sock.listen(backlog)
307n/a sock.setblocking(False)
308n/a self._start_serving(protocol_factory, sock, ssl, server)
309n/a return server
310n/a
311n/a
312n/aif hasattr(os, 'set_blocking'):
313n/a def _set_nonblocking(fd):
314n/a os.set_blocking(fd, False)
315n/aelse:
316n/a import fcntl
317n/a
318n/a def _set_nonblocking(fd):
319n/a flags = fcntl.fcntl(fd, fcntl.F_GETFL)
320n/a flags = flags | os.O_NONBLOCK
321n/a fcntl.fcntl(fd, fcntl.F_SETFL, flags)
322n/a
323n/a
324n/aclass _UnixReadPipeTransport(transports.ReadTransport):
325n/a
326n/a max_size = 256 * 1024 # max bytes we read in one event loop iteration
327n/a
328n/a def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
329n/a super().__init__(extra)
330n/a self._extra['pipe'] = pipe
331n/a self._loop = loop
332n/a self._pipe = pipe
333n/a self._fileno = pipe.fileno()
334n/a self._protocol = protocol
335n/a self._closing = False
336n/a
337n/a mode = os.fstat(self._fileno).st_mode
338n/a if not (stat.S_ISFIFO(mode) or
339n/a stat.S_ISSOCK(mode) or
340n/a stat.S_ISCHR(mode)):
341n/a self._pipe = None
342n/a self._fileno = None
343n/a self._protocol = None
344n/a raise ValueError("Pipe transport is for pipes/sockets only.")
345n/a
346n/a _set_nonblocking(self._fileno)
347n/a
348n/a self._loop.call_soon(self._protocol.connection_made, self)
349n/a # only start reading when connection_made() has been called
350n/a self._loop.call_soon(self._loop._add_reader,
351n/a self._fileno, self._read_ready)
352n/a if waiter is not None:
353n/a # only wake up the waiter when connection_made() has been called
354n/a self._loop.call_soon(futures._set_result_unless_cancelled,
355n/a waiter, None)
356n/a
357n/a def __repr__(self):
358n/a info = [self.__class__.__name__]
359n/a if self._pipe is None:
360n/a info.append('closed')
361n/a elif self._closing:
362n/a info.append('closing')
363n/a info.append('fd=%s' % self._fileno)
364n/a selector = getattr(self._loop, '_selector', None)
365n/a if self._pipe is not None and selector is not None:
366n/a polling = selector_events._test_selector_event(
367n/a selector,
368n/a self._fileno, selectors.EVENT_READ)
369n/a if polling:
370n/a info.append('polling')
371n/a else:
372n/a info.append('idle')
373n/a elif self._pipe is not None:
374n/a info.append('open')
375n/a else:
376n/a info.append('closed')
377n/a return '<%s>' % ' '.join(info)
378n/a
379n/a def _read_ready(self):
380n/a try:
381n/a data = os.read(self._fileno, self.max_size)
382n/a except (BlockingIOError, InterruptedError):
383n/a pass
384n/a except OSError as exc:
385n/a self._fatal_error(exc, 'Fatal read error on pipe transport')
386n/a else:
387n/a if data:
388n/a self._protocol.data_received(data)
389n/a else:
390n/a if self._loop.get_debug():
391n/a logger.info("%r was closed by peer", self)
392n/a self._closing = True
393n/a self._loop._remove_reader(self._fileno)
394n/a self._loop.call_soon(self._protocol.eof_received)
395n/a self._loop.call_soon(self._call_connection_lost, None)
396n/a
397n/a def pause_reading(self):
398n/a self._loop._remove_reader(self._fileno)
399n/a
400n/a def resume_reading(self):
401n/a self._loop._add_reader(self._fileno, self._read_ready)
402n/a
403n/a def set_protocol(self, protocol):
404n/a self._protocol = protocol
405n/a
406n/a def get_protocol(self):
407n/a return self._protocol
408n/a
409n/a def is_closing(self):
410n/a return self._closing
411n/a
412n/a def close(self):
413n/a if not self._closing:
414n/a self._close(None)
415n/a
416n/a # On Python 3.3 and older, objects with a destructor part of a reference
417n/a # cycle are never destroyed. It's not more the case on Python 3.4 thanks
418n/a # to the PEP 442.
419n/a if compat.PY34:
420n/a def __del__(self):
421n/a if self._pipe is not None:
422n/a warnings.warn("unclosed transport %r" % self, ResourceWarning,
423n/a source=self)
424n/a self._pipe.close()
425n/a
426n/a def _fatal_error(self, exc, message='Fatal error on pipe transport'):
427n/a # should be called by exception handler only
428n/a if (isinstance(exc, OSError) and exc.errno == errno.EIO):
429n/a if self._loop.get_debug():
430n/a logger.debug("%r: %s", self, message, exc_info=True)
431n/a else:
432n/a self._loop.call_exception_handler({
433n/a 'message': message,
434n/a 'exception': exc,
435n/a 'transport': self,
436n/a 'protocol': self._protocol,
437n/a })
438n/a self._close(exc)
439n/a
440n/a def _close(self, exc):
441n/a self._closing = True
442n/a self._loop._remove_reader(self._fileno)
443n/a self._loop.call_soon(self._call_connection_lost, exc)
444n/a
445n/a def _call_connection_lost(self, exc):
446n/a try:
447n/a self._protocol.connection_lost(exc)
448n/a finally:
449n/a self._pipe.close()
450n/a self._pipe = None
451n/a self._protocol = None
452n/a self._loop = None
453n/a
454n/a
455n/aclass _UnixWritePipeTransport(transports._FlowControlMixin,
456n/a transports.WriteTransport):
457n/a
458n/a def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
459n/a super().__init__(extra, loop)
460n/a self._extra['pipe'] = pipe
461n/a self._pipe = pipe
462n/a self._fileno = pipe.fileno()
463n/a self._protocol = protocol
464n/a self._buffer = bytearray()
465n/a self._conn_lost = 0
466n/a self._closing = False # Set when close() or write_eof() called.
467n/a
468n/a mode = os.fstat(self._fileno).st_mode
469n/a is_char = stat.S_ISCHR(mode)
470n/a is_fifo = stat.S_ISFIFO(mode)
471n/a is_socket = stat.S_ISSOCK(mode)
472n/a if not (is_char or is_fifo or is_socket):
473n/a self._pipe = None
474n/a self._fileno = None
475n/a self._protocol = None
476n/a raise ValueError("Pipe transport is only for "
477n/a "pipes, sockets and character devices")
478n/a
479n/a _set_nonblocking(self._fileno)
480n/a self._loop.call_soon(self._protocol.connection_made, self)
481n/a
482n/a # On AIX, the reader trick (to be notified when the read end of the
483n/a # socket is closed) only works for sockets. On other platforms it
484n/a # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
485n/a if is_socket or (is_fifo and not sys.platform.startswith("aix")):
486n/a # only start reading when connection_made() has been called
487n/a self._loop.call_soon(self._loop._add_reader,
488n/a self._fileno, self._read_ready)
489n/a
490n/a if waiter is not None:
491n/a # only wake up the waiter when connection_made() has been called
492n/a self._loop.call_soon(futures._set_result_unless_cancelled,
493n/a waiter, None)
494n/a
495n/a def __repr__(self):
496n/a info = [self.__class__.__name__]
497n/a if self._pipe is None:
498n/a info.append('closed')
499n/a elif self._closing:
500n/a info.append('closing')
501n/a info.append('fd=%s' % self._fileno)
502n/a selector = getattr(self._loop, '_selector', None)
503n/a if self._pipe is not None and selector is not None:
504n/a polling = selector_events._test_selector_event(
505n/a selector,
506n/a self._fileno, selectors.EVENT_WRITE)
507n/a if polling:
508n/a info.append('polling')
509n/a else:
510n/a info.append('idle')
511n/a
512n/a bufsize = self.get_write_buffer_size()
513n/a info.append('bufsize=%s' % bufsize)
514n/a elif self._pipe is not None:
515n/a info.append('open')
516n/a else:
517n/a info.append('closed')
518n/a return '<%s>' % ' '.join(info)
519n/a
520n/a def get_write_buffer_size(self):
521n/a return len(self._buffer)
522n/a
523n/a def _read_ready(self):
524n/a # Pipe was closed by peer.
525n/a if self._loop.get_debug():
526n/a logger.info("%r was closed by peer", self)
527n/a if self._buffer:
528n/a self._close(BrokenPipeError())
529n/a else:
530n/a self._close()
531n/a
532n/a def write(self, data):
533n/a assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
534n/a if isinstance(data, bytearray):
535n/a data = memoryview(data)
536n/a if not data:
537n/a return
538n/a
539n/a if self._conn_lost or self._closing:
540n/a if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
541n/a logger.warning('pipe closed by peer or '
542n/a 'os.write(pipe, data) raised exception.')
543n/a self._conn_lost += 1
544n/a return
545n/a
546n/a if not self._buffer:
547n/a # Attempt to send it right away first.
548n/a try:
549n/a n = os.write(self._fileno, data)
550n/a except (BlockingIOError, InterruptedError):
551n/a n = 0
552n/a except Exception as exc:
553n/a self._conn_lost += 1
554n/a self._fatal_error(exc, 'Fatal write error on pipe transport')
555n/a return
556n/a if n == len(data):
557n/a return
558n/a elif n > 0:
559n/a data = memoryview(data)[n:]
560n/a self._loop._add_writer(self._fileno, self._write_ready)
561n/a
562n/a self._buffer += data
563n/a self._maybe_pause_protocol()
564n/a
565n/a def _write_ready(self):
566n/a assert self._buffer, 'Data should not be empty'
567n/a
568n/a try:
569n/a n = os.write(self._fileno, self._buffer)
570n/a except (BlockingIOError, InterruptedError):
571n/a pass
572n/a except Exception as exc:
573n/a self._buffer.clear()
574n/a self._conn_lost += 1
575n/a # Remove writer here, _fatal_error() doesn't it
576n/a # because _buffer is empty.
577n/a self._loop._remove_writer(self._fileno)
578n/a self._fatal_error(exc, 'Fatal write error on pipe transport')
579n/a else:
580n/a if n == len(self._buffer):
581n/a self._buffer.clear()
582n/a self._loop._remove_writer(self._fileno)
583n/a self._maybe_resume_protocol() # May append to buffer.
584n/a if self._closing:
585n/a self._loop._remove_reader(self._fileno)
586n/a self._call_connection_lost(None)
587n/a return
588n/a elif n > 0:
589n/a del self._buffer[:n]
590n/a
591n/a def can_write_eof(self):
592n/a return True
593n/a
594n/a def write_eof(self):
595n/a if self._closing:
596n/a return
597n/a assert self._pipe
598n/a self._closing = True
599n/a if not self._buffer:
600n/a self._loop._remove_reader(self._fileno)
601n/a self._loop.call_soon(self._call_connection_lost, None)
602n/a
603n/a def set_protocol(self, protocol):
604n/a self._protocol = protocol
605n/a
606n/a def get_protocol(self):
607n/a return self._protocol
608n/a
609n/a def is_closing(self):
610n/a return self._closing
611n/a
612n/a def close(self):
613n/a if self._pipe is not None and not self._closing:
614n/a # write_eof is all what we needed to close the write pipe
615n/a self.write_eof()
616n/a
617n/a # On Python 3.3 and older, objects with a destructor part of a reference
618n/a # cycle are never destroyed. It's not more the case on Python 3.4 thanks
619n/a # to the PEP 442.
620n/a if compat.PY34:
621n/a def __del__(self):
622n/a if self._pipe is not None:
623n/a warnings.warn("unclosed transport %r" % self, ResourceWarning,
624n/a source=self)
625n/a self._pipe.close()
626n/a
627n/a def abort(self):
628n/a self._close(None)
629n/a
630n/a def _fatal_error(self, exc, message='Fatal error on pipe transport'):
631n/a # should be called by exception handler only
632n/a if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
633n/a if self._loop.get_debug():
634n/a logger.debug("%r: %s", self, message, exc_info=True)
635n/a else:
636n/a self._loop.call_exception_handler({
637n/a 'message': message,
638n/a 'exception': exc,
639n/a 'transport': self,
640n/a 'protocol': self._protocol,
641n/a })
642n/a self._close(exc)
643n/a
644n/a def _close(self, exc=None):
645n/a self._closing = True
646n/a if self._buffer:
647n/a self._loop._remove_writer(self._fileno)
648n/a self._buffer.clear()
649n/a self._loop._remove_reader(self._fileno)
650n/a self._loop.call_soon(self._call_connection_lost, exc)
651n/a
652n/a def _call_connection_lost(self, exc):
653n/a try:
654n/a self._protocol.connection_lost(exc)
655n/a finally:
656n/a self._pipe.close()
657n/a self._pipe = None
658n/a self._protocol = None
659n/a self._loop = None
660n/a
661n/a
662n/aif hasattr(os, 'set_inheritable'):
663n/a # Python 3.4 and newer
664n/a _set_inheritable = os.set_inheritable
665n/aelse:
666n/a import fcntl
667n/a
668n/a def _set_inheritable(fd, inheritable):
669n/a cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
670n/a
671n/a old = fcntl.fcntl(fd, fcntl.F_GETFD)
672n/a if not inheritable:
673n/a fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
674n/a else:
675n/a fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
676n/a
677n/a
678n/aclass _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
679n/a
680n/a def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
681n/a stdin_w = None
682n/a if stdin == subprocess.PIPE:
683n/a # Use a socket pair for stdin, since not all platforms
684n/a # support selecting read events on the write end of a
685n/a # socket (which we use in order to detect closing of the
686n/a # other end). Notably this is needed on AIX, and works
687n/a # just fine on other platforms.
688n/a stdin, stdin_w = self._loop._socketpair()
689n/a
690n/a # Mark the write end of the stdin pipe as non-inheritable,
691n/a # needed by close_fds=False on Python 3.3 and older
692n/a # (Python 3.4 implements the PEP 446, socketpair returns
693n/a # non-inheritable sockets)
694n/a _set_inheritable(stdin_w.fileno(), False)
695n/a self._proc = subprocess.Popen(
696n/a args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
697n/a universal_newlines=False, bufsize=bufsize, **kwargs)
698n/a if stdin_w is not None:
699n/a stdin.close()
700n/a self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
701n/a
702n/a
703n/aclass AbstractChildWatcher:
704n/a """Abstract base class for monitoring child processes.
705n/a
706n/a Objects derived from this class monitor a collection of subprocesses and
707n/a report their termination or interruption by a signal.
708n/a
709n/a New callbacks are registered with .add_child_handler(). Starting a new
710n/a process must be done within a 'with' block to allow the watcher to suspend
711n/a its activity until the new process if fully registered (this is needed to
712n/a prevent a race condition in some implementations).
713n/a
714n/a Example:
715n/a with watcher:
716n/a proc = subprocess.Popen("sleep 1")
717n/a watcher.add_child_handler(proc.pid, callback)
718n/a
719n/a Notes:
720n/a Implementations of this class must be thread-safe.
721n/a
722n/a Since child watcher objects may catch the SIGCHLD signal and call
723n/a waitpid(-1), there should be only one active object per process.
724n/a """
725n/a
726n/a def add_child_handler(self, pid, callback, *args):
727n/a """Register a new child handler.
728n/a
729n/a Arrange for callback(pid, returncode, *args) to be called when
730n/a process 'pid' terminates. Specifying another callback for the same
731n/a process replaces the previous handler.
732n/a
733n/a Note: callback() must be thread-safe.
734n/a """
735n/a raise NotImplementedError()
736n/a
737n/a def remove_child_handler(self, pid):
738n/a """Removes the handler for process 'pid'.
739n/a
740n/a The function returns True if the handler was successfully removed,
741n/a False if there was nothing to remove."""
742n/a
743n/a raise NotImplementedError()
744n/a
745n/a def attach_loop(self, loop):
746n/a """Attach the watcher to an event loop.
747n/a
748n/a If the watcher was previously attached to an event loop, then it is
749n/a first detached before attaching to the new loop.
750n/a
751n/a Note: loop may be None.
752n/a """
753n/a raise NotImplementedError()
754n/a
755n/a def close(self):
756n/a """Close the watcher.
757n/a
758n/a This must be called to make sure that any underlying resource is freed.
759n/a """
760n/a raise NotImplementedError()
761n/a
762n/a def __enter__(self):
763n/a """Enter the watcher's context and allow starting new processes
764n/a
765n/a This function must return self"""
766n/a raise NotImplementedError()
767n/a
768n/a def __exit__(self, a, b, c):
769n/a """Exit the watcher's context"""
770n/a raise NotImplementedError()
771n/a
772n/a
773n/aclass BaseChildWatcher(AbstractChildWatcher):
774n/a
775n/a def __init__(self):
776n/a self._loop = None
777n/a self._callbacks = {}
778n/a
779n/a def close(self):
780n/a self.attach_loop(None)
781n/a
782n/a def _do_waitpid(self, expected_pid):
783n/a raise NotImplementedError()
784n/a
785n/a def _do_waitpid_all(self):
786n/a raise NotImplementedError()
787n/a
788n/a def attach_loop(self, loop):
789n/a assert loop is None or isinstance(loop, events.AbstractEventLoop)
790n/a
791n/a if self._loop is not None and loop is None and self._callbacks:
792n/a warnings.warn(
793n/a 'A loop is being detached '
794n/a 'from a child watcher with pending handlers',
795n/a RuntimeWarning)
796n/a
797n/a if self._loop is not None:
798n/a self._loop.remove_signal_handler(signal.SIGCHLD)
799n/a
800n/a self._loop = loop
801n/a if loop is not None:
802n/a loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
803n/a
804n/a # Prevent a race condition in case a child terminated
805n/a # during the switch.
806n/a self._do_waitpid_all()
807n/a
808n/a def _sig_chld(self):
809n/a try:
810n/a self._do_waitpid_all()
811n/a except Exception as exc:
812n/a # self._loop should always be available here
813n/a # as '_sig_chld' is added as a signal handler
814n/a # in 'attach_loop'
815n/a self._loop.call_exception_handler({
816n/a 'message': 'Unknown exception in SIGCHLD handler',
817n/a 'exception': exc,
818n/a })
819n/a
820n/a def _compute_returncode(self, status):
821n/a if os.WIFSIGNALED(status):
822n/a # The child process died because of a signal.
823n/a return -os.WTERMSIG(status)
824n/a elif os.WIFEXITED(status):
825n/a # The child process exited (e.g sys.exit()).
826n/a return os.WEXITSTATUS(status)
827n/a else:
828n/a # The child exited, but we don't understand its status.
829n/a # This shouldn't happen, but if it does, let's just
830n/a # return that status; perhaps that helps debug it.
831n/a return status
832n/a
833n/a
834n/aclass SafeChildWatcher(BaseChildWatcher):
835n/a """'Safe' child watcher implementation.
836n/a
837n/a This implementation avoids disrupting other code spawning processes by
838n/a polling explicitly each process in the SIGCHLD handler instead of calling
839n/a os.waitpid(-1).
840n/a
841n/a This is a safe solution but it has a significant overhead when handling a
842n/a big number of children (O(n) each time SIGCHLD is raised)
843n/a """
844n/a
845n/a def close(self):
846n/a self._callbacks.clear()
847n/a super().close()
848n/a
849n/a def __enter__(self):
850n/a return self
851n/a
852n/a def __exit__(self, a, b, c):
853n/a pass
854n/a
855n/a def add_child_handler(self, pid, callback, *args):
856n/a if self._loop is None:
857n/a raise RuntimeError(
858n/a "Cannot add child handler, "
859n/a "the child watcher does not have a loop attached")
860n/a
861n/a self._callbacks[pid] = (callback, args)
862n/a
863n/a # Prevent a race condition in case the child is already terminated.
864n/a self._do_waitpid(pid)
865n/a
866n/a def remove_child_handler(self, pid):
867n/a try:
868n/a del self._callbacks[pid]
869n/a return True
870n/a except KeyError:
871n/a return False
872n/a
873n/a def _do_waitpid_all(self):
874n/a
875n/a for pid in list(self._callbacks):
876n/a self._do_waitpid(pid)
877n/a
878n/a def _do_waitpid(self, expected_pid):
879n/a assert expected_pid > 0
880n/a
881n/a try:
882n/a pid, status = os.waitpid(expected_pid, os.WNOHANG)
883n/a except ChildProcessError:
884n/a # The child process is already reaped
885n/a # (may happen if waitpid() is called elsewhere).
886n/a pid = expected_pid
887n/a returncode = 255
888n/a logger.warning(
889n/a "Unknown child process pid %d, will report returncode 255",
890n/a pid)
891n/a else:
892n/a if pid == 0:
893n/a # The child process is still alive.
894n/a return
895n/a
896n/a returncode = self._compute_returncode(status)
897n/a if self._loop.get_debug():
898n/a logger.debug('process %s exited with returncode %s',
899n/a expected_pid, returncode)
900n/a
901n/a try:
902n/a callback, args = self._callbacks.pop(pid)
903n/a except KeyError: # pragma: no cover
904n/a # May happen if .remove_child_handler() is called
905n/a # after os.waitpid() returns.
906n/a if self._loop.get_debug():
907n/a logger.warning("Child watcher got an unexpected pid: %r",
908n/a pid, exc_info=True)
909n/a else:
910n/a callback(pid, returncode, *args)
911n/a
912n/a
913n/aclass FastChildWatcher(BaseChildWatcher):
914n/a """'Fast' child watcher implementation.
915n/a
916n/a This implementation reaps every terminated processes by calling
917n/a os.waitpid(-1) directly, possibly breaking other code spawning processes
918n/a and waiting for their termination.
919n/a
920n/a There is no noticeable overhead when handling a big number of children
921n/a (O(1) each time a child terminates).
922n/a """
923n/a def __init__(self):
924n/a super().__init__()
925n/a self._lock = threading.Lock()
926n/a self._zombies = {}
927n/a self._forks = 0
928n/a
929n/a def close(self):
930n/a self._callbacks.clear()
931n/a self._zombies.clear()
932n/a super().close()
933n/a
934n/a def __enter__(self):
935n/a with self._lock:
936n/a self._forks += 1
937n/a
938n/a return self
939n/a
940n/a def __exit__(self, a, b, c):
941n/a with self._lock:
942n/a self._forks -= 1
943n/a
944n/a if self._forks or not self._zombies:
945n/a return
946n/a
947n/a collateral_victims = str(self._zombies)
948n/a self._zombies.clear()
949n/a
950n/a logger.warning(
951n/a "Caught subprocesses termination from unknown pids: %s",
952n/a collateral_victims)
953n/a
954n/a def add_child_handler(self, pid, callback, *args):
955n/a assert self._forks, "Must use the context manager"
956n/a
957n/a if self._loop is None:
958n/a raise RuntimeError(
959n/a "Cannot add child handler, "
960n/a "the child watcher does not have a loop attached")
961n/a
962n/a with self._lock:
963n/a try:
964n/a returncode = self._zombies.pop(pid)
965n/a except KeyError:
966n/a # The child is running.
967n/a self._callbacks[pid] = callback, args
968n/a return
969n/a
970n/a # The child is dead already. We can fire the callback.
971n/a callback(pid, returncode, *args)
972n/a
973n/a def remove_child_handler(self, pid):
974n/a try:
975n/a del self._callbacks[pid]
976n/a return True
977n/a except KeyError:
978n/a return False
979n/a
980n/a def _do_waitpid_all(self):
981n/a # Because of signal coalescing, we must keep calling waitpid() as
982n/a # long as we're able to reap a child.
983n/a while True:
984n/a try:
985n/a pid, status = os.waitpid(-1, os.WNOHANG)
986n/a except ChildProcessError:
987n/a # No more child processes exist.
988n/a return
989n/a else:
990n/a if pid == 0:
991n/a # A child process is still alive.
992n/a return
993n/a
994n/a returncode = self._compute_returncode(status)
995n/a
996n/a with self._lock:
997n/a try:
998n/a callback, args = self._callbacks.pop(pid)
999n/a except KeyError:
1000n/a # unknown child
1001n/a if self._forks:
1002n/a # It may not be registered yet.
1003n/a self._zombies[pid] = returncode
1004n/a if self._loop.get_debug():
1005n/a logger.debug('unknown process %s exited '
1006n/a 'with returncode %s',
1007n/a pid, returncode)
1008n/a continue
1009n/a callback = None
1010n/a else:
1011n/a if self._loop.get_debug():
1012n/a logger.debug('process %s exited with returncode %s',
1013n/a pid, returncode)
1014n/a
1015n/a if callback is None:
1016n/a logger.warning(
1017n/a "Caught subprocess termination from unknown pid: "
1018n/a "%d -> %d", pid, returncode)
1019n/a else:
1020n/a callback(pid, returncode, *args)
1021n/a
1022n/a
1023n/aclass _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
1024n/a """UNIX event loop policy with a watcher for child processes."""
1025n/a _loop_factory = _UnixSelectorEventLoop
1026n/a
1027n/a def __init__(self):
1028n/a super().__init__()
1029n/a self._watcher = None
1030n/a
1031n/a def _init_watcher(self):
1032n/a with events._lock:
1033n/a if self._watcher is None: # pragma: no branch
1034n/a self._watcher = SafeChildWatcher()
1035n/a if isinstance(threading.current_thread(),
1036n/a threading._MainThread):
1037n/a self._watcher.attach_loop(self._local._loop)
1038n/a
1039n/a def set_event_loop(self, loop):
1040n/a """Set the event loop.
1041n/a
1042n/a As a side effect, if a child watcher was set before, then calling
1043n/a .set_event_loop() from the main thread will call .attach_loop(loop) on
1044n/a the child watcher.
1045n/a """
1046n/a
1047n/a super().set_event_loop(loop)
1048n/a
1049n/a if self._watcher is not None and \
1050n/a isinstance(threading.current_thread(), threading._MainThread):
1051n/a self._watcher.attach_loop(loop)
1052n/a
1053n/a def get_child_watcher(self):
1054n/a """Get the watcher for child processes.
1055n/a
1056n/a If not yet set, a SafeChildWatcher object is automatically created.
1057n/a """
1058n/a if self._watcher is None:
1059n/a self._init_watcher()
1060n/a
1061n/a return self._watcher
1062n/a
1063n/a def set_child_watcher(self, watcher):
1064n/a """Set the watcher for child processes."""
1065n/a
1066n/a assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1067n/a
1068n/a if self._watcher is not None:
1069n/a self._watcher.close()
1070n/a
1071n/a self._watcher = watcher
1072n/a
1073n/aSelectorEventLoop = _UnixSelectorEventLoop
1074n/aDefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy