ยปCore Development>Code coverage>Lib/test/test_dummy_threading.py

Python code coverage for Lib/test/test_dummy_threading.py

#countcontent
1n/afrom test import support
2n/aimport unittest
3n/aimport dummy_threading as _threading
4n/aimport time
5n/a
6n/aclass DummyThreadingTestCase(unittest.TestCase):
7n/a
8n/a class TestThread(_threading.Thread):
9n/a
10n/a def run(self):
11n/a global running
12n/a global sema
13n/a global mutex
14n/a # Uncomment if testing another module, such as the real 'threading'
15n/a # module.
16n/a #delay = random.random() * 2
17n/a delay = 0
18n/a if support.verbose:
19n/a print('task', self.name, 'will run for', delay, 'sec')
20n/a sema.acquire()
21n/a mutex.acquire()
22n/a running += 1
23n/a if support.verbose:
24n/a print(running, 'tasks are running')
25n/a mutex.release()
26n/a time.sleep(delay)
27n/a if support.verbose:
28n/a print('task', self.name, 'done')
29n/a mutex.acquire()
30n/a running -= 1
31n/a if support.verbose:
32n/a print(self.name, 'is finished.', running, 'tasks are running')
33n/a mutex.release()
34n/a sema.release()
35n/a
36n/a def setUp(self):
37n/a self.numtasks = 10
38n/a global sema
39n/a sema = _threading.BoundedSemaphore(value=3)
40n/a global mutex
41n/a mutex = _threading.RLock()
42n/a global running
43n/a running = 0
44n/a self.threads = []
45n/a
46n/a def test_tasks(self):
47n/a for i in range(self.numtasks):
48n/a t = self.TestThread(name="<thread %d>"%i)
49n/a self.threads.append(t)
50n/a t.start()
51n/a
52n/a if support.verbose:
53n/a print('waiting for all tasks to complete')
54n/a for t in self.threads:
55n/a t.join()
56n/a if support.verbose:
57n/a print('all tasks done')
58n/a
59n/aif __name__ == '__main__':
60n/a unittest.main()