» Core Development > Code coverage > Lib/test/test_weakref.py

Python code coverage for Lib/test/test_weakref.py

# | count | content
import gc
import sys
import unittest
import collections
import weakref
import operator
import contextlib
import copy
import time

from test import support
from test.support import script_helper

# Used in ReferencesTestCase.test_ref_created_during_del() .
ref_from_del = None

# Used by FinalizeTestCase as a global that may be replaced by None
# when the interpreter shuts down.
_global_var = 'foobar'
20n/a
class C:
    """Minimal weak-referenceable class with one no-op method."""

    def method(self):
        """Do nothing; exists so a bound method can be obtained."""
        pass
24n/a
25n/a
class Callable:
    """Callable object that records the argument of its last call in `bar`."""

    # Class-level default; shadowed on the instance by the first call.
    bar = None

    def __call__(self, x):
        """Store *x* on the instance as `bar`."""
        self.bar = x
31n/a
32n/a
def create_function():
    """Return a freshly created, do-nothing function object."""
    def f(): pass
    return f
36n/a
def create_bound_method():
    """Return `method` bound to a new `C` instance."""
    return C().method
39n/a
40n/a
class Object:
    """Wrapper whose equality, ordering and hash delegate to `arg`."""

    def __init__(self, arg):
        self.arg = arg

    def __repr__(self):
        # Wrap in a 1-tuple so a tuple-valued `arg` is formatted as a single
        # value instead of being unpacked as the %-format argument list.
        return "<Object %r>" % (self.arg,)

    def __eq__(self, other):
        if isinstance(other, Object):
            return self.arg == other.arg
        return NotImplemented

    def __lt__(self, other):
        if isinstance(other, Object):
            return self.arg < other.arg
        return NotImplemented

    def __hash__(self):
        return hash(self.arg)

    def some_method(self):
        """Return the constant 4."""
        return 4

    def other_method(self):
        """Return the constant 5."""
        return 5
60n/a
61n/a
class RefCycle:
    """Object that holds a reference to itself in `cycle`."""

    def __init__(self):
        self.cycle = self
65n/a
66n/a
class TestBase(unittest.TestCase):
    """Base test case providing a counting weakref callback."""

    def setUp(self):
        # Number of times self.callback has been invoked in this test.
        self.cbcalled = 0

    def callback(self, ref):
        """Weakref callback: count the invocation; *ref* is ignored."""
        self.cbcalled += 1
74n/a
75n/a
@contextlib.contextmanager
def collect_in_thread(period=0.0001):
    """
    Ensure GC collections happen in a different thread, at a high frequency.

    *period* is the number of seconds the worker thread sleeps between
    calls to gc.collect().  The body of the ``with`` statement runs inside
    support.disable_gc(); on exit the worker is told to stop and is joined.
    """
    threading = support.import_module('threading')
    please_stop = False

    def collect():
        # Reads `please_stop` from the enclosing scope on every iteration.
        while not please_stop:
            time.sleep(period)
            gc.collect()

    with support.disable_gc():
        t = threading.Thread(target=collect)
        t.start()
        try:
            yield
        finally:
            # Signal the worker loop to exit, then wait for it to finish.
            please_stop = True
            t.join()
