» Core Development > Code coverage > Lib/test/test_threaded_import.py

Python code coverage for Lib/test/test_threaded_import.py

#  count  content
# This is a variant of the very old (early 90's) file
# Demo/threads/bug.py. It simply provokes a number of threads into
# trying to import the same module "at the same time".
# There are no pleasant failure modes -- most likely is that Python
# complains several times about module random having no attribute
# randrange, and then Python hangs.
7n/a
import _imp as imp
import os
import importlib
import sys
import time
import shutil
import unittest
from test.support import (
    verbose, import_module, run_unittest, TESTFN, reap_threads,
    forget, unlink, rmtree, start_threads)
threading = import_module('threading')
19n/a
def task(N, done, done_tasks, errors):
    """Import ``random`` and ``modulefinder`` and record the outcome.

    Meant to be used as a thread target so that several threads race to
    import the same modules.

    Args:
        N: Total number of tasks expected to run. The task that brings
            ``len(done_tasks)`` up to ``N`` calls ``done.set()``.
        done: Event-like object signalled once all ``N`` tasks finished.
        done_tasks: List to which this task appends its thread identifier.
        errors: List to which any exception raised during the imports (or
            the ``randrange`` call) is appended, with its traceback removed.
    """
    try:
        # We don't use modulefinder but still import it in order to stress
        # importing of different modules from several threads.
        # The import order alternates with the number of finished tasks.
        if len(done_tasks) % 2:
            import modulefinder
            import random
        else:
            import random
            import modulefinder
        # This will fail if random is not completely initialized
        random.randrange(1, 3)
    except Exception as e:
        # Drop the traceback so the stored exception does not keep the
        # frame (and its locals) alive.
        errors.append(e.with_traceback(None))
    finally:
        # Always record completion, even on failure, so the waiter is
        # signalled once every task has run.
        done_tasks.append(threading.get_ident())
        if len(done_tasks) == N:
            done.set()
39n/a
# Create a circular import structure: A -> C -> B -> D -> A
# NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
#
# Maps a module name to the source text written to "<name>.py".  The
# sources for A and B are %-templates taking a `delay` key (seconds to
# sleep before continuing the import chain); the leading "if 1:" lets the
# template body be indented inside this literal.
circular_imports_modules = {
    'A': """if 1:
        import time
        time.sleep(%(delay)s)
        x = 'a'
        import C
        """,
    'B': """if 1:
        import time
        time.sleep(%(delay)s)
        x = 'b'
        import D
        """,
    'C': """import B""",
    'D': """import A""",
}
59n/a
class Finder:
    """A dummy finder to detect concurrent access to its find_spec()
    method."""

    def __init__(self):
        # Number of find_spec() calls; updated under `lock`, so it is exact
        # even when calls overlap.
        self.numcalls = 0
        # Counter updated WITHOUT locking; it only matches `numcalls` if
        # find_spec() calls never overlapped.
        self.x = 0
        self.lock = threading.Lock()

    def find_spec(self, name, path=None, target=None):
        """Record one call and return None (no spec is ever found).

        Raises:
            AssertionError: if the import lock is not held by the caller.
        """
        # Simulate some thread-unsafe behaviour. If calls to find_spec()
        # are properly serialized, `x` will end up the same as `numcalls`.
        # Otherwise not.
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when running under -O.
        if not imp.lock_held():
            raise AssertionError("import lock is not held")
        with self.lock:
            self.numcalls += 1
        # Deliberately racy read-sleep-write, outside of `self.lock`.
        x = self.x
        time.sleep(0.01)
        self.x = x + 1
79n/a
class FlushingFinder:
    """A dummy finder which flushes sys.path_importer_cache when it gets
    called."""

    def find_spec(self, name, path=None, target=None):
        """Empty sys.path_importer_cache and return None (no spec found)."""
        sys.path_importer_cache.clear()
