» Core Development > Code coverage > Lib/test/test_threading.py

Python code coverage for Lib/test/test_threading.py

# | count | content
1n/a"""
2n/aTests for the threading module.
3n/a"""
4n/a
5n/aimport test.support
6n/afrom test.support import (verbose, import_module, cpython_only,
7n/a requires_type_collecting)
8n/afrom test.support.script_helper import assert_python_ok, assert_python_failure
9n/a
10n/aimport random
11n/aimport sys
12n/a_thread = import_module('_thread')
13n/athreading = import_module('threading')
14n/aimport time
15n/aimport unittest
16n/aimport weakref
17n/aimport os
18n/aimport subprocess
19n/a
20n/afrom test import lock_tests
21n/afrom test import support
22n/a
23n/a
# Only async-safe functions are allowed between fork() and exec() (issues
# #12316 and #11870), and fork() from a worker thread is known to trigger
# problems with some operating systems (issue #3863).  Tests affected by this
# are skipped when sys.platform is one of the values listed here.
platforms_to_skip = (
    'freebsd4',
    'freebsd5',
    'freebsd6',
    'netbsd5',
    'hp-ux11',
)
30n/a
31n/a
class Counter(object):
    """A trivial mutable counter whose value starts at zero."""

    def __init__(self):
        self.value = 0

    def inc(self):
        """Add one to the stored value."""
        self.value = self.value + 1

    def dec(self):
        """Subtract one from the stored value."""
        self.value = self.value - 1

    def get(self):
        """Return the stored value."""
        return self.value
42n/a
class TestThread(threading.Thread):
    """Thread that sleeps briefly inside a shared semaphore.

    While running it bumps the shared ``nrunning`` counter (under ``mutex``)
    and reports to ``testcase`` that the counter stays between 0 and 3.
    """

    def __init__(self, name, testcase, sema, mutex, nrunning):
        super().__init__(name=name)
        self.testcase = testcase
        self.sema = sema
        self.mutex = mutex
        self.nrunning = nrunning

    def run(self):
        # Random pause of at most 0.1 millisecond.
        pause = random.random() / 10000.0
        if verbose:
            print('task %s will run for %.1f usec' %
                  (self.name, pause * 1e6))

        with self.sema:
            # Register this task as running; the counter is only touched
            # while the mutex is held.
            with self.mutex:
                self.nrunning.inc()
                if verbose:
                    print(self.nrunning.get(), 'tasks are running')
                self.testcase.assertLessEqual(self.nrunning.get(), 3)

            time.sleep(pause)
            if verbose:
                print('task', self.name, 'done')

            # Unregister before the semaphore slot is given back.
            with self.mutex:
                self.nrunning.dec()
                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
                if verbose:
                    print('%s is finished. %d tasks are running' %
                          (self.name, self.nrunning.get()))
74n/a
75n/a
class BaseTestCase(unittest.TestCase):
    """Base class that snapshots thread state around every test."""

    def setUp(self):
        # Remember what threading_setup() reports so tearDown can hand the
        # same values back to threading_cleanup().
        self._threads = test.support.threading_setup()

    def tearDown(self):
        saved = self._threads
        test.support.threading_cleanup(*saved)
        test.support.reap_children()
83n/a
84n/a
85n/aclass ThreadTests(BaseTestCase):
86n/a
# Create a bunch of threads, let each do some work, wait until all are
# done.
def test_various_ops(self):
    task_count = 10

    # no more than 3 of the 10 can run at once
    gate = threading.BoundedSemaphore(value=3)
    guard = threading.RLock()
    running = Counter()

    workers = []

    for index in range(task_count):
        worker = TestThread("<thread %d>" % index, self, gate, guard, running)
        workers.append(worker)
        # Before start() the thread has no ident and reports "initial".
        self.assertIsNone(worker.ident)
        self.assertRegex(repr(worker), r'^<TestThread\(.*, initial\)>$')
        worker.start()

    if verbose:
        print('waiting for all tasks to complete')
    for worker in workers:
        worker.join()
        # After join() the thread is dead, keeps its ident and reports
        # "stopped".
        self.assertFalse(worker.is_alive())
        self.assertNotEqual(worker.ident, 0)
        self.assertIsNotNone(worker.ident)
        self.assertRegex(repr(worker), r'^<TestThread\(.*, stopped -?\d+\)>$')
    if verbose:
        print('all tasks done')
    self.assertEqual(running.get(), 0)
