ยปCore Development>Code coverage>Lib/test/test_gc.py

Python code coverage for Lib/test/test_gc.py

#countcontent
1n/aimport unittest
2n/afrom test.support import (verbose, refcount_test, run_unittest,
3n/a strip_python_stderr, cpython_only, start_threads,
4n/a temp_dir, requires_type_collecting)
5n/afrom test.support.script_helper import assert_python_ok, make_script
6n/a
7n/aimport sys
8n/aimport time
9n/aimport gc
10n/aimport weakref
11n/a
12n/atry:
13n/a import threading
14n/aexcept ImportError:
15n/a threading = None
16n/a
17n/atry:
18n/a from _testcapi import with_tp_del
19n/aexcept ImportError:
20n/a def with_tp_del(cls):
21n/a class C(object):
22n/a def __new__(cls, *args, **kwargs):
23n/a raise TypeError('requires _testcapi.with_tp_del')
24n/a return C
25n/a
26n/a### Support code
27n/a###############################################################################
28n/a
29n/a# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
30n/a# cyclic gc.
31n/a
32n/a# An instance of C1055820 has a self-loop, so becomes cyclic trash when
33n/a# unreachable.
34n/aclass C1055820(object):
35n/a def __init__(self, i):
36n/a self.i = i
37n/a self.loop = self
38n/a
39n/aclass GC_Detector(object):
40n/a # Create an instance I. Then gc hasn't happened again so long as
41n/a # I.gc_happened is false.
42n/a
43n/a def __init__(self):
44n/a self.gc_happened = False
45n/a
46n/a def it_happened(ignored):
47n/a self.gc_happened = True
48n/a
49n/a # Create a piece of cyclic trash that triggers it_happened when
50n/a # gc collects it.
51n/a self.wr = weakref.ref(C1055820(666), it_happened)
52n/a
53n/a@with_tp_del
54n/aclass Uncollectable(object):
55n/a """Create a reference cycle with multiple __del__ methods.
56n/a
57n/a An object in a reference cycle will never have zero references,
58n/a and so must be garbage collected. If one or more objects in the
59n/a cycle have __del__ methods, the gc refuses to guess an order,
60n/a and leaves the cycle uncollected."""
61n/a def __init__(self, partner=None):
62n/a if partner is None:
63n/a self.partner = Uncollectable(partner=self)
64n/a else:
65n/a self.partner = partner
66n/a def __tp_del__(self):
67n/a pass
68n/a
69n/a### Tests
70n/a###############################################################################
71n/a
72n/aclass GCTests(unittest.TestCase):
73n/a def test_list(self):
74n/a l = []
75n/a l.append(l)
76n/a gc.collect()
77n/a del l
78n/a self.assertEqual(gc.collect(), 1)
79n/a
80n/a def test_dict(self):
81n/a d = {}
82n/a d[1] = d
83n/a gc.collect()
84n/a del d
85n/a self.assertEqual(gc.collect(), 1)
86n/a
87n/a def test_tuple(self):
88n/a # since tuples are immutable we close the loop with a list
89n/a l = []
90n/a t = (l,)
91n/a l.append(t)
92n/a gc.collect()
93n/a del t
94n/a del l
95n/a self.assertEqual(gc.collect(), 2)
96n/a
97n/a def test_class(self):
98n/a class A:
99n/a pass
100n/a A.a = A
101n/a gc.collect()
102n/a del A
103n/a self.assertNotEqual(gc.collect(), 0)
104n/a
105n/a def test_newstyleclass(self):
106n/a class A(object):
107n/a pass
108n/a gc.collect()
109n/a del A
110n/a self.assertNotEqual(gc.collect(), 0)
111n/a
112n/a def test_instance(self):
113n/a class A:
114n/a pass
115n/a a = A()
116n/a a.a = a
117n/a gc.collect()
118n/a del a
119n/a self.assertNotEqual(gc.collect(), 0)
120n/a
121n/a @requires_type_collecting
122n/a def test_newinstance(self):
123n/a class A(object):
124n/a pass
125n/a a = A()
126n/a a.a = a
127n/a gc.collect()
128n/a del a
129n/a self.assertNotEqual(gc.collect(), 0)
130n/a class B(list):
131n/a pass
132n/a class C(B, A):
133n/a pass
134n/a a = C()
135n/a a.a = a
136n/a gc.collect()
137n/a del a
138n/a self.assertNotEqual(gc.collect(), 0)
139n/a del B, C
140n/a self.assertNotEqual(gc.collect(), 0)
141n/a A.a = A()
142n/a del A
143n/a self.assertNotEqual(gc.collect(), 0)
144n/a self.assertEqual(gc.collect(), 0)
145n/a
146n/a def test_method(self):
147n/a # Tricky: self.__init__ is a bound method, it references the instance.
148n/a class A:
149n/a def __init__(self):
150n/a self.init = self.__init__
151n/a a = A()
152n/a gc.collect()
153n/a del a
154n/a self.assertNotEqual(gc.collect(), 0)
155n/a
156n/a @cpython_only
157n/a def test_legacy_finalizer(self):
158n/a # A() is uncollectable if it is part of a cycle, make sure it shows up
159n/a # in gc.garbage.
160n/a @with_tp_del
161n/a class A:
162n/a def __tp_del__(self): pass
163n/a class B:
164n/a pass
165n/a a = A()
166n/a a.a = a
167n/a id_a = id(a)
168n/a b = B()
169n/a b.b = b
170n/a gc.collect()
171n/a del a
172n/a del b
173n/a self.assertNotEqual(gc.collect(), 0)
174n/a for obj in gc.garbage:
175n/a if id(obj) == id_a:
176n/a del obj.a
177n/a break
178n/a else:
179n/a self.fail("didn't find obj in garbage (finalizer)")
180n/a gc.garbage.remove(obj)
181n/a
182n/a @cpython_only
183n/a def test_legacy_finalizer_newclass(self):
184n/a # A() is uncollectable if it is part of a cycle, make sure it shows up
185n/a # in gc.garbage.
186n/a @with_tp_del
187n/a class A(object):
188n/a def __tp_del__(self): pass
189n/a class B(object):
190n/a pass
191n/a a = A()
192n/a a.a = a
193n/a id_a = id(a)
194n/a b = B()
195n/a b.b = b
196n/a gc.collect()
197n/a del a
198n/a del b
199n/a self.assertNotEqual(gc.collect(), 0)
200n/a for obj in gc.garbage:
201n/a if id(obj) == id_a:
202n/a del obj.a
203n/a break
204n/a else:
205n/a self.fail("didn't find obj in garbage (finalizer)")
206n/a gc.garbage.remove(obj)
207n/a
208n/a def test_function(self):
209n/a # Tricky: f -> d -> f, code should call d.clear() after the exec to
210n/a # break the cycle.
211n/a d = {}
212n/a exec("def f(): pass\n", d)
213n/a gc.collect()
214n/a del d
215n/a self.assertEqual(gc.collect(), 2)
216n/a
217n/a @refcount_test
218n/a def test_frame(self):
219n/a def f():
220n/a frame = sys._getframe()
221n/a gc.collect()
222n/a f()
223n/a self.assertEqual(gc.collect(), 1)
224n/a
225n/a def test_saveall(self):
226n/a # Verify that cyclic garbage like lists show up in gc.garbage if the
227n/a # SAVEALL option is enabled.
228n/a
229n/a # First make sure we don't save away other stuff that just happens to
230n/a # be waiting for collection.
231n/a gc.collect()
232n/a # if this fails, someone else created immortal trash
233n/a self.assertEqual(gc.garbage, [])
234n/a
235n/a L = []
236n/a L.append(L)
237n/a id_L = id(L)
238n/a
239n/a debug = gc.get_debug()
240n/a gc.set_debug(debug | gc.DEBUG_SAVEALL)
241n/a del L
242n/a gc.collect()
243n/a gc.set_debug(debug)
244n/a
245n/a self.assertEqual(len(gc.garbage), 1)
246n/a obj = gc.garbage.pop()
247n/a self.assertEqual(id(obj), id_L)
248n/a
249n/a def test_del(self):
250n/a # __del__ methods can trigger collection, make this to happen
251n/a thresholds = gc.get_threshold()
252n/a gc.enable()
253n/a gc.set_threshold(1)
254n/a
255n/a class A:
256n/a def __del__(self):
257n/a dir(self)
258n/a a = A()
259n/a del a
260n/a
261n/a gc.disable()
262n/a gc.set_threshold(*thresholds)
263n/a
264n/a def test_del_newclass(self):
265n/a # __del__ methods can trigger collection, make this to happen
266n/a thresholds = gc.get_threshold()
267n/a gc.enable()
268n/a gc.set_threshold(1)
269n/a
270n/a class A(object):
271n/a def __del__(self):
272n/a dir(self)
273n/a a = A()
274n/a del a
275n/a
276n/a gc.disable()
277n/a gc.set_threshold(*thresholds)
278n/a
279n/a # The following two tests are fragile:
280n/a # They precisely count the number of allocations,
281n/a # which is highly implementation-dependent.
282n/a # For example, disposed tuples are not freed, but reused.
283n/a # To minimize variations, though, we first store the get_count() results
284n/a # and check them at the end.
285n/a @refcount_test
286n/a def test_get_count(self):
287n/a gc.collect()
288n/a a, b, c = gc.get_count()
289n/a x = []
290n/a d, e, f = gc.get_count()
291n/a self.assertEqual((b, c), (0, 0))
292n/a self.assertEqual((e, f), (0, 0))
293n/a # This is less fragile than asserting that a equals 0.
294n/a self.assertLess(a, 5)
295n/a # Between the two calls to get_count(), at least one object was
296n/a # created (the list).
297n/a self.assertGreater(d, a)
298n/a
299n/a @refcount_test
300n/a def test_collect_generations(self):
301n/a gc.collect()
302n/a # This object will "trickle" into generation N + 1 after
303n/a # each call to collect(N)
304n/a x = []
305n/a gc.collect(0)
306n/a # x is now in gen 1
307n/a a, b, c = gc.get_count()
308n/a gc.collect(1)
309n/a # x is now in gen 2
310n/a d, e, f = gc.get_count()
311n/a gc.collect(2)
312n/a # x is now in gen 3
313n/a g, h, i = gc.get_count()
314n/a # We don't check a, d, g since their exact values depends on
315n/a # internal implementation details of the interpreter.
316n/a self.assertEqual((b, c), (1, 0))
317n/a self.assertEqual((e, f), (0, 1))
318n/a self.assertEqual((h, i), (0, 0))
319n/a
320n/a def test_trashcan(self):
321n/a class Ouch:
322n/a n = 0
323n/a def __del__(self):
324n/a Ouch.n = Ouch.n + 1
325n/a if Ouch.n % 17 == 0:
326n/a gc.collect()
327n/a
328n/a # "trashcan" is a hack to prevent stack overflow when deallocating
329n/a # very deeply nested tuples etc. It works in part by abusing the
330n/a # type pointer and refcount fields, and that can yield horrible
331n/a # problems when gc tries to traverse the structures.
332n/a # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
333n/a # most likely die via segfault.
334n/a
335n/a # Note: In 2.3 the possibility for compiling without cyclic gc was
336n/a # removed, and that in turn allows the trashcan mechanism to work
337n/a # via much simpler means (e.g., it never abuses the type pointer or
338n/a # refcount fields anymore). Since it's much less likely to cause a
339n/a # problem now, the various constants in this expensive (we force a lot
340n/a # of full collections) test are cut back from the 2.2 version.
341n/a gc.enable()
342n/a N = 150
343n/a for count in range(2):
344n/a t = []
345n/a for i in range(N):
346n/a t = [t, Ouch()]
347n/a u = []
348n/a for i in range(N):
349n/a u = [u, Ouch()]
350n/a v = {}
351n/a for i in range(N):
352n/a v = {1: v, 2: Ouch()}
353n/a gc.disable()
354n/a
355n/a @unittest.skipUnless(threading, "test meaningless on builds without threads")
356n/a def test_trashcan_threads(self):
357n/a # Issue #13992: trashcan mechanism should be thread-safe
358n/a NESTING = 60
359n/a N_THREADS = 2
360n/a
361n/a def sleeper_gen():
362n/a """A generator that releases the GIL when closed or dealloc'ed."""
363n/a try:
364n/a yield
365n/a finally:
366n/a time.sleep(0.000001)
367n/a
368n/a class C(list):
369n/a # Appending to a list is atomic, which avoids the use of a lock.
370n/a inits = []
371n/a dels = []
372n/a def __init__(self, alist):
373n/a self[:] = alist
374n/a C.inits.append(None)
375n/a def __del__(self):
376n/a # This __del__ is called by subtype_dealloc().
377n/a C.dels.append(None)
378n/a # `g` will release the GIL when garbage-collected. This
379n/a # helps assert subtype_dealloc's behaviour when threads
380n/a # switch in the middle of it.
381n/a g = sleeper_gen()
382n/a next(g)
383n/a # Now that __del__ is finished, subtype_dealloc will proceed
384n/a # to call list_dealloc, which also uses the trashcan mechanism.
385n/a
386n/a def make_nested():
387n/a """Create a sufficiently nested container object so that the
388n/a trashcan mechanism is invoked when deallocating it."""
389n/a x = C([])
390n/a for i in range(NESTING):
391n/a x = [C([x])]
392n/a del x
393n/a
394n/a def run_thread():
395n/a """Exercise make_nested() in a loop."""
396n/a while not exit:
397n/a make_nested()
398n/a
399n/a old_switchinterval = sys.getswitchinterval()
400n/a sys.setswitchinterval(1e-5)
401n/a try:
402n/a exit = []
403n/a threads = []
404n/a for i in range(N_THREADS):
405n/a t = threading.Thread(target=run_thread)
406n/a threads.append(t)
407n/a with start_threads(threads, lambda: exit.append(1)):
408n/a time.sleep(1.0)
409n/a finally:
410n/a sys.setswitchinterval(old_switchinterval)
411n/a gc.collect()
412n/a self.assertEqual(len(C.inits), len(C.dels))
413n/a
414n/a def test_boom(self):
415n/a class Boom:
416n/a def __getattr__(self, someattribute):
417n/a del self.attr
418n/a raise AttributeError
419n/a
420n/a a = Boom()
421n/a b = Boom()
422n/a a.attr = b
423n/a b.attr = a
424n/a
425n/a gc.collect()
426n/a garbagelen = len(gc.garbage)
427n/a del a, b
428n/a # a<->b are in a trash cycle now. Collection will invoke
429n/a # Boom.__getattr__ (to see whether a and b have __del__ methods), and
430n/a # __getattr__ deletes the internal "attr" attributes as a side effect.
431n/a # That causes the trash cycle to get reclaimed via refcounts falling to
432n/a # 0, thus mutating the trash graph as a side effect of merely asking
433n/a # whether __del__ exists. This used to (before 2.3b1) crash Python.
434n/a # Now __getattr__ isn't called.
435n/a self.assertEqual(gc.collect(), 4)
436n/a self.assertEqual(len(gc.garbage), garbagelen)
437n/a
438n/a def test_boom2(self):
439n/a class Boom2:
440n/a def __init__(self):
441n/a self.x = 0
442n/a
443n/a def __getattr__(self, someattribute):
444n/a self.x += 1
445n/a if self.x > 1:
446n/a del self.attr
447n/a raise AttributeError
448n/a
449n/a a = Boom2()
450n/a b = Boom2()
451n/a a.attr = b
452n/a b.attr = a
453n/a
454n/a gc.collect()
455n/a garbagelen = len(gc.garbage)
456n/a del a, b
457n/a # Much like test_boom(), except that __getattr__ doesn't break the
458n/a # cycle until the second time gc checks for __del__. As of 2.3b1,
459n/a # there isn't a second time, so this simply cleans up the trash cycle.
460n/a # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
461n/a # reclaimed this way.
462n/a self.assertEqual(gc.collect(), 4)
463n/a self.assertEqual(len(gc.garbage), garbagelen)
464n/a
465n/a def test_boom_new(self):
466n/a # boom__new and boom2_new are exactly like boom and boom2, except use
467n/a # new-style classes.
468n/a
469n/a class Boom_New(object):
470n/a def __getattr__(self, someattribute):
471n/a del self.attr
472n/a raise AttributeError
473n/a
474n/a a = Boom_New()
475n/a b = Boom_New()
476n/a a.attr = b
477n/a b.attr = a
478n/a
479n/a gc.collect()
480n/a garbagelen = len(gc.garbage)
481n/a del a, b
482n/a self.assertEqual(gc.collect(), 4)
483n/a self.assertEqual(len(gc.garbage), garbagelen)
484n/a
485n/a def test_boom2_new(self):
486n/a class Boom2_New(object):
487n/a def __init__(self):
488n/a self.x = 0
489n/a
490n/a def __getattr__(self, someattribute):
491n/a self.x += 1
492n/a if self.x > 1:
493n/a del self.attr
494n/a raise AttributeError
495n/a
496n/a a = Boom2_New()
497n/a b = Boom2_New()
498n/a a.attr = b
499n/a b.attr = a
500n/a
501n/a gc.collect()
502n/a garbagelen = len(gc.garbage)
503n/a del a, b
504n/a self.assertEqual(gc.collect(), 4)
505n/a self.assertEqual(len(gc.garbage), garbagelen)
506n/a
507n/a def test_get_referents(self):
508n/a alist = [1, 3, 5]
509n/a got = gc.get_referents(alist)
510n/a got.sort()
511n/a self.assertEqual(got, alist)
512n/a
513n/a atuple = tuple(alist)
514n/a got = gc.get_referents(atuple)
515n/a got.sort()
516n/a self.assertEqual(got, alist)
517n/a
518n/a adict = {1: 3, 5: 7}
519n/a expected = [1, 3, 5, 7]
520n/a got = gc.get_referents(adict)
521n/a got.sort()
522n/a self.assertEqual(got, expected)
523n/a
524n/a got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
525n/a got.sort()
526n/a self.assertEqual(got, [0, 0] + list(range(5)))
527n/a
528n/a self.assertEqual(gc.get_referents(1, 'a', 4j), [])
529n/a
530n/a def test_is_tracked(self):
531n/a # Atomic built-in types are not tracked, user-defined objects and
532n/a # mutable containers are.
533n/a # NOTE: types with special optimizations (e.g. tuple) have tests
534n/a # in their own test files instead.
535n/a self.assertFalse(gc.is_tracked(None))
536n/a self.assertFalse(gc.is_tracked(1))
537n/a self.assertFalse(gc.is_tracked(1.0))
538n/a self.assertFalse(gc.is_tracked(1.0 + 5.0j))
539n/a self.assertFalse(gc.is_tracked(True))
540n/a self.assertFalse(gc.is_tracked(False))
541n/a self.assertFalse(gc.is_tracked(b"a"))
542n/a self.assertFalse(gc.is_tracked("a"))
543n/a self.assertFalse(gc.is_tracked(bytearray(b"a")))
544n/a self.assertFalse(gc.is_tracked(type))
545n/a self.assertFalse(gc.is_tracked(int))
546n/a self.assertFalse(gc.is_tracked(object))
547n/a self.assertFalse(gc.is_tracked(object()))
548n/a
549n/a class UserClass:
550n/a pass
551n/a
552n/a class UserInt(int):
553n/a pass
554n/a
555n/a # Base class is object; no extra fields.
556n/a class UserClassSlots:
557n/a __slots__ = ()
558n/a
559n/a # Base class is fixed size larger than object; no extra fields.
560n/a class UserFloatSlots(float):
561n/a __slots__ = ()
562n/a
563n/a # Base class is variable size; no extra fields.
564n/a class UserIntSlots(int):
565n/a __slots__ = ()
566n/a
567n/a self.assertTrue(gc.is_tracked(gc))
568n/a self.assertTrue(gc.is_tracked(UserClass))
569n/a self.assertTrue(gc.is_tracked(UserClass()))
570n/a self.assertTrue(gc.is_tracked(UserInt()))
571n/a self.assertTrue(gc.is_tracked([]))
572n/a self.assertTrue(gc.is_tracked(set()))
573n/a self.assertFalse(gc.is_tracked(UserClassSlots()))
574n/a self.assertFalse(gc.is_tracked(UserFloatSlots()))
575n/a self.assertFalse(gc.is_tracked(UserIntSlots()))
576n/a
577n/a def test_bug1055820b(self):
578n/a # Corresponds to temp2b.py in the bug report.
579n/a
580n/a ouch = []
581n/a def callback(ignored):
582n/a ouch[:] = [wr() for wr in WRs]
583n/a
584n/a Cs = [C1055820(i) for i in range(2)]
585n/a WRs = [weakref.ref(c, callback) for c in Cs]
586n/a c = None
587n/a
588n/a gc.collect()
589n/a self.assertEqual(len(ouch), 0)
590n/a # Make the two instances trash, and collect again. The bug was that
591n/a # the callback materialized a strong reference to an instance, but gc
592n/a # cleared the instance's dict anyway.
593n/a Cs = None
594n/a gc.collect()
595n/a self.assertEqual(len(ouch), 2) # else the callbacks didn't run
596n/a for x in ouch:
597n/a # If the callback resurrected one of these guys, the instance
598n/a # would be damaged, with an empty __dict__.
599n/a self.assertEqual(x, None)
600n/a
601n/a def test_bug21435(self):
602n/a # This is a poor test - its only virtue is that it happened to
603n/a # segfault on Tim's Windows box before the patch for 21435 was
604n/a # applied. That's a nasty bug relying on specific pieces of cyclic
605n/a # trash appearing in exactly the right order in finalize_garbage()'s
606n/a # input list.
607n/a # But there's no reliable way to force that order from Python code,
608n/a # so over time chances are good this test won't really be testing much
609n/a # of anything anymore. Still, if it blows up, there's _some_
610n/a # problem ;-)
611n/a gc.collect()
612n/a
613n/a class A:
614n/a pass
615n/a
616n/a class B:
617n/a def __init__(self, x):
618n/a self.x = x
619n/a
620n/a def __del__(self):
621n/a self.attr = None
622n/a
623n/a def do_work():
624n/a a = A()
625n/a b = B(A())
626n/a
627n/a a.attr = b
628n/a b.attr = a
629n/a
630n/a do_work()
631n/a gc.collect() # this blows up (bad C pointer) when it fails
632n/a
633n/a @cpython_only
634n/a def test_garbage_at_shutdown(self):
635n/a import subprocess
636n/a code = """if 1:
637n/a import gc
638n/a import _testcapi
639n/a @_testcapi.with_tp_del
640n/a class X:
641n/a def __init__(self, name):
642n/a self.name = name
643n/a def __repr__(self):
644n/a return "<X %%r>" %% self.name
645n/a def __tp_del__(self):
646n/a pass
647n/a
648n/a x = X('first')
649n/a x.x = x
650n/a x.y = X('second')
651n/a del x
652n/a gc.set_debug(%s)
653n/a """
654n/a def run_command(code):
655n/a p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
656n/a stdout=subprocess.PIPE,
657n/a stderr=subprocess.PIPE)
658n/a stdout, stderr = p.communicate()
659n/a p.stdout.close()
660n/a p.stderr.close()
661n/a self.assertEqual(p.returncode, 0)
662n/a self.assertEqual(stdout.strip(), b"")
663n/a return strip_python_stderr(stderr)
664n/a
665n/a stderr = run_command(code % "0")
666n/a self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
667n/a b"shutdown; use", stderr)
668n/a self.assertNotIn(b"<X 'first'>", stderr)
669n/a # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
670n/a stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
671n/a self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
672n/a b"shutdown", stderr)
673n/a self.assertTrue(
674n/a (b"[<X 'first'>, <X 'second'>]" in stderr) or
675n/a (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
676n/a # With DEBUG_SAVEALL, no additional message should get printed
677n/a # (because gc.garbage also contains normally reclaimable cyclic
678n/a # references, and its elements get printed at runtime anyway).
679n/a stderr = run_command(code % "gc.DEBUG_SAVEALL")
680n/a self.assertNotIn(b"uncollectable objects at shutdown", stderr)
681n/a
682n/a @requires_type_collecting
683n/a def test_gc_main_module_at_shutdown(self):
684n/a # Create a reference cycle through the __main__ module and check
685n/a # it gets collected at interpreter shutdown.
686n/a code = """if 1:
687n/a class C:
688n/a def __del__(self):
689n/a print('__del__ called')
690n/a l = [C()]
691n/a l.append(l)
692n/a """
693n/a rc, out, err = assert_python_ok('-c', code)
694n/a self.assertEqual(out.strip(), b'__del__ called')
695n/a
696n/a @requires_type_collecting
697n/a def test_gc_ordinary_module_at_shutdown(self):
698n/a # Same as above, but with a non-__main__ module.
699n/a with temp_dir() as script_dir:
700n/a module = """if 1:
701n/a class C:
702n/a def __del__(self):
703n/a print('__del__ called')
704n/a l = [C()]
705n/a l.append(l)
706n/a """
707n/a code = """if 1:
708n/a import sys
709n/a sys.path.insert(0, %r)
710n/a import gctest
711n/a """ % (script_dir,)
712n/a make_script(script_dir, 'gctest', module)
713n/a rc, out, err = assert_python_ok('-c', code)
714n/a self.assertEqual(out.strip(), b'__del__ called')
715n/a
716n/a def test_get_stats(self):
717n/a stats = gc.get_stats()
718n/a self.assertEqual(len(stats), 3)
719n/a for st in stats:
720n/a self.assertIsInstance(st, dict)
721n/a self.assertEqual(set(st),
722n/a {"collected", "collections", "uncollectable"})
723n/a self.assertGreaterEqual(st["collected"], 0)
724n/a self.assertGreaterEqual(st["collections"], 0)
725n/a self.assertGreaterEqual(st["uncollectable"], 0)
726n/a # Check that collection counts are incremented correctly
727n/a if gc.isenabled():
728n/a self.addCleanup(gc.enable)
729n/a gc.disable()
730n/a old = gc.get_stats()
731n/a gc.collect(0)
732n/a new = gc.get_stats()
733n/a self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
734n/a self.assertEqual(new[1]["collections"], old[1]["collections"])
735n/a self.assertEqual(new[2]["collections"], old[2]["collections"])
736n/a gc.collect(2)
737n/a new = gc.get_stats()
738n/a self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
739n/a self.assertEqual(new[1]["collections"], old[1]["collections"])
740n/a self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
741n/a
742n/a
743n/aclass GCCallbackTests(unittest.TestCase):
744n/a def setUp(self):
745n/a # Save gc state and disable it.
746n/a self.enabled = gc.isenabled()
747n/a gc.disable()
748n/a self.debug = gc.get_debug()
749n/a gc.set_debug(0)
750n/a gc.callbacks.append(self.cb1)
751n/a gc.callbacks.append(self.cb2)
752n/a self.othergarbage = []
753n/a
754n/a def tearDown(self):
755n/a # Restore gc state
756n/a del self.visit
757n/a gc.callbacks.remove(self.cb1)
758n/a gc.callbacks.remove(self.cb2)
759n/a gc.set_debug(self.debug)
760n/a if self.enabled:
761n/a gc.enable()
762n/a # destroy any uncollectables
763n/a gc.collect()
764n/a for obj in gc.garbage:
765n/a if isinstance(obj, Uncollectable):
766n/a obj.partner = None
767n/a del gc.garbage[:]
768n/a del self.othergarbage
769n/a gc.collect()
770n/a
771n/a def preclean(self):
772n/a # Remove all fluff from the system. Invoke this function
773n/a # manually rather than through self.setUp() for maximum
774n/a # safety.
775n/a self.visit = []
776n/a gc.collect()
777n/a garbage, gc.garbage[:] = gc.garbage[:], []
778n/a self.othergarbage.append(garbage)
779n/a self.visit = []
780n/a
781n/a def cb1(self, phase, info):
782n/a self.visit.append((1, phase, dict(info)))
783n/a
784n/a def cb2(self, phase, info):
785n/a self.visit.append((2, phase, dict(info)))
786n/a if phase == "stop" and hasattr(self, "cleanup"):
787n/a # Clean Uncollectable from garbage
788n/a uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
789n/a gc.garbage[:] = [e for e in gc.garbage
790n/a if not isinstance(e, Uncollectable)]
791n/a for e in uc:
792n/a e.partner = None
793n/a
794n/a def test_collect(self):
795n/a self.preclean()
796n/a gc.collect()
797n/a # Algorithmically verify the contents of self.visit
798n/a # because it is long and tortuous.
799n/a
800n/a # Count the number of visits to each callback
801n/a n = [v[0] for v in self.visit]
802n/a n1 = [i for i in n if i == 1]
803n/a n2 = [i for i in n if i == 2]
804n/a self.assertEqual(n1, [1]*2)
805n/a self.assertEqual(n2, [2]*2)
806n/a
807n/a # Count that we got the right number of start and stop callbacks.
808n/a n = [v[1] for v in self.visit]
809n/a n1 = [i for i in n if i == "start"]
810n/a n2 = [i for i in n if i == "stop"]
811n/a self.assertEqual(n1, ["start"]*2)
812n/a self.assertEqual(n2, ["stop"]*2)
813n/a
814n/a # Check that we got the right info dict for all callbacks
815n/a for v in self.visit:
816n/a info = v[2]
817n/a self.assertTrue("generation" in info)
818n/a self.assertTrue("collected" in info)
819n/a self.assertTrue("uncollectable" in info)
820n/a
821n/a def test_collect_generation(self):
822n/a self.preclean()
823n/a gc.collect(2)
824n/a for v in self.visit:
825n/a info = v[2]
826n/a self.assertEqual(info["generation"], 2)
827n/a
828n/a @cpython_only
829n/a def test_collect_garbage(self):
830n/a self.preclean()
831n/a # Each of these cause four objects to be garbage: Two
832n/a # Uncolectables and their instance dicts.
833n/a Uncollectable()
834n/a Uncollectable()
835n/a C1055820(666)
836n/a gc.collect()
837n/a for v in self.visit:
838n/a if v[1] != "stop":
839n/a continue
840n/a info = v[2]
841n/a self.assertEqual(info["collected"], 2)
842n/a self.assertEqual(info["uncollectable"], 8)
843n/a
844n/a # We should now have the Uncollectables in gc.garbage
845n/a self.assertEqual(len(gc.garbage), 4)
846n/a for e in gc.garbage:
847n/a self.assertIsInstance(e, Uncollectable)
848n/a
849n/a # Now, let our callback handle the Uncollectable instances
850n/a self.cleanup=True
851n/a self.visit = []
852n/a gc.garbage[:] = []
853n/a gc.collect()
854n/a for v in self.visit:
855n/a if v[1] != "stop":
856n/a continue
857n/a info = v[2]
858n/a self.assertEqual(info["collected"], 0)
859n/a self.assertEqual(info["uncollectable"], 4)
860n/a
861n/a # Uncollectables should be gone
862n/a self.assertEqual(len(gc.garbage), 0)
863n/a
864n/a
865n/aclass GCTogglingTests(unittest.TestCase):
866n/a def setUp(self):
867n/a gc.enable()
868n/a
869n/a def tearDown(self):
870n/a gc.disable()
871n/a
872n/a def test_bug1055820c(self):
873n/a # Corresponds to temp2c.py in the bug report. This is pretty
874n/a # elaborate.
875n/a
876n/a c0 = C1055820(0)
877n/a # Move c0 into generation 2.
878n/a gc.collect()
879n/a
880n/a c1 = C1055820(1)
881n/a c1.keep_c0_alive = c0
882n/a del c0.loop # now only c1 keeps c0 alive
883n/a
884n/a c2 = C1055820(2)
885n/a c2wr = weakref.ref(c2) # no callback!
886n/a
887n/a ouch = []
888n/a def callback(ignored):
889n/a ouch[:] = [c2wr()]
890n/a
891n/a # The callback gets associated with a wr on an object in generation 2.
892n/a c0wr = weakref.ref(c0, callback)
893n/a
894n/a c0 = c1 = c2 = None
895n/a
896n/a # What we've set up: c0, c1, and c2 are all trash now. c0 is in
897n/a # generation 2. The only thing keeping it alive is that c1 points to
898n/a # it. c1 and c2 are in generation 0, and are in self-loops. There's a
899n/a # global weakref to c2 (c2wr), but that weakref has no callback.
900n/a # There's also a global weakref to c0 (c0wr), and that does have a
901n/a # callback, and that callback references c2 via c2wr().
902n/a #
903n/a # c0 has a wr with callback, which references c2wr
904n/a # ^
905n/a # |
906n/a # | Generation 2 above dots
907n/a #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
908n/a # | Generation 0 below dots
909n/a # |
910n/a # |
911n/a # ^->c1 ^->c2 has a wr but no callback
912n/a # | | | |
913n/a # <--v <--v
914n/a #
915n/a # So this is the nightmare: when generation 0 gets collected, we see
916n/a # that c2 has a callback-free weakref, and c1 doesn't even have a
917n/a # weakref. Collecting generation 0 doesn't see c0 at all, and c0 is
918n/a # the only object that has a weakref with a callback. gc clears c1
919n/a # and c2. Clearing c1 has the side effect of dropping the refcount on
920n/a # c0 to 0, so c0 goes away (despite that it's in an older generation)
921n/a # and c0's wr callback triggers. That in turn materializes a reference
922n/a # to c2 via c2wr(), but c2 gets cleared anyway by gc.
923n/a
924n/a # We want to let gc happen "naturally", to preserve the distinction
925n/a # between generations.
926n/a junk = []
927n/a i = 0
928n/a detector = GC_Detector()
929n/a while not detector.gc_happened:
930n/a i += 1
931n/a if i > 10000:
932n/a self.fail("gc didn't happen after 10000 iterations")
933n/a self.assertEqual(len(ouch), 0)
934n/a junk.append([]) # this will eventually trigger gc
935n/a
936n/a self.assertEqual(len(ouch), 1) # else the callback wasn't invoked
937n/a for x in ouch:
938n/a # If the callback resurrected c2, the instance would be damaged,
939n/a # with an empty __dict__.
940n/a self.assertEqual(x, None)
941n/a
942n/a def test_bug1055820d(self):
943n/a # Corresponds to temp2d.py in the bug report. This is very much like
944n/a # test_bug1055820c, but uses a __del__ method instead of a weakref
945n/a # callback to sneak in a resurrection of cyclic trash.
946n/a
947n/a ouch = []
948n/a class D(C1055820):
949n/a def __del__(self):
950n/a ouch[:] = [c2wr()]
951n/a
952n/a d0 = D(0)
953n/a # Move all the above into generation 2.
954n/a gc.collect()
955n/a
956n/a c1 = C1055820(1)
957n/a c1.keep_d0_alive = d0
958n/a del d0.loop # now only c1 keeps d0 alive
959n/a
960n/a c2 = C1055820(2)
961n/a c2wr = weakref.ref(c2) # no callback!
962n/a
963n/a d0 = c1 = c2 = None
964n/a
965n/a # What we've set up: d0, c1, and c2 are all trash now. d0 is in
966n/a # generation 2. The only thing keeping it alive is that c1 points to
967n/a # it. c1 and c2 are in generation 0, and are in self-loops. There's
968n/a # a global weakref to c2 (c2wr), but that weakref has no callback.
969n/a # There are no other weakrefs.
970n/a #
971n/a # d0 has a __del__ method that references c2wr
972n/a # ^
973n/a # |
974n/a # | Generation 2 above dots
975n/a #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
976n/a # | Generation 0 below dots
977n/a # |
978n/a # |
979n/a # ^->c1 ^->c2 has a wr but no callback
980n/a # | | | |
981n/a # <--v <--v
982n/a #
983n/a # So this is the nightmare: when generation 0 gets collected, we see
984n/a # that c2 has a callback-free weakref, and c1 doesn't even have a
985n/a # weakref. Collecting generation 0 doesn't see d0 at all. gc clears
986n/a # c1 and c2. Clearing c1 has the side effect of dropping the refcount
987n/a # on d0 to 0, so d0 goes away (despite that it's in an older
988n/a # generation) and d0's __del__ triggers. That in turn materializes
989n/a # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.
990n/a
991n/a # We want to let gc happen "naturally", to preserve the distinction
992n/a # between generations.
993n/a detector = GC_Detector()
994n/a junk = []
995n/a i = 0
996n/a while not detector.gc_happened:
997n/a i += 1
998n/a if i > 10000:
999n/a self.fail("gc didn't happen after 10000 iterations")
1000n/a self.assertEqual(len(ouch), 0)
1001n/a junk.append([]) # this will eventually trigger gc
1002n/a
1003n/a self.assertEqual(len(ouch), 1) # else __del__ wasn't invoked
1004n/a for x in ouch:
1005n/a # If __del__ resurrected c2, the instance would be damaged, with an
1006n/a # empty __dict__.
1007n/a self.assertEqual(x, None)
1008n/a
1009n/adef test_main():
1010n/a enabled = gc.isenabled()
1011n/a gc.disable()
1012n/a assert not gc.isenabled()
1013n/a debug = gc.get_debug()
1014n/a gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
1015n/a
1016n/a try:
1017n/a gc.collect() # Delete 2nd generation garbage
1018n/a run_unittest(GCTests, GCTogglingTests, GCCallbackTests)
1019n/a finally:
1020n/a gc.set_debug(debug)
1021n/a # test gc.enable() even if GC is disabled by default
1022n/a if verbose:
1023n/a print("restoring automatic collection")
1024n/a # make sure to always test gc.enable()
1025n/a gc.enable()
1026n/a assert gc.isenabled()
1027n/a if not enabled:
1028n/a gc.disable()
1029n/a
1030n/aif __name__ == "__main__":
1031n/a test_main()