ยปCore Development>Code coverage>Lib/test/test_selectors.py

Python code coverage for Lib/test/test_selectors.py

#countcontent
1n/aimport errno
2n/aimport os
3n/aimport random
4n/aimport selectors
5n/aimport signal
6n/aimport socket
7n/aimport sys
8n/afrom test import support
9n/afrom time import sleep
10n/aimport unittest
11n/aimport unittest.mock
12n/aimport tempfile
13n/afrom time import monotonic as time
14n/atry:
15n/a import resource
16n/aexcept ImportError:
17n/a resource = None
18n/a
19n/a
20n/aif hasattr(socket, 'socketpair'):
21n/a socketpair = socket.socketpair
22n/aelse:
23n/a def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
24n/a with socket.socket(family, type, proto) as l:
25n/a l.bind((support.HOST, 0))
26n/a l.listen()
27n/a c = socket.socket(family, type, proto)
28n/a try:
29n/a c.connect(l.getsockname())
30n/a caddr = c.getsockname()
31n/a while True:
32n/a a, addr = l.accept()
33n/a # check that we've got the correct client
34n/a if addr == caddr:
35n/a return c, a
36n/a a.close()
37n/a except OSError:
38n/a c.close()
39n/a raise
40n/a
41n/a
42n/adef find_ready_matching(ready, flag):
43n/a match = []
44n/a for key, events in ready:
45n/a if events & flag:
46n/a match.append(key.fileobj)
47n/a return match
48n/a
49n/a
50n/aclass BaseSelectorTestCase(unittest.TestCase):
51n/a
52n/a def make_socketpair(self):
53n/a rd, wr = socketpair()
54n/a self.addCleanup(rd.close)
55n/a self.addCleanup(wr.close)
56n/a return rd, wr
57n/a
58n/a def test_register(self):
59n/a s = self.SELECTOR()
60n/a self.addCleanup(s.close)
61n/a
62n/a rd, wr = self.make_socketpair()
63n/a
64n/a key = s.register(rd, selectors.EVENT_READ, "data")
65n/a self.assertIsInstance(key, selectors.SelectorKey)
66n/a self.assertEqual(key.fileobj, rd)
67n/a self.assertEqual(key.fd, rd.fileno())
68n/a self.assertEqual(key.events, selectors.EVENT_READ)
69n/a self.assertEqual(key.data, "data")
70n/a
71n/a # register an unknown event
72n/a self.assertRaises(ValueError, s.register, 0, 999999)
73n/a
74n/a # register an invalid FD
75n/a self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)
76n/a
77n/a # register twice
78n/a self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)
79n/a
80n/a # register the same FD, but with a different object
81n/a self.assertRaises(KeyError, s.register, rd.fileno(),
82n/a selectors.EVENT_READ)
83n/a
84n/a def test_unregister(self):
85n/a s = self.SELECTOR()
86n/a self.addCleanup(s.close)
87n/a
88n/a rd, wr = self.make_socketpair()
89n/a
90n/a s.register(rd, selectors.EVENT_READ)
91n/a s.unregister(rd)
92n/a
93n/a # unregister an unknown file obj
94n/a self.assertRaises(KeyError, s.unregister, 999999)
95n/a
96n/a # unregister twice
97n/a self.assertRaises(KeyError, s.unregister, rd)
98n/a
99n/a def test_unregister_after_fd_close(self):
100n/a s = self.SELECTOR()
101n/a self.addCleanup(s.close)
102n/a rd, wr = self.make_socketpair()
103n/a r, w = rd.fileno(), wr.fileno()
104n/a s.register(r, selectors.EVENT_READ)
105n/a s.register(w, selectors.EVENT_WRITE)
106n/a rd.close()
107n/a wr.close()
108n/a s.unregister(r)
109n/a s.unregister(w)
110n/a
111n/a @unittest.skipUnless(os.name == 'posix', "requires posix")
112n/a def test_unregister_after_fd_close_and_reuse(self):
113n/a s = self.SELECTOR()
114n/a self.addCleanup(s.close)
115n/a rd, wr = self.make_socketpair()
116n/a r, w = rd.fileno(), wr.fileno()
117n/a s.register(r, selectors.EVENT_READ)
118n/a s.register(w, selectors.EVENT_WRITE)
119n/a rd2, wr2 = self.make_socketpair()
120n/a rd.close()
121n/a wr.close()
122n/a os.dup2(rd2.fileno(), r)
123n/a os.dup2(wr2.fileno(), w)
124n/a self.addCleanup(os.close, r)
125n/a self.addCleanup(os.close, w)
126n/a s.unregister(r)
127n/a s.unregister(w)
128n/a
129n/a def test_unregister_after_socket_close(self):
130n/a s = self.SELECTOR()
131n/a self.addCleanup(s.close)
132n/a rd, wr = self.make_socketpair()
133n/a s.register(rd, selectors.EVENT_READ)
134n/a s.register(wr, selectors.EVENT_WRITE)
135n/a rd.close()
136n/a wr.close()
137n/a s.unregister(rd)
138n/a s.unregister(wr)
139n/a
140n/a def test_modify(self):
141n/a s = self.SELECTOR()
142n/a self.addCleanup(s.close)
143n/a
144n/a rd, wr = self.make_socketpair()
145n/a
146n/a key = s.register(rd, selectors.EVENT_READ)
147n/a
148n/a # modify events
149n/a key2 = s.modify(rd, selectors.EVENT_WRITE)
150n/a self.assertNotEqual(key.events, key2.events)
151n/a self.assertEqual(key2, s.get_key(rd))
152n/a
153n/a s.unregister(rd)
154n/a
155n/a # modify data
156n/a d1 = object()
157n/a d2 = object()
158n/a
159n/a key = s.register(rd, selectors.EVENT_READ, d1)
160n/a key2 = s.modify(rd, selectors.EVENT_READ, d2)
161n/a self.assertEqual(key.events, key2.events)
162n/a self.assertNotEqual(key.data, key2.data)
163n/a self.assertEqual(key2, s.get_key(rd))
164n/a self.assertEqual(key2.data, d2)
165n/a
166n/a # modify unknown file obj
167n/a self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)
168n/a
169n/a # modify use a shortcut
170n/a d3 = object()
171n/a s.register = unittest.mock.Mock()
172n/a s.unregister = unittest.mock.Mock()
173n/a
174n/a s.modify(rd, selectors.EVENT_READ, d3)
175n/a self.assertFalse(s.register.called)
176n/a self.assertFalse(s.unregister.called)
177n/a
178n/a def test_close(self):
179n/a s = self.SELECTOR()
180n/a self.addCleanup(s.close)
181n/a
182n/a mapping = s.get_map()
183n/a rd, wr = self.make_socketpair()
184n/a
185n/a s.register(rd, selectors.EVENT_READ)
186n/a s.register(wr, selectors.EVENT_WRITE)
187n/a
188n/a s.close()
189n/a self.assertRaises(RuntimeError, s.get_key, rd)
190n/a self.assertRaises(RuntimeError, s.get_key, wr)
191n/a self.assertRaises(KeyError, mapping.__getitem__, rd)
192n/a self.assertRaises(KeyError, mapping.__getitem__, wr)
193n/a
194n/a def test_get_key(self):
195n/a s = self.SELECTOR()
196n/a self.addCleanup(s.close)
197n/a
198n/a rd, wr = self.make_socketpair()
199n/a
200n/a key = s.register(rd, selectors.EVENT_READ, "data")
201n/a self.assertEqual(key, s.get_key(rd))
202n/a
203n/a # unknown file obj
204n/a self.assertRaises(KeyError, s.get_key, 999999)
205n/a
206n/a def test_get_map(self):
207n/a s = self.SELECTOR()
208n/a self.addCleanup(s.close)
209n/a
210n/a rd, wr = self.make_socketpair()
211n/a
212n/a keys = s.get_map()
213n/a self.assertFalse(keys)
214n/a self.assertEqual(len(keys), 0)
215n/a self.assertEqual(list(keys), [])
216n/a key = s.register(rd, selectors.EVENT_READ, "data")
217n/a self.assertIn(rd, keys)
218n/a self.assertEqual(key, keys[rd])
219n/a self.assertEqual(len(keys), 1)
220n/a self.assertEqual(list(keys), [rd.fileno()])
221n/a self.assertEqual(list(keys.values()), [key])
222n/a
223n/a # unknown file obj
224n/a with self.assertRaises(KeyError):
225n/a keys[999999]
226n/a
227n/a # Read-only mapping
228n/a with self.assertRaises(TypeError):
229n/a del keys[rd]
230n/a
231n/a def test_select(self):
232n/a s = self.SELECTOR()
233n/a self.addCleanup(s.close)
234n/a
235n/a rd, wr = self.make_socketpair()
236n/a
237n/a s.register(rd, selectors.EVENT_READ)
238n/a wr_key = s.register(wr, selectors.EVENT_WRITE)
239n/a
240n/a result = s.select()
241n/a for key, events in result:
242n/a self.assertTrue(isinstance(key, selectors.SelectorKey))
243n/a self.assertTrue(events)
244n/a self.assertFalse(events & ~(selectors.EVENT_READ |
245n/a selectors.EVENT_WRITE))
246n/a
247n/a self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)
248n/a
249n/a def test_context_manager(self):
250n/a s = self.SELECTOR()
251n/a self.addCleanup(s.close)
252n/a
253n/a rd, wr = self.make_socketpair()
254n/a
255n/a with s as sel:
256n/a sel.register(rd, selectors.EVENT_READ)
257n/a sel.register(wr, selectors.EVENT_WRITE)
258n/a
259n/a self.assertRaises(RuntimeError, s.get_key, rd)
260n/a self.assertRaises(RuntimeError, s.get_key, wr)
261n/a
262n/a def test_fileno(self):
263n/a s = self.SELECTOR()
264n/a self.addCleanup(s.close)
265n/a
266n/a if hasattr(s, 'fileno'):
267n/a fd = s.fileno()
268n/a self.assertTrue(isinstance(fd, int))
269n/a self.assertGreaterEqual(fd, 0)
270n/a
271n/a def test_selector(self):
272n/a s = self.SELECTOR()
273n/a self.addCleanup(s.close)
274n/a
275n/a NUM_SOCKETS = 12
276n/a MSG = b" This is a test."
277n/a MSG_LEN = len(MSG)
278n/a readers = []
279n/a writers = []
280n/a r2w = {}
281n/a w2r = {}
282n/a
283n/a for i in range(NUM_SOCKETS):
284n/a rd, wr = self.make_socketpair()
285n/a s.register(rd, selectors.EVENT_READ)
286n/a s.register(wr, selectors.EVENT_WRITE)
287n/a readers.append(rd)
288n/a writers.append(wr)
289n/a r2w[rd] = wr
290n/a w2r[wr] = rd
291n/a
292n/a bufs = []
293n/a
294n/a while writers:
295n/a ready = s.select()
296n/a ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
297n/a if not ready_writers:
298n/a self.fail("no sockets ready for writing")
299n/a wr = random.choice(ready_writers)
300n/a wr.send(MSG)
301n/a
302n/a for i in range(10):
303n/a ready = s.select()
304n/a ready_readers = find_ready_matching(ready,
305n/a selectors.EVENT_READ)
306n/a if ready_readers:
307n/a break
308n/a # there might be a delay between the write to the write end and
309n/a # the read end is reported ready
310n/a sleep(0.1)
311n/a else:
312n/a self.fail("no sockets ready for reading")
313n/a self.assertEqual([w2r[wr]], ready_readers)
314n/a rd = ready_readers[0]
315n/a buf = rd.recv(MSG_LEN)
316n/a self.assertEqual(len(buf), MSG_LEN)
317n/a bufs.append(buf)
318n/a s.unregister(r2w[rd])
319n/a s.unregister(rd)
320n/a writers.remove(r2w[rd])
321n/a
322n/a self.assertEqual(bufs, [MSG] * NUM_SOCKETS)
323n/a
324n/a @unittest.skipIf(sys.platform == 'win32',
325n/a 'select.select() cannot be used with empty fd sets')
326n/a def test_empty_select(self):
327n/a # Issue #23009: Make sure EpollSelector.select() works when no FD is
328n/a # registered.
329n/a s = self.SELECTOR()
330n/a self.addCleanup(s.close)
331n/a self.assertEqual(s.select(timeout=0), [])
332n/a
333n/a def test_timeout(self):
334n/a s = self.SELECTOR()
335n/a self.addCleanup(s.close)
336n/a
337n/a rd, wr = self.make_socketpair()
338n/a
339n/a s.register(wr, selectors.EVENT_WRITE)
340n/a t = time()
341n/a self.assertEqual(1, len(s.select(0)))
342n/a self.assertEqual(1, len(s.select(-1)))
343n/a self.assertLess(time() - t, 0.5)
344n/a
345n/a s.unregister(wr)
346n/a s.register(rd, selectors.EVENT_READ)
347n/a t = time()
348n/a self.assertFalse(s.select(0))
349n/a self.assertFalse(s.select(-1))
350n/a self.assertLess(time() - t, 0.5)
351n/a
352n/a t0 = time()
353n/a self.assertFalse(s.select(1))
354n/a t1 = time()
355n/a dt = t1 - t0
356n/a # Tolerate 2.0 seconds for very slow buildbots
357n/a self.assertTrue(0.8 <= dt <= 2.0, dt)
358n/a
359n/a @unittest.skipUnless(hasattr(signal, "alarm"),
360n/a "signal.alarm() required for this test")
361n/a def test_select_interrupt_exc(self):
362n/a s = self.SELECTOR()
363n/a self.addCleanup(s.close)
364n/a
365n/a rd, wr = self.make_socketpair()
366n/a
367n/a class InterruptSelect(Exception):
368n/a pass
369n/a
370n/a def handler(*args):
371n/a raise InterruptSelect
372n/a
373n/a orig_alrm_handler = signal.signal(signal.SIGALRM, handler)
374n/a self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
375n/a self.addCleanup(signal.alarm, 0)
376n/a
377n/a signal.alarm(1)
378n/a
379n/a s.register(rd, selectors.EVENT_READ)
380n/a t = time()
381n/a # select() is interrupted by a signal which raises an exception
382n/a with self.assertRaises(InterruptSelect):
383n/a s.select(30)
384n/a # select() was interrupted before the timeout of 30 seconds
385n/a self.assertLess(time() - t, 5.0)
386n/a
387n/a @unittest.skipUnless(hasattr(signal, "alarm"),
388n/a "signal.alarm() required for this test")
389n/a def test_select_interrupt_noraise(self):
390n/a s = self.SELECTOR()
391n/a self.addCleanup(s.close)
392n/a
393n/a rd, wr = self.make_socketpair()
394n/a
395n/a orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
396n/a self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
397n/a self.addCleanup(signal.alarm, 0)
398n/a
399n/a signal.alarm(1)
400n/a
401n/a s.register(rd, selectors.EVENT_READ)
402n/a t = time()
403n/a # select() is interrupted by a signal, but the signal handler doesn't
404n/a # raise an exception, so select() should by retries with a recomputed
405n/a # timeout
406n/a self.assertFalse(s.select(1.5))
407n/a self.assertGreaterEqual(time() - t, 1.0)
408n/a
409n/a
410n/aclass ScalableSelectorMixIn:
411n/a
412n/a # see issue #18963 for why it's skipped on older OS X versions
413n/a @support.requires_mac_ver(10, 5)
414n/a @unittest.skipUnless(resource, "Test needs resource module")
415n/a def test_above_fd_setsize(self):
416n/a # A scalable implementation should have no problem with more than
417n/a # FD_SETSIZE file descriptors. Since we don't know the value, we just
418n/a # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
419n/a soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
420n/a try:
421n/a resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
422n/a self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
423n/a (soft, hard))
424n/a NUM_FDS = min(hard, 2**16)
425n/a except (OSError, ValueError):
426n/a NUM_FDS = soft
427n/a
428n/a # guard for already allocated FDs (stdin, stdout...)
429n/a NUM_FDS -= 32
430n/a
431n/a s = self.SELECTOR()
432n/a self.addCleanup(s.close)
433n/a
434n/a for i in range(NUM_FDS // 2):
435n/a try:
436n/a rd, wr = self.make_socketpair()
437n/a except OSError:
438n/a # too many FDs, skip - note that we should only catch EMFILE
439n/a # here, but apparently *BSD and Solaris can fail upon connect()
440n/a # or bind() with EADDRNOTAVAIL, so let's be safe
441n/a self.skipTest("FD limit reached")
442n/a
443n/a try:
444n/a s.register(rd, selectors.EVENT_READ)
445n/a s.register(wr, selectors.EVENT_WRITE)
446n/a except OSError as e:
447n/a if e.errno == errno.ENOSPC:
448n/a # this can be raised by epoll if we go over
449n/a # fs.epoll.max_user_watches sysctl
450n/a self.skipTest("FD limit reached")
451n/a raise
452n/a
453n/a self.assertEqual(NUM_FDS // 2, len(s.select()))
454n/a
455n/a
456n/aclass DefaultSelectorTestCase(BaseSelectorTestCase):
457n/a
458n/a SELECTOR = selectors.DefaultSelector
459n/a
460n/a
461n/aclass SelectSelectorTestCase(BaseSelectorTestCase):
462n/a
463n/a SELECTOR = selectors.SelectSelector
464n/a
465n/a
466n/a@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
467n/a "Test needs selectors.PollSelector")
468n/aclass PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
469n/a
470n/a SELECTOR = getattr(selectors, 'PollSelector', None)
471n/a
472n/a
473n/a@unittest.skipUnless(hasattr(selectors, 'EpollSelector'),
474n/a "Test needs selectors.EpollSelector")
475n/aclass EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
476n/a
477n/a SELECTOR = getattr(selectors, 'EpollSelector', None)
478n/a
479n/a def test_register_file(self):
480n/a # epoll(7) returns EPERM when given a file to watch
481n/a s = self.SELECTOR()
482n/a with tempfile.NamedTemporaryFile() as f:
483n/a with self.assertRaises(IOError):
484n/a s.register(f, selectors.EVENT_READ)
485n/a # the SelectorKey has been removed
486n/a with self.assertRaises(KeyError):
487n/a s.get_key(f)
488n/a
489n/a
490n/a@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
491n/a "Test needs selectors.KqueueSelector)")
492n/aclass KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
493n/a
494n/a SELECTOR = getattr(selectors, 'KqueueSelector', None)
495n/a
496n/a def test_register_bad_fd(self):
497n/a # a file descriptor that's been closed should raise an OSError
498n/a # with EBADF
499n/a s = self.SELECTOR()
500n/a bad_f = support.make_bad_fd()
501n/a with self.assertRaises(OSError) as cm:
502n/a s.register(bad_f, selectors.EVENT_READ)
503n/a self.assertEqual(cm.exception.errno, errno.EBADF)
504n/a # the SelectorKey has been removed
505n/a with self.assertRaises(KeyError):
506n/a s.get_key(bad_f)
507n/a
508n/a
509n/a@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),
510n/a "Test needs selectors.DevpollSelector")
511n/aclass DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
512n/a
513n/a SELECTOR = getattr(selectors, 'DevpollSelector', None)
514n/a
515n/a
516n/a
517n/adef test_main():
518n/a tests = [DefaultSelectorTestCase, SelectSelectorTestCase,
519n/a PollSelectorTestCase, EpollSelectorTestCase,
520n/a KqueueSelectorTestCase, DevpollSelectorTestCase]
521n/a support.run_unittest(*tests)
522n/a support.reap_children()
523n/a
524n/a
525n/aif __name__ == "__main__":
526n/a test_main()