» Core Development > Code coverage > Lib/concurrent/futures/process.py

Python code coverage for Lib/concurrent/futures/process.py

# | count | content
1n/a# Copyright 2009 Brian Quinlan. All Rights Reserved.
2n/a# Licensed to PSF under a Contributor Agreement.
3n/a
4n/a"""Implements ProcessPoolExecutor.
5n/a
6n/aThe following diagram and text describe the data-flow through the system:
7n/a
8n/a|======================= In-process =====================|== Out-of-process ==|
9n/a
10n/a+----------+     +----------+       +--------+     +-----------+    +---------+
11n/a|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
12n/a|          |     +----------+       |        |     +-----------+    |         |
13n/a|          |     | ...      |       |        |     | ...       |    |         |
14n/a|          |     | 6        |       |        |     | 5, call() |    |         |
15n/a|          |     | 7        |       |        |     | ...       |    |         |
16n/a| Process  |     | ...      |       | Local  |     +-----------+    | Process |
17n/a|  Pool    |     +----------+       | Worker |                      |  #1..n  |
18n/a| Executor |                        | Thread |                      |         |
19n/a|          |     +------------+     |        |     +-----------+    |         |
20n/a|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
21n/a|          |     +------------+     |        |     +-----------+    |         |
22n/a|          |     | 6: call()  |     |        |     | ...       |    |         |
23n/a|          |     |    future  |     |        |     | 4, result |    |         |
24n/a|          |     | ...        |     |        |     | 3, except |    |         |
25n/a+----------+     +------------+     +--------+     +-----------+    +---------+
26n/a
27n/aExecutor.submit() called:
28n/a- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29n/a- adds the id of the _WorkItem to the "Work Ids" queue
30n/a
31n/aLocal worker thread:
32n/a- reads work ids from the "Work Ids" queue and looks up the corresponding
33n/a WorkItem from the "Work Items" dict: if the work item has been cancelled then
34n/a it is simply removed from the dict, otherwise it is repackaged as a
35n/a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36n/a until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37n/a calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38n/a- reads _ResultItems from "Result Q", updates the future stored in the
39n/a "Work Items" dict and deletes the dict entry
40n/a
41n/aProcess #1..n:
42n/a- reads _CallItems from "Call Q", executes the calls, and puts the resulting
43n/a _ResultItems in "Result Q"
44n/a"""
45n/a
46n/a__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47n/a
48n/aimport atexit
49n/aimport os
50n/afrom concurrent.futures import _base
51n/aimport queue
52n/afrom queue import Full
53n/aimport multiprocessing
54n/afrom multiprocessing import SimpleQueue
55n/afrom multiprocessing.connection import wait
56n/aimport threading
57n/aimport weakref
58n/afrom functools import partial
59n/aimport itertools
60n/aimport traceback
61n/a
62n/a# Workers are created as daemon threads and processes. This is done to allow the
63n/a# interpreter to exit when there are still idle processes in a
64n/a# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
65n/a# allowing workers to die with the interpreter has two undesirable properties:
66n/a# - The workers would still be running during interpreter shutdown,
67n/a# meaning that they would fail in unpredictable ways.
68n/a# - The workers could be killed while evaluating a work item, which could
69n/a# be bad if the callable being evaluated has external side-effects e.g.
70n/a# writing to a file.
71n/a#
72n/a# To work around this problem, an exit handler is installed which tells the
73n/a# workers to exit when their work queues are empty and then waits until the
74n/a# threads/processes finish.
75n/a
# Maps each queue management thread to the result queue it listens on, so the
# exit handler can wake and join every thread that is still registered.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True once interpreter shutdown has begun.
_shutdown = False


def _python_exit():
    """Exit handler: wake every registered management thread, then join it.

    Sets the module-level ``_shutdown`` flag, puts ``None`` on each registered
    queue as a wake-up, and only afterwards waits for the threads, so that all
    of them are signalled before any join blocks.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the weak dictionary so both passes see the same entries.
    registered = list(_threads_queues.items())
    for _, wakeup_queue in registered:
        wakeup_queue.put(None)
    for thread, _ in registered:
        thread.join()
87n/a
# Number of calls, beyond one per worker process, that may sit in the call
# queue. Lowering it leaves processes idle more often while they wait for
# work; raising it makes Future.cancel() succeed less often, because a future
# whose call is already in the call queue cannot be cancelled.
EXTRA_QUEUED_CALLS = 1
93n/a
94n/a# Hack to embed stringification of remote traceback in local traceback
95n/a
class _RemoteTraceback(Exception):
    """Exception whose string form is a pre-formatted traceback text."""

    def __init__(self, tb):
        # Only the text is stored; str() returns it verbatim.
        self.tb = tb

    def __str__(self):
        return self.tb
101n/a
class _ExceptionWithTraceback:
    """Pairs an exception with the formatted text of its traceback.

    The traceback is rendered to a string when the wrapper is built. When the
    wrapper is pickled, ``__reduce__`` makes it rebuild through
    ``_rebuild_exc``, which receives the exception and the rendered text.
    """

    def __init__(self, exc, tb):
        rendered = ''.join(traceback.format_exception(type(exc), exc, tb))
        self.exc = exc
        # Wrap the text in triple quotes so it stands out when displayed.
        self.tb = '\n"""\n%s"""' % rendered

    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)
110n/a
def _rebuild_exc(exc, tb):
    """Return *exc* with a _RemoteTraceback built from *tb* as its cause.

    Args:
        exc: The exception instance to return.
        tb: The traceback text to carry in the attached cause.
    """
    remote_cause = _RemoteTraceback(tb)
    exc.__cause__ = remote_cause
    return exc
114n/a
class _WorkItem:
    """A submitted call together with the future that will hold its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        # The callable and the positional/keyword arguments to call it with.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
