» Core Development > Code coverage > Lib/test/test_signal.py

Python code coverage for Lib/test/test_signal.py

# | count | content
1n/aimport unittest
2n/afrom test import support
3n/afrom contextlib import closing
4n/aimport select
5n/aimport signal
6n/aimport socket
7n/aimport struct
8n/aimport subprocess
9n/aimport sys, os, time, errno
10n/afrom test.support.script_helper import assert_python_ok, spawn_python
11n/atry:
12n/a import threading
13n/aexcept ImportError:
14n/a threading = None
15n/atry:
16n/a import _testcapi
17n/aexcept ImportError:
18n/a _testcapi = None
19n/a
20n/a
class GenericTests(unittest.TestCase):
    # Platform independent sanity checks of the signal module namespace.

    @unittest.skipIf(threading is None, "test needs threading module")
    def test_enums(self):
        # Each constant exported by the signal module has to be a member of
        # the enum that corresponds to its naming family.
        handler_names = ('SIG_DFL', 'SIG_IGN')
        sigmask_names = ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK')

        for attr_name in dir(signal):
            value = getattr(signal, attr_name)
            if attr_name in handler_names:
                expected_enum = signal.Handlers
            elif attr_name in sigmask_names:
                expected_enum = signal.Sigmasks
            elif attr_name.startswith('SIG_'):
                # any other SIG_* name is not checked
                continue
            elif attr_name.startswith(('SIG', 'CTRL_')):
                expected_enum = signal.Signals
            else:
                continue

            self.assertIsInstance(value, expected_enum)
            if attr_name.startswith('CTRL_'):
                # CTRL_* events are only expected to exist on Windows
                self.assertEqual(sys.platform, "win32")
36n/a
37n/a
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class PosixTests(unittest.TestCase):
    # Basic signal.signal() / signal.getsignal() checks for POSIX platforms.

    def trivial_signal_handler(self, *args):
        # Accept whatever the interpreter passes to a handler and do nothing.
        return None

    def test_out_of_range_signal_number_raises_error(self):
        # 4242 stands in for a signal number that is out of range.
        bogus_signum = 4242
        with self.assertRaises(ValueError):
            signal.getsignal(bogus_signum)

        with self.assertRaises(ValueError):
            signal.signal(bogus_signum, self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        with self.assertRaises(TypeError):
            signal.signal(signal.SIGUSR1, None)

    def test_getsignal(self):
        # Install a handler, read it back, then put the previous one back.
        signum = signal.SIGHUP
        previous = signal.signal(signum, self.trivial_signal_handler)
        self.assertIsInstance(previous, signal.Handlers)
        installed = signal.getsignal(signum)
        self.assertEqual(installed, self.trivial_signal_handler)
        signal.signal(signum, previous)
        restored = signal.getsignal(signum)
        self.assertEqual(restored, previous)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform=='freebsd6',
        'inter process signals not reliable (do not mix well with threading) '
        'on freebsd6')
    def test_interprocess_signal(self):
        # The actual checks live in a helper script located next to this file.
        here = os.path.dirname(__file__)
        helper_script = os.path.join(here, 'signalinterproctester.py')
        assert_python_ok(helper_script)
69n/a
70n/a
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        def noop_handler(signum, frame):
            return None

        candidates = (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                      signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                      signal.SIGTERM)
        exercised = set()
        for signum in candidates:
            # Issue #18396, only for signals without a C-level handler.
            if signal.getsignal(signum) is None:
                continue
            # Set and then reset a handler for signals that work on windows.
            previous = signal.signal(signum, noop_handler)
            signal.signal(signum, previous)
            exercised.add(signum)
        # Issue #18396: Ensure the above loop at least tested *something*
        self.assertTrue(exercised)

        # Signal numbers that must be rejected.
        for invalid_signum in (-1, 7):
            with self.assertRaises(ValueError):
                signal.signal(invalid_signum, noop_handler)
