» Core Development > Code coverage > Lib/asyncio/tasks.py

Python code coverage for Lib/asyncio/tasks.py

# | count | content
1n/a"""Support for tasks, coroutines and the scheduler."""
2n/a
# Names exported by a star-import of this module.
__all__ = [
    'Task',
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    'wait',
    'wait_for',
    'as_completed',
    'sleep',
    'async',
    'gather',
    'shield',
    'ensure_future',
    'run_coroutine_threadsafe',
]
8n/a
9n/aimport concurrent.futures
10n/aimport functools
11n/aimport inspect
12n/aimport warnings
13n/aimport weakref
14n/a
15n/afrom . import base_tasks
16n/afrom . import compat
17n/afrom . import coroutines
18n/afrom . import events
19n/afrom . import futures
20n/afrom .coroutines import coroutine
21n/a
22n/a
class Task(futures.Future):
    """A coroutine wrapped in a Future.

    The task drives its coroutine one step at a time through _step(),
    which is scheduled on the event loop with call_soon().  The value
    returned by the coroutine, or the exception it raises, becomes the
    result or exception of this Future.
    """

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        # _step() registers the task under its loop while the coroutine runs
        # and removes the entry again afterwards.
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        # _all_tasks is shared by every loop, so filter on the owning loop.
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object coro and schedule its first step.

        coro must satisfy coroutines.iscoroutine(); loop is forwarded to the
        Future constructor.
        """
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest entry of the recorded creation traceback
            # (presumably the frame of this constructor -- TODO confirm).
            del self._source_traceback[-1]
        self._coro = coro
        # Future this task is currently blocked on, if any (see the
        # invariant described at the top of the class).
        self._fut_waiter = None
        # Set by cancel() when the cancellation has to be delivered by the
        # next _step() call.
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if compat.PY34:
        def __del__(self):
            """Report a task that is destroyed while still pending.

            The report goes through the loop's exception handler and is
            skipped when _log_destroy_pending is false.
            """
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        With exc None the coroutine is resumed using send(None); otherwise
        exc is thrown into it.  A pending cancel request (_must_cancel)
        replaces exc with a CancelledError unless exc already is one.

        Depending on what the coroutine does, the task then finishes with a
        result, an exception or a cancellation, starts waiting on the
        yielded future, or schedules another _step() call.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Publish this task as the current one for its loop while the
        # coroutine runs; the entry is removed in the finally clause.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned: its return value is the task result.
            self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Anything outside the Exception hierarchy is recorded on the
            # task and also re-raised to the caller of _step().
            self.set_exception(exc)
            raise
        else:
            # The coroutine yielded `result` and is still suspended.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        # Wait for the future: _wakeup() resumes the task
                        # once the future is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested while the coroutine was
                            # running; pass it on to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If the future finished with an exception (including cancellation),
        that exception is passed to _step(); otherwise _step() is called
        without arguments.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
257n/a
258n/a
# Keep a handle on the pure-Python Task class; the public name ``Task`` is
# rebound below when the ``_asyncio`` module can be imported.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # ``_asyncio`` is unavailable: ``Task`` stays bound to the Python class.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
269n/a
270n/a
# wait() and as_completed() similar to those in PEP 3148.
# The return_when selectors are re-exported unchanged from concurrent.futures.
(FIRST_COMPLETED,
 FIRST_EXCEPTION,
 ALL_COMPLETED) = (concurrent.futures.FIRST_COMPLETED,
                   concurrent.futures.FIRST_EXCEPTION,
                   concurrent.futures.ALL_COMPLETED)
276n/a
277n/a
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait until the Futures and coroutines in fs satisfy return_when.

    fs must be a non-empty collection; a single Future or coroutine is
    rejected with TypeError, an empty collection or an unknown
    return_when value with ValueError.

    Every element is passed through ensure_future() before waiting.

    The result is a pair of sets of Futures: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Note: TimeoutError is never raised here.  Futures that are still
    unfinished when the timeout expires end up in the second set.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    # Duplicates are collapsed before wrapping and again afterwards.
    wrapped = set()
    for item in set(fs):
        wrapped.add(ensure_future(item, loop=loop))

    outcome = yield from _wait(wrapped, timeout, return_when, loop)
    return outcome
308n/a
309n/a
def _release_waiter(waiter, *args):
    """Complete waiter with a None result unless it is already done.

    Extra positional arguments are accepted and ignored, so the function
    can be used as a done-callback as well as a timer callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