119n/a
120n/a def test_ident_of_no_threading_threads(self):
121n/a # The ident still must work for the main thread and dummy threads.
122n/a self.assertIsNotNone(threading.currentThread().ident)
123n/a def f():
124n/a ident.append(threading.currentThread().ident)
125n/a done.set()
126n/a done = threading.Event()
127n/a ident = []
128n/a _thread.start_new_thread(f, ())
129n/a done.wait()
130n/a self.assertIsNotNone(ident[0])
131n/a # Kill the "immortal" _DummyThread
132n/a del threading._active[ident[0]]
133n/a
# run with a small(ish) thread stack size (256kB)
def test_various_ops_small_stack(self):
    if verbose:
        print('with 256kB thread stack size...')
    try:
        threading.stack_size(262144)
    except _thread.error:
        raise unittest.SkipTest(
            'platform does not support changing thread stack size')
    try:
        self.test_various_ops()
    finally:
        # Always go back to the default stack size, even when the inner
        # test fails, so later tests are not affected.
        threading.stack_size(0)
145n/a
# run with a large thread stack size (1MB)
def test_various_ops_large_stack(self):
    if verbose:
        print('with 1MB thread stack size...')
    try:
        threading.stack_size(0x100000)
    except _thread.error:
        raise unittest.SkipTest(
            'platform does not support changing thread stack size')
    try:
        self.test_various_ops()
    finally:
        # Always go back to the default stack size, even when the inner
        # test fails, so later tests are not affected.
        threading.stack_size(0)
157n/a
158n/a def test_foreign_thread(self):
159n/a # Check that a "foreign" thread can use the threading module.
160n/a def f(mutex):
161n/a # Calling current_thread() forces an entry for the foreign
162n/a # thread to get made in the threading._active map.
163n/a threading.current_thread()
164n/a mutex.release()
165n/a
166n/a mutex = threading.Lock()
167n/a mutex.acquire()
168n/a tid = _thread.start_new_thread(f, (mutex,))
169n/a # Wait for the thread to finish.
170n/a mutex.acquire()
171n/a self.assertIn(tid, threading._active)
172n/a self.assertIsInstance(threading._active[tid], threading._DummyThread)
173n/a del threading._active[tid]
174n/a
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level.  This test relies on ctypes to get at it.
def test_PyThreadState_SetAsyncExc(self):
    ctypes = import_module("ctypes")

    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

    class AsyncExc(Exception):
        pass

    exception = ctypes.py_object(AsyncExc)

    # Step 1: set the exception on the calling thread itself.
    own_id = threading.get_ident()

    try:
        result = set_async_exc(ctypes.c_long(own_id), exception)
        # The exception is delivered asynchronously, so keep the VM busy
        # until it notices.
        while True:
            pass
    except AsyncExc:
        pass
    else:
        # Unreachable (the loop above never ends on its own), but it
        # documents the intent.
        self.fail("AsyncExc not raised")
    try:
        self.assertEqual(result, 1)  # one thread state modified
    except UnboundLocalError:
        # The exception was raised too quickly for us to get the result.
        pass

    # Step 2: set the exception on another thread.
    # `worker_started` is set by the thread when it's inside a try/except
    # block waiting to catch the asynchronously set AsyncExc exception.
    # `worker_saw_exception` is set by the thread upon catching that
    # exception.
    worker_started = threading.Event()
    worker_saw_exception = threading.Event()

    class Worker(threading.Thread):
        def run(self):
            self.id = threading.get_ident()
            self.finished = False

            try:
                while True:
                    worker_started.set()
                    time.sleep(0.1)
            except AsyncExc:
                self.finished = True
                worker_saw_exception.set()

    worker = Worker()
    worker.daemon = True  # so if this fails, we don't hang Python at shutdown
    worker.start()
    if verbose:
        print(" started worker thread")

    # Try a thread id that doesn't make sense.
    if verbose:
        print(" trying nonsensical thread id")
    result = set_async_exc(ctypes.c_long(-1), exception)
    self.assertEqual(result, 0)  # no thread states modified

    # Now raise an exception in the worker thread.
    if verbose:
        print(" waiting for worker thread to get started")
    started = worker_started.wait()
    self.assertTrue(started)
    if verbose:
        print(" verifying worker hasn't exited")
    self.assertFalse(worker.finished)
    if verbose:
        print(" attempting to raise asynch exception in worker")
    result = set_async_exc(ctypes.c_long(worker.id), exception)
    self.assertEqual(result, 1)  # one thread state modified
    if verbose:
        print(" waiting for worker to say it caught the exception")
    worker_saw_exception.wait(timeout=10)
    self.assertTrue(worker.finished)
    if verbose:
        print(" all OK -- joining worker")
    if worker.finished:
        worker.join()
    # else the thread is still running, and we have no way to kill it