121n/a
class _ResultItem:
    """Outcome of one call, identified by the work id it belongs to."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        # Both outcome fields default to None.
        self.exception = exception
        self.result = result
127n/a
class _CallItem:
    """A call to execute, tagged with the work id used to report its result."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        # The callable and the positional/keyword arguments to call it with.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
134n/a
def _get_chunks(*iterables, chunksize):
    """Yield tuples of up to *chunksize* items taken from zip(*iterables).

    Each yielded chunk is a tuple of argument tuples. Iteration ends as soon
    as a slice of the zipped stream comes back empty.
    """
    zipped = zip(*iterables)

    def next_chunk():
        return tuple(itertools.islice(zipped, chunksize))

    # iter(callable, sentinel) keeps calling until the empty tuple appears.
    for chunk in iter(next_chunk, ()):
        yield chunk
143n/a
def _process_chunk(fn, chunk):
    """Apply *fn* to every argument tuple in *chunk* and return the results.

    Used by map(): each element of *chunk* is unpacked as the positional
    arguments of one call to *fn*, and the results are returned as a list in
    the same order.

    This function is run in a separate process.
    """
    return list(itertools.starmap(fn, chunk))
154n/a
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Put a _ResultItem on result_queue, reporting a failed send as an error.

    If putting the item raises (for example because the result cannot be
    serialized), the failure itself is sent back as the exception for
    *work_id* so the caller's future still completes.
    """
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    The loop ends when None is read from call_queue; the worker then puts its
    own pid on result_queue and returns.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            # A result that cannot be sent must not take the worker down.
            _sendback_result(result_queue, call_item.work_id, result=r)
182n/a
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Moves pending work into call_queue until it is full or no ids remain.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Ids are
            consumed one at a time; the matching _WorkItem is repackaged as a
            _CallItem and put in call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while not call_queue.full():
        try:
            next_id = work_ids.get(block=False)
        except queue.Empty:
            return

        item = pending_work_items[next_id]
        if not item.future.set_running_or_notify_cancel():
            # The future was cancelled before it could start: forget it.
            del pending_work_items[next_id]
            continue

        call_queue.put(
            _CallItem(next_id, item.fn, item.args, item.kwargs),
            block=True)
219n/a
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Each loop iteration tops up call_queue from the pending work, then blocks
    until either a message can be read from result_queue or the sentinel of a
    worker process becomes ready. If only a sentinel is ready, the executor is
    flagged as broken, every pending future fails with BrokenProcessPool, all
    workers are terminated and the thread returns.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping worker pids to the multiprocessing.Process
            instances used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers. Messages are read directly from its _reader
            connection; besides _ResultItems it may carry None (a wake-up
            without payload) or an int (the pid of a worker that exited).
    """
    # Only held while deciding whether to shut down; reset to None at the end
    # of every iteration.
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        # One None sentinel per live worker; put_nowait may raise Full.
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        # Block until a result/wake-up is readable or a worker sentinel fires.
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (by
                # the wait() call at the top of the loop) and be able to send
                # a sentinel again.
                pass
        # Drop the reference again before the next iteration.
        executor = None
334n/a
# Cache for _check_system_limits(): whether the check has already run, and the
# error message to raise again if the system was found to be too limited.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the system provides too few semaphores.

    The outcome of the first call is cached in module globals: later calls
    either re-raise the recorded error or return immediately without
    querying the system again.

    Raises:
        NotImplementedError: If os.sysconf("SC_SEM_NSEMS_MAX") reports a
            limit that is neither -1 nor at least 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already verified successfully; no need to query sysconf again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores "
                       "(%d available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
358n/a
359n/a
class BrokenProcessPool(RuntimeError):
    """Error for a process pool that lost one of its processes abruptly.

    Set on futures that were running or pending when a process in a
    ProcessPoolExecutor terminated abruptly, and raised by submit() once the
    pool has been marked as broken.
    """
365n/a
366n/a
class ProcessPoolExecutor(_base.Executor):
    """Executor that hands submitted calls to a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: If max_workers is given and is not greater than 0.
        """
        _check_system_limits()

        if max_workers is None:
            max_workers = os.cpu_count() or 1
        elif max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        self._max_workers = max_workers

        # The call queue is only slightly larger than the number of processes:
        # big enough to keep workers from idling, small enough that most
        # futures stay cancellable (futures in the call queue cannot be
        # cancelled).
        call_queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = multiprocessing.Queue(call_queue_size)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. Killed processes are
        # detected anyway, so those tracebacks are silenced.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the worker processes and the management thread, once."""
        if self._queue_management_thread is not None:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)

        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        self._queue_management_thread = manager = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
        manager.daemon = True
        manager.start()
        _threads_queues[manager] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until there are max_workers of them."""
        missing = self._max_workers - len(self._processes)
        for _ in range(missing):
            worker = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue, self._result_queue))
            worker.start()
            self._processes[worker.pid] = worker

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            work_id = self._queue_count
            self._pending_work_items[work_id] = _WorkItem(
                future, fn, args, kwargs)
            self._work_ids.put(work_id)
            self._queue_count = work_id + 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            ValueError: If chunksize is less than 1.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        chunk_fn = partial(_process_chunk, fn)
        chunks = _get_chunks(*iterables, chunksize=chunksize)
        per_chunk_results = super().map(chunk_fn, chunks, timeout=timeout)
        # Each submitted call returns a list; flatten them back into one stream.
        return itertools.chain.from_iterable(per_chunk_results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        manager = self._queue_management_thread
        if manager:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                manager.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
502n/a
# Run _python_exit at interpreter exit so that registered queue management
# threads are woken up and joined before the interpreter goes away.
atexit.register(_python_exit)