ยปCore Development>Code coverage>Lib/test/test_dtrace.py

Python code coverage for Lib/test/test_dtrace.py

#countcontent
1n/aimport dis
2n/aimport os.path
3n/aimport re
4n/aimport subprocess
5n/aimport sys
6n/aimport types
7n/aimport unittest
8n/a
9n/afrom test.support import findfile, run_unittest
10n/a
11n/a
12n/adef abspath(filename):
13n/a return os.path.abspath(findfile(filename, subdir="dtracedata"))
14n/a
15n/a
16n/adef normalize_trace_output(output):
17n/a """Normalize DTrace output for comparison.
18n/a
19n/a DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
20n/a are concatenated. So if the operating system moves our thread around, the
21n/a straight result can be "non-causal". So we add timestamps to the probe
22n/a firing, sort by that field, then strip it from the output"""
23n/a
24n/a # When compiling with '--with-pydebug', strip '[# refs]' debug output.
25n/a output = re.sub(r"\[[0-9]+ refs\]", "", output)
26n/a try:
27n/a result = [
28n/a row.split("\t")
29n/a for row in output.splitlines()
30n/a if row and not row.startswith('#')
31n/a ]
32n/a result.sort(key=lambda row: int(row[0]))
33n/a result = [row[1] for row in result]
34n/a return "\n".join(result)
35n/a except (IndexError, ValueError):
36n/a raise AssertionError(
37n/a "tracer produced unparseable output:\n{}".format(output)
38n/a )
39n/a
40n/a
41n/aclass TraceBackend:
42n/a EXTENSION = None
43n/a COMMAND = None
44n/a COMMAND_ARGS = []
45n/a
46n/a def run_case(self, name, optimize_python=None):
47n/a actual_output = normalize_trace_output(self.trace_python(
48n/a script_file=abspath(name + self.EXTENSION),
49n/a python_file=abspath(name + ".py"),
50n/a optimize_python=optimize_python))
51n/a
52n/a with open(abspath(name + self.EXTENSION + ".expected")) as f:
53n/a expected_output = f.read().rstrip()
54n/a
55n/a return (expected_output, actual_output)
56n/a
57n/a def generate_trace_command(self, script_file, subcommand=None):
58n/a command = self.COMMAND + [script_file]
59n/a if subcommand:
60n/a command += ["-c", subcommand]
61n/a return command
62n/a
63n/a def trace(self, script_file, subcommand=None):
64n/a command = self.generate_trace_command(script_file, subcommand)
65n/a stdout, _ = subprocess.Popen(command,
66n/a stdout=subprocess.PIPE,
67n/a stderr=subprocess.STDOUT,
68n/a universal_newlines=True).communicate()
69n/a return stdout
70n/a
71n/a def trace_python(self, script_file, python_file, optimize_python=None):
72n/a python_flags = []
73n/a if optimize_python:
74n/a python_flags.extend(["-O"] * optimize_python)
75n/a subcommand = " ".join([sys.executable] + python_flags + [python_file])
76n/a return self.trace(script_file, subcommand)
77n/a
78n/a def assert_usable(self):
79n/a try:
80n/a output = self.trace(abspath("assert_usable" + self.EXTENSION))
81n/a output = output.strip()
82n/a except (FileNotFoundError, PermissionError) as fnfe:
83n/a output = str(fnfe)
84n/a if output != "probe: success":
85n/a raise unittest.SkipTest(
86n/a "{}(1) failed: {}".format(self.COMMAND[0], output)
87n/a )
88n/a
89n/a
90n/aclass DTraceBackend(TraceBackend):
91n/a EXTENSION = ".d"
92n/a COMMAND = ["dtrace", "-q", "-s"]
93n/a
94n/a
95n/aclass SystemTapBackend(TraceBackend):
96n/a EXTENSION = ".stp"
97n/a COMMAND = ["stap", "-g"]
98n/a
99n/a
100n/aclass TraceTests(unittest.TestCase):
101n/a # unittest.TestCase options
102n/a maxDiff = None
103n/a
104n/a # TraceTests options
105n/a backend = None
106n/a optimize_python = 0
107n/a
108n/a @classmethod
109n/a def setUpClass(self):
110n/a self.backend.assert_usable()
111n/a
112n/a def run_case(self, name):
113n/a actual_output, expected_output = self.backend.run_case(
114n/a name, optimize_python=self.optimize_python)
115n/a self.assertEqual(actual_output, expected_output)
116n/a
117n/a def test_function_entry_return(self):
118n/a self.run_case("call_stack")
119n/a
120n/a def test_verify_call_opcodes(self):
121n/a """Ensure our call stack test hits all function call opcodes"""
122n/a
123n/a opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"])
124n/a
125n/a with open(abspath("call_stack.py")) as f:
126n/a code_string = f.read()
127n/a
128n/a def get_function_instructions(funcname):
129n/a # Recompile with appropriate optimization setting
130n/a code = compile(source=code_string,
131n/a filename="<string>",
132n/a mode="exec",
133n/a optimize=self.optimize_python)
134n/a
135n/a for c in code.co_consts:
136n/a if isinstance(c, types.CodeType) and c.co_name == funcname:
137n/a return dis.get_instructions(c)
138n/a return []
139n/a
140n/a for instruction in get_function_instructions('start'):
141n/a opcodes.discard(instruction.opname)
142n/a
143n/a self.assertEqual(set(), opcodes)
144n/a
145n/a def test_gc(self):
146n/a self.run_case("gc")
147n/a
148n/a def test_line(self):
149n/a self.run_case("line")
150n/a
151n/a
152n/aclass DTraceNormalTests(TraceTests):
153n/a backend = DTraceBackend()
154n/a optimize_python = 0
155n/a
156n/a
157n/aclass DTraceOptimizedTests(TraceTests):
158n/a backend = DTraceBackend()
159n/a optimize_python = 2
160n/a
161n/a
162n/aclass SystemTapNormalTests(TraceTests):
163n/a backend = SystemTapBackend()
164n/a optimize_python = 0
165n/a
166n/a
167n/aclass SystemTapOptimizedTests(TraceTests):
168n/a backend = SystemTapBackend()
169n/a optimize_python = 2
170n/a
171n/a
172n/adef test_main():
173n/a run_unittest(DTraceNormalTests, DTraceOptimizedTests, SystemTapNormalTests,
174n/a SystemTapOptimizedTests)
175n/a
176n/a
177n/aif __name__ == '__main__':
178n/a test_main()