» Core Development > Code coverage > Lib/test/test_asyncio/test_windows_events.py

Python code coverage for Lib/test/test_asyncio/test_windows_events.py

# | count | content
1n/aimport os
2n/aimport sys
3n/aimport unittest
4n/afrom unittest import mock
5n/a
6n/aif sys.platform != 'win32':
7n/a raise unittest.SkipTest('Windows only')
8n/a
9n/aimport _winapi
10n/a
11n/aimport asyncio
12n/afrom asyncio import _overlapped
13n/afrom asyncio import test_utils
14n/afrom asyncio import windows_events
15n/a
16n/a
class UpperProto(asyncio.Protocol):
    """Protocol that buffers incoming bytes and replies with them upper-cased.

    Chunks are accumulated until one containing a newline arrives; the
    whole buffer is then joined, upper-cased, written to the transport,
    and the transport is closed.
    """

    def __init__(self):
        # Every chunk handed to data_received(), in arrival order.
        self.buf = list()

    def connection_made(self, trans):
        """Keep the transport so the reply can be written later."""
        self.trans = trans

    def data_received(self, data):
        """Store *data*; reply and close once a chunk holds a newline."""
        self.buf.append(data)
        if b'\n' not in data:
            # Message not terminated yet: keep buffering.
            return
        reply = b''.join(self.buf).upper()
        self.trans.write(reply)
        self.trans.close()
29n/a
30n/a
class ProactorTests(test_utils.TestCase):
    """Tests for asyncio.ProactorEventLoop and its proactor helpers.

    Each test runs on a fresh ProactorEventLoop created in setUp().
    """

    def setUp(self):
        super().setUp()
        self.loop = asyncio.ProactorEventLoop()
        # set_event_loop() comes from test_utils.TestCase; presumably it
        # also arranges for the loop to be closed at teardown -- confirm
        # against test_utils.
        self.set_event_loop(self.loop)

    def test_close(self):
        # Closing the transport wrapped around one end of a socket pair
        # must make a pending sock_recv() on the other end yield b''.
        a, b = self.loop._socketpair()
        try:
            trans = self.loop._make_socket_transport(a, asyncio.Protocol())
            f = asyncio.ensure_future(self.loop.sock_recv(b, 100))
            trans.close()
            self.loop.run_until_complete(f)
            self.assertEqual(f.result(), b'')
        finally:
            # Release the peer socket even when the checks above fail.
            b.close()

    def test_double_bind(self):
        # A second PipeServer on an address that is already being served
        # must be rejected with PermissionError.
        ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
        server1 = windows_events.PipeServer(ADDRESS)
        try:
            with self.assertRaises(PermissionError):
                windows_events.PipeServer(ADDRESS)
        finally:
            # Do not leave the pipe open if the assertion fails.
            server1.close()

    def test_pipe(self):
        res = self.loop.run_until_complete(self._test_pipe())
        self.assertEqual(res, 'done')

    def _test_pipe(self):
        # Generator-based coroutine driven by test_pipe(); returns 'done'
        # once every step has succeeded.
        ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()

        # Nothing is serving the pipe yet, so connecting must fail.
        with self.assertRaises(FileNotFoundError):
            yield from self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        [server] = yield from self.loop.start_serving_pipe(
            UpperProto, ADDRESS)
        try:
            self.assertIsInstance(server, windows_events.PipeServer)

            clients = []
            for _ in range(5):
                stream_reader = asyncio.StreamReader(loop=self.loop)
                protocol = asyncio.StreamReaderProtocol(stream_reader,
                                                        loop=self.loop)
                trans, proto = yield from self.loop.create_pipe_connection(
                    lambda: protocol, ADDRESS)
                self.assertIsInstance(trans, asyncio.Transport)
                self.assertEqual(protocol, proto)
                clients.append((stream_reader, trans))

            # Send every request first, then collect the replies, so all
            # five connections are in flight at the same time.
            for i, (_, w) in enumerate(clients):
                w.write('lower-{}\n'.format(i).encode())

            for i, (r, w) in enumerate(clients):
                response = yield from r.readline()
                self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
                w.close()
        finally:
            # Always stop serving, including when an assertion above fails,
            # so the pipe server is not leaked.
            server.close()

        # With the server closed the pipe must be unreachable again.
        with self.assertRaises(FileNotFoundError):
            yield from self.loop.create_pipe_connection(
                asyncio.Protocol, ADDRESS)

        return 'done'

    def test_connect_pipe_cancel(self):
        # ConnectPipe is patched to raise OSError carrying the
        # ERROR_PIPE_BUSY winerror code.
        exc = OSError()
        exc.winerror = _overlapped.ERROR_PIPE_BUSY
        with mock.patch.object(_overlapped, 'ConnectPipe', side_effect=exc):
            coro = self.loop._proactor.connect_pipe('pipe_address')
            task = self.loop.create_task(coro)

        # check that it's possible to cancel connect_pipe()
        task.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(task)

    def test_wait_for_handle(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with 0.5s timeout;
        # result should be False at timeout
        fut = self.loop._proactor.wait_for_handle(event, 0.5)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, False)
        self.assertFalse(fut.result())
        self.assertTrue(0.48 < elapsed < 0.9, elapsed)

        _overlapped.SetEvent(event)

        # Wait for set event;
        # result should be True immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        start = self.loop.time()
        done = self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start

        self.assertEqual(done, True)
        self.assertTrue(fut.result())
        self.assertTrue(0 <= elapsed < 0.3, elapsed)

        # asyncio issue #195: cancelling a done _WaitHandleFuture
        # must not crash
        fut.cancel()

    def test_wait_for_handle_cancel(self):
        event = _overlapped.CreateEvent(None, True, False, None)
        self.addCleanup(_winapi.CloseHandle, event)

        # Wait for unset event with a cancelled future;
        # CancelledError should be raised immediately
        fut = self.loop._proactor.wait_for_handle(event, 10)
        fut.cancel()
        start = self.loop.time()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(fut)
        elapsed = self.loop.time() - start
        self.assertTrue(0 <= elapsed < 0.1, elapsed)

        # asyncio issue #195: cancelling a _WaitHandleFuture twice
        # must not crash
        fut = self.loop._proactor.wait_for_handle(event)
        fut.cancel()
        fut.cancel()
159n/a
160n/a
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()