» Core Development > Code coverage > Lib/test/test_asyncio/test_pep492.py

Python code coverage for Lib/test/test_asyncio/test_pep492.py

# | count | content
1n/a"""Tests support for new syntax introduced by PEP 492."""
2n/a
3n/aimport collections.abc
4n/aimport types
5n/aimport unittest
6n/a
7n/atry:
8n/a from test import support
9n/aexcept ImportError:
10n/a from asyncio import test_support as support
11n/afrom unittest import mock
12n/a
13n/aimport asyncio
14n/afrom asyncio import test_utils
15n/a
16n/a
class BaseTest(test_utils.TestCase):
    """Shared fixture: a BaseEventLoop whose selector plumbing is mocked."""

    def setUp(self):
        super().setUp()

        # Stand-in selector whose select() always reports no ready events.
        selector = mock.Mock()
        selector.select.return_value = ()

        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        loop._selector = selector

        self.loop = loop
        self.set_event_loop(loop)
26n/a
27n/a
class LockTests(BaseTest):
    """Use the asyncio synchronization primitives as context managers."""

    def _new_primitives(self):
        # One fresh instance of each primitive under test, bound to self.loop.
        factories = (
            asyncio.Lock,
            asyncio.Condition,
            asyncio.Semaphore,
            asyncio.BoundedSemaphore,
        )
        return [factory(loop=self.loop) for factory in factories]

    def test_context_manager_async_with(self):
        # ``async with prim`` must hold the primitive for the body only.
        async def exercise(prim):
            await asyncio.sleep(0.01, loop=self.loop)
            self.assertFalse(prim.locked())
            async with prim as bound:
                self.assertIs(bound, None)
                self.assertTrue(prim.locked())
                await asyncio.sleep(0.01, loop=self.loop)
                self.assertTrue(prim.locked())
            self.assertFalse(prim.locked())

        for prim in self._new_primitives():
            self.loop.run_until_complete(exercise(prim))
            self.assertFalse(prim.locked())

    def test_context_manager_with_await(self):
        # Same scenario, spelled with the ``with await prim`` form.
        async def exercise(prim):
            await asyncio.sleep(0.01, loop=self.loop)
            self.assertFalse(prim.locked())
            with await prim as bound:
                self.assertIs(bound, None)
                self.assertTrue(prim.locked())
                await asyncio.sleep(0.01, loop=self.loop)
                self.assertTrue(prim.locked())
            self.assertFalse(prim.locked())

        for prim in self._new_primitives():
            self.loop.run_until_complete(exercise(prim))
            self.assertFalse(prim.locked())
73n/a
74n/a
class StreamReaderTests(BaseTest):
    """Consume an asyncio.StreamReader through ``async for``."""

    def test_readline(self):
        payload = b'line1\nline2\nline3'
        expected = [b'line1\n', b'line2\n', b'line3']

        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(payload)
        stream.feed_eof()

        async def drain():
            # Collect every item the reader yields until it is exhausted.
            lines = []
            async for chunk in stream:
                lines.append(chunk)
            return lines

        collected = self.loop.run_until_complete(drain())
        self.assertEqual(collected, expected)
92n/a
93n/a
class CoroutineTests(BaseTest):
    """Checks on how asyncio handles ``async def`` coroutines and awaitables."""

    def test_iscoroutine(self):
        async def foo():
            pass

        native = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(native))
        finally:
            native.close()  # silence warning

        # Test that asyncio.iscoroutine() uses collections.abc.Coroutine
        class FakeCoro:
            def send(self, value):
                pass

            def throw(self, typ, val=None, tb=None):
                pass

            def close(self):
                pass

            def __await__(self):
                yield

        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    def test_iscoroutinefunction(self):
        async def foo():
            pass

        self.assertTrue(asyncio.iscoroutinefunction(foo))

    def test_function_returning_awaitable(self):
        # A generator-style coroutine function may return an object whose
        # __await__ supplies the values to be yielded.
        class Awaitable:
            def __await__(self):
                return ('spam',)

        @asyncio.coroutine
        def func():
            return Awaitable()

        stepper = func()
        self.assertEqual(stepper.send(None), 'spam')
        stepper.close()

    def test_async_def_coroutines(self):
        async def bar():
            return 'spam'

        async def foo():
            return await bar()

        def run_and_check():
            outcome = self.loop.run_until_complete(foo())
            self.assertEqual(outcome, 'spam')

        # production mode
        run_and_check()

        # debug mode
        self.loop.set_debug(True)
        run_and_check()

    @mock.patch('asyncio.coroutines.logger')
    def test_async_def_wrapped(self, m_log):
        async def foo():
            pass

        async def check_wrapper():
            foo_coro = foo()
            self.assertRegex(
                repr(foo_coro),
                r'<CoroWrapper .*\.foo\(\) running at .*pep492.*>')

            with support.check_warnings((r'.*foo.*was never',
                                         RuntimeWarning)):
                # Drop the only reference, then force a collection, so the
                # never-awaited coroutine is reported inside this block.
                foo_coro = None
                support.gc_collect()
                self.assertTrue(m_log.error.called)
                logged = m_log.error.call_args[0][0]
                self.assertRegex(logged,
                                 r'CoroWrapper.*foo.*was never')

        self.loop.set_debug(True)
        self.loop.run_until_complete(check_wrapper())

        async def check_task():
            foo_coro = foo()
            task = asyncio.ensure_future(foo_coro, loop=self.loop)
            self.assertRegex(repr(task), r'Task.*foo.*running')

        self.loop.run_until_complete(check_task())

    def test_types_coroutine(self):
        def gen():
            yield from ()
            return 'spam'

        @types.coroutine
        def func():
            return gen()

        async def coro():
            wrapped = func()
            self.assertIsInstance(wrapped, types._GeneratorWrapper)
            return await wrapped

        outcome = self.loop.run_until_complete(coro())
        self.assertEqual(outcome, 'spam')

    def test_task_print_stack(self):
        # Filled in by runner() so that foo() can inspect its own task.
        current = None

        async def foo():
            frames = current.get_stack(limit=1)
            try:
                self.assertEqual(frames[0].f_code.co_name, 'foo')
            finally:
                # Release the frame objects once the check is done.
                frames = None

        async def runner():
            nonlocal current
            current = asyncio.ensure_future(foo(), loop=self.loop)
            await current

        self.loop.run_until_complete(runner())

    def test_double_await(self):
        # Awaiting a coroutine object that a Task already wraps is expected
        # to raise RuntimeError when the loop runs in debug mode.
        async def afunc():
            await asyncio.sleep(0.1, loop=self.loop)

        async def runner():
            coro = afunc()
            task = asyncio.Task(coro, loop=self.loop)
            try:
                await asyncio.sleep(0, loop=self.loop)
                await coro
            finally:
                task.cancel()

        self.loop.set_debug(True)
        expected_msg = (
            r'Cannot await.*test_double_await.*\bafunc\b.*while.*\bsleep\b')
        with self.assertRaisesRegex(RuntimeError, expected_msg):
            self.loop.run_until_complete(runner())
229n/a
230n/a
# Run this module's tests when the file is executed directly.
if __name__ == '__main__':
    unittest.main()