» Core Development > Code coverage > Lib/multiprocessing/context.py

Python code coverage for Lib/multiprocessing/context.py

# | count | content
1n/aimport os
2n/aimport sys
3n/aimport threading
4n/a
5n/afrom . import process
6n/afrom . import reduction
7n/a
# Starts empty; things are copied from here to __init__.py
__all__ = []
9n/a
#
# Exceptions
#

class ProcessError(Exception):
    '''Base class of the exceptions defined in this module.'''


class BufferTooShort(ProcessError):
    '''ProcessError subclass; it is not raised anywhere in this module.'''


class TimeoutError(ProcessError):
    '''ProcessError subclass; it is not raised anywhere in this module.

    Note that inside this module the name shadows the builtin TimeoutError.
    '''


class AuthenticationError(ProcessError):
    '''ProcessError subclass; it is not raised anywhere in this module.'''
25n/a
#
# Base type for contexts
#

class BaseContext(object):
    '''Factory API shared by every context type defined in this module.

    Each factory method imports its implementation lazily, inside the
    method body.  Where the implementation takes a context argument, the
    object returned by ``self.get_context()`` is passed along.
    '''

    # Re-export the module-level exception types as class attributes.
    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    current_process = staticmethod(process.current_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Returns the number of CPUs in the system

        Raises NotImplementedError when os.cpu_count() returns None.
        '''
        num = os.cpu_count()
        if num is None:
            raise NotImplementedError('cannot determine number of cpus')
        return num

    def Manager(self):
        '''Returns a manager associated with a running server process

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        m = SyncManager(ctx=self.get_context())
        m.start()
        return m

    def Pipe(self, duplex=True):
        '''Returns two connection objects connected by a pipe'''
        from .connection import Pipe
        return Pipe(duplex)

    def Lock(self):
        '''Returns a non-recursive lock object'''
        from .synchronize import Lock
        return Lock(ctx=self.get_context())

    def RLock(self):
        '''Returns a recursive lock object'''
        from .synchronize import RLock
        return RLock(ctx=self.get_context())

    def Condition(self, lock=None):
        '''Returns a condition object'''
        from .synchronize import Condition
        return Condition(lock, ctx=self.get_context())

    def Semaphore(self, value=1):
        '''Returns a semaphore object'''
        from .synchronize import Semaphore
        return Semaphore(value, ctx=self.get_context())

    def BoundedSemaphore(self, value=1):
        '''Returns a bounded semaphore object'''
        from .synchronize import BoundedSemaphore
        return BoundedSemaphore(value, ctx=self.get_context())

    def Event(self):
        '''Returns an event object'''
        from .synchronize import Event
        return Event(ctx=self.get_context())

    def Barrier(self, parties, action=None, timeout=None):
        '''Returns a barrier object'''
        from .synchronize import Barrier
        return Barrier(parties, action, timeout, ctx=self.get_context())

    def Queue(self, maxsize=0):
        '''Returns a queue object'''
        from .queues import Queue
        return Queue(maxsize, ctx=self.get_context())

    def JoinableQueue(self, maxsize=0):
        '''Returns a joinable queue object'''
        from .queues import JoinableQueue
        return JoinableQueue(maxsize, ctx=self.get_context())

    def SimpleQueue(self):
        '''Returns a simple queue object'''
        from .queues import SimpleQueue
        return SimpleQueue(ctx=self.get_context())

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Returns a process pool object'''
        from .pool import Pool
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=self.get_context())

    def RawValue(self, typecode_or_type, *args):
        '''Returns a shared object'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Returns a shared array'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Returns a synchronized shared object'''
        from .sharedctypes import Value
        return Value(typecode_or_type, *args, lock=lock,
                     ctx=self.get_context())

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Returns a synchronized shared array'''
        from .sharedctypes import Array
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=self.get_context())

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.
        '''
        # Only delegates on Windows when sys.frozen is set; otherwise no-op.
        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
            from .spawn import freeze_support
            freeze_support()

    def get_logger(self):
        '''Return package logger -- if it does not already exist then
        it is created.
        '''
        from .util import get_logger
        return get_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr'''
        from .util import log_to_stderr
        return log_to_stderr(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        # Importing the module is the only thing this method does.
        from . import connection

    def set_executable(self, executable):
        '''Sets the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable
        set_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload
        set_forkserver_preload(module_names)

    def get_context(self, method=None):
        '''Return the context registered for *method*, or self if it is None.

        Raises ValueError when *method* is not a key of _concrete_contexts.
        The context's _check_available() hook runs before it is returned.
        '''
        if method is None:
            return self
        try:
            ctx = _concrete_contexts[method]
        except KeyError:
            # The KeyError is an implementation detail of the lookup; hide
            # it so callers only see the ValueError.
            raise ValueError('cannot find context for %r' % method) from None
        ctx._check_available()
        return ctx

    def get_start_method(self, allow_none=False):
        '''Return self._name; *allow_none* is accepted but unused here.'''
        return self._name

    def set_start_method(self, method, force=False):
        '''Always raises ValueError in this base implementation.'''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        return globals().get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        # Rebinds the module-level name 'reduction', not an attribute of
        # this instance.
        globals()['reduction'] = reduction

    def _check_available(self):
        '''Hook called by get_context(); does nothing in the base class.'''
        pass
214n/a
#
# Type of default context -- underlying context can be set at most once
#

class Process(process.BaseProcess):
    '''Process type that resolves its Popen implementation at call time.'''

    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        # Delegate to the Process type of whatever context the module-level
        # default context currently resolves to.
        return _default_context.get_context().Process._Popen(process_obj)
224n/a
class DefaultContext(BaseContext):
    '''Context that forwards to an underlying context chosen lazily.

    ``_default_context`` is the fallback supplied at construction time.
    ``_actual_context`` stays None until a start method is set or first
    needed.
    '''

    Process = Process

    def __init__(self, context):
        self._default_context = context
        self._actual_context = None

    def get_context(self, method=None):
        '''Return the context for *method*.

        With method=None this returns the actual context, first fixing it
        to the default one if nothing has been chosen yet.  Any other value
        is resolved by BaseContext.get_context().
        '''
        if method is None:
            if self._actual_context is None:
                self._actual_context = self._default_context
            return self._actual_context
        return super().get_context(method)

    def set_start_method(self, method, force=False):
        '''Select the actual context by start method name.

        Raises RuntimeError if a context is already set and *force* is
        false.  With force=True and method=None the choice is cleared.
        '''
        if self._actual_context is not None and not force:
            raise RuntimeError('context has already been set')
        if method is None and force:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Return the name of the actual context.

        If none is set yet: return None when *allow_none* is true,
        otherwise fix the actual context to the default one first.
        '''
        if self._actual_context is None:
            if allow_none:
                return None
            self._actual_context = self._default_context
        return self._actual_context._name

    def get_all_start_methods(self):
        '''Return the list of start method names for this platform.'''
        if sys.platform == 'win32':
            return ['spawn']
        if reduction.HAVE_SEND_HANDLE:
            return ['fork', 'spawn', 'forkserver']
        return ['fork', 'spawn']

# Every public attribute name of DefaultContext.
DefaultContext.__all__ = [x for x in dir(DefaultContext) if x[0] != '_']
265n/a
#
# Context types for fixed start method
#

if sys.platform != 'win32':

    class ForkProcess(process.BaseProcess):
        '''Process type whose Popen comes from .popen_fork.'''
        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        '''Process type whose Popen comes from .popen_spawn_posix.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        '''Process type whose Popen comes from .popen_forkserver.'''
        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            # Refuse this start method when reduction reports that handle
            # passing is unavailable.
            if not reduction.HAVE_SEND_HANDLE:
                raise ValueError('forkserver start method not available')

    # Registry consulted by BaseContext.get_context().
    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['fork'])

else:

    class SpawnProcess(process.BaseProcess):
        '''Process type whose Popen comes from .popen_spawn_win32.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    # Registry consulted by BaseContext.get_context(); 'spawn' is the only
    # entry on win32.
    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['spawn'])
332n/a
#
# Force the start method
#

def _force_start_method(method):
    '''Set the default context's actual context to the one for *method*.

    Assigns directly, without the "already set" check performed by
    DefaultContext.set_start_method() and without calling
    _check_available().  An unknown *method* raises KeyError.
    '''
    _default_context._actual_context = _concrete_contexts[method]
339n/a
#
# Check that the current thread is spawning a child process
#

# Per-thread storage for the 'spawning_popen' attribute used below.
_tls = threading.local()


def get_spawning_popen():
    '''Return this thread's spawning_popen value, or None if unset.'''
    return getattr(_tls, 'spawning_popen', None)


def set_spawning_popen(popen):
    '''Store *popen* as this thread's spawning_popen value.'''
    _tls.spawning_popen = popen


def assert_spawning(obj):
    '''Raise RuntimeError unless this thread has a spawning_popen set.

    *obj* is only used to name its type in the error message.
    '''
    if get_spawning_popen() is None:
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(obj).__name__
            )