ยปCore Development>Code coverage>Lib/test/libregrtest/runtest_mp.py

Python code coverage for Lib/test/libregrtest/runtest_mp.py

#countcontent
1n/aimport faulthandler
2n/aimport json
3n/aimport os
4n/aimport queue
5n/aimport sys
6n/aimport time
7n/aimport traceback
8n/aimport types
9n/afrom test import support
10n/atry:
11n/a import threading
12n/aexcept ImportError:
13n/a print("Multiprocess option requires thread support")
14n/a sys.exit(2)
15n/a
16n/afrom test.libregrtest.runtest import (
17n/a runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
18n/a format_test_result)
19n/afrom test.libregrtest.setup import setup_tests
20n/a
21n/a
22n/a# Display the running tests if nothing happened last N seconds
23n/aPROGRESS_UPDATE = 30.0 # seconds
24n/a
25n/a# If interrupted, display the wait progress every N seconds
26n/aWAIT_PROGRESS = 2.0 # seconds
27n/a
28n/a
29n/adef run_test_in_subprocess(testname, ns):
30n/a """Run the given test in a subprocess with --slaveargs.
31n/a
32n/a ns is the option Namespace parsed from command-line arguments. regrtest
33n/a is invoked in a subprocess with the --slaveargs argument; when the
34n/a subprocess exits, its return code, stdout and stderr are returned as a
35n/a 3-tuple.
36n/a """
37n/a from subprocess import Popen, PIPE
38n/a
39n/a ns_dict = vars(ns)
40n/a slaveargs = (ns_dict, testname)
41n/a slaveargs = json.dumps(slaveargs)
42n/a
43n/a cmd = [sys.executable, *support.args_from_interpreter_flags(),
44n/a '-u', # Unbuffered stdout and stderr
45n/a '-m', 'test.regrtest',
46n/a '--slaveargs', slaveargs]
47n/a if ns.pgo:
48n/a cmd += ['--pgo']
49n/a
50n/a # Running the child from the same working directory as regrtest's original
51n/a # invocation ensures that TEMPDIR for the child is the same when
52n/a # sysconfig.is_python_build() is true. See issue 15300.
53n/a popen = Popen(cmd,
54n/a stdout=PIPE, stderr=PIPE,
55n/a universal_newlines=True,
56n/a close_fds=(os.name != 'nt'),
57n/a cwd=support.SAVEDCWD)
58n/a with popen:
59n/a stdout, stderr = popen.communicate()
60n/a retcode = popen.wait()
61n/a return retcode, stdout, stderr
62n/a
63n/a
64n/adef run_tests_slave(slaveargs):
65n/a ns_dict, testname = json.loads(slaveargs)
66n/a ns = types.SimpleNamespace(**ns_dict)
67n/a
68n/a setup_tests(ns)
69n/a
70n/a try:
71n/a result = runtest(ns, testname)
72n/a except KeyboardInterrupt:
73n/a result = INTERRUPTED, ''
74n/a except BaseException as e:
75n/a traceback.print_exc()
76n/a result = CHILD_ERROR, str(e)
77n/a
78n/a print() # Force a newline (just in case)
79n/a print(json.dumps(result), flush=True)
80n/a sys.exit(0)
81n/a
82n/a
83n/a# We do not use a generator so multiple threads can call next().
84n/aclass MultiprocessIterator:
85n/a
86n/a """A thread-safe iterator over tests for multiprocess mode."""
87n/a
88n/a def __init__(self, tests):
89n/a self.interrupted = False
90n/a self.lock = threading.Lock()
91n/a self.tests = tests
92n/a
93n/a def __iter__(self):
94n/a return self
95n/a
96n/a def __next__(self):
97n/a with self.lock:
98n/a if self.interrupted:
99n/a raise StopIteration('tests interrupted')
100n/a return next(self.tests)
101n/a
102n/a
103n/aclass MultiprocessThread(threading.Thread):
104n/a def __init__(self, pending, output, ns):
105n/a super().__init__()
106n/a self.pending = pending
107n/a self.output = output
108n/a self.ns = ns
109n/a self.current_test = None
110n/a self.start_time = None
111n/a
112n/a def _runtest(self):
113n/a try:
114n/a test = next(self.pending)
115n/a except StopIteration:
116n/a self.output.put((None, None, None, None))
117n/a return True
118n/a
119n/a try:
120n/a self.start_time = time.monotonic()
121n/a self.current_test = test
122n/a
123n/a retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
124n/a finally:
125n/a self.current_test = None
126n/a
127n/a stdout, _, result = stdout.strip().rpartition("\n")
128n/a if retcode != 0:
129n/a result = (CHILD_ERROR, "Exit code %s" % retcode)
130n/a self.output.put((test, stdout.rstrip(), stderr.rstrip(),
131n/a result))
132n/a return False
133n/a
134n/a if not result:
135n/a self.output.put((None, None, None, None))
136n/a return True
137n/a
138n/a result = json.loads(result)
139n/a self.output.put((test, stdout.rstrip(), stderr.rstrip(),
140n/a result))
141n/a return False
142n/a
143n/a def run(self):
144n/a try:
145n/a stop = False
146n/a while not stop:
147n/a stop = self._runtest()
148n/a except BaseException:
149n/a self.output.put((None, None, None, None))
150n/a raise
151n/a
152n/a
153n/adef run_tests_multiprocess(regrtest):
154n/a output = queue.Queue()
155n/a pending = MultiprocessIterator(regrtest.tests)
156n/a test_timeout = regrtest.ns.timeout
157n/a use_timeout = (test_timeout is not None)
158n/a
159n/a workers = [MultiprocessThread(pending, output, regrtest.ns)
160n/a for i in range(regrtest.ns.use_mp)]
161n/a print("Run tests in parallel using %s child processes"
162n/a % len(workers))
163n/a for worker in workers:
164n/a worker.start()
165n/a
166n/a def get_running(workers):
167n/a running = []
168n/a for worker in workers:
169n/a current_test = worker.current_test
170n/a if not current_test:
171n/a continue
172n/a dt = time.monotonic() - worker.start_time
173n/a if dt >= PROGRESS_MIN_TIME:
174n/a running.append('%s (%.0f sec)' % (current_test, dt))
175n/a return running
176n/a
177n/a finished = 0
178n/a test_index = 1
179n/a get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
180n/a try:
181n/a while finished < regrtest.ns.use_mp:
182n/a if use_timeout:
183n/a faulthandler.dump_traceback_later(test_timeout, exit=True)
184n/a
185n/a try:
186n/a item = output.get(timeout=get_timeout)
187n/a except queue.Empty:
188n/a running = get_running(workers)
189n/a if running and not regrtest.ns.pgo:
190n/a print('running: %s' % ', '.join(running))
191n/a continue
192n/a
193n/a test, stdout, stderr, result = item
194n/a if test is None:
195n/a finished += 1
196n/a continue
197n/a regrtest.accumulate_result(test, result)
198n/a
199n/a # Display progress
200n/a ok, test_time = result
201n/a text = format_test_result(test, ok)
202n/a if (ok not in (CHILD_ERROR, INTERRUPTED)
203n/a and test_time >= PROGRESS_MIN_TIME
204n/a and not regrtest.ns.pgo):
205n/a text += ' (%.0f sec)' % test_time
206n/a elif ok == CHILD_ERROR:
207n/a text = '%s (%s)' % (text, test_time)
208n/a running = get_running(workers)
209n/a if running and not regrtest.ns.pgo:
210n/a text += ' -- running: %s' % ', '.join(running)
211n/a regrtest.display_progress(test_index, text)
212n/a
213n/a # Copy stdout and stderr from the child process
214n/a if stdout:
215n/a print(stdout, flush=True)
216n/a if stderr and not regrtest.ns.pgo:
217n/a print(stderr, file=sys.stderr, flush=True)
218n/a
219n/a if result[0] == INTERRUPTED:
220n/a raise KeyboardInterrupt
221n/a test_index += 1
222n/a except KeyboardInterrupt:
223n/a regrtest.interrupted = True
224n/a pending.interrupted = True
225n/a print()
226n/a finally:
227n/a if use_timeout:
228n/a faulthandler.cancel_dump_traceback_later()
229n/a
230n/a # If tests are interrupted, wait until tests complete
231n/a wait_start = time.monotonic()
232n/a while True:
233n/a running = [worker.current_test for worker in workers]
234n/a running = list(filter(bool, running))
235n/a if not running:
236n/a break
237n/a
238n/a dt = time.monotonic() - wait_start
239n/a line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
240n/a if dt >= WAIT_PROGRESS:
241n/a line = "%s since %.0f sec" % (line, dt)
242n/a print(line)
243n/a for worker in workers:
244n/a worker.join(WAIT_PROGRESS)