261n/a
262n/a def test_limbo_cleanup(self):
263n/a # Issue 7481: Failure to start thread should cleanup the limbo map.
264n/a def fail_new_thread(*args):
265n/a raise threading.ThreadError()
266n/a _start_new_thread = threading._start_new_thread
267n/a threading._start_new_thread = fail_new_thread
268n/a try:
269n/a t = threading.Thread(target=lambda: None)
270n/a self.assertRaises(threading.ThreadError, t.start)
271n/a self.assertFalse(
272n/a t in threading._limbo,
273n/a "Failed to cleanup _limbo map on failure of Thread.start().")
274n/a finally:
275n/a threading._start_new_thread = _start_new_thread
276n/a
def test_finalize_runnning_thread(self):
    # Issue 1402: the PyGILState_Ensure / _Release functions may be called
    # very late on python exit: on deallocation of a running thread for
    # example.
    import_module("ctypes")

    # The child script is wrapped in "if 1:" so it can be indented here.
    rc, out, err = assert_python_failure("-c", """if 1:
        import ctypes, sys, time, _thread

        # This lock is used as a simple event variable.
        ready = _thread.allocate_lock()
        ready.acquire()

        # Module globals are cleared before __del__ is run
        # So we save the functions in class dict
        class C:
            ensure = ctypes.pythonapi.PyGILState_Ensure
            release = ctypes.pythonapi.PyGILState_Release
            def __del__(self):
                state = self.ensure()
                self.release(state)

        def waitingThread():
            x = C()
            ready.release()
            time.sleep(100)

        _thread.start_new_thread(waitingThread, ())
        ready.acquire()  # Be sure the other thread is waiting.
        sys.exit(42)
        """)
    self.assertEqual(rc, 42)
309n/a
def test_finalize_with_trace(self):
    # Issue1733757
    # Avoid a deadlock when sys.settrace steps into threading._shutdown
    assert_python_ok("-c", """if 1:
        import sys, threading

        # A deadlock-killer, to prevent the
        # testsuite to hang forever
        def killer():
            import os, time
            time.sleep(2)
            print('program blocked; aborting')
            os._exit(2)
        t = threading.Thread(target=killer)
        t.daemon = True
        t.start()

        # This is the trace function
        def func(frame, event, arg):
            threading.current_thread()
            return func

        sys.settrace(func)
        """)
334n/a
def test_join_nondaemon_on_shutdown(self):
    # Issue 1722344
    # Raising SystemExit skipped threading._shutdown
    rc, out, err = assert_python_ok("-c", """if 1:
        import threading
        from time import sleep

        def child():
            sleep(1)
            # As a non-daemon thread we SHOULD wake up and nothing
            # should be torn down yet
            print("Woke up, sleep function is:", sleep)

        threading.Thread(target=child).start()
        raise SystemExit
        """)
    self.assertEqual(out.strip(),
        b"Woke up, sleep function is: <built-in function sleep>")
    self.assertEqual(err, b"")
354n/a
355n/a def test_enumerate_after_join(self):
356n/a # Try hard to trigger #1703448: a thread is still returned in
357n/a # threading.enumerate() after it has been join()ed.
358n/a enum = threading.enumerate
359n/a old_interval = sys.getswitchinterval()
360n/a try:
361n/a for i in range(1, 100):
362n/a sys.setswitchinterval(i * 0.0002)
363n/a t = threading.Thread(target=lambda: None)
364n/a t.start()
365n/a t.join()
366n/a l = enum()
367n/a self.assertNotIn(t, l,
368n/a "#1703448 triggered after %d trials: %s" % (i, l))
369n/a finally:
370n/a sys.setswitchinterval(old_interval)
371n/a
372n/a def test_no_refcycle_through_target(self):
373n/a class RunSelfFunction(object):
374n/a def __init__(self, should_raise):
375n/a # The links in this refcycle from Thread back to self
376n/a # should be cleaned up when the thread completes.
377n/a self.should_raise = should_raise
378n/a self.thread = threading.Thread(target=self._run,
379n/a args=(self,),
380n/a kwargs={'yet_another':self})
381n/a self.thread.start()
382n/a
383n/a def _run(self, other_ref, yet_another):
384n/a if self.should_raise:
385n/a raise SystemExit
386n/a
387n/a cyclic_object = RunSelfFunction(should_raise=False)
388n/a weak_cyclic_object = weakref.ref(cyclic_object)
389n/a cyclic_object.thread.join()
390n/a del cyclic_object
391n/a self.assertIsNone(weak_cyclic_object(),
392n/a msg=('%d references still around' %
393n/a sys.getrefcount(weak_cyclic_object())))
394n/a
395n/a raising_cyclic_object = RunSelfFunction(should_raise=True)
396n/a weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
397n/a raising_cyclic_object.thread.join()
398n/a del raising_cyclic_object
399n/a self.assertIsNone(weak_raising_cyclic_object(),
400n/a msg=('%d references still around' %
401n/a sys.getrefcount(weak_raising_cyclic_object())))
402n/a
403n/a def test_old_threading_api(self):
404n/a # Just a quick sanity check to make sure the old method names are
405n/a # still present
406n/a t = threading.Thread()
407n/a t.isDaemon()
408n/a t.setDaemon(True)
409n/a t.getName()
410n/a t.setName("name")
411n/a t.isAlive()
412n/a e = threading.Event()
413n/a e.isSet()
414n/a threading.activeCount()
415n/a
416n/a def test_repr_daemon(self):
417n/a t = threading.Thread()
418n/a self.assertNotIn('daemon', repr(t))
419n/a t.daemon = True
420n/a self.assertIn('daemon', repr(t))
421n/a
422n/a def test_deamon_param(self):
423n/a t = threading.Thread()
424n/a self.assertFalse(t.daemon)
425n/a t = threading.Thread(daemon=False)
426n/a self.assertFalse(t.daemon)
427n/a t = threading.Thread(daemon=True)
428n/a self.assertTrue(t.daemon)
429n/a
@unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
def test_dummy_thread_after_fork(self):
    # Issue #14308: a dummy thread in the active list doesn't mess up
    # the after-fork mechanism.
    code = """if 1:
        import _thread, threading, os, time

        def background_thread(evt):
            # Creates and registers the _DummyThread instance
            threading.current_thread()
            evt.set()
            time.sleep(10)

        evt = threading.Event()
        _thread.start_new_thread(background_thread, (evt,))
        evt.wait()
        assert threading.active_count() == 2, threading.active_count()
        if os.fork() == 0:
            assert threading.active_count() == 1, threading.active_count()
            os._exit(0)
        else:
            os.wait()
    """
    _, out, err = assert_python_ok("-c", code)
    self.assertEqual(out, b'')
    self.assertEqual(err, b'')
