»Core Development>Code coverage>Lib/test/test_capi.py

Python code coverage for Lib/test/test_capi.py

# | count | content
1n/a# Run the _testcapi module tests (tests for the Python/C API): by defn,
2n/a# these are all functions _testcapi exports whose name begins with 'test_'.
3n/a
4n/aimport os
5n/aimport pickle
6n/aimport random
7n/aimport re
8n/aimport subprocess
9n/aimport sys
10n/aimport sysconfig
11n/aimport textwrap
12n/aimport time
13n/aimport unittest
14n/afrom test import support
15n/afrom test.support import MISSING_C_DOCSTRINGS
16n/afrom test.support.script_helper import assert_python_failure
# Optional modules: each name is rebound to None when the import fails, so
# the skip decorators further down can simply test the name for truthiness.
try:
    import _posixsubprocess
except ImportError:
    _posixsubprocess = None
try:
    import threading
except ImportError:
    threading = None

# Skip this test if the _testcapi module isn't available.
_testcapi = support.import_module('_testcapi')

# Were we compiled --with-pydebug or with #define Py_DEBUG?
# (sys.gettotalrefcount is used as the marker for such a build.)
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
30n/a
31n/a
32n/adef testfunction(self):
33n/a """some doc"""
34n/a return self
35n/a
class InstanceMethod:
    # Two callables wrapped with _testcapi.instancemethod: the builtin id()
    # (rebound under the same name inside this class namespace) and the
    # module-level testfunction().
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)
39n/a
class CAPITest(unittest.TestCase):
    # General C-API checks driven through the _testcapi helper module.
    #
    # NOTE(review): this copy of the file came through a rendering that
    # collapses runs of whitespace.  Spacing inside the expected-output
    # patterns below (e.g. the "File ..." traceback lines) may have been
    # altered -- verify against upstream before relying on them.

    def test_instancemethod(self):
        # The instancemethod wrapper must bind like a regular method and
        # expose the wrapped function's docstring and attributes.
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        # assertIs reports both operands on failure, unlike assertTrue(a is b).
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)

        # Setting an attribute through the class-level wrapper is expected to
        # land on the wrapped function; doing it through the bound object must
        # raise AttributeError.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_no_FatalError_infinite_loop(self):
        # Run the crashing call in a child interpreter and inspect its output.
        with support.SuppressCrashReport():
            p = subprocess.Popen([sys.executable, "-c",
                                  'import _testcapi;'
                                  '_testcapi.crash_no_current_thread()'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread'))

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        # While handling an exception, swap the current exception info in and
        # out via _testcapi.set_exc_info() and check that sys.exc_info()
        # follows each change.
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            # Unreachable unless the raise above stopped raising.
            self.fail("ValueError was not raised")

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipIf(MISSING_C_DOCSTRINGS,
                     "Signature information for builtins requires docstrings")
    def test_docstring_signature_parsing(self):
        # Each _testcapi helper carries a differently shaped C docstring;
        # check how it is split into __doc__ and __text_signature__.

        self.assertIsNone(_testcapi.no_docstring.__doc__)
        self.assertIsNone(_testcapi.no_docstring.__text_signature__)

        self.assertIsNone(_testcapi.docstring_empty.__doc__)
        self.assertIsNone(_testcapi.docstring_empty.__text_signature__)

        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
            "This docstring has no signature.")
        self.assertIsNone(_testcapi.docstring_no_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
            "docstring_with_invalid_signature($module, /, boo)\n"
            "\n"
            "This docstring has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
            "docstring_with_invalid_signature2($module, /, boo)\n"
            "\n"
            "--\n"
            "\n"
            "This docstring also has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature2.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
            "This docstring has a valid signature.")
        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")

        self.assertIsNone(_testcapi.docstring_with_signature_but_no_doc.__doc__)
        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
            "($module, /, sig)")

        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
            "\nThis docstring has a valid signature and some extra newlines.")
        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
            "($module, /, parameter)")

    def test_c_type_with_matrix_multiplication(self):
        # matmulType reports which slot was invoked ("matmul" / "imatmul")
        # together with the two operands.
        M = _testcapi.matmulType
        m1 = M()
        m2 = M()
        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
        self.assertEqual(42 @ m1, ("matmul", 42, m1))
        o = m1
        o @= m2
        self.assertEqual(o, ("imatmul", m1, m2))
        o = m1
        o @= 42
        self.assertEqual(o, ("imatmul", m1, 42))
        o = 42
        o @= m1
        self.assertEqual(o, ("matmul", 42, m1))

    def test_return_null_without_error(self):
        # Issue #23571: A function must not return NULL without setting an
        # error
        if Py_DEBUG:
            # The expected output ends with "line 6 in <module>", so the call
            # must stay on the sixth line of this snippet.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_null_without_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            self.assertRegex(err.replace(b'\r', b''),
                             br'Fatal Python error: a function returned NULL '
                                br'without setting an error\n'
                             br'SystemError: <built-in function '
                                 br'return_null_without_error> returned NULL '
                                 br'without setting an error\n'
                             br'\n'
                             br'Current thread.*:\n'
                             br' File .*", line 6 in <module>')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_null_without_error()
            self.assertRegex(str(cm.exception),
                             'return_null_without_error.* '
                             'returned NULL without setting an error')

    def test_return_result_with_error(self):
        # Issue #23571: A function must not return a result with an error set
        if Py_DEBUG:
            # As above, the call must stay on line 6 of the snippet.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_result_with_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            self.assertRegex(err.replace(b'\r', b''),
                             br'Fatal Python error: a function returned a '
                                br'result with an error set\n'
                             br'ValueError\n'
                             br'\n'
                             br'The above exception was the direct cause '
                                br'of the following exception:\n'
                             br'\n'
                             br'SystemError: <built-in '
                                br'function return_result_with_error> '
                                br'returned a result with an error set\n'
                             br'\n'
                             br'Current thread.*:\n'
                             br' File .*, line 6 in <module>')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_result_with_error()
            self.assertRegex(str(cm.exception),
                             'return_result_with_error.* '
                             'returned a result with an error set')

    def test_buildvalue_N(self):
        _testcapi.test_buildvalue_N()
