» Core Development > Code coverage > Lib/test/test_contextlib.py

Python code coverage for Lib/test/test_contextlib.py

# | count | content
1n/a"""Unit tests for contextlib.py, and other context managers."""
2n/a
3n/aimport io
4n/aimport sys
5n/aimport tempfile
6n/aimport unittest
7n/afrom contextlib import * # Tests __all__
8n/afrom test import support
9n/atry:
10n/a import threading
11n/aexcept ImportError:
12n/a threading = None
13n/a
14n/a
class TestAbstractContextManager(unittest.TestCase):
    """Checks for the AbstractContextManager base class."""

    def test_enter(self):
        # Only __exit__ is supplied here; __enter__ comes from the base class
        # and is expected to return the manager itself.
        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *exc_info):
                super().__exit__(*exc_info)

        cm = DefaultEnter()
        entered = cm.__enter__()
        self.assertIs(entered, cm)

    def test_exit_is_abstract(self):
        # A subclass that defines no __exit__ must not be instantiable.
        class MissingExit(AbstractContextManager):
            pass

        self.assertRaises(TypeError, MissingExit)

    def test_structural_subclassing(self):
        # A class unrelated to AbstractContextManager that simply defines
        # both protocol methods.
        class ManagerFromScratch:
            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_value, traceback):
                return None

        # A class that explicitly derives from the ABC.
        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *exc_info):
                super().__exit__(*exc_info)

        for candidate in (ManagerFromScratch, DefaultEnter):
            self.assertTrue(issubclass(candidate, AbstractContextManager))
46n/a
47n/a
class ContextManagerTestCase(unittest.TestCase):
    """Checks for the @contextmanager decorator."""

    def test_contextmanager_plain(self):
        events = []

        @contextmanager
        def woohoo():
            events.append(1)
            yield 42
            events.append(999)

        with woohoo() as value:
            # Only the code before the yield has run at this point.
            self.assertEqual(events, [1])
            self.assertEqual(value, 42)
            events.append(value)
        self.assertEqual(events, [1, 42, 999])

    def test_contextmanager_finally(self):
        events = []

        @contextmanager
        def woohoo():
            events.append(1)
            try:
                yield 42
            finally:
                events.append(999)

        with self.assertRaises(ZeroDivisionError), woohoo() as value:
            self.assertEqual(events, [1])
            self.assertEqual(value, 42)
            events.append(value)
            raise ZeroDivisionError()
        # The finally clause ran even though the body raised.
        self.assertEqual(events, [1, 42, 999])

    def test_contextmanager_no_reraise(self):
        @contextmanager
        def whee():
            yield

        cm = whee()
        cm.__enter__()
        # Calling __exit__ should not result in an exception
        suppressed = cm.__exit__(TypeError, TypeError("foo"), None)
        self.assertFalse(suppressed)

    def test_contextmanager_trap_yield_after_throw(self):
        @contextmanager
        def whoo():
            try:
                yield
            except:
                # Yielding a second time after the exception was thrown in.
                yield

        cm = whoo()
        cm.__enter__()
        with self.assertRaises(RuntimeError):
            cm.__exit__(TypeError, TypeError("foo"), None)

    def test_contextmanager_except(self):
        events = []

        @contextmanager
        def woohoo():
            events.append(1)
            try:
                yield 42
            except ZeroDivisionError as err:
                events.append(err.args[0])
                self.assertEqual(events, [1, 42, 999])

        # The generator handles the exception, so nothing escapes the block.
        with woohoo() as value:
            self.assertEqual(events, [1])
            self.assertEqual(value, 42)
            events.append(value)
            raise ZeroDivisionError(999)
        self.assertEqual(events, [1, 42, 999])

    def test_contextmanager_except_stopiter(self):
        stop_exc = StopIteration('spam')

        @contextmanager
        def woohoo():
            yield

        try:
            with self.assertWarnsRegex(DeprecationWarning, "StopIteration"):
                with woohoo():
                    raise stop_exc
        except Exception as caught:
            # The very same exception object must come back out.
            self.assertIs(caught, stop_exc)
        else:
            self.fail('StopIteration was suppressed')

    def test_contextmanager_except_pep479(self):
        # Build the generator in a namespace compiled with generator_stop.
        code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
    yield
