
Python code coverage for Lib/test/test_zlib.py
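The listing below reproduces the module under measurement. As a quick orientation, here is a minimal sketch, assuming a CPython checkout where the file is importable as test.test_zlib, of running just these tests with plain unittest (the runner invocation is illustrative and not part of the file itself):

import unittest

# Load and run only the zlib test cases; the module skips itself cleanly
# when the zlib extension is unavailable (see support.import_module below).
suite = unittest.defaultTestLoader.loadTestsFromName("test.test_zlib")
unittest.TextTestRunner(verbosity=2).run(suite)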

import unittest
from test import support
import binascii
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G

zlib = support.import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')

class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"), zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test unsigned behavior
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4GB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying a compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MB worth of random data and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread-out redundancy.
        data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
                         for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush) # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                             'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                             'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                                 'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"  # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"")  # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = b''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = b''.join(bufs1)

        self.assertEqual(zlib.decompress(s0), data0+data0)
        self.assertEqual(zlib.decompress(s1), data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = b''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = b''.join(bufs1)

        self.assertEqual(s0, s1)
        self.assertEqual(s0, data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 only supported since zlib v1.2.3.5
        # Register "1.2.3" as "1.2.3.0"
        v = (zlib.ZLIB_RUNTIME_VERSION + ".0").split(".", 4)
        supports_wbits_0 = int(v[0]) > 1 or int(v[0]) == 1 \
            and (int(v[1]) > 2 or int(v[1]) == 2
            and (int(v[2]) > 3 or int(v[2]) == 3 and int(v[3]) >= 5))

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = bytes()
    for i in range(0, length, step):
        blocks += bytes(randint(0, 255) for x in range(step))
    return blocks



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]



HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


class CustomInt:
    def __int__(self):
        return 100


if __name__ == "__main__":
    unittest.main()
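
As a companion to the listing above, here is a short sketch of the incremental compress/decompress round trip that CompressObjectTestCase exercises. The helper name roundtrip is illustrative and not part of the test file:

import zlib

def roundtrip(data, chunk=256):
    # Feed the compressor in chunk-sized pieces, then flush, as the tests above do.
    co = zlib.compressobj()
    compressed = b''.join(co.compress(data[i:i + chunk])
                          for i in range(0, len(data), chunk))
    compressed += co.flush()
    # Decompress incrementally as well and drain any remaining output with flush().
    dco = zlib.decompressobj()
    return dco.decompress(compressed) + dco.flush()

assert roundtrip(b'spam and eggs ' * 1024) == b'spam and eggs ' * 1024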