» Core Development > Code coverage > Lib/test/support.py

Python code coverage for Lib/test/support.py

# | count | content
1n/a"""Supporting definitions for the Python regression tests."""
2n/a
3n/aif __name__ != 'test.support':
4n/a raise ImportError('support must be imported from the test package')
5n/a
6n/aimport contextlib
7n/aimport errno
8n/aimport functools
9n/aimport gc
10n/aimport socket
11n/aimport sys
12n/aimport os
13n/aimport platform
14n/aimport shutil
15n/aimport warnings
16n/aimport unittest
17n/aimport importlib
18n/aimport importlib.util
19n/aimport collections.abc
20n/aimport re
21n/aimport subprocess
22n/aimport time
23n/aimport sysconfig
24n/aimport fnmatch
25n/aimport logging.handlers
26n/aimport struct
27n/aimport tempfile
28n/aimport _testcapi
29n/a
30n/atry:
31n/a import _thread, threading
32n/aexcept ImportError:
33n/a _thread = None
34n/a threading = None
35n/atry:
36n/a import multiprocessing.process
37n/aexcept ImportError:
38n/a multiprocessing = None
39n/a
40n/atry:
41n/a import zlib
42n/aexcept ImportError:
43n/a zlib = None
44n/a
45n/atry:
46n/a import gzip
47n/aexcept ImportError:
48n/a gzip = None
49n/a
50n/atry:
51n/a import bz2
52n/aexcept ImportError:
53n/a bz2 = None
54n/a
55n/atry:
56n/a import lzma
57n/aexcept ImportError:
58n/a lzma = None
59n/a
60n/a__all__ = [
61n/a "Error", "TestFailed", "ResourceDenied", "import_module", "verbose",
62n/a "use_resources", "max_memuse", "record_original_stdout",
63n/a "get_original_stdout", "unload", "unlink", "rmtree", "forget",
64n/a "is_resource_enabled", "requires", "requires_freebsd_version",
65n/a "requires_linux_version", "requires_mac_ver", "find_unused_port",
66n/a "bind_port", "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD",
67n/a "temp_cwd", "findfile", "create_empty_file", "sortdict",
68n/a "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport",
69n/a "EnvironmentVarGuard", "TransientResource", "captured_stdout",
70n/a "captured_stdin", "captured_stderr", "time_out", "socket_peer_reset",
71n/a "ioerror_peer_reset", "run_with_locale", 'temp_umask',
72n/a "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest",
73n/a "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
74n/a "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail",
75n/a "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
76n/a "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
77n/a "skip_unless_xattr", "import_fresh_module", "requires_zlib",
78n/a "PIPE_MAX_SIZE", "failfast", "anticipate_failure", "run_with_tz",
79n/a "requires_gzip", "requires_bz2", "requires_lzma", "suppress_crash_popup",
80n/a ]
81n/a
class Error(Exception):
    """Root of the exception hierarchy used by the regression tests."""


class TestFailed(Error):
    """Raised to report that a regression test failed."""
87n/a
class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    Raised when a test calls requires() for a resource that has not been
    enabled.  Being a distinct SkipTest subclass, it lets the test runner
    tell expected skips apart from unexpected ones.
    """
95n/a
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Silence module/package DeprecationWarnings raised while importing.

    When *ignore* is false the context manager is a no-op: the body runs
    with the warning filters left untouched.
    """
    if not ignore:
        yield
        return
    # catch_warnings() restores the previous filter list on exit, so the
    # "ignore" entry added here only lives for the duration of the body.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
110n/a
111n/a
def import_module(name, deprecated=False, *, required_on=()):
    """Import *name* and return it, or raise SkipTest if it is unavailable.

    If *deprecated* is true, module and package deprecation warnings
    emitted during the import are suppressed.

    *required_on* is an iterable of platform prefixes, compared against
    sys.platform.  On a matching platform the module is considered
    mandatory and the original ImportError propagates instead of being
    turned into a skip.
    """
    with _ignore_deprecated_imports(deprecated):
        try:
            module = importlib.import_module(name)
        except ImportError as exc:
            mandatory_here = sys.platform.startswith(tuple(required_on))
            if mandatory_here:
                raise
            raise unittest.SkipTest(str(exc))
        return module
128n/a
129n/a
def _save_and_remove_module(name, orig_modules):
    """Move *name* and all its submodules from sys.modules to *orig_modules*.

    Raise ImportError if the module can't be imported.
    """
    # Import once purely to prove the module is importable, then drop the
    # freshly created entry again so it is not recorded as an "original".
    if name not in sys.modules:
        __import__(name)
        del sys.modules[name]
    submodule_prefix = name + '.'
    # Iterate over a snapshot: entries are removed while we walk.
    for key in list(sys.modules):
        if key != name and not key.startswith(submodule_prefix):
            continue
        orig_modules[key] = sys.modules.pop(key)
143n/a
def _save_and_block_module(name, orig_modules):
    """Stash sys.modules[name] in *orig_modules* and block further imports.

    A None entry in sys.modules makes any import of *name* fail.
    Return True if the module was in sys.modules, False otherwise.
    """
    was_loaded = name in sys.modules
    if was_loaded:
        orig_modules[name] = sys.modules[name]
    sys.modules[name] = None
    return was_loaded
155n/a
156n/a
def anticipate_failure(condition):
    """Decorator marking a test as an expected failure when *condition* holds.

    With a false *condition* the test is returned unchanged.  Every use of
    this decorator should carry a comment naming the tracker issue that
    explains the breakage.
    """
    return unittest.expectedFailure if condition else (lambda f: f)
