» Core Development > Code coverage > Lib/test/test_faulthandler.py

Python code coverage for Lib/test/test_faulthandler.py

# | count | content
from contextlib import contextmanager
import datetime
import faulthandler
import os
import signal
import subprocess
import sys
from test import support
from test.support import script_helper, is_android, requires_android_level
import tempfile
import unittest
from textwrap import dedent

# Optional features: record whether they are available so that individual
# tests can be skipped instead of failing at import time.
try:
    import threading
    HAVE_THREADS = True
except ImportError:
    HAVE_THREADS = False
try:
    import _testcapi
except ImportError:
    # Tests that need _testcapi check for None and skip themselves.
    _testcapi = None

# Delay (in seconds) passed to faulthandler.dump_traceback_later() by the
# dump_traceback_later tests.
TIMEOUT = 0.5
MS_WINDOWS = (os.name == 'nt')

def expected_traceback(lineno1, lineno2, header, min_count=1):
    """
    Build the regex matching a two-frame traceback dumped by faulthandler.

    header is a regex matching the text that precedes the frames; it is
    followed by a frame in func() at line lineno1 and a frame in <module> at
    line lineno2.

    If min_count is greater than 1, the returned regex matches min_count
    consecutive copies of the traceback and is only anchored at the start, so
    additional output may follow. Otherwise the regex is anchored at both
    ends.
    """
    regex = header
    # faulthandler prefixes each frame line with two spaces.
    regex += '  File "<string>", line %s in func\n' % lineno1
    regex += '  File "<string>", line %s in <module>' % lineno2
    if 1 < min_count:
        return '^' + (regex + '\n') * (min_count - 1) + regex
    else:
        return '^' + regex + '$'

@contextmanager
def temporary_filename():
    """
    Context manager yielding a temporary file name.

    The file itself is not created here; whatever exists under that name is
    removed with support.unlink() when the block exits, even on error.
    """
    # NOTE(review): tempfile.mktemp() only reserves a name and is documented
    # as deprecated because another process may create the file first. It is
    # kept here because callers expect a path, not an open file.
    filename = tempfile.mktemp()
    try:
        yield filename
    finally:
        support.unlink(filename)

def requires_raise(test):
    """
    Decorator for tests that need a child process to raise a signal.

    Outside Android the test is returned unchanged. On Android it is wrapped
    with requires_android_level(24, 'raise() is buggy').
    """
    return (test if not is_android else
            requires_android_level(24, 'raise() is buggy')(test))