93n/a
94n/a
class WakeupFDTests(unittest.TestCase):
    # Argument validation and return value of signal.set_wakeup_fd().

    def test_invalid_fd(self):
        bad_fd = support.make_bad_fd()
        with self.assertRaises((ValueError, OSError)):
            signal.set_wakeup_fd(bad_fd)

    def test_invalid_socket(self):
        # The descriptor of an already closed socket has to be rejected.
        closed_sock = socket.socket()
        stale_fd = closed_sock.fileno()
        closed_sock.close()
        with self.assertRaises((ValueError, OSError)):
            signal.set_wakeup_fd(stale_fd)

    def test_set_wakeup_fd_result(self):
        # set_wakeup_fd() returns the previously registered descriptor,
        # or -1 when there was none.
        write_ends = []
        for _ in range(2):
            rfd, wfd = os.pipe()
            self.addCleanup(os.close, rfd)
            self.addCleanup(os.close, wfd)
            write_ends.append(wfd)
        first_w, second_w = write_ends

        if hasattr(os, 'set_blocking'):
            for wfd in write_ends:
                os.set_blocking(wfd, False)

        signal.set_wakeup_fd(first_w)
        self.assertEqual(signal.set_wakeup_fd(second_w), first_w)
        self.assertEqual(signal.set_wakeup_fd(-1), second_w)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    def test_set_wakeup_fd_socket_result(self):
        # Same as test_set_wakeup_fd_result(), with socket descriptors.
        socket_fds = []
        for _ in range(2):
            sock = socket.socket()
            self.addCleanup(sock.close)
            sock.setblocking(False)
            socket_fds.append(sock.fileno())
        first_fd, second_fd = socket_fds

        signal.set_wakeup_fd(first_fd)
        self.assertEqual(signal.set_wakeup_fd(second_fd), first_fd)
        self.assertEqual(signal.set_wakeup_fd(-1), second_fd)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    # On Windows, files are always blocking and Windows does not provide a
    # function to test if a socket is in non-blocking mode.
    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
    def test_set_wakeup_fd_blocking(self):
        read_end, write_end = os.pipe()
        for fd in (read_end, write_end):
            self.addCleanup(os.close, fd)

        # fd must be non-blocking
        os.set_blocking(write_end, True)
        with self.assertRaises(ValueError) as caught:
            signal.set_wakeup_fd(write_end)
        expected_msg = "the fd %s must be in non-blocking mode" % write_end
        self.assertEqual(str(caught.exception), expected_msg)

        # non-blocking is ok
        os.set_blocking(write_end, False)
        signal.set_wakeup_fd(write_end)
        signal.set_wakeup_fd(-1)
161n/a
162n/a
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    # Tests of the bytes written to the wakeup fd (signal.set_wakeup_fd())
    # when signals are raised.  Every scenario runs in a child interpreter.

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        # Helper, not a test.
        #
        # test_body: source code of a "def test():" function; it is pasted
        #            into the child script and called once the wakeup pipe
        #            has been installed.
        # signals:   signal numbers expected to be read back from the pipe.
        # ordered:   if false, the child compares the received and expected
        #            numbers as sets instead of as tuples.
        #
        # The child raises an exception on mismatch; assert_python_ok() is
        # used to run it.

        # use a subprocess to have only one thread
        code = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """.format(tuple(map(int, signals)), ordered, test_body)

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_wakeup_write_error(self):
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        code = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                _testcapi.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # Probe in this process first: the scenario only makes sense when
        # writing to the read end of a pipe raises OSError; skip otherwise.
        r, w = os.pipe()
        try:
            os.write(r, b'x')
        except OSError:
            pass
        else:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")
        finally:
            os.close(r)
            os.close(w)

        assert_python_ok('-c', code)

    def test_wakeup_fd_early(self):
        # A first select() is interrupted by SIGALRM; a second select() on
        # the read end of the wakeup pipe must then return in less than
        # TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_wakeup_fd_during(self):
        # SIGALRM is expected while select() is already waiting on the read
        # end of the wakeup pipe; the call must be interrupted in less than
        # TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_signum(self):
        # Two signals raised one after the other: their numbers must be read
        # back from the pipe in the same order (ordered defaults to True).
        self.check_wakeup("""def test():
            import _testcapi
            signal.signal(signal.SIGUSR1, handler)
            _testcapi.raise_signal(signal.SIGUSR1)
            _testcapi.raise_signal(signal.SIGALRM)
        """, signal.SIGUSR1, signal.SIGALRM)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        # Both signals are raised while blocked and then unblocked together;
        # ordered=False makes the child compare the numbers as sets.
        self.check_wakeup("""def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            _testcapi.raise_signal(signum1)
            _testcapi.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
