» Core Development > Code coverage > Lib/test/test_importlib/test_locks.py

Python code coverage for Lib/test/test_importlib/test_locks.py

# count content
1n/afrom . import util as test_util
2n/a
3n/ainit = test_util.import_importlib('importlib')
4n/a
5n/aimport sys
6n/aimport unittest
7n/aimport weakref
8n/a
9n/afrom test import support
10n/a
11n/atry:
12n/a import threading
13n/aexcept ImportError:
14n/a threading = None
15n/aelse:
16n/a from test import lock_tests
17n/a
if threading is not None:
    class ModuleLockAsRLockTests:
        """Mixin that runs the generic RLock tests against ``_ModuleLock``.

        ``LockType`` is supplied through ``test_util.test_both`` below.
        Tests for features the module lock does not support are overridden
        with ``None``.
        """

        @classmethod
        def locktype(cls):
            # Every lock created for these tests gets the same fixed name.
            return cls.LockType("some_lock")

        # _is_owned() unsupported
        test__is_owned = None
        # acquire(blocking=False) unsupported
        test_try_acquire = test_try_acquire_contended = None
        # `with` unsupported
        test_with = None
        # acquire(timeout=...) unsupported
        test_timeout = None
        # _release_save() unsupported
        test_release_save_unacquired = None
        # lock status in repr unsupported
        test_repr = test_locked_repr = None

    # One _ModuleLock class per entry of ``init``.
    LOCK_TYPES = {
        kind: module._bootstrap._ModuleLock
        for kind, module in init.items()
    }

    Frozen_ModuleLockAsRLockTests, Source_ModuleLockAsRLockTests = (
        test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
                            LockType=LOCK_TYPES))
else:
    # No threading module: keep the names defined so that test_main() can
    # still reference them, but give the test cases no tests.
    LOCK_TYPES = {}

    class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
        pass

    class Source_ModuleLockAsRLockTests(unittest.TestCase):
        pass
52n/a
53n/a
if threading is not None:
    class DeadlockAvoidanceTests:
        """Exercise deadlock detection on a ring of module locks.

        ``LockType`` and ``DeadlockError`` are supplied through
        ``test_util.test_both`` below.
        """

        def setUp(self):
            # Shrink the switch interval for the test and remember the old
            # value so that tearDown() can put it back.
            try:
                self.old_switchinterval = sys.getswitchinterval()
                support.setswitchinterval(1e-6)
            except AttributeError:
                # Nothing was changed, so there is nothing to restore.
                self.old_switchinterval = None

        def tearDown(self):
            previous = self.old_switchinterval
            if previous is None:
                return
            sys.setswitchinterval(previous)

        def run_deadlock_avoidance_test(self, create_deadlock):
            """Let several threads each take two neighbouring locks of a ring.

            Ten locks form a ring and every thread works on one pair of
            adjacent locks.  When *create_deadlock* is true there are as
            many threads as locks; otherwise there is one thread fewer.

            Return a list holding one ``(first_ok, second_ok)`` tuple per
            thread; a flag is False when the corresponding ``acquire()``
            raised ``self.DeadlockError``.
            """
            lock_count = 10
            ring = [self.LockType(str(n)) for n in range(lock_count)]
            # Pair each lock with its successor; the last wraps to the first.
            pairs = list(zip(ring, ring[1:] + ring[:1]))
            thread_count = lock_count if create_deadlock else lock_count - 1
            barrier = threading.Barrier(thread_count)
            results = []

            def acquired(lock):
                """Acquire *lock*; return False if a deadlock was reported."""
                try:
                    lock.acquire()
                except self.DeadlockError:
                    return False
                return True

            def worker():
                first, second = pairs.pop()
                got_first = acquired(first)
                # Every thread attempts its first lock before any thread
                # goes on to its second one.
                barrier.wait()
                got_second = acquired(second)
                results.append((got_first, got_second))
                if got_second:
                    second.release()
                if got_first:
                    first.release()

            lock_tests.Bunch(worker, thread_count).wait_for_finished()
            self.assertEqual(len(results), thread_count)
            return results

        def test_deadlock(self):
            outcomes = self.run_deadlock_avoidance_test(True)
            # At least one of the threads detected a potential deadlock on
            # its second acquire() call.  It may be several of them, because
            # the deadlock avoidance mechanism is conservative.
            deadlocked = outcomes.count((True, False))
            self.assertGreaterEqual(deadlocked, 1)
            succeeded = outcomes.count((True, True))
            self.assertEqual(succeeded, len(outcomes) - deadlocked)

        def test_no_deadlock(self):
            outcomes = self.run_deadlock_avoidance_test(False)
            self.assertEqual(outcomes.count((True, False)), 0)
            self.assertEqual(outcomes.count((True, True)), len(outcomes))

    # One _DeadlockError class per entry of ``init``.
    DEADLOCK_ERRORS = {
        kind: module._bootstrap._DeadlockError
        for kind, module in init.items()
    }

    Frozen_DeadlockAvoidanceTests, Source_DeadlockAvoidanceTests = (
        test_util.test_both(DeadlockAvoidanceTests,
                            LockType=LOCK_TYPES,
                            DeadlockError=DEADLOCK_ERRORS))
else:
    # No threading module: keep the names defined so that test_main() can
    # still reference them, but give the test cases no tests.
    DEADLOCK_ERRORS = {}

    class Frozen_DeadlockAvoidanceTests(unittest.TestCase):
        pass

    class Source_DeadlockAvoidanceTests(unittest.TestCase):
        pass
134n/a
135n/a
class LifetimeTests:
    """Check that module locks disappear once nothing references them.

    ``init`` is supplied through ``test_util.test_both``.
    """

    @property
    def bootstrap(self):
        """Return the ``_bootstrap`` attribute of ``self.init``."""
        return self.init._bootstrap

    def test_lock_lifetime(self):
        lock_name = "xyzzy"
        bootstrap = self.bootstrap
        self.assertNotIn(lock_name, bootstrap._module_locks)
        lock = bootstrap._get_module_lock(lock_name)
        self.assertIn(lock_name, bootstrap._module_locks)
        lock_ref = weakref.ref(lock)
        # Drop the only strong reference held here, then collect garbage;
        # afterwards both the registry entry and the lock must be gone.
        del lock
        support.gc_collect()
        self.assertNotIn(lock_name, bootstrap._module_locks)
        self.assertIsNone(lock_ref())

    def test_all_locks(self):
        support.gc_collect()
        module_locks = self.bootstrap._module_locks
        # Pass the mapping as the message so leftovers show up on failure.
        self.assertEqual(0, len(module_locks), module_locks)
157n/a
158n/a
# Build the Frozen and Source variants of the lifetime tests.
Frozen_LifetimeTests, Source_LifetimeTests = test_util.test_both(
    LifetimeTests, init=init)
162n/a
163n/a
@support.reap_threads
def test_main():
    """Run every test case defined in this module."""
    test_cases = (
        Frozen_ModuleLockAsRLockTests,
        Source_ModuleLockAsRLockTests,
        Frozen_DeadlockAvoidanceTests,
        Source_DeadlockAvoidanceTests,
        Frozen_LifetimeTests,
        Source_LifetimeTests,
    )
    support.run_unittest(*test_cases)
172n/a
173n/a
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()