ยปCore Development>Code coverage>Lib/test/test_poll.py

Python code coverage for Lib/test/test_poll.py

#countcontent
1n/a# Test case for the os.poll() function
2n/a
3n/aimport os
4n/aimport subprocess
5n/aimport random
6n/aimport select
7n/atry:
8n/a import threading
9n/aexcept ImportError:
10n/a threading = None
11n/aimport time
12n/aimport unittest
13n/afrom test.support import TESTFN, run_unittest, reap_threads, cpython_only
14n/a
15n/atry:
16n/a select.poll
17n/aexcept AttributeError:
18n/a raise unittest.SkipTest("select.poll not defined")
19n/a
20n/a
21n/adef find_ready_matching(ready, flag):
22n/a match = []
23n/a for fd, mode in ready:
24n/a if mode & flag:
25n/a match.append(fd)
26n/a return match
27n/a
28n/aclass PollTests(unittest.TestCase):
29n/a
30n/a def test_poll1(self):
31n/a # Basic functional test of poll object
32n/a # Create a bunch of pipe and test that poll works with them.
33n/a
34n/a p = select.poll()
35n/a
36n/a NUM_PIPES = 12
37n/a MSG = b" This is a test."
38n/a MSG_LEN = len(MSG)
39n/a readers = []
40n/a writers = []
41n/a r2w = {}
42n/a w2r = {}
43n/a
44n/a for i in range(NUM_PIPES):
45n/a rd, wr = os.pipe()
46n/a p.register(rd)
47n/a p.modify(rd, select.POLLIN)
48n/a p.register(wr, select.POLLOUT)
49n/a readers.append(rd)
50n/a writers.append(wr)
51n/a r2w[rd] = wr
52n/a w2r[wr] = rd
53n/a
54n/a bufs = []
55n/a
56n/a while writers:
57n/a ready = p.poll()
58n/a ready_writers = find_ready_matching(ready, select.POLLOUT)
59n/a if not ready_writers:
60n/a raise RuntimeError("no pipes ready for writing")
61n/a wr = random.choice(ready_writers)
62n/a os.write(wr, MSG)
63n/a
64n/a ready = p.poll()
65n/a ready_readers = find_ready_matching(ready, select.POLLIN)
66n/a if not ready_readers:
67n/a raise RuntimeError("no pipes ready for reading")
68n/a rd = random.choice(ready_readers)
69n/a buf = os.read(rd, MSG_LEN)
70n/a self.assertEqual(len(buf), MSG_LEN)
71n/a bufs.append(buf)
72n/a os.close(r2w[rd]) ; os.close( rd )
73n/a p.unregister( r2w[rd] )
74n/a p.unregister( rd )
75n/a writers.remove(r2w[rd])
76n/a
77n/a self.assertEqual(bufs, [MSG] * NUM_PIPES)
78n/a
79n/a def test_poll_unit_tests(self):
80n/a # returns NVAL for invalid file descriptor
81n/a FD, w = os.pipe()
82n/a os.close(FD)
83n/a os.close(w)
84n/a p = select.poll()
85n/a p.register(FD)
86n/a r = p.poll()
87n/a self.assertEqual(r[0], (FD, select.POLLNVAL))
88n/a
89n/a f = open(TESTFN, 'w')
90n/a fd = f.fileno()
91n/a p = select.poll()
92n/a p.register(f)
93n/a r = p.poll()
94n/a self.assertEqual(r[0][0], fd)
95n/a f.close()
96n/a r = p.poll()
97n/a self.assertEqual(r[0], (fd, select.POLLNVAL))
98n/a os.unlink(TESTFN)
99n/a
100n/a # type error for invalid arguments
101n/a p = select.poll()
102n/a self.assertRaises(TypeError, p.register, p)
103n/a self.assertRaises(TypeError, p.unregister, p)
104n/a
105n/a # can't unregister non-existent object
106n/a p = select.poll()
107n/a self.assertRaises(KeyError, p.unregister, 3)
108n/a
109n/a # Test error cases
110n/a pollster = select.poll()
111n/a class Nope:
112n/a pass
113n/a
114n/a class Almost:
115n/a def fileno(self):
116n/a return 'fileno'
117n/a
118n/a self.assertRaises(TypeError, pollster.register, Nope(), 0)
119n/a self.assertRaises(TypeError, pollster.register, Almost(), 0)
120n/a
121n/a # Another test case for poll(). This is copied from the test case for
122n/a # select(), modified to use poll() instead.
123n/a
124n/a def test_poll2(self):
125n/a cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
126n/a proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
127n/a bufsize=0)
128n/a proc.__enter__()
129n/a self.addCleanup(proc.__exit__, None, None, None)
130n/a p = proc.stdout
131n/a pollster = select.poll()
132n/a pollster.register( p, select.POLLIN )
133n/a for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
134n/a fdlist = pollster.poll(tout)
135n/a if (fdlist == []):
136n/a continue
137n/a fd, flags = fdlist[0]
138n/a if flags & select.POLLHUP:
139n/a line = p.readline()
140n/a if line != b"":
141n/a self.fail('error: pipe seems to be closed, but still returns data')
142n/a continue
143n/a
144n/a elif flags & select.POLLIN:
145n/a line = p.readline()
146n/a if not line:
147n/a break
148n/a self.assertEqual(line, b'testing...\n')
149n/a continue
150n/a else:
151n/a self.fail('Unexpected return value from select.poll: %s' % fdlist)
152n/a
153n/a def test_poll3(self):
154n/a # test int overflow
155n/a pollster = select.poll()
156n/a pollster.register(1)
157n/a
158n/a self.assertRaises(OverflowError, pollster.poll, 1 << 64)
159n/a
160n/a x = 2 + 3
161n/a if x != 5:
162n/a self.fail('Overflow must have occurred')
163n/a
164n/a # Issues #15989, #17919
165n/a self.assertRaises(OverflowError, pollster.register, 0, -1)
166n/a self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
167n/a self.assertRaises(OverflowError, pollster.modify, 1, -1)
168n/a self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
169n/a
170n/a @cpython_only
171n/a def test_poll_c_limits(self):
172n/a from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
173n/a pollster = select.poll()
174n/a pollster.register(1)
175n/a
176n/a # Issues #15989, #17919
177n/a self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
178n/a self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
179n/a self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
180n/a self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
181n/a
182n/a @unittest.skipUnless(threading, 'Threading required for this test.')
183n/a @reap_threads
184n/a def test_threaded_poll(self):
185n/a r, w = os.pipe()
186n/a self.addCleanup(os.close, r)
187n/a self.addCleanup(os.close, w)
188n/a rfds = []
189n/a for i in range(10):
190n/a fd = os.dup(r)
191n/a self.addCleanup(os.close, fd)
192n/a rfds.append(fd)
193n/a pollster = select.poll()
194n/a for fd in rfds:
195n/a pollster.register(fd, select.POLLIN)
196n/a
197n/a t = threading.Thread(target=pollster.poll)
198n/a t.start()
199n/a try:
200n/a time.sleep(0.5)
201n/a # trigger ufds array reallocation
202n/a for fd in rfds:
203n/a pollster.unregister(fd)
204n/a pollster.register(w, select.POLLOUT)
205n/a self.assertRaises(RuntimeError, pollster.poll)
206n/a finally:
207n/a # and make the call to poll() from the thread return
208n/a os.write(w, b'spam')
209n/a t.join()
210n/a
211n/a
212n/adef test_main():
213n/a run_unittest(PollTests)
214n/a
215n/aif __name__ == '__main__':
216n/a test_main()