97n/a
98n/a
class ReferencesTestCase(TestBase):
    """Tests for weakref.ref / weakref.proxy objects and their callbacks."""

    def test_basic_ref(self):
        self.check_basic_ref(C)
        self.check_basic_ref(create_function)
        self.check_basic_ref(create_bound_method)

        # Just make sure the tp_repr handler doesn't raise an exception.
        # Live reference:
        o = C()
        wr = weakref.ref(o)
        repr(wr)
        # Dead reference:
        del o
        repr(wr)

    def test_basic_callback(self):
        self.check_basic_callback(C)
        self.check_basic_callback(create_function)
        self.check_basic_callback(create_bound_method)

    @support.cpython_only
    def test_cfunction(self):
        # Weak references to a C function created by _testcapi.
        import _testcapi
        create_cfunction = _testcapi.create_cfunction
        f = create_cfunction()
        wr = weakref.ref(f)
        self.assertIs(wr(), f)
        del f
        self.assertIsNone(wr())
        self.check_basic_ref(create_cfunction)
        self.check_basic_callback(create_cfunction)

    def test_multiple_callbacks(self):
        # Two refs with callbacks on one object: both callbacks must fire.
        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del o
        self.assertIsNone(ref1(), "expected reference to be invalidated")
        self.assertIsNone(ref2(), "expected reference to be invalidated")
        self.assertEqual(self.cbcalled, 2,
                         "callback not called the right number of times")

    def test_multiple_selfref_callbacks(self):
        # Make sure all references are invalidated before callbacks are called
        #
        # What's important here is that we're using the first
        # reference in the callback invoked on the second reference
        # (the most recently created ref is cleaned up first).  This
        # tests that all references to the object are invalidated
        # before any of the callbacks are invoked, so that we only
        # have one invocation of _weakref.c:cleanup_helper() active
        # for a particular object at a time.
        #
        def callback(object, self=self):
            self.ref()
        c = C()
        self.ref = weakref.ref(c, callback)
        ref1 = weakref.ref(c, callback)
        del c

    def test_constructor_kwargs(self):
        # weakref.ref() must reject the callback passed by keyword.
        c = C()
        self.assertRaises(TypeError, weakref.ref, c, callback=None)

    def test_proxy_ref(self):
        # Using a proxy whose referent is gone raises ReferenceError.
        o = C()
        o.bar = 1
        ref1 = weakref.proxy(o, self.callback)
        ref2 = weakref.proxy(o, self.callback)
        del o

        def check(proxy):
            proxy.bar

        self.assertRaises(ReferenceError, check, ref1)
        self.assertRaises(ReferenceError, check, ref2)
        self.assertRaises(ReferenceError, bool, weakref.proxy(C()))
        self.assertEqual(self.cbcalled, 2)

    def check_basic_ref(self, factory):
        # Helper: a ref to a live object from *factory* returns that object.
        o = factory()
        ref = weakref.ref(o)
        self.assertIsNotNone(ref(),
                             "weak reference to live object should be live")
        o2 = ref()
        self.assertIs(o, o2,
                      "<ref>() should return original object if live")

    def check_basic_callback(self, factory):
        # Helper: deleting the object from *factory* fires the callback once
        # and kills the ref.
        self.cbcalled = 0
        o = factory()
        ref = weakref.ref(o, self.callback)
        del o
        self.assertEqual(self.cbcalled, 1,
                         "callback did not properly set 'cbcalled'")
        self.assertIsNone(ref(),
                          "ref2 should be dead after deleting object reference")

    def test_ref_reuse(self):
        o = C()
        ref1 = weakref.ref(o)
        # create a proxy to make sure that there's an intervening creation
        # between these two; it should make no difference
        proxy = weakref.proxy(o)
        ref2 = weakref.ref(o)
        self.assertIs(ref1, ref2,
                      "reference object w/out callback should be re-used")

        o = C()
        proxy = weakref.proxy(o)
        ref1 = weakref.ref(o)
        ref2 = weakref.ref(o)
        self.assertIs(ref1, ref2,
                      "reference object w/out callback should be re-used")
        self.assertEqual(weakref.getweakrefcount(o), 2,
                         "wrong weak ref count for object")
        del proxy
        self.assertEqual(weakref.getweakrefcount(o), 1,
                         "wrong weak ref count for object after deleting proxy")

    def test_proxy_reuse(self):
        # Callback-less proxies to one object are shared, even with a ref
        # created in between.
        o = C()
        proxy1 = weakref.proxy(o)
        ref = weakref.ref(o)
        proxy2 = weakref.proxy(o)
        self.assertIs(proxy1, proxy2,
                      "proxy object w/out callback should have been re-used")

    def test_basic_proxy(self):
        o = C()
        self.check_proxy(o, weakref.proxy(o))

        # Sequence operations through a proxy to a UserList.
        L = collections.UserList()
        p = weakref.proxy(L)
        self.assertFalse(p, "proxy for empty UserList should be false")
        p.append(12)
        self.assertEqual(len(L), 1)
        self.assertTrue(p, "proxy for non-empty UserList should be true")
        p[:] = [2, 3]
        self.assertEqual(len(L), 2)
        self.assertEqual(len(p), 2)
        self.assertIn(3, p, "proxy didn't support __contains__() properly")
        p[1] = 5
        self.assertEqual(L[1], 5)
        self.assertEqual(p[1], 5)
        L2 = collections.UserList(L)
        p2 = weakref.proxy(L2)
        self.assertEqual(p, p2)
        ## self.assertEqual(repr(L2), repr(p2))
        L3 = collections.UserList(range(10))
        p3 = weakref.proxy(L3)
        self.assertEqual(L3[:], p3[:])
        self.assertEqual(L3[5:], p3[5:])
        self.assertEqual(L3[:5], p3[:5])
        self.assertEqual(L3[2:5], p3[2:5])

    def test_proxy_unicode(self):
        # See bug 5037
        class C(object):
            def __str__(self):
                return "string"
            def __bytes__(self):
                return b"bytes"
        instance = C()
        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")

    def test_proxy_index(self):
        # operator.index() works through a proxy.
        class C:
            def __index__(self):
                return 10
        o = C()
        p = weakref.proxy(o)
        self.assertEqual(operator.index(p), 10)

    def test_proxy_div(self):
        # Floor division and in-place floor division work through a proxy.
        class C:
            def __floordiv__(self, other):
                return 42
            def __ifloordiv__(self, other):
                return 21
        o = C()
        p = weakref.proxy(o)
        self.assertEqual(p // 5, 42)
        p //= 5
        self.assertEqual(p, 21)

    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared so long as they exist by all callers so long as
    # they are active.  In Python 2.3.3 and earlier, this guarantee
    # was not honored, and was broken in different ways for
    # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)

    def test_shared_ref_without_callback(self):
        self.check_shared_without_callback(weakref.ref)

    def test_shared_proxy_without_callback(self):
        self.check_shared_without_callback(weakref.proxy)

    def check_shared_without_callback(self, makeref):
        # Helper: every combination of "omitted" and "None" callbacks must
        # yield the same shared object from *makeref*.
        o = Object(1)
        p1 = makeref(o, None)
        p2 = makeref(o, None)
        self.assertIs(p1, p2, "both callbacks were None in the C API")
        del p1, p2
        p1 = makeref(o)
        p2 = makeref(o, None)
        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
        del p1, p2
        p1 = makeref(o)
        p2 = makeref(o)
        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
        del p1, p2
        p1 = makeref(o, None)
        p2 = makeref(o)
        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")

    def test_callable_proxy(self):
        o = Callable()
        ref1 = weakref.proxy(o)

        self.check_proxy(o, ref1)

        self.assertIs(type(ref1), weakref.CallableProxyType,
                      "proxy is not of callable type")
        ref1('twinkies!')
        self.assertEqual(o.bar, 'twinkies!',
                         "call through proxy not passed through to original")
        ref1(x='Splat.')
        self.assertEqual(o.bar, 'Splat.',
                         "call through proxy not passed through to original")

        # expect due to too few args
        self.assertRaises(TypeError, ref1)

        # expect due to too many args
        self.assertRaises(TypeError, ref1, 1, 2, 3)

    def check_proxy(self, o, proxy):
        # Helper: attribute set/get/delete must be mirrored in both
        # directions between *o* and *proxy*.
        o.foo = 1
        self.assertEqual(proxy.foo, 1,
                         "proxy does not reflect attribute addition")
        o.foo = 2
        self.assertEqual(proxy.foo, 2,
                         "proxy does not reflect attribute modification")
        del o.foo
        self.assertFalse(hasattr(proxy, 'foo'),
                         "proxy does not reflect attribute removal")

        proxy.foo = 1
        self.assertEqual(o.foo, 1,
                         "object does not reflect attribute addition via proxy")
        proxy.foo = 2
        self.assertEqual(o.foo, 2,
                         "object does not reflect attribute modification via proxy")
        del proxy.foo
        self.assertFalse(hasattr(o, 'foo'),
                         "object does not reflect attribute removal via proxy")

    def test_proxy_deletion(self):
        # Test clearing of SF bug #762891
        class Foo:
            result = None
            def __delitem__(self, accessor):
                self.result = accessor
        g = Foo()
        f = weakref.proxy(g)
        del f[0]
        self.assertEqual(f.result, 0)

    def test_proxy_bool(self):
        # Test clearing of SF bug #1170766
        class List(list): pass
        lyst = List()
        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))

    def test_getweakrefcount(self):
        o = C()
        ref1 = weakref.ref(o)
        ref2 = weakref.ref(o, self.callback)
        self.assertEqual(weakref.getweakrefcount(o), 2,
                         "got wrong number of weak reference objects")

        proxy1 = weakref.proxy(o)
        proxy2 = weakref.proxy(o, self.callback)
        self.assertEqual(weakref.getweakrefcount(o), 4,
                         "got wrong number of weak reference objects")

        del ref1, ref2, proxy1, proxy2
        self.assertEqual(weakref.getweakrefcount(o), 0,
                         "weak reference objects not unlinked from"
                         " referent when discarded.")

        # assumes ints do not support weakrefs
        self.assertEqual(weakref.getweakrefcount(1), 0,
                         "got wrong number of weak reference objects for int")

    def test_getweakrefs(self):
        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del ref1
        self.assertEqual(weakref.getweakrefs(o), [ref2],
                         "list of refs does not match")

        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del ref2
        self.assertEqual(weakref.getweakrefs(o), [ref1],
                         "list of refs does not match")

        del ref1
        self.assertEqual(weakref.getweakrefs(o), [],
                         "list of refs not cleared")

        # assumes ints do not support weakrefs
        self.assertEqual(weakref.getweakrefs(1), [],
                         "list of refs does not match for int")

    def test_newstyle_number_ops(self):
        # Arithmetic with a proxy to a float subclass, on either side.
        class F(float):
            pass
        f = F(2.0)
        p = weakref.proxy(f)
        self.assertEqual(p + 1.0, 3.0)
        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV

    def test_callbacks_protected(self):
        # Callbacks protected from already-set exceptions?
        # Regression test for SF bug #478534.
        class BogusError(Exception):
            pass
        data = {}
        def remove(k):
            del data[k]
        def encapsulate():
            f = lambda : ()
            data[weakref.ref(f, remove)] = None
            raise BogusError
        try:
            encapsulate()
        except BogusError:
            pass
        else:
            self.fail("exception not properly restored")
        try:
            encapsulate()
        except BogusError:
            pass
        else:
            self.fail("exception not properly restored")

    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.

        class C(object):
            pass

        c = C()
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        c1 = C()
        c1.i = C()
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2

    def test_callback_in_cycle_1(self):
        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()

    def test_callback_in_cycle_2(self):
        # This is just like test_callback_in_cycle_1, except that II is an
        # old-style class.  The symptom is different then:  an instance of an
        # old-style class looks in its own __dict__ first.  'J' happens to
        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
        # __dict__, so the attribute isn't found.  The difference is that
        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
        # __mro__), so no segfault occurs.  Instead it got:
        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
        #    Exception exceptions.AttributeError:
        #   "II instance has no attribute 'J'" in <bound method II.acallback
        #       of <?.II instance at 0x00B9B4B8>> ignored

        class J(object):
            pass

        class II:
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        del I, J, II
        gc.collect()

    def test_callback_in_cycle_3(self):
        # This one broke the first patch that fixed the last two.  In this
        # case, the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.

        class C:
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()

    def test_callback_in_cycle_4(self):
        # Like test_callback_in_cycle_3, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).

        class C(object):
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2, C, D
        gc.collect()

    @support.requires_type_collecting
    def test_callback_in_cycle_resurrection(self):
        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                alist.append(self.c)

        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertIsNone(wr())

        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])

    def test_callbacks_on_callback(self):
        # Set up weakref callbacks *on* weakref callbacks.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        c, d = C(), C()
        c.other = d
        d.other = c
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assertIs(external_wr(), callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertIsNone(external_wr())

        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])

    def test_gc_during_ref_creation(self):
        self.check_gc_during_creation(weakref.ref)

    def test_gc_during_proxy_creation(self):
        self.check_gc_during_creation(weakref.proxy)

    def check_gc_during_creation(self, makeref):
        # Helper: create weak references with the gc thresholds set to
        # (1, 1, 1); the original thresholds are restored afterwards.
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)

    def test_ref_created_during_del(self):
        # Bug #1377858
        # A weakref created in an object's __del__() would crash the
        # interpreter when the weakref was cleaned up since it would refer to
        # non-existent memory.  This test should not segfault the interpreter.
        class Target(object):
            def __del__(self):
                global ref_from_del
                ref_from_del = weakref.ref(self)

        w = Target()

    def test_init(self):
        # Issue 3634
        # <weakref to class>.__init__() doesn't check errors correctly
        r = weakref.ref(Exception)
        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
        # No exception should be raised here
        gc.collect()

    def test_classes(self):
        # Check that classes are weakrefable.
        class A(object):
            pass
        l = []
        weakref.ref(int)
        a = weakref.ref(A, l.append)
        A = None
        gc.collect()
        self.assertIsNone(a())
        self.assertEqual(l, [a])

    def test_equality(self):
        # Alive weakrefs defer equality testing to their underlying object.
        x = Object(1)
        y = Object(1)
        z = Object(2)
        a = weakref.ref(x)
        b = weakref.ref(y)
        c = weakref.ref(z)
        d = weakref.ref(x)
        # Note how we directly test the operators here, to stress both
        # __eq__ and __ne__.
        self.assertTrue(a == b)
        self.assertFalse(a != b)
        self.assertFalse(a == c)
        self.assertTrue(a != c)
        self.assertTrue(a == d)
        self.assertFalse(a != d)
        del x, y, z
        gc.collect()
        for r in a, b, c:
            # Sanity check
            self.assertIsNone(r())
        # Dead weakrefs compare by identity: whether `a` and `d` are the
        # same weakref object is an implementation detail, since they pointed
        # to the same original object and didn't have a callback.
        # (see issue #16453).
        self.assertFalse(a == b)
        self.assertTrue(a != b)
        self.assertFalse(a == c)
        self.assertTrue(a != c)
        self.assertEqual(a == d, a is d)
        self.assertEqual(a != d, a is not d)

    def test_ordering(self):
        # weakrefs cannot be ordered, even if the underlying objects can.
        ops = [operator.lt, operator.gt, operator.le, operator.ge]
        x = Object(1)
        y = Object(1)
        a = weakref.ref(x)
        b = weakref.ref(y)
        for op in ops:
            self.assertRaises(TypeError, op, a, b)
        # Same when dead.
        del x, y
        gc.collect()
        for op in ops:
            self.assertRaises(TypeError, op, a, b)

    def test_hashing(self):
        # Alive weakrefs hash the same as the underlying object
        x = Object(42)
        y = Object(42)
        a = weakref.ref(x)
        b = weakref.ref(y)
        self.assertEqual(hash(a), hash(42))
        del x, y
        gc.collect()
        # Dead weakrefs:
        # - retain their hash if they were hashed when alive;
        # - otherwise, cannot be hashed.
        self.assertEqual(hash(a), hash(42))
        self.assertRaises(TypeError, hash, b)

    def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        class C:
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            d[c] = c = C(c)
        del root
        gc.collect()

    def test_callback_attribute(self):
        # __callback__ exposes the callback, or None when there is none.
        x = Object(1)
        callback = lambda ref: None
        ref1 = weakref.ref(x, callback)
        self.assertIs(ref1.__callback__, callback)

        ref2 = weakref.ref(x)
        self.assertIsNone(ref2.__callback__)

    def test_callback_attribute_after_deletion(self):
        # __callback__ becomes None once the referent is gone.
        x = Object(1)
        ref = weakref.ref(x, self.callback)
        self.assertIsNotNone(ref.__callback__)
        del x
        support.gc_collect()
        self.assertIsNone(ref.__callback__)

    def test_set_callback_attribute(self):
        # __callback__ is read-only.
        x = Object(1)
        callback = lambda ref: None
        ref1 = weakref.ref(x, callback)
        with self.assertRaises(AttributeError):
            ref1.__callback__ = lambda ref: None

    def test_callback_gcs(self):
        # A callback that runs a collection, on an object defining __del__.
        class ObjectWithDel(Object):
            def __del__(self): pass
        x = ObjectWithDel(1)
        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
        del x
        support.gc_collect()
