» Core Development > Code coverage > Lib/asyncio/coroutines.py

Python code coverage for Lib/asyncio/coroutines.py

# | count | content
# Names exported by "from <this module> import *".
__all__ = ['coroutine',
           'iscoroutinefunction', 'iscoroutine']
3n/a
4n/aimport functools
5n/aimport inspect
6n/aimport opcode
7n/aimport os
8n/aimport sys
9n/aimport traceback
10n/aimport types
11n/a
12n/afrom . import compat
13n/afrom . import events
14n/afrom . import base_futures
15n/afrom .log import logger
16n/a
17n/a
# Opcode of "yield from" instruction.
# Looked up with .get() so that importing this module does not raise
# KeyError on interpreters whose bytecode has no YIELD_FROM opcode; the
# value is None there.  It is only compared against inside
# CoroWrapper.send() when _YIELD_FROM_BUG is true.
_YIELD_FROM = opcode.opmap.get('YIELD_FROM')
20n/a
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below).  That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call.  Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines.  A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
#
# The default comes from the PYTHONASYNCIODEBUG environment variable
# (any non-empty value enables it) unless the interpreter was told to
# ignore the environment (sys.flags.ignore_environment).
_DEBUG = (not sys.flags.ignore_environment and
          bool(os.environ.get('PYTHONASYNCIODEBUG')))
32n/a
33n/a
# Optional features that are missing on Python 3.4.  Each alias is bound
# to None (or to a stub) when the interpreter lacks the feature, and the
# rest of the module tests the alias instead of the Python version.

try:
    _types_coroutine = types.coroutine
    _types_CoroutineType = types.CoroutineType
except AttributeError:
    # Python 3.4
    _types_coroutine = None
    _types_CoroutineType = None

try:
    _inspect_iscoroutinefunction = inspect.iscoroutinefunction
except AttributeError:
    # Python 3.4
    def _inspect_iscoroutinefunction(func):
        """Stub used when inspect.iscoroutinefunction is unavailable."""
        return False

try:
    from collections.abc import (Coroutine as _CoroutineABC,
                                 Awaitable as _AwaitableABC)
except ImportError:
    _CoroutineABC = _AwaitableABC = None
53n/a
54n/a
# Check for CPython issue #21209
def has_yield_from_bug():
    """Return True if "yield from" mangles a tuple passed to send().

    A generator delegates to a custom iterator with ``yield from`` and a
    tuple is sent into it.  The iterator records the positional arguments
    its send() method received; if they are not exactly ``(value,)`` the
    tuple was unpacked on the way through, which is the bug being probed.
    """
    class MyGen:
        def __init__(self):
            self.send_args = None

        def __iter__(self):
            return self

        def __next__(self):
            return 42

        def send(self, *what):
            # Record exactly what the interpreter passed in.
            self.send_args = what
            return None

    def yield_from_gen(gen):
        yield from gen

    value = (1, 2, 3)
    gen = MyGen()
    coro = yield_from_gen(gen)
    next(coro)          # advance into the "yield from" delegation
    coro.send(value)
    return gen.send_args != (value,)


# Probe once at import time, then drop the helper from the namespace.
_YIELD_FROM_BUG = has_yield_from_bug()
del has_yield_from_bug
77n/a
78n/a
def debug_wrapper(gen):
    """Wrap *gen* in a CoroWrapper with no associated function.

    Returns the new CoroWrapper instance.
    """
    # This function is called from 'sys.set_coroutine_wrapper'.
    # We only wrap here coroutines defined via 'async def' syntax.
    # Generator-based coroutines are wrapped in @coroutine
    # decorator.
    return CoroWrapper(gen, None)
