ยปCore Development>Code coverage>Lib/multiprocessing/util.py

Python code coverage for Lib/multiprocessing/util.py

#countcontent
1n/a#
2n/a# Module providing various facilities to other parts of the package
3n/a#
4n/a# multiprocessing/util.py
5n/a#
6n/a# Copyright (c) 2006-2008, R Oudkerk
7n/a# Licensed to PSF under a Contributor Agreement.
8n/a#
9n/a
10n/aimport os
11n/aimport itertools
12n/aimport sys
13n/aimport weakref
14n/aimport atexit
15n/aimport threading # we want threading to install it's
16n/a # cleanup function before multiprocessing does
17n/afrom subprocess import _args_from_interpreter_flags
18n/a
19n/afrom . import process
20n/a
21n/a__all__ = [
22n/a 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
23n/a 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
24n/a 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
25n/a 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
26n/a ]
27n/a
28n/a#
29n/a# Logging
30n/a#
31n/a
32n/aNOTSET = 0
33n/aSUBDEBUG = 5
34n/aDEBUG = 10
35n/aINFO = 20
36n/aSUBWARNING = 25
37n/a
38n/aLOGGER_NAME = 'multiprocessing'
39n/aDEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
40n/a
41n/a_logger = None
42n/a_log_to_stderr = False
43n/a
44n/adef sub_debug(msg, *args):
45n/a if _logger:
46n/a _logger.log(SUBDEBUG, msg, *args)
47n/a
48n/adef debug(msg, *args):
49n/a if _logger:
50n/a _logger.log(DEBUG, msg, *args)
51n/a
52n/adef info(msg, *args):
53n/a if _logger:
54n/a _logger.log(INFO, msg, *args)
55n/a
56n/adef sub_warning(msg, *args):
57n/a if _logger:
58n/a _logger.log(SUBWARNING, msg, *args)
59n/a
60n/adef get_logger():
61n/a '''
62n/a Returns logger used by multiprocessing
63n/a '''
64n/a global _logger
65n/a import logging
66n/a
67n/a logging._acquireLock()
68n/a try:
69n/a if not _logger:
70n/a
71n/a _logger = logging.getLogger(LOGGER_NAME)
72n/a _logger.propagate = 0
73n/a
74n/a # XXX multiprocessing should cleanup before logging
75n/a if hasattr(atexit, 'unregister'):
76n/a atexit.unregister(_exit_function)
77n/a atexit.register(_exit_function)
78n/a else:
79n/a atexit._exithandlers.remove((_exit_function, (), {}))
80n/a atexit._exithandlers.append((_exit_function, (), {}))
81n/a
82n/a finally:
83n/a logging._releaseLock()
84n/a
85n/a return _logger
86n/a
87n/adef log_to_stderr(level=None):
88n/a '''
89n/a Turn on logging and add a handler which prints to stderr
90n/a '''
91n/a global _log_to_stderr
92n/a import logging
93n/a
94n/a logger = get_logger()
95n/a formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
96n/a handler = logging.StreamHandler()
97n/a handler.setFormatter(formatter)
98n/a logger.addHandler(handler)
99n/a
100n/a if level:
101n/a logger.setLevel(level)
102n/a _log_to_stderr = True
103n/a return _logger
104n/a
105n/a#
106n/a# Function returning a temp directory which will be removed on exit
107n/a#
108n/a
109n/adef get_temp_dir():
110n/a # get name of a temp directory which will be automatically cleaned up
111n/a tempdir = process.current_process()._config.get('tempdir')
112n/a if tempdir is None:
113n/a import shutil, tempfile
114n/a tempdir = tempfile.mkdtemp(prefix='pymp-')
115n/a info('created temp directory %s', tempdir)
116n/a Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
117n/a process.current_process()._config['tempdir'] = tempdir
118n/a return tempdir
119n/a
120n/a#
121n/a# Support for reinitialization of objects when bootstrapping a child process
122n/a#
123n/a
124n/a_afterfork_registry = weakref.WeakValueDictionary()
125n/a_afterfork_counter = itertools.count()
126n/a
127n/adef _run_after_forkers():
128n/a items = list(_afterfork_registry.items())
129n/a items.sort()
130n/a for (index, ident, func), obj in items:
131n/a try:
132n/a func(obj)
133n/a except Exception as e:
134n/a info('after forker raised exception %s', e)
135n/a
136n/adef register_after_fork(obj, func):
137n/a _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
138n/a
139n/a#
140n/a# Finalization using weakrefs
141n/a#
142n/a
143n/a_finalizer_registry = {}
144n/a_finalizer_counter = itertools.count()
145n/a
146n/a
147n/aclass Finalize(object):
148n/a '''
149n/a Class which supports object finalization using weakrefs
150n/a '''
151n/a def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
152n/a assert exitpriority is None or type(exitpriority) is int
153n/a
154n/a if obj is not None:
155n/a self._weakref = weakref.ref(obj, self)
156n/a else:
157n/a assert exitpriority is not None
158n/a
159n/a self._callback = callback
160n/a self._args = args
161n/a self._kwargs = kwargs or {}
162n/a self._key = (exitpriority, next(_finalizer_counter))
163n/a self._pid = os.getpid()
164n/a
165n/a _finalizer_registry[self._key] = self
166n/a
167n/a def __call__(self, wr=None,
168n/a # Need to bind these locally because the globals can have
169n/a # been cleared at shutdown
170n/a _finalizer_registry=_finalizer_registry,
171n/a sub_debug=sub_debug, getpid=os.getpid):
172n/a '''
173n/a Run the callback unless it has already been called or cancelled
174n/a '''
175n/a try:
176n/a del _finalizer_registry[self._key]
177n/a except KeyError:
178n/a sub_debug('finalizer no longer registered')
179n/a else:
180n/a if self._pid != getpid():
181n/a sub_debug('finalizer ignored because different process')
182n/a res = None
183n/a else:
184n/a sub_debug('finalizer calling %s with args %s and kwargs %s',
185n/a self._callback, self._args, self._kwargs)
186n/a res = self._callback(*self._args, **self._kwargs)
187n/a self._weakref = self._callback = self._args = \
188n/a self._kwargs = self._key = None
189n/a return res
190n/a
191n/a def cancel(self):
192n/a '''
193n/a Cancel finalization of the object
194n/a '''
195n/a try:
196n/a del _finalizer_registry[self._key]
197n/a except KeyError:
198n/a pass
199n/a else:
200n/a self._weakref = self._callback = self._args = \
201n/a self._kwargs = self._key = None
202n/a
203n/a def still_active(self):
204n/a '''
205n/a Return whether this finalizer is still waiting to invoke callback
206n/a '''
207n/a return self._key in _finalizer_registry
208n/a
209n/a def __repr__(self):
210n/a try:
211n/a obj = self._weakref()
212n/a except (AttributeError, TypeError):
213n/a obj = None
214n/a
215n/a if obj is None:
216n/a return '<%s object, dead>' % self.__class__.__name__
217n/a
218n/a x = '<%s object, callback=%s' % (
219n/a self.__class__.__name__,
220n/a getattr(self._callback, '__name__', self._callback))
221n/a if self._args:
222n/a x += ', args=' + str(self._args)
223n/a if self._kwargs:
224n/a x += ', kwargs=' + str(self._kwargs)
225n/a if self._key[0] is not None:
226n/a x += ', exitprority=' + str(self._key[0])
227n/a return x + '>'
228n/a
229n/a
230n/adef _run_finalizers(minpriority=None):
231n/a '''
232n/a Run all finalizers whose exit priority is not None and at least minpriority
233n/a
234n/a Finalizers with highest priority are called first; finalizers with
235n/a the same priority will be called in reverse order of creation.
236n/a '''
237n/a if _finalizer_registry is None:
238n/a # This function may be called after this module's globals are
239n/a # destroyed. See the _exit_function function in this module for more
240n/a # notes.
241n/a return
242n/a
243n/a if minpriority is None:
244n/a f = lambda p : p[0][0] is not None
245n/a else:
246n/a f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
247n/a
248n/a items = [x for x in list(_finalizer_registry.items()) if f(x)]
249n/a items.sort(reverse=True)
250n/a
251n/a for key, finalizer in items:
252n/a sub_debug('calling %s', finalizer)
253n/a try:
254n/a finalizer()
255n/a except Exception:
256n/a import traceback
257n/a traceback.print_exc()
258n/a
259n/a if minpriority is None:
260n/a _finalizer_registry.clear()
261n/a
262n/a#
263n/a# Clean up on exit
264n/a#
265n/a
266n/adef is_exiting():
267n/a '''
268n/a Returns true if the process is shutting down
269n/a '''
270n/a return _exiting or _exiting is None
271n/a
272n/a_exiting = False
273n/a
274n/adef _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
275n/a active_children=process.active_children,
276n/a current_process=process.current_process):
277n/a # We hold on to references to functions in the arglist due to the
278n/a # situation described below, where this function is called after this
279n/a # module's globals are destroyed.
280n/a
281n/a global _exiting
282n/a
283n/a if not _exiting:
284n/a _exiting = True
285n/a
286n/a info('process shutting down')
287n/a debug('running all "atexit" finalizers with priority >= 0')
288n/a _run_finalizers(0)
289n/a
290n/a if current_process() is not None:
291n/a # We check if the current process is None here because if
292n/a # it's None, any call to ``active_children()`` will raise
293n/a # an AttributeError (active_children winds up trying to
294n/a # get attributes from util._current_process). One
295n/a # situation where this can happen is if someone has
296n/a # manipulated sys.modules, causing this module to be
297n/a # garbage collected. The destructor for the module type
298n/a # then replaces all values in the module dict with None.
299n/a # For instance, after setuptools runs a test it replaces
300n/a # sys.modules with a copy created earlier. See issues
301n/a # #9775 and #15881. Also related: #4106, #9205, and
302n/a # #9207.
303n/a
304n/a for p in active_children():
305n/a if p.daemon:
306n/a info('calling terminate() for daemon %s', p.name)
307n/a p._popen.terminate()
308n/a
309n/a for p in active_children():
310n/a info('calling join() for process %s', p.name)
311n/a p.join()
312n/a
313n/a debug('running the remaining "atexit" finalizers')
314n/a _run_finalizers()
315n/a
316n/aatexit.register(_exit_function)
317n/a
318n/a#
319n/a# Some fork aware types
320n/a#
321n/a
322n/aclass ForkAwareThreadLock(object):
323n/a def __init__(self):
324n/a self._reset()
325n/a register_after_fork(self, ForkAwareThreadLock._reset)
326n/a
327n/a def _reset(self):
328n/a self._lock = threading.Lock()
329n/a self.acquire = self._lock.acquire
330n/a self.release = self._lock.release
331n/a
332n/a def __enter__(self):
333n/a return self._lock.__enter__()
334n/a
335n/a def __exit__(self, *args):
336n/a return self._lock.__exit__(*args)
337n/a
338n/a
339n/aclass ForkAwareLocal(threading.local):
340n/a def __init__(self):
341n/a register_after_fork(self, lambda obj : obj.__dict__.clear())
342n/a def __reduce__(self):
343n/a return type(self), ()
344n/a
345n/a#
346n/a# Close fds except those specified
347n/a#
348n/a
349n/atry:
350n/a MAXFD = os.sysconf("SC_OPEN_MAX")
351n/aexcept Exception:
352n/a MAXFD = 256
353n/a
354n/adef close_all_fds_except(fds):
355n/a fds = list(fds) + [-1, MAXFD]
356n/a fds.sort()
357n/a assert fds[-1] == MAXFD, 'fd too large'
358n/a for i in range(len(fds) - 1):
359n/a os.closerange(fds[i]+1, fds[i+1])
360n/a#
361n/a# Close sys.stdin and replace stdin with os.devnull
362n/a#
363n/a
364n/adef _close_stdin():
365n/a if sys.stdin is None:
366n/a return
367n/a
368n/a try:
369n/a sys.stdin.close()
370n/a except (OSError, ValueError):
371n/a pass
372n/a
373n/a try:
374n/a fd = os.open(os.devnull, os.O_RDONLY)
375n/a try:
376n/a sys.stdin = open(fd, closefd=False)
377n/a except:
378n/a os.close(fd)
379n/a raise
380n/a except (OSError, ValueError):
381n/a pass
382n/a
383n/a#
384n/a# Start a program with only specified fds kept open
385n/a#
386n/a
387n/adef spawnv_passfds(path, args, passfds):
388n/a import _posixsubprocess
389n/a passfds = sorted(passfds)
390n/a errpipe_read, errpipe_write = os.pipe()
391n/a try:
392n/a return _posixsubprocess.fork_exec(
393n/a args, [os.fsencode(path)], True, passfds, None, None,
394n/a -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
395n/a False, False, None)
396n/a finally:
397n/a os.close(errpipe_read)
398n/a os.close(errpipe_write)