879n/a
880n/a
class SubclassableWeakrefTestCase(TestBase):
    """Tests for user-defined subclasses of weakref.ref."""

    def test_subclass_refs(self):
        # A subclass may take an extra constructor argument and override
        # __call__.
        class MyRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super().__init__(ob, callback)
            def __call__(self):
                self.called = True
                return super().__call__()
        o = Object("foo")
        mr = MyRef(o, value=24)
        self.assertIs(mr(), o)
        self.assertTrue(mr.called)
        self.assertEqual(mr.value, 24)
        del o
        self.assertIsNone(mr())
        self.assertTrue(mr.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        # Subclass instances are never shared with, or substituted for,
        # the plain callback-less ref.
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o)
        r2 = weakref.ref(o)
        self.assertIsNot(r1, r2)
        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
        self.assertEqual(weakref.getweakrefcount(o), 2)
        r3 = MyRef(o)
        self.assertEqual(weakref.getweakrefcount(o), 3)
        refs = weakref.getweakrefs(o)
        self.assertEqual(len(refs), 3)
        self.assertIs(r2, refs[0])
        self.assertIn(r1, refs[1:])
        self.assertIn(r3, refs[1:])

    def test_subclass_refs_dont_conflate_callbacks(self):
        # Two subclass refs with different callbacks are distinct objects.
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o, id)
        r2 = MyRef(o, str)
        self.assertIsNot(r1, r2)
        refs = weakref.getweakrefs(o)
        self.assertIn(r1, refs)
        self.assertIn(r2, refs)

    def test_subclass_refs_with_slots(self):
        # A subclass using __slots__ gets its slots but no __dict__.
        class MyRef(weakref.ref):
            __slots__ = "slot1", "slot2"
            def __new__(type, ob, callback, slot1, slot2):
                return weakref.ref.__new__(type, ob, callback)
            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2
            def meth(self):
                return self.slot1 + self.slot2
        o = Object(42)
        r = MyRef(o, None, "abc", "def")
        self.assertEqual(r.slot1, "abc")
        self.assertEqual(r.slot2, "def")
        self.assertEqual(r.meth(), "abcdef")
        self.assertFalse(hasattr(r, "__dict__"))

    def test_subclass_refs_with_cycle(self):
        """Confirm https://bugs.python.org/issue3100 is fixed."""
        # An instance of a weakref subclass can have attributes.
        # If such a weakref holds the only strong reference to the object,
        # deleting the weakref will delete the object. In this case,
        # the callback must not be called, because the ref object is
        # being deleted.
        class MyRef(weakref.ref):
            pass

        # Use a local callback, for "regrtest -R::"
        # to detect refcounting problems
        def callback(w):
            self.cbcalled += 1

        o = C()
        r1 = MyRef(o, callback)
        r1.o = o
        del o

        del r1  # Used to crash here

        self.assertEqual(self.cbcalled, 0)

        # Same test, with two weakrefs to the same object
        # (since code paths are different)
        o = C()
        r1 = MyRef(o, callback)
        r2 = MyRef(o, callback)
        r1.r = r2
        r2.o = o
        del o
        del r2

        del r1  # Used to crash here

        self.assertEqual(self.cbcalled, 0)