344n/a
345n/a
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):
    # Same idea as the pipe based wakeup fd tests, but the wakeup fd is one
    # end of a socket.socketpair().

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        # The child raises SIGINT and checks that exactly that signal number
        # can be received from the other end of the socket pair.

        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        _testcapi.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        # Use a subprocess to have only one thread.

        # The verb expected in the "Exception ignored ..." message depends on
        # os.name: 'send' on 'nt', 'write' everywhere else.
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """.format(action=action)
        assert_python_ok('-c', code)
428n/a
429n/a
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):

    def readpipe_interrupted(self, interrupt):
        """Run a blocking pipe read in a child while SIGALRM is delivered.

        *interrupt* is forwarded to signal.siginterrupt() in the child, unless
        it is None, in which case siginterrupt() is not called at all.

        Return True if the child reports that the read was interrupted by the
        signal handler's exception, False if the read was not interrupted
        (including when the child had to be killed after the timeout).
        """
        # use a subprocess to have only one thread, to have a timeout on the
        # blocking read and to not touch signal handling in this process
        code = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """ % (interrupt,)
        with spawn_python('-c', code) as child:
            try:
                # wait until the child process is loaded and has started
                ready_line = child.stdout.readline()

                remaining_out, _unused_err = child.communicate(timeout=5.0)
            except subprocess.TimeoutExpired:
                # the read never returned: it was not interrupted
                child.kill()
                return False

            output = ready_line + remaining_out
            status = child.wait()
            # 3: both reads were interrupted, 2: a read returned normally
            if status == 3:
                return True
            if status == 2:
                return False
            raise Exception("Child error (exit code %s): %r"
                            % (status, output))

    def test_without_siginterrupt(self):
        # If a signal handler is installed and siginterrupt is not called
        # at all, when that signal arrives, it interrupts a syscall that's in
        # progress.
        self.assertTrue(self.readpipe_interrupted(None))

    def test_siginterrupt_on(self):
        # If a signal handler is installed and siginterrupt is called with
        # a true value for the second argument, when that signal arrives, it
        # interrupts a syscall that's in progress.
        self.assertTrue(self.readpipe_interrupted(True))

    def test_siginterrupt_off(self):
        # If a signal handler is installed and siginterrupt is called with
        # a false value for the second argument, when that signal arrives, it
        # does not interrupt a syscall that's in progress.
        self.assertFalse(self.readpipe_interrupted(False))
513n/a
514n/a
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class ItimerTest(unittest.TestCase):
    # Tests for signal.setitimer() and signal.getitimer().
    #
    # Every test leaves the process-wide signal state as it found it: the
    # timer it armed is stopped in tearDown() and every handler it installed
    # is replaced by the previous one.

    def setUp(self):
        self.hndl_called = False    # set to True by each handler below
        self.hndl_count = 0         # completed sig_vtalrm() calls
        self.itimer = None          # timer armed by the test, if any
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Stop the timer *before* restoring the previous SIGALRM handler, so
        # that a timer which is still armed cannot fire once the old handler
        # is back.  The handler is restored even if setitimer() fails.
        try:
            if self.itimer is not None: # test_itimer_exc doesn't change this attr
                signal.setitimer(self.itimer, 0)
        finally:
            signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        # Install handler for signum and schedule the restoration of the
        # previous one.  Cleanups run after tearDown(), that is once the
        # timer has been stopped.
        previous = signal.signal(signum, handler)
        if previous is not None:
            # None means the previous handler was not installed from Python
            # and cannot be passed back to signal.signal().
            self.addCleanup(signal.signal, signum, previous)

    def _burn_cpu_until_timer_stops(self):
        # Keep the CPU busy for at most 60 seconds, returning as soon as
        # self.itimer reads back as stopped; skip the test on timeout.
        deadline = time.monotonic() + 60.0
        while time.monotonic() < deadline:
            # use up some CPU time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                return  # the signal handler stopped this itimer
        # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    def sig_alrm(self, *args):
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms, which is why
        # setitimer(signal.ITIMER_REAL, -1) is deliberately not checked here.

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertTrue(self.hndl_called)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        # sig_vtalrm() disables the timer once it has been called enough
        self._burn_cpu_until_timer_stops()

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform=='freebsd6',
        'itimer not reliable (does not mix well with threading) on freebsd6')
    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        # sig_prof() disables the timer the first time it is called
        self._burn_cpu_until_timer_stops()

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)
609n/a
610n/a
class PendingSignalsTests(unittest.TestCase):
    """
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    """
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending_empty(self):
        # No signal is expected to be pending in the test process itself.
        self.assertEqual(signal.sigpending(), set())

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending(self):
        # In a child: block SIGUSR1, send it to the own process, check that
        # sigpending() reports exactly that signal as signal.Signals members,
        # then unblock it and expect the handler (1/0) to run.
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                         'need signal.pthread_kill()')
    def test_pthread_kill(self):
        # In a child: send SIGUSR1 to the calling thread with pthread_kill()
        # and expect the handler (1/0) to run.
        code = """if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            if sys.platform == 'freebsd6':
                # Issue #12392 and #12469: send a signal to the main thread
                # doesn't work before the creation of the first thread on
                # FreeBSD 6
                def noop():
                    pass
                thread = threading.Thread(target=noop)
                thread.start()
                thread.join()

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def wait_helper(self, blocked, test):
        """
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        """
        # Helper, not a test.  The child installs a handler that raises
        # ZeroDivisionError for SIGALRM, blocks `blocked`, calls test(signum)
        # and finally unblocks `blocked`; it exits with status 1 if the
        # handler ran or if any other exception escaped.
        code = '''if 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        ''' % (test.strip(), blocked)

        # sig*wait* must be called with the signal blocked: since the current
        # process might have several threads running, use a subprocess to have
        # a single thread.
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    def test_sigwait(self):
        # sigwait() must return the awaited signal as a signal.Signals member.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        # The si_signo field of the returned info must be the awaited signal.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        # The alarm is set for 1 second, well below the 10.1 second timeout.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_poll(self):
        # check that polling with sigtimedwait works
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_timeout(self):
        # No signal is sent: sigtimedwait() must return None after the timeout.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_negative_timeout(self):
        # A negative timeout must be rejected with ValueError.
        signum = signal.SIGALRM
        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)

    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @unittest.skipIf(threading is None, "test needs threading module")
    def test_sigwait_thread(self):
        # Check that calling sigwait() from a thread doesn't suspend the whole
        # process. A new interpreter is spawned to avoid problems when mixing
        # threads and fork(): only async-safe functions are allowed between
        # fork() and exec().
        assert_python_ok("-c", """if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        """)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask_arguments(self):
        # Wrong number of arguments: TypeError; 1700 as first argument: OSError.
        self.assertRaises(TypeError, signal.pthread_sigmask)
        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask(self):
        # In a child: walk SIGUSR1 through unblocked -> blocked -> unblocked
        # and check after each step both the mask returned by
        # pthread_sigmask() and whether the handler (1/0) runs.
        code = """if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        """
        assert_python_ok('-c', code)

    @unittest.skipIf(sys.platform == 'freebsd6',
        "issue #12392: send a signal to the main thread doesn't work "
        "before the creation of the first thread on FreeBSD 6")
    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                         'need signal.pthread_kill()')
    def test_pthread_kill_main_thread(self):
        # Test that a signal can be sent to the main thread with pthread_kill()
        # before any other thread has been created (see issue #12392).
        code = """if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        """

        # Exit status 3 means the handler ran; 2 means the child reached the
        # end of the script without it.
        with spawn_python('-c', code) as process:
            stdout, stderr = process.communicate()
            exitcode = process.wait()
            if exitcode != 3:
                raise Exception("Child error (exit code %s): %s" %
                                (exitcode, stdout))
943n/a
944n/a
def tearDownModule():
    # unittest module-level fixture: called once after the tests of this
    # module have run.
    # NOTE(review): support.reap_children() presumably collects child
    # processes left behind by the tests -- confirm in test.support.
    support.reap_children()
947n/a
# Run the tests with unittest's command line runner when this file is
# executed as a script.
if __name__ == "__main__":
    unittest.main()