166n/a
167n/a
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a fresh copy of *name*, bypassing sys.modules.

    The sys.modules cache is put back into its original state once the
    import has completed.

    Modules listed in *fresh* are removed from the cache first, so they are
    re-imported too if the import needs them.  Modules listed in *blocked*
    cannot be imported while the fresh import takes place.  None is
    returned if an ImportError occurs while preparing *fresh*/*blocked* or
    while importing *name*.

    If *deprecated* is true, module and package deprecation warnings are
    suppressed.
    """
    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
    # to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # saved: entries to put back into sys.modules afterwards.
        # placeholders: blocking (None) entries that had no original and
        # therefore must simply be deleted again.
        saved = {}
        placeholders = []
        _save_and_remove_module(name, saved)
        try:
            for extra in fresh:
                _save_and_remove_module(extra, saved)
            for banned in blocked:
                had_original = _save_and_block_module(banned, saved)
                if not had_original:
                    placeholders.append(banned)
            result = importlib.import_module(name)
        except ImportError:
            result = None
        finally:
            sys.modules.update(saved)
            for placeholder in placeholders:
                del sys.modules[placeholder]
        return result
204n/a
205n/a
def get_attribute(obj, name):
    """Return getattr(obj, name); a missing attribute becomes a SkipTest."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
214n/a
215n/averbose = 1 # Flag set to 0 by regrtest.py
216n/ause_resources = None # Flag set to [] by regrtest.py
217n/amax_memuse = 0 # Disable bigmem tests (they will still be run with
218n/a # small sizes, to make sure they work.)
219n/areal_max_memuse = 0
220n/afailfast = False
221n/amatch_tests = None
222n/a
# _original_stdout holds whatever stdout was when regrtest started: the
# real stdout, IDLE's emulation of it, or anything else.  What matters is
# that it is a stream the user can actually see.
_original_stdout = None


def record_original_stdout(stdout):
    """Remember *stdout* as the stream get_original_stdout() returns."""
    global _original_stdout
    _original_stdout = stdout


def get_original_stdout():
    """Return the recorded stdout, or sys.stdout if none (or a falsy one) was recorded."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
233n/a
def unload(name):
    """Drop *name* from sys.modules; do nothing if it is not loaded."""
    sys.modules.pop(name, None)
239n/a
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait for the deletion to take effect.

        With *waitall* false, wait until the last component of *pathname*
        no longer shows up in its parent directory.  With *waitall* true,
        *pathname* is a directory and we wait until it is empty.  A
        RuntimeWarning is issued if the deletion is still pending after
        roughly one second.
        """
        # Perform the operation
        func(pathname)
        # Work out which directory listing to poll.
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            entries = os.listdir(dirname)
            still_pending = bool(entries) if waitall else name in entries
            if not still_pending:
                return
            # Back off and poll again.
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """os.unlink() that waits for the file to really disappear."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """os.rmdir() that waits for the directory to really disappear."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively delete *path*, waiting at each directory level."""
        def _rmtree_inner(path):
            # Empty the directory `path`; the caller removes `path` itself.
            for entry in os.listdir(path):
                fullname = os.path.join(path, entry)
                if os.path.isdir(fullname):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    os.rmdir(fullname)
                else:
                    os.unlink(fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(os.rmdir, path)
else:
    # Elsewhere deletions take effect immediately: use the plain functions.
    _unlink = os.unlink
    _rmdir = os.rmdir
    _rmtree = shutil.rmtree
295n/a
def unlink(filename):
    """Delete *filename*; a missing file or non-directory path component is ignored."""
    with contextlib.suppress(FileNotFoundError, NotADirectoryError):
        _unlink(filename)
301n/a
def rmdir(dirname):
    """Remove the directory *dirname*; a missing directory is ignored."""
    with contextlib.suppress(FileNotFoundError):
        _rmdir(dirname)
307n/a
def rmtree(path):
    """Recursively delete the tree at *path*; a missing path is ignored."""
    with contextlib.suppress(FileNotFoundError):
        _rmtree(path)
313n/a
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The file system path to the legacy pyc file, i.e. the file
        sitting next to the source file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    source_dir = os.path.dirname(os.path.abspath(source))
    # Join only the file name: joining the whole of `source` would repeat
    # its directory part when `source` is a relative path such as
    # 'pkg/mod.py' (yielding '<cwd>/pkg/pkg/mod.pyc').
    legacy_name = os.path.basename(source) + ('c' if __debug__ else 'o')
    legacy_pyc = os.path.join(source_dir, legacy_name)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
329n/a
def forget(modname):
    """'Forget' that the module *modname* was ever imported.

    The module is dropped from sys.modules and, for every directory on
    sys.path, any PEP 3147 or legacy .pyc/.pyo file belonging to it is
    deleted.
    """
    unload(modname)
    source_name = modname + '.py'
    for directory in sys.path:
        source = os.path.join(directory, source_name)
        # Whether these exist is irrelevant: try every combination of
        # legacy and PEP 3147 locations, for both pyc and pyo.
        unlink(source + 'c')
        unlink(source + 'o')
        for debug in (True, False):
            unlink(importlib.util.cache_from_source(source,
                                                    debug_override=debug))
345n/a
# On some platforms the gui tests must not run even when the 'gui'
# resource is listed in `use_resources'.
if sys.platform.startswith('win'):
    import ctypes
    import ctypes.wintypes

    def _is_gui_available():
        """Return True if the process window station has the visible flag set."""
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001

        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]

        user32 = ctypes.windll.user32
        station = user32.GetProcessWindowStation()
        if not station:
            raise ctypes.WinError()
        flags = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        ok = user32.GetUserObjectInformationW(station,
                                              UOI_FLAGS,
                                              ctypes.byref(flags),
                                              ctypes.sizeof(flags),
                                              ctypes.byref(needed))
        if not ok:
            raise ctypes.WinError()
        return bool(flags.dwFlags & WSF_VISIBLE)
else:
    def _is_gui_available():
        """No check is performed on this platform: always return True."""
        return True
375n/a
def is_resource_enabled(resource):
    """Return whether *resource* is listed in use_resources.

    The set of known resources is established by regrtest.py; while
    use_resources is still None, nothing is enabled.
    """
    if use_resources is None:
        return False
    return resource in use_resources
380n/a
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    The function returns None when the resource may be used.  If the
    caller's module is __main__ the resource is treated as enabled without
    consulting use_resources.  The 'gui' resource is special: when no GUI
    is available a plain SkipTest is raised, whoever the caller is.

    *msg* replaces the default ResourceDenied message.
    """
    if resource == 'gui' and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")
    # A test module run directly as a script gets every resource.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
398n/a
def _requires_unix_version(sysname, min_version):
    """Decorator raising SkipTest if the OS is `sysname` and the version is less
    than `min_version`.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.

    *min_version* is a tuple of ints.  On any other OS, or when the release
    string cannot be parsed as dotted integers, the decorated function runs
    unconditionally.  The wrapper exposes the requirement as its
    `min_version` attribute.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if platform.system() == sysname:
                # Drop any '-suffix' (e.g. '9.1-RELEASE' -> '9.1').
                version_txt = platform.release().split('-', 1)[0]
                try:
                    version = tuple(map(int, version_txt.split('.')))
                except ValueError:
                    # Unparseable release string: don't skip.
                    pass
                else:
                    if version < min_version:
                        min_version_txt = '.'.join(map(str, min_version))
                        raise unittest.SkipTest(
                            "%s version %s or higher required, not %s"
                            % (sysname, min_version_txt, version_txt))
            # The version check passed (or did not apply): run the test.
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
423n/a
def requires_freebsd_version(*min_version):
    """Decorator skipping a test on FreeBSD versions below `min_version`.

    The version components are passed as separate ints: for example,
    @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD version
    is less than 7.2.
    """
    make_decorator = _requires_unix_version
    return make_decorator('FreeBSD', min_version)
432n/a
def requires_linux_version(*min_version):
    """Decorator skipping a test on Linux versions below `min_version`.

    The version components are passed as separate ints: for example,
    @requires_linux_version(2, 6, 32) raises SkipTest if the Linux version
    is less than 2.6.32.
    """
    make_decorator = _requires_unix_version
    return make_decorator('Linux', min_version)
441n/a
def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.  The requirement is exposed as the wrapper's
    `min_version` attribute.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                version_txt = platform.mac_ver()[0]
                try:
                    version = tuple(map(int, version_txt.split('.')))
                except ValueError:
                    # Unparseable version string: don't skip.
                    version = None
                if version is not None and version < min_version:
                    min_version_txt = '.'.join(map(str, min_version))
                    raise unittest.SkipTest(
                        "Mac OS X %s or higher required, not %s"
                        % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
468n/a
469n/a
470n/aHOST = 'localhost'
471n/a
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an unused port that should be suitable for binding.

    A temporary socket of the given *family* and *socktype* (default
    AF_INET, SOCK_STREAM) is handed to bind_port(), which binds it to its
    default host with the port set to 0, eliciting an unused ephemeral port
    from the OS.  The temporary socket is then closed -- also when
    bind_port() raises -- and the ephemeral port is returned.

    Either this function or bind_port() should be used for any test where a
    server socket needs to be bound to a particular port for the duration
    of the test.  Which one to use depends on whether the calling code is
    creating a Python socket, or whether an unused port must be provided in
    a constructor or passed to an external program (e.g. the -accept
    argument to openssl's s_server mode).  Always prefer bind_port() over
    find_unused_port() where possible.

    Hard coded ports should *NEVER* be used.  As soon as a server socket is
    bound to a hard coded port, the ability to run multiple instances of
    the test simultaneously on the same host is compromised, which makes
    the test a ticking time bomb in a buildbot environment.  On Unix
    buildbots this may simply manifest as a failed test, which can be
    recovered from without intervention in most cases; on Windows the
    entire python process can completely and utterly wedge, requiring
    someone to log in to the buildbot and manually kill the affected
    process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced
    to the SO_REUSEADDR socket option having different semantics on Windows
    versus Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM
    sockets bind, listen and then accept connections on identical
    host/ports.  An EADDRINUSE OSError will be raised at some point
    (depending on the platform and the order bind and listen were called on
    each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no
    EADDRINUSE will ever be raised when attempting to bind two identical
    host/ports.  When accept() is called on each socket, the second
    caller's process will steal the port from the first caller, leaving
    them both in an awkwardly wedged state where they'll no longer respond
    to any signals or graceful kills, and must be forcibly killed via
    OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics
    as SO_REUSEADDR on Unix.  Given the propensity of Unix developers in
    the Open Source world compared to Windows ones, this is a common
    mistake.  A quick look over OpenSSL's 0.9.8g source shows that they use
    SO_REUSEADDR when openssl.exe is called with the 's_server' option, for
    example.  See http://bugs.python.org/issue2550 for more info.  The
    following site also has a very thorough description about the
    implications of both REUSEADDR and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts
    to elicit unused ports, it rests heavily on the assumption that the
    ephemeral port returned to us by the OS won't immediately be dished
    back out to some other process when we close our temporary socket but
    before our calling code has a chance to bind the returned port.  We can
    deal with this issue if/when we come across it.
    """
    # The with-statement closes the socket even if bind_port() raises.
    with socket.socket(family, socktype) as tempsock:
        return bind_port(tempsock)
533n/a
def bind_port(sock, host=HOST):
    """Bind *sock* to a free port on *host* and return the port number.

    An ephemeral port is requested (port 0) so that the port is guaranteed
    to be unbound.  This matters because many tests may be running
    simultaneously, especially in a buildbot environment.

    TestFailed is raised if sock.family is AF_INET and sock.type is
    SOCK_STREAM, *and* the socket has SO_REUSEADDR or SO_REUSEPORT set on
    it.  Tests should *never* set these socket options for TCP/IP sockets;
    the only case for setting them is testing multicasting via multiple UDP
    sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available
    (i.e. on Windows), it is set on such a socket.  This prevents anyone
    else from bind()'ing to our host/port for the duration of the test.
    """
    is_tcp_ipv4 = (sock.family == socket.AF_INET
                   and sock.type == socket.SOCK_STREAM)
    if is_tcp_ipv4:
        forbidden = (
            ('SO_REUSEADDR', "tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!"),
            ('SO_REUSEPORT', "tests should never set the SO_REUSEPORT "
                             "socket option on TCP/IP sockets!"),
        )
        for optname, complaint in forbidden:
            # Not every platform defines both options.
            if not hasattr(socket, optname):
                continue
            option = getattr(socket, optname)
            if sock.getsockopt(socket.SOL_SOCKET, option) == 1:
                raise TestFailed(complaint)
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
564n/a
def _is_ipv6_enabled():
    """Check whether IPv6 is enabled on this host.

    IPv6 counts as enabled when the socket module reports IPv6 support and
    a TCP socket can actually be bound to the loopback address '::1'.
    """
    if not socket.has_ipv6:
        return False
    sock = None
    try:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        sock.bind(('::1', 0))
    except OSError:
        return False
    else:
        return True
    finally:
        # sock is still None when creating the socket itself failed.
        if sock:
            sock.close()
579n/a
580n/aIPV6_ENABLED = _is_ipv6_enabled()
581n/a
582n/a
583n/a# A constant likely larger than the underlying OS pipe buffer size, to
584n/a# make writes blocking.
585n/a# Windows limit seems to be around 512 B, and many Unix kernels have a
586n/a# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
587n/a# (see issue #17835 for a discussion of this number).
588n/aPIPE_MAX_SIZE = 4 *1024 * 1024 + 1
589n/a
590n/a
591n/a# decorator for skipping tests on non-IEEE 754 platforms
592n/arequires_IEEE_754 = unittest.skipUnless(
593n/a float.__getformat__("double").startswith("IEEE"),
594n/a "test requires IEEE 754 doubles")
595n/a
596n/arequires_zlib = unittest.skipUnless(zlib, 'requires zlib')
597n/a
598n/arequires_gzip = unittest.skipUnless(gzip, 'requires gzip')
599n/a
600n/arequires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
601n/a
602n/arequires_lzma = unittest.skipUnless(lzma, 'requires lzma')
603n/a
604n/ais_jython = sys.platform.startswith('java')
605n/a
# Filename used for testing.  Jython disallows @ in module names, hence
# the alternative prefix there.
TESTFN = '$test' if os.name == 'java' else '@test'

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
616n/a
# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or None if there is no such character.
FS_NONASCII = None
for character in (
    # Printable, common characters come first so that the resulting
    # filename stays readable.  The encodings named next to each character
    # are merely examples of encodings able to encode it (the lists are not
    # exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then come more "special" characters -- special in that they may be
    # interpreted or displayed differently depending on the exact locale
    # encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        os.fsdecode(os.fsencode(character))
    except UnicodeError:
        # Cannot round-trip through the filesystem encoding: try the next.
        continue
    FS_NONASCII = character
    break
660n/a
661n/a# TESTFN_UNICODE is a non-ascii filename
662n/aTESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
663n/aif sys.platform == 'darwin':
664n/a # In Mac OS X's VFS API file names are, by definition, canonically
665n/a # decomposed Unicode, encoded using UTF-8. See QA1173:
666n/a # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
667n/a import unicodedata
668n/a TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
669n/aTESTFN_ENCODING = sys.getfilesystemencoding()
670n/a
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name in ('nt', 'ce'):
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Mix characters from various languages to minimize the probability
        # that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
        except UnicodeEncodeError:
            # Expected: the name really is unencodable.
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
                  'Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot encode the byte 0xff
        b'\xff'.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = (
            TESTFN + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape'))
    # Otherwise the file system encoding (eg. ISO-8859-* encodings) can
    # encode the byte 0xff, and TESTFN_UNENCODABLE stays None so that some
    # unicode filename tests are skipped.
703n/a
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or don't accept to enter to
    # such directory (when the bytes name is used). So test b'\xe7' first: it is
    # not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is silently
    # concatenated with the next one into a single candidate.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # First candidate the filesystem encoding rejects: use it.
        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
        break
733n/a
734n/aif FS_NONASCII:
735n/a TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
736n/aelse:
737n/a TESTFN_NONASCII = None
738n/a
739n/a# Save the initial cwd
740n/aSAVEDCWD = os.getcwd()
741n/a
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False, path=None):
    """
    Context manager that temporarily changes the CWD.

    If *path* names an existing directory, it becomes the CWD and the file
    system is left untouched.

    Otherwise a directory called *name* is created in the current
    directory, used as the CWD, and removed again on exit.

    If *quiet* is False (default) and the directory cannot be created or
    entered, the error is raised.  If it is True, only a warning is issued
    and the body runs in whatever the CWD is at that point.

    The value bound by the with-statement is the CWD in effect for the body.
    """
    original_cwd = os.getcwd()
    created_here = False
    if path is None:
        path = name
        try:
            os.mkdir(name)
        except OSError:
            if not quiet:
                raise
            warnings.warn('tests may fail, unable to create temp CWD ' + name,
                          RuntimeWarning, stacklevel=3)
        else:
            created_here = True
    try:
        os.chdir(path)
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change the CWD to ' + path,
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        # Leave the directory before trying to delete it.
        os.chdir(original_cwd)
        if created_here:
            rmtree(name)
780n/a
781n/a
if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that sets the process umask to *umask* for the
        duration of the body and restores the previous value afterwards."""
        previous = os.umask(umask)
        try:
            yield
        finally:
            os.umask(previous)
791n/a
792n/a
def findfile(file, here=__file__, subdir=None):
    """Look for *file* next to *here* and then in each sys.path directory.

    An absolute *file* is returned as is.  When *subdir* is given, it is
    prepended to *file* before searching.  The first existing candidate is
    returned; if there is none, the (possibly subdir-prefixed) name is
    returned unchanged -- which does not necessarily signal failure, as it
    could still be a legitimate path.
    """
    if os.path.isabs(file):
        return file
    if subdir is not None:
        file = os.path.join(subdir, file)
    search_dirs = [os.path.dirname(here)] + sys.path
    for directory in search_dirs:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return file
807n/a
def create_empty_file(filename):
    """Make *filename* an empty file: create it, or truncate it if it exists."""
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    os.close(os.open(filename, flags))
812n/a
def sortdict(dict):
    "Return a repr()-like rendering of dict with the items in sorted order."
    rendered = ("%r: %r" % pair for pair in sorted(dict.items()))
    return "{%s}" % ", ".join(rendered)
819n/a
def make_bad_fd():
    """
    Return a file descriptor that is no longer valid.

    The file TESTFN is opened to obtain a descriptor number, then closed
    and deleted again before the number is handed back.
    """
    handle = open(TESTFN, "wb")
    try:
        fd = handle.fileno()
        return fd
    finally:
        # Runs before the caller sees the value, so fd is already closed.
        handle.close()
        unlink(TESTFN)
831n/a
def check_syntax_error(testcase, statement):
    """Assert via *testcase* that compiling *statement* raises SyntaxError.

    *statement* is compiled in 'exec' mode under the file name
    '<test string>'.
    """
    with testcase.assertRaises(SyntaxError):
        compile(statement, '<test string>', 'exec')
835n/a
def open_urlresource(url, *args, **kw):
    """Open a local cached copy of *url*, downloading it first if needed.

    The cache lives in the "data" directory next to this module and is named
    after the last path component of *url*.  Extra positional and keyword
    arguments are forwarded to open() for the returned file object.

    The keyword-only option ``check`` may be a callable taking the open
    file; a falsy result marks the cached copy as invalid, in which case it
    is deleted and fetched again.

    Downloading requires the 'urlfetch' resource.  Raises TestFailed when
    the freshly downloaded file still fails ``check``.
    """
    import urllib.request, urllib.parse

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)

    def check_valid_file(fn):
        # Return an open file if it passes ``check``; otherwise None.
        f = open(fn, *args, **kw)
        if check is None:
            return f
        keep_open = False
        try:
            if check(f):
                f.seek(0)
                keep_open = True
                return f
            return None
        finally:
            # Close on rejection and also when check()/seek() raised, so the
            # handle is never leaked.
            if not keep_open:
                f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print('\tfetching %s ...' % url, file=get_original_stdout())
    f = urllib.request.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    except BaseException:
        # Do not leave a truncated download behind: with check=None a later
        # call would hand it out as a valid resource.
        # NOTE(review): relies on the module's unlink() helper tolerating a
        # missing file -- confirm.
        unlink(fn)
        raise
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)
878n/a
879n/a
class WarningsRecorder(object):
    """Thin view over the list produced by warnings.catch_warnings(record=True).

    Attribute access is forwarded to the most recent recorded warning, and
    reset() hides everything recorded so far from the ``warnings`` property.
    """

    def __init__(self, warnings_list):
        self._warnings = warnings_list
        # Index of the first warning still considered "unseen".
        self._last = 0

    def __getattr__(self, attr):
        recorded = self._warnings
        if len(recorded) > self._last:
            # At least one warning since the last reset(): delegate to the
            # newest one.
            return getattr(recorded[-1], attr)
        if attr not in warnings.WarningMessage._WARNING_DETAILS:
            raise AttributeError("%r has no attribute %r" % (self, attr))
        # Known warning detail, but nothing has been recorded yet.
        return None

    @property
    def warnings(self):
        """The warnings recorded since the last reset()."""
        return self._warnings[self._last:]

    def reset(self):
        """Mark every warning recorded so far as already seen."""
        self._last = len(self._warnings)
901n/a
902n/a
def _filterwarnings(filters, quiet=False):
    """Generator backing check_warnings().

    Records every warning raised while suspended at the ``yield``, then
    matches them against *filters*, a sequence of (message regexp, category)
    pairs.  Any recorded warning matched by no filter raises AssertionError.
    Unless *quiet* is true, a filter that matched nothing raises
    AssertionError as well.
    """
    # Clear the warning registry of the calling module so that warnings it
    # already emitted once are raised again.  Two frames up is the code
    # entering the context manager, so this lookup must stay in this frame.
    caller_globals = sys._getframe(2).f_globals
    registry = caller_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as recorded:
        # test_warnings swaps the warnings module, hence the lookup through
        # sys.modules rather than the imported name.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(recorded)
    # Whatever is left in ``unhandled`` after filtering was unexpected.
    unhandled = list(recorded)
    unmatched = []
    for pattern, category in filters:
        hits = [entry for entry in unhandled
                if re.match(pattern, str(entry.message), re.I)
                and issubclass(entry.message.__class__, category)]
        for entry in hits:
            unhandled.remove(entry)
        if not hits and not quiet:
            # This filter caught nothing
            unmatched.append((pattern, category.__name__))
    if unhandled:
        raise AssertionError("unhandled warning %s" % unhandled[0])
    if unmatched:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             unmatched[0])
940n/a
941n/a
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager that records and verifies warnings.

    Positional arguments are 2-tuples of the form
        ("message regexp", WarningCategory)

    Keyword argument:
     - 'quiet': when true, a filter that catches nothing is not an error.
        With no filters it defaults to True; with filters it is falsy
        unless given explicitly.

    Calling it with no arguments is the same as
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if filters:
        return _filterwarnings(filters, quiet)
    # No filters given: catch everything and, for backward compatibility,
    # stay quiet unless the caller said otherwise.
    if quiet is None:
        quiet = True
    return _filterwarnings((("", Warning),), quiet)
964n/a
965n/a
class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference

    The named modules are removed from sys.modules when the object is
    constructed; on exit the entries captured at construction time are
    written back into sys.modules.
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name in sys.modules:
                module = sys.modules[module_name]
                # It is possible that module_name is just an alias for
                # another module (e.g. stub for modules renamed in 3.x).
                # In that case, we also need delete the real module to clear
                # the import cache.
                if module.__name__ != module_name:
                    # The real module may not be registered under its own
                    # name; pop() tolerates that instead of raising KeyError.
                    sys.modules.pop(module.__name__, None)
                del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
996n/a
997n/a
class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Mapping over os.environ that undoes its own modifications.

    Every variable set or deleted through the guard has its initial value
    remembered; leaving the ``with`` block puts those values back."""

    def __init__(self):
        self._environ = os.environ
        # Maps variable name -> value before first modification (None when
        # the variable did not exist).
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # setdefault() records the initial value on first access only.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # setdefault() records the initial value on first access only.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        # Deleting a variable that is not set is silently accepted.
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        """Method spelling of ``guard[envvar] = value``."""
        self[envvar] = value

    def unset(self, envvar):
        """Method spelling of ``del guard[envvar]``."""
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        environ = self._environ
        for name, original in self._changed.items():
            if original is not None:
                environ[name] = original
            elif name in environ:
                # The variable did not exist beforehand: remove it again.
                del environ[name]
        os.environ = environ
1049n/a
1050n/a
class DirsOnSysPath(object):
    """Context manager that temporarily extends sys.path.

    On construction the current sys.path list object and a copy of its
    contents are remembered, then the positional arguments are appended.
    When the context ends, sys.path is rebound to the remembered object and
    its contents are reset to the remembered copy.

    Consequently *every* sys.path change made inside the block, including
    rebinding sys.path to a different list, is undone at the end.
    """

    def __init__(self, *paths):
        path_list = sys.path
        self.original_value = list(path_list)
        self.original_object = path_list
        path_list.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        restored = self.original_object
        sys.path = restored
        restored[:] = self.original_value
1074n/a
1075n/a
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        # exc: exception class to match; kwargs: attribute name -> value that
        # the raised exception instance must carry.
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be self.exc or derive from it.  The check used
        # to be written the other way round, so subclasses such as
        # ConnectionResetError never matched a guard built for OSError.
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                # Every requested attribute matched (or none was requested).
                raise ResourceDenied("an optional resource is not available")
1100n/a
# Context managers that raise ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
# Each one watches for OSError carrying a specific errno attribute.
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
# Configured identically to socket_peer_reset (same class, same errno).
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
1107n/a
1108n/a
@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    resource_name is interpolated into the ResourceDenied message.
    timeout, unless None, is installed as the socket default timeout for the
    duration of the block; the previous default is restored afterwards.
    errnos, when non-empty, replaces the default list of errno values that
    are treated as transient; in that case no getaddrinfo error codes are
    matched.

    An OSError raised in the block that does not match is re-raised as is.
    """
    # (name, fallback number) pairs; the number is used only when the errno
    # module lacks the symbolic name on this platform.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    # Same scheme, looked up on the socket module, for socket.gaierror.
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]

    # Built up front so filter_error() can raise the same instance.
    denied = ResourceDenied("Resource %r is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise ``denied`` (chained to err) if err looks transient; otherwise
        # return normally so the caller can re-raise the original.
        n = getattr(err, 'errno', None)
        if (isinstance(err, socket.timeout) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            n in captured_errnos):
            # NOTE(review): the message is written only when ``verbose`` is
            # falsy -- confirm this is the intended polarity.
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except OSError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], OSError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(a) >= 2 and isinstance(a[1], OSError):
                err = a[1]
            else:
                break
        filter_error(err)
        # Not a transient error: propagate the exception as originally raised.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
1172n/a
1173n/a
@contextlib.contextmanager
def captured_output(stream_name):
    """Swap ``sys.<stream_name>`` for a fresh StringIO inside a with block.

    This is the shared implementation behind captured_stdout(),
    captured_stderr() and captured_stdin().  The replacement stream is the
    value bound by ``as``; the original stream is put back on exit.
    """
    import io
    original_stream = getattr(sys, stream_name)
    replacement = io.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, original_stream)

def captured_stdout():
    """Capture whatever is written to sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\n")
    """
    return captured_output("stdout")

def captured_stderr():
    """Capture whatever is written to sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\n")
    """
    return captured_output("stderr")

def captured_stdin():
    """Provide fake input through sys.stdin:

       with captured_stdin() as stdin:
           stdin.write('hello\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")
1215n/a
1216n/a
def gc_collect():
    """Make a best effort to get all garbage objects collected.

    Implementations other than CPython do not guarantee timely
    deallocation, and even CPython defers it for reference cycles.  As a
    result __del__ methods may run late and weakrefs may stay alive longer
    than a test expects.  Running the collector several times tries to
    flush all of that out.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    # Two further passes, matching the three collections made in total.
    for _ in range(2):
        gc.collect()
1232n/a
@contextlib.contextmanager
def disable_gc():
    """Turn the cyclic garbage collector off for the body of a with block.

    On exit the collector is re-enabled only if it was enabled on entry.
    """
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()
1242n/a
1243n/a
def python_is_optimized():
    """Report whether the interpreter was compiled with optimizations.

    The last ``-O...`` option in the PY_CFLAGS build variable decides: the
    result is True unless there is no such option or it is exactly ``-O0``.
    """
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    opt_flags = [flag for flag in cflags.split() if flag.startswith('-O')]
    return bool(opt_flags) and opt_flags[-1] != '-O0'
1252n/a
1253n/a
# struct format fragments used to compute expected object sizes.
# Interpreters exposing sys.gettotalrefcount get two extra pointers in front
# of the header and pointer alignment at the end.
if hasattr(sys, "gettotalrefcount"):
    _header = '2P' + 'nP'
    _align = '0P'
else:
    _header = 'nP'
    _align = '0n'
# Header variant with one additional 'n' field appended.
_vheader = _header + 'n'

def calcobjsize(fmt):
    """Return struct.calcsize() of *fmt* wrapped in _header and _align."""
    return struct.calcsize("".join((_header, fmt, _align)))

def calcvobjsize(fmt):
    """Return struct.calcsize() of *fmt* wrapped in _vheader and _align."""
    return struct.calcsize("".join((_vheader, fmt, _align)))
1266n/a
1267n/a
# Bit masks tested against a type's __flags__ attribute.
_TPFLAGS_HAVE_GC = 1 << 14
_TPFLAGS_HEAPTYPE = 1 << 9

def check_sizeof(test, o, size):
    """Assert through *test* that sys.getsizeof(o) equals *size*.

    *size* excludes the GC header; _testcapi.SIZEOF_PYGC_HEAD is added to it
    when *o* is a type object with the heap-type flag set, or a non-type
    object whose type has the have-GC flag set.
    """
    result = sys.getsizeof(o)
    # Decide whether the GC header has to be added to the expectation.
    if type(o) == type:
        has_gc_head = o.__flags__ & _TPFLAGS_HEAPTYPE
    else:
        has_gc_head = type(o).__flags__ & _TPFLAGS_HAVE_GC
    if has_gc_head:
        size += _testcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' % (type(o), result, size)
    test.assertEqual(result, size, msg)
1280n/a
1281n/a#=======================================================================
1282n/a# Decorator for running a function in a different locale, correctly resetting
1283n/a# it afterwards.
1284n/a
def run_with_locale(catstr, *locales):
    """Decorator factory: run the function under the first usable locale.

    *catstr* names a category constant of the locale module (for example
    'LC_ALL').  Each entry of *locales* is tried in order until setlocale()
    accepts one; if none is accepted the function runs under the current
    locale.  The original locale is restored afterwards, also when the
    function raises.

    Raises AttributeError at call time if *catstr* is not a locale
    attribute.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # Locale not available here; try the next one.
                        # Exception (not a bare except) so KeyboardInterrupt
                        # and SystemExit still propagate.
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator
1316n/a
1317n/a#=======================================================================
1318n/a# Decorator for running a function in a specific timezone, correctly
1319n/a# resetting it afterwards.
1320n/a
def run_with_tz(tz):
    """Decorator factory: run the function with the TZ variable set to *tz*.

    time.tzset() is called after setting TZ and again after restoring the
    previous value (or removing TZ if it was not set before).  Restoration
    also happens when the function raises.

    Raises unittest.SkipTest at call time when time.tzset is unavailable.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            orig_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # pop() rather than del: the function itself may have
                    # removed TZ already, which must not raise KeyError here.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator
1349n/a
1350n/a#=======================================================================
1351n/a# Big-memory-test support. Separate from 'resources' because memory use
1352n/a# should be configurable.
1353n/a
# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1 << 20
_1G = _1M << 10
_2G = _1G * 2
_4G = _1G * 4

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse *limit* and store it in max_memuse / real_max_memuse.

    *limit* is a decimal number followed by K, M, G or T (any case) and an
    optional 'b', e.g. '4G' or '2.5gb'.  real_max_memuse receives the parsed
    byte count; max_memuse receives it capped at MAX_Py_ssize_t.

    Raises ValueError if *limit* cannot be parsed or the capped value is
    below 2 GiB - 1.  In the latter case real_max_memuse has already been
    updated.
    """
    global max_memuse
    global real_max_memuse
    multipliers = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    match = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                     re.IGNORECASE | re.VERBOSE)
    if match is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    number, _fraction, unit = match.groups()
    requested = int(float(number) * multipliers[unit.lower()])
    real_max_memuse = requested
    capped = min(requested, MAX_Py_ssize_t)
    if capped < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = capped
1383n/a
class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.

    The watching is done by a separate "memory_watchdog.py" script, started
    as a subprocess whose stdin is this process' /proc/<pid>/statm file.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        self.started = False

    def start(self):
        """Launch the watchdog subprocess.

        If the /proc file cannot be opened, a RuntimeWarning is issued and
        nothing is started.
        """
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        # ``with`` guarantees our copy of the handle is closed even when
        # locating the script or spawning the process fails.
        with f:
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
                                                 stdin=f, stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the watchdog subprocess, if one was started."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()
1412n/a
1413n/a
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is the requested size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it.

    if 'dry_run' is False, it means the test doesn't support dummy runs
    when -M is not specified.

    The decorated test is called as f(self, maxsize): maxsize is 'size' when
    a memory limit is configured and a small dummy value (5147) otherwise.
    The test is skipped when the configured limit is below size * memuse, or
    when no limit is configured and dry_run is false.
    """
    def decorator(f):
        # wraps() keeps the test's name and docstring visible in reports.
        @functools.wraps(f)
        def wrapper(self):
            # Read back from the wrapper so callers may adjust the attributes
            # after decoration.
            size = wrapper.size
            memuse = wrapper.memuse
            if not real_max_memuse:
                maxsize = 5147
            else:
                maxsize = size

            if ((real_max_memuse or not dry_run)
                and real_max_memuse < maxsize * memuse):
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=size * memuse / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()
            else:
                watchdog = None

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
1458n/a
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test runs only when max_memuse is at least MAX_Py_ssize_t; otherwise
    unittest.SkipTest is raised with a message that depends on the build's
    pointer size.
    """
    # wraps() keeps the test's name and docstring visible in reports.
    @functools.wraps(f)
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper
1473n/a
1474n/a#=======================================================================
1475n/a# unittest integration.
1476n/a
1477n/aclass BasicTestRunner:
1478n/a def run(self, test):
1479n/a result = unittest.TestResult()
1480n/a test(result)
1481n/a return result
1482n/a
def _id(obj):
    """Return *obj* unchanged; serves as a do-nothing decorator."""
    return obj
1485n/a
def requires_resource(resource):
    """Return a decorator that skips the test unless *resource* is usable.

    The 'gui' resource must additionally pass _is_gui_available().  When the
    resource is enabled, the returned decorator leaves the test untouched.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip("resource 'gui' is not available")
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
1493n/a
def cpython_only(test):
    """
    Decorator marking *test* as applicable to CPython only.

    Equivalent to applying impl_detail(cpython=True).
    """
    decorate = impl_detail(cpython=True)
    return decorate(test)
1499n/a
def impl_detail(msg=None, **guards):
    """Return a decorator for tests of implementation details.

    *guards* are interpreted by check_impl_detail().  When the check passes,
    the decorator leaves the test untouched; otherwise it skips the test
    with *msg*, or with a message generated from the guard names when *msg*
    is None.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}" if default
                    else "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(guardnames.keys())))
    return unittest.skip(msg)
1512n/a
def _parse_guards(guards):
    """Normalize implementation guards.

    Returns a tuple ({platform_name: run_me}, default_value).  With no
    guards, the result selects CPython only.  Otherwise all guard values
    must agree, and the default is the opposite of that shared value.
    """
    if not guards:
        return ({'cpython': True}, False)
    flags = list(guards.values())
    is_true = flags[0]
    assert flags == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)
1520n/a
1521n/a# Use the following check to guard CPython's implementation-specific tests --
1522n/a# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """Tell whether the running implementation is selected by *guards*.

       The result is True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    implementation = platform.python_implementation().lower()
    return guards.get(implementation, default)
1532n/a
1533n/a
def no_tracing(func):
    """Decorator that switches tracing off while *func* runs.

    The trace function active at call time is reinstalled afterwards.  On
    interpreters without sys.gettrace, *func* is returned unchanged.
    """
    if not hasattr(sys, 'gettrace'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            sys.settrace(saved_trace)
    return wrapper
1548n/a
1549n/a
def refcount_test(test):
    """Decorator for tests which involve reference counting.

    To start, the decorator does not run the test if it is not run by CPython.
    After that, any trace function is unset during the test to prevent
    unexpected refcounts caused by the trace function.

    """
    # cpython_only is applied first (innermost), then no_tracing wraps it.
    return no_tracing(cpython_only(test))
1559n/a
1560n/a
def _filter_suite(suite, pred):
    """Prune *suite* in place, keeping only test cases accepted by *pred*.

    Nested TestSuite objects are filtered recursively and always kept, even
    if they end up empty.
    """
    kept = []
    for test in suite._tests:
        if isinstance(test, unittest.TestSuite):
            _filter_suite(test, pred)
        elif not pred(test):
            continue
        kept.append(test)
    suite._tests = kept
1572n/a
def _run_suite(suite):
    """Run *suite* and raise TestFailed unless everything passed.

    In verbose mode a TextTestRunner writing to sys.stdout is used,
    otherwise the silent BasicTestRunner.  A single error or failure is
    reported with its traceback text; anything else gets a generic message.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
                                         failfast=failfast)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return
    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
1591n/a
1592n/a
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase subclass, a ready-made TestSuite or
    TestCase instance, or the name of an already imported module whose tests
    are to be collected.  The collected tests are filtered by the
    module-level ``match_tests`` pattern (when it is not None) and run.

    Raises ValueError for a string that is not a key of sys.modules.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    # unittest.makeSuite() and unittest.findTestCases() are deprecated and
    # were removed in Python 3.13; a TestLoader offers the same collection.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(loader.loadTestsFromTestCase(cls))
    def case_pred(test):
        # Keep a test if any dotted component of its id matches the pattern.
        if match_tests is None:
            return True
        for name in test.id().split("."):
            if fnmatch.fnmatchcase(name, match_tests):
                return True
        return False
    _filter_suite(suite, case_pred)
    _run_suite(suite)
1616n/a
1617n/a#=======================================================================
1618n/a# Check for the presence of docstrings.
1619n/a
# Truthy when the implementation is not CPython, when running on win32, or
# when the WITH_DOC_STRINGS build variable is set.  Because of the ``or``
# chain the value may be the raw config value rather than a bool.
HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
                   sys.platform == 'win32' or
                   sysconfig.get_config_var('WITH_DOC_STRINGS'))

# Decorator that skips a test when HAVE_DOCSTRINGS is falsy.
requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")
1626n/a
1627n/a
1628n/a#=======================================================================
1629n/a# doctest driver.
1630n/a
def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module.  Return (#failures, #tests).

    When verbosity is omitted (or None), support's own ``verbose`` setting
    is handed to doctest.  Any other value makes doctest fall back to its
    usual behavior (it searches sys.argv for -v).

    Raises TestFailed if at least one doctest fails.
    """

    import doctest

    verbosity = verbose if verbosity is None else None

    failures, attempted = doctest.testmod(module, verbose=verbosity,
                                          optionflags=optionflags)
    if failures:
        raise TestFailed("%d of %d doctests failed" % (failures, attempted))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, attempted))
    return failures, attempted
1653n/a
1654n/a
1655n/a#=======================================================================
1656n/a# Support for saving and restoring the imported modules.
1657n/a
def modules_setup():
    """Return a 1-tuple holding a shallow copy of sys.modules."""
    snapshot = sys.modules.copy()
    return (snapshot,)
1660n/a
def modules_cleanup(oldmodules):
    """Reset sys.modules to *oldmodules* plus the current encodings modules.

    sys.modules is emptied in place, then refilled with the ``encodings.*``
    entries it held and with everything in *oldmodules*.
    """
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    preserved = {name: module for name, module in sys.modules.items()
                 if name.startswith('encodings.')}
    sys.modules.clear()
    sys.modules.update(preserved)
    # XXX: This kind of problem can affect more than just encodings. In particular
    # extension modules (such as _ssl) don't cope with reloading properly.
    # Really, test modules should be cleaning out the test specific modules they
    # know they added (ala test_runpy) rather than relying on this function (as
    # test_importhooks and test_pkg do currently).
    # Implicitly imported *real* modules should be left alone (see issue 10556).
    sys.modules.update(oldmodules)
1676n/a
1677n/a#=======================================================================
1678n/a# Threading support to prevent reporting refleaks when running regrtest.py -R
1679n/a
1680n/a# NOTE: we use thread._count() rather than threading.enumerate() (or the
1681n/a# moral equivalent thereof) because a threading.Thread object is still alive
1682n/a# until its __bootstrap() method has returned, even after it has been
1683n/a# unregistered from the threading module.
1684n/a# thread._count(), on the other hand, only gets decremented *after* the
1685n/a# __bootstrap() method has returned, which gives us reliable reference counts
1686n/a# at the end of a test run.
1687n/a
def threading_setup():
    """Return (thread count, copy of threading._dangling) for later cleanup.

    Without thread support the placeholder (1, ()) is returned instead.
    """
    if not _thread:
        return 1, ()
    return _thread._count(), threading._dangling.copy()
1693n/a
def threading_cleanup(*original_values):
    """Wait for thread state to return to what threading_setup() reported.

    Polls up to ten times, sleeping 0.1s and running gc_collect() between
    attempts, and gives up silently afterwards.  Does nothing without thread
    support.
    """
    if not _thread:
        return
    _MAX_COUNT = 10
    for _attempt in range(_MAX_COUNT):
        current = (_thread._count(), threading._dangling)
        if current == original_values:
            break
        time.sleep(0.1)
        gc_collect()
    # XXX print a warning in case of failure?
1705n/a
def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.
    If threading is unavailable this function does nothing.

    The wrapper forwards positional and keyword arguments to *func* and
    returns its result.
    """
    if not _thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
1722n/a
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Does nothing on platforms lacking os.waitpid or os.WNOHANG.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    # Platforms without WNOHANG (e.g. Windows) are skipped explicitly rather
    # than relying on a catch-all except clause.
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        return
    any_process = -1
    while True:
        try:
            pid, status = os.waitpid(any_process, os.WNOHANG)
        except OSError:
            # No child processes at all (or waitpid failed): nothing to reap.
            break
        if pid == 0:
            # Children exist but none has exited yet.
            break
1742n/a
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily replace an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        Inside the with: block obj.attr is 5; afterwards the previous value
        is back.  If `attr` does not exist on `obj` beforehand, it is
        created for the block and deleted again at the end.
    """
    existed = hasattr(obj, attr)
    if existed:
        previous = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield
    finally:
        if existed:
            setattr(obj, attr, previous)
        else:
            delattr(obj, attr)
1769n/a
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily swap out an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

    This will set obj["item"] to 5 for the duration of the with: block,
    restoring the old value at the end of the block.  If `item` doesn't
    exist on `obj`, it will be created and then deleted at the end of the
    block.

    The old value (or None if `item` didn't exist) is bound to the target
    of the "as" clause, if there is one.
    """
    if item in obj:
        real_val = obj[item]
        obj[item] = new_val
        try:
            yield real_val
        finally:
            obj[item] = real_val
    else:
        obj[item] = new_val
        try:
            yield
        finally:
            # The block may already have removed the item itself.
            if item in obj:
                del obj[item]
1796n/a
def strip_python_stderr(stderr):
    """Return *stderr* with interpreter debug output removed.

    Every "[N refs, M blocks]" marker (plus an optional trailing line
    ending) is dropped from the bytes object, and leading/trailing
    whitespace is stripped from what remains.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.
    """
    refcount_marker = re.compile(br"\[\d+ refs, \d+ blocks\]\r?\n?")
    cleaned = refcount_marker.sub(b"", stderr)
    return cleaned.strip()
1806n/a
def args_from_interpreter_flags():
    """Return command-line arguments reproducing the interpreter settings.

    The list mirrors the current settings in sys.flags and
    sys.warnoptions; the work is delegated to the private helper of the
    same name in the subprocess module.
    """
    interpreter_args = subprocess._args_from_interpreter_flags()
    return interpreter_args
1811n/a
1812n/a#============================================================
1813n/a# Support for assertions about logging.
1814n/a#============================================================
1815n/a
class TestHandler(logging.handlers.BufferingHandler):
    """Logging handler that keeps every record for later assertions.

    Emitted records are stored as attribute dicts in ``self.buffer`` and
    are never flushed automatically.  *matcher* is an object whose
    ``matches(d, **kwargs)`` method decides whether a stored dict
    satisfies a query.
    """

    def __init__(self, matcher):
        # BufferingHandler wants a "capacity" to decide when to flush.
        # shouldFlush() is overridden below, so zero is fine; call
        # flush() manually to clear out the buffer.
        super().__init__(0)
        self.matcher = matcher

    def shouldFlush(self):
        """Never flush automatically."""
        return False

    def emit(self, record):
        """Format *record*, then save its attribute dict in the buffer."""
        # The formatted text is discarded; format() is called only for
        # its effect on the record before the dict is stored.
        self.format(record)
        self.buffer.append(record.__dict__)

    def matches(self, **kwargs):
        """
        Return True if any saved dict matches the supplied keys/values.
        """
        return any(self.matcher.matches(saved, **kwargs)
                   for saved in self.buffer)
1843n/a
class Matcher(object):
    """Compare a stored dict against expected key/value pairs."""

    # Keys whose string values are matched by substring, not equality.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Return True if dict *d* satisfies every supplied keyword argument.

        Keys whose values are strings and which are in self._partial_matches
        will be checked for partial (i.e. substring) matches.  You can extend
        this scheme to (for example) do regular expression matching, etc.
        """
        return all(self.match_value(key, d.get(key), expected)
                   for key, expected in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Compare one stored value (dv) with one supplied value (v) for key k.
        """
        # Values of different exact types never match.
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return v in dv
        return v == dv
1876n/a
1877n/a
# Cached answer from can_symlink(); None until the first probe completes.
_can_symlink = None
def can_symlink():
    """Return True if os.symlink() works here, caching the answer.

    The first call tries to create a symlink named TESTFN + "can_symlink"
    pointing at TESTFN and removes it again on success.  Later calls
    return the cached result.
    """
    global _can_symlink
    if _can_symlink is None:
        probe_path = TESTFN + "can_symlink"
        try:
            os.symlink(TESTFN, probe_path)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            # Remove the probe before caching, so a failed removal
            # propagates without recording a result.
            os.remove(probe_path)
            _can_symlink = True
    return _can_symlink
1893n/a
def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    if can_symlink():
        return test
    skipper = unittest.skip("Requires functional symlink implementation")
    return skipper(test)
1899n/a
# Cached answer from can_xattr(); None until the first probe completes.
_can_xattr = None
def can_xattr():
    """Return True if extended attributes are usable, caching the answer.

    The probe sets a "user.test" attribute on both a tempfile and TESTFN.
    An OSError from either call means no support.  Linux kernels 2.6.x
    older than 2.6.39 are also reported as unsupported.  Both probe files
    are removed afterwards.
    """
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        tmp_fp, tmp_name = tempfile.mkstemp()
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() hands back an open descriptor; close it so it
            # does not leak.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
    _can_xattr = can
    return can
1927n/a
def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    if can_xattr():
        return test
    skipper = unittest.skip("no non-broken extended attribute support")
    return skipper(test)
1933n/a
1934n/a
if sys.platform.startswith('win'):
    @contextlib.contextmanager
    def suppress_crash_popup():
        """Disable Windows Error Reporting dialogs using SetErrorMode."""
        # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621%28v=vs.85%29.aspx
        # GetErrorMode is not available on Windows XP and Windows Server 2003,
        # but SetErrorMode returns the previous value, so we can use that
        import ctypes
        SEM_NOGPFAULTERRORBOX = 0x02
        kernel32 = ctypes.windll.kernel32
        # The first call only fetches the previous mode; the second call
        # adds the flag on top of it.
        previous_mode = kernel32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
        kernel32.SetErrorMode(previous_mode | SEM_NOGPFAULTERRORBOX)
        try:
            yield
        finally:
            kernel32.SetErrorMode(previous_mode)
else:
    # this is a no-op for other platforms
    @contextlib.contextmanager
    def suppress_crash_popup():
        """Do nothing; there is no crash popup to suppress here."""
        yield
1956n/a
1957n/a
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    A cleanup is registered on 'test_instance' (via addCleanup) that
    undoes the override: an attribute found in the object's own __dict__
    gets its old value back, anything else has the override deleted.

    'attr_name' must already be a valid attribute of 'object_to_patch';
    otherwise AttributeError is raised and nothing is patched.
    """
    # Fail fast with AttributeError if the attribute does not exist.
    getattr(object_to_patch, attr_name)

    # Remember the old value and whether it lived on the object itself.
    try:
        saved_value = object_to_patch.__dict__[attr_name]
        stored_locally = True
    except (AttributeError, KeyError):
        saved_value = getattr(object_to_patch, attr_name, None)
        stored_locally = False

    def restore():
        # Undo the override once the test is done.
        if not stored_locally:
            delattr(object_to_patch, attr_name)
        else:
            setattr(object_to_patch, attr_name, saved_value)

    test_instance.addCleanup(restore)

    # Finally, install the replacement value.
    setattr(object_to_patch, attr_name, new_value)