982n/a
983n/a
class WeakMethodTestCase(unittest.TestCase):
    """Tests for weakref.WeakMethod: liveness, callbacks, equality, hashing."""

    def _subclass(self):
        """Return an Object subclass overriding `some_method`."""
        class Sub(Object):
            def some_method(self):
                return 6
        return Sub

    def test_alive(self):
        # While the object is alive, calling the WeakMethod yields an
        # equivalent bound method.
        obj = Object(1)
        wm = weakref.WeakMethod(obj.some_method)
        self.assertIsInstance(wm, weakref.ReferenceType)
        self.assertIsInstance(wm(), type(obj.some_method))
        self.assertIs(wm().__self__, obj)
        self.assertIs(wm().__func__, obj.some_method.__func__)
        self.assertEqual(wm()(), 4)

    def test_object_dead(self):
        obj = Object(1)
        wm = weakref.WeakMethod(obj.some_method)
        del obj
        gc.collect()
        self.assertIs(wm(), None)

    def test_method_dead(self):
        klass = self._subclass()
        obj = klass(1)
        wm = weakref.WeakMethod(obj.some_method)
        # Remove the function from the class while the instance survives.
        del klass.some_method
        gc.collect()
        self.assertIs(wm(), None)

    def test_callback_when_object_dead(self):
        # Test callback behaviour when object dies first.
        klass = self._subclass()
        seen = []

        def record(arg):
            seen.append(arg)

        obj = klass(1)
        wm = weakref.WeakMethod(obj.some_method, record)
        del obj
        gc.collect()
        self.assertEqual(seen, [wm])
        # Callback is only called once.
        klass.some_method = Object.some_method
        gc.collect()
        self.assertEqual(seen, [wm])

    def test_callback_when_method_dead(self):
        # Test callback behaviour when method dies first.
        klass = self._subclass()
        seen = []

        def record(arg):
            seen.append(arg)

        obj = klass(1)
        wm = weakref.WeakMethod(obj.some_method, record)
        del klass.some_method
        gc.collect()
        self.assertEqual(seen, [wm])
        # Callback is only called once.
        del obj
        gc.collect()
        self.assertEqual(seen, [wm])

    @support.cpython_only
    def test_no_cycles(self):
        # A WeakMethod doesn't create any reference cycle to itself.
        obj = Object(1)

        def noop(_):
            pass

        wm = weakref.WeakMethod(obj.some_method, noop)
        watcher = weakref.ref(wm)
        del wm
        self.assertIs(watcher(), None)

    def test_equality(self):
        def check_equal(p, q):
            self.assertTrue(p == q)
            self.assertFalse(p != q)

        def check_unequal(p, q):
            self.assertTrue(p != q)
            self.assertFalse(p == q)

        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        c = weakref.WeakMethod(x.other_method)
        d = weakref.WeakMethod(y.other_method)
        # Objects equal, same method
        check_equal(a, b)
        check_equal(c, d)
        # Objects equal, different method
        for left in (a, b):
            for right in (c, d):
                check_unequal(left, right)
        # Objects unequal, same or different method
        z = Object(2)
        e = weakref.WeakMethod(z.some_method)
        f = weakref.WeakMethod(z.other_method)
        for left in (a, b):
            for right in (e, f):
                check_unequal(left, right)
        del x, y, z
        gc.collect()
        # Dead WeakMethods compare by identity
        dead = (a, b, c, d, e, f)
        for first in dead:
            for second in dead:
                self.assertEqual(first == second, first is second)
                self.assertEqual(first != second, first is not second)

    def test_hashing(self):
        # Alive WeakMethods are hashable if the underlying object is
        # hashable.
        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        never_hashed = weakref.WeakMethod(y.other_method)
        # Since WeakMethod objects are equal, the hashes should be equal.
        self.assertEqual(hash(a), hash(b))
        live_hash = hash(a)
        # Dead WeakMethods retain their old hash value
        del x, y
        gc.collect()
        self.assertEqual(hash(a), live_hash)
        self.assertEqual(hash(b), live_hash)
        # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
        self.assertRaises(TypeError, hash, never_hashed)