243n/a
244n/a
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
    # Exercises _testcapi._pending_threadfunc(): callbacks are queued from
    # the main thread or from worker threads, and the main thread then spins
    # until all of them have been run.

    def pendingcalls_submit(self, l, n):
        # Queue ``n`` callbacks; each one appends None to the list ``l``.
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for _ in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context=None):
        # Spin until ``l`` holds exactly ``n`` entries.  When ``context`` is
        # given, the timeout counter only advances once context.event is set,
        # i.e. after every submitting thread has finished.
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            for i in range(1000):
                a = i*i
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
        n = 32 #total callbacks
        class foo(object):pass
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        threads = [threading.Thread(target=self.pendingcalls_thread,
                                    args=(context,))
                   for i in range(context.nThreads)]
        with support.start_threads(threads):
            self.pendingcalls_wait(context.l, n, context)

    def pendingcalls_thread(self, context):
        # Worker body: submit this thread's share of callbacks, then record
        # completion.  The thread that brings the count up to nThreads sets
        # the event that pendingcalls_wait() checks.
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)
323n/a
324n/a
class SubinterpreterTest(unittest.TestCase):

    def test_subinterps(self):
        # The subinterpreter pickles id(sys.modules) and id(builtins) into
        # the write end of a pipe; both must differ from this interpreter's.
        import builtins
        read_fd, write_fd = os.pipe()
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(write_fd)
        with open(read_fd, "rb") as reader:
            status = support.run_in_subinterp(code)
            self.assertEqual(status, 0)
            # Values arrive in the order they were dumped.
            self.assertNotEqual(pickle.load(reader), id(sys.modules))
            self.assertNotEqual(pickle.load(reader), id(builtins))
341n/a
342n/a
# Bug #6012
class Test6012(unittest.TestCase):
    def test(self):
        # Regression check: argparsing() with two str arguments returns 1.
        result = _testcapi.argparsing("Hello", "World")
        self.assertEqual(result, 1)