456n/a
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_is_alive_after_fork(self):
    # Try hard to trigger #18418: is_alive() could sometimes be True on
    # threads that vanished after a fork.
    self.addCleanup(sys.setswitchinterval, sys.getswitchinterval())

    # Make the bug more likely to manifest.
    test.support.setswitchinterval(1e-6)

    for _ in range(20):
        worker = threading.Thread(target=lambda: None)
        worker.start()
        self.addCleanup(worker.join)
        child = os.fork()
        if child == 0:
            # Child: report through the exit code whether the thread
            # still looks alive.  os._exit() does not return.
            os._exit(1 if worker.is_alive() else 0)
        _, status = os.waitpid(child, 0)
        self.assertEqual(0, status)
477n/a
478n/a def test_main_thread(self):
479n/a main = threading.main_thread()
480n/a self.assertEqual(main.name, 'MainThread')
481n/a self.assertEqual(main.ident, threading.current_thread().ident)
482n/a self.assertEqual(main.ident, threading.get_ident())
483n/a
484n/a def f():
485n/a self.assertNotEqual(threading.main_thread().ident,
486n/a threading.current_thread().ident)
487n/a th = threading.Thread(target=f)
488n/a th.start()
489n/a th.join()
490n/a
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
@unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
def test_main_thread_after_fork(self):
    # After fork() from the main thread, the child must still report
    # "MainThread" as its main and current thread.
    code = """if 1:
        import os, threading

        pid = os.fork()
        if pid == 0:
            main = threading.main_thread()
            print(main.name)
            print(main.ident == threading.current_thread().ident)
            print(main.ident == threading.get_ident())
        else:
            os.waitpid(pid, 0)
    """
    _, out, err = assert_python_ok("-c", code)
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "MainThread\nTrue\nTrue\n")
510n/a
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
@unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
def test_main_thread_after_fork_from_nonmain_thread(self):
    # After fork() from a worker thread, the forking thread ("Thread-1")
    # is expected to be the child's main thread.
    code = """if 1:
        import os, threading, sys

        def f():
            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
                # stdout is fully buffered because not a tty,
                # we have to flush before exit.
                sys.stdout.flush()
            else:
                os.waitpid(pid, 0)

        th = threading.Thread(target=f)
        th.start()
        th.join()
    """
    _, out, err = assert_python_ok("-c", code)
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
539n/a
540n/a def test_tstate_lock(self):
541n/a # Test an implementation detail of Thread objects.
542n/a started = _thread.allocate_lock()
543n/a finish = _thread.allocate_lock()
544n/a started.acquire()
545n/a finish.acquire()
546n/a def f():
547n/a started.release()
548n/a finish.acquire()
549n/a time.sleep(0.01)
550n/a # The tstate lock is None until the thread is started
551n/a t = threading.Thread(target=f)
552n/a self.assertIs(t._tstate_lock, None)
553n/a t.start()
554n/a started.acquire()
555n/a self.assertTrue(t.is_alive())
556n/a # The tstate lock can't be acquired when the thread is running
557n/a # (or suspended).
558n/a tstate_lock = t._tstate_lock
559n/a self.assertFalse(tstate_lock.acquire(timeout=0), False)
560n/a finish.release()
561n/a # When the thread ends, the state_lock can be successfully
562n/a # acquired.
563n/a self.assertTrue(tstate_lock.acquire(timeout=5), False)
564n/a # But is_alive() is still True: we hold _tstate_lock now, which
565n/a # prevents is_alive() from knowing the thread's end-of-life C code
566n/a # is done.
567n/a self.assertTrue(t.is_alive())
568n/a # Let is_alive() find out the C code is done.
569n/a tstate_lock.release()
570n/a self.assertFalse(t.is_alive())
571n/a # And verify the thread disposed of _tstate_lock.
572n/a self.assertIsNone(t._tstate_lock)
573n/a
574n/a def test_repr_stopped(self):
575n/a # Verify that "stopped" shows up in repr(Thread) appropriately.
576n/a started = _thread.allocate_lock()
577n/a finish = _thread.allocate_lock()
578n/a started.acquire()
579n/a finish.acquire()
580n/a def f():
581n/a started.release()
582n/a finish.acquire()
583n/a t = threading.Thread(target=f)
584n/a t.start()
585n/a started.acquire()
586n/a self.assertIn("started", repr(t))
587n/a finish.release()
588n/a # "stopped" should appear in the repr in a reasonable amount of time.
589n/a # Implementation detail: as of this writing, that's trivially true
590n/a # if .join() is called, and almost trivially true if .is_alive() is
591n/a # called. The detail we're testing here is that "stopped" shows up
592n/a # "all on its own".
593n/a LOOKING_FOR = "stopped"
594n/a for i in range(500):
595n/a if LOOKING_FOR in repr(t):
596n/a break
597n/a time.sleep(0.01)
598n/a self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
599n/a
600n/a def test_BoundedSemaphore_limit(self):
601n/a # BoundedSemaphore should raise ValueError if released too often.
602n/a for limit in range(1, 10):
603n/a bs = threading.BoundedSemaphore(limit)
604n/a threads = [threading.Thread(target=bs.acquire)
605n/a for _ in range(limit)]
606n/a for t in threads:
607n/a t.start()
608n/a for t in threads:
609n/a t.join()
610n/a threads = [threading.Thread(target=bs.release)
611n/a for _ in range(limit)]
612n/a for t in threads:
613n/a t.start()
614n/a for t in threads:
615n/a t.join()
616n/a self.assertRaises(ValueError, bs.release)
617n/a
@cpython_only
def test_frame_tstate_tracing(self):
    # Issue #14432: Crash when a generator is created in a C thread that is
    # destroyed while the generator is still used. The issue was that a
    # generator contains a frame, and the frame kept a reference to the
    # Python state of the destroyed C thread. The crash occurs when a trace
    # function is setup.

    def noop_trace(frame, event, arg):
        # no operation
        return noop_trace

    def generator():
        while 1:
            yield "generator"

    def callback():
        if callback.gen is None:
            callback.gen = generator()
        return next(callback.gen)
    callback.gen = None

    old_trace = sys.gettrace()
    # Remember the hook installed through threading.settrace() as well so
    # it can be put back afterwards; fall back to the module's private
    # attribute when threading.gettrace() is not available.
    if hasattr(threading, 'gettrace'):
        old_threading_trace = threading.gettrace()
    else:
        old_threading_trace = threading._trace_hook
    sys.settrace(noop_trace)
    try:
        # Install a trace function
        threading.settrace(noop_trace)

        # Create a generator in a C thread which exits after the call
        import _testcapi
        _testcapi.call_in_temporary_c_thread(callback)

        # Call the generator in a different Python thread, check that the
        # generator didn't keep a reference to the destroyed thread state
        for _ in range(3):
            # The trace function is still called here
            callback()
    finally:
        sys.settrace(old_trace)
        threading.settrace(old_threading_trace)