85n/a
86n/a
class CoroWrapper:
    """Wrapper for coroutine object in _DEBUG mode.

    Delegates the generator/coroutine protocol (send, throw, close and
    the gi_*/cr_* introspection attributes) to the wrapped object,
    remembers the stack at creation time, and logs an error from
    __del__ if the wrapped object was never started.
    """

    def __init__(self, gen, func=None):
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        self.func = func  # Used to unwrap @coroutine decorator
        # Stack of whoever called this constructor (frame 1), kept so
        # that __repr__ and __del__ can say where the object was created.
        self._source_traceback = traceback.extract_stack(sys._getframe(1))
        self.__name__ = getattr(gen, '__name__', None)
        self.__qualname__ = getattr(gen, '__qualname__', None)

    def __repr__(self):
        coro_repr = _format_coroutine(self)
        if self._source_traceback:
            # The last entry is the innermost recorded frame.
            frame = self._source_traceback[-1]
            coro_repr += ', created at %s:%s' % (frame[0], frame[1])
        return '<%s %s>' % (self.__class__.__name__, coro_repr)

    def __iter__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    if _YIELD_FROM_BUG:
        # Fix for CPython issue #21209: using "yield from" and a custom
        # generator, generator.send(tuple) unpacks the tuple instead of passing
        # the tuple unchanged. Check if the caller is a generator using "yield
        # from" to decide if the parameter should be unpacked or not.
        def send(self, *value):
            frame = sys._getframe()
            caller = frame.f_back
            assert caller.f_lasti >= 0
            # If the caller's current instruction is not "yield from",
            # this is an ordinary send(x) call and *value is (x,).
            if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM:
                value = value[0]
            return self.gen.send(value)
    else:
        def send(self, value):
            return self.gen.send(value)

    # NOTE: the parameter names shadow the builtin "type" and the
    # "traceback" module inside this method; they are part of the
    # signature and are kept as they are.
    def throw(self, type, value=None, traceback=None):
        return self.gen.throw(type, value, traceback)

    def close(self):
        return self.gen.close()

    # Read-only views of the wrapped generator's introspection attributes.

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    if compat.PY35:

        def __await__(self):
            # Refuse to be awaited while the wrapped coroutine is itself
            # suspended on another awaitable.
            cr_await = getattr(self.gen, 'cr_await', None)
            if cr_await is not None:
                raise RuntimeError(
                    "Cannot await on coroutine {!r} while it's "
                    "awaiting for {!r}".format(self.gen, cr_await))
            return self

        @property
        def gi_yieldfrom(self):
            return self.gen.gi_yieldfrom

        @property
        def cr_await(self):
            return self.gen.cr_await

        @property
        def cr_running(self):
            return self.gen.cr_running

        @property
        def cr_code(self):
            return self.gen.cr_code

        @property
        def cr_frame(self):
            return self.gen.cr_frame

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        if frame is None:
            frame = getattr(gen, 'cr_frame', None)
        # f_lasti == -1: the frame exists but has not executed any
        # instruction, so the wrapped object was created and never run.
        if frame is not None and frame.f_lasti == -1:
            msg = '%r was never yielded from' % self
            tb = getattr(self, '_source_traceback', ())
            if tb:
                tb = ''.join(traceback.format_list(tb))
                msg += ('\nCoroutine object created at '
                        '(most recent call last):\n')
                msg += tb.rstrip()
            logger.error(msg)
