ยปCore Development>Code coverage>Lib/test/libregrtest/main.py

Python code coverage for Lib/test/libregrtest/main.py

#countcontent
1n/aimport datetime
2n/aimport faulthandler
3n/aimport locale
4n/aimport os
5n/aimport platform
6n/aimport random
7n/aimport re
8n/aimport sys
9n/aimport sysconfig
10n/aimport tempfile
11n/aimport textwrap
12n/aimport time
13n/afrom test.libregrtest.cmdline import _parse_args
14n/afrom test.libregrtest.runtest import (
15n/a findtests, runtest,
16n/a STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
17n/a INTERRUPTED, CHILD_ERROR,
18n/a PROGRESS_MIN_TIME, format_test_result)
19n/afrom test.libregrtest.setup import setup_tests
20n/afrom test import support
21n/atry:
22n/a import gc
23n/aexcept ImportError:
24n/a gc = None
25n/a
26n/a
27n/a# When tests are run from the Python build directory, it is best practice
28n/a# to keep the test files in a subfolder. This eases the cleanup of leftover
29n/a# files using the "make distclean" command.
30n/aif sysconfig.is_python_build():
31n/a TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
32n/aelse:
33n/a TEMPDIR = tempfile.gettempdir()
34n/aTEMPDIR = os.path.abspath(TEMPDIR)
35n/a
36n/a
37n/adef format_duration(seconds):
38n/a if seconds < 1.0:
39n/a return '%.0f ms' % (seconds * 1e3)
40n/a if seconds < 60.0:
41n/a return '%.0f sec' % seconds
42n/a
43n/a minutes, seconds = divmod(seconds, 60.0)
44n/a return '%.0f min %.0f sec' % (minutes, seconds)
45n/a
46n/a
47n/aclass Regrtest:
48n/a """Execute a test suite.
49n/a
50n/a This also parses command-line options and modifies its behavior
51n/a accordingly.
52n/a
53n/a tests -- a list of strings containing test names (optional)
54n/a testdir -- the directory in which to look for tests (optional)
55n/a
56n/a Users other than the Python test suite will certainly want to
57n/a specify testdir; if it's omitted, the directory containing the
58n/a Python test suite is searched for.
59n/a
60n/a If the tests argument is omitted, the tests listed on the
61n/a command-line will be used. If that's empty, too, then all *.py
62n/a files beginning with test_ will be used.
63n/a
64n/a The other default arguments (verbose, quiet, exclude,
65n/a single, randomize, findleaks, use_resources, trace, coverdir,
66n/a print_slow, and random_seed) allow programmers calling main()
67n/a directly to set the values that would normally be set by flags
68n/a on the command line.
69n/a """
70n/a def __init__(self):
71n/a # Namespace of command line options
72n/a self.ns = None
73n/a
74n/a # tests
75n/a self.tests = []
76n/a self.selected = []
77n/a
78n/a # test results
79n/a self.good = []
80n/a self.bad = []
81n/a self.skipped = []
82n/a self.resource_denieds = []
83n/a self.environment_changed = []
84n/a self.interrupted = False
85n/a
86n/a # used by --slow
87n/a self.test_times = []
88n/a
89n/a # used by --coverage, trace.Trace instance
90n/a self.tracer = None
91n/a
92n/a # used by --findleaks, store for gc.garbage
93n/a self.found_garbage = []
94n/a
95n/a # used to display the progress bar "[ 3/100]"
96n/a self.start_time = time.monotonic()
97n/a self.test_count = ''
98n/a self.test_count_width = 1
99n/a
100n/a # used by --single
101n/a self.next_single_test = None
102n/a self.next_single_filename = None
103n/a
104n/a def accumulate_result(self, test, result):
105n/a ok, test_time = result
106n/a if ok not in (CHILD_ERROR, INTERRUPTED):
107n/a self.test_times.append((test_time, test))
108n/a if ok == PASSED:
109n/a self.good.append(test)
110n/a elif ok in (FAILED, CHILD_ERROR):
111n/a self.bad.append(test)
112n/a elif ok == ENV_CHANGED:
113n/a self.environment_changed.append(test)
114n/a elif ok == SKIPPED:
115n/a self.skipped.append(test)
116n/a elif ok == RESOURCE_DENIED:
117n/a self.skipped.append(test)
118n/a self.resource_denieds.append(test)
119n/a
120n/a def display_progress(self, test_index, test):
121n/a if self.ns.quiet:
122n/a return
123n/a if self.bad and not self.ns.pgo:
124n/a fmt = "{time} [{test_index:{count_width}}{test_count}/{nbad}] {test_name}"
125n/a else:
126n/a fmt = "{time} [{test_index:{count_width}}{test_count}] {test_name}"
127n/a test_time = time.monotonic() - self.start_time
128n/a test_time = datetime.timedelta(seconds=int(test_time))
129n/a line = fmt.format(count_width=self.test_count_width,
130n/a test_index=test_index,
131n/a test_count=self.test_count,
132n/a nbad=len(self.bad),
133n/a test_name=test,
134n/a time=test_time)
135n/a print(line, flush=True)
136n/a
137n/a def parse_args(self, kwargs):
138n/a ns = _parse_args(sys.argv[1:], **kwargs)
139n/a
140n/a if ns.timeout and not hasattr(faulthandler, 'dump_traceback_later'):
141n/a print("Warning: The timeout option requires "
142n/a "faulthandler.dump_traceback_later", file=sys.stderr)
143n/a ns.timeout = None
144n/a
145n/a if ns.threshold is not None and gc is None:
146n/a print('No GC available, ignore --threshold.', file=sys.stderr)
147n/a ns.threshold = None
148n/a
149n/a if ns.findleaks:
150n/a if gc is not None:
151n/a # Uncomment the line below to report garbage that is not
152n/a # freeable by reference counting alone. By default only
153n/a # garbage that is not collectable by the GC is reported.
154n/a pass
155n/a #gc.set_debug(gc.DEBUG_SAVEALL)
156n/a else:
157n/a print('No GC available, disabling --findleaks',
158n/a file=sys.stderr)
159n/a ns.findleaks = False
160n/a
161n/a # Strip .py extensions.
162n/a removepy(ns.args)
163n/a
164n/a return ns
165n/a
166n/a def find_tests(self, tests):
167n/a self.tests = tests
168n/a
169n/a if self.ns.single:
170n/a self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest')
171n/a try:
172n/a with open(self.next_single_filename, 'r') as fp:
173n/a next_test = fp.read().strip()
174n/a self.tests = [next_test]
175n/a except OSError:
176n/a pass
177n/a
178n/a if self.ns.fromfile:
179n/a self.tests = []
180n/a # regex to match 'test_builtin' in line:
181n/a # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
182n/a regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
183n/a with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
184n/a for line in fp:
185n/a line = line.split('#', 1)[0]
186n/a line = line.strip()
187n/a match = regex.search(line)
188n/a if match is not None:
189n/a self.tests.append(match.group())
190n/a
191n/a removepy(self.tests)
192n/a
193n/a stdtests = STDTESTS[:]
194n/a nottests = NOTTESTS.copy()
195n/a if self.ns.exclude:
196n/a for arg in self.ns.args:
197n/a if arg in stdtests:
198n/a stdtests.remove(arg)
199n/a nottests.add(arg)
200n/a self.ns.args = []
201n/a
202n/a # if testdir is set, then we are not running the python tests suite, so
203n/a # don't add default tests to be executed or skipped (pass empty values)
204n/a if self.ns.testdir:
205n/a alltests = findtests(self.ns.testdir, list(), set())
206n/a else:
207n/a alltests = findtests(self.ns.testdir, stdtests, nottests)
208n/a
209n/a if not self.ns.fromfile:
210n/a self.selected = self.tests or self.ns.args or alltests
211n/a else:
212n/a self.selected = self.tests
213n/a if self.ns.single:
214n/a self.selected = self.selected[:1]
215n/a try:
216n/a pos = alltests.index(self.selected[0])
217n/a self.next_single_test = alltests[pos + 1]
218n/a except IndexError:
219n/a pass
220n/a
221n/a # Remove all the selected tests that precede start if it's set.
222n/a if self.ns.start:
223n/a try:
224n/a del self.selected[:self.selected.index(self.ns.start)]
225n/a except ValueError:
226n/a print("Couldn't find starting test (%s), using all tests"
227n/a % self.ns.start, file=sys.stderr)
228n/a
229n/a if self.ns.randomize:
230n/a if self.ns.random_seed is None:
231n/a self.ns.random_seed = random.randrange(10000000)
232n/a random.seed(self.ns.random_seed)
233n/a random.shuffle(self.selected)
234n/a
235n/a def list_tests(self):
236n/a for name in self.selected:
237n/a print(name)
238n/a
239n/a def rerun_failed_tests(self):
240n/a self.ns.verbose = True
241n/a self.ns.failfast = False
242n/a self.ns.verbose3 = False
243n/a self.ns.match_tests = None
244n/a
245n/a print("Re-running failed tests in verbose mode")
246n/a for test in self.bad[:]:
247n/a print("Re-running test %r in verbose mode" % test, flush=True)
248n/a try:
249n/a self.ns.verbose = True
250n/a ok = runtest(self.ns, test)
251n/a except KeyboardInterrupt:
252n/a self.interrupted = True
253n/a # print a newline separate from the ^C
254n/a print()
255n/a break
256n/a else:
257n/a if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
258n/a self.bad.remove(test)
259n/a else:
260n/a if self.bad:
261n/a print(count(len(self.bad), 'test'), "failed again:")
262n/a printlist(self.bad)
263n/a
264n/a def display_result(self):
265n/a if self.interrupted:
266n/a # print a newline after ^C
267n/a print()
268n/a print("Test suite interrupted by signal SIGINT.")
269n/a executed = set(self.good) | set(self.bad) | set(self.skipped)
270n/a omitted = set(self.selected) - executed
271n/a print(count(len(omitted), "test"), "omitted:")
272n/a printlist(omitted)
273n/a
274n/a # If running the test suite for PGO then no one cares about
275n/a # results.
276n/a if self.ns.pgo:
277n/a return
278n/a
279n/a if self.good and not self.ns.quiet:
280n/a if (not self.bad
281n/a and not self.skipped
282n/a and not self.interrupted
283n/a and len(self.good) > 1):
284n/a print("All", end=' ')
285n/a print(count(len(self.good), "test"), "OK.")
286n/a
287n/a if self.ns.print_slow:
288n/a self.test_times.sort(reverse=True)
289n/a print()
290n/a print("10 slowest tests:")
291n/a for time, test in self.test_times[:10]:
292n/a print("- %s: %s" % (test, format_duration(time)))
293n/a
294n/a if self.bad:
295n/a print()
296n/a print(count(len(self.bad), "test"), "failed:")
297n/a printlist(self.bad)
298n/a
299n/a if self.environment_changed:
300n/a print()
301n/a print("{} altered the execution environment:".format(
302n/a count(len(self.environment_changed), "test")))
303n/a printlist(self.environment_changed)
304n/a
305n/a if self.skipped and not self.ns.quiet:
306n/a print()
307n/a print(count(len(self.skipped), "test"), "skipped:")
308n/a printlist(self.skipped)
309n/a
310n/a def run_tests_sequential(self):
311n/a if self.ns.trace:
312n/a import trace
313n/a self.tracer = trace.Trace(trace=False, count=True)
314n/a
315n/a save_modules = sys.modules.keys()
316n/a
317n/a print("Run tests sequentially")
318n/a
319n/a previous_test = None
320n/a for test_index, test in enumerate(self.tests, 1):
321n/a start_time = time.monotonic()
322n/a
323n/a text = test
324n/a if previous_test:
325n/a text = '%s -- %s' % (text, previous_test)
326n/a self.display_progress(test_index, text)
327n/a
328n/a if self.tracer:
329n/a # If we're tracing code coverage, then we don't exit with status
330n/a # if on a false return value from main.
331n/a cmd = ('result = runtest(self.ns, test); '
332n/a 'self.accumulate_result(test, result)')
333n/a ns = dict(locals())
334n/a self.tracer.runctx(cmd, globals=globals(), locals=ns)
335n/a result = ns['result']
336n/a else:
337n/a try:
338n/a result = runtest(self.ns, test)
339n/a except KeyboardInterrupt:
340n/a self.interrupted = True
341n/a self.accumulate_result(test, (INTERRUPTED, None))
342n/a break
343n/a else:
344n/a self.accumulate_result(test, result)
345n/a
346n/a previous_test = format_test_result(test, result[0])
347n/a test_time = time.monotonic() - start_time
348n/a if test_time >= PROGRESS_MIN_TIME:
349n/a previous_test = "%s in %s" % (previous_test, format_duration(test_time))
350n/a elif result[0] == PASSED:
351n/a # be quiet: say nothing if the test passed shortly
352n/a previous_test = None
353n/a
354n/a if self.ns.findleaks:
355n/a gc.collect()
356n/a if gc.garbage:
357n/a print("Warning: test created", len(gc.garbage), end=' ')
358n/a print("uncollectable object(s).")
359n/a # move the uncollectable objects somewhere so we don't see
360n/a # them again
361n/a self.found_garbage.extend(gc.garbage)
362n/a del gc.garbage[:]
363n/a
364n/a # Unload the newly imported modules (best effort finalization)
365n/a for module in sys.modules.keys():
366n/a if module not in save_modules and module.startswith("test."):
367n/a support.unload(module)
368n/a
369n/a if previous_test:
370n/a print(previous_test)
371n/a
372n/a def _test_forever(self, tests):
373n/a while True:
374n/a for test in tests:
375n/a yield test
376n/a if self.bad:
377n/a return
378n/a
379n/a def run_tests(self):
380n/a # For a partial run, we do not need to clutter the output.
381n/a if (self.ns.verbose
382n/a or self.ns.header
383n/a or not (self.ns.pgo or self.ns.quiet or self.ns.single
384n/a or self.tests or self.ns.args)):
385n/a # Print basic platform information
386n/a print("==", platform.python_implementation(), *sys.version.split())
387n/a print("== ", platform.platform(aliased=True),
388n/a "%s-endian" % sys.byteorder)
389n/a print("== ", "hash algorithm:", sys.hash_info.algorithm,
390n/a "64bit" if sys.maxsize > 2**32 else "32bit")
391n/a print("== cwd:", os.getcwd())
392n/a print("== encodings: locale=%s, FS=%s"
393n/a % (locale.getpreferredencoding(False),
394n/a sys.getfilesystemencoding()))
395n/a print("Testing with flags:", sys.flags)
396n/a
397n/a if self.ns.randomize:
398n/a print("Using random seed", self.ns.random_seed)
399n/a
400n/a if self.ns.forever:
401n/a self.tests = self._test_forever(list(self.selected))
402n/a self.test_count = ''
403n/a self.test_count_width = 3
404n/a else:
405n/a self.tests = iter(self.selected)
406n/a self.test_count = '/{}'.format(len(self.selected))
407n/a self.test_count_width = len(self.test_count) - 1
408n/a
409n/a if self.ns.use_mp:
410n/a from test.libregrtest.runtest_mp import run_tests_multiprocess
411n/a run_tests_multiprocess(self)
412n/a else:
413n/a self.run_tests_sequential()
414n/a
415n/a def finalize(self):
416n/a if self.next_single_filename:
417n/a if self.next_single_test:
418n/a with open(self.next_single_filename, 'w') as fp:
419n/a fp.write(self.next_single_test + '\n')
420n/a else:
421n/a os.unlink(self.next_single_filename)
422n/a
423n/a if self.tracer:
424n/a r = self.tracer.results()
425n/a r.write_results(show_missing=True, summary=True,
426n/a coverdir=self.ns.coverdir)
427n/a
428n/a print()
429n/a duration = time.monotonic() - self.start_time
430n/a print("Total duration: %s" % format_duration(duration))
431n/a
432n/a if self.bad:
433n/a result = "FAILURE"
434n/a elif self.interrupted:
435n/a result = "INTERRUPTED"
436n/a else:
437n/a result = "SUCCESS"
438n/a print("Tests result: %s" % result)
439n/a
440n/a if self.ns.runleaks:
441n/a os.system("leaks %d" % os.getpid())
442n/a
443n/a def main(self, tests=None, **kwargs):
444n/a global TEMPDIR
445n/a
446n/a if sysconfig.is_python_build():
447n/a try:
448n/a os.mkdir(TEMPDIR)
449n/a except FileExistsError:
450n/a pass
451n/a
452n/a # Define a writable temp dir that will be used as cwd while running
453n/a # the tests. The name of the dir includes the pid to allow parallel
454n/a # testing (see the -j option).
455n/a test_cwd = 'test_python_{}'.format(os.getpid())
456n/a test_cwd = os.path.join(TEMPDIR, test_cwd)
457n/a
458n/a # Run the tests in a context manager that temporarily changes the CWD to a
459n/a # temporary and writable directory. If it's not possible to create or
460n/a # change the CWD, the original CWD will be used. The original CWD is
461n/a # available from support.SAVEDCWD.
462n/a with support.temp_cwd(test_cwd, quiet=True):
463n/a self._main(tests, kwargs)
464n/a
465n/a def _main(self, tests, kwargs):
466n/a self.ns = self.parse_args(kwargs)
467n/a
468n/a if self.ns.slaveargs is not None:
469n/a from test.libregrtest.runtest_mp import run_tests_slave
470n/a run_tests_slave(self.ns.slaveargs)
471n/a
472n/a if self.ns.wait:
473n/a input("Press any key to continue...")
474n/a
475n/a support.PGO = self.ns.pgo
476n/a
477n/a setup_tests(self.ns)
478n/a
479n/a self.find_tests(tests)
480n/a
481n/a if self.ns.list_tests:
482n/a self.list_tests()
483n/a sys.exit(0)
484n/a
485n/a self.run_tests()
486n/a self.display_result()
487n/a
488n/a if self.ns.verbose2 and self.bad:
489n/a self.rerun_failed_tests()
490n/a
491n/a self.finalize()
492n/a sys.exit(len(self.bad) > 0 or self.interrupted)
493n/a
494n/a
495n/adef removepy(names):
496n/a if not names:
497n/a return
498n/a for idx, name in enumerate(names):
499n/a basename, ext = os.path.splitext(name)
500n/a if ext == '.py':
501n/a names[idx] = basename
502n/a
503n/a
504n/adef count(n, word):
505n/a if n == 1:
506n/a return "%d %s" % (n, word)
507n/a else:
508n/a return "%d %ss" % (n, word)
509n/a
510n/a
511n/adef printlist(x, width=70, indent=4):
512n/a """Print the elements of iterable x to stdout.
513n/a
514n/a Optional arg width (default 70) is the maximum line length.
515n/a Optional arg indent (default 4) is the number of blanks with which to
516n/a begin each line.
517n/a """
518n/a
519n/a blanks = ' ' * indent
520n/a # Print the sorted list: 'x' may be a '--random' list or a set()
521n/a print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
522n/a initial_indent=blanks, subsequent_indent=blanks))
523n/a
524n/a
525n/adef main(tests=None, **kwargs):
526n/a """Run the Python suite."""
527n/a Regrtest().main(tests=tests, **kwargs)