ยปCore Development>Code coverage>Lib/test/test_pickle.py

Python code coverage for Lib/test/test_pickle.py

#countcontent
1n/afrom _compat_pickle import (IMPORT_MAPPING, REVERSE_IMPORT_MAPPING,
2n/a NAME_MAPPING, REVERSE_NAME_MAPPING)
3n/aimport builtins
4n/aimport pickle
5n/aimport io
6n/aimport collections
7n/aimport struct
8n/aimport sys
9n/a
10n/aimport unittest
11n/afrom test import support
12n/a
13n/afrom test.pickletester import AbstractUnpickleTests
14n/afrom test.pickletester import AbstractPickleTests
15n/afrom test.pickletester import AbstractPickleModuleTests
16n/afrom test.pickletester import AbstractPersistentPicklerTests
17n/afrom test.pickletester import AbstractIdentityPersistentPicklerTests
18n/afrom test.pickletester import AbstractPicklerUnpicklerObjectTests
19n/afrom test.pickletester import AbstractDispatchTableTests
20n/afrom test.pickletester import BigmemPickleTests
21n/a
22n/atry:
23n/a import _pickle
24n/a has_c_implementation = True
25n/aexcept ImportError:
26n/a has_c_implementation = False
27n/a
28n/a
29n/aclass PickleTests(AbstractPickleModuleTests):
30n/a pass
31n/a
32n/a
33n/aclass PyUnpicklerTests(AbstractUnpickleTests):
34n/a
35n/a unpickler = pickle._Unpickler
36n/a bad_stack_errors = (IndexError,)
37n/a truncated_errors = (pickle.UnpicklingError, EOFError,
38n/a AttributeError, ValueError,
39n/a struct.error, IndexError, ImportError)
40n/a
41n/a def loads(self, buf, **kwds):
42n/a f = io.BytesIO(buf)
43n/a u = self.unpickler(f, **kwds)
44n/a return u.load()
45n/a
46n/a
47n/aclass PyPicklerTests(AbstractPickleTests):
48n/a
49n/a pickler = pickle._Pickler
50n/a unpickler = pickle._Unpickler
51n/a
52n/a def dumps(self, arg, proto=None):
53n/a f = io.BytesIO()
54n/a p = self.pickler(f, proto)
55n/a p.dump(arg)
56n/a f.seek(0)
57n/a return bytes(f.read())
58n/a
59n/a def loads(self, buf, **kwds):
60n/a f = io.BytesIO(buf)
61n/a u = self.unpickler(f, **kwds)
62n/a return u.load()
63n/a
64n/a
65n/aclass InMemoryPickleTests(AbstractPickleTests, AbstractUnpickleTests,
66n/a BigmemPickleTests):
67n/a
68n/a pickler = pickle._Pickler
69n/a unpickler = pickle._Unpickler
70n/a bad_stack_errors = (pickle.UnpicklingError, IndexError)
71n/a truncated_errors = (pickle.UnpicklingError, EOFError,
72n/a AttributeError, ValueError,
73n/a struct.error, IndexError, ImportError)
74n/a
75n/a def dumps(self, arg, protocol=None):
76n/a return pickle.dumps(arg, protocol)
77n/a
78n/a def loads(self, buf, **kwds):
79n/a return pickle.loads(buf, **kwds)
80n/a
81n/a
82n/aclass PersistentPicklerUnpicklerMixin(object):
83n/a
84n/a def dumps(self, arg, proto=None):
85n/a class PersPickler(self.pickler):
86n/a def persistent_id(subself, obj):
87n/a return self.persistent_id(obj)
88n/a f = io.BytesIO()
89n/a p = PersPickler(f, proto)
90n/a p.dump(arg)
91n/a return f.getvalue()
92n/a
93n/a def loads(self, buf, **kwds):
94n/a class PersUnpickler(self.unpickler):
95n/a def persistent_load(subself, obj):
96n/a return self.persistent_load(obj)
97n/a f = io.BytesIO(buf)
98n/a u = PersUnpickler(f, **kwds)
99n/a return u.load()
100n/a
101n/a
102n/aclass PyPersPicklerTests(AbstractPersistentPicklerTests,
103n/a PersistentPicklerUnpicklerMixin):
104n/a
105n/a pickler = pickle._Pickler
106n/a unpickler = pickle._Unpickler
107n/a
108n/a
109n/aclass PyIdPersPicklerTests(AbstractIdentityPersistentPicklerTests,
110n/a PersistentPicklerUnpicklerMixin):
111n/a
112n/a pickler = pickle._Pickler
113n/a unpickler = pickle._Unpickler
114n/a
115n/a
116n/aclass PyPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests):
117n/a
118n/a pickler_class = pickle._Pickler
119n/a unpickler_class = pickle._Unpickler
120n/a
121n/a
122n/aclass PyDispatchTableTests(AbstractDispatchTableTests):
123n/a
124n/a pickler_class = pickle._Pickler
125n/a
126n/a def get_dispatch_table(self):
127n/a return pickle.dispatch_table.copy()
128n/a
129n/a
130n/aclass PyChainDispatchTableTests(AbstractDispatchTableTests):
131n/a
132n/a pickler_class = pickle._Pickler
133n/a
134n/a def get_dispatch_table(self):
135n/a return collections.ChainMap({}, pickle.dispatch_table)
136n/a
137n/a
138n/aif has_c_implementation:
139n/a class CUnpicklerTests(PyUnpicklerTests):
140n/a unpickler = _pickle.Unpickler
141n/a bad_stack_errors = (pickle.UnpicklingError,)
142n/a truncated_errors = (pickle.UnpicklingError,)
143n/a
144n/a class CPicklerTests(PyPicklerTests):
145n/a pickler = _pickle.Pickler
146n/a unpickler = _pickle.Unpickler
147n/a
148n/a class CPersPicklerTests(PyPersPicklerTests):
149n/a pickler = _pickle.Pickler
150n/a unpickler = _pickle.Unpickler
151n/a
152n/a class CIdPersPicklerTests(PyIdPersPicklerTests):
153n/a pickler = _pickle.Pickler
154n/a unpickler = _pickle.Unpickler
155n/a
156n/a class CDumpPickle_LoadPickle(PyPicklerTests):
157n/a pickler = _pickle.Pickler
158n/a unpickler = pickle._Unpickler
159n/a
160n/a class DumpPickle_CLoadPickle(PyPicklerTests):
161n/a pickler = pickle._Pickler
162n/a unpickler = _pickle.Unpickler
163n/a
164n/a class CPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests):
165n/a pickler_class = _pickle.Pickler
166n/a unpickler_class = _pickle.Unpickler
167n/a
168n/a def test_issue18339(self):
169n/a unpickler = self.unpickler_class(io.BytesIO())
170n/a with self.assertRaises(TypeError):
171n/a unpickler.memo = object
172n/a # used to cause a segfault
173n/a with self.assertRaises(ValueError):
174n/a unpickler.memo = {-1: None}
175n/a unpickler.memo = {1: None}
176n/a
177n/a class CDispatchTableTests(AbstractDispatchTableTests):
178n/a pickler_class = pickle.Pickler
179n/a def get_dispatch_table(self):
180n/a return pickle.dispatch_table.copy()
181n/a
182n/a class CChainDispatchTableTests(AbstractDispatchTableTests):
183n/a pickler_class = pickle.Pickler
184n/a def get_dispatch_table(self):
185n/a return collections.ChainMap({}, pickle.dispatch_table)
186n/a
187n/a @support.cpython_only
188n/a class SizeofTests(unittest.TestCase):
189n/a check_sizeof = support.check_sizeof
190n/a
191n/a def test_pickler(self):
192n/a basesize = support.calcobjsize('5P2n3i2n3iP')
193n/a p = _pickle.Pickler(io.BytesIO())
194n/a self.assertEqual(object.__sizeof__(p), basesize)
195n/a MT_size = struct.calcsize('3nP0n')
196n/a ME_size = struct.calcsize('Pn0P')
197n/a check = self.check_sizeof
198n/a check(p, basesize +
199n/a MT_size + 8 * ME_size + # Minimal memo table size.
200n/a sys.getsizeof(b'x'*4096)) # Minimal write buffer size.
201n/a for i in range(6):
202n/a p.dump(chr(i))
203n/a check(p, basesize +
204n/a MT_size + 32 * ME_size + # Size of memo table required to
205n/a # save references to 6 objects.
206n/a 0) # Write buffer is cleared after every dump().
207n/a
208n/a def test_unpickler(self):
209n/a basesize = support.calcobjsize('2Pn2P 2P2n2i5P 2P3n6P2n2i')
210n/a unpickler = _pickle.Unpickler
211n/a P = struct.calcsize('P') # Size of memo table entry.
212n/a n = struct.calcsize('n') # Size of mark table entry.
213n/a check = self.check_sizeof
214n/a for encoding in 'ASCII', 'UTF-16', 'latin-1':
215n/a for errors in 'strict', 'replace':
216n/a u = unpickler(io.BytesIO(),
217n/a encoding=encoding, errors=errors)
218n/a self.assertEqual(object.__sizeof__(u), basesize)
219n/a check(u, basesize +
220n/a 32 * P + # Minimal memo table size.
221n/a len(encoding) + 1 + len(errors) + 1)
222n/a
223n/a stdsize = basesize + len('ASCII') + 1 + len('strict') + 1
224n/a def check_unpickler(data, memo_size, marks_size):
225n/a dump = pickle.dumps(data)
226n/a u = unpickler(io.BytesIO(dump),
227n/a encoding='ASCII', errors='strict')
228n/a u.load()
229n/a check(u, stdsize + memo_size * P + marks_size * n)
230n/a
231n/a check_unpickler(0, 32, 0)
232n/a # 20 is minimal non-empty mark stack size.
233n/a check_unpickler([0] * 100, 32, 20)
234n/a # 128 is memo table size required to save references to 100 objects.
235n/a check_unpickler([chr(i) for i in range(100)], 128, 20)
236n/a def recurse(deep):
237n/a data = 0
238n/a for i in range(deep):
239n/a data = [data, data]
240n/a return data
241n/a check_unpickler(recurse(0), 32, 0)
242n/a check_unpickler(recurse(1), 32, 20)
243n/a check_unpickler(recurse(20), 32, 58)
244n/a check_unpickler(recurse(50), 64, 58)
245n/a check_unpickler(recurse(100), 128, 134)
246n/a
247n/a u = unpickler(io.BytesIO(pickle.dumps('a', 0)),
248n/a encoding='ASCII', errors='strict')
249n/a u.load()
250n/a check(u, stdsize + 32 * P + 2 + 1)
251n/a
252n/a
253n/aALT_IMPORT_MAPPING = {
254n/a ('_elementtree', 'xml.etree.ElementTree'),
255n/a ('cPickle', 'pickle'),
256n/a ('StringIO', 'io'),
257n/a ('cStringIO', 'io'),
258n/a}
259n/a
260n/aALT_NAME_MAPPING = {
261n/a ('__builtin__', 'basestring', 'builtins', 'str'),
262n/a ('exceptions', 'StandardError', 'builtins', 'Exception'),
263n/a ('UserDict', 'UserDict', 'collections', 'UserDict'),
264n/a ('socket', '_socketobject', 'socket', 'SocketType'),
265n/a}
266n/a
267n/adef mapping(module, name):
268n/a if (module, name) in NAME_MAPPING:
269n/a module, name = NAME_MAPPING[(module, name)]
270n/a elif module in IMPORT_MAPPING:
271n/a module = IMPORT_MAPPING[module]
272n/a return module, name
273n/a
274n/adef reverse_mapping(module, name):
275n/a if (module, name) in REVERSE_NAME_MAPPING:
276n/a module, name = REVERSE_NAME_MAPPING[(module, name)]
277n/a elif module in REVERSE_IMPORT_MAPPING:
278n/a module = REVERSE_IMPORT_MAPPING[module]
279n/a return module, name
280n/a
281n/adef getmodule(module):
282n/a try:
283n/a return sys.modules[module]
284n/a except KeyError:
285n/a try:
286n/a __import__(module)
287n/a except AttributeError as exc:
288n/a if support.verbose:
289n/a print("Can't import module %r: %s" % (module, exc))
290n/a raise ImportError
291n/a except ImportError as exc:
292n/a if support.verbose:
293n/a print(exc)
294n/a raise
295n/a return sys.modules[module]
296n/a
297n/adef getattribute(module, name):
298n/a obj = getmodule(module)
299n/a for n in name.split('.'):
300n/a obj = getattr(obj, n)
301n/a return obj
302n/a
303n/adef get_exceptions(mod):
304n/a for name in dir(mod):
305n/a attr = getattr(mod, name)
306n/a if isinstance(attr, type) and issubclass(attr, BaseException):
307n/a yield name, attr
308n/a
309n/aclass CompatPickleTests(unittest.TestCase):
310n/a def test_import(self):
311n/a modules = set(IMPORT_MAPPING.values())
312n/a modules |= set(REVERSE_IMPORT_MAPPING)
313n/a modules |= {module for module, name in REVERSE_NAME_MAPPING}
314n/a modules |= {module for module, name in NAME_MAPPING.values()}
315n/a for module in modules:
316n/a try:
317n/a getmodule(module)
318n/a except ImportError:
319n/a pass
320n/a
321n/a def test_import_mapping(self):
322n/a for module3, module2 in REVERSE_IMPORT_MAPPING.items():
323n/a with self.subTest((module3, module2)):
324n/a try:
325n/a getmodule(module3)
326n/a except ImportError:
327n/a pass
328n/a if module3[:1] != '_':
329n/a self.assertIn(module2, IMPORT_MAPPING)
330n/a self.assertEqual(IMPORT_MAPPING[module2], module3)
331n/a
332n/a def test_name_mapping(self):
333n/a for (module3, name3), (module2, name2) in REVERSE_NAME_MAPPING.items():
334n/a with self.subTest(((module3, name3), (module2, name2))):
335n/a if (module2, name2) == ('exceptions', 'OSError'):
336n/a attr = getattribute(module3, name3)
337n/a self.assertTrue(issubclass(attr, OSError))
338n/a elif (module2, name2) == ('exceptions', 'ImportError'):
339n/a attr = getattribute(module3, name3)
340n/a self.assertTrue(issubclass(attr, ImportError))
341n/a else:
342n/a module, name = mapping(module2, name2)
343n/a if module3[:1] != '_':
344n/a self.assertEqual((module, name), (module3, name3))
345n/a try:
346n/a attr = getattribute(module3, name3)
347n/a except ImportError:
348n/a pass
349n/a else:
350n/a self.assertEqual(getattribute(module, name), attr)
351n/a
352n/a def test_reverse_import_mapping(self):
353n/a for module2, module3 in IMPORT_MAPPING.items():
354n/a with self.subTest((module2, module3)):
355n/a try:
356n/a getmodule(module3)
357n/a except ImportError as exc:
358n/a if support.verbose:
359n/a print(exc)
360n/a if ((module2, module3) not in ALT_IMPORT_MAPPING and
361n/a REVERSE_IMPORT_MAPPING.get(module3, None) != module2):
362n/a for (m3, n3), (m2, n2) in REVERSE_NAME_MAPPING.items():
363n/a if (module3, module2) == (m3, m2):
364n/a break
365n/a else:
366n/a self.fail('No reverse mapping from %r to %r' %
367n/a (module3, module2))
368n/a module = REVERSE_IMPORT_MAPPING.get(module3, module3)
369n/a module = IMPORT_MAPPING.get(module, module)
370n/a self.assertEqual(module, module3)
371n/a
372n/a def test_reverse_name_mapping(self):
373n/a for (module2, name2), (module3, name3) in NAME_MAPPING.items():
374n/a with self.subTest(((module2, name2), (module3, name3))):
375n/a try:
376n/a attr = getattribute(module3, name3)
377n/a except ImportError:
378n/a pass
379n/a module, name = reverse_mapping(module3, name3)
380n/a if (module2, name2, module3, name3) not in ALT_NAME_MAPPING:
381n/a self.assertEqual((module, name), (module2, name2))
382n/a module, name = mapping(module, name)
383n/a self.assertEqual((module, name), (module3, name3))
384n/a
385n/a def test_exceptions(self):
386n/a self.assertEqual(mapping('exceptions', 'StandardError'),
387n/a ('builtins', 'Exception'))
388n/a self.assertEqual(mapping('exceptions', 'Exception'),
389n/a ('builtins', 'Exception'))
390n/a self.assertEqual(reverse_mapping('builtins', 'Exception'),
391n/a ('exceptions', 'Exception'))
392n/a self.assertEqual(mapping('exceptions', 'OSError'),
393n/a ('builtins', 'OSError'))
394n/a self.assertEqual(reverse_mapping('builtins', 'OSError'),
395n/a ('exceptions', 'OSError'))
396n/a
397n/a for name, exc in get_exceptions(builtins):
398n/a with self.subTest(name):
399n/a if exc in (BlockingIOError,
400n/a ResourceWarning,
401n/a StopAsyncIteration,
402n/a RecursionError):
403n/a continue
404n/a if exc is not OSError and issubclass(exc, OSError):
405n/a self.assertEqual(reverse_mapping('builtins', name),
406n/a ('exceptions', 'OSError'))
407n/a elif exc is not ImportError and issubclass(exc, ImportError):
408n/a self.assertEqual(reverse_mapping('builtins', name),
409n/a ('exceptions', 'ImportError'))
410n/a self.assertEqual(mapping('exceptions', name),
411n/a ('exceptions', name))
412n/a else:
413n/a self.assertEqual(reverse_mapping('builtins', name),
414n/a ('exceptions', name))
415n/a self.assertEqual(mapping('exceptions', name),
416n/a ('builtins', name))
417n/a
418n/a def test_multiprocessing_exceptions(self):
419n/a module = support.import_module('multiprocessing.context')
420n/a for name, exc in get_exceptions(module):
421n/a with self.subTest(name):
422n/a self.assertEqual(reverse_mapping('multiprocessing.context', name),
423n/a ('multiprocessing', name))
424n/a self.assertEqual(mapping('multiprocessing', name),
425n/a ('multiprocessing.context', name))
426n/a
427n/a
428n/adef test_main():
429n/a tests = [PickleTests, PyUnpicklerTests, PyPicklerTests,
430n/a PyPersPicklerTests, PyIdPersPicklerTests,
431n/a PyDispatchTableTests, PyChainDispatchTableTests,
432n/a CompatPickleTests]
433n/a if has_c_implementation:
434n/a tests.extend([CUnpicklerTests, CPicklerTests,
435n/a CPersPicklerTests, CIdPersPicklerTests,
436n/a CDumpPickle_LoadPickle, DumpPickle_CLoadPickle,
437n/a PyPicklerUnpicklerObjectTests,
438n/a CPicklerUnpicklerObjectTests,
439n/a CDispatchTableTests, CChainDispatchTableTests,
440n/a InMemoryPickleTests, SizeofTests])
441n/a support.run_unittest(*tests)
442n/a support.run_doctest(pickle)
443n/a
444n/aif __name__ == "__main__":
445n/a test_main()