190n/a
191n/a
def coroutine(func):
    """Decorator to mark coroutines.

    If the coroutine is not yielded from before it is destroyed,
    an error message is logged.

    Returns *func* itself when it is already a native coroutine
    function; otherwise returns a callable carrying the
    ``_is_coroutine`` marker that iscoroutinefunction() checks for.
    """
    if _inspect_iscoroutinefunction(func):
        # In Python 3.5 that's all we need to do for coroutines
        # defined with "async def".
        # Wrapping in CoroWrapper will happen via
        # 'sys.set_coroutine_wrapper' function.
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        # A plain function: turn it into a generator function that
        # drives whatever awaitable-like object the function returns.
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                    isinstance(res, CoroWrapper)):
                res = yield from res
            elif _AwaitableABC is not None:
                # If 'func' returns an Awaitable (new in 3.5) we
                # want to run it.
                try:
                    await_meth = res.__await__
                except AttributeError:
                    pass
                else:
                    if isinstance(res, _AwaitableABC):
                        res = yield from await_meth()
            return res

    if not _DEBUG:
        if _types_coroutine is None:
            wrapper = coro
        else:
            wrapper = _types_coroutine(coro)
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func=func)
            # CoroWrapper records the stack of its caller, which is
            # this wrapper; drop that last entry so the traceback ends
            # at the code that called the decorated function.
            if w._source_traceback:
                del w._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__
            # on generator objects, so we set it manually.
            # We use getattr as some callables (such as
            # functools.partial) may lack __qualname__.
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper
247n/a
248n/a
# A marker for iscoroutinefunction.  A unique object compared by
# identity, so no unrelated "_is_coroutine" attribute value can match.
_is_coroutine = object()
251n/a
252n/a
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    True as well when the inspect-based check (or its stub on
    interpreters without it) reports a coroutine function.
    """
    return (getattr(func, '_is_coroutine', None) is _is_coroutine or
            _inspect_iscoroutinefunction(func))
257n/a
258n/a
# Types accepted by iscoroutine().  The optional entries are only added
# when the running interpreter provides them.
_COROUTINE_TYPES = (types.GeneratorType, CoroWrapper)
if _CoroutineABC is not None:
    _COROUTINE_TYPES += (_CoroutineABC,)
if _types_CoroutineType is not None:
    # Prioritize native coroutine check to speed-up
    # asyncio.iscoroutine.
    _COROUTINE_TYPES = (_types_CoroutineType,) + _COROUTINE_TYPES
266n/a
267n/a
def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    That is, an instance of any type listed in _COROUTINE_TYPES.
    """
    return isinstance(obj, _COROUTINE_TYPES)
271n/a
272n/a
def _format_coroutine(coro):
    """Return a short human-readable description of *coro*.

    The result names the coroutine, says whether it is running or done,
    and, when code/frame information is available, gives a file name
    and line number.  *coro* must satisfy iscoroutine().
    """
    assert iscoroutine(coro)

    if not hasattr(coro, 'cr_code') and not hasattr(coro, 'gi_code'):
        # Most likely a built-in type or a Cython coroutine.

        # Built-in types might not have __qualname__ or __name__.
        coro_name = getattr(
            coro, '__qualname__',
            getattr(coro, '__name__', type(coro).__name__))
        coro_name = '{}()'.format(coro_name)

        running = False
        try:
            running = coro.cr_running
        except AttributeError:
            try:
                running = coro.gi_running
            except AttributeError:
                pass

        if running:
            return '{} running'.format(coro_name)
        return coro_name

    coro_name = None
    if isinstance(coro, CoroWrapper):
        func = coro.func
        coro_name = coro.__qualname__
        if coro_name is not None:
            coro_name = '{}()'.format(coro_name)
    else:
        func = coro

    if coro_name is None:
        coro_name = events._format_callback(func, (), {})

    # Generators expose gi_* attributes, native coroutines cr_*.
    try:
        coro_code = coro.gi_code
    except AttributeError:
        coro_code = coro.cr_code

    try:
        coro_frame = coro.gi_frame
    except AttributeError:
        coro_frame = coro.cr_frame

    filename = coro_code.co_filename
    lineno = 0
    if (isinstance(coro, CoroWrapper) and
            not inspect.isgeneratorfunction(coro.func) and
            coro.func is not None):
        # A wrapped non-generator function: report where that function
        # is defined, when its source location can be determined.
        source = events._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if coro_frame is None:
            coro_repr = ('%s done, defined at %s:%s'
                         % (coro_name, filename, lineno))
        else:
            coro_repr = ('%s running, defined at %s:%s'
                         % (coro_name, filename, lineno))
    elif coro_frame is not None:
        # A frame is still attached: report its current line.
        lineno = coro_frame.f_lineno
        coro_repr = ('%s running at %s:%s'
                     % (coro_name, filename, lineno))
    else:
        # No frame left: report the first line of the code object.
        lineno = coro_code.co_firstlineno
        coro_repr = ('%s done, defined at %s:%s'
                     % (coro_name, filename, lineno))

    return coro_repr