347n/a
348n/a
class EmbeddingTests(unittest.TestCase):
    # Runs the separately built "_testembed" executable and checks its output.

    def setUp(self):
        # The base path is three directory levels above this file.
        basepath = os.path.abspath(__file__)
        for _ in range(3):
            basepath = os.path.dirname(basepath)
        if sys.platform.startswith("win"):
            # On Windows the helper sits next to the interpreter; "_d" is
            # added to its name when the interpreter path contains "_d".
            suffix = "_d" if "_d" in sys.executable else ""
            exename = "_testembed" + suffix + ".exe"
            exepath = os.path.dirname(sys.executable)
        else:
            exename = "_testembed"
            exepath = os.path.join(basepath, "Programs")
        exe = os.path.join(exepath, exename)
        self.test_exe = exe
        if not os.path.exists(exe):
            self.skipTest("%r doesn't exist" % exe)
        # This is needed otherwise we get a fatal error:
        # "Py_Initialize: Unable to get the locale encoding
        # LookupError: no codec search functions registered: can't find encoding"
        self.oldcwd = os.getcwd()
        os.chdir(basepath)

    def tearDown(self):
        # Undo the chdir() done in setUp().
        os.chdir(self.oldcwd)

    def run_embedded_interpreter(self, *args):
        """Runs a test in the embedded interpreter"""
        command = [self.test_exe, *args]
        proc = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        out, err = proc.communicate()
        self.assertEqual(proc.returncode, 0,
                         "bad returncode %d, stderr is %r" %
                         (proc.returncode, err))
        return out, err

    def test_subinterps(self):
        # This is just a "don't crash" test
        out, err = self.run_embedded_interpreter("repeated_init_and_subinterpreters")
        if support.verbose:
            print()
            print(out)
            print(err)

    @staticmethod
    def _get_default_pipe_encoding():
        # Open the write end of a throwaway pipe in text mode and report the
        # encoding chosen for it; both ends are closed before returning.
        read_end, write_end = os.pipe()
        try:
            with os.fdopen(write_end, 'w') as writer:
                encoding = writer.encoding
        finally:
            os.close(read_end)
        return encoding

    def test_forced_io_encoding(self):
        # Checks forced configuration of embedded interpreter IO streams
        out, err = self.run_embedded_interpreter("forced_io_encoding")
        if support.verbose:
            print()
            print(out)
            print(err)
        # NOTE(review): leading spaces inside these expected lines may have
        # been lost in the whitespace-collapsed copy -- verify upstream.
        template_lines = [
            "--- Use defaults ---",
            "Expected encoding: default",
            "Expected errors: default",
            "stdin: {in_encoding}:{errors}",
            "stdout: {out_encoding}:{errors}",
            "stderr: {out_encoding}:backslashreplace",
            "--- Set errors only ---",
            "Expected encoding: default",
            "Expected errors: ignore",
            "stdin: {in_encoding}:ignore",
            "stdout: {out_encoding}:ignore",
            "stderr: {out_encoding}:backslashreplace",
            "--- Set encoding only ---",
            "Expected encoding: latin-1",
            "Expected errors: default",
            "stdin: latin-1:{errors}",
            "stdout: latin-1:{errors}",
            "stderr: latin-1:backslashreplace",
            "--- Set encoding and errors ---",
            "Expected encoding: latin-1",
            "Expected errors: replace",
            "stdin: latin-1:replace",
            "stdout: latin-1:replace",
            "stderr: latin-1:backslashreplace",
        ]
        expected_output = '\n'.join(template_lines).format(
            in_encoding=sys.__stdin__.encoding,
            out_encoding=self._get_default_pipe_encoding(),
            errors=sys.__stdout__.errors)
        # This is useful if we ever trip over odd platform behaviour
        self.maxDiff = None
        self.assertEqual(out.strip(), expected_output)