1116n/a
1117n/a
class MappingTestCase(TestBase):
    """Tests for WeakKeyDictionary and WeakValueDictionary.

    The `check_*` methods are helpers shared by the keyed and valued
    variants of each test; `make_weak_*_dict` build a dictionary with
    COUNT entries together with the list holding the strong references.
    """

    COUNT = 10

    def check_len_cycles(self, dict_type, cons):
        """Check len() of a weak dict of cyclic objects around an iterator.

        `cons` maps a RefCycle instance to the (key, value) pair stored in
        a `dict_type` instance.
        """
        N = 20
        items = [RefCycle() for i in range(N)]
        dct = dict_type(cons(o) for o in items)
        # Keep an iterator alive
        it = dct.items()
        try:
            next(it)
        except StopIteration:
            pass
        del items
        gc.collect()
        n1 = len(dct)
        del it
        gc.collect()
        n2 = len(dct)
        # one item may be kept alive inside the iterator
        self.assertIn(n1, (0, 1))
        self.assertEqual(n2, 0)

    def test_weak_keyed_len_cycles(self):
        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))

    def test_weak_valued_len_cycles(self):
        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))

    def check_len_race(self, dict_type, cons):
        """Check len() stays within bounds under varying GC thresholds."""
        # Extended sanity checks for len() in the face of cyclic collection
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.items()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)

    def test_weak_keyed_len_race(self):
        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))

    def test_weak_valued_len_race(self):
        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))

    def test_weak_values(self):
        #
        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
        #
        dict, objects = self.make_weak_valued_dict()
        for o in objects:
            self.assertEqual(weakref.getweakrefcount(o), 1)
            self.assertIs(o, dict[o.arg],
                          "wrong object returned by weak dict!")
        items1 = list(dict.items())
        items2 = list(dict.copy().items())
        items1.sort()
        items2.sort()
        self.assertEqual(items1, items2,
                         "cloning of weak-valued dictionary did not work!")
        del items1, items2
        self.assertEqual(len(dict), self.COUNT)
        del objects[0]
        self.assertEqual(len(dict), self.COUNT - 1,
                         "deleting object did not cause dictionary update")
        # `o` (the loop variable) still references the last object.
        del objects, o
        self.assertEqual(len(dict), 0,
                         "deleting the values did not clear the dictionary")
        # regression on SF bug #447152:
        dict = weakref.WeakValueDictionary()
        self.assertRaises(KeyError, dict.__getitem__, 1)
        dict[2] = C()
        self.assertRaises(KeyError, dict.__getitem__, 2)

    def test_weak_keys(self):
        #
        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
        #  len(d), k in d.
        #
        dict, objects = self.make_weak_keyed_dict()
        for o in objects:
            self.assertEqual(weakref.getweakrefcount(o), 1,
                             "wrong number of weak references to %r!" % o)
            self.assertIs(o.arg, dict[o],
                          "wrong object returned by weak dict!")
        items1 = dict.items()
        items2 = dict.copy().items()
        self.assertEqual(set(items1), set(items2),
                         "cloning of weak-keyed dictionary did not work!")
        del items1, items2
        self.assertEqual(len(dict), self.COUNT)
        del objects[0]
        self.assertEqual(len(dict), (self.COUNT - 1),
                         "deleting object did not cause dictionary update")
        # `o` (the loop variable) still references the last object.
        del objects, o
        self.assertEqual(len(dict), 0,
                         "deleting the keys did not clear the dictionary")
        o = Object(42)
        dict[o] = "What is the meaning of the universe?"
        self.assertIn(o, dict)
        self.assertNotIn(34, dict)

    def test_weak_keyed_iters(self):
        dict, objects = self.make_weak_keyed_dict()
        self.check_iters(dict)

        # Test keyrefs() through a saved result
        refs = dict.keyrefs()
        self.assertEqual(len(refs), len(objects))
        objects2 = list(objects)
        for wr in refs:
            ob = wr()
            self.assertIn(ob, dict)
            self.assertEqual(ob.arg, dict[ob])
            objects2.remove(ob)
        self.assertEqual(len(objects2), 0)

        # Same checks, iterating over a fresh keyrefs() result directly
        objects2 = list(objects)
        self.assertEqual(len(list(dict.keyrefs())), len(objects))
        for wr in dict.keyrefs():
            ob = wr()
            self.assertIn(ob, dict)
            self.assertEqual(ob.arg, dict[ob])
            objects2.remove(ob)
        self.assertEqual(len(objects2), 0)

    def test_weak_valued_iters(self):
        dict, objects = self.make_weak_valued_dict()
        self.check_iters(dict)

        # Test valuerefs()
        refs = dict.valuerefs()
        self.assertEqual(len(refs), len(objects))
        objects2 = list(objects)
        for wr in refs:
            ob = wr()
            self.assertEqual(ob, dict[ob.arg])
            self.assertEqual(ob.arg, dict[ob.arg].arg)
            objects2.remove(ob)
        self.assertEqual(len(objects2), 0)

        # Test itervaluerefs()
        objects2 = list(objects)
        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
        for wr in dict.itervaluerefs():
            ob = wr()
            self.assertEqual(ob, dict[ob.arg])
            self.assertEqual(ob.arg, dict[ob.arg].arg)
            objects2.remove(ob)
        self.assertEqual(len(objects2), 0)

    def check_iters(self, dict):
        """Check that each iteration API of `dict` visits every entry."""
        # item iterator:
        items = list(dict.items())
        for item in dict.items():
            items.remove(item)
        self.assertFalse(items, "items() did not touch all items")

        # key iterator, via __iter__():
        keys = list(dict.keys())
        for k in dict:
            keys.remove(k)
        self.assertFalse(keys, "__iter__() did not touch all keys")

        # key iterator, via keys():
        keys = list(dict.keys())
        for k in dict.keys():
            keys.remove(k)
        self.assertFalse(keys, "keys() did not touch all keys")

        # value iterator:
        values = list(dict.values())
        for v in dict.values():
            values.remove(v)
        self.assertFalse(values,
                         "values() did not touch all values")

    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
        """Drop one object while iterating `dict` via method `iter_name`."""
        n = len(dict)
        it = iter(getattr(dict, iter_name)())
        next(it)             # Trigger internal iteration
        # Destroy an object
        del objects[-1]
        gc.collect()    # just in case
        # We have removed either the first consumed object, or another one
        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
        del it
        # The removal has been committed
        self.assertEqual(len(dict), n - 1)

    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
        # Check that we can explicitly mutate the weak dict without
        # interfering with delayed removal.
        # `testcontext` should create an iterator, destroy one of the
        # weakref'ed objects and then return a new key/value pair corresponding
        # to the destroyed object.
        with testcontext() as (k, v):
            self.assertNotIn(k, dict)
        with testcontext() as (k, v):
            self.assertRaises(KeyError, dict.__delitem__, k)
        self.assertNotIn(k, dict)
        with testcontext() as (k, v):
            self.assertRaises(KeyError, dict.pop, k)
        self.assertNotIn(k, dict)
        with testcontext() as (k, v):
            dict[k] = v
        self.assertEqual(dict[k], v)
        ddict = copy.copy(dict)
        with testcontext() as (k, v):
            dict.update(ddict)
        self.assertEqual(dict, ddict)
        with testcontext() as (k, v):
            dict.clear()
        self.assertEqual(len(dict), 0)

    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
        # Check that len() works when both iterating and removing keys
        # explicitly through various means (.pop(), .clear()...), while
        # implicit mutation is deferred because an iterator is alive.
        # (each call to testcontext() should schedule one item for removal
        # for this test to work properly)
        o = Object(123456)
        with testcontext():
            n = len(dict)
            # Since underlying dict is ordered, first item is popped
            dict.pop(next(dict.keys()))
            self.assertEqual(len(dict), n - 1)
            dict[o] = o
            self.assertEqual(len(dict), n)
        # last item in objects is removed from dict in context shutdown
        with testcontext():
            self.assertEqual(len(dict), n - 1)
            # Then, (o, o) is popped
            dict.popitem()
            self.assertEqual(len(dict), n - 2)
        with testcontext():
            self.assertEqual(len(dict), n - 3)
            del dict[next(dict.keys())]
            self.assertEqual(len(dict), n - 4)
        with testcontext():
            self.assertEqual(len(dict), n - 5)
            dict.popitem()
            self.assertEqual(len(dict), n - 6)
        with testcontext():
            dict.clear()
            self.assertEqual(len(dict), 0)
        self.assertEqual(len(dict), 0)

    def test_weak_keys_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
        dict, objects = self.make_weak_keyed_dict()
        # NOTE: testcontext reads `dict` and `objects` through the closure,
        # so it follows the rebinding of both names further down.
        @contextlib.contextmanager
        def testcontext():
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        # Issue #21173: len() fragile when keys are both implicitly and
        # explicitly removed.
        dict, objects = self.make_weak_keyed_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)

    def test_weak_values_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
        dict, objects = self.make_weak_valued_dict()
        # NOTE: testcontext reads `dict` and `objects` through the closure,
        # so it follows the rebinding of both names further down.
        @contextlib.contextmanager
        def testcontext():
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        dict, objects = self.make_weak_valued_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)

    def test_make_weak_keyed_dict_from_dict(self):
        o = Object(3)
        dict = weakref.WeakKeyDictionary({o:364})
        self.assertEqual(dict[o], 364)

    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
        o = Object(3)
        dict = weakref.WeakKeyDictionary({o:364})
        dict2 = weakref.WeakKeyDictionary(dict)
        self.assertEqual(dict[o], 364)
        # The copy is the object under test; check it too.
        self.assertEqual(dict2[o], 364)

    def make_weak_keyed_dict(self):
        """Return a WeakKeyDictionary of COUNT entries and its key objects."""
        dict = weakref.WeakKeyDictionary()
        objects = list(map(Object, range(self.COUNT)))
        for o in objects:
            dict[o] = o.arg
        return dict, objects

    def test_make_weak_valued_dict_from_dict(self):
        o = Object(3)
        dict = weakref.WeakValueDictionary({364:o})
        self.assertEqual(dict[364], o)

    def test_make_weak_valued_dict_from_weak_valued_dict(self):
        o = Object(3)
        dict = weakref.WeakValueDictionary({364:o})
        dict2 = weakref.WeakValueDictionary(dict)
        self.assertEqual(dict[364], o)
        # The copy is the object under test; check it too.
        self.assertEqual(dict2[364], o)

    def test_make_weak_valued_dict_misc(self):
        # errors
        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
        # special keyword arguments
        o = Object(3)
        for kw in 'self', 'dict', 'other', 'iterable':
            d = weakref.WeakValueDictionary(**{kw: o})
            self.assertEqual(list(d.keys()), [kw])
            self.assertEqual(d[kw], o)

    def make_weak_valued_dict(self):
        """Return a WeakValueDictionary of COUNT entries and its values."""
        dict = weakref.WeakValueDictionary()
        objects = list(map(Object, range(self.COUNT)))
        for o in objects:
            dict[o.arg] = o
        return dict, objects

    def check_popitem(self, klass, key1, value1, key2, value2):
        """Check that popitem() on a two-entry `klass` returns matching pairs."""
        weakdict = klass()
        weakdict[key1] = value1
        weakdict[key2] = value2
        self.assertEqual(len(weakdict), 2)
        # Pop both entries; each popped value must match its popped key.
        for remaining in (1, 0):
            k, v = weakdict.popitem()
            self.assertEqual(len(weakdict), remaining)
            if k is key1:
                self.assertIs(v, value1)
            else:
                self.assertIs(v, value2)

    def test_weak_valued_dict_popitem(self):
        self.check_popitem(weakref.WeakValueDictionary,
                           "key1", C(), "key2", C())

    def test_weak_keyed_dict_popitem(self):
        self.check_popitem(weakref.WeakKeyDictionary,
                           C(), "value 1", C(), "value 2")

    def check_setdefault(self, klass, key, value1, value2):
        """Check setdefault() stores the first value and then keeps it."""
        self.assertIsNot(value1, value2,
                         "invalid test"
                         " -- value parameters must be distinct objects")
        weakdict = klass()
        # The first call inserts value1; the second must leave it in place.
        for candidate in (value1, value2):
            o = weakdict.setdefault(key, candidate)
            self.assertIs(o, value1)
            self.assertIn(key, weakdict)
            self.assertIs(weakdict.get(key), value1)
            self.assertIs(weakdict[key], value1)

    def test_weak_valued_dict_setdefault(self):
        self.check_setdefault(weakref.WeakValueDictionary,
                              "key", C(), C())

    def test_weak_keyed_dict_setdefault(self):
        self.check_setdefault(weakref.WeakKeyDictionary,
                              C(), "value 1", "value 2")

    def check_update(self, klass, dict):
        #
        #  This exercises d.update(), len(d), d.keys(), k in d,
        #  d.get(), d[].
        #
        weakdict = klass()
        weakdict.update(dict)
        self.assertEqual(len(weakdict), len(dict))
        for k in weakdict.keys():
            self.assertIn(k, dict, "mysterious new key appeared in weak dict")
            v = dict.get(k)
            self.assertIs(v, weakdict[k])
            self.assertIs(v, weakdict.get(k))
        for k in dict.keys():
            self.assertIn(k, weakdict, "original key disappeared in weak dict")
            v = dict[k]
            self.assertIs(v, weakdict[k])
            self.assertIs(v, weakdict.get(k))

    def test_weak_valued_dict_update(self):
        self.check_update(weakref.WeakValueDictionary,
                          {1: C(), 'a': C(), C(): C()})
        # errors
        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
        d = weakref.WeakValueDictionary()
        self.assertRaises(TypeError, d.update, {}, {})
        self.assertRaises(TypeError, d.update, (), ())
        self.assertEqual(list(d.keys()), [])
        # special keyword arguments
        o = Object(3)
        for kw in 'self', 'dict', 'other', 'iterable':
            d = weakref.WeakValueDictionary()
            d.update(**{kw: o})
            self.assertEqual(list(d.keys()), [kw])
            self.assertEqual(d[kw], o)

    def test_weak_keyed_dict_update(self):
        self.check_update(weakref.WeakKeyDictionary,
                          {C(): 1, C(): 2, C(): 3})

    def test_weak_keyed_delitem(self):
        d = weakref.WeakKeyDictionary()
        o1 = Object('1')
        o2 = Object('2')
        d[o1] = 'something'
        d[o2] = 'something'
        self.assertEqual(len(d), 2)
        del d[o1]
        self.assertEqual(len(d), 1)
        self.assertEqual(list(d.keys()), [o2])

    def test_weak_valued_delitem(self):
        d = weakref.WeakValueDictionary()
        o1 = Object('1')
        o2 = Object('2')
        d['something'] = o1
        d['something else'] = o2
        self.assertEqual(len(d), 2)
        del d['something']
        self.assertEqual(len(d), 1)
        self.assertEqual(list(d.items()), [('something else', o2)])

    def test_weak_keyed_bad_delitem(self):
        d = weakref.WeakKeyDictionary()
        o = Object('1')
        # An attempt to delete an object that isn't there should raise
        # KeyError.  It didn't before 2.3.
        self.assertRaises(KeyError, d.__delitem__, o)
        self.assertRaises(KeyError, d.__getitem__, o)

        # If a key isn't of a weakly referencable type, __getitem__ and
        # __setitem__ raise TypeError.  __delitem__ should too.
        self.assertRaises(TypeError, d.__delitem__,  13)
        self.assertRaises(TypeError, d.__getitem__,  13)
        self.assertRaises(TypeError, d.__setitem__,  13, 13)

    def test_weak_keyed_cascading_deletes(self):
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.

        d = weakref.WeakKeyDictionary()
        mutate = False

        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = list(d.keys())
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time thru the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in obj" loop would have gotten to.
        mutate = True
        count = 0
        for o in objs:
            count += 1
            del d[o]
        self.assertEqual(len(d), 0)
        self.assertEqual(count, 2)

    def test_make_weak_valued_dict_repr(self):
        dict = weakref.WeakValueDictionary()
        self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')

    def test_make_weak_keyed_dict_repr(self):
        dict = weakref.WeakKeyDictionary()
        self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')

    def test_threaded_weak_valued_setdefault(self):
        d = weakref.WeakValueDictionary()
        with collect_in_thread():
            for i in range(100000):
                x = d.setdefault(10, RefCycle())
                self.assertIsNot(x, None)  # we never put None in there!
                del x

    def test_threaded_weak_valued_pop(self):
        d = weakref.WeakValueDictionary()
        with collect_in_thread():
            for i in range(100000):
                d[10] = RefCycle()
                x = d.pop(10, 10)
                self.assertIsNot(x, None)  # we never put None in there!

    def test_threaded_weak_valued_consistency(self):
        # Issue #28427: old keys should not remove new values from
        # WeakValueDictionary when collecting from another thread.
        d = weakref.WeakValueDictionary()
        with collect_in_thread():
            for i in range(200000):
                o = RefCycle()
                d[10] = o
                # o is still alive, so the dict can't be empty
                self.assertEqual(len(d), 1)
                o = None  # lose ref