86n/a
87n/a
88n/aclass ThreadedImportTests(unittest.TestCase):
89n/a
90n/a def setUp(self):
91n/a self.old_random = sys.modules.pop('random', None)
92n/a
93n/a def tearDown(self):
94n/a # If the `random` module was already initialized, we restore the
95n/a # old module at the end so that pickling tests don't fail.
96n/a # See http://bugs.python.org/issue3657#msg110461
97n/a if self.old_random is not None:
98n/a sys.modules['random'] = self.old_random
99n/a
100n/a def check_parallel_module_init(self):
101n/a if imp.lock_held():
102n/a # This triggers on, e.g., from test import autotest.
103n/a raise unittest.SkipTest("can't run when import lock is held")
104n/a
105n/a done = threading.Event()
106n/a for N in (20, 50) * 3:
107n/a if verbose:
108n/a print("Trying", N, "threads ...", end=' ')
109n/a # Make sure that random and modulefinder get reimported freshly
110n/a for modname in ['random', 'modulefinder']:
111n/a try:
112n/a del sys.modules[modname]
113n/a except KeyError:
114n/a pass
115n/a errors = []
116n/a done_tasks = []
117n/a done.clear()
118n/a t0 = time.monotonic()
119n/a with start_threads(threading.Thread(target=task,
120n/a args=(N, done, done_tasks, errors,))
121n/a for i in range(N)):
122n/a pass
123n/a completed = done.wait(10 * 60)
124n/a dt = time.monotonic() - t0
125n/a if verbose:
126n/a print("%.1f ms" % (dt*1e3), flush=True, end=" ")
127n/a dbg_info = 'done: %s/%s' % (len(done_tasks), N)
128n/a self.assertFalse(errors, dbg_info)
129n/a self.assertTrue(completed, dbg_info)
130n/a if verbose:
131n/a print("OK.")
132n/a
133n/a def test_parallel_module_init(self):
134n/a self.check_parallel_module_init()
135n/a
136n/a def test_parallel_meta_path(self):
137n/a finder = Finder()
138n/a sys.meta_path.insert(0, finder)
139n/a try:
140n/a self.check_parallel_module_init()
141n/a self.assertGreater(finder.numcalls, 0)
142n/a self.assertEqual(finder.x, finder.numcalls)
143n/a finally:
144n/a sys.meta_path.remove(finder)
145n/a
146n/a def test_parallel_path_hooks(self):
147n/a # Here the Finder instance is only used to check concurrent calls
148n/a # to path_hook().
149n/a finder = Finder()
150n/a # In order for our path hook to be called at each import, we need
151n/a # to flush the path_importer_cache, which we do by registering a
152n/a # dedicated meta_path entry.
153n/a flushing_finder = FlushingFinder()
154n/a def path_hook(path):
155n/a finder.find_spec('')
156n/a raise ImportError
157n/a sys.path_hooks.insert(0, path_hook)
158n/a sys.meta_path.append(flushing_finder)
159n/a try:
160n/a # Flush the cache a first time
161n/a flushing_finder.find_spec('')
162n/a numtests = self.check_parallel_module_init()
163n/a self.assertGreater(finder.numcalls, 0)
164n/a self.assertEqual(finder.x, finder.numcalls)
165n/a finally:
166n/a sys.meta_path.remove(flushing_finder)
167n/a sys.path_hooks.remove(path_hook)
168n/a
169n/a def test_import_hangers(self):
170n/a # In case this test is run again, make sure the helper module
171n/a # gets loaded from scratch again.
172n/a try:
173n/a del sys.modules['test.threaded_import_hangers']
174n/a except KeyError:
175n/a pass
176n/a import test.threaded_import_hangers
177n/a self.assertFalse(test.threaded_import_hangers.errors)
178n/a
179n/a def test_circular_imports(self):
180n/a # The goal of this test is to exercise implementations of the import
181n/a # lock which use a per-module lock, rather than a global lock.
182n/a # In these implementations, there is a possible deadlock with
183n/a # circular imports, for example:
184n/a # - thread 1 imports A (grabbing the lock for A) which imports B
185n/a # - thread 2 imports B (grabbing the lock for B) which imports A
186n/a # Such implementations should be able to detect such situations and
187n/a # resolve them one way or the other, without freezing.
188n/a # NOTE: our test constructs a slightly less trivial import cycle,
189n/a # in order to better stress the deadlock avoidance mechanism.
190n/a delay = 0.5
191n/a os.mkdir(TESTFN)
192n/a self.addCleanup(shutil.rmtree, TESTFN)
193n/a sys.path.insert(0, TESTFN)
194n/a self.addCleanup(sys.path.remove, TESTFN)
195n/a for name, contents in circular_imports_modules.items():
196n/a contents = contents % {'delay': delay}
197n/a with open(os.path.join(TESTFN, name + ".py"), "wb") as f:
198n/a f.write(contents.encode('utf-8'))
199n/a self.addCleanup(forget, name)
200n/a
201n/a importlib.invalidate_caches()
202n/a results = []
203n/a def import_ab():
204n/a import A
205n/a results.append(getattr(A, 'x', None))
206n/a def import_ba():
207n/a import B
208n/a results.append(getattr(B, 'x', None))
209n/a t1 = threading.Thread(target=import_ab)
210n/a t2 = threading.Thread(target=import_ba)
211n/a t1.start()
212n/a t2.start()
213n/a t1.join()
214n/a t2.join()
215n/a self.assertEqual(set(results), {'a', 'b'})
216n/a
217n/a def test_side_effect_import(self):
218n/a code = """if 1:
219n/a import threading
220n/a def target():
221n/a import random
222n/a t = threading.Thread(target=target)
223n/a t.start()
224n/a t.join()"""
225n/a sys.path.insert(0, os.curdir)
226n/a self.addCleanup(sys.path.remove, os.curdir)
227n/a filename = TESTFN + ".py"
228n/a with open(filename, "wb") as f:
229n/a f.write(code.encode('utf-8'))
230n/a self.addCleanup(unlink, filename)
231n/a self.addCleanup(forget, TESTFN)
232n/a self.addCleanup(rmtree, '__pycache__')
233n/a importlib.invalidate_caches()
234n/a __import__(TESTFN)
235n/a
236n/a
@reap_threads
def test_main():
    """Run ThreadedImportTests with a very short thread switch interval.

    The interpreter's switch interval is lowered to 1e-5 seconds for the
    duration of the run and restored afterwards, even if the run raises.
    """
    old_switchinterval = None
    try:
        old_switchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-5)
    except AttributeError:
        # The switch interval API is not available on this interpreter;
        # run with the default scheduling.
        pass
    try:
        run_unittest(ThreadedImportTests)
    finally:
        if old_switchinterval is not None:
            sys.setswitchinterval(old_switchinterval)
250n/a
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()