» Core Development > Code coverage > Lib/concurrent/futures/_base.py

Python code coverage for Lib/concurrent/futures/_base.py

# | count | content
1n/a# Copyright 2009 Brian Quinlan. All Rights Reserved.
2n/a# Licensed to PSF under a Contributor Agreement.
3n/a
4n/a__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5n/a
6n/aimport collections
7n/aimport logging
8n/aimport threading
9n/aimport time
10n/a
11n/aFIRST_COMPLETED = 'FIRST_COMPLETED'
12n/aFIRST_EXCEPTION = 'FIRST_EXCEPTION'
13n/aALL_COMPLETED = 'ALL_COMPLETED'
14n/a_AS_COMPLETED = '_AS_COMPLETED'
15n/a
16n/a# Possible future states (for internal use by the futures package).
17n/aPENDING = 'PENDING'
18n/aRUNNING = 'RUNNING'
19n/a# The future was cancelled by the user...
20n/aCANCELLED = 'CANCELLED'
21n/a# ...and _Waiter.add_cancelled() was called by a worker.
22n/aCANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
23n/aFINISHED = 'FINISHED'
24n/a
25n/a_FUTURE_STATES = [
26n/a PENDING,
27n/a RUNNING,
28n/a CANCELLED,
29n/a CANCELLED_AND_NOTIFIED,
30n/a FINISHED
31n/a]
32n/a
33n/a_STATE_TO_DESCRIPTION_MAP = {
34n/a PENDING: "pending",
35n/a RUNNING: "running",
36n/a CANCELLED: "cancelled",
37n/a CANCELLED_AND_NOTIFIED: "cancelled",
38n/a FINISHED: "finished"
39n/a}
40n/a
41n/a# Logger for internal use by the futures package.
42n/aLOGGER = logging.getLogger("concurrent.futures")
43n/a
class Error(Exception):
    """Root of the exception hierarchy raised for future-related failures."""
47n/a
class CancelledError(Error):
    """Raised when a result is requested from a Future that was cancelled."""
51n/a
class TimeoutError(Error):
    """Raised when an operation does not complete before its deadline."""
55n/a
56n/aclass _Waiter(object):
57n/a """Provides the event that wait() and as_completed() block on."""
58n/a def __init__(self):
59n/a self.event = threading.Event()
60n/a self.finished_futures = []
61n/a
62n/a def add_result(self, future):
63n/a self.finished_futures.append(future)
64n/a
65n/a def add_exception(self, future):
66n/a self.finished_futures.append(future)
67n/a
68n/a def add_cancelled(self, future):
69n/a self.finished_futures.append(future)
70n/a
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every completion is recorded and signalled while holding ``lock``, so
    the consumer can swap out ``finished_futures`` and clear the event
    atomically with respect to producers.
    """

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record a future that produced a result and wake the consumer."""
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record a future that raised and wake the consumer."""
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and wake the consumer."""
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
92n/a
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any completion (result, exception or cancellation) sets the event.
    """

    def add_result(self, future):
        """Record a future that produced a result, then set the event."""
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record a future that raised, then set the event."""
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future, then set the event."""
        super().add_cancelled(future)
        self.event.set()
107n/a
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Counts down from ``num_pending_calls`` and sets the event when the
    count reaches zero. If ``stop_on_exception`` is true, the first
    exception sets the event immediately.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super().__init__()
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        self.lock = threading.Lock()

    def _decrement_pending_calls(self):
        """Count one completion; set the event when none remain."""
        with self.lock:
            self.num_pending_calls -= 1
            if self.num_pending_calls:
                return
            self.event.set()

    def add_result(self, future):
        """Record a future that produced a result and count it as done."""
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record a future that raised.

        Sets the event straight away when ``stop_on_exception`` is true;
        otherwise the exception counts as an ordinary completion.
        """
        super().add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and count it as done."""
        super().add_cancelled(future)
        self._decrement_pending_calls()
137n/a
138n/aclass _AcquireFutures(object):
139n/a """A context manager that does an ordered acquire of Future conditions."""
140n/a
141n/a def __init__(self, futures):
142n/a self.futures = sorted(futures, key=id)
143n/a
144n/a def __enter__(self):
145n/a for future in self.futures:
146n/a future._condition.acquire()
147n/a
148n/a def __exit__(self, *args):
149n/a for future in self.futures:
150n/a future._condition.release()
151n/a
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to every future.

    Args:
        fs: The futures to attach the waiter to.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to each future's
        ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the values above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Futures already in a terminal, notified state will never call
        # back into the waiter, so they are not counted as pending.
        settled_states = (CANCELLED_AND_NOTIFIED, FINISHED)
        pending_count = sum(
            1 for f in fs if f._state not in settled_states)

        if return_when not in (FIRST_EXCEPTION, ALL_COMPLETED):
            raise ValueError("Invalid return condition: %r" % return_when)

        waiter = _AllCompletedWaiter(
            pending_count,
            stop_on_exception=(return_when == FIRST_EXCEPTION))

    for f in fs:
        f._waiters.append(waiter)

    return waiter
172n/a
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        # Use the monotonic clock so that adjustments to the system clock
        # cannot shorten or lengthen the deadline.
        end_time = timeout + time.monotonic()

    fs = set(fs)
    with _AcquireFutures(fs):
        finished = set(
            f for f in fs
            if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        yield from finished

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                        '%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

            waiter.event.wait(wait_timeout)

            # Take the batch and reset the event under the waiter's lock so
            # that a completion arriving concurrently is not lost.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)

    finally:
        # Always detach the waiter, including when the consumer abandons
        # the generator early or a timeout is raised.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
230n/a
# Result type of wait(): the set of completed futures and the set of
# futures that had not completed.
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', ['done', 'not_done'])
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Any iterable is accepted; duplicated futures are
            treated as a single future.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If return_when is not a recognised option and the call
            could not return early.
    """
    # Normalise once. This makes duplicates harmless (otherwise the
    # len(done) == len(fs) shortcut below never fires and the waiter would
    # wait on zero pending calls forever) and lets callers pass one-shot
    # iterators such as generators.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
281n/a
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # State changes happen while holding this condition; result() and
        # exception() wait on it.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done-callback, logging any that raise."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare against None rather than relying on truthiness:
                # an exception instance may legitimately be falsy.
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the lock.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Return the stored result, or raise the stored exception."""
        if self._exception is not None:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added. If the
                callable raises an Exception subclass, it is logged and
                ignored.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, outside the lock. Treat a raising
        # callback exactly as _invoke_callbacks() does, so behaviour does not
        # depend on whether the future had completed at registration time.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            # Re-check after waking: the wait may have ended by timeout.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            # Re-check after waking: the wait may have ended by timeout.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        # Callbacks run outside the lock.
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        # Callbacks run outside the lock.
        self._invoke_callbacks()
508n/a
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.

        Raises:
            NotImplementedError: Always, in this base class; subclasses must
                override this method.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            # Use the monotonic clock so that adjustments to the system clock
            # cannot shorten or lengthen the deadline.
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.monotonic())
            finally:
                # Runs on exhaustion, error or early close of the iterator;
                # cancel() is attempted on every submitted future.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always wait for outstanding work; never suppress the exception.
        self.shutdown(wait=True)
        return False