446n/a
447n/a
class SkipitemTest(unittest.TestCase):
    # Tests built on _testcapi.parse_tuple_and_keywords().

    def test_skipitem(self):
        """
        If this test failed, you probably added a new "format unit"
        in Python/getargs.c, but neglected to update our poor friend
        skipitem() in the same file. (If so, shame on you!)

        With a few exceptions**, this function brute-force tests all
        printable ASCII*** characters (32 to 126 inclusive) as format units,
        checking to see that PyArg_ParseTupleAndKeywords() return consistent
        errors both when the unit is attempted to be used and when it is
        skipped. If the format unit doesn't exist, we'll get one of two
        specific error messages (one for used, one for skipped); if it does
        exist we *won't* get that error--we'll get either no error or some
        other error. If we get the specific "does not exist" error for one
        test and not for the other, there's a mismatch, and the test fails.

           ** Some format units have special funny semantics and it would
              be difficult to accommodate them here. Since these are all
              well-established and properly skipped in skipitem() we can
              get away with not testing them--this test is really intended
              to catch *new* format units.

          *** Python C source files must be ASCII. Therefore it's impossible
              to have non-ASCII format units.

        """
        empty_tuple = ()
        tuple_1 = (0,)
        dict_b = {'b':1}
        keywords = ["a", "b"]

        for i in range(32, 127):
            c = chr(i)

            # skip parentheses, the error reporting is inconsistent about them
            # skip 'e', it's always a two-character code
            # skip '|' and '$', they don't represent arguments anyway
            if c in '()e|$':
                continue

            # test the format unit when not skipped
            # (named ``fmt`` so the builtin format() is not shadowed)
            fmt = c + "i"
            try:
                # (note: the format string must be bytes!)
                _testcapi.parse_tuple_and_keywords(tuple_1, dict_b,
                    fmt.encode("ascii"), keywords)
                when_not_skipped = False
            except SystemError as e:
                s = "argument 1 (impossible<bad format char>)"
                when_not_skipped = (str(e) == s)
            except TypeError:
                when_not_skipped = False

            # test the format unit when skipped
            optional_format = "|" + fmt
            try:
                _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b,
                    optional_format.encode("ascii"), keywords)
                when_skipped = False
            except SystemError as e:
                s = "impossible<bad format char>: '{}'".format(fmt)
                when_skipped = (str(e) == s)

            # The placeholders read "not skipped {}, skipped {}", so the
            # values are passed in that order.
            message = ("test_skipitem_parity: "
                "detected mismatch between convertsimple and skipitem "
                "for format unit '{}' ({}), not skipped {}, skipped {}".format(
                    c, i, when_not_skipped, when_skipped))
            self.assertIs(when_skipped, when_not_skipped, message)

    def test_parse_tuple_and_keywords(self):
        # parse_tuple_and_keywords error handling tests
        self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords,
                          (), {}, 42, [])
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [''] * 42)
        self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords,
                          (), {}, b'', [42])

    def test_positional_only(self):
        # An empty keyword name marks a positional-only parameter.  Each
        # ``with`` block holds the single call expected to raise; the calls
        # between them are expected to succeed.
        parse = _testcapi.parse_tuple_and_keywords

        parse((1, 2, 3), {}, b'OOO', ['', '', 'a'])
        parse((1, 2), {'a': 3}, b'OOO', ['', '', 'a'])
        with self.assertRaisesRegex(TypeError,
               r'Function takes at least 2 positional arguments \(1 given\)'):
            parse((1,), {'a': 3}, b'OOO', ['', '', 'a'])
        parse((1,), {}, b'O|OO', ['', '', 'a'])
        with self.assertRaisesRegex(TypeError,
               r'Function takes at least 1 positional arguments \(0 given\)'):
            parse((), {}, b'O|OO', ['', '', 'a'])
        parse((1, 2), {'a': 3}, b'OO$O', ['', '', 'a'])
        with self.assertRaisesRegex(TypeError,
               r'Function takes exactly 2 positional arguments \(1 given\)'):
            parse((1,), {'a': 3}, b'OO$O', ['', '', 'a'])
        parse((1,), {}, b'O|O$O', ['', '', 'a'])
        with self.assertRaisesRegex(TypeError,
               r'Function takes at least 1 positional arguments \(0 given\)'):
            parse((), {}, b'O|O$O', ['', '', 'a'])
        with self.assertRaisesRegex(SystemError, r'Empty parameter name after \$'):
            parse((1,), {}, b'O|$OO', ['', '', 'a'])
        with self.assertRaisesRegex(SystemError, 'Empty keyword'):
            parse((1,), {}, b'O|OO', ['', 'a', ''])
554n/a
555n/a
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestThreadState(unittest.TestCase):

    @support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            seen_idents = []

            def callback():
                seen_idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # Two extra references to the callback are held while sleeping.
            # NOTE(review): their purpose is not evident from this file.
            a = b = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            # (Here that is the thread executing target().)
            own_ident = threading.get_ident()
            self.assertEqual(seen_idents.count(own_ident), 3,
                             "Couldn't find main thread correctly in the list")

        # Run once in the calling thread, then once more in a new thread.
        target()
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()
579n/a
580n/a
class Test_testcapi(unittest.TestCase):
    def test__testcapi(self):
        # Call every attribute of _testcapi whose name starts with 'test_',
        # each inside its own subTest so failures are reported per name.
        internal_tests = [attr for attr in dir(_testcapi)
                          if attr.startswith('test_')]
        for name in internal_tests:
            with self.subTest("internal", name=name):
                getattr(_testcapi, name)()