class FaultHandlerTests(unittest.TestCase):
    # Most tests run a short script in a child Python process and compare
    # what faulthandler wrote with an expected traceback. The line numbers
    # used in the assertions refer to lines of those embedded scripts (after
    # dedent() and strip()), so the scripts must not be reflowed.

    def get_output(self, code, filename=None, fd=None):
        """
        Run the specified code in Python (in a new child process) and read the
        output captured from the child process, from a file (if filename is
        set) or from a file descriptor (if fd is set).

        Return a (lines, exitcode) tuple where lines is the output split into
        a list of lines.

        The captured output is filtered with support.strip_python_stderr()
        (strips the reference count printed by Python debug builds). When
        filename or fd is used, the captured output must be empty.
        """
        code = dedent(code).strip()
        pass_fds = []
        if fd is not None:
            pass_fds.append(fd)
        with support.SuppressCrashReport():
            process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
            with process:
                stdout, stderr = process.communicate()
                exitcode = process.wait()
        output = support.strip_python_stderr(stdout)
        output = output.decode('ascii', 'backslashreplace')
        if filename:
            self.assertEqual(output, '')
            with open(filename, "rb") as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        elif fd is not None:
            self.assertEqual(output, '')
            # Rewind: the child process wrote through the same descriptor.
            # The signature is os.lseek(fd, pos, how).
            os.lseek(fd, 0, os.SEEK_SET)
            with open(fd, "rb", closefd=False) as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        return output.splitlines(), exitcode

    def check_error(self, code, line_number, fatal_error, *,
                    filename=None, all_threads=True, other_regex=None,
                    fd=None, know_current_thread=True):
        """
        Check that the fault handler for fatal errors is enabled and check the
        traceback from the child process output.

        fatal_error is a regex matching the first line of the output and
        line_number is the line of the <module> frame. If other_regex is set,
        output matching it is accepted as an alternative.

        Raise an error if the output doesn't match the expected format or if
        the child process exits with code 0.
        """
        if all_threads:
            if know_current_thread:
                header = 'Current thread 0x[0-9a-f]+'
            else:
                header = 'Thread 0x[0-9a-f]+'
        else:
            header = 'Stack'
        # The frame line keeps a two-space indent after dedent(): this is the
        # prefix written by faulthandler before each frame.
        regex = r"""
            ^{fatal_error}

            {header} \(most recent call first\):
              File "<string>", line {lineno} in <module>
            """
        regex = dedent(regex.format(
            lineno=line_number,
            fatal_error=fatal_error,
            header=header)).strip()
        if other_regex:
            regex += '|' + other_regex
        output, exitcode = self.get_output(code, filename=filename, fd=fd)
        output = '\n'.join(output)
        self.assertRegex(output, regex)
        self.assertNotEqual(exitcode, 0)

    def check_fatal_error(self, code, line_number, name_regex, **kw):
        # check_error() for a "Fatal Python error: <name>" first line.
        fatal_error = 'Fatal Python error: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    def check_windows_exception(self, code, line_number, name_regex, **kw):
        # check_error() for a "Windows fatal exception: <name>" first line.
        fatal_error = 'Windows fatal exception: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    @unittest.skipIf(sys.platform.startswith('aix'),
                     "the first page of memory is a mapped read-only on AIX")
    def test_read_null(self):
        if not MS_WINDOWS:
            self.check_fatal_error("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
                '(?:Segmentation fault'
                    '|Bus error'
                    '|Illegal instruction)')
        else:
            self.check_windows_exception("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                'access violation')

    @requires_raise
    def test_sigsegv(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault')

    @unittest.skipIf(not HAVE_THREADS, 'need threads')
    def test_fatal_error_c_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            """,
            3,
            'in new thread',
            know_current_thread=False)

    def test_sigabrt(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            """,
            3,
            'Aborted')

    @unittest.skipIf(sys.platform == 'win32',
                     "SIGFPE cannot be caught on Windows")
    def test_sigfpe(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            """,
            3,
            'Floating point exception')

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
    @requires_raise
    def test_sigbus(self):
        self.check_fatal_error("""
            import _testcapi
            import faulthandler
            import signal

            faulthandler.enable()
            _testcapi.raise_signal(signal.SIGBUS)
            """,
            6,
            'Bus error')

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
    @requires_raise
    def test_sigill(self):
        self.check_fatal_error("""
            import _testcapi
            import faulthandler
            import signal

            faulthandler.enable()
            _testcapi.raise_signal(signal.SIGILL)
            """,
            6,
            'Illegal instruction')

    def test_fatal_error(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler._fatal_error(b'xyz')
            """,
            2,
            'xyz')

    def test_fatal_error_without_gil(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler._fatal_error(b'xyz', True)
            """,
            2,
            'xyz')

    @unittest.skipIf(sys.platform.startswith('openbsd') and HAVE_THREADS,
                     "Issue #12868: sigaltstack() doesn't work on "
                     "OpenBSD if Python is compiled with pthread")
    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
                     'need faulthandler._stack_overflow()')
    def test_stack_overflow(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            """,
            3,
            '(?:Segmentation fault|Bus error)',
            other_regex='unable to raise a stack overflow')

    @requires_raise
    def test_gil_released(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            """,
            3,
            'Segmentation fault')

    @requires_raise
    def test_enable_file(self):
        with temporary_filename() as filename:
            self.check_fatal_error("""
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                """.format(filename=repr(filename)),
                4,
                'Segmentation fault',
                filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    @requires_raise
    def test_enable_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            fd = fp.fileno()
            self.check_fatal_error("""
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                """ % fd,
                4,
                'Segmentation fault',
                fd=fd)

    @requires_raise
    def test_enable_single_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault',
            all_threads=False)

    @requires_raise
    def test_disable(self):
        code = """
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            """
        not_expected = 'Fatal Python error'
        stderr, exitcode = self.get_output(code)
        stderr = '\n'.join(stderr)
        # The process must still die, but without the faulthandler report.
        self.assertNotIn(not_expected, stderr)
        self.assertNotEqual(exitcode, 0)

    def test_is_enabled(self):
        orig_stderr = sys.stderr
        try:
            # regrtest may replace sys.stderr by io.StringIO object, but
            # faulthandler.enable() requires that sys.stderr has a fileno()
            # method
            sys.stderr = sys.__stderr__

            was_enabled = faulthandler.is_enabled()
            try:
                faulthandler.enable()
                self.assertTrue(faulthandler.is_enabled())
                faulthandler.disable()
                self.assertFalse(faulthandler.is_enabled())
            finally:
                # Restore the state found on entry.
                if was_enabled:
                    faulthandler.enable()
                else:
                    faulthandler.disable()
        finally:
            sys.stderr = orig_stderr

    def test_disabled_by_default(self):
        # By default, the module should be disabled
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = [sys.executable]
        if sys.flags.ignore_environment:
            args.append("-E")
        args.extend(("-c", code))
        env = os.environ.copy()
        env.pop("PYTHONFAULTHANDLER", None)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"False")

    def test_sys_xoptions(self):
        # Test python -X faulthandler
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = [sys.executable]
        if sys.flags.ignore_environment:
            args.append("-E")
        args.extend(("-X", "faulthandler", "-c", code))
        env = os.environ.copy()
        env.pop("PYTHONFAULTHANDLER", None)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def test_env_var(self):
        # empty env var
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = (sys.executable, "-c", code)
        env = os.environ.copy()
        env['PYTHONFAULTHANDLER'] = ''
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"False")

        # non-empty env var
        env = os.environ.copy()
        env['PYTHONFAULTHANDLER'] = '1'
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def check_dump_traceback(self, *, filename=None, fd=None):
        """
        Explicitly call dump_traceback() function and check its output.
        Raise an error if the output doesn't match the expected format.
        """
        code = """
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            """
        code = code.format(
            filename=filename,
            fd=fd,
        )
        # Line of the dump_traceback() call that is taken in funcB().
        if filename:
            lineno = 9
        elif fd is not None:
            lineno = 12
        else:
            lineno = 14
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line %s in funcB' % lineno,
            '  File "<string>", line 17 in funcA',
            '  File "<string>", line 19 in <module>'
        ]
        trace, exitcode = self.get_output(code, filename, fd)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback(self):
        self.check_dump_traceback()

    def test_dump_traceback_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback(fd=fp.fileno())

    def test_truncate(self):
        # A function name longer than maxlen characters is expected to be
        # cut to maxlen characters followed by '...'.
        maxlen = 500
        func_name = 'x' * (maxlen + 50)
        truncated = 'x' * maxlen + '...'
        code = """
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            """
        code = code.format(
            func_name=func_name,
        )
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line 4 in %s' % truncated,
            '  File "<string>", line 6 in <module>'
        ]
        trace, exitcode = self.get_output(code)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    @unittest.skipIf(not HAVE_THREADS, 'need threads')
    def check_dump_traceback_threads(self, filename):
        """
        Call explicitly dump_traceback(all_threads=True) and check the output.
        Raise an error if the output doesn't match the expected format.
        """
        code = """
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            """
        code = code.format(filename=repr(filename))
        output, exitcode = self.get_output(code, filename)
        output = '\n'.join(output)
        # Line of the dump_traceback() call that is taken in dump().
        if filename:
            lineno = 8
        else:
            lineno = 10
        # Between one and three frames from threading.py are accepted above
        # the run() frame of the waiter thread.
        regex = r"""
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            """
        regex = dedent(regex.format(lineno=lineno)).strip()
        self.assertRegex(output, regex)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_threads(self):
        self.check_dump_traceback_threads(None)

    def test_dump_traceback_threads_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_threads(filename)

    @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'),
                     'need faulthandler.dump_traceback_later()')
    def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
                                   *, filename=None, fd=None):
        """
        Call dump_traceback_later(TIMEOUT) `loops` times in a child process,
        which sleeps for timeout x 5 seconds after each call, and check the
        tracebacks that were written.

        If cancel is true, the dump is cancelled right away and no output is
        expected. Otherwise `loops` tracebacks are expected, or twice as many
        if repeat is true.

        Raise an error if the output doesn't match the expected format.
        """
        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
        code = """
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            """
        code = code.format(
            timeout=TIMEOUT,
            repeat=repeat,
            cancel=cancel,
            loops=loops,
            filename=filename,
            fd=fd,
        )
        # NOTE(review): fd is not forwarded to get_output(); in the fd case
        # the child script writes to its own stderr descriptor instead.
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)

        if not cancel:
            count = loops
            if repeat:
                count *= 2
            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
            regex = expected_traceback(17, 26, header, min_count=count)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_later(self):
        self.check_dump_traceback_later()

    def test_dump_traceback_later_repeat(self):
        self.check_dump_traceback_later(repeat=True)

    def test_dump_traceback_later_cancel(self):
        self.check_dump_traceback_later(cancel=True)

    def test_dump_traceback_later_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_later(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_later_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback_later(fd=fp.fileno())

    def test_dump_traceback_later_twice(self):
        self.check_dump_traceback_later(loops=2)

    @unittest.skipIf(not hasattr(faulthandler, "register"),
                     "need faulthandler.register")
    def check_register(self, filename=False, all_threads=False,
                       unregister=False, chain=False, fd=None):
        """
        Register a handler displaying the traceback on a user signal. Raise the
        signal and check the written traceback.

        If chain is True, check that the previous signal handler is called.

        If unregister is True, the handler is removed before the signal is
        raised: no traceback is expected and the child process must not exit
        with code 0.

        Raise an error if the output doesn't match the expected format.
        """
        signum = signal.SIGUSR1
        code = """
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            """
        code = code.format(
            all_threads=all_threads,
            signum=signum,
            unregister=unregister,
            chain=chain,
            filename=filename,
            fd=fd,
        )
        # NOTE(review): fd is not forwarded to get_output(); in the fd case
        # the child script writes to its own stderr descriptor instead.
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)
        if not unregister:
            if all_threads:
                regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
            else:
                regex = r'Stack \(most recent call first\):\n'
            regex = expected_traceback(14, 32, regex)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        if unregister:
            self.assertNotEqual(exitcode, 0)
        else:
            self.assertEqual(exitcode, 0)

    def test_register(self):
        self.check_register()

    def test_unregister(self):
        self.check_register(unregister=True)

    def test_register_file(self):
        with temporary_filename() as filename:
            self.check_register(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_register_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_register(fd=fp.fileno())

    def test_register_threads(self):
        self.check_register(all_threads=True)

    def test_register_chain(self):
        self.check_register(chain=True)

    @contextmanager
    def check_stderr_none(self):
        # Run the body of the with statement while sys.stderr is None and
        # check that it raises RuntimeError("sys.stderr is None").
        stderr = sys.stderr
        try:
            sys.stderr = None
            with self.assertRaises(RuntimeError) as cm:
                yield
            self.assertEqual(str(cm.exception), "sys.stderr is None")
        finally:
            sys.stderr = stderr

    def test_stderr_None(self):
        # Issue #21497: provide a helpful error if sys.stderr is None,
        # instead of just an attribute error: "None has no attribute fileno".
        with self.check_stderr_none():
            faulthandler.enable()
        with self.check_stderr_none():
            faulthandler.dump_traceback()
        if hasattr(faulthandler, 'dump_traceback_later'):
            with self.check_stderr_none():
                faulthandler.dump_traceback_later(1e-3)
        if hasattr(faulthandler, "register"):
            with self.check_stderr_none():
                faulthandler.register(signal.SIGUSR1)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_exception(self):
        for exc, name in (
            ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
            ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
            ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
        ):
            self.check_windows_exception(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._{exc})
                """,
                3,
                name)


# Run the test suite when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()