» Core Development > Code coverage > Lib/test/signalinterproctester.py

Python code coverage for Lib/test/signalinterproctester.py

# count content
1n/aimport os
2n/aimport signal
3n/aimport subprocess
4n/aimport sys
5n/aimport time
6n/aimport unittest
7n/a
8n/a
class SIGUSR1Exception(Exception):
    """Raised by the SIGUSR1 handler to interrupt whatever call is blocking."""


class InterProcessSignalTests(unittest.TestCase):
    """Check how signals sent by helper subprocesses are delivered here."""

    def setUp(self):
        # Number of times each Python-level handler has been invoked.
        self.got_signals = {'SIGHUP': 0, 'SIGUSR1': 0, 'SIGALRM': 0}

    def sighup_handler(self, signum, frame):
        """Record one SIGHUP delivery and return normally."""
        self.got_signals['SIGHUP'] += 1

    def sigusr1_handler(self, signum, frame):
        """Record one SIGUSR1 delivery, then raise SIGUSR1Exception."""
        self.got_signals['SIGUSR1'] += 1
        raise SIGUSR1Exception

    def wait_signal(self, child, signame, exc_class=None, timeout=10.0):
        """Wait until *signame* has been counted or *exc_class* is raised.

        child: object with a ``wait()`` method (e.g. the ``Popen`` returned
            by ``subprocess_send_signal``), or None to skip that wait.
        signame: key of ``self.got_signals`` to watch.
        exc_class: exception type (or tuple of types) that a handler is
            expected to raise; catching it counts as success.
        timeout: seconds to poll, after ``child.wait()`` returns, before
            failing the test.

        Any exception that does not match *exc_class* is re-raised.
        """
        # Poll with short sleeps instead of signal.pause(): pause() only
        # returns when a signal arrives, so a signal delivered just before
        # it (or never delivered) would block forever and the deadline
        # would never be checked.  A handler that raises still interrupts
        # time.sleep(), so exc_class detection is unaffected.
        poll_interval = 0.01
        try:
            if child is not None:
                # This wait should be interrupted by exc_class (if set).
                child.wait()

            deadline = time.monotonic() + timeout
            while True:
                if self.got_signals[signame]:
                    return
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(poll_interval, remaining))
        except BaseException as exc:
            if exc_class is not None and isinstance(exc, exc_class):
                # Got the expected exception.
                return
            raise

        # Kept outside the try block so the failure is never swallowed
        # by the exc_class check above.
        self.fail('signal %s not received after %s seconds'
                  % (signame, timeout))

    def subprocess_send_signal(self, pid, signame):
        """Start a Python subprocess that sends *signame* to process *pid*.

        Returns the ``subprocess.Popen`` object without waiting for it.
        """
        code = 'import os, signal; os.kill(%s, signal.%s)' % (pid, signame)
        args = [sys.executable, '-I', '-c', code]
        return subprocess.Popen(args)

    def _install_handler(self, signum, handler):
        """Install *handler* for *signum* and restore the old one on cleanup."""
        previous = signal.signal(signum, handler)
        # None means the previous handler was not installed from Python and
        # cannot be passed back to signal.signal().
        if previous is not None:
            self.addCleanup(signal.signal, signum, previous)

    def test_interprocess_signal(self):
        # Install handlers.  Previous handlers are restored on cleanup so
        # the test is also safe to run inside a longer-lived process.
        self._install_handler(signal.SIGHUP, self.sighup_handler)
        self._install_handler(signal.SIGUSR1, self.sigusr1_handler)
        self._install_handler(signal.SIGUSR2, signal.SIG_IGN)
        self._install_handler(signal.SIGALRM, signal.default_int_handler)

        # Let the sub-processes know who to send signals to.
        pid = str(os.getpid())

        with self.subprocess_send_signal(pid, "SIGHUP") as child:
            self.wait_signal(child, 'SIGHUP')
        self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 0,
                                            'SIGALRM': 0})

        with self.subprocess_send_signal(pid, "SIGUSR1") as child:
            self.wait_signal(child, 'SIGUSR1', SIGUSR1Exception)
        self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
                                            'SIGALRM': 0})

        with self.subprocess_send_signal(pid, "SIGUSR2") as child:
            # Nothing should happen: SIGUSR2 is ignored
            child.wait()

        signal.alarm(1)
        # Cancel the alarm if the test exits before it fires.
        self.addCleanup(signal.alarm, 0)
        # default_int_handler raises KeyboardInterrupt and does not touch
        # got_signals, so the SIGALRM counter is expected to stay at 0.
        self.wait_signal(None, 'SIGALRM', KeyboardInterrupt)
        self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
                                            'SIGALRM': 0})
81n/a
82n/a
# Run the test case when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()