ยปCore Development>Code coverage>Lib/test/test_finalization.py

Python code coverage for Lib/test/test_finalization.py

#countcontent
1n/a"""
2n/aTests for object finalization semantics, as outlined in PEP 442.
3n/a"""
4n/a
5n/aimport contextlib
6n/aimport gc
7n/aimport unittest
8n/aimport weakref
9n/a
10n/atry:
11n/a from _testcapi import with_tp_del
12n/aexcept ImportError:
13n/a def with_tp_del(cls):
14n/a class C(object):
15n/a def __new__(cls, *args, **kwargs):
16n/a raise TypeError('requires _testcapi.with_tp_del')
17n/a return C
18n/a
19n/afrom test import support
20n/a
21n/a
22n/aclass NonGCSimpleBase:
23n/a """
24n/a The base class for all the objects under test, equipped with various
25n/a testing features.
26n/a """
27n/a
28n/a survivors = []
29n/a del_calls = []
30n/a tp_del_calls = []
31n/a errors = []
32n/a
33n/a _cleaning = False
34n/a
35n/a __slots__ = ()
36n/a
37n/a @classmethod
38n/a def _cleanup(cls):
39n/a cls.survivors.clear()
40n/a cls.errors.clear()
41n/a gc.garbage.clear()
42n/a gc.collect()
43n/a cls.del_calls.clear()
44n/a cls.tp_del_calls.clear()
45n/a
46n/a @classmethod
47n/a @contextlib.contextmanager
48n/a def test(cls):
49n/a """
50n/a A context manager to use around all finalization tests.
51n/a """
52n/a with support.disable_gc():
53n/a cls.del_calls.clear()
54n/a cls.tp_del_calls.clear()
55n/a NonGCSimpleBase._cleaning = False
56n/a try:
57n/a yield
58n/a if cls.errors:
59n/a raise cls.errors[0]
60n/a finally:
61n/a NonGCSimpleBase._cleaning = True
62n/a cls._cleanup()
63n/a
64n/a def check_sanity(self):
65n/a """
66n/a Check the object is sane (non-broken).
67n/a """
68n/a
69n/a def __del__(self):
70n/a """
71n/a PEP 442 finalizer. Record that this was called, check the
72n/a object is in a sane state, and invoke a side effect.
73n/a """
74n/a try:
75n/a if not self._cleaning:
76n/a self.del_calls.append(id(self))
77n/a self.check_sanity()
78n/a self.side_effect()
79n/a except Exception as e:
80n/a self.errors.append(e)
81n/a
82n/a def side_effect(self):
83n/a """
84n/a A side effect called on destruction.
85n/a """
86n/a
87n/a
88n/aclass SimpleBase(NonGCSimpleBase):
89n/a
90n/a def __init__(self):
91n/a self.id_ = id(self)
92n/a
93n/a def check_sanity(self):
94n/a assert self.id_ == id(self)
95n/a
96n/a
97n/aclass NonGC(NonGCSimpleBase):
98n/a __slots__ = ()
99n/a
100n/aclass NonGCResurrector(NonGCSimpleBase):
101n/a __slots__ = ()
102n/a
103n/a def side_effect(self):
104n/a """
105n/a Resurrect self by storing self in a class-wide list.
106n/a """
107n/a self.survivors.append(self)
108n/a
109n/aclass Simple(SimpleBase):
110n/a pass
111n/a
112n/aclass SimpleResurrector(NonGCResurrector, SimpleBase):
113n/a pass
114n/a
115n/a
116n/aclass TestBase:
117n/a
118n/a def setUp(self):
119n/a self.old_garbage = gc.garbage[:]
120n/a gc.garbage[:] = []
121n/a
122n/a def tearDown(self):
123n/a # None of the tests here should put anything in gc.garbage
124n/a try:
125n/a self.assertEqual(gc.garbage, [])
126n/a finally:
127n/a del self.old_garbage
128n/a gc.collect()
129n/a
130n/a def assert_del_calls(self, ids):
131n/a self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids))
132n/a
133n/a def assert_tp_del_calls(self, ids):
134n/a self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids))
135n/a
136n/a def assert_survivors(self, ids):
137n/a self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids))
138n/a
139n/a def assert_garbage(self, ids):
140n/a self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids))
141n/a
142n/a def clear_survivors(self):
143n/a SimpleBase.survivors.clear()
144n/a
145n/a
146n/aclass SimpleFinalizationTest(TestBase, unittest.TestCase):
147n/a """
148n/a Test finalization without refcycles.
149n/a """
150n/a
151n/a def test_simple(self):
152n/a with SimpleBase.test():
153n/a s = Simple()
154n/a ids = [id(s)]
155n/a wr = weakref.ref(s)
156n/a del s
157n/a gc.collect()
158n/a self.assert_del_calls(ids)
159n/a self.assert_survivors([])
160n/a self.assertIs(wr(), None)
161n/a gc.collect()
162n/a self.assert_del_calls(ids)
163n/a self.assert_survivors([])
164n/a
165n/a def test_simple_resurrect(self):
166n/a with SimpleBase.test():
167n/a s = SimpleResurrector()
168n/a ids = [id(s)]
169n/a wr = weakref.ref(s)
170n/a del s
171n/a gc.collect()
172n/a self.assert_del_calls(ids)
173n/a self.assert_survivors(ids)
174n/a self.assertIsNot(wr(), None)
175n/a self.clear_survivors()
176n/a gc.collect()
177n/a self.assert_del_calls(ids)
178n/a self.assert_survivors([])
179n/a self.assertIs(wr(), None)
180n/a
181n/a def test_non_gc(self):
182n/a with SimpleBase.test():
183n/a s = NonGC()
184n/a self.assertFalse(gc.is_tracked(s))
185n/a ids = [id(s)]
186n/a del s
187n/a gc.collect()
188n/a self.assert_del_calls(ids)
189n/a self.assert_survivors([])
190n/a gc.collect()
191n/a self.assert_del_calls(ids)
192n/a self.assert_survivors([])
193n/a
194n/a def test_non_gc_resurrect(self):
195n/a with SimpleBase.test():
196n/a s = NonGCResurrector()
197n/a self.assertFalse(gc.is_tracked(s))
198n/a ids = [id(s)]
199n/a del s
200n/a gc.collect()
201n/a self.assert_del_calls(ids)
202n/a self.assert_survivors(ids)
203n/a self.clear_survivors()
204n/a gc.collect()
205n/a self.assert_del_calls(ids * 2)
206n/a self.assert_survivors(ids)
207n/a
208n/a
209n/aclass SelfCycleBase:
210n/a
211n/a def __init__(self):
212n/a super().__init__()
213n/a self.ref = self
214n/a
215n/a def check_sanity(self):
216n/a super().check_sanity()
217n/a assert self.ref is self
218n/a
219n/aclass SimpleSelfCycle(SelfCycleBase, Simple):
220n/a pass
221n/a
222n/aclass SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
223n/a pass
224n/a
225n/aclass SuicidalSelfCycle(SelfCycleBase, Simple):
226n/a
227n/a def side_effect(self):
228n/a """
229n/a Explicitly break the reference cycle.
230n/a """
231n/a self.ref = None
232n/a
233n/a
234n/aclass SelfCycleFinalizationTest(TestBase, unittest.TestCase):
235n/a """
236n/a Test finalization of an object having a single cyclic reference to
237n/a itself.
238n/a """
239n/a
240n/a def test_simple(self):
241n/a with SimpleBase.test():
242n/a s = SimpleSelfCycle()
243n/a ids = [id(s)]
244n/a wr = weakref.ref(s)
245n/a del s
246n/a gc.collect()
247n/a self.assert_del_calls(ids)
248n/a self.assert_survivors([])
249n/a self.assertIs(wr(), None)
250n/a gc.collect()
251n/a self.assert_del_calls(ids)
252n/a self.assert_survivors([])
253n/a
254n/a def test_simple_resurrect(self):
255n/a # Test that __del__ can resurrect the object being finalized.
256n/a with SimpleBase.test():
257n/a s = SelfCycleResurrector()
258n/a ids = [id(s)]
259n/a wr = weakref.ref(s)
260n/a del s
261n/a gc.collect()
262n/a self.assert_del_calls(ids)
263n/a self.assert_survivors(ids)
264n/a # XXX is this desirable?
265n/a self.assertIs(wr(), None)
266n/a # When trying to destroy the object a second time, __del__
267n/a # isn't called anymore (and the object isn't resurrected).
268n/a self.clear_survivors()
269n/a gc.collect()
270n/a self.assert_del_calls(ids)
271n/a self.assert_survivors([])
272n/a self.assertIs(wr(), None)
273n/a
274n/a def test_simple_suicide(self):
275n/a # Test the GC is able to deal with an object that kills its last
276n/a # reference during __del__.
277n/a with SimpleBase.test():
278n/a s = SuicidalSelfCycle()
279n/a ids = [id(s)]
280n/a wr = weakref.ref(s)
281n/a del s
282n/a gc.collect()
283n/a self.assert_del_calls(ids)
284n/a self.assert_survivors([])
285n/a self.assertIs(wr(), None)
286n/a gc.collect()
287n/a self.assert_del_calls(ids)
288n/a self.assert_survivors([])
289n/a self.assertIs(wr(), None)
290n/a
291n/a
292n/aclass ChainedBase:
293n/a
294n/a def chain(self, left):
295n/a self.suicided = False
296n/a self.left = left
297n/a left.right = self
298n/a
299n/a def check_sanity(self):
300n/a super().check_sanity()
301n/a if self.suicided:
302n/a assert self.left is None
303n/a assert self.right is None
304n/a else:
305n/a left = self.left
306n/a if left.suicided:
307n/a assert left.right is None
308n/a else:
309n/a assert left.right is self
310n/a right = self.right
311n/a if right.suicided:
312n/a assert right.left is None
313n/a else:
314n/a assert right.left is self
315n/a
316n/aclass SimpleChained(ChainedBase, Simple):
317n/a pass
318n/a
319n/aclass ChainedResurrector(ChainedBase, SimpleResurrector):
320n/a pass
321n/a
322n/aclass SuicidalChained(ChainedBase, Simple):
323n/a
324n/a def side_effect(self):
325n/a """
326n/a Explicitly break the reference cycle.
327n/a """
328n/a self.suicided = True
329n/a self.left = None
330n/a self.right = None
331n/a
332n/a
333n/aclass CycleChainFinalizationTest(TestBase, unittest.TestCase):
334n/a """
335n/a Test finalization of a cyclic chain. These tests are similar in
336n/a spirit to the self-cycle tests above, but the collectable object
337n/a graph isn't trivial anymore.
338n/a """
339n/a
340n/a def build_chain(self, classes):
341n/a nodes = [cls() for cls in classes]
342n/a for i in range(len(nodes)):
343n/a nodes[i].chain(nodes[i-1])
344n/a return nodes
345n/a
346n/a def check_non_resurrecting_chain(self, classes):
347n/a N = len(classes)
348n/a with SimpleBase.test():
349n/a nodes = self.build_chain(classes)
350n/a ids = [id(s) for s in nodes]
351n/a wrs = [weakref.ref(s) for s in nodes]
352n/a del nodes
353n/a gc.collect()
354n/a self.assert_del_calls(ids)
355n/a self.assert_survivors([])
356n/a self.assertEqual([wr() for wr in wrs], [None] * N)
357n/a gc.collect()
358n/a self.assert_del_calls(ids)
359n/a
360n/a def check_resurrecting_chain(self, classes):
361n/a N = len(classes)
362n/a with SimpleBase.test():
363n/a nodes = self.build_chain(classes)
364n/a N = len(nodes)
365n/a ids = [id(s) for s in nodes]
366n/a survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
367n/a wrs = [weakref.ref(s) for s in nodes]
368n/a del nodes
369n/a gc.collect()
370n/a self.assert_del_calls(ids)
371n/a self.assert_survivors(survivor_ids)
372n/a # XXX desirable?
373n/a self.assertEqual([wr() for wr in wrs], [None] * N)
374n/a self.clear_survivors()
375n/a gc.collect()
376n/a self.assert_del_calls(ids)
377n/a self.assert_survivors([])
378n/a
379n/a def test_homogenous(self):
380n/a self.check_non_resurrecting_chain([SimpleChained] * 3)
381n/a
382n/a def test_homogenous_resurrect(self):
383n/a self.check_resurrecting_chain([ChainedResurrector] * 3)
384n/a
385n/a def test_homogenous_suicidal(self):
386n/a self.check_non_resurrecting_chain([SuicidalChained] * 3)
387n/a
388n/a def test_heterogenous_suicidal_one(self):
389n/a self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)
390n/a
391n/a def test_heterogenous_suicidal_two(self):
392n/a self.check_non_resurrecting_chain(
393n/a [SuicidalChained] * 2 + [SimpleChained] * 2)
394n/a
395n/a def test_heterogenous_resurrect_one(self):
396n/a self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)
397n/a
398n/a def test_heterogenous_resurrect_two(self):
399n/a self.check_resurrecting_chain(
400n/a [ChainedResurrector, SimpleChained, SuicidalChained] * 2)
401n/a
402n/a def test_heterogenous_resurrect_three(self):
403n/a self.check_resurrecting_chain(
404n/a [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
405n/a
406n/a
407n/a# NOTE: the tp_del slot isn't automatically inherited, so we have to call
408n/a# with_tp_del() for each instantiated class.
409n/a
410n/aclass LegacyBase(SimpleBase):
411n/a
412n/a def __del__(self):
413n/a try:
414n/a # Do not invoke side_effect here, since we are now exercising
415n/a # the tp_del slot.
416n/a if not self._cleaning:
417n/a self.del_calls.append(id(self))
418n/a self.check_sanity()
419n/a except Exception as e:
420n/a self.errors.append(e)
421n/a
422n/a def __tp_del__(self):
423n/a """
424n/a Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
425n/a """
426n/a try:
427n/a if not self._cleaning:
428n/a self.tp_del_calls.append(id(self))
429n/a self.check_sanity()
430n/a self.side_effect()
431n/a except Exception as e:
432n/a self.errors.append(e)
433n/a
434n/a@with_tp_del
435n/aclass Legacy(LegacyBase):
436n/a pass
437n/a
438n/a@with_tp_del
439n/aclass LegacyResurrector(LegacyBase):
440n/a
441n/a def side_effect(self):
442n/a """
443n/a Resurrect self by storing self in a class-wide list.
444n/a """
445n/a self.survivors.append(self)
446n/a
447n/a@with_tp_del
448n/aclass LegacySelfCycle(SelfCycleBase, LegacyBase):
449n/a pass
450n/a
451n/a
452n/a@support.cpython_only
453n/aclass LegacyFinalizationTest(TestBase, unittest.TestCase):
454n/a """
455n/a Test finalization of objects with a tp_del.
456n/a """
457n/a
458n/a def tearDown(self):
459n/a # These tests need to clean up a bit more, since they create
460n/a # uncollectable objects.
461n/a gc.garbage.clear()
462n/a gc.collect()
463n/a super().tearDown()
464n/a
465n/a def test_legacy(self):
466n/a with SimpleBase.test():
467n/a s = Legacy()
468n/a ids = [id(s)]
469n/a wr = weakref.ref(s)
470n/a del s
471n/a gc.collect()
472n/a self.assert_del_calls(ids)
473n/a self.assert_tp_del_calls(ids)
474n/a self.assert_survivors([])
475n/a self.assertIs(wr(), None)
476n/a gc.collect()
477n/a self.assert_del_calls(ids)
478n/a self.assert_tp_del_calls(ids)
479n/a
480n/a def test_legacy_resurrect(self):
481n/a with SimpleBase.test():
482n/a s = LegacyResurrector()
483n/a ids = [id(s)]
484n/a wr = weakref.ref(s)
485n/a del s
486n/a gc.collect()
487n/a self.assert_del_calls(ids)
488n/a self.assert_tp_del_calls(ids)
489n/a self.assert_survivors(ids)
490n/a # weakrefs are cleared before tp_del is called.
491n/a self.assertIs(wr(), None)
492n/a self.clear_survivors()
493n/a gc.collect()
494n/a self.assert_del_calls(ids)
495n/a self.assert_tp_del_calls(ids * 2)
496n/a self.assert_survivors(ids)
497n/a self.assertIs(wr(), None)
498n/a
499n/a def test_legacy_self_cycle(self):
500n/a # Self-cycles with legacy finalizers end up in gc.garbage.
501n/a with SimpleBase.test():
502n/a s = LegacySelfCycle()
503n/a ids = [id(s)]
504n/a wr = weakref.ref(s)
505n/a del s
506n/a gc.collect()
507n/a self.assert_del_calls([])
508n/a self.assert_tp_del_calls([])
509n/a self.assert_survivors([])
510n/a self.assert_garbage(ids)
511n/a self.assertIsNot(wr(), None)
512n/a # Break the cycle to allow collection
513n/a gc.garbage[0].ref = None
514n/a self.assert_garbage([])
515n/a self.assertIs(wr(), None)
516n/a
517n/a
518n/aif __name__ == "__main__":
519n/a unittest.main()