1690n/a
1691n/a
1692n/afrom test import mapping_tests
1693n/a
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakValueDictionary conforms to the mapping protocol"""
    type2test = weakref.WeakValueDictionary
    # Strong references to the values live in this class-level dict.
    __ref = {"key%d" % n: Object(n) for n in (1, 2, 3)}

    def _reference(self):
        """Return a shallow copy of the shared reference mapping."""
        return dict(self.__ref)
1700n/a
class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakKeyDictionary conforms to the mapping protocol"""
    type2test = weakref.WeakKeyDictionary
    # Strong references to the keys live in this class-level dict.
    __ref = {Object("key%d" % n): n for n in (1, 2, 3)}

    def _reference(self):
        """Return a shallow copy of the shared reference mapping."""
        return dict(self.__ref)
1707n/a
1708n/a
class FinalizeTestCase(unittest.TestCase):
    """Tests for weakref.finalize."""

    class A:
        pass

    def _collect_if_necessary(self):
        # we create no ref-cycles so in CPython no gc should be needed
        if sys.implementation.name != 'cpython':
            support.gc_collect()

    def test_finalize(self):
        def add(x,y,z):
            # `res` is looked up at call time, so each section below can
            # rebind it to a fresh list.
            res.append(x + y + z)
            return x + y + z

        a = self.A()

        # Calling the finalizer explicitly runs the callback exactly once.
        res = []
        f = weakref.finalize(a, add, 67, 43, z=89)
        self.assertEqual(f.alive, True)
        self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
        self.assertEqual(f(), 199)
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertEqual(f.alive, False)
        self.assertEqual(res, [199])

        # detach() hands back the registration without running the callback.
        res = []
        f = weakref.finalize(a, add, 67, 43, 89)
        self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
        self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertEqual(f.alive, False)
        self.assertEqual(res, [])

        # Dropping the referent runs the callback.
        res = []
        f = weakref.finalize(a, add, x=67, y=43, z=89)
        del a
        self._collect_if_necessary()
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertEqual(f.alive, False)
        self.assertEqual(res, [199])

    def test_order(self):
        a = self.A()
        res = []

        f1 = weakref.finalize(a, res.append, 'f1')
        f2 = weakref.finalize(a, res.append, 'f2')
        f3 = weakref.finalize(a, res.append, 'f3')
        f4 = weakref.finalize(a, res.append, 'f4')
        f5 = weakref.finalize(a, res.append, 'f5')

        # make sure finalizers can keep themselves alive
        del f1, f4

        self.assertTrue(f2.alive)
        self.assertTrue(f3.alive)
        self.assertTrue(f5.alive)

        self.assertTrue(f5.detach())
        self.assertFalse(f5.alive)

        f5()                       # nothing because previously unregistered
        res.append('A')
        f3()                       # => res.append('f3')
        self.assertFalse(f3.alive)
        res.append('B')
        f3()                       # nothing because previously called
        res.append('C')
        del a
        self._collect_if_necessary()
                                   # => res.append('f4')
                                   # => res.append('f2')
                                   # => res.append('f1')
        self.assertFalse(f2.alive)
        res.append('D')
        f2()                       # nothing because previously called by gc

        expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
        self.assertEqual(res, expected)

    def test_all_freed(self):
        # we want a weakrefable subclass of weakref.finalize
        class MyFinalizer(weakref.finalize):
            pass

        a = self.A()
        res = []
        def callback():
            res.append(123)
        f = MyFinalizer(a, callback)

        wr_callback = weakref.ref(callback)
        wr_f = weakref.ref(f)
        del callback, f

        # Both are still reachable while the referent is alive.
        self.assertIsNotNone(wr_callback())
        self.assertIsNotNone(wr_f())

        del a
        self._collect_if_necessary()

        self.assertIsNone(wr_callback())
        self.assertIsNone(wr_f())
        self.assertEqual(res, [123])

    @classmethod
    def run_in_child(cls):
        """Register atexit finalizers on `cls`; invoked by test_atexit.

        Raises AssertionError if a finalizer expected to have
        ``atexit == True`` does not.
        """
        def error():
            # Create an atexit finalizer from inside a finalizer called
            # at exit.  This should be the next to be run.
            g1 = weakref.finalize(cls, print, 'g1')
            print('f3 error')
            1/0

        # cls should stay alive till atexit callbacks run
        f1 = weakref.finalize(cls, print, 'f1', _global_var)
        f2 = weakref.finalize(cls, print, 'f2', _global_var)
        f3 = weakref.finalize(cls, error)
        f4 = weakref.finalize(cls, print, 'f4', _global_var)

        f2.atexit = False
        # Explicit raise rather than `assert`, so the check is not
        # stripped when the interpreter runs with -O.
        for name, fin in (('f1', f1), ('f3', f3), ('f4', f4)):
            if not fin.atexit == True:
                raise AssertionError('%s.atexit should be True' % name)

    def test_atexit(self):
        prog = ('from test.test_weakref import FinalizeTestCase;'+
                'FinalizeTestCase.run_in_child()')
        rc, out, err = script_helper.assert_python_ok('-c', prog)
        out = out.decode('ascii').splitlines()
        self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
        self.assertIn(b'ZeroDivisionError', err)