"""
        namespace = {}
        exec(code, namespace, namespace)
        woohoo = namespace['woohoo']

        stop_exc = StopIteration('spam')
        try:
            with woohoo():
                raise stop_exc
        except Exception as caught:
            self.assertIs(caught, stop_exc)
        else:
            self.fail('StopIteration was suppressed')

    def _create_contextmanager_attribs(self):
        # Helper: return a @contextmanager-wrapped function that also carries
        # a custom attribute and a docstring.
        def attribs(**attrs):
            def decorate(func):
                for name, value in attrs.items():
                    setattr(func, name, value)
                return func
            return decorate

        @contextmanager
        @attribs(foo='bar')
        def baz(spam):
            """Whee!"""

        return baz

    def test_contextmanager_attribs(self):
        wrapped = self._create_contextmanager_attribs()
        self.assertEqual(wrapped.__name__, 'baz')
        self.assertEqual(wrapped.foo, 'bar')

    @support.requires_docstrings
    def test_contextmanager_doc_attrib(self):
        wrapped = self._create_contextmanager_attribs()
        self.assertEqual(wrapped.__doc__, "Whee!")

    @support.requires_docstrings
    def test_instance_docstring_given_cm_docstring(self):
        factory = self._create_contextmanager_attribs()
        instance = factory(None)
        self.assertEqual(instance.__doc__, "Whee!")

    def test_keywords(self):
        # Ensure no keyword arguments are inhibited
        @contextmanager
        def woohoo(self, func, args, kwds):
            yield (self, func, args, kwds)

        with woohoo(self=11, func=22, args=33, kwds=44) as target:
            self.assertEqual(target, (11, 22, 33, 44))
190n/a
191n/a
class ClosingTestCase(unittest.TestCase):
    """Checks for contextlib.closing."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        instance = closing(None)
        self.assertEqual(instance.__doc__, closing.__doc__)

    def test_closing(self):
        calls = []

        class C:
            def close(self):
                calls.append(1)

        resource = C()
        self.assertEqual(calls, [])
        with closing(resource) as bound:
            self.assertEqual(resource, bound)
        # close() was called exactly once when the block finished.
        self.assertEqual(calls, [1])

    def test_closing_error(self):
        calls = []

        class C:
            def close(self):
                calls.append(1)

        resource = C()
        self.assertEqual(calls, [])
        with self.assertRaises(ZeroDivisionError), closing(resource) as bound:
            self.assertEqual(resource, bound)
            1 / 0
        # close() was still called although the body raised.
        self.assertEqual(calls, [1])
224n/a
class FileContextTestCase(unittest.TestCase):
    """Checks that file objects returned by open() work as context managers."""

    def testWithOpen(self):
        # tempfile.mktemp() only invents a name, leaving a window in which
        # another process can create that path.  Instead, create the file
        # inside a private temporary directory; the directory (and the file
        # in it) is removed when the outer ``with`` block exits.
        with tempfile.TemporaryDirectory() as tmpdir:
            with tempfile.NamedTemporaryFile(dir=tmpdir, delete=False) as tmp:
                tfn = tmp.name

            # Normal exit: the file is open inside the block, closed after.
            with open(tfn, "w") as f:
                self.assertFalse(f.closed)
                f.write("Booh\n")
            self.assertTrue(f.closed)

            # Exit through an exception: the file must still be closed.
            with self.assertRaises(ZeroDivisionError):
                with open(tfn, "r") as f:
                    self.assertFalse(f.closed)
                    self.assertEqual(f.read(), "Booh\n")
                    1 / 0
            self.assertTrue(f.closed)