657n/a
658n/a
659n/aclass ThreadJoinOnShutdown(BaseTestCase):
660n/a
def _run_and_join(self, script):
    # Prepend a prologue defining joiningfunc() to `script`, run the result
    # in a child interpreter and check that the main program finished
    # before the joining thread did.  `script` must start at column 0 (the
    # callers use their own "if 1:" wrapper).
    script = """if 1:
        import sys, os, time, threading

        # a thread, which waits for the main program to terminate
        def joiningfunc(mainthread):
            mainthread.join()
            print('end of thread')
            # stdout is fully buffered because not a tty, we have to flush
            # before exit.
            sys.stdout.flush()
    \n""" + script

    rc, out, err = assert_python_ok("-c", script)
    data = out.decode().replace('\r', '')
    self.assertEqual(data, "end of main\nend of thread\n")
677n/a
678n/a def test_1_join_on_shutdown(self):
679n/a # The usual case: on exit, wait for a non-daemon thread
680n/a script = """if 1:
681n/a import os
682n/a t = threading.Thread(target=joiningfunc,
683n/a args=(threading.current_thread(),))
684n/a t.start()
685n/a time.sleep(0.1)
686n/a print('end of main')
687n/a """
688n/a self._run_and_join(script)
689n/a
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_2_join_in_forked_process(self):
    # Like the test above, but from a forked interpreter
    script = """if 1:
        childpid = os.fork()
        if childpid != 0:
            os.waitpid(childpid, 0)
            sys.exit(0)

        t = threading.Thread(target=joiningfunc,
                             args=(threading.current_thread(),))
        t.start()
        print('end of main')
        """
    self._run_and_join(script)