588n/a
589n/a
class PyMemDebugTests(unittest.TestCase):
    # Each test runs a failing _testcapi call in a child interpreter with the
    # PYTHONMALLOC environment variable set to the class attribute below,
    # then matches the child's stderr.  Subclasses override PYTHONMALLOC.
    #
    # NOTE(review): runs of spaces inside the expected patterns may have
    # been collapsed in this copy of the file -- verify against upstream.
    PYTHONMALLOC = 'debug'
    # '0x04c06e0' or '04C06E0'
    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'

    def check(self, code):
        # Run ``code`` with ``-c`` (it must exit with a failure status) and
        # return the child's stderr decoded as ASCII, with undecodable bytes
        # replaced.
        with support.SuppressCrashReport():
            result = assert_python_failure('-c', code,
                                           PYTHONMALLOC=self.PYTHONMALLOC)
        return result.err.decode('ascii', 'replace')

    def test_buffer_overflow(self):
        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
        # "{ptr}" is filled in by str.format(); "{{2}}" survives as the
        # regex quantifier "{2}".
        template = (r"Debug memory block at address p={ptr}: API 'm'\n"
                    r" 16 bytes originally requested\n"
                    r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                    r" The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
                    r" at tail\+0: 0x78 \*\*\* OUCH\n"
                    r" at tail\+1: 0xfb\n"
                    r" at tail\+2: 0xfb\n"
                    r" .*\n"
                    r" The block was made by call #[0-9]+ to debug malloc/realloc.\n"
                    r" Data at p: cb cb cb .*\n"
                    r"\n"
                    r"Fatal Python error: bad trailing pad byte")
        pattern = re.compile(template.format(ptr=self.PTR_REGEX),
                             flags=re.DOTALL)
        self.assertRegex(out, pattern)

    def test_api_misuse(self):
        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
        template = (r"Debug memory block at address p={ptr}: API 'm'\n"
                    r" 16 bytes originally requested\n"
                    r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                    r" The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
                    r" The block was made by call #[0-9]+ to debug malloc/realloc.\n"
                    r" Data at p: cb cb cb .*\n"
                    r"\n"
                    r"Fatal Python error: bad ID: Allocated using API 'm', verified using API 'r'\n")
        pattern = template.format(ptr=self.PTR_REGEX)
        self.assertRegex(out, pattern)

    @unittest.skipUnless(threading, 'Test requires a GIL (multithreading)')
    def check_malloc_without_gil(self, code):
        # Shared helper for the two tests below: the child's stderr must
        # contain the allocator's "without holding the GIL" fatal error.
        out = self.check(code)
        expected = ('Fatal Python error: Python memory allocator called '
                    'without holding the GIL')
        self.assertIn(expected, out)

    def test_pymem_malloc_without_gil(self):
        # Debug hooks must raise an error if PyMem_Malloc() is called
        # without holding the GIL
        self.check_malloc_without_gil(
            'import _testcapi; _testcapi.pymem_malloc_without_gil()')

    def test_pyobject_malloc_without_gil(self):
        # Debug hooks must raise an error if PyObject_Malloc() is called
        # without holding the GIL
        self.check_malloc_without_gil(
            'import _testcapi; _testcapi.pyobject_malloc_without_gil()')
651n/a
652n/a
class PyMemMallocDebugTests(PyMemDebugTests):
    # Re-run the inherited tests with PYTHONMALLOC=malloc_debug.
    PYTHONMALLOC = 'malloc_debug'
655n/a
656n/a
@unittest.skipUnless(sysconfig.get_config_var('WITH_PYMALLOC') == 1,
                     'need pymalloc')
class PyMemPymallocDebugTests(PyMemDebugTests):
    # Re-run the inherited tests with PYTHONMALLOC=pymalloc_debug; skipped
    # unless the WITH_PYMALLOC build variable equals 1.
    PYTHONMALLOC = 'pymalloc_debug'
661n/a
662n/a
@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
class PyMemDefaultTests(PyMemDebugTests):
    # test default allocator of Python compiled in debug mode
    # (PYTHONMALLOC is passed to the child as an empty string)
    PYTHONMALLOC = ''
667n/a
668n/a
# Script entry point: hand control to unittest's command-line runner.
if __name__ == "__main__":
    unittest.main()