
Python code coverage for Lib/test/support/__init__.py

#    count    content
1n/a"""Supporting definitions for the Python regression tests."""
2n/a
3n/aif __name__ != 'test.support':
4n/a raise ImportError('support must be imported from the test package')
5n/a
6n/aimport collections.abc
7n/aimport contextlib
8n/aimport errno
9n/aimport faulthandler
10n/aimport fnmatch
11n/aimport functools
12n/aimport gc
13n/aimport importlib
14n/aimport importlib.util
15n/aimport logging.handlers
16n/aimport nntplib
17n/aimport os
18n/aimport platform
19n/aimport re
20n/aimport shutil
21n/aimport socket
22n/aimport stat
23n/aimport struct
24n/aimport subprocess
25n/aimport sys
26n/aimport sysconfig
27n/aimport tempfile
28n/aimport time
29n/aimport types
30n/aimport unittest
31n/aimport urllib.error
32n/aimport warnings
33n/a
34n/atry:
35n/a import _thread, threading
36n/aexcept ImportError:
37n/a _thread = None
38n/a threading = None
39n/atry:
40n/a import multiprocessing.process
41n/aexcept ImportError:
42n/a multiprocessing = None
43n/a
44n/atry:
45n/a import zlib
46n/aexcept ImportError:
47n/a zlib = None
48n/a
49n/atry:
50n/a import gzip
51n/aexcept ImportError:
52n/a gzip = None
53n/a
54n/atry:
55n/a import bz2
56n/aexcept ImportError:
57n/a bz2 = None
58n/a
59n/atry:
60n/a import lzma
61n/aexcept ImportError:
62n/a lzma = None
63n/a
64n/atry:
65n/a import resource
66n/aexcept ImportError:
67n/a resource = None
68n/a
69n/a__all__ = [
70n/a # globals
71n/a "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
72n/a # exceptions
73n/a "Error", "TestFailed", "ResourceDenied",
74n/a # imports
75n/a "import_module", "import_fresh_module", "CleanImport",
76n/a # modules
77n/a "unload", "forget",
78n/a # io
79n/a "record_original_stdout", "get_original_stdout", "captured_stdout",
80n/a "captured_stdin", "captured_stderr",
81n/a # filesystem
82n/a "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
83n/a "create_empty_file", "can_symlink", "fs_is_case_insensitive",
84n/a # unittest
85n/a "is_resource_enabled", "requires", "requires_freebsd_version",
86n/a "requires_linux_version", "requires_mac_ver", "check_syntax_error",
87n/a "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
88n/a "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
89n/a "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
90n/a "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
91n/a "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
92n/a "anticipate_failure", "load_package_tests", "detect_api_mismatch",
93n/a "check__all__", "requires_android_level", "requires_multiprocessing_queue",
94n/a # sys
95n/a "is_jython", "is_android", "check_impl_detail", "unix_shell",
96n/a "setswitchinterval", "android_not_root",
97n/a # network
98n/a "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
99n/a "bind_unix_socket",
100n/a # processes
101n/a 'temp_umask', "reap_children",
102n/a # logging
103n/a "TestHandler",
104n/a # threads
105n/a "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
106n/a # miscellaneous
107n/a "check_warnings", "check_no_resource_warning", "EnvironmentVarGuard",
108n/a "run_with_locale", "swap_item",
109n/a "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
110n/a "run_with_tz", "PGO", "missing_compiler_executable",
111n/a ]
112n/a
113n/aclass Error(Exception):
114n/a """Base class for regression test exceptions."""
115n/a
116n/aclass TestFailed(Error):
117n/a """Test failed."""
118n/a
119n/aclass ResourceDenied(unittest.SkipTest):
120n/a """Test skipped because it requested a disallowed resource.
121n/a
122n/a This is raised when a test calls requires() for a resource that
123n/a has not been enabled. It is used to distinguish between expected
124n/a and unexpected skips.
125n/a """
126n/a
127n/a@contextlib.contextmanager
128n/adef _ignore_deprecated_imports(ignore=True):
129n/a """Context manager to suppress package and module deprecation
130n/a warnings when importing them.
131n/a
132n/a If ignore is False, this context manager has no effect.
133n/a """
134n/a if ignore:
135n/a with warnings.catch_warnings():
136n/a warnings.filterwarnings("ignore", ".+ (module|package)",
137n/a DeprecationWarning)
138n/a yield
139n/a else:
140n/a yield
141n/a
142n/a
143n/adef import_module(name, deprecated=False, *, required_on=()):
144n/a """Import and return the module to be tested, raising SkipTest if
145n/a it is not available.
146n/a
147n/a If deprecated is True, any module or package deprecation messages
148n/a will be suppressed. If a module is required on a platform but optional for
149n/a others, set required_on to an iterable of platform prefixes which will be
150n/a compared against sys.platform.
151n/a """
152n/a with _ignore_deprecated_imports(deprecated):
153n/a try:
154n/a return importlib.import_module(name)
155n/a except ImportError as msg:
156n/a if sys.platform.startswith(tuple(required_on)):
157n/a raise
158n/a raise unittest.SkipTest(str(msg))
159n/a
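A minimal usage sketch: the usual pattern is to call import_module() at module level so the whole test file is skipped when an optional dependency is missing ('curses' is only an example of such a dependency).

    from test import support

    curses = support.import_module('curses')   # raises unittest.SkipTest if unavailable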
160n/a
161n/adef _save_and_remove_module(name, orig_modules):
162n/a """Helper function to save and remove a module from sys.modules
163n/a
164n/a Raise ImportError if the module can't be imported.
165n/a """
166n/a # try to import the module and raise an error if it can't be imported
167n/a if name not in sys.modules:
168n/a __import__(name)
169n/a del sys.modules[name]
170n/a for modname in list(sys.modules):
171n/a if modname == name or modname.startswith(name + '.'):
172n/a orig_modules[modname] = sys.modules[modname]
173n/a del sys.modules[modname]
174n/a
175n/adef _save_and_block_module(name, orig_modules):
176n/a """Helper function to save and block a module in sys.modules
177n/a
178n/a Return True if the module was in sys.modules, False otherwise.
179n/a """
180n/a saved = True
181n/a try:
182n/a orig_modules[name] = sys.modules[name]
183n/a except KeyError:
184n/a saved = False
185n/a sys.modules[name] = None
186n/a return saved
187n/a
188n/a
189n/adef anticipate_failure(condition):
190n/a """Decorator to mark a test that is known to be broken in some cases
191n/a
192n/a Any use of this decorator should have a comment identifying the
193n/a associated tracker issue.
194n/a """
195n/a if condition:
196n/a return unittest.expectedFailure
197n/a return lambda f: f
198n/a
199n/adef load_package_tests(pkg_dir, loader, standard_tests, pattern):
200n/a """Generic load_tests implementation for simple test packages.
201n/a
202n/a Most packages can implement load_tests using this function as follows:
203n/a
204n/a def load_tests(*args):
205n/a return load_package_tests(os.path.dirname(__file__), *args)
206n/a """
207n/a if pattern is None:
208n/a pattern = "test*"
209n/a top_dir = os.path.dirname( # Lib
210n/a os.path.dirname( # test
211n/a os.path.dirname(__file__))) # support
212n/a package_tests = loader.discover(start_dir=pkg_dir,
213n/a top_level_dir=top_dir,
214n/a pattern=pattern)
215n/a standard_tests.addTests(package_tests)
216n/a return standard_tests
217n/a
218n/a
219n/adef import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
220n/a """Import and return a module, deliberately bypassing sys.modules.
221n/a
222n/a This function imports and returns a fresh copy of the named Python module
223n/a by removing the named module from sys.modules before doing the import.
224n/a Note that unlike reload, the original module is not affected by
225n/a this operation.
226n/a
227n/a *fresh* is an iterable of additional module names that are also removed
228n/a from the sys.modules cache before doing the import.
229n/a
230n/a *blocked* is an iterable of module names that are replaced with None
231n/a in the module cache during the import to ensure that attempts to import
232n/a them raise ImportError.
233n/a
234n/a The named module and any modules named in the *fresh* and *blocked*
235n/a parameters are saved before starting the import and then reinserted into
236n/a sys.modules when the fresh import is complete.
237n/a
238n/a Module and package deprecation messages are suppressed during this import
239n/a if *deprecated* is True.
240n/a
241n/a This function will raise ImportError if the named module cannot be
242n/a imported.
243n/a """
244n/a # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
245n/a # to make sure that this utility function is working as expected
246n/a with _ignore_deprecated_imports(deprecated):
247n/a # Keep track of modules saved for later restoration as well
248n/a # as those which just need a blocking entry removed
249n/a orig_modules = {}
250n/a names_to_remove = []
251n/a _save_and_remove_module(name, orig_modules)
252n/a try:
253n/a for fresh_name in fresh:
254n/a _save_and_remove_module(fresh_name, orig_modules)
255n/a for blocked_name in blocked:
256n/a if not _save_and_block_module(blocked_name, orig_modules):
257n/a names_to_remove.append(blocked_name)
258n/a fresh_module = importlib.import_module(name)
259n/a except ImportError:
260n/a fresh_module = None
261n/a finally:
262n/a for orig_name, module in orig_modules.items():
263n/a sys.modules[orig_name] = module
264n/a for name_to_remove in names_to_remove:
265n/a del sys.modules[name_to_remove]
266n/a return fresh_module
267n/a
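For illustration, this is how test_heapq-style tests obtain pure-Python and accelerated copies of the same module; a hedged sketch, not taken verbatim from those tests.

    from test import support

    py_heapq = support.import_fresh_module('heapq', blocked=['_heapq'])   # pure Python
    c_heapq = support.import_fresh_module('heapq', fresh=['_heapq'])      # with C accelerator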
268n/a
269n/adef get_attribute(obj, name):
270n/a """Get an attribute, raising SkipTest if AttributeError is raised."""
271n/a try:
272n/a attribute = getattr(obj, name)
273n/a except AttributeError:
274n/a raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
275n/a else:
276n/a return attribute
277n/a
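A brief sketch of get_attribute(): it turns a missing platform-specific attribute into a skip instead of an error (os.fork is merely an example of such an attribute).

    import os
    from test import support

    fork = support.get_attribute(os, 'fork')   # SkipTest on platforms without os.fork
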
278n/averbose = 1 # Flag set to 0 by regrtest.py
279n/ause_resources = None # Flag set to [] by regrtest.py
280n/amax_memuse = 0 # Disable bigmem tests (they will still be run with
281n/a # small sizes, to make sure they work.)
282n/areal_max_memuse = 0
283n/afailfast = False
284n/amatch_tests = None
285n/a
286n/a# _original_stdout is meant to hold stdout at the time regrtest began.
287n/a# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
288n/a# The point is to have some flavor of stdout the user can actually see.
289n/a_original_stdout = None
290n/adef record_original_stdout(stdout):
291n/a global _original_stdout
292n/a _original_stdout = stdout
293n/a
294n/adef get_original_stdout():
295n/a return _original_stdout or sys.stdout
296n/a
297n/adef unload(name):
298n/a try:
299n/a del sys.modules[name]
300n/a except KeyError:
301n/a pass
302n/a
303n/adef _force_run(path, func, *args):
304n/a try:
305n/a return func(*args)
306n/a except OSError as err:
307n/a if verbose >= 2:
308n/a print('%s: %s' % (err.__class__.__name__, err))
309n/a print('re-run %s%r' % (func.__name__, args))
310n/a os.chmod(path, stat.S_IRWXU)
311n/a return func(*args)
312n/a
313n/aif sys.platform.startswith("win"):
314n/a def _waitfor(func, pathname, waitall=False):
315n/a # Perform the operation
316n/a func(pathname)
317n/a # Now setup the wait loop
318n/a if waitall:
319n/a dirname = pathname
320n/a else:
321n/a dirname, name = os.path.split(pathname)
322n/a dirname = dirname or '.'
323n/a # Check for `pathname` to be removed from the filesystem.
324n/a # The exponential backoff of the timeout amounts to a total
325n/a # of ~1 second after which the deletion is probably an error
326n/a # anyway.
327n/a # Testing on an i7@4.3GHz shows that usually only 1 iteration is
328n/a # required when contention occurs.
329n/a timeout = 0.001
330n/a while timeout < 1.0:
331n/a # Note we are only testing for the existence of the file(s) in
332n/a # the contents of the directory regardless of any security or
333n/a # access rights. If we have made it this far, we have sufficient
334n/a # permissions to do that much using Python's equivalent of the
335n/a # Windows API FindFirstFile.
336n/a # Other Windows APIs can fail or give incorrect results when
337n/a # dealing with files that are pending deletion.
338n/a L = os.listdir(dirname)
339n/a if not (L if waitall else name in L):
340n/a return
341n/a # Increase the timeout and try again
342n/a time.sleep(timeout)
343n/a timeout *= 2
344n/a warnings.warn('tests may fail, delete still pending for ' + pathname,
345n/a RuntimeWarning, stacklevel=4)
346n/a
347n/a def _unlink(filename):
348n/a _waitfor(os.unlink, filename)
349n/a
350n/a def _rmdir(dirname):
351n/a _waitfor(os.rmdir, dirname)
352n/a
353n/a def _rmtree(path):
354n/a def _rmtree_inner(path):
355n/a for name in _force_run(path, os.listdir, path):
356n/a fullname = os.path.join(path, name)
357n/a try:
358n/a mode = os.lstat(fullname).st_mode
359n/a except OSError as exc:
360n/a print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
361n/a file=sys.__stderr__)
362n/a mode = 0
363n/a if stat.S_ISDIR(mode):
364n/a _waitfor(_rmtree_inner, fullname, waitall=True)
365n/a _force_run(fullname, os.rmdir, fullname)
366n/a else:
367n/a _force_run(fullname, os.unlink, fullname)
368n/a _waitfor(_rmtree_inner, path, waitall=True)
369n/a _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
370n/aelse:
371n/a _unlink = os.unlink
372n/a _rmdir = os.rmdir
373n/a
374n/a def _rmtree(path):
375n/a try:
376n/a shutil.rmtree(path)
377n/a return
378n/a except OSError:
379n/a pass
380n/a
381n/a def _rmtree_inner(path):
382n/a for name in _force_run(path, os.listdir, path):
383n/a fullname = os.path.join(path, name)
384n/a try:
385n/a mode = os.lstat(fullname).st_mode
386n/a except OSError:
387n/a mode = 0
388n/a if stat.S_ISDIR(mode):
389n/a _rmtree_inner(fullname)
390n/a _force_run(path, os.rmdir, fullname)
391n/a else:
392n/a _force_run(path, os.unlink, fullname)
393n/a _rmtree_inner(path)
394n/a os.rmdir(path)
395n/a
396n/adef unlink(filename):
397n/a try:
398n/a _unlink(filename)
399n/a except (FileNotFoundError, NotADirectoryError):
400n/a pass
401n/a
402n/adef rmdir(dirname):
403n/a try:
404n/a _rmdir(dirname)
405n/a except FileNotFoundError:
406n/a pass
407n/a
408n/adef rmtree(path):
409n/a try:
410n/a _rmtree(path)
411n/a except FileNotFoundError:
412n/a pass
413n/a
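The three wrappers above let test cleanup code run unconditionally; a minimal illustrative sketch (the scratch directory name is hypothetical):

    from test import support

    support.unlink(support.TESTFN)             # no error if the file never existed
    support.rmtree(support.TESTFN + '_dir')    # likewise for a whole directory tree
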
414n/adef make_legacy_pyc(source):
415n/a """Move a PEP 3147/488 pyc file to its legacy pyc location.
416n/a
417n/a :param source: The file system path to the source file. The source file
418n/a does not need to exist, however the PEP 3147/488 pyc file must exist.
419n/a :return: The file system path to the legacy pyc file.
420n/a """
421n/a pyc_file = importlib.util.cache_from_source(source)
422n/a up_one = os.path.dirname(os.path.abspath(source))
423n/a legacy_pyc = os.path.join(up_one, source + 'c')
424n/a os.rename(pyc_file, legacy_pyc)
425n/a return legacy_pyc
426n/a
427n/adef forget(modname):
428n/a """'Forget' a module was ever imported.
429n/a
430n/a This removes the module from sys.modules and deletes any PEP 3147/488 or
431n/a legacy .pyc files.
432n/a """
433n/a unload(modname)
434n/a for dirname in sys.path:
435n/a source = os.path.join(dirname, modname + '.py')
436n/a # It doesn't matter if they exist or not, unlink all possible
437n/a # combinations of PEP 3147/488 and legacy pyc files.
438n/a unlink(source + 'c')
439n/a for opt in ('', 1, 2):
440n/a unlink(importlib.util.cache_from_source(source, optimization=opt))
441n/a
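A short, hedged example of forget(): after a test writes and imports a scratch module, forgetting it ensures the next import re-reads the source instead of a cached .pyc (the module name below is hypothetical).

    from test import support

    support.forget('scratch_module')   # drops it from sys.modules and removes its .pyc files
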
442n/a# Check whether a gui is actually available
443n/adef _is_gui_available():
444n/a if hasattr(_is_gui_available, 'result'):
445n/a return _is_gui_available.result
446n/a reason = None
447n/a if sys.platform.startswith('win'):
448n/a # if Python is running as a service (such as the buildbot service),
449n/a # gui interaction may be disallowed
450n/a import ctypes
451n/a import ctypes.wintypes
452n/a UOI_FLAGS = 1
453n/a WSF_VISIBLE = 0x0001
454n/a class USEROBJECTFLAGS(ctypes.Structure):
455n/a _fields_ = [("fInherit", ctypes.wintypes.BOOL),
456n/a ("fReserved", ctypes.wintypes.BOOL),
457n/a ("dwFlags", ctypes.wintypes.DWORD)]
458n/a dll = ctypes.windll.user32
459n/a h = dll.GetProcessWindowStation()
460n/a if not h:
461n/a raise ctypes.WinError()
462n/a uof = USEROBJECTFLAGS()
463n/a needed = ctypes.wintypes.DWORD()
464n/a res = dll.GetUserObjectInformationW(h,
465n/a UOI_FLAGS,
466n/a ctypes.byref(uof),
467n/a ctypes.sizeof(uof),
468n/a ctypes.byref(needed))
469n/a if not res:
470n/a raise ctypes.WinError()
471n/a if not bool(uof.dwFlags & WSF_VISIBLE):
472n/a reason = "gui not available (WSF_VISIBLE flag not set)"
473n/a elif sys.platform == 'darwin':
474n/a # The Aqua Tk implementations on OS X can abort the process if
475n/a # being called in an environment where a window server connection
476n/a # cannot be made, for instance when invoked by a buildbot or ssh
477n/a # process not running under the same user id as the current console
478n/a # user. To avoid that, raise an exception if the window manager
479n/a # connection is not available.
480n/a from ctypes import cdll, c_int, pointer, Structure
481n/a from ctypes.util import find_library
482n/a
483n/a app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
484n/a
485n/a if app_services.CGMainDisplayID() == 0:
486n/a reason = "gui tests cannot run without OS X window manager"
487n/a else:
488n/a class ProcessSerialNumber(Structure):
489n/a _fields_ = [("highLongOfPSN", c_int),
490n/a ("lowLongOfPSN", c_int)]
491n/a psn = ProcessSerialNumber()
492n/a psn_p = pointer(psn)
493n/a if ( (app_services.GetCurrentProcess(psn_p) < 0) or
494n/a (app_services.SetFrontProcess(psn_p) < 0) ):
495n/a reason = "cannot run without OS X gui process"
496n/a
497n/a # check on every platform whether tkinter can actually do anything
498n/a if not reason:
499n/a try:
500n/a from tkinter import Tk
501n/a root = Tk()
502n/a root.withdraw()
503n/a root.update()
504n/a root.destroy()
505n/a except Exception as e:
506n/a err_string = str(e)
507n/a if len(err_string) > 50:
508n/a err_string = err_string[:50] + ' [...]'
509n/a reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
510n/a err_string)
511n/a
512n/a _is_gui_available.reason = reason
513n/a _is_gui_available.result = not reason
514n/a
515n/a return _is_gui_available.result
516n/a
517n/adef is_resource_enabled(resource):
518n/a """Test whether a resource is enabled.
519n/a
520n/a Known resources are set by regrtest.py. If not running under regrtest.py,
521n/a all resources are assumed enabled unless use_resources has been set.
522n/a """
523n/a return use_resources is None or resource in use_resources
524n/a
525n/adef requires(resource, msg=None):
526n/a """Raise ResourceDenied if the specified resource is not available."""
527n/a if not is_resource_enabled(resource):
528n/a if msg is None:
529n/a msg = "Use of the %r resource not enabled" % resource
530n/a raise ResourceDenied(msg)
531n/a if resource == 'gui' and not _is_gui_available():
532n/a raise ResourceDenied(_is_gui_available.reason)
533n/a
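A hedged sketch of how requires() is typically used inside a test case; the class and resource name are illustrative only.

    import unittest
    from test import support

    class DownloadTests(unittest.TestCase):
        def test_fetch(self):
            # ResourceDenied (a SkipTest subclass) is raised if 'network' is not enabled
            support.requires('network')
            # ... the actual network access would go here ...
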
534n/adef _requires_unix_version(sysname, min_version):
535n/a """Decorator raising SkipTest if the OS is `sysname` and the version is less
536n/a than `min_version`.
537n/a
538n/a For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
539n/a the FreeBSD version is less than 7.2.
540n/a """
541n/a def decorator(func):
542n/a @functools.wraps(func)
543n/a def wrapper(*args, **kw):
544n/a if platform.system() == sysname:
545n/a version_txt = platform.release().split('-', 1)[0]
546n/a try:
547n/a version = tuple(map(int, version_txt.split('.')))
548n/a except ValueError:
549n/a pass
550n/a else:
551n/a if version < min_version:
552n/a min_version_txt = '.'.join(map(str, min_version))
553n/a raise unittest.SkipTest(
554n/a "%s version %s or higher required, not %s"
555n/a % (sysname, min_version_txt, version_txt))
556n/a return func(*args, **kw)
557n/a wrapper.min_version = min_version
558n/a return wrapper
559n/a return decorator
560n/a
561n/adef requires_freebsd_version(*min_version):
562n/a """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
563n/a less than `min_version`.
564n/a
565n/a For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
566n/a version is less than 7.2.
567n/a """
568n/a return _requires_unix_version('FreeBSD', min_version)
569n/a
570n/adef requires_linux_version(*min_version):
571n/a """Decorator raising SkipTest if the OS is Linux and the Linux version is
572n/a less than `min_version`.
573n/a
574n/a For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
575n/a version is less than 2.6.32.
576n/a """
577n/a return _requires_unix_version('Linux', min_version)
578n/a
579n/adef requires_mac_ver(*min_version):
580n/a """Decorator raising SkipTest if the OS is Mac OS X and the OS X
581n/a version is less than min_version.
582n/a
583n/a For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
584n/a is less than 10.5.
585n/a """
586n/a def decorator(func):
587n/a @functools.wraps(func)
588n/a def wrapper(*args, **kw):
589n/a if sys.platform == 'darwin':
590n/a version_txt = platform.mac_ver()[0]
591n/a try:
592n/a version = tuple(map(int, version_txt.split('.')))
593n/a except ValueError:
594n/a pass
595n/a else:
596n/a if version < min_version:
597n/a min_version_txt = '.'.join(map(str, min_version))
598n/a raise unittest.SkipTest(
599n/a "Mac OS X %s or higher required, not %s"
600n/a % (min_version_txt, version_txt))
601n/a return func(*args, **kw)
602n/a wrapper.min_version = min_version
603n/a return wrapper
604n/a return decorator
605n/a
606n/a
607n/a# Don't use "localhost", since resolving it uses the DNS under recent
608n/a# Windows versions (see issue #18792).
609n/aHOST = "127.0.0.1"
610n/aHOSTv6 = "::1"
611n/a
612n/a
613n/adef find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
614n/a """Returns an unused port that should be suitable for binding. This is
615n/a achieved by creating a temporary socket with the family and type given by the
616n/a 'family' and 'socktype' arguments (default is AF_INET, SOCK_STREAM), and binding it to
617n/a HOST (127.0.0.1) with the port set to 0,
618n/a eliciting an unused ephemeral port from the OS. The temporary socket is
619n/a then closed and deleted, and the ephemeral port is returned.
620n/a
621n/a Either this method or bind_port() should be used for any tests where a
622n/a server socket needs to be bound to a particular port for the duration of
623n/a the test. Which one to use depends on whether the calling code is creating
624n/a a python socket, or if an unused port needs to be provided in a constructor
625n/a or passed to an external program (i.e. the -accept argument to openssl's
626n/a s_server mode). Always prefer bind_port() over find_unused_port() where
627n/a possible. Hard coded ports should *NEVER* be used. As soon as a server
628n/a socket is bound to a hard coded port, the ability to run multiple instances
629n/a of the test simultaneously on the same host is compromised, which makes the
630n/a test a ticking time bomb in a buildbot environment. On Unix buildbots, this
631n/a may simply manifest as a failed test, which can be recovered from without
632n/a intervention in most cases, but on Windows, the entire python process can
633n/a completely and utterly wedge, requiring someone to log in to the buildbot
634n/a and manually kill the affected process.
635n/a
636n/a (This is easy to reproduce on Windows, unfortunately, and can be traced to
637n/a the SO_REUSEADDR socket option having different semantics on Windows versus
638n/a Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
639n/a listen and then accept connections on identical host/ports. An EADDRINUSE
640n/a OSError will be raised at some point (depending on the platform and
641n/a the order bind and listen were called on each socket).
642n/a
643n/a However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
644n/a will ever be raised when attempting to bind two identical host/ports. When
645n/a accept() is called on each socket, the second caller's process will steal
646n/a the port from the first caller, leaving them both in an awkwardly wedged
647n/a state where they'll no longer respond to any signals or graceful kills, and
648n/a must be forcibly killed via OpenProcess()/TerminateProcess().
649n/a
650n/a The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
651n/a instead of SO_REUSEADDR, which effectively affords the same semantics as
652n/a SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
653n/a Source world compared to Windows ones, this is a common mistake. A quick
654n/a look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
655n/a openssl.exe is called with the 's_server' option, for example. See
656n/a http://bugs.python.org/issue2550 for more info. The following site also
657n/a has a very thorough description about the implications of both REUSEADDR
658n/a and EXCLUSIVEADDRUSE on Windows:
659n/a http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
660n/a
661n/a XXX: although this approach is a vast improvement on previous attempts to
662n/a elicit unused ports, it rests heavily on the assumption that the ephemeral
663n/a port returned to us by the OS won't immediately be dished back out to some
664n/a other process when we close and delete our temporary socket but before our
665n/a calling code has a chance to bind the returned port. We can deal with this
666n/a issue if/when we come across it.
667n/a """
668n/a
669n/a tempsock = socket.socket(family, socktype)
670n/a port = bind_port(tempsock)
671n/a tempsock.close()
672n/a del tempsock
673n/a return port
674n/a
675n/adef bind_port(sock, host=HOST):
676n/a """Bind the socket to a free port and return the port number. Relies on
677n/a ephemeral ports in order to ensure we are using an unbound port. This is
678n/a important as many tests may be running simultaneously, especially in a
679n/a buildbot environment. This method raises an exception if the sock.family
680n/a is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
681n/a or SO_REUSEPORT set on it. Tests should *never* set these socket options
682n/a for TCP/IP sockets. The only case for setting these options is testing
683n/a multicasting via multiple UDP sockets.
684n/a
685n/a Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
686n/a on Windows), it will be set on the socket. This will prevent anyone else
687n/a from bind()'ing to our host/port for the duration of the test.
688n/a """
689n/a
690n/a if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
691n/a if hasattr(socket, 'SO_REUSEADDR'):
692n/a if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
693n/a raise TestFailed("tests should never set the SO_REUSEADDR " \
694n/a "socket option on TCP/IP sockets!")
695n/a if hasattr(socket, 'SO_REUSEPORT'):
696n/a try:
697n/a if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
698n/a raise TestFailed("tests should never set the SO_REUSEPORT " \
699n/a "socket option on TCP/IP sockets!")
700n/a except OSError:
701n/a # Python's socket module was compiled using modern headers
702n/a # thus defining SO_REUSEPORT but this process is running
703n/a # under an older kernel that does not support SO_REUSEPORT.
704n/a pass
705n/a if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
706n/a sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
707n/a
708n/a sock.bind((host, 0))
709n/a port = sock.getsockname()[1]
710n/a return port
711n/a
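A minimal sketch of the preferred pattern described above: let bind_port() pick the ephemeral port for a test server socket.

    import socket
    from test import support

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        port = support.bind_port(sock)   # bound to (HOST, <free ephemeral port>)
        sock.listen()
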
712n/adef bind_unix_socket(sock, addr):
713n/a """Bind a unix socket, raising SkipTest if PermissionError is raised."""
714n/a assert sock.family == socket.AF_UNIX
715n/a try:
716n/a sock.bind(addr)
717n/a except PermissionError:
718n/a sock.close()
719n/a raise unittest.SkipTest('cannot bind AF_UNIX sockets')
720n/a
721n/adef _is_ipv6_enabled():
722n/a """Check whether IPv6 is enabled on this host."""
723n/a if socket.has_ipv6:
724n/a sock = None
725n/a try:
726n/a sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
727n/a sock.bind((HOSTv6, 0))
728n/a return True
729n/a except OSError:
730n/a pass
731n/a finally:
732n/a if sock:
733n/a sock.close()
734n/a return False
735n/a
736n/aIPV6_ENABLED = _is_ipv6_enabled()
737n/a
738n/adef system_must_validate_cert(f):
739n/a """Skip the test on TLS certificate validation failures."""
740n/a @functools.wraps(f)
741n/a def dec(*args, **kwargs):
742n/a try:
743n/a f(*args, **kwargs)
744n/a except IOError as e:
745n/a if "CERTIFICATE_VERIFY_FAILED" in str(e):
746n/a raise unittest.SkipTest("system does not contain "
747n/a "necessary certificates")
748n/a raise
749n/a return dec
750n/a
751n/a# A constant likely larger than the underlying OS pipe buffer size, to
752n/a# make writes blocking.
753n/a# Windows limit seems to be around 512 B, and many Unix kernels have a
754n/a# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
755n/a# (see issue #17835 for a discussion of this number).
756n/aPIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
757n/a
758n/a# A constant likely larger than the underlying OS socket buffer size, to make
759n/a# writes blocking.
760n/a# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
761n/a# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
762n/a# for a discussion of this number).
763n/aSOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
764n/a
765n/a# decorator for skipping tests on non-IEEE 754 platforms
766n/arequires_IEEE_754 = unittest.skipUnless(
767n/a float.__getformat__("double").startswith("IEEE"),
768n/a "test requires IEEE 754 doubles")
769n/a
770n/arequires_zlib = unittest.skipUnless(zlib, 'requires zlib')
771n/a
772n/arequires_gzip = unittest.skipUnless(gzip, 'requires gzip')
773n/a
774n/arequires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
775n/a
776n/arequires_lzma = unittest.skipUnless(lzma, 'requires lzma')
777n/a
778n/ais_jython = sys.platform.startswith('java')
779n/a
780n/atry:
781n/a # constant used by requires_android_level()
782n/a _ANDROID_API_LEVEL = sys.getandroidapilevel()
783n/a is_android = True
784n/aexcept AttributeError:
785n/a # sys.getandroidapilevel() is only available on Android
786n/a is_android = False
787n/aandroid_not_root = (is_android and os.geteuid() != 0)
788n/a
789n/aif sys.platform != 'win32':
790n/a unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
791n/aelse:
792n/a unix_shell = None
793n/a
794n/a# Filename used for testing
795n/aif os.name == 'java':
796n/a # Jython disallows @ in module names
797n/a TESTFN = '$test'
798n/aelse:
799n/a TESTFN = '@test'
800n/a
801n/a# Disambiguate TESTFN for parallel testing, while letting it remain a valid
802n/a# module name.
803n/aTESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
804n/a
805n/a# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
806n/a# or None if there is no such character.
807n/aFS_NONASCII = None
808n/afor character in (
809n/a # First try printable and common characters to have a readable filename.
810n/a # For each character, the encoding list are just example of encodings able
811n/a # to encode the character (the list is not exhaustive).
812n/a
813n/a # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
814n/a '\u00E6',
815n/a # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
816n/a '\u0130',
817n/a # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
818n/a '\u0141',
819n/a # U+03C6 (Greek Small Letter Phi): cp1253
820n/a '\u03C6',
821n/a # U+041A (Cyrillic Capital Letter Ka): cp1251
822n/a '\u041A',
823n/a # U+05D0 (Hebrew Letter Alef): Encodable to cp424
824n/a '\u05D0',
825n/a # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
826n/a '\u060C',
827n/a # U+062A (Arabic Letter Teh): cp720
828n/a '\u062A',
829n/a # U+0E01 (Thai Character Ko Kai): cp874
830n/a '\u0E01',
831n/a
832n/a # Then try more "special" characters. "special" because they may be
833n/a # interpreted or displayed differently depending on the exact locale
834n/a # encoding and the font.
835n/a
836n/a # U+00A0 (No-Break Space)
837n/a '\u00A0',
838n/a # U+20AC (Euro Sign)
839n/a '\u20AC',
840n/a):
841n/a try:
842n/a os.fsdecode(os.fsencode(character))
843n/a except UnicodeError:
844n/a pass
845n/a else:
846n/a FS_NONASCII = character
847n/a break
848n/a
849n/a# TESTFN_UNICODE is a non-ascii filename
850n/aTESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
851n/aif sys.platform == 'darwin':
852n/a # In Mac OS X's VFS API file names are, by definition, canonically
853n/a # decomposed Unicode, encoded using UTF-8. See QA1173:
854n/a # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
855n/a import unicodedata
856n/a TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
857n/aTESTFN_ENCODING = sys.getfilesystemencoding()
858n/a
859n/a# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
860n/a# encoded by the filesystem encoding (in strict mode). It can be None if we
861n/a# cannot generate such filename.
862n/aTESTFN_UNENCODABLE = None
863n/aif os.name == 'nt':
864n/a # skip win32s (0) or Windows 9x/ME (1)
865n/a if sys.getwindowsversion().platform >= 2:
866n/a # Different kinds of characters from various languages to minimize the
867n/a # probability that the whole name is encodable to MBCS (issue #9819)
868n/a TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
869n/a try:
870n/a TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
871n/a except UnicodeEncodeError:
872n/a pass
873n/a else:
874n/a print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
875n/a 'Unicode filename tests may not be effective'
876n/a % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
877n/a TESTFN_UNENCODABLE = None
878n/a# Mac OS X denies unencodable filenames (invalid utf-8)
879n/aelif sys.platform != 'darwin':
880n/a try:
881n/a # ascii and utf-8 cannot encode the byte 0xff
882n/a b'\xff'.decode(TESTFN_ENCODING)
883n/a except UnicodeDecodeError:
884n/a # 0xff will be encoded using the surrogate character u+DCFF
885n/a TESTFN_UNENCODABLE = TESTFN \
886n/a + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
887n/a else:
888n/a # File system encoding (eg. ISO-8859-* encodings) can encode
889n/a # the byte 0xff. Skip some unicode filename tests.
890n/a pass
891n/a
892n/a# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
893n/a# decoded from the filesystem encoding (in strict mode). It can be None if we
894n/a# cannot generate such filename (ex: the latin1 encoding can decode any byte
895n/a# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
896n/a# to the surrogateescape error handler (PEP 383), but not from the filesystem
897n/a# encoding in strict mode.
898n/aTESTFN_UNDECODABLE = None
899n/afor name in (
900n/a # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
901n/a # accepts it when creating a file or a directory, but may refuse to enter
902n/a # such a directory (when the bytes name is used). So test b'\xe7' first: it is
903n/a # not decodable from cp932.
904n/a b'\xe7w\xf0',
905n/a # undecodable from ASCII, UTF-8
906n/a b'\xff',
907n/a # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
908n/a # and cp857
909n/a b'\xae\xd5'
910n/a # undecodable from UTF-8 (UNIX and Mac OS X)
911n/a b'\xed\xb2\x80', b'\xed\xb4\x80',
912n/a # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
913n/a # cp1253, cp1254, cp1255, cp1257, cp1258
914n/a b'\x81\x98',
915n/a):
916n/a try:
917n/a name.decode(TESTFN_ENCODING)
918n/a except UnicodeDecodeError:
919n/a TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
920n/a break
921n/a
922n/aif FS_NONASCII:
923n/a TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
924n/aelse:
925n/a TESTFN_NONASCII = None
926n/a
927n/a# Save the initial cwd
928n/aSAVEDCWD = os.getcwd()
929n/a
930n/a# Set by libregrtest/main.py so we can skip tests that are not
931n/a# useful for PGO
932n/aPGO = False
933n/a
934n/a@contextlib.contextmanager
935n/adef temp_dir(path=None, quiet=False):
936n/a """Return a context manager that creates a temporary directory.
937n/a
938n/a Arguments:
939n/a
940n/a path: the directory to create temporarily. If omitted or None,
941n/a defaults to creating a temporary directory using tempfile.mkdtemp.
942n/a
943n/a quiet: if False (the default), the context manager raises an exception
944n/a on error. Otherwise, if the path is specified and cannot be
945n/a created, only a warning is issued.
946n/a
947n/a """
948n/a dir_created = False
949n/a if path is None:
950n/a path = tempfile.mkdtemp()
951n/a dir_created = True
952n/a path = os.path.realpath(path)
953n/a else:
954n/a try:
955n/a os.mkdir(path)
956n/a dir_created = True
957n/a except OSError as exc:
958n/a if not quiet:
959n/a raise
960n/a warnings.warn(f'tests may fail, unable to create '
961n/a f'temporary directory {path!r}: {exc}',
962n/a RuntimeWarning, stacklevel=3)
963n/a try:
964n/a yield path
965n/a finally:
966n/a if dir_created:
967n/a rmtree(path)
968n/a
969n/a@contextlib.contextmanager
970n/adef change_cwd(path, quiet=False):
971n/a """Return a context manager that changes the current working directory.
972n/a
973n/a Arguments:
974n/a
975n/a path: the directory to use as the temporary current working directory.
976n/a
977n/a quiet: if False (the default), the context manager raises an exception
978n/a on error. Otherwise, it issues only a warning and keeps the current
979n/a working directory the same.
980n/a
981n/a """
982n/a saved_dir = os.getcwd()
983n/a try:
984n/a os.chdir(path)
985n/a except OSError as exc:
986n/a if not quiet:
987n/a raise
988n/a warnings.warn(f'tests may fail, unable to change the current working '
989n/a f'directory to {path!r}: {exc}',
990n/a RuntimeWarning, stacklevel=3)
991n/a try:
992n/a yield os.getcwd()
993n/a finally:
994n/a os.chdir(saved_dir)
995n/a
996n/a
997n/a@contextlib.contextmanager
998n/adef temp_cwd(name='tempcwd', quiet=False):
999n/a """
1000n/a Context manager that temporarily creates and changes the CWD.
1001n/a
1002n/a The function temporarily changes the current working directory
1003n/a after creating a temporary directory in the current directory with
1004n/a name *name*. If *name* is None, the temporary directory is
1005n/a created using tempfile.mkdtemp.
1006n/a
1007n/a If *quiet* is False (default) and it is not possible to
1008n/a create or change the CWD, an error is raised. If *quiet* is True,
1009n/a only a warning is raised and the original CWD is used.
1010n/a
1011n/a """
1012n/a with temp_dir(path=name, quiet=quiet) as temp_path:
1013n/a with change_cwd(temp_path, quiet=quiet) as cwd_dir:
1014n/a yield cwd_dir
1015n/a
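A short sketch combining the helpers above: run filesystem-heavy test code inside a throwaway working directory that is removed on exit (the file name is arbitrary).

    import os
    from test import support

    with support.temp_cwd() as path:
        # the current working directory is now a freshly created temporary directory
        open(os.path.join(path, 'scratch.txt'), 'w').close()
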
1016n/aif hasattr(os, "umask"):
1017n/a @contextlib.contextmanager
1018n/a def temp_umask(umask):
1019n/a """Context manager that temporarily sets the process umask."""
1020n/a oldmask = os.umask(umask)
1021n/a try:
1022n/a yield
1023n/a finally:
1024n/a os.umask(oldmask)
1025n/a
1026n/a# TEST_HOME_DIR refers to the top level directory of the "test" package
1027n/a# that contains Python's regression test suite
1028n/aTEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
1029n/aTEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
1030n/a
1031n/a# TEST_DATA_DIR is used as a target download location for remote resources
1032n/aTEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
1033n/a
1034n/adef findfile(filename, subdir=None):
1035n/a """Try to find a file on sys.path or in the test directory. If it is not
1036n/a found, the argument passed to the function is returned (this does not
1037n/a necessarily signal failure; could still be the legitimate path).
1038n/a
1039n/a Setting *subdir* indicates a relative path to use to find the file
1040n/a rather than looking directly in the path directories.
1041n/a """
1042n/a if os.path.isabs(filename):
1043n/a return filename
1044n/a if subdir is not None:
1045n/a filename = os.path.join(subdir, filename)
1046n/a path = [TEST_HOME_DIR] + sys.path
1047n/a for dn in path:
1048n/a fn = os.path.join(dn, filename)
1049n/a if os.path.exists(fn): return fn
1050n/a return filename
1051n/a
1052n/adef create_empty_file(filename):
1053n/a """Create an empty file. If the file already exists, truncate it."""
1054n/a fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
1055n/a os.close(fd)
1056n/a
1057n/adef sortdict(dict):
1058n/a "Like repr(dict), but in sorted order."
1059n/a items = sorted(dict.items())
1060n/a reprpairs = ["%r: %r" % pair for pair in items]
1061n/a withcommas = ", ".join(reprpairs)
1062n/a return "{%s}" % withcommas
1063n/a
1064n/adef make_bad_fd():
1065n/a """
1066n/a Create an invalid file descriptor by opening and closing a file and return
1067n/a its fd.
1068n/a """
1069n/a file = open(TESTFN, "wb")
1070n/a try:
1071n/a return file.fileno()
1072n/a finally:
1073n/a file.close()
1074n/a unlink(TESTFN)
1075n/a
1076n/adef check_syntax_error(testcase, statement, *, lineno=None, offset=None):
1077n/a with testcase.assertRaises(SyntaxError) as cm:
1078n/a compile(statement, '<test string>', 'exec')
1079n/a err = cm.exception
1080n/a testcase.assertIsNotNone(err.lineno)
1081n/a if lineno is not None:
1082n/a testcase.assertEqual(err.lineno, lineno)
1083n/a testcase.assertIsNotNone(err.offset)
1084n/a if offset is not None:
1085n/a testcase.assertEqual(err.offset, offset)
1086n/a
1087n/adef open_urlresource(url, *args, **kw):
1088n/a import urllib.request, urllib.parse
1089n/a
1090n/a check = kw.pop('check', None)
1091n/a
1092n/a filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
1093n/a
1094n/a fn = os.path.join(TEST_DATA_DIR, filename)
1095n/a
1096n/a def check_valid_file(fn):
1097n/a f = open(fn, *args, **kw)
1098n/a if check is None:
1099n/a return f
1100n/a elif check(f):
1101n/a f.seek(0)
1102n/a return f
1103n/a f.close()
1104n/a
1105n/a if os.path.exists(fn):
1106n/a f = check_valid_file(fn)
1107n/a if f is not None:
1108n/a return f
1109n/a unlink(fn)
1110n/a
1111n/a # Verify the requirement before downloading the file
1112n/a requires('urlfetch')
1113n/a
1114n/a if verbose:
1115n/a print('\tfetching %s ...' % url, file=get_original_stdout())
1116n/a opener = urllib.request.build_opener()
1117n/a if gzip:
1118n/a opener.addheaders.append(('Accept-Encoding', 'gzip'))
1119n/a f = opener.open(url, timeout=15)
1120n/a if gzip and f.headers.get('Content-Encoding') == 'gzip':
1121n/a f = gzip.GzipFile(fileobj=f)
1122n/a try:
1123n/a with open(fn, "wb") as out:
1124n/a s = f.read()
1125n/a while s:
1126n/a out.write(s)
1127n/a s = f.read()
1128n/a finally:
1129n/a f.close()
1130n/a
1131n/a f = check_valid_file(fn)
1132n/a if f is not None:
1133n/a return f
1134n/a raise TestFailed('invalid resource %r' % fn)
1135n/a
1136n/a
1137n/aclass WarningsRecorder(object):
1138n/a """Convenience wrapper for the warnings list returned on
1139n/a entry to the warnings.catch_warnings() context manager.
1140n/a """
1141n/a def __init__(self, warnings_list):
1142n/a self._warnings = warnings_list
1143n/a self._last = 0
1144n/a
1145n/a def __getattr__(self, attr):
1146n/a if len(self._warnings) > self._last:
1147n/a return getattr(self._warnings[-1], attr)
1148n/a elif attr in warnings.WarningMessage._WARNING_DETAILS:
1149n/a return None
1150n/a raise AttributeError("%r has no attribute %r" % (self, attr))
1151n/a
1152n/a @property
1153n/a def warnings(self):
1154n/a return self._warnings[self._last:]
1155n/a
1156n/a def reset(self):
1157n/a self._last = len(self._warnings)
1158n/a
1159n/a
1160n/adef _filterwarnings(filters, quiet=False):
1161n/a """Catch the warnings, then check if all the expected
1162n/a warnings have been raised and re-raise unexpected warnings.
1163n/a If 'quiet' is True, only re-raise the unexpected warnings.
1164n/a """
1165n/a # Clear the warning registry of the calling module
1166n/a # in order to re-raise the warnings.
1167n/a frame = sys._getframe(2)
1168n/a registry = frame.f_globals.get('__warningregistry__')
1169n/a if registry:
1170n/a registry.clear()
1171n/a with warnings.catch_warnings(record=True) as w:
1172n/a # Set filter "always" to record all warnings. Because
1173n/a # test_warnings swaps the module, we need to look up in
1174n/a # the sys.modules dictionary.
1175n/a sys.modules['warnings'].simplefilter("always")
1176n/a yield WarningsRecorder(w)
1177n/a # Filter the recorded warnings
1178n/a reraise = list(w)
1179n/a missing = []
1180n/a for msg, cat in filters:
1181n/a seen = False
1182n/a for w in reraise[:]:
1183n/a warning = w.message
1184n/a # Filter out the matching messages
1185n/a if (re.match(msg, str(warning), re.I) and
1186n/a issubclass(warning.__class__, cat)):
1187n/a seen = True
1188n/a reraise.remove(w)
1189n/a if not seen and not quiet:
1190n/a # This filter caught nothing
1191n/a missing.append((msg, cat.__name__))
1192n/a if reraise:
1193n/a raise AssertionError("unhandled warning %s" % reraise[0])
1194n/a if missing:
1195n/a raise AssertionError("filter (%r, %s) did not catch any warning" %
1196n/a missing[0])
1197n/a
1198n/a
1199n/a@contextlib.contextmanager
1200n/adef check_warnings(*filters, **kwargs):
1201n/a """Context manager to silence warnings.
1202n/a
1203n/a Accept 2-tuples as positional arguments:
1204n/a ("message regexp", WarningCategory)
1205n/a
1206n/a Optional argument:
1207n/a - if 'quiet' is True, it does not fail if a filter catches nothing
1208n/a (default True without argument,
1209n/a default False if some filters are defined)
1210n/a
1211n/a Without argument, it defaults to:
1212n/a check_warnings(("", Warning), quiet=True)
1213n/a """
1214n/a quiet = kwargs.get('quiet')
1215n/a if not filters:
1216n/a filters = (("", Warning),)
1217n/a # Preserve backward compatibility
1218n/a if quiet is None:
1219n/a quiet = True
1220n/a return _filterwarnings(filters, quiet)
1221n/a
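An illustrative, hedged example of check_warnings(): the message filter is matched with re.match(), so the warning text must match the pattern from its start.

    import warnings
    from test import support

    with support.check_warnings(("will be removed", DeprecationWarning)):
        warnings.warn("will be removed in a future version", DeprecationWarning)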
1222n/a
1223n/a@contextlib.contextmanager
1224n/adef check_no_resource_warning(testcase):
1225n/a """Context manager to check that no ResourceWarning is emitted.
1226n/a
1227n/a Usage:
1228n/a
1229n/a with check_no_resource_warning(self):
1230n/a f = open(...)
1231n/a ...
1232n/a del f
1233n/a
1234n/a You must remove the object which may emit ResourceWarning before
1235n/a the end of the context manager.
1236n/a """
1237n/a with warnings.catch_warnings(record=True) as warns:
1238n/a warnings.filterwarnings('always', category=ResourceWarning)
1239n/a yield
1240n/a gc_collect()
1241n/a testcase.assertEqual(warns, [])
1242n/a
1243n/a
1244n/aclass CleanImport(object):
1245n/a """Context manager to force import to return a new module reference.
1246n/a
1247n/a This is useful for testing module-level behaviours, such as
1248n/a the emission of a DeprecationWarning on import.
1249n/a
1250n/a Use like this:
1251n/a
1252n/a with CleanImport("foo"):
1253n/a importlib.import_module("foo") # new reference
1254n/a """
1255n/a
1256n/a def __init__(self, *module_names):
1257n/a self.original_modules = sys.modules.copy()
1258n/a for module_name in module_names:
1259n/a if module_name in sys.modules:
1260n/a module = sys.modules[module_name]
1261n/a # It is possible that module_name is just an alias for
1262n/a # another module (e.g. stub for modules renamed in 3.x).
1263n/a # In that case, we also need to delete the real module to clear
1264n/a # the import cache.
1265n/a if module.__name__ != module_name:
1266n/a del sys.modules[module.__name__]
1267n/a del sys.modules[module_name]
1268n/a
1269n/a def __enter__(self):
1270n/a return self
1271n/a
1272n/a def __exit__(self, *ignore_exc):
1273n/a sys.modules.update(self.original_modules)
1274n/a
1275n/a
1276n/aclass EnvironmentVarGuard(collections.abc.MutableMapping):
1277n/a
1278n/a """Class to help protect the environment variable properly. Can be used as
1279n/a a context manager."""
1280n/a
1281n/a def __init__(self):
1282n/a self._environ = os.environ
1283n/a self._changed = {}
1284n/a
1285n/a def __getitem__(self, envvar):
1286n/a return self._environ[envvar]
1287n/a
1288n/a def __setitem__(self, envvar, value):
1289n/a # Remember the initial value on the first access
1290n/a if envvar not in self._changed:
1291n/a self._changed[envvar] = self._environ.get(envvar)
1292n/a self._environ[envvar] = value
1293n/a
1294n/a def __delitem__(self, envvar):
1295n/a # Remember the initial value on the first access
1296n/a if envvar not in self._changed:
1297n/a self._changed[envvar] = self._environ.get(envvar)
1298n/a if envvar in self._environ:
1299n/a del self._environ[envvar]
1300n/a
1301n/a def keys(self):
1302n/a return self._environ.keys()
1303n/a
1304n/a def __iter__(self):
1305n/a return iter(self._environ)
1306n/a
1307n/a def __len__(self):
1308n/a return len(self._environ)
1309n/a
1310n/a def set(self, envvar, value):
1311n/a self[envvar] = value
1312n/a
1313n/a def unset(self, envvar):
1314n/a del self[envvar]
1315n/a
1316n/a def __enter__(self):
1317n/a return self
1318n/a
1319n/a def __exit__(self, *ignore_exc):
1320n/a for (k, v) in self._changed.items():
1321n/a if v is None:
1322n/a if k in self._environ:
1323n/a del self._environ[k]
1324n/a else:
1325n/a self._environ[k] = v
1326n/a os.environ = self._environ
1327n/a
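A minimal sketch of EnvironmentVarGuard as a context manager; every change recorded through it is rolled back on exit.

    import os
    from test import support

    with support.EnvironmentVarGuard() as env:
        env['LANG'] = 'C'                  # original value (or absence) restored on exit
        assert os.environ['LANG'] == 'C'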
1328n/a
1329n/aclass DirsOnSysPath(object):
1330n/a """Context manager to temporarily add directories to sys.path.
1331n/a
1332n/a This makes a copy of sys.path, appends any directories given
1333n/a as positional arguments, then reverts sys.path to the copied
1334n/a settings when the context ends.
1335n/a
1336n/a Note that *all* sys.path modifications in the body of the
1337n/a context manager, including replacement of the object,
1338n/a will be reverted at the end of the block.
1339n/a """
1340n/a
1341n/a def __init__(self, *paths):
1342n/a self.original_value = sys.path[:]
1343n/a self.original_object = sys.path
1344n/a sys.path.extend(paths)
1345n/a
1346n/a def __enter__(self):
1347n/a return self
1348n/a
1349n/a def __exit__(self, *ignore_exc):
1350n/a sys.path = self.original_object
1351n/a sys.path[:] = self.original_value
1352n/a
1353n/a
1354n/aclass TransientResource(object):
1355n/a
1356n/a """Raise ResourceDenied if an exception is raised while the context manager
1357n/a is in effect that matches the specified exception and attributes."""
1358n/a
1359n/a def __init__(self, exc, **kwargs):
1360n/a self.exc = exc
1361n/a self.attrs = kwargs
1362n/a
1363n/a def __enter__(self):
1364n/a return self
1365n/a
1366n/a def __exit__(self, type_=None, value=None, traceback=None):
1367n/a """If type_ is a subclass of self.exc and value has attributes matching
1368n/a self.attrs, raise ResourceDenied. Otherwise let the exception
1369n/a propagate (if any)."""
1370n/a if type_ is not None and issubclass(self.exc, type_):
1371n/a for attr, attr_value in self.attrs.items():
1372n/a if not hasattr(value, attr):
1373n/a break
1374n/a if getattr(value, attr) != attr_value:
1375n/a break
1376n/a else:
1377n/a raise ResourceDenied("an optional resource is not available")
1378n/a
1379n/a# Context managers that raise ResourceDenied when various issues
1380n/a# with the Internet connection manifest themselves as exceptions.
1381n/a# XXX deprecate these and use transient_internet() instead
1382n/atime_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
1383n/asocket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
1384n/aioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
1385n/a
1386n/a
1387n/a@contextlib.contextmanager
1388n/adef transient_internet(resource_name, *, timeout=30.0, errnos=()):
1389n/a """Return a context manager that raises ResourceDenied when various issues
1390n/a with the Internet connection manifest themselves as exceptions."""
1391n/a default_errnos = [
1392n/a ('ECONNREFUSED', 111),
1393n/a ('ECONNRESET', 104),
1394n/a ('EHOSTUNREACH', 113),
1395n/a ('ENETUNREACH', 101),
1396n/a ('ETIMEDOUT', 110),
1397n/a ]
1398n/a default_gai_errnos = [
1399n/a ('EAI_AGAIN', -3),
1400n/a ('EAI_FAIL', -4),
1401n/a ('EAI_NONAME', -2),
1402n/a ('EAI_NODATA', -5),
1403n/a # Encountered when trying to resolve IPv6-only hostnames
1404n/a ('WSANO_DATA', 11004),
1405n/a ]
1406n/a
1407n/a denied = ResourceDenied("Resource %r is not available" % resource_name)
1408n/a captured_errnos = errnos
1409n/a gai_errnos = []
1410n/a if not captured_errnos:
1411n/a captured_errnos = [getattr(errno, name, num)
1412n/a for (name, num) in default_errnos]
1413n/a gai_errnos = [getattr(socket, name, num)
1414n/a for (name, num) in default_gai_errnos]
1415n/a
1416n/a def filter_error(err):
1417n/a n = getattr(err, 'errno', None)
1418n/a if (isinstance(err, socket.timeout) or
1419n/a (isinstance(err, socket.gaierror) and n in gai_errnos) or
1420n/a (isinstance(err, urllib.error.HTTPError) and
1421n/a 500 <= err.code <= 599) or
1422n/a (isinstance(err, urllib.error.URLError) and
1423n/a (("ConnectionRefusedError" in err.reason) or
1424n/a ("TimeoutError" in err.reason) or
1425n/a ("EOFError" in err.reason))) or
1426n/a n in captured_errnos):
1427n/a if not verbose:
1428n/a sys.stderr.write(denied.args[0] + "\n")
1429n/a raise denied from err
1430n/a
1431n/a old_timeout = socket.getdefaulttimeout()
1432n/a try:
1433n/a if timeout is not None:
1434n/a socket.setdefaulttimeout(timeout)
1435n/a yield
1436n/a except nntplib.NNTPTemporaryError as err:
1437n/a if verbose:
1438n/a sys.stderr.write(denied.args[0] + "\n")
1439n/a raise denied from err
1440n/a except OSError as err:
1441n/a # urllib can wrap original socket errors multiple times (!), we must
1442n/a # unwrap to get at the original error.
1443n/a while True:
1444n/a a = err.args
1445n/a if len(a) >= 1 and isinstance(a[0], OSError):
1446n/a err = a[0]
1447n/a # The error can also be wrapped as args[1]:
1448n/a # except socket.error as msg:
1449n/a # raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
1450n/a elif len(a) >= 2 and isinstance(a[1], OSError):
1451n/a err = a[1]
1452n/a else:
1453n/a break
1454n/a filter_error(err)
1455n/a raise
1456n/a # XXX should we catch generic exceptions and look for their
1457n/a # __cause__ or __context__?
1458n/a finally:
1459n/a socket.setdefaulttimeout(old_timeout)
1460n/a
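A hedged sketch of transient_internet(): flaky connectivity failures while talking to the named resource become ResourceDenied skips rather than test errors (the URL is purely illustrative).

    import urllib.request
    from test import support

    def fetch_front_page():
        with support.transient_internet('www.example.com'):
            return urllib.request.urlopen('http://www.example.com/', timeout=30).read()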
1461n/a
1462n/a@contextlib.contextmanager
1463n/adef captured_output(stream_name):
1464n/a """Return a context manager used by captured_stdout/stdin/stderr
1465n/a that temporarily replaces the sys stream *stream_name* with a StringIO."""
1466n/a import io
1467n/a orig_stdout = getattr(sys, stream_name)
1468n/a setattr(sys, stream_name, io.StringIO())
1469n/a try:
1470n/a yield getattr(sys, stream_name)
1471n/a finally:
1472n/a setattr(sys, stream_name, orig_stdout)
1473n/a
1474n/adef captured_stdout():
1475n/a """Capture the output of sys.stdout:
1476n/a
1477n/a with captured_stdout() as stdout:
1478n/a print("hello")
1479n/a self.assertEqual(stdout.getvalue(), "hello\\n")
1480n/a """
1481n/a return captured_output("stdout")
1482n/a
1483n/adef captured_stderr():
1484n/a """Capture the output of sys.stderr:
1485n/a
1486n/a with captured_stderr() as stderr:
1487n/a print("hello", file=sys.stderr)
1488n/a self.assertEqual(stderr.getvalue(), "hello\\n")
1489n/a """
1490n/a return captured_output("stderr")
1491n/a
1492n/adef captured_stdin():
1493n/a """Capture the input to sys.stdin:
1494n/a
1495n/a with captured_stdin() as stdin:
1496n/a stdin.write('hello\\n')
1497n/a stdin.seek(0)
1498n/a # call test code that consumes from sys.stdin
1499n/a captured = input()
1500n/a self.assertEqual(captured, "hello")
1501n/a """
1502n/a return captured_output("stdin")
1503n/a
1504n/a
1505n/adef gc_collect():
1506n/a """Force as many objects as possible to be collected.
1507n/a
1508n/a In non-CPython implementations of Python, this is needed because timely
1509n/a deallocation is not guaranteed by the garbage collector. (Even in CPython
1510n/a this can be the case in case of reference cycles.) This means that __del__
1511n/a methods may be called later than expected and weakrefs may remain alive for
1512n/a longer than expected. This function tries its best to force all garbage
1513n/a objects to disappear.
1514n/a """
1515n/a gc.collect()
1516n/a if is_jython:
1517n/a time.sleep(0.1)
1518n/a gc.collect()
1519n/a gc.collect()
1520n/a
1521n/a@contextlib.contextmanager
1522n/adef disable_gc():
1523n/a have_gc = gc.isenabled()
1524n/a gc.disable()
1525n/a try:
1526n/a yield
1527n/a finally:
1528n/a if have_gc:
1529n/a gc.enable()
1530n/a
1531n/a
1532n/adef python_is_optimized():
1533n/a """Find if Python was built with optimizations."""
1534n/a cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
1535n/a final_opt = ""
1536n/a for opt in cflags.split():
1537n/a if opt.startswith('-O'):
1538n/a final_opt = opt
1539n/a return final_opt not in ('', '-O0', '-Og')
1540n/a
1541n/a
1542n/a_header = 'nP'
1543n/a_align = '0n'
1544n/aif hasattr(sys, "gettotalrefcount"):
1545n/a _header = '2P' + _header
1546n/a _align = '0P'
1547n/a_vheader = _header + 'n'
1548n/a
1549n/adef calcobjsize(fmt):
1550n/a return struct.calcsize(_header + fmt + _align)
1551n/a
1552n/adef calcvobjsize(fmt):
1553n/a return struct.calcsize(_vheader + fmt + _align)
1554n/a
1555n/a
1556n/a_TPFLAGS_HAVE_GC = 1<<14
1557n/a_TPFLAGS_HEAPTYPE = 1<<9
1558n/a
1559n/adef check_sizeof(test, o, size):
1560n/a import _testcapi
1561n/a result = sys.getsizeof(o)
1562n/a # add GC header size
1563n/a if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1564n/a ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1565n/a size += _testcapi.SIZEOF_PYGC_HEAD
1566n/a msg = 'wrong size for %s: got %d, expected %d' \
1567n/a % (type(o), result, size)
1568n/a test.assertEqual(result, size, msg)
1569n/a
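# A minimal usage sketch (the helper name is hypothetical): callers describe
# the object's fields with the struct-style codes above and let check_sizeof()
# add the GC header itself. For an empty list that is a pointer plus a
# Py_ssize_t ('Pn') on top of the var-object header, assuming CPython's
# current layout of PyListObject.
def _example_check_list_sizeof(testcase):
    check_sizeof(testcase, [], calcvobjsize('Pn'))
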
1570n/a#=======================================================================
1571n/a# Decorator for running a function in a different locale, correctly resetting
1572n/a# it afterwards.
1573n/a
1574n/adef run_with_locale(catstr, *locales):
1575n/a def decorator(func):
1576n/a def inner(*args, **kwds):
1577n/a try:
1578n/a import locale
1579n/a category = getattr(locale, catstr)
1580n/a orig_locale = locale.setlocale(category)
1581n/a except AttributeError:
1582n/a # if the test author gives us an invalid category string
1583n/a raise
1584n/a except:
1585n/a # cannot retrieve original locale, so do nothing
1586n/a locale = orig_locale = None
1587n/a else:
1588n/a for loc in locales:
1589n/a try:
1590n/a locale.setlocale(category, loc)
1591n/a break
1592n/a except:
1593n/a pass
1594n/a
1595n/a # now run the function, resetting the locale on exceptions
1596n/a try:
1597n/a return func(*args, **kwds)
1598n/a finally:
1599n/a if locale and orig_locale:
1600n/a locale.setlocale(category, orig_locale)
1601n/a inner.__name__ = func.__name__
1602n/a inner.__doc__ = func.__doc__
1603n/a return inner
1604n/a return decorator
1605n/a
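# A minimal usage sketch (the test class and locale names are hypothetical and
# may not exist on every system): the decorator receives a locale category
# name and one or more candidate locales; the first one that can be installed
# is used for the test.
class _ExampleLocaleTest(unittest.TestCase):
    @run_with_locale('LC_NUMERIC', 'fr_FR.UTF-8', 'de_DE.UTF-8', '')
    def test_locale_dependent_formatting(self):
        import locale
        # grouping data reflects whichever locale (if any) was installed
        self.assertIsInstance(locale.localeconv()['thousands_sep'], str)
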
1606n/a#=======================================================================
1607n/a# Decorator for running a function in a specific timezone, correctly
1608n/a# resetting it afterwards.
1609n/a
1610n/adef run_with_tz(tz):
1611n/a def decorator(func):
1612n/a def inner(*args, **kwds):
1613n/a try:
1614n/a tzset = time.tzset
1615n/a except AttributeError:
1616n/a raise unittest.SkipTest("tzset required")
1617n/a if 'TZ' in os.environ:
1618n/a orig_tz = os.environ['TZ']
1619n/a else:
1620n/a orig_tz = None
1621n/a os.environ['TZ'] = tz
1622n/a tzset()
1623n/a
1624n/a # now run the function, resetting the tz on exceptions
1625n/a try:
1626n/a return func(*args, **kwds)
1627n/a finally:
1628n/a if orig_tz is None:
1629n/a del os.environ['TZ']
1630n/a else:
1631n/a os.environ['TZ'] = orig_tz
1632n/a time.tzset()
1633n/a
1634n/a inner.__name__ = func.__name__
1635n/a inner.__doc__ = func.__doc__
1636n/a return inner
1637n/a return decorator
1638n/a
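# A minimal usage sketch (the test class and TZ value are hypothetical): the
# argument is a POSIX TZ string; where time.tzset() is missing (e.g. Windows)
# the decorated test is skipped automatically.
class _ExampleTimezoneTest(unittest.TestCase):
    @run_with_tz('UTC0')
    def test_gmtime_matches_localtime(self):
        now = time.time()
        self.assertEqual(time.gmtime(now)[:6], time.localtime(now)[:6])
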
1639n/a#=======================================================================
1640n/a# Big-memory-test support. Separate from 'resources' because memory use
1641n/a# should be configurable.
1642n/a
1643n/a# Some handy shorthands. Note that these are used for byte-limits as well
1644n/a# as size-limits, in the various bigmem tests.
1645n/a_1M = 1024*1024
1646n/a_1G = 1024 * _1M
1647n/a_2G = 2 * _1G
1648n/a_4G = 4 * _1G
1649n/a
1650n/aMAX_Py_ssize_t = sys.maxsize
1651n/a
1652n/adef set_memlimit(limit):
1653n/a global max_memuse
1654n/a global real_max_memuse
1655n/a sizes = {
1656n/a 'k': 1024,
1657n/a 'm': _1M,
1658n/a 'g': _1G,
1659n/a 't': 1024*_1G,
1660n/a }
1661n/a m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1662n/a re.IGNORECASE | re.VERBOSE)
1663n/a if m is None:
1664n/a raise ValueError('Invalid memory limit %r' % (limit,))
1665n/a memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1666n/a real_max_memuse = memlimit
1667n/a if memlimit > MAX_Py_ssize_t:
1668n/a memlimit = MAX_Py_ssize_t
1669n/a if memlimit < _2G - 1:
1670n/a raise ValueError('Memory limit %r too low to be useful' % (limit,))
1671n/a max_memuse = memlimit
1672n/a
1673n/aclass _MemoryWatchdog:
1674n/a """An object which periodically watches the process' memory consumption
1675n/a and prints it out.
1676n/a """
1677n/a
1678n/a def __init__(self):
1679n/a self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
1680n/a self.started = False
1681n/a
1682n/a def start(self):
1683n/a try:
1684n/a f = open(self.procfile, 'r')
1685n/a except OSError as e:
1686n/a warnings.warn('/proc not available for stats: {}'.format(e),
1687n/a RuntimeWarning)
1688n/a sys.stderr.flush()
1689n/a return
1690n/a
1691n/a watchdog_script = findfile("memory_watchdog.py")
1692n/a self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
1693n/a stdin=f, stderr=subprocess.DEVNULL)
1694n/a f.close()
1695n/a self.started = True
1696n/a
1697n/a def stop(self):
1698n/a if self.started:
1699n/a self.mem_watchdog.terminate()
1700n/a self.mem_watchdog.wait()
1701n/a
1702n/a
1703n/adef bigmemtest(size, memuse, dry_run=True):
1704n/a """Decorator for bigmem tests.
1705n/a
1706n/a 'size' is a requested size for the test (in arbitrary, test-interpreted
1707n/a units.) 'memuse' is the number of bytes per unit for the test, or a good
1708n/a estimate of it. For example, a test that needs two byte buffers, of 4 GiB
1709n/a each, could be decorated with @bigmemtest(size=_4G, memuse=2).
1710n/a
1711n/a The 'size' argument is normally passed to the decorated test method as an
1712n/a extra argument. If 'dry_run' is true, the value passed to the test method
1713n/a may be less than the requested value. If 'dry_run' is false, it means the
1714n/a test doesn't support dummy runs when -M is not specified.
1715n/a """
1716n/a def decorator(f):
1717n/a def wrapper(self):
1718n/a size = wrapper.size
1719n/a memuse = wrapper.memuse
1720n/a if not real_max_memuse:
1721n/a maxsize = 5147
1722n/a else:
1723n/a maxsize = size
1724n/a
1725n/a if ((real_max_memuse or not dry_run)
1726n/a and real_max_memuse < maxsize * memuse):
1727n/a raise unittest.SkipTest(
1728n/a "not enough memory: %.1fG minimum needed"
1729n/a % (size * memuse / (1024 ** 3)))
1730n/a
1731n/a if real_max_memuse and verbose:
1732n/a print()
1733n/a print(" ... expected peak memory use: {peak:.1f}G"
1734n/a .format(peak=size * memuse / (1024 ** 3)))
1735n/a watchdog = _MemoryWatchdog()
1736n/a watchdog.start()
1737n/a else:
1738n/a watchdog = None
1739n/a
1740n/a try:
1741n/a return f(self, maxsize)
1742n/a finally:
1743n/a if watchdog:
1744n/a watchdog.stop()
1745n/a
1746n/a wrapper.size = size
1747n/a wrapper.memuse = memuse
1748n/a return wrapper
1749n/a return decorator
1750n/a
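# A minimal usage sketch (the test class is hypothetical): the decorated
# method receives the possibly scaled-down size as an extra argument, exactly
# as the docstring above describes.
class _ExampleBigmemTest(unittest.TestCase):
    @bigmemtest(size=_2G, memuse=2)
    def test_large_bytes_copy(self, size):
        data = b'x' * size                        # about `size` bytes
        self.assertEqual(len(bytes(data)), size)  # plus a second copy
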
1751n/adef bigaddrspacetest(f):
1752n/a """Decorator for tests that fill the address space."""
1753n/a def wrapper(self):
1754n/a if max_memuse < MAX_Py_ssize_t:
1755n/a if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
1756n/a raise unittest.SkipTest(
1757n/a "not enough memory: try a 32-bit build instead")
1758n/a else:
1759n/a raise unittest.SkipTest(
1760n/a "not enough memory: %.1fG minimum needed"
1761n/a % (MAX_Py_ssize_t / (1024 ** 3)))
1762n/a else:
1763n/a return f(self)
1764n/a return wrapper
1765n/a
1766n/a#=======================================================================
1767n/a# unittest integration.
1768n/a
1769n/aclass BasicTestRunner:
1770n/a def run(self, test):
1771n/a result = unittest.TestResult()
1772n/a test(result)
1773n/a return result
1774n/a
1775n/adef _id(obj):
1776n/a return obj
1777n/a
1778n/adef requires_resource(resource):
1779n/a if resource == 'gui' and not _is_gui_available():
1780n/a return unittest.skip(_is_gui_available.reason)
1781n/a if is_resource_enabled(resource):
1782n/a return _id
1783n/a else:
1784n/a return unittest.skip("resource {0!r} is not enabled".format(resource))
1785n/a
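# A minimal usage sketch (the test class is hypothetical): requires_resource()
# returns either a pass-through or a skip decorator, so it is stacked directly
# on a test; 'network' is one of the resource names regrtest's -u flag accepts.
class _ExampleNetworkTest(unittest.TestCase):
    @requires_resource('network')
    def test_needs_network(self):
        self.assertTrue(is_resource_enabled('network'))
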
1786n/adef requires_android_level(level, reason):
1787n/a if is_android and _ANDROID_API_LEVEL < level:
1788n/a return unittest.skip('%s at Android API level %d' %
1789n/a (reason, _ANDROID_API_LEVEL))
1790n/a else:
1791n/a return _id
1792n/a
1793n/adef cpython_only(test):
1794n/a """
1795n/a Decorator for tests only applicable on CPython.
1796n/a """
1797n/a return impl_detail(cpython=True)(test)
1798n/a
1799n/adef impl_detail(msg=None, **guards):
1800n/a if check_impl_detail(**guards):
1801n/a return _id
1802n/a if msg is None:
1803n/a guardnames, default = _parse_guards(guards)
1804n/a if default:
1805n/a msg = "implementation detail not available on {0}"
1806n/a else:
1807n/a msg = "implementation detail specific to {0}"
1808n/a guardnames = sorted(guardnames.keys())
1809n/a msg = msg.format(' or '.join(guardnames))
1810n/a return unittest.skip(msg)
1811n/a
1812n/a_have_mp_queue = None
1813n/adef requires_multiprocessing_queue(test):
1814n/a """Skip decorator for tests that use multiprocessing.Queue."""
1815n/a global _have_mp_queue
1816n/a if _have_mp_queue is None:
1817n/a import multiprocessing
1818n/a # Without a functioning shared semaphore implementation attempts to
1819n/a # instantiate a Queue will result in an ImportError (issue #3770).
1820n/a try:
1821n/a multiprocessing.Queue()
1822n/a _have_mp_queue = True
1823n/a except ImportError:
1824n/a _have_mp_queue = False
1825n/a msg = "requires a functioning shared semaphore implementation"
1826n/a return test if _have_mp_queue else unittest.skip(msg)(test)
1827n/a
1828n/adef _parse_guards(guards):
1829n/a # Returns a tuple ({platform_name: run_me}, default_value)
1830n/a if not guards:
1831n/a return ({'cpython': True}, False)
1832n/a is_true = list(guards.values())[0]
1833n/a assert list(guards.values()) == [is_true] * len(guards) # all True or all False
1834n/a return (guards, not is_true)
1835n/a
1836n/a# Use the following check to guard CPython's implementation-specific tests --
1837n/a# or to run them only on the implementation(s) guarded by the arguments.
1838n/adef check_impl_detail(**guards):
1839n/a """This function returns True or False depending on the host platform.
1840n/a Examples:
1841n/a if check_impl_detail(): # only on CPython (default)
1842n/a if check_impl_detail(jython=True): # only on Jython
1843n/a if check_impl_detail(cpython=False): # everywhere except on CPython
1844n/a """
1845n/a guards, default = _parse_guards(guards)
1846n/a return guards.get(platform.python_implementation().lower(), default)
1847n/a
1848n/a
1849n/adef no_tracing(func):
1850n/a """Decorator to temporarily turn off tracing for the duration of a test."""
1851n/a if not hasattr(sys, 'gettrace'):
1852n/a return func
1853n/a else:
1854n/a @functools.wraps(func)
1855n/a def wrapper(*args, **kwargs):
1856n/a original_trace = sys.gettrace()
1857n/a try:
1858n/a sys.settrace(None)
1859n/a return func(*args, **kwargs)
1860n/a finally:
1861n/a sys.settrace(original_trace)
1862n/a return wrapper
1863n/a
1864n/a
1865n/adef refcount_test(test):
1866n/a """Decorator for tests which involve reference counting.
1867n/a
1868n/a To start, the decorator does not run the test if it is not run by CPython.
1869n/a After that, any trace function is unset during the test to prevent
1870n/a unexpected refcounts caused by the trace function.
1871n/a
1872n/a """
1873n/a return no_tracing(cpython_only(test))
1874n/a
1875n/a
1876n/adef _filter_suite(suite, pred):
1877n/a """Recursively filter test cases in a suite based on a predicate."""
1878n/a newtests = []
1879n/a for test in suite._tests:
1880n/a if isinstance(test, unittest.TestSuite):
1881n/a _filter_suite(test, pred)
1882n/a newtests.append(test)
1883n/a else:
1884n/a if pred(test):
1885n/a newtests.append(test)
1886n/a suite._tests = newtests
1887n/a
1888n/adef _run_suite(suite):
1889n/a """Run tests from a unittest.TestSuite-derived class."""
1890n/a if verbose:
1891n/a runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
1892n/a failfast=failfast)
1893n/a else:
1894n/a runner = BasicTestRunner()
1895n/a
1896n/a result = runner.run(suite)
1897n/a if not result.wasSuccessful():
1898n/a if len(result.errors) == 1 and not result.failures:
1899n/a err = result.errors[0][1]
1900n/a elif len(result.failures) == 1 and not result.errors:
1901n/a err = result.failures[0][1]
1902n/a else:
1903n/a err = "multiple errors occurred"
1904n/a if not verbose: err += "; run in verbose mode for details"
1905n/a raise TestFailed(err)
1906n/a
1907n/a
1908n/adef run_unittest(*classes):
1909n/a """Run tests from unittest.TestCase-derived classes."""
1910n/a valid_types = (unittest.TestSuite, unittest.TestCase)
1911n/a suite = unittest.TestSuite()
1912n/a for cls in classes:
1913n/a if isinstance(cls, str):
1914n/a if cls in sys.modules:
1915n/a suite.addTest(unittest.findTestCases(sys.modules[cls]))
1916n/a else:
1917n/a raise ValueError("str arguments must be keys in sys.modules")
1918n/a elif isinstance(cls, valid_types):
1919n/a suite.addTest(cls)
1920n/a else:
1921n/a suite.addTest(unittest.makeSuite(cls))
1922n/a def case_pred(test):
1923n/a if match_tests is None:
1924n/a return True
1925n/a for name in test.id().split("."):
1926n/a if fnmatch.fnmatchcase(name, match_tests):
1927n/a return True
1928n/a return False
1929n/a _filter_suite(suite, case_pred)
1930n/a _run_suite(suite)
1931n/a
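# A minimal usage sketch (the function and class names are hypothetical): the
# classic test_main() pattern in older test files simply hands its TestCase
# classes to run_unittest(), which raises TestFailed on any failure.
def _example_test_main():
    class FirstTests(unittest.TestCase):
        def test_ok(self):
            self.assertTrue(True)
    class SecondTests(unittest.TestCase):
        def test_ok(self):
            self.assertTrue(True)
    run_unittest(FirstTests, SecondTests)
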
1932n/a#=======================================================================
1933n/a# Check for the presence of docstrings.
1934n/a
1935n/a# Rather than trying to enumerate all the cases where docstrings may be
1936n/a# disabled, we just check for that directly
1937n/a
1938n/adef _check_docstrings():
1939n/a """Just used to check if docstrings are enabled"""
1940n/a
1941n/aMISSING_C_DOCSTRINGS = (check_impl_detail() and
1942n/a sys.platform != 'win32' and
1943n/a not sysconfig.get_config_var('WITH_DOC_STRINGS'))
1944n/a
1945n/aHAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
1946n/a not MISSING_C_DOCSTRINGS)
1947n/a
1948n/arequires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1949n/a "test requires docstrings")
1950n/a
1951n/a
1952n/a#=======================================================================
1953n/a# doctest driver.
1954n/a
1955n/adef run_doctest(module, verbosity=None, optionflags=0):
1956n/a """Run doctest on the given module. Return (#failures, #tests).
1957n/a
1958n/a If optional argument verbosity is not specified (or is None), pass
1959n/a support's belief about verbosity on to doctest. Else doctest's
1960n/a usual behavior is used (it searches sys.argv for -v).
1961n/a """
1962n/a
1963n/a import doctest
1964n/a
1965n/a if verbosity is None:
1966n/a verbosity = verbose
1967n/a else:
1968n/a verbosity = None
1969n/a
1970n/a f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
1971n/a if f:
1972n/a raise TestFailed("%d of %d doctests failed" % (f, t))
1973n/a if verbose:
1974n/a print('doctest (%s) ... %d tests with zero failures' %
1975n/a (module.__name__, t))
1976n/a return f, t
1977n/a
1978n/a
1979n/a#=======================================================================
1980n/a# Support for saving and restoring the imported modules.
1981n/a
1982n/adef modules_setup():
1983n/a return sys.modules.copy(),
1984n/a
1985n/adef modules_cleanup(oldmodules):
1986n/a # Encoders/decoders are registered permanently within the internal
1987n/a # codec cache. If we destroy the corresponding modules their
1988n/a # globals will be set to None which will trip up the cached functions.
1989n/a encodings = [(k, v) for k, v in sys.modules.items()
1990n/a if k.startswith('encodings.')]
1991n/a sys.modules.clear()
1992n/a sys.modules.update(encodings)
1993n/a # XXX: This kind of problem can affect more than just encodings. In particular
1994n/a # extension modules (such as _ssl) don't cope with reloading properly.
1995n/a # Really, test modules should be cleaning out the test specific modules they
1996n/a # know they added (ala test_runpy) rather than relying on this function (as
1997n/a # test_importhooks and test_pkg do currently).
1998n/a # Implicitly imported *real* modules should be left alone (see issue 10556).
1999n/a sys.modules.update(oldmodules)
2000n/a
2001n/a#=======================================================================
2002n/a# Threading support to prevent reporting refleaks when running regrtest.py -R
2003n/a
2004n/a# NOTE: we use thread._count() rather than threading.enumerate() (or the
2005n/a# moral equivalent thereof) because a threading.Thread object is still alive
2006n/a# until its __bootstrap() method has returned, even after it has been
2007n/a# unregistered from the threading module.
2008n/a# thread._count(), on the other hand, only gets decremented *after* the
2009n/a# __bootstrap() method has returned, which gives us reliable reference counts
2010n/a# at the end of a test run.
2011n/a
2012n/adef threading_setup():
2013n/a if _thread:
2014n/a return _thread._count(), threading._dangling.copy()
2015n/a else:
2016n/a return 1, ()
2017n/a
2018n/adef threading_cleanup(*original_values):
2019n/a if not _thread:
2020n/a return
2021n/a _MAX_COUNT = 100
2022n/a for count in range(_MAX_COUNT):
2023n/a values = _thread._count(), threading._dangling
2024n/a if values == original_values:
2025n/a break
2026n/a time.sleep(0.01)
2027n/a gc_collect()
2028n/a # XXX print a warning in case of failure?
2029n/a
2030n/adef reap_threads(func):
2031n/a """Use this function when threads are being used. This will
2032n/a ensure that the threads are cleaned up even when the test fails.
2033n/a If threading is unavailable this function does nothing.
2034n/a """
2035n/a if not _thread:
2036n/a return func
2037n/a
2038n/a @functools.wraps(func)
2039n/a def decorator(*args):
2040n/a key = threading_setup()
2041n/a try:
2042n/a return func(*args)
2043n/a finally:
2044n/a threading_cleanup(*key)
2045n/a return decorator
2046n/a
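# A minimal usage sketch (the test class is hypothetical): any test that
# starts its own threads is wrapped so stray threads are waited on even if an
# assertion in the middle fails.
class _ExampleThreadedTest(unittest.TestCase):
    @reap_threads
    def test_worker_thread(self):
        results = []
        t = threading.Thread(target=results.append, args=(42,))
        t.start()
        t.join()
        self.assertEqual(results, [42])
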
2047n/adef reap_children():
2048n/a """Use this function at the end of test_main() whenever sub-processes
2049n/a are started. This will help ensure that no extra children (zombies)
2050n/a stick around to hog resources and create problems when looking
2051n/a for refleaks.
2052n/a """
2053n/a
2054n/a # Reap all our dead child processes so we don't leave zombies around.
2055n/a # These hog resources and might be causing some of the buildbots to die.
2056n/a if hasattr(os, 'waitpid'):
2057n/a any_process = -1
2058n/a while True:
2059n/a try:
2060n/a # This will raise an exception on Windows. That's ok.
2061n/a pid, status = os.waitpid(any_process, os.WNOHANG)
2062n/a if pid == 0:
2063n/a break
2064n/a except:
2065n/a break
2066n/a
2067n/a@contextlib.contextmanager
2068n/adef start_threads(threads, unlock=None):
2069n/a threads = list(threads)
2070n/a started = []
2071n/a try:
2072n/a try:
2073n/a for t in threads:
2074n/a t.start()
2075n/a started.append(t)
2076n/a except:
2077n/a if verbose:
2078n/a print("Can't start %d threads, only %d threads started" %
2079n/a (len(threads), len(started)))
2080n/a raise
2081n/a yield
2082n/a finally:
2083n/a try:
2084n/a if unlock:
2085n/a unlock()
2086n/a endtime = starttime = time.time()
2087n/a for timeout in range(1, 16):
2088n/a endtime += 60
2089n/a for t in started:
2090n/a t.join(max(endtime - time.time(), 0.01))
2091n/a started = [t for t in started if t.isAlive()]
2092n/a if not started:
2093n/a break
2094n/a if verbose:
2095n/a print('Unable to join %d threads during a period of '
2096n/a '%d minutes' % (len(started), timeout))
2097n/a finally:
2098n/a started = [t for t in started if t.isAlive()]
2099n/a if started:
2100n/a faulthandler.dump_traceback(sys.stdout)
2101n/a raise AssertionError('Unable to join %d threads' % len(started))
2102n/a
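# A minimal usage sketch (the helper name is hypothetical): start_threads()
# starts every thread on entry and joins them all on exit, so the body of the
# with block only has to do the work under test.
def _example_start_threads(count=4):
    results = []
    workers = [threading.Thread(target=results.append, args=(i,))
               for i in range(count)]
    with start_threads(workers):
        pass                       # the workers run while the block is open
    return sorted(results) == list(range(count))
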
2103n/a@contextlib.contextmanager
2104n/adef swap_attr(obj, attr, new_val):
2105n/a """Temporarily swap out an attribute with a new object.
2106n/a
2107n/a Usage:
2108n/a with swap_attr(obj, "attr", 5):
2109n/a ...
2110n/a
2111n/a This will set obj.attr to 5 for the duration of the with: block,
2112n/a restoring the old value at the end of the block. If `attr` doesn't
2113n/a exist on `obj`, it will be created and then deleted at the end of the
2114n/a block.
2115n/a """
2116n/a if hasattr(obj, attr):
2117n/a real_val = getattr(obj, attr)
2118n/a setattr(obj, attr, new_val)
2119n/a try:
2120n/a yield
2121n/a finally:
2122n/a setattr(obj, attr, real_val)
2123n/a else:
2124n/a setattr(obj, attr, new_val)
2125n/a try:
2126n/a yield
2127n/a finally:
2128n/a delattr(obj, attr)
2129n/a
2130n/a@contextlib.contextmanager
2131n/adef swap_item(obj, item, new_val):
2132n/a """Temporarily swap out an item with a new object.
2133n/a
2134n/a Usage:
2135n/a with swap_item(obj, "item", 5):
2136n/a ...
2137n/a
2138n/a This will set obj["item"] to 5 for the duration of the with: block,
2139n/a restoring the old value at the end of the block. If `item` doesn't
2140n/a exist in `obj`, it will be created and then deleted at the end of the
2141n/a block.
2142n/a """
2143n/a if item in obj:
2144n/a real_val = obj[item]
2145n/a obj[item] = new_val
2146n/a try:
2147n/a yield
2148n/a finally:
2149n/a obj[item] = real_val
2150n/a else:
2151n/a obj[item] = new_val
2152n/a try:
2153n/a yield
2154n/a finally:
2155n/a del obj[item]
2156n/a
2157n/adef strip_python_stderr(stderr):
2158n/a """Strip the stderr of a Python process from potential debug output
2159n/a emitted by the interpreter.
2160n/a
2161n/a This will typically be run on the result of the communicate() method
2162n/a of a subprocess.Popen object.
2163n/a """
2164n/a stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip()
2165n/a return stderr
2166n/a
2167n/arequires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'),
2168n/a 'types are immortal if COUNT_ALLOCS is defined')
2169n/a
2170n/adef args_from_interpreter_flags():
2171n/a """Return a list of command-line arguments reproducing the current
2172n/a settings in sys.flags and sys.warnoptions."""
2173n/a return subprocess._args_from_interpreter_flags()
2174n/a
2175n/adef optim_args_from_interpreter_flags():
2176n/a """Return a list of command-line arguments reproducing the current
2177n/a optimization settings in sys.flags."""
2178n/a return subprocess._optim_args_from_interpreter_flags()
2179n/a
2180n/a#============================================================
2181n/a# Support for assertions about logging.
2182n/a#============================================================
2183n/a
2184n/aclass TestHandler(logging.handlers.BufferingHandler):
2185n/a def __init__(self, matcher):
2186n/a # BufferingHandler takes a "capacity" argument
2187n/a # so as to know when to flush. As we're overriding
2188n/a # shouldFlush anyway, we can set a capacity of zero.
2189n/a # You can call flush() manually to clear out the
2190n/a # buffer.
2191n/a logging.handlers.BufferingHandler.__init__(self, 0)
2192n/a self.matcher = matcher
2193n/a
2194n/a def shouldFlush(self):
2195n/a return False
2196n/a
2197n/a def emit(self, record):
2198n/a self.format(record)
2199n/a self.buffer.append(record.__dict__)
2200n/a
2201n/a def matches(self, **kwargs):
2202n/a """
2203n/a Look for a saved dict whose keys/values match the supplied arguments.
2204n/a """
2205n/a result = False
2206n/a for d in self.buffer:
2207n/a if self.matcher.matches(d, **kwargs):
2208n/a result = True
2209n/a break
2210n/a return result
2211n/a
2212n/aclass Matcher(object):
2213n/a
2214n/a _partial_matches = ('msg', 'message')
2215n/a
2216n/a def matches(self, d, **kwargs):
2217n/a """
2218n/a Try to match a single dict with the supplied arguments.
2219n/a
2220n/a Keys whose values are strings and which are in self._partial_matches
2221n/a will be checked for partial (i.e. substring) matches. You can extend
2222n/a this scheme to (for example) do regular expression matching, etc.
2223n/a """
2224n/a result = True
2225n/a for k in kwargs:
2226n/a v = kwargs[k]
2227n/a dv = d.get(k)
2228n/a if not self.match_value(k, dv, v):
2229n/a result = False
2230n/a break
2231n/a return result
2232n/a
2233n/a def match_value(self, k, dv, v):
2234n/a """
2235n/a Try to match a single stored value (dv) with a supplied value (v).
2236n/a """
2237n/a if type(v) != type(dv):
2238n/a result = False
2239n/a elif type(dv) is not str or k not in self._partial_matches:
2240n/a result = (v == dv)
2241n/a else:
2242n/a result = dv.find(v) >= 0
2243n/a return result
2244n/a
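# A minimal usage sketch (the function and logger names are hypothetical):
# TestHandler buffers every record it sees, and Matcher lets a test assert
# that a matching record was logged.
def _example_logging_assertion(testcase):
    handler = TestHandler(Matcher())
    logger = logging.getLogger('example.child')
    logger.addHandler(handler)
    logger.propagate = False
    try:
        logger.error('something went wrong')
        testcase.assertTrue(handler.matches(levelname='ERROR',
                                            message='went wrong'))
    finally:
        logger.removeHandler(handler)
        handler.close()
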
2245n/a
2246n/a_can_symlink = None
2247n/adef can_symlink():
2248n/a global _can_symlink
2249n/a if _can_symlink is not None:
2250n/a return _can_symlink
2251n/a symlink_path = TESTFN + "can_symlink"
2252n/a try:
2253n/a os.symlink(TESTFN, symlink_path)
2254n/a can = True
2255n/a except (OSError, NotImplementedError, AttributeError):
2256n/a can = False
2257n/a else:
2258n/a os.remove(symlink_path)
2259n/a _can_symlink = can
2260n/a return can
2261n/a
2262n/adef skip_unless_symlink(test):
2263n/a """Skip decorator for tests that require functional symlink"""
2264n/a ok = can_symlink()
2265n/a msg = "Requires functional symlink implementation"
2266n/a return test if ok else unittest.skip(msg)(test)
2267n/a
2268n/a_can_xattr = None
2269n/adef can_xattr():
2270n/a global _can_xattr
2271n/a if _can_xattr is not None:
2272n/a return _can_xattr
2273n/a if not hasattr(os, "setxattr"):
2274n/a can = False
2275n/a else:
2276n/a tmp_fp, tmp_name = tempfile.mkstemp()
2277n/a try:
2278n/a with open(TESTFN, "wb") as fp:
2279n/a try:
2280n/a # TESTFN & tempfile may use different file systems with
2281n/a # different capabilities
2282n/a os.setxattr(tmp_fp, b"user.test", b"")
2283n/a os.setxattr(fp.fileno(), b"user.test", b"")
2284n/a # Kernels < 2.6.39 don't respect setxattr flags.
2285n/a kernel_version = platform.release()
2286n/a m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
2287n/a can = m is None or int(m.group(1)) >= 39
2288n/a except OSError:
2289n/a can = False
2290n/a finally:
2291n/a unlink(TESTFN)
2292n/a unlink(tmp_name)
2293n/a _can_xattr = can
2294n/a return can
2295n/a
2296n/adef skip_unless_xattr(test):
2297n/a """Skip decorator for tests that require functional extended attributes"""
2298n/a ok = can_xattr()
2299n/a msg = "no non-broken extended attribute support"
2300n/a return test if ok else unittest.skip(msg)(test)
2301n/a
2302n/a
2303n/adef fs_is_case_insensitive(directory):
2304n/a """Detects if the file system for the specified directory is case-insensitive."""
2305n/a with tempfile.NamedTemporaryFile(dir=directory) as base:
2306n/a base_path = base.name
2307n/a case_path = base_path.upper()
2308n/a if case_path == base_path:
2309n/a case_path = base_path.lower()
2310n/a try:
2311n/a return os.path.samefile(base_path, case_path)
2312n/a except FileNotFoundError:
2313n/a return False
2314n/a
2315n/a
2316n/adef detect_api_mismatch(ref_api, other_api, *, ignore=()):
2317n/a """Returns the set of items in ref_api not in other_api, except for a
2318n/a defined list of items to be ignored in this check.
2319n/a
2320n/a By default this skips private attributes beginning with '_' but
2321n/a includes all magic methods, i.e. those starting and ending in '__'.
2322n/a """
2323n/a missing_items = set(dir(ref_api)) - set(dir(other_api))
2324n/a if ignore:
2325n/a missing_items -= set(ignore)
2326n/a missing_items = set(m for m in missing_items
2327n/a if not m.startswith('_') or m.endswith('__'))
2328n/a return missing_items
2329n/a
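# A minimal usage sketch (the function and parameter names are hypothetical):
# a typical caller compares a pure-Python module against its C accelerator
# and asserts that nothing public is missing from the latter.
def _example_api_comparison(testcase, py_impl, c_impl):
    missing = detect_api_mismatch(py_impl, c_impl, ignore={'__all__'})
    testcase.assertEqual(missing, set())
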
2330n/a
2331n/adef check__all__(test_case, module, name_of_module=None, extra=(),
2332n/a blacklist=()):
2333n/a """Assert that the __all__ variable of 'module' contains all public names.
2334n/a
2335n/a The module's public names (its API) are detected automatically based on
2336n/a whether they match the public name convention and were defined in
2337n/a 'module'.
2338n/a
2339n/a The 'name_of_module' argument can specify (as a string or tuple thereof)
2340n/a what module(s) an API could be defined in, in order to be detected as a
2341n/a public API. One case for this is when 'module' imports part of its public
2342n/a API from other modules, possibly a C backend (like 'csv' and its '_csv').
2343n/a
2344n/a The 'extra' argument can be a set of names that wouldn't otherwise be
2345n/a automatically detected as "public", like objects without a proper
2346n/a '__module__' attribute. If provided, it will be added to the
2347n/a automatically detected ones.
2348n/a
2349n/a The 'blacklist' argument can be a set of names that must not be treated
2350n/a as part of the public API even though their names indicate otherwise.
2351n/a
2352n/a Usage:
2353n/a import bar
2354n/a import foo
2355n/a import unittest
2356n/a from test import support
2357n/a
2358n/a class MiscTestCase(unittest.TestCase):
2359n/a def test__all__(self):
2360n/a support.check__all__(self, foo)
2361n/a
2362n/a class OtherTestCase(unittest.TestCase):
2363n/a def test__all__(self):
2364n/a extra = {'BAR_CONST', 'FOO_CONST'}
2365n/a blacklist = {'baz'} # Undocumented name.
2366n/a # bar imports part of its API from _bar.
2367n/a support.check__all__(self, bar, ('bar', '_bar'),
2368n/a extra=extra, blacklist=blacklist)
2369n/a
2370n/a """
2371n/a
2372n/a if name_of_module is None:
2373n/a name_of_module = (module.__name__, )
2374n/a elif isinstance(name_of_module, str):
2375n/a name_of_module = (name_of_module, )
2376n/a
2377n/a expected = set(extra)
2378n/a
2379n/a for name in dir(module):
2380n/a if name.startswith('_') or name in blacklist:
2381n/a continue
2382n/a obj = getattr(module, name)
2383n/a if (getattr(obj, '__module__', None) in name_of_module or
2384n/a (not hasattr(obj, '__module__') and
2385n/a not isinstance(obj, types.ModuleType))):
2386n/a expected.add(name)
2387n/a test_case.assertCountEqual(module.__all__, expected)
2388n/a
2389n/a
2390n/aclass SuppressCrashReport:
2391n/a """Try to prevent a crash report from popping up.
2392n/a
2393n/a On Windows, don't display the Windows Error Reporting dialog. On UNIX,
2394n/a disable the creation of coredump file.
2395n/a """
2396n/a old_value = None
2397n/a old_modes = None
2398n/a
2399n/a def __enter__(self):
2400n/a """On Windows, disable Windows Error Reporting dialogs using
2401n/a SetErrorMode.
2402n/a
2403n/a On UNIX, try to save the previous core file size limit, then set
2404n/a soft limit to 0.
2405n/a """
2406n/a if sys.platform.startswith('win'):
2407n/a # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
2408n/a # GetErrorMode is not available on Windows XP and Windows Server 2003,
2409n/a # but SetErrorMode returns the previous value, so we can use that
2410n/a import ctypes
2411n/a self._k32 = ctypes.windll.kernel32
2412n/a SEM_NOGPFAULTERRORBOX = 0x02
2413n/a self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
2414n/a self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX)
2415n/a
2416n/a # Suppress assert dialogs in debug builds
2417n/a # (see http://bugs.python.org/issue23314)
2418n/a try:
2419n/a import msvcrt
2420n/a msvcrt.CrtSetReportMode
2421n/a except (AttributeError, ImportError):
2422n/a # no msvcrt or a release build
2423n/a pass
2424n/a else:
2425n/a self.old_modes = {}
2426n/a for report_type in [msvcrt.CRT_WARN,
2427n/a msvcrt.CRT_ERROR,
2428n/a msvcrt.CRT_ASSERT]:
2429n/a old_mode = msvcrt.CrtSetReportMode(report_type,
2430n/a msvcrt.CRTDBG_MODE_FILE)
2431n/a old_file = msvcrt.CrtSetReportFile(report_type,
2432n/a msvcrt.CRTDBG_FILE_STDERR)
2433n/a self.old_modes[report_type] = old_mode, old_file
2434n/a
2435n/a else:
2436n/a if resource is not None:
2437n/a try:
2438n/a self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
2439n/a resource.setrlimit(resource.RLIMIT_CORE,
2440n/a (0, self.old_value[1]))
2441n/a except (ValueError, OSError):
2442n/a pass
2443n/a if sys.platform == 'darwin':
2444n/a # Check if the 'Crash Reporter' on OSX was configured
2445n/a # in 'Developer' mode and warn that it will get triggered
2446n/a # when it is.
2447n/a #
2448n/a # This assumes that this context manager is used in tests
2449n/a # that might trigger the next manager.
2450n/a value = subprocess.Popen(['/usr/bin/defaults', 'read',
2451n/a 'com.apple.CrashReporter', 'DialogType'],
2452n/a stdout=subprocess.PIPE).communicate()[0]
2453n/a if value.strip() == b'developer':
2454n/a print("this test triggers the Crash Reporter, "
2455n/a "that is intentional", end='', flush=True)
2456n/a
2457n/a return self
2458n/a
2459n/a def __exit__(self, *ignore_exc):
2460n/a """Restore Windows ErrorMode or core file behavior to initial value."""
2461n/a if self.old_value is None:
2462n/a return
2463n/a
2464n/a if sys.platform.startswith('win'):
2465n/a self._k32.SetErrorMode(self.old_value)
2466n/a
2467n/a if self.old_modes:
2468n/a import msvcrt
2469n/a for report_type, (old_mode, old_file) in self.old_modes.items():
2470n/a msvcrt.CrtSetReportMode(report_type, old_mode)
2471n/a msvcrt.CrtSetReportFile(report_type, old_file)
2472n/a else:
2473n/a if resource is not None:
2474n/a try:
2475n/a resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
2476n/a except (ValueError, OSError):
2477n/a pass
2478n/a
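# A minimal usage sketch (the helper name is hypothetical; faulthandler's
# _sigsegv() is a private test helper): code that crashes on purpose runs
# under SuppressCrashReport so buildbots do not pop up dialogs or write core
# files.
def _example_crash_quietly():
    code = ('from test.support import SuppressCrashReport\n'
            'import faulthandler\n'
            'with SuppressCrashReport():\n'
            '    faulthandler._sigsegv()\n')
    proc = subprocess.run([sys.executable, '-c', code])
    return proc.returncode != 0    # the child is expected to die abnormally
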
2479n/a
2480n/adef patch(test_instance, object_to_patch, attr_name, new_value):
2481n/a """Override 'object_to_patch'.'attr_name' with 'new_value'.
2482n/a
2483n/a Also, add a cleanup procedure to 'test_instance' to restore
2484n/a 'object_to_patch' value for 'attr_name'.
2485n/a The 'attr_name' should be a valid attribute for 'object_to_patch'.
2486n/a
2487n/a """
2488n/a # check that 'attr_name' is a real attribute for 'object_to_patch'
2489n/a # will raise AttributeError if it does not exist
2490n/a getattr(object_to_patch, attr_name)
2491n/a
2492n/a # keep a copy of the old value
2493n/a attr_is_local = False
2494n/a try:
2495n/a old_value = object_to_patch.__dict__[attr_name]
2496n/a except (AttributeError, KeyError):
2497n/a old_value = getattr(object_to_patch, attr_name, None)
2498n/a else:
2499n/a attr_is_local = True
2500n/a
2501n/a # restore the value when the test is done
2502n/a def cleanup():
2503n/a if attr_is_local:
2504n/a setattr(object_to_patch, attr_name, old_value)
2505n/a else:
2506n/a delattr(object_to_patch, attr_name)
2507n/a
2508n/a test_instance.addCleanup(cleanup)
2509n/a
2510n/a # actually override the attribute
2511n/a setattr(object_to_patch, attr_name, new_value)
2512n/a
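# A minimal usage sketch (the test class is hypothetical): unlike swap_attr(),
# patch() registers the restore through addCleanup(), so it also suits
# overrides made in setUp().
class _ExamplePatchTest(unittest.TestCase):
    def test_patched_stdout(self):
        import io
        patch(self, sys, 'stdout', io.StringIO())
        print('captured')
        self.assertEqual(sys.stdout.getvalue(), 'captured\n')
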
2513n/a
2514n/adef run_in_subinterp(code):
2515n/a """
2516n/a Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
2517n/a module is enabled.
2518n/a """
2519n/a # Issue #10915, #15751: PyGILState_*() functions don't work with
2520n/a # sub-interpreters, the tracemalloc module uses these functions internally
2521n/a try:
2522n/a import tracemalloc
2523n/a except ImportError:
2524n/a pass
2525n/a else:
2526n/a if tracemalloc.is_tracing():
2527n/a raise unittest.SkipTest("run_in_subinterp() cannot be used "
2528n/a "if tracemalloc module is tracing "
2529n/a "memory allocations")
2530n/a import _testcapi
2531n/a return _testcapi.run_in_subinterp(code)
2532n/a
2533n/a
2534n/adef check_free_after_iterating(test, iter, cls, args=()):
2535n/a class A(cls):
2536n/a def __del__(self):
2537n/a nonlocal done
2538n/a done = True
2539n/a try:
2540n/a next(it)
2541n/a except StopIteration:
2542n/a pass
2543n/a
2544n/a done = False
2545n/a it = iter(A(*args))
2546n/a # Issue 26494: Shouldn't crash
2547n/a test.assertRaises(StopIteration, next, it)
2548n/a # The sequence should be deallocated just after the end of iterating
2549n/a gc_collect()
2550n/a test.assertTrue(done)
2551n/a
2552n/a
2553n/adef missing_compiler_executable(cmd_names=[]):
2554n/a """Check if the compiler components used to build the interpreter exist.
2555n/a
2556n/a Check for the existence of the compiler executables whose names are listed
2557n/a in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
2558n/a and return the first missing executable, or None when none of them is
2559n/a missing.
2560n/a
2561n/a """
2562n/a from distutils import ccompiler, sysconfig, spawn
2563n/a compiler = ccompiler.new_compiler()
2564n/a sysconfig.customize_compiler(compiler)
2565n/a for name in compiler.executables:
2566n/a if cmd_names and name not in cmd_names:
2567n/a continue
2568n/a cmd = getattr(compiler, name)
2569n/a if cmd_names:
2570n/a assert cmd is not None, \
2571n/a "the '%s' executable is not configured" % name
2572n/a elif cmd is None:
2573n/a continue
2574n/a if spawn.find_executable(cmd[0]) is None:
2575n/a return cmd[0]
2576n/a
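# A minimal usage sketch (the test class is hypothetical): distutils-based
# tests call this in setUp() and skip themselves when part of the toolchain
# that built the interpreter is not installed.
class _ExampleCompilerTest(unittest.TestCase):
    def setUp(self):
        cmd = missing_compiler_executable()
        if cmd is not None:
            self.skipTest('The %r command is not found' % cmd)
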
2577n/a
2578n/a_is_android_emulator = None
2579n/adef setswitchinterval(interval):
2580n/a # Setting a very low gil interval on the Android emulator causes python
2581n/a # to hang (issue #26939).
2582n/a minimum_interval = 1e-5
2583n/a if is_android and interval < minimum_interval:
2584n/a global _is_android_emulator
2585n/a if _is_android_emulator is None:
2586n/a _is_android_emulator = (subprocess.check_output(
2587n/a ['getprop', 'ro.kernel.qemu']).strip() == b'1')
2588n/a if _is_android_emulator:
2589n/a interval = minimum_interval
2590n/a return sys.setswitchinterval(interval)