706n/a
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_3_join_in_forked_from_thread(self):
    # Like the test above, but fork() was called from a worker thread
    # In the forked process, the main Thread object must be marked as stopped.

    script = """if 1:
        main_thread = threading.current_thread()
        def worker():
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(main_thread,))
            print('end of main')
            t.start()
            t.join() # Should not block: main_thread is already stopped

        w = threading.Thread(target=worker)
        w.start()
        """
    self._run_and_join(script)
731n/a
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_4_daemon_threads(self):
    # Check that a daemon thread cannot crash the interpreter on shutdown
    # by manipulating internal structures that are being disposed of in
    # the main thread.
    script = """if True:
        import os
        import random
        import sys
        import time
        import threading

        thread_has_run = set()

        def random_io():
            '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
            while True:
                in_f = open(os.__file__, 'rb')
                stuff = in_f.read(200)
                null_f = open(os.devnull, 'wb')
                null_f.write(stuff)
                time.sleep(random.random() / 1995)
                null_f.close()
                in_f.close()
                thread_has_run.add(threading.current_thread())

        def main():
            count = 0
            for _ in range(40):
                new_thread = threading.Thread(target=random_io)
                new_thread.daemon = True
                new_thread.start()
                count += 1
            while len(thread_has_run) < count:
                time.sleep(0.001)
            # Trigger process shutdown
            sys.exit(0)

        main()
        """
    rc, out, err = assert_python_ok('-c', script)
    self.assertFalse(err)
774n/a
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_reinit_tls_after_fork(self):
    # Issue #13817: fork() would deadlock in a multithreaded program with
    # the ad-hoc TLS implementation.

    def do_fork_and_wait():
        # Fork a child that exits at once; the parent waits for it.
        child = os.fork()
        if child > 0:
            os.waitpid(child, 0)
        else:
            os._exit(0)

    # Run a bunch of threads that each fork() a child process.
    forkers = [threading.Thread(target=do_fork_and_wait) for _ in range(16)]
    for forker in forkers:
        forker.start()

    for forker in forkers:
        forker.join()
798n/a
799n/a @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
800n/a def test_clear_threads_states_after_fork(self):
801n/a # Issue #17094: check that threads states are cleared after fork()
802n/a
803n/a # start a bunch of threads
804n/a threads = []
805n/a for i in range(16):
806n/a t = threading.Thread(target=lambda : time.sleep(0.3))
807n/a threads.append(t)
808n/a t.start()
809n/a
810n/a pid = os.fork()
811n/a if pid == 0:
812n/a # check that threads states have been cleared
813n/a if len(sys._current_frames()) == 1:
814n/a os._exit(0)
815n/a else:
816n/a os._exit(1)
817n/a else:
818n/a _, status = os.waitpid(pid, 0)
819n/a self.assertEqual(0, status)
820n/a
821n/a for t in threads:
822n/a t.join()
823n/a
824n/a
825n/aclass SubinterpThreadingTests(BaseTestCase):
826n/a
def test_threads_join(self):
    # Non-daemon threads should be joined at subinterpreter shutdown
    # (issue #18808)
    r, w = os.pipe()
    self.addCleanup(os.close, r)
    self.addCleanup(os.close, w)
    # The write end of the pipe is substituted for %d below.
    code = r"""if 1:
        import os
        import threading
        import time

        def f():
            # Sleep a bit so that the thread is still running when
            # Py_EndInterpreter is called.
            time.sleep(0.05)
            os.write(%d, b"x")
        threading.Thread(target=f).start()
        """ % (w,)
    ret = test.support.run_in_subinterp(code)
    self.assertEqual(ret, 0)
    # The thread was joined properly.
    self.assertEqual(os.read(r, 1), b"x")
849n/a
def test_threads_join_2(self):
    # Same as above, but a delay gets introduced after the thread's
    # Python code returned but before the thread state is deleted.
    # To achieve this, we register a thread-local object which sleeps
    # a bit when deallocated.
    r, w = os.pipe()
    self.addCleanup(os.close, r)
    self.addCleanup(os.close, w)
    # The write end of the pipe is substituted for %d below.
    code = r"""if 1:
        import os
        import threading
        import time

        class Sleeper:
            def __del__(self):
                time.sleep(0.05)

        tls = threading.local()

        def f():
            # Sleep a bit so that the thread is still running when
            # Py_EndInterpreter is called.
            time.sleep(0.05)
            tls.x = Sleeper()
            os.write(%d, b"x")
        threading.Thread(target=f).start()
        """ % (w,)
    ret = test.support.run_in_subinterp(code)
    self.assertEqual(ret, 0)
    # The thread was joined properly.
    self.assertEqual(os.read(r, 1), b"x")