313n/a
314n/a
@coroutine
def wait_for(fut, timeout, *, loop=None):
    """Wait for a single Future or coroutine, giving up after timeout.

    The argument is passed through ensure_future() unless timeout is
    None, in which case it is simply awaited.

    Returns the result of the Future or coroutine.  If the timeout
    expires first, the future is cancelled and TimeoutError is raised.
    Wrap the argument in shield() to keep it from being cancelled.

    If the wait itself is cancelled, the future is cancelled too.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        outcome = yield from fut
        return outcome

    # `waiter` is released by whichever happens first: the timer firing
    # or `fut` completing.
    waiter = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, waiter)
    wake = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(wake)

    try:
        try:
            yield from waiter
        except futures.CancelledError:
            # The wait was cancelled: propagate the cancellation to fut.
            fut.remove_done_callback(wake)
            fut.cancel()
            raise

        if not fut.done():
            # Released by the timer while fut is still running.
            fut.remove_done_callback(wake)
            fut.cancel()
            raise futures.TimeoutError()
        return fut.result()
    finally:
        timer.cancel()
359n/a
360n/a
@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    fs must be a non-empty collection of Futures.  Returns the pair of
    sets (done, pending).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    if timeout is not None:
        timer = loop.call_later(timeout, _release_waiter, waiter)
    else:
        timer = None
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # Decide whether this completion ends the wait.  The checks on f
        # are only evaluated in FIRST_EXCEPTION mode.
        finished = counter <= 0 or return_when == FIRST_COMPLETED
        if not finished and return_when == FIRST_EXCEPTION:
            finished = not f.cancelled() and f.exception() is not None
        if not finished:
            return
        if timer is not None:
            timer.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timer is not None:
            timer.cancel()

    done = set()
    pending = set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        (done if f.done() else pending).add(f)
    return done, pending
403n/a
404n/a
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Waiting on a yielded coroutine gives the result (or raises the
    exception!) of one of the original Futures (or coroutines), in the
    order in which, and as soon as, they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if loop is None:
        loop = events.get_event_loop()
    todo = set()
    for item in set(fs):
        todo.add(ensure_future(item, loop=loop))
    from .queues import Queue  # Import here to avoid circular import problem.
    completed = Queue(loop=loop)
    timer = None

    def _on_timeout():
        for fut in todo:
            fut.remove_done_callback(_on_completion)
            # One dummy entry per unfinished future, for _wait_for_one().
            completed.put_nowait(None)
        todo.clear()  # Can't do todo.remove(fut) in the loop.

    def _on_completion(fut):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(fut)
        completed.put_nowait(fut)
        if todo or timer is None:
            return
        # Everything finished before the deadline: drop the timer.
        timer.cancel()

    @coroutine
    def _wait_for_one():
        fut = yield from completed.get()
        if fut is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in todo:
        fut.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    # Hand out exactly one waiter coroutine per scheduled future.
    remaining = len(todo)
    while remaining > 0:
        remaining -= 1
        yield _wait_for_one()
460n/a
461n/a
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after delay seconds.

    A delay equal to 0 performs a single bare yield and then returns
    result.  Otherwise a future is created on the loop, a call_later()
    timer is armed to resolve it, and the value the future resolves to is
    returned.  The timer is cancelled on the way out in every case.
    """
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    timer = future._loop.call_later(
        delay, futures._set_result_unless_cancelled, future, result)
    try:
        value = yield from future
        return value
    finally:
        timer.cancel()
