ยปCore Development>Code coverage>Lib/multiprocessing/forking.py

Python code coverage for Lib/multiprocessing/forking.py

#countcontent
1n/a#
2n/a# Module for starting a process object using os.fork() or CreateProcess()
3n/a#
4n/a# multiprocessing/forking.py
5n/a#
6n/a# Copyright (c) 2006-2008, R Oudkerk
7n/a# Licensed to PSF under a Contributor Agreement.
8n/a#
9n/a
10n/aimport io
11n/aimport os
12n/aimport pickle
13n/aimport sys
14n/aimport signal
15n/aimport errno
16n/a
17n/afrom multiprocessing import util, process
18n/a
19n/a__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
20n/a
21n/a#
22n/a# Check that the current thread is spawning a child process
23n/a#
24n/a
25n/adef assert_spawning(self):
26n/a if not Popen.thread_is_spawning():
27n/a raise RuntimeError(
28n/a '%s objects should only be shared between processes'
29n/a ' through inheritance' % type(self).__name__
30n/a )
31n/a
32n/a#
33n/a# Try making some callable types picklable
34n/a#
35n/a
36n/afrom pickle import Pickler
37n/afrom copyreg import dispatch_table
38n/a
39n/aclass ForkingPickler(Pickler):
40n/a _extra_reducers = {}
41n/a def __init__(self, *args):
42n/a Pickler.__init__(self, *args)
43n/a self.dispatch_table = dispatch_table.copy()
44n/a self.dispatch_table.update(self._extra_reducers)
45n/a @classmethod
46n/a def register(cls, type, reduce):
47n/a cls._extra_reducers[type] = reduce
48n/a
49n/a @staticmethod
50n/a def dumps(obj):
51n/a buf = io.BytesIO()
52n/a ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
53n/a return buf.getbuffer()
54n/a
55n/a loads = pickle.loads
56n/a
57n/a
58n/adef _reduce_method(m):
59n/a if m.__self__ is None:
60n/a return getattr, (m.__class__, m.__func__.__name__)
61n/a else:
62n/a return getattr, (m.__self__, m.__func__.__name__)
63n/aclass _C:
64n/a def f(self):
65n/a pass
66n/aForkingPickler.register(type(_C().f), _reduce_method)
67n/a
68n/a
69n/adef _reduce_method_descriptor(m):
70n/a return getattr, (m.__objclass__, m.__name__)
71n/aForkingPickler.register(type(list.append), _reduce_method_descriptor)
72n/aForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
73n/a
74n/atry:
75n/a from functools import partial
76n/aexcept ImportError:
77n/a pass
78n/aelse:
79n/a def _reduce_partial(p):
80n/a return _rebuild_partial, (p.func, p.args, p.keywords or {})
81n/a def _rebuild_partial(func, args, keywords):
82n/a return partial(func, *args, **keywords)
83n/a ForkingPickler.register(partial, _reduce_partial)
84n/a
85n/a#
86n/a# Unix
87n/a#
88n/a
89n/aif sys.platform != 'win32':
90n/a duplicate = os.dup
91n/a close = os.close
92n/a
93n/a #
94n/a # We define a Popen class similar to the one from subprocess, but
95n/a # whose constructor takes a process object as its argument.
96n/a #
97n/a
98n/a class Popen(object):
99n/a
100n/a def __init__(self, process_obj):
101n/a sys.stdout.flush()
102n/a sys.stderr.flush()
103n/a self.returncode = None
104n/a
105n/a r, w = os.pipe()
106n/a self.sentinel = r
107n/a
108n/a self.pid = os.fork()
109n/a if self.pid == 0:
110n/a os.close(r)
111n/a if 'random' in sys.modules:
112n/a import random
113n/a random.seed()
114n/a code = process_obj._bootstrap()
115n/a os._exit(code)
116n/a
117n/a # `w` will be closed when the child exits, at which point `r`
118n/a # will become ready for reading (using e.g. select()).
119n/a os.close(w)
120n/a util.Finalize(self, os.close, (r,))
121n/a
122n/a def poll(self, flag=os.WNOHANG):
123n/a if self.returncode is None:
124n/a while True:
125n/a try:
126n/a pid, sts = os.waitpid(self.pid, flag)
127n/a except OSError as e:
128n/a if e.errno == errno.EINTR:
129n/a continue
130n/a # Child process not yet created. See #1731717
131n/a # e.errno == errno.ECHILD == 10
132n/a return None
133n/a else:
134n/a break
135n/a if pid == self.pid:
136n/a if os.WIFSIGNALED(sts):
137n/a self.returncode = -os.WTERMSIG(sts)
138n/a else:
139n/a assert os.WIFEXITED(sts)
140n/a self.returncode = os.WEXITSTATUS(sts)
141n/a return self.returncode
142n/a
143n/a def wait(self, timeout=None):
144n/a if self.returncode is None:
145n/a if timeout is not None:
146n/a from .connection import wait
147n/a if not wait([self.sentinel], timeout):
148n/a return None
149n/a # This shouldn't block if wait() returned successfully.
150n/a return self.poll(os.WNOHANG if timeout == 0.0 else 0)
151n/a return self.returncode
152n/a
153n/a def terminate(self):
154n/a if self.returncode is None:
155n/a try:
156n/a os.kill(self.pid, signal.SIGTERM)
157n/a except OSError:
158n/a if self.wait(timeout=0.1) is None:
159n/a raise
160n/a
161n/a @staticmethod
162n/a def thread_is_spawning():
163n/a return False
164n/a
165n/a#
166n/a# Windows
167n/a#
168n/a
169n/aelse:
170n/a import _thread
171n/a import msvcrt
172n/a import _winapi
173n/a
174n/a from pickle import load, HIGHEST_PROTOCOL
175n/a
176n/a def dump(obj, file, protocol=None):
177n/a ForkingPickler(file, protocol).dump(obj)
178n/a
179n/a #
180n/a #
181n/a #
182n/a
183n/a TERMINATE = 0x10000
184n/a WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
185n/a WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
186n/a
187n/a close = _winapi.CloseHandle
188n/a
189n/a #
190n/a # _python_exe is the assumed path to the python executable.
191n/a # People embedding Python want to modify it.
192n/a #
193n/a
194n/a if WINSERVICE:
195n/a _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
196n/a else:
197n/a _python_exe = sys.executable
198n/a
199n/a def set_executable(exe):
200n/a global _python_exe
201n/a _python_exe = exe
202n/a
203n/a #
204n/a #
205n/a #
206n/a
207n/a def duplicate(handle, target_process=None, inheritable=False):
208n/a if target_process is None:
209n/a target_process = _winapi.GetCurrentProcess()
210n/a return _winapi.DuplicateHandle(
211n/a _winapi.GetCurrentProcess(), handle, target_process,
212n/a 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
213n/a )
214n/a
215n/a #
216n/a # We define a Popen class similar to the one from subprocess, but
217n/a # whose constructor takes a process object as its argument.
218n/a #
219n/a
220n/a class Popen(object):
221n/a '''
222n/a Start a subprocess to run the code of a process object
223n/a '''
224n/a _tls = _thread._local()
225n/a
226n/a def __init__(self, process_obj):
227n/a cmd = ' '.join('"%s"' % x for x in get_command_line())
228n/a prep_data = get_preparation_data(process_obj._name)
229n/a
230n/a # create pipe for communication with child
231n/a rfd, wfd = os.pipe()
232n/a
233n/a # get handle for read end of the pipe and make it inheritable
234n/a rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
235n/a os.close(rfd)
236n/a
237n/a with open(wfd, 'wb', closefd=True) as to_child:
238n/a # start process
239n/a try:
240n/a hp, ht, pid, tid = _winapi.CreateProcess(
241n/a _python_exe, cmd + (' %s' % rhandle),
242n/a None, None, 1, 0, None, None, None
243n/a )
244n/a _winapi.CloseHandle(ht)
245n/a finally:
246n/a close(rhandle)
247n/a
248n/a # set attributes of self
249n/a self.pid = pid
250n/a self.returncode = None
251n/a self._handle = hp
252n/a self.sentinel = int(hp)
253n/a util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
254n/a
255n/a # send information to child
256n/a Popen._tls.process_handle = int(hp)
257n/a try:
258n/a dump(prep_data, to_child, HIGHEST_PROTOCOL)
259n/a dump(process_obj, to_child, HIGHEST_PROTOCOL)
260n/a finally:
261n/a del Popen._tls.process_handle
262n/a
263n/a @staticmethod
264n/a def thread_is_spawning():
265n/a return getattr(Popen._tls, 'process_handle', None) is not None
266n/a
267n/a @staticmethod
268n/a def duplicate_for_child(handle):
269n/a return duplicate(handle, Popen._tls.process_handle)
270n/a
271n/a def wait(self, timeout=None):
272n/a if self.returncode is None:
273n/a if timeout is None:
274n/a msecs = _winapi.INFINITE
275n/a else:
276n/a msecs = max(0, int(timeout * 1000 + 0.5))
277n/a
278n/a res = _winapi.WaitForSingleObject(int(self._handle), msecs)
279n/a if res == _winapi.WAIT_OBJECT_0:
280n/a code = _winapi.GetExitCodeProcess(self._handle)
281n/a if code == TERMINATE:
282n/a code = -signal.SIGTERM
283n/a self.returncode = code
284n/a
285n/a return self.returncode
286n/a
287n/a def poll(self):
288n/a return self.wait(timeout=0)
289n/a
290n/a def terminate(self):
291n/a if self.returncode is None:
292n/a try:
293n/a _winapi.TerminateProcess(int(self._handle), TERMINATE)
294n/a except OSError:
295n/a if self.wait(timeout=1.0) is None:
296n/a raise
297n/a
298n/a #
299n/a #
300n/a #
301n/a
302n/a def is_forking(argv):
303n/a '''
304n/a Return whether commandline indicates we are forking
305n/a '''
306n/a if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
307n/a assert len(argv) == 3
308n/a return True
309n/a else:
310n/a return False
311n/a
312n/a
313n/a def freeze_support():
314n/a '''
315n/a Run code for process object if this in not the main process
316n/a '''
317n/a if is_forking(sys.argv):
318n/a main()
319n/a sys.exit()
320n/a
321n/a
322n/a def get_command_line():
323n/a '''
324n/a Returns prefix of command line used for spawning a child process
325n/a '''
326n/a if getattr(process.current_process(), '_inheriting', False):
327n/a raise RuntimeError('''
328n/a Attempt to start a new process before the current process
329n/a has finished its bootstrapping phase.
330n/a
331n/a This probably means that you are on Windows and you have
332n/a forgotten to use the proper idiom in the main module:
333n/a
334n/a if __name__ == '__main__':
335n/a freeze_support()
336n/a ...
337n/a
338n/a The "freeze_support()" line can be omitted if the program
339n/a is not going to be frozen to produce a Windows executable.''')
340n/a
341n/a if getattr(sys, 'frozen', False):
342n/a return [sys.executable, '--multiprocessing-fork']
343n/a else:
344n/a prog = 'from multiprocessing.forking import main; main()'
345n/a opts = util._args_from_interpreter_flags()
346n/a return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
347n/a
348n/a
349n/a def main():
350n/a '''
351n/a Run code specifed by data received over pipe
352n/a '''
353n/a assert is_forking(sys.argv)
354n/a
355n/a handle = int(sys.argv[-1])
356n/a fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
357n/a from_parent = os.fdopen(fd, 'rb')
358n/a
359n/a process.current_process()._inheriting = True
360n/a preparation_data = load(from_parent)
361n/a prepare(preparation_data)
362n/a self = load(from_parent)
363n/a process.current_process()._inheriting = False
364n/a
365n/a from_parent.close()
366n/a
367n/a exitcode = self._bootstrap()
368n/a sys.exit(exitcode)
369n/a
370n/a
371n/a def get_preparation_data(name):
372n/a '''
373n/a Return info about parent needed by child to unpickle process object
374n/a '''
375n/a from .util import _logger, _log_to_stderr
376n/a
377n/a d = dict(
378n/a name=name,
379n/a sys_path=sys.path,
380n/a sys_argv=sys.argv,
381n/a log_to_stderr=_log_to_stderr,
382n/a orig_dir=process.ORIGINAL_DIR,
383n/a authkey=process.current_process().authkey,
384n/a )
385n/a
386n/a if _logger is not None:
387n/a d['log_level'] = _logger.getEffectiveLevel()
388n/a
389n/a if not WINEXE and not WINSERVICE:
390n/a main_path = getattr(sys.modules['__main__'], '__file__', None)
391n/a if not main_path and sys.argv[0] not in ('', '-c'):
392n/a main_path = sys.argv[0]
393n/a if main_path is not None:
394n/a if not os.path.isabs(main_path) and \
395n/a process.ORIGINAL_DIR is not None:
396n/a main_path = os.path.join(process.ORIGINAL_DIR, main_path)
397n/a d['main_path'] = os.path.normpath(main_path)
398n/a
399n/a return d
400n/a
401n/a#
402n/a# Prepare current process
403n/a#
404n/a
405n/aold_main_modules = []
406n/a
407n/adef prepare(data):
408n/a '''
409n/a Try to get current process ready to unpickle process object
410n/a '''
411n/a old_main_modules.append(sys.modules['__main__'])
412n/a
413n/a if 'name' in data:
414n/a process.current_process().name = data['name']
415n/a
416n/a if 'authkey' in data:
417n/a process.current_process()._authkey = data['authkey']
418n/a
419n/a if 'log_to_stderr' in data and data['log_to_stderr']:
420n/a util.log_to_stderr()
421n/a
422n/a if 'log_level' in data:
423n/a util.get_logger().setLevel(data['log_level'])
424n/a
425n/a if 'sys_path' in data:
426n/a sys.path = data['sys_path']
427n/a
428n/a if 'sys_argv' in data:
429n/a sys.argv = data['sys_argv']
430n/a
431n/a if 'dir' in data:
432n/a os.chdir(data['dir'])
433n/a
434n/a if 'orig_dir' in data:
435n/a process.ORIGINAL_DIR = data['orig_dir']
436n/a
437n/a if 'main_path' in data:
438n/a # XXX (ncoghlan): The following code makes several bogus
439n/a # assumptions regarding the relationship between __file__
440n/a # and a module's real name. See PEP 302 and issue #10845
441n/a main_path = data['main_path']
442n/a main_name = os.path.splitext(os.path.basename(main_path))[0]
443n/a if main_name == '__init__':
444n/a main_name = os.path.basename(os.path.dirname(main_path))
445n/a
446n/a if main_name == '__main__':
447n/a main_module = sys.modules['__main__']
448n/a main_module.__file__ = main_path
449n/a elif main_name != 'ipython':
450n/a # Main modules not actually called __main__.py may
451n/a # contain additional code that should still be executed
452n/a import importlib
453n/a import types
454n/a
455n/a if main_path is None:
456n/a dirs = None
457n/a elif os.path.basename(main_path).startswith('__init__.py'):
458n/a dirs = [os.path.dirname(os.path.dirname(main_path))]
459n/a else:
460n/a dirs = [os.path.dirname(main_path)]
461n/a
462n/a assert main_name not in sys.modules, main_name
463n/a sys.modules.pop('__mp_main__', None)
464n/a # We should not try to load __main__
465n/a # since that would execute 'if __name__ == "__main__"'
466n/a # clauses, potentially causing a psuedo fork bomb.
467n/a loader = importlib.find_loader(main_name, path=dirs)
468n/a main_module = types.ModuleType(main_name)
469n/a try:
470n/a loader.init_module_attrs(main_module)
471n/a except AttributeError: # init_module_attrs is optional
472n/a pass
473n/a main_module.__name__ = '__mp_main__'
474n/a code = loader.get_code(main_name)
475n/a exec(code, main_module.__dict__)
476n/a
477n/a sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module