881n/a
@cpython_only
def test_daemon_threads_fatal_error(self):
    # A daemon thread still running when the subinterpreter ends is
    # expected to abort the child process with a fatal error.
    subinterp_code = r"""if 1:
        import os
        import threading
        import time

        def f():
            # Make sure the daemon thread is still running when
            # Py_EndInterpreter is called.
            time.sleep(10)
        threading.Thread(target=f, daemon=True).start()
        """
    # The subinterpreter code is embedded into the outer script via %r.
    script = r"""if 1:
        import _testcapi

        _testcapi.run_in_subinterp(%r)
        """ % (subinterp_code,)
    with test.support.SuppressCrashReport():
        rc, out, err = assert_python_failure("-c", script)
    self.assertIn("Fatal Python error: Py_EndInterpreter: "
                  "not the last thread", err.decode())
904n/a
905n/a
906n/aclass ThreadingExceptionTests(BaseTestCase):
907n/a # A RuntimeError should be raised if Thread.start() is called
908n/a # multiple times.
909n/a def test_start_thread_again(self):
910n/a thread = threading.Thread()
911n/a thread.start()
912n/a self.assertRaises(RuntimeError, thread.start)
913n/a
914n/a def test_joining_current_thread(self):
915n/a current_thread = threading.current_thread()
916n/a self.assertRaises(RuntimeError, current_thread.join);
917n/a
918n/a def test_joining_inactive_thread(self):
919n/a thread = threading.Thread()
920n/a self.assertRaises(RuntimeError, thread.join)
921n/a
922n/a def test_daemonize_active_thread(self):
923n/a thread = threading.Thread()
924n/a thread.start()
925n/a self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
926n/a
927n/a def test_releasing_unacquired_lock(self):
928n/a lock = threading.Lock()
929n/a self.assertRaises(RuntimeError, lock.release)
930n/a
@unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(),
                     'test macosx problem')