244n/a
@unittest.skipUnless(threading, 'Threading required for this test.')
class LockContextTestCase(unittest.TestCase):
    """Checks that threading synchronisation objects work in ``with``."""

    def boilerPlate(self, lock, locked):
        # ``locked`` is a zero-argument callable reporting whether ``lock``
        # is currently held.
        self.assertFalse(locked())

        # Leaving the block normally releases the lock.
        with lock:
            self.assertTrue(locked())
        self.assertFalse(locked())

        # Leaving the block through an exception releases it as well.
        with self.assertRaises(ZeroDivisionError), lock:
            self.assertTrue(locked())
            1 / 0
        self.assertFalse(locked())

    def testWithLock(self):
        mutex = threading.Lock()
        self.boilerPlate(mutex, mutex.locked)

    def testWithRLock(self):
        mutex = threading.RLock()
        self.boilerPlate(mutex, mutex._is_owned)

    def testWithCondition(self):
        cond = threading.Condition()

        def locked():
            return cond._is_owned()

        self.boilerPlate(cond, locked)

    def testWithSemaphore(self):
        sem = threading.Semaphore()

        def locked():
            # Probe with a non-blocking acquire; undo it when it succeeds.
            acquired = sem.acquire(False)
            if acquired:
                sem.release()
            return not acquired

        self.boilerPlate(sem, locked)

    def testWithBoundedSemaphore(self):
        sem = threading.BoundedSemaphore()

        def locked():
            # Probe with a non-blocking acquire; undo it when it succeeds.
            acquired = sem.acquire(False)
            if acquired:
                sem.release()
            return not acquired

        self.boilerPlate(sem, locked)
292n/a
293n/a
class mycontext(ContextDecorator):
    """Example decoration-compatible context manager for testing"""
    # Class-level defaults; instances overwrite them as they are used.
    catch = False    # value returned from __exit__
    exc = None       # arguments most recently passed to __exit__
    started = False  # set to True by __enter__

    def __enter__(self):
        self.started = True
        return self

    def __exit__(self, *exc_details):
        # Record what was passed in so tests can inspect it afterwards.
        self.exc = exc_details
        return self.catch