479n/a
480n/a
def async_(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    A Future argument is handed back unchanged.

    Deprecated since 3.5: call asyncio.ensure_future() instead.  Every
    call emits a DeprecationWarning attributed to the caller.
    """
    warnings.warn(
        "asyncio.async() function is deprecated, use ensure_future()",
        DeprecationWarning,
        stacklevel=2)
    wrapped = ensure_future(coro_or_future, loop=loop)
    return wrapped

# Silence DeprecationWarning: publish the function under the name 'async'
# through globals() instead of a ``def async`` statement, then drop the
# temporary ``async_`` binding.
async_.__name__ = 'async'
globals()['async'] = async_
del async_
499n/a
500n/a
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A Future argument is returned as is; an explicit loop that differs
    from the Future's own loop raises ValueError.  A coroutine is turned
    into a task with loop.create_task().  Any other awaitable is first
    adapted by _wrap_awaitable().  Everything else raises TypeError.
    """
    if futures.isfuture(coro_or_future):
        if not (loop is None or loop is coro_or_future._loop):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future

    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the newest entry of the recorded creation traceback.
            del task._source_traceback[-1]
        return task

    if compat.PY35 and inspect.isawaitable(coro_or_future):
        adapted = _wrap_awaitable(coro_or_future)
        return ensure_future(adapted, loop=loop)

    raise TypeError('A Future, a coroutine or an awaitable is required')
521n/a
522n/a
@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Adapts awaitable (an object with __await__) into a coroutine, which
    ensure_future() then wraps like any other coroutine.
    """
    outcome = yield from awaitable.__await__()
    return outcome
531n/a
532n/a
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden so that it cancels the children instead of
    this future itself, which makes it behave more like Task.cancel():
    the future is not immediately marked as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child; return True if at least one accepted it.

        Returns False without touching the children when this future is
        already done.
        """
        if self.done():
            return False
        # Build the full list first so that every child gets cancelled,
        # not just the ones up to the first success.
        accepted = [child.cancel() for child in self._children]
        return any(accepted)
553n/a
554n/a
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    Coroutines are wrapped with ensure_future() and thereby scheduled in
    the event loop.  They will not necessarily be scheduled in the same
    order as passed in.

    All futures must share the same event loop; otherwise ValueError is
    raised.  If all the tasks finish successfully, the returned future's
    result is the list of results (in the order of the original
    sequence, not necessarily the order of results arrival).  If
    *return_exceptions* is True, exceptions in the tasks are treated the
    same as successful results and gathered in the result list;
    otherwise, the first raised exception is immediately propagated to
    the returned future.

    With no arguments at all, an already completed future holding an
    empty list is returned.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        empty = loop.create_future()
        empty.set_result([])
        return empty

    # Each distinct argument is wrapped exactly once, even if it was
    # passed several times.
    wrapped = {}
    for item in set(coros_or_futures):
        if futures.isfuture(item):
            child = item
            if loop is None:
                loop = child._loop
            elif child._loop is not loop:
                raise ValueError("futures are tied to different event loops")
        else:
            child = ensure_future(item, loop=loop)
            if loop is None:
                loop = child._loop
            # The caller cannot control this future, the "destroy pending task"
            # warning should not be emitted.
            child._log_destroy_pending = False
        wrapped[item] = child

    children = [wrapped[item] for item in coros_or_futures]
    total = len(children)
    outer = _GatheringFuture(children, loop=loop)
    collected = [None] * total
    finished = 0

    def _done_callback(index, fut):
        nonlocal finished
        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if fut.cancelled():
            outcome, is_error = futures.CancelledError(), True
        elif fut._exception is not None:
            # Calling exception() also marks the exception as retrieved.
            outcome, is_error = fut.exception(), True
        else:
            outcome, is_error = fut._result, False

        if is_error and not return_exceptions:
            outer.set_exception(outcome)
            return

        collected[index] = outcome
        finished += 1
        if finished == total:
            outer.set_result(collected)

    for index, child in enumerate(children):
        child.add_done_callback(functools.partial(_done_callback, index))
    return outer
637n/a
638n/a
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = yield from shield(something())

    is exactly equivalent to the statement

        res = yield from something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = yield from shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    outer = inner._loop.create_future()

    def _done_callback(finished):
        if outer.cancelled():
            if not finished.cancelled():
                # Mark inner's result as retrieved.
                finished.exception()
            return

        if finished.cancelled():
            outer.cancel()
            return

        # Mirror the outcome of the inner future onto the outer one.
        error = finished.exception()
        if error is None:
            outer.set_result(finished.result())
        else:
            outer.set_exception(error)

    inner.add_done_callback(_done_callback)
    return outer
690n/a
691n/a
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is wrapped with ensure_future() inside a callback
    handed to loop.call_soon_threadsafe().  Returns a
    concurrent.futures.Future to access the result.

    Raises TypeError if coro is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except Exception as exc:
            # Report the failure on the returned future (unless it was
            # cancelled in the meantime) and still re-raise it here.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
710n/a return future