def test_recursion_limit(self):
    # Issue 9670
    # test that excessive recursion within a non-main thread causes
    # an exception rather than crashing the interpreter on platforms
    # like Mac OS X or FreeBSD which have small default stack sizes
    # for threads
    script = """if True:
        import threading

        def recurse():
            return recurse()

        def outer():
            try:
                recurse()
            except RecursionError:
                pass

        w = threading.Thread(target=outer)
        w.start()
        w.join()
        print('end of main thread')
        """
    expected_output = "end of main thread\n"
    p = subprocess.Popen([sys.executable, "-c", script],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    data = stdout.decode().replace('\r', '')
    self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
    self.assertEqual(data, expected_output)
963n/a
964n/a def test_print_exception(self):
965n/a script = r"""if True:
966n/a import threading
967n/a import time
968n/a
969n/a running = False
970n/a def run():
971n/a global running
972n/a running = True
973n/a while running:
974n/a time.sleep(0.01)
975n/a 1/0
976n/a t = threading.Thread(target=run)
977n/a t.start()
978n/a while not running:
979n/a time.sleep(0.01)
980n/a running = False
981n/a t.join()
982n/a """
983n/a rc, out, err = assert_python_ok("-c", script)
984n/a self.assertEqual(out, b'')
985n/a err = err.decode()
986n/a self.assertIn("Exception in thread", err)
987n/a self.assertIn("Traceback (most recent call last):", err)
988n/a self.assertIn("ZeroDivisionError", err)
989n/a self.assertNotIn("Unhandled exception", err)
990n/a
991n/a @requires_type_collecting
992n/a def test_print_exception_stderr_is_none_1(self):
993n/a script = r"""if True:
994n/a import sys
995n/a import threading
996n/a import time
997n/a
998n/a running = False
999n/a def run():
1000n/a global running
1001n/a running = True
1002n/a while running:
1003n/a time.sleep(0.01)
1004n/a 1/0
1005n/a t = threading.Thread(target=run)
1006n/a t.start()
1007n/a while not running:
1008n/a time.sleep(0.01)
1009n/a sys.stderr = None
1010n/a running = False
1011n/a t.join()
1012n/a """
1013n/a rc, out, err = assert_python_ok("-c", script)
1014n/a self.assertEqual(out, b'')
1015n/a err = err.decode()
1016n/a self.assertIn("Exception in thread", err)
1017n/a self.assertIn("Traceback (most recent call last):", err)
1018n/a self.assertIn("ZeroDivisionError", err)
1019n/a self.assertNotIn("Unhandled exception", err)
1020n/a
1021n/a def test_print_exception_stderr_is_none_2(self):
1022n/a script = r"""if True:
1023n/a import sys
1024n/a import threading
1025n/a import time
1026n/a
1027n/a running = False
1028n/a def run():
1029n/a global running
1030n/a running = True
1031n/a while running:
1032n/a time.sleep(0.01)
1033n/a 1/0
1034n/a sys.stderr = None
1035n/a t = threading.Thread(target=run)
1036n/a t.start()
1037n/a while not running:
1038n/a time.sleep(0.01)
1039n/a running = False
1040n/a t.join()
1041n/a """
1042n/a rc, out, err = assert_python_ok("-c", script)
1043n/a self.assertEqual(out, b'')
1044n/a self.assertNotIn("Unhandled exception", err.decode())
1045n/a
1046n/a def test_bare_raise_in_brand_new_thread(self):
1047n/a def bare_raise():
1048n/a raise
1049n/a
1050n/a class Issue27558(threading.Thread):
1051n/a exc = None
1052n/a
1053n/a def run(self):
1054n/a try:
1055n/a bare_raise()
1056n/a except Exception as exc:
1057n/a self.exc = exc
1058n/a
1059n/a thread = Issue27558()
1060n/a thread.start()
1061n/a thread.join()
1062n/a self.assertIsNotNone(thread.exc)
1063n/a self.assertIsInstance(thread.exc, RuntimeError)
1064n/a
class TimerTests(BaseTestCase):
    # Tests for threading.Timer.

    def setUp(self):
        BaseTestCase.setUp(self)
        # Filled in by _callback_spy: one (args, kwargs) pair per call.
        self.callback_args = []
        # Set by _callback_spy so the test can wait for a timer to fire.
        self.callback_event = threading.Event()

    def test_init_immutable_default_args(self):
        # Issue 17435: constructor defaults were mutable objects, they could be
        # mutated via the object attributes and affect other Timer objects.
        timer1 = threading.Timer(0.01, self._callback_spy)
        timer1.start()
        self.callback_event.wait()
        # Mutate the first timer's argument containers after it has fired;
        # a second timer must not observe these changes.
        timer1.args.append("blah")
        timer1.kwargs["foo"] = "bar"
        self.callback_event.clear()
        timer2 = threading.Timer(0.01, self._callback_spy)
        timer2.start()
        self.callback_event.wait()
        # Reap both timer threads before asserting, so they are joined even
        # when an assertion below fails.
        timer1.join()
        timer2.join()
        self.assertEqual(len(self.callback_args), 2)
        self.assertEqual(self.callback_args, [((), {}), ((), {})])

    def _callback_spy(self, *args, **kwargs):
        # Record a snapshot of the arguments, then wake up the waiting test.
        self.callback_args.append((args[:], kwargs.copy()))
        self.callback_event.set()
1090n/a
class LockTests(lock_tests.LockTests):
    # Runs the inherited lock_tests.LockTests cases with threading.Lock as
    # the lock factory.  staticmethod() keeps the factory from being bound
    # to the test instance when it is looked up through self.
    locktype = staticmethod(threading.Lock)
1093n/a
class PyRLockTests(lock_tests.RLockTests):
    # Runs the inherited lock_tests.RLockTests cases against
    # threading._PyRLock (presumably the pure-Python RLock; confirm in
    # threading.py).
    locktype = staticmethod(threading._PyRLock)
1096n/a
@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
class CRLockTests(lock_tests.RLockTests):
    # Runs the inherited lock_tests.RLockTests cases against
    # threading._CRLock; the whole class is skipped when that attribute
    # is None.
    locktype = staticmethod(threading._CRLock)
1100n/a
class EventTests(lock_tests.EventTests):
    # Runs the inherited lock_tests.EventTests cases with threading.Event
    # as the event factory.
    eventtype = staticmethod(threading.Event)
1103n/a
class ConditionAsRLockTests(lock_tests.RLockTests):
    # Condition uses an RLock by default and exports its API, so the
    # inherited RLock cases are run with threading.Condition as the lock
    # factory.
    locktype = staticmethod(threading.Condition)
1107n/a
class ConditionTests(lock_tests.ConditionTests):
    # Runs the inherited lock_tests.ConditionTests cases with
    # threading.Condition as the condition factory.
    condtype = staticmethod(threading.Condition)
1110n/a
class SemaphoreTests(lock_tests.SemaphoreTests):
    # Runs the inherited lock_tests.SemaphoreTests cases with
    # threading.Semaphore as the semaphore factory.
    semtype = staticmethod(threading.Semaphore)
1113n/a
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    # Runs the inherited lock_tests.BoundedSemaphoreTests cases with
    # threading.BoundedSemaphore as the semaphore factory.
    semtype = staticmethod(threading.BoundedSemaphore)
1116n/a
class BarrierTests(lock_tests.BarrierTests):
    # Runs the inherited lock_tests.BarrierTests cases with
    # threading.Barrier as the barrier factory.
    barriertype = staticmethod(threading.Barrier)
1119n/a
1120n/aclass MiscTestCase(unittest.TestCase):
1121n/a def test__all__(self):
1122n/a extra = {"ThreadError"}
1123n/a blacklist = {'currentThread', 'activeCount'}
1124n/a support.check__all__(self, threading, ('threading', '_thread'),
1125n/a extra=extra, blacklist=blacklist)
1126n/a
# Run this module's tests through unittest when it is executed as a script.
if __name__ == "__main__":
    unittest.main()