1851n/a
1852n/a
# Doctest text mirroring the examples in the weakref library reference.
# It is exposed through __test__ below, and test_main() runs this module's
# doctests via support.run_doctest().
libreftest = """ Doctest for examples in the library reference: weakref.rst

>>> import weakref
>>> class Dict(dict):
...     pass
...
>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
>>> r = weakref.ref(obj)
>>> print(r() is obj)
True

>>> import weakref
>>> class Object:
...     pass
...
>>> o = Object()
>>> r = weakref.ref(o)
>>> o2 = r()
>>> o is o2
True
>>> del o, o2
>>> print(r())
None

>>> import weakref
>>> class ExtendedRef(weakref.ref):
...     def __init__(self, ob, callback=None, **annotations):
...         super().__init__(ob, callback)
...         self.__counter = 0
...         for k, v in annotations.items():
...             setattr(self, k, v)
...     def __call__(self):
...         '''Return a pair containing the referent and the number of
...         times the reference has been called.
...         '''
...         ob = super().__call__()
...         if ob is not None:
...             self.__counter += 1
...             ob = (ob, self.__counter)
...         return ob
...
>>> class A:   # not in docs from here, just testing the ExtendedRef
...     pass
...
>>> a = A()
>>> r = ExtendedRef(a, foo=1, bar="baz")
>>> r.foo
1
>>> r.bar
'baz'
>>> r()[1]
1
>>> r()[1]
2
>>> r()[0] is a
True


>>> import weakref
>>> _id2obj_dict = weakref.WeakValueDictionary()
>>> def remember(obj):
...     oid = id(obj)
...     _id2obj_dict[oid] = obj
...     return oid
...
>>> def id2obj(oid):
...     return _id2obj_dict[oid]
...
>>> a = A()             # from here, just testing
>>> a_id = remember(a)
>>> id2obj(a_id) is a
True
>>> del a
>>> try:
...     id2obj(a_id)
... except KeyError:
...     print('OK')
... else:
...     print('WeakValueDictionary error')
OK

"""

# Mapping picked up by doctest: name -> docstring to run as a test.
__test__ = {'libreftest' : libreftest}
1937n/a
def test_main():
    """Run this module's unittest cases, then its doctests."""
    test_cases = (
        ReferencesTestCase,
        WeakMethodTestCase,
        MappingTestCase,
        WeakValueDictionaryTestCase,
        WeakKeyDictionaryTestCase,
        SubclassableWeakrefTestCase,
        FinalizeTestCase,
    )
    support.run_unittest(*test_cases)
    this_module = sys.modules[__name__]
    support.run_doctest(this_module)


if __name__ == "__main__":
    test_main()