307n/a
308n/a
class TestContextDecorator(unittest.TestCase):
    """Checks for ContextDecorator, used both in ``with`` and as a decorator."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        instance = mycontext()
        self.assertEqual(instance.__doc__, mycontext.__doc__)

    def test_contextdecorator(self):
        cm = mycontext()
        with cm as entered:
            self.assertIs(entered, cm)
            self.assertTrue(cm.started)

        self.assertEqual(cm.exc, (None, None, None))

    def test_contextdecorator_with_exception(self):
        # With ``catch`` left False the exception propagates.
        cm = mycontext()
        with self.assertRaisesRegex(NameError, 'foo'), cm:
            raise NameError('foo')
        self.assertIsNotNone(cm.exc)
        self.assertIs(cm.exc[0], NameError)

        # With ``catch`` set to True the exception is swallowed.
        cm = mycontext()
        cm.catch = True
        with cm:
            raise NameError('foo')
        self.assertIsNotNone(cm.exc)
        self.assertIs(cm.exc[0], NameError)

    def test_decorator(self):
        cm = mycontext()

        @cm
        def test():
            # Inside the call the manager is entered but not yet exited.
            self.assertIsNone(cm.exc)
            self.assertTrue(cm.started)

        test()
        self.assertEqual(cm.exc, (None, None, None))

    def test_decorator_with_exception(self):
        cm = mycontext()

        @cm
        def test():
            self.assertIsNone(cm.exc)
            self.assertTrue(cm.started)
            raise NameError('foo')

        with self.assertRaisesRegex(NameError, 'foo'):
            test()
        self.assertIsNotNone(cm.exc)
        self.assertIs(cm.exc[0], NameError)

    def test_decorating_method(self):
        cm = mycontext()

        class Test(object):

            @cm
            def method(self, a, b, c=None):
                self.a = a
                self.b = b
                self.c = c

        # these tests are for argument passing when used as a decorator
        positional = Test()
        positional.method(1, 2)
        self.assertEqual(positional.a, 1)
        self.assertEqual(positional.b, 2)
        self.assertEqual(positional.c, None)

        all_three = Test()
        all_three.method('a', 'b', 'c')
        self.assertEqual(all_three.a, 'a')
        self.assertEqual(all_three.b, 'b')
        self.assertEqual(all_three.c, 'c')

        by_keyword = Test()
        by_keyword.method(a=1, b=2)
        self.assertEqual(by_keyword.a, 1)
        self.assertEqual(by_keyword.b, 2)

    def test_typo_enter(self):
        # The entry method is deliberately misspelled.
        class mycontext(ContextDecorator):
            def __unter__(self):
                pass

            def __exit__(self, *exc):
                pass

        with self.assertRaises(AttributeError):
            with mycontext():
                pass

    def test_typo_exit(self):
        # The exit method is deliberately misspelled.
        class mycontext(ContextDecorator):
            def __enter__(self):
                pass

            def __uxit__(self, *exc):
                pass

        with self.assertRaises(AttributeError):
            with mycontext():
                pass

    def test_contextdecorator_as_mixin(self):
        # An ordinary context manager class ...
        class somecontext(object):
            started = False
            exc = None

            def __enter__(self):
                self.started = True
                return self

            def __exit__(self, *exc):
                self.exc = exc

        # ... combined with ContextDecorator through multiple inheritance.
        class mycontext(somecontext, ContextDecorator):
            pass

        cm = mycontext()

        @cm
        def test():
            self.assertIsNone(cm.exc)
            self.assertTrue(cm.started)

        test()
        self.assertEqual(cm.exc, (None, None, None))

    def test_contextmanager_as_decorator(self):
        @contextmanager
        def woohoo(y):
            log.append(y)
            yield
            log.append(999)

        log = []

        @woohoo(1)
        def test(x):
            self.assertEqual(log, [1])
            log.append(x)

        test('something')
        self.assertEqual(log, [1, 'something', 999])

        # Issue #11647: Ensure the decorated function is 'reusable'
        log = []
        test('something else')
        self.assertEqual(log, [1, 'something else', 999])
467n/a
468n/a
469n/aclass TestExitStack(unittest.TestCase):
470n/a
471n/a @support.requires_docstrings
472n/a def test_instance_docs(self):
473n/a # Issue 19330: ensure context manager instances have good docstrings
474n/a cm_docstring = ExitStack.__doc__
475n/a obj = ExitStack()
476n/a self.assertEqual(obj.__doc__, cm_docstring)
477n/a
478n/a def test_no_resources(self):
479n/a with ExitStack():
480n/a pass
481n/a
482n/a def test_callback(self):
483n/a expected = [
484n/a ((), {}),
485n/a ((1,), {}),
486n/a ((1,2), {}),
487n/a ((), dict(example=1)),
488n/a ((1,), dict(example=1)),
489n/a ((1,2), dict(example=1)),
490n/a ]
491n/a result = []
492n/a def _exit(*args, **kwds):
493n/a """Test metadata propagation"""
494n/a result.append((args, kwds))
495n/a with ExitStack() as stack:
496n/a for args, kwds in reversed(expected):
497n/a if args and kwds:
498n/a f = stack.callback(_exit, *args, **kwds)
499n/a elif args:
500n/a f = stack.callback(_exit, *args)
501n/a elif kwds:
502n/a f = stack.callback(_exit, **kwds)
503n/a else:
504n/a f = stack.callback(_exit)
505n/a self.assertIs(f, _exit)
506n/a for wrapper in stack._exit_callbacks:
507n/a self.assertIs(wrapper.__wrapped__, _exit)
508n/a self.assertNotEqual(wrapper.__name__, _exit.__name__)
509n/a self.assertIsNone(wrapper.__doc__, _exit.__doc__)
510n/a self.assertEqual(result, expected)
511n/a
512n/a def test_push(self):
513n/a exc_raised = ZeroDivisionError
514n/a def _expect_exc(exc_type, exc, exc_tb):
515n/a self.assertIs(exc_type, exc_raised)
516n/a def _suppress_exc(*exc_details):
517n/a return True
518n/a def _expect_ok(exc_type, exc, exc_tb):
519n/a self.assertIsNone(exc_type)
520n/a self.assertIsNone(exc)
521n/a self.assertIsNone(exc_tb)
522n/a class ExitCM(object):
523n/a def __init__(self, check_exc):
524n/a self.check_exc = check_exc
525n/a def __enter__(self):
526n/a self.fail("Should not be called!")
527n/a def __exit__(self, *exc_details):
528n/a self.check_exc(*exc_details)
529n/a with ExitStack() as stack:
530n/a stack.push(_expect_ok)
531n/a self.assertIs(stack._exit_callbacks[-1], _expect_ok)
532n/a cm = ExitCM(_expect_ok)
533n/a stack.push(cm)
534n/a self.assertIs(stack._exit_callbacks[-1].__self__, cm)
535n/a stack.push(_suppress_exc)
536n/a self.assertIs(stack._exit_callbacks[-1], _suppress_exc)
537n/a cm = ExitCM(_expect_exc)
538n/a stack.push(cm)
539n/a self.assertIs(stack._exit_callbacks[-1].__self__, cm)
540n/a stack.push(_expect_exc)
541n/a self.assertIs(stack._exit_callbacks[-1], _expect_exc)
542n/a stack.push(_expect_exc)
543n/a self.assertIs(stack._exit_callbacks[-1], _expect_exc)
544n/a 1/0
545n/a
546n/a def test_enter_context(self):
547n/a class TestCM(object):
548n/a def __enter__(self):
549n/a result.append(1)
550n/a def __exit__(self, *exc_details):
551n/a result.append(3)
552n/a
553n/a result = []
554n/a cm = TestCM()
555n/a with ExitStack() as stack:
556n/a @stack.callback # Registered first => cleaned up last
557n/a def _exit():
558n/a result.append(4)
559n/a self.assertIsNotNone(_exit)
560n/a stack.enter_context(cm)
561n/a self.assertIs(stack._exit_callbacks[-1].__self__, cm)
562n/a result.append(2)
563n/a self.assertEqual(result, [1, 2, 3, 4])
564n/a
565n/a def test_close(self):
566n/a result = []
567n/a with ExitStack() as stack:
568n/a @stack.callback
569n/a def _exit():
570n/a result.append(1)
571n/a self.assertIsNotNone(_exit)
572n/a stack.close()
573n/a result.append(2)
574n/a self.assertEqual(result, [1, 2])
575n/a
576n/a def test_pop_all(self):
577n/a result = []
578n/a with ExitStack() as stack:
579n/a @stack.callback
580n/a def _exit():
581n/a result.append(3)
582n/a self.assertIsNotNone(_exit)
583n/a new_stack = stack.pop_all()
584n/a result.append(1)
585n/a result.append(2)
586n/a new_stack.close()
587n/a self.assertEqual(result, [1, 2, 3])
588n/a
589n/a def test_exit_raise(self):
590n/a with self.assertRaises(ZeroDivisionError):
591n/a with ExitStack() as stack:
592n/a stack.push(lambda *exc: False)
593n/a 1/0
594n/a
595n/a def test_exit_suppress(self):
596n/a with ExitStack() as stack:
597n/a stack.push(lambda *exc: True)
598n/a 1/0
599n/a
600n/a def test_exit_exception_chaining_reference(self):
601n/a # Sanity check to make sure that ExitStack chaining matches
602n/a # actual nested with statements
603n/a class RaiseExc:
604n/a def __init__(self, exc):
605n/a self.exc = exc
606n/a def __enter__(self):
607n/a return self
608n/a def __exit__(self, *exc_details):
609n/a raise self.exc
610n/a
611n/a class RaiseExcWithContext:
612n/a def __init__(self, outer, inner):
613n/a self.outer = outer
614n/a self.inner = inner
615n/a def __enter__(self):
616n/a return self
617n/a def __exit__(self, *exc_details):
618n/a try:
619n/a raise self.inner
620n/a except:
621n/a raise self.outer
622n/a
623n/a class SuppressExc:
624n/a def __enter__(self):
625n/a return self
626n/a def __exit__(self, *exc_details):
627n/a type(self).saved_details = exc_details
628n/a return True
629n/a
630n/a try:
631n/a with RaiseExc(IndexError):
632n/a with RaiseExcWithContext(KeyError, AttributeError):
633n/a with SuppressExc():
634n/a with RaiseExc(ValueError):
635n/a 1 / 0
636n/a except IndexError as exc:
637n/a self.assertIsInstance(exc.__context__, KeyError)
638n/a self.assertIsInstance(exc.__context__.__context__, AttributeError)
639n/a # Inner exceptions were suppressed
640n/a self.assertIsNone(exc.__context__.__context__.__context__)
641n/a else:
642n/a self.fail("Expected IndexError, but no exception was raised")
643n/a # Check the inner exceptions
644n/a inner_exc = SuppressExc.saved_details[1]
645n/a self.assertIsInstance(inner_exc, ValueError)
646n/a self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
647n/a
648n/a def test_exit_exception_chaining(self):
649n/a # Ensure exception chaining matches the reference behaviour
650n/a def raise_exc(exc):
651n/a raise exc
652n/a
653n/a saved_details = None
654n/a def suppress_exc(*exc_details):
655n/a nonlocal saved_details
656n/a saved_details = exc_details
657n/a return True
658n/a
659n/a try:
660n/a with ExitStack() as stack:
661n/a stack.callback(raise_exc, IndexError)
662n/a stack.callback(raise_exc, KeyError)
663n/a stack.callback(raise_exc, AttributeError)
664n/a stack.push(suppress_exc)
665n/a stack.callback(raise_exc, ValueError)
666n/a 1 / 0
667n/a except IndexError as exc:
668n/a self.assertIsInstance(exc.__context__, KeyError)
669n/a self.assertIsInstance(exc.__context__.__context__, AttributeError)
670n/a # Inner exceptions were suppressed
671n/a self.assertIsNone(exc.__context__.__context__.__context__)
672n/a else:
673n/a self.fail("Expected IndexError, but no exception was raised")
674n/a # Check the inner exceptions
675n/a inner_exc = saved_details[1]
676n/a self.assertIsInstance(inner_exc, ValueError)
677n/a self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
678n/a
679n/a def test_exit_exception_non_suppressing(self):
680n/a # http://bugs.python.org/issue19092
681n/a def raise_exc(exc):
682n/a raise exc
683n/a
684n/a def suppress_exc(*exc_details):
685n/a return True
686n/a
687n/a try:
688n/a with ExitStack() as stack:
689n/a stack.callback(lambda: None)
690n/a stack.callback(raise_exc, IndexError)
691n/a except Exception as exc:
692n/a self.assertIsInstance(exc, IndexError)
693n/a else:
694n/a self.fail("Expected IndexError, but no exception was raised")
695n/a
696n/a try:
697n/a with ExitStack() as stack:
698n/a stack.callback(raise_exc, KeyError)
699n/a stack.push(suppress_exc)
700n/a stack.callback(raise_exc, IndexError)
701n/a except Exception as exc:
702n/a self.assertIsInstance(exc, KeyError)
703n/a else:
704n/a self.fail("Expected KeyError, but no exception was raised")
705n/a
706n/a def test_exit_exception_with_correct_context(self):
707n/a # http://bugs.python.org/issue20317
708n/a @contextmanager
709n/a def gets_the_context_right(exc):
710n/a try:
711n/a yield
712n/a finally:
713n/a raise exc
714n/a
715n/a exc1 = Exception(1)
716n/a exc2 = Exception(2)
717n/a exc3 = Exception(3)
718n/a exc4 = Exception(4)
719n/a
720n/a # The contextmanager already fixes the context, so prior to the
721n/a # fix, ExitStack would try to fix it *again* and get into an
722n/a # infinite self-referential loop
723n/a try:
724n/a with ExitStack() as stack:
725n/a stack.enter_context(gets_the_context_right(exc4))
726n/a stack.enter_context(gets_the_context_right(exc3))
727n/a stack.enter_context(gets_the_context_right(exc2))
728n/a raise exc1
729n/a except Exception as exc:
730n/a self.assertIs(exc, exc4)
731n/a self.assertIs(exc.__context__, exc3)
732n/a self.assertIs(exc.__context__.__context__, exc2)
733n/a self.assertIs(exc.__context__.__context__.__context__, exc1)
734n/a self.assertIsNone(
735n/a exc.__context__.__context__.__context__.__context__)
736n/a
737n/a def test_exit_exception_with_existing_context(self):
738n/a # Addresses a lack of test coverage discovered after checking in a
739n/a # fix for issue 20317 that still contained debugging code.
740n/a def raise_nested(inner_exc, outer_exc):
741n/a try:
742n/a raise inner_exc
743n/a finally:
744n/a raise outer_exc
745n/a exc1 = Exception(1)
746n/a exc2 = Exception(2)
747n/a exc3 = Exception(3)
748n/a exc4 = Exception(4)
749n/a exc5 = Exception(5)
750n/a try:
751n/a with ExitStack() as stack:
752n/a stack.callback(raise_nested, exc4, exc5)
753n/a stack.callback(raise_nested, exc2, exc3)
754n/a raise exc1
755n/a except Exception as exc:
756n/a self.assertIs(exc, exc5)
757n/a self.assertIs(exc.__context__, exc4)
758n/a self.assertIs(exc.__context__.__context__, exc3)
759n/a self.assertIs(exc.__context__.__context__.__context__, exc2)
760n/a self.assertIs(
761n/a exc.__context__.__context__.__context__.__context__, exc1)
762n/a self.assertIsNone(
763n/a exc.__context__.__context__.__context__.__context__.__context__)
764n/a
765n/a
766n/a
767n/a def test_body_exception_suppress(self):
768n/a def suppress_exc(*exc_details):
769n/a return True
770n/a try:
771n/a with ExitStack() as stack:
772n/a stack.push(suppress_exc)
773n/a 1/0
774n/a except IndexError as exc:
775n/a self.fail("Expected no exception, got IndexError")
776n/a
777n/a def test_exit_exception_chaining_suppress(self):
778n/a with ExitStack() as stack:
779n/a stack.push(lambda *exc: True)
780n/a stack.push(lambda *exc: 1/0)
781n/a stack.push(lambda *exc: {}[1])
782n/a
783n/a def test_excessive_nesting(self):
784n/a # The original implementation would die with RecursionError here
785n/a with ExitStack() as stack:
786n/a for i in range(10000):
787n/a stack.callback(int)
788n/a
789n/a def test_instance_bypass(self):
790n/a class Example(object): pass
791n/a cm = Example()
792n/a cm.__exit__ = object()
793n/a stack = ExitStack()
794n/a self.assertRaises(AttributeError, stack.enter_context, cm)
795n/a stack.push(cm)
796n/a self.assertIs(stack._exit_callbacks[-1], cm)
797n/a
798n/a def test_dont_reraise_RuntimeError(self):
799n/a # https://bugs.python.org/issue27122
800n/a class UniqueException(Exception): pass
801n/a class UniqueRuntimeError(RuntimeError): pass
802n/a
803n/a @contextmanager
804n/a def second():
805n/a try:
806n/a yield 1
807n/a except Exception as exc:
808n/a raise UniqueException("new exception") from exc
809n/a
810n/a @contextmanager
811n/a def first():
812n/a try:
813n/a yield 1
814n/a except Exception as exc:
815n/a raise exc
816n/a
817n/a # The UniqueRuntimeError should be caught by second()'s exception
818n/a # handler which chain raised a new UniqueException.
819n/a with self.assertRaises(UniqueException) as err_ctx:
820n/a with ExitStack() as es_ctx:
821n/a es_ctx.enter_context(second())
822n/a es_ctx.enter_context(first())
823n/a raise UniqueRuntimeError("please no infinite loop.")
824n/a
825n/a exc = err_ctx.exception
826n/a self.assertIsInstance(exc, UniqueException)
827n/a self.assertIsInstance(exc.__context__, UniqueRuntimeError)
828n/a self.assertIsNone(exc.__context__.__context__)
829n/a self.assertIsNone(exc.__context__.__cause__)
830n/a self.assertIs(exc.__cause__, exc.__context__)
831n/a
832n/a
833n/aclass TestRedirectStream:
834n/a
835n/a redirect_stream = None
836n/a orig_stream = None
837n/a
838n/a @support.requires_docstrings
839n/a def test_instance_docs(self):
840n/a # Issue 19330: ensure context manager instances have good docstrings
841n/a cm_docstring = self.redirect_stream.__doc__
842n/a obj = self.redirect_stream(None)
843n/a self.assertEqual(obj.__doc__, cm_docstring)
844n/a
845n/a def test_no_redirect_in_init(self):
846n/a orig_stdout = getattr(sys, self.orig_stream)
847n/a self.redirect_stream(None)
848n/a self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
849n/a
850n/a def test_redirect_to_string_io(self):
851n/a f = io.StringIO()
852n/a msg = "Consider an API like help(), which prints directly to stdout"
853n/a orig_stdout = getattr(sys, self.orig_stream)
854n/a with self.redirect_stream(f):
855n/a print(msg, file=getattr(sys, self.orig_stream))
856n/a self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
857n/a s = f.getvalue().strip()
858n/a self.assertEqual(s, msg)
859n/a
860n/a def test_enter_result_is_target(self):
861n/a f = io.StringIO()
862n/a with self.redirect_stream(f) as enter_result:
863n/a self.assertIs(enter_result, f)
864n/a
865n/a def test_cm_is_reusable(self):
866n/a f = io.StringIO()
867n/a write_to_f = self.redirect_stream(f)
868n/a orig_stdout = getattr(sys, self.orig_stream)
869n/a with write_to_f:
870n/a print("Hello", end=" ", file=getattr(sys, self.orig_stream))
871n/a with write_to_f:
872n/a print("World!", file=getattr(sys, self.orig_stream))
873n/a self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
874n/a s = f.getvalue()
875n/a self.assertEqual(s, "Hello World!\n")
876n/a
877n/a def test_cm_is_reentrant(self):
878n/a f = io.StringIO()
879n/a write_to_f = self.redirect_stream(f)
880n/a orig_stdout = getattr(sys, self.orig_stream)
881n/a with write_to_f:
882n/a print("Hello", end=" ", file=getattr(sys, self.orig_stream))
883n/a with write_to_f:
884n/a print("World!", file=getattr(sys, self.orig_stream))
885n/a self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
886n/a s = f.getvalue()
887n/a self.assertEqual(s, "Hello World!\n")
888n/a
889n/a
class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
    """Run the TestRedirectStream checks with redirect_stdout on sys.stdout."""

    orig_stream = "stdout"
    redirect_stream = redirect_stdout
894n/a
895n/a
class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
    """Run the TestRedirectStream checks with redirect_stderr on sys.stderr."""

    orig_stream = "stderr"
    redirect_stream = redirect_stderr
900n/a
901n/a
class TestSuppress(unittest.TestCase):
    """Checks for contextlib.suppress."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        instance = suppress()
        self.assertEqual(instance.__doc__, suppress.__doc__)

    def test_no_result_from_enter(self):
        with suppress(ValueError) as enter_result:
            self.assertIsNone(enter_result)

    def test_no_exception(self):
        # A body that raises nothing runs to completion.
        with suppress(ValueError):
            self.assertEqual(pow(2, 5), 32)

    def test_exact_exception(self):
        # len(5) raises TypeError, exactly the type being suppressed.
        with suppress(TypeError):
            len(5)

    def test_exception_hierarchy(self):
        # The IndexError raised here is a subclass of LookupError.
        with suppress(LookupError):
            'Hello'[50]

    def test_other_exception(self):
        # An exception type that was not listed must propagate.
        with self.assertRaises(ZeroDivisionError), suppress(TypeError):
            1/0

    def test_no_args(self):
        # With no exception types given, nothing is suppressed.
        with self.assertRaises(ZeroDivisionError), suppress():
            1/0

    def test_multiple_exception_args(self):
        with suppress(ZeroDivisionError, TypeError):
            1/0
        with suppress(ZeroDivisionError, TypeError):
            len(5)

    def test_cm_is_reentrant(self):
        swallow = suppress(Exception)
        with swallow:
            pass
        with swallow:
            len(5)
        with swallow:
            with swallow:  # Check nested usage
                len(5)
            # Reached only if the inner block swallowed its TypeError.
            outer_continued = True
            1/0
        self.assertTrue(outer_continued)
955n/a
# Run the tests via unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()