ยปCore Development>Code coverage>Lib/test/test_threadsignals.py

Python code coverage for Lib/test/test_threadsignals.py

#countcontent
1n/a"""PyUnit testing that threads honor our signal semantics"""
2n/a
3n/aimport unittest
4n/aimport signal
5n/aimport os
6n/aimport sys
7n/afrom test.support import run_unittest, import_module
8n/athread = import_module('_thread')
9n/aimport time
10n/a
11n/aif (sys.platform[:3] == 'win'):
12n/a raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
13n/a
14n/aprocess_pid = os.getpid()
15n/asignalled_all=thread.allocate_lock()
16n/a
17n/aUSING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
18n/a and sys.thread_info.lock == 'mutex+cond')
19n/a
20n/adef registerSignals(for_usr1, for_usr2, for_alrm):
21n/a usr1 = signal.signal(signal.SIGUSR1, for_usr1)
22n/a usr2 = signal.signal(signal.SIGUSR2, for_usr2)
23n/a alrm = signal.signal(signal.SIGALRM, for_alrm)
24n/a return usr1, usr2, alrm
25n/a
26n/a
27n/a# The signal handler. Just note that the signal occurred and
28n/a# from who.
29n/adef handle_signals(sig,frame):
30n/a signal_blackboard[sig]['tripped'] += 1
31n/a signal_blackboard[sig]['tripped_by'] = thread.get_ident()
32n/a
33n/a# a function that will be spawned as a separate thread.
34n/adef send_signals():
35n/a os.kill(process_pid, signal.SIGUSR1)
36n/a os.kill(process_pid, signal.SIGUSR2)
37n/a signalled_all.release()
38n/a
39n/aclass ThreadSignals(unittest.TestCase):
40n/a
41n/a def test_signals(self):
42n/a # Test signal handling semantics of threads.
43n/a # We spawn a thread, have the thread send two signals, and
44n/a # wait for it to finish. Check that we got both signals
45n/a # and that they were run by the main thread.
46n/a signalled_all.acquire()
47n/a self.spawnSignallingThread()
48n/a signalled_all.acquire()
49n/a # the signals that we asked the kernel to send
50n/a # will come back, but we don't know when.
51n/a # (it might even be after the thread exits
52n/a # and might be out of order.) If we haven't seen
53n/a # the signals yet, send yet another signal and
54n/a # wait for it return.
55n/a if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
56n/a or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
57n/a signal.alarm(1)
58n/a signal.pause()
59n/a signal.alarm(0)
60n/a
61n/a self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
62n/a self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
63n/a thread.get_ident())
64n/a self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
65n/a self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
66n/a thread.get_ident())
67n/a signalled_all.release()
68n/a
69n/a def spawnSignallingThread(self):
70n/a thread.start_new_thread(send_signals, ())
71n/a
72n/a def alarm_interrupt(self, sig, frame):
73n/a raise KeyboardInterrupt
74n/a
75n/a @unittest.skipIf(USING_PTHREAD_COND,
76n/a 'POSIX condition variables cannot be interrupted')
77n/a # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
78n/a @unittest.skipIf(sys.platform.startswith('openbsd'),
79n/a 'lock cannot be interrupted on OpenBSD')
80n/a def test_lock_acquire_interruption(self):
81n/a # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
82n/a # in a deadlock.
83n/a # XXX this test can fail when the legacy (non-semaphore) implementation
84n/a # of locks is used in thread_pthread.h, see issue #11223.
85n/a oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
86n/a try:
87n/a lock = thread.allocate_lock()
88n/a lock.acquire()
89n/a signal.alarm(1)
90n/a t1 = time.time()
91n/a self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
92n/a dt = time.time() - t1
93n/a # Checking that KeyboardInterrupt was raised is not sufficient.
94n/a # We want to assert that lock.acquire() was interrupted because
95n/a # of the signal, not that the signal handler was called immediately
96n/a # after timeout return of lock.acquire() (which can fool assertRaises).
97n/a self.assertLess(dt, 3.0)
98n/a finally:
99n/a signal.signal(signal.SIGALRM, oldalrm)
100n/a
101n/a @unittest.skipIf(USING_PTHREAD_COND,
102n/a 'POSIX condition variables cannot be interrupted')
103n/a # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
104n/a @unittest.skipIf(sys.platform.startswith('openbsd'),
105n/a 'lock cannot be interrupted on OpenBSD')
106n/a def test_rlock_acquire_interruption(self):
107n/a # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
108n/a # in a deadlock.
109n/a # XXX this test can fail when the legacy (non-semaphore) implementation
110n/a # of locks is used in thread_pthread.h, see issue #11223.
111n/a oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
112n/a try:
113n/a rlock = thread.RLock()
114n/a # For reentrant locks, the initial acquisition must be in another
115n/a # thread.
116n/a def other_thread():
117n/a rlock.acquire()
118n/a thread.start_new_thread(other_thread, ())
119n/a # Wait until we can't acquire it without blocking...
120n/a while rlock.acquire(blocking=False):
121n/a rlock.release()
122n/a time.sleep(0.01)
123n/a signal.alarm(1)
124n/a t1 = time.time()
125n/a self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
126n/a dt = time.time() - t1
127n/a # See rationale above in test_lock_acquire_interruption
128n/a self.assertLess(dt, 3.0)
129n/a finally:
130n/a signal.signal(signal.SIGALRM, oldalrm)
131n/a
132n/a def acquire_retries_on_intr(self, lock):
133n/a self.sig_recvd = False
134n/a def my_handler(signal, frame):
135n/a self.sig_recvd = True
136n/a old_handler = signal.signal(signal.SIGUSR1, my_handler)
137n/a try:
138n/a def other_thread():
139n/a # Acquire the lock in a non-main thread, so this test works for
140n/a # RLocks.
141n/a lock.acquire()
142n/a # Wait until the main thread is blocked in the lock acquire, and
143n/a # then wake it up with this.
144n/a time.sleep(0.5)
145n/a os.kill(process_pid, signal.SIGUSR1)
146n/a # Let the main thread take the interrupt, handle it, and retry
147n/a # the lock acquisition. Then we'll let it run.
148n/a time.sleep(0.5)
149n/a lock.release()
150n/a thread.start_new_thread(other_thread, ())
151n/a # Wait until we can't acquire it without blocking...
152n/a while lock.acquire(blocking=False):
153n/a lock.release()
154n/a time.sleep(0.01)
155n/a result = lock.acquire() # Block while we receive a signal.
156n/a self.assertTrue(self.sig_recvd)
157n/a self.assertTrue(result)
158n/a finally:
159n/a signal.signal(signal.SIGUSR1, old_handler)
160n/a
161n/a def test_lock_acquire_retries_on_intr(self):
162n/a self.acquire_retries_on_intr(thread.allocate_lock())
163n/a
164n/a def test_rlock_acquire_retries_on_intr(self):
165n/a self.acquire_retries_on_intr(thread.RLock())
166n/a
167n/a def test_interrupted_timed_acquire(self):
168n/a # Test to make sure we recompute lock acquisition timeouts when we
169n/a # receive a signal. Check this by repeatedly interrupting a lock
170n/a # acquire in the main thread, and make sure that the lock acquire times
171n/a # out after the right amount of time.
172n/a # NOTE: this test only behaves as expected if C signals get delivered
173n/a # to the main thread. Otherwise lock.acquire() itself doesn't get
174n/a # interrupted and the test trivially succeeds.
175n/a self.start = None
176n/a self.end = None
177n/a self.sigs_recvd = 0
178n/a done = thread.allocate_lock()
179n/a done.acquire()
180n/a lock = thread.allocate_lock()
181n/a lock.acquire()
182n/a def my_handler(signum, frame):
183n/a self.sigs_recvd += 1
184n/a old_handler = signal.signal(signal.SIGUSR1, my_handler)
185n/a try:
186n/a def timed_acquire():
187n/a self.start = time.time()
188n/a lock.acquire(timeout=0.5)
189n/a self.end = time.time()
190n/a def send_signals():
191n/a for _ in range(40):
192n/a time.sleep(0.02)
193n/a os.kill(process_pid, signal.SIGUSR1)
194n/a done.release()
195n/a
196n/a # Send the signals from the non-main thread, since the main thread
197n/a # is the only one that can process signals.
198n/a thread.start_new_thread(send_signals, ())
199n/a timed_acquire()
200n/a # Wait for thread to finish
201n/a done.acquire()
202n/a # This allows for some timing and scheduling imprecision
203n/a self.assertLess(self.end - self.start, 2.0)
204n/a self.assertGreater(self.end - self.start, 0.3)
205n/a # If the signal is received several times before PyErr_CheckSignals()
206n/a # is called, the handler will get called less than 40 times. Just
207n/a # check it's been called at least once.
208n/a self.assertGreater(self.sigs_recvd, 0)
209n/a finally:
210n/a signal.signal(signal.SIGUSR1, old_handler)
211n/a
212n/a
213n/adef test_main():
214n/a global signal_blackboard
215n/a
216n/a signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
217n/a signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
218n/a signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }
219n/a
220n/a oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
221n/a try:
222n/a run_unittest(ThreadSignals)
223n/a finally:
224n/a registerSignals(*oldsigs)
225n/a
226n/aif __name__ == '__main__':
227n/a test_main()