1 | n/a | import unittest |
---|
2 | n/a | from test import support |
---|
3 | n/a | import binascii |
---|
4 | n/a | import pickle |
---|
5 | n/a | import random |
---|
6 | n/a | import sys |
---|
7 | n/a | from test.support import bigmemtest, _1G, _4G |
---|
8 | n/a | |
---|
9 | n/a | zlib = support.import_module('zlib') |
---|
10 | n/a | |
---|
11 | n/a | requires_Compress_copy = unittest.skipUnless( |
---|
12 | n/a | hasattr(zlib.compressobj(), "copy"), |
---|
13 | n/a | 'requires Compress.copy()') |
---|
14 | n/a | requires_Decompress_copy = unittest.skipUnless( |
---|
15 | n/a | hasattr(zlib.decompressobj(), "copy"), |
---|
16 | n/a | 'requires Decompress.copy()') |
---|
17 | n/a | |
---|
18 | n/a | |
---|
19 | n/a | class VersionTestCase(unittest.TestCase): |
---|
20 | n/a | |
---|
21 | n/a | def test_library_version(self): |
---|
22 | n/a | # Test that the major version of the actual library in use matches the |
---|
23 | n/a | # major version that we were compiled against. We can't guarantee that |
---|
24 | n/a | # the minor versions will match (even on the machine on which the module |
---|
25 | n/a | # was compiled), and the API is stable between minor versions, so |
---|
26 | n/a | # testing only the major versions avoids spurious failures. |
---|
27 | n/a | self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0]) |
---|
28 | n/a | |
---|
29 | n/a | |
---|
30 | n/a | class ChecksumTestCase(unittest.TestCase): |
---|
31 | n/a | # checksum test cases |
---|
32 | n/a | def test_crc32start(self): |
---|
33 | n/a | self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0)) |
---|
34 | n/a | self.assertTrue(zlib.crc32(b"abc", 0xffffffff)) |
---|
35 | n/a | |
---|
36 | n/a | def test_crc32empty(self): |
---|
37 | n/a | self.assertEqual(zlib.crc32(b"", 0), 0) |
---|
38 | n/a | self.assertEqual(zlib.crc32(b"", 1), 1) |
---|
39 | n/a | self.assertEqual(zlib.crc32(b"", 432), 432) |
---|
40 | n/a | |
---|
41 | n/a | def test_adler32start(self): |
---|
42 | n/a | self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1)) |
---|
43 | n/a | self.assertTrue(zlib.adler32(b"abc", 0xffffffff)) |
---|
44 | n/a | |
---|
45 | n/a | def test_adler32empty(self): |
---|
46 | n/a | self.assertEqual(zlib.adler32(b"", 0), 0) |
---|
47 | n/a | self.assertEqual(zlib.adler32(b"", 1), 1) |
---|
48 | n/a | self.assertEqual(zlib.adler32(b"", 432), 432) |
---|
49 | n/a | |
---|
50 | n/a | def test_penguins(self): |
---|
51 | n/a | self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120) |
---|
52 | n/a | self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94) |
---|
53 | n/a | self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6) |
---|
54 | n/a | self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7) |
---|
55 | n/a | |
---|
56 | n/a | self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0)) |
---|
57 | n/a | self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1)) |
---|
58 | n/a | |
---|
59 | n/a | def test_crc32_adler32_unsigned(self): |
---|
60 | n/a | foo = b'abcdefghijklmnop' |
---|
61 | n/a | # explicitly test signed behavior |
---|
62 | n/a | self.assertEqual(zlib.crc32(foo), 2486878355) |
---|
63 | n/a | self.assertEqual(zlib.crc32(b'spam'), 1138425661) |
---|
64 | n/a | self.assertEqual(zlib.adler32(foo+foo), 3573550353) |
---|
65 | n/a | self.assertEqual(zlib.adler32(b'spam'), 72286642) |
---|
66 | n/a | |
---|
67 | n/a | def test_same_as_binascii_crc32(self): |
---|
68 | n/a | foo = b'abcdefghijklmnop' |
---|
69 | n/a | crc = 2486878355 |
---|
70 | n/a | self.assertEqual(binascii.crc32(foo), crc) |
---|
71 | n/a | self.assertEqual(zlib.crc32(foo), crc) |
---|
72 | n/a | self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam')) |
---|
73 | n/a | |
---|
74 | n/a | |
---|
75 | n/a | # Issue #10276 - check that inputs >=4GB are handled correctly. |
---|
76 | n/a | class ChecksumBigBufferTestCase(unittest.TestCase): |
---|
77 | n/a | |
---|
78 | n/a | @bigmemtest(size=_4G + 4, memuse=1, dry_run=False) |
---|
79 | n/a | def test_big_buffer(self, size): |
---|
80 | n/a | data = b"nyan" * (_1G + 1) |
---|
81 | n/a | self.assertEqual(zlib.crc32(data), 1044521549) |
---|
82 | n/a | self.assertEqual(zlib.adler32(data), 2256789997) |
---|
83 | n/a | |
---|
84 | n/a | |
---|
85 | n/a | class ExceptionTestCase(unittest.TestCase): |
---|
86 | n/a | # make sure we generate some expected errors |
---|
87 | n/a | def test_badlevel(self): |
---|
88 | n/a | # specifying compression level out of range causes an error |
---|
89 | n/a | # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib |
---|
90 | n/a | # accepts 0 too) |
---|
91 | n/a | self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10) |
---|
92 | n/a | |
---|
93 | n/a | def test_badargs(self): |
---|
94 | n/a | self.assertRaises(TypeError, zlib.adler32) |
---|
95 | n/a | self.assertRaises(TypeError, zlib.crc32) |
---|
96 | n/a | self.assertRaises(TypeError, zlib.compress) |
---|
97 | n/a | self.assertRaises(TypeError, zlib.decompress) |
---|
98 | n/a | for arg in (42, None, '', 'abc', (), []): |
---|
99 | n/a | self.assertRaises(TypeError, zlib.adler32, arg) |
---|
100 | n/a | self.assertRaises(TypeError, zlib.crc32, arg) |
---|
101 | n/a | self.assertRaises(TypeError, zlib.compress, arg) |
---|
102 | n/a | self.assertRaises(TypeError, zlib.decompress, arg) |
---|
103 | n/a | |
---|
104 | n/a | def test_badcompressobj(self): |
---|
105 | n/a | # verify failure on building compress object with bad params |
---|
106 | n/a | self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0) |
---|
107 | n/a | # specifying total bits too large causes an error |
---|
108 | n/a | self.assertRaises(ValueError, |
---|
109 | n/a | zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1) |
---|
110 | n/a | |
---|
111 | n/a | def test_baddecompressobj(self): |
---|
112 | n/a | # verify failure on building decompress object with bad params |
---|
113 | n/a | self.assertRaises(ValueError, zlib.decompressobj, -1) |
---|
114 | n/a | |
---|
115 | n/a | def test_decompressobj_badflush(self): |
---|
116 | n/a | # verify failure on calling decompressobj.flush with bad params |
---|
117 | n/a | self.assertRaises(ValueError, zlib.decompressobj().flush, 0) |
---|
118 | n/a | self.assertRaises(ValueError, zlib.decompressobj().flush, -1) |
---|
119 | n/a | |
---|
120 | n/a | @support.cpython_only |
---|
121 | n/a | def test_overflow(self): |
---|
122 | n/a | with self.assertRaisesRegex(OverflowError, 'int too large'): |
---|
123 | n/a | zlib.decompress(b'', 15, sys.maxsize + 1) |
---|
124 | n/a | with self.assertRaisesRegex(OverflowError, 'int too large'): |
---|
125 | n/a | zlib.decompressobj().decompress(b'', sys.maxsize + 1) |
---|
126 | n/a | with self.assertRaisesRegex(OverflowError, 'int too large'): |
---|
127 | n/a | zlib.decompressobj().flush(sys.maxsize + 1) |
---|
128 | n/a | |
---|
129 | n/a | |
---|
130 | n/a | class BaseCompressTestCase(object): |
---|
131 | n/a | def check_big_compress_buffer(self, size, compress_func): |
---|
132 | n/a | _1M = 1024 * 1024 |
---|
133 | n/a | # Generate 10MB worth of random, and expand it by repeating it. |
---|
134 | n/a | # The assumption is that zlib's memory is not big enough to exploit |
---|
135 | n/a | # such spread out redundancy. |
---|
136 | n/a | data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little') |
---|
137 | n/a | for i in range(10)]) |
---|
138 | n/a | data = data * (size // len(data) + 1) |
---|
139 | n/a | try: |
---|
140 | n/a | compress_func(data) |
---|
141 | n/a | finally: |
---|
142 | n/a | # Release memory |
---|
143 | n/a | data = None |
---|
144 | n/a | |
---|
145 | n/a | def check_big_decompress_buffer(self, size, decompress_func): |
---|
146 | n/a | data = b'x' * size |
---|
147 | n/a | try: |
---|
148 | n/a | compressed = zlib.compress(data, 1) |
---|
149 | n/a | finally: |
---|
150 | n/a | # Release memory |
---|
151 | n/a | data = None |
---|
152 | n/a | data = decompress_func(compressed) |
---|
153 | n/a | # Sanity check |
---|
154 | n/a | try: |
---|
155 | n/a | self.assertEqual(len(data), size) |
---|
156 | n/a | self.assertEqual(len(data.strip(b'x')), 0) |
---|
157 | n/a | finally: |
---|
158 | n/a | data = None |
---|
159 | n/a | |
---|
160 | n/a | |
---|
161 | n/a | class CompressTestCase(BaseCompressTestCase, unittest.TestCase): |
---|
162 | n/a | # Test compression in one go (whole message compression) |
---|
163 | n/a | def test_speech(self): |
---|
164 | n/a | x = zlib.compress(HAMLET_SCENE) |
---|
165 | n/a | self.assertEqual(zlib.decompress(x), HAMLET_SCENE) |
---|
166 | n/a | |
---|
167 | n/a | def test_keywords(self): |
---|
168 | n/a | x = zlib.compress(HAMLET_SCENE, level=3) |
---|
169 | n/a | self.assertEqual(zlib.decompress(x), HAMLET_SCENE) |
---|
170 | n/a | with self.assertRaises(TypeError): |
---|
171 | n/a | zlib.compress(data=HAMLET_SCENE, level=3) |
---|
172 | n/a | self.assertEqual(zlib.decompress(x, |
---|
173 | n/a | wbits=zlib.MAX_WBITS, |
---|
174 | n/a | bufsize=zlib.DEF_BUF_SIZE), |
---|
175 | n/a | HAMLET_SCENE) |
---|
176 | n/a | |
---|
177 | n/a | def test_speech128(self): |
---|
178 | n/a | # compress more data |
---|
179 | n/a | data = HAMLET_SCENE * 128 |
---|
180 | n/a | x = zlib.compress(data) |
---|
181 | n/a | self.assertEqual(zlib.compress(bytearray(data)), x) |
---|
182 | n/a | for ob in x, bytearray(x): |
---|
183 | n/a | self.assertEqual(zlib.decompress(ob), data) |
---|
184 | n/a | |
---|
185 | n/a | def test_incomplete_stream(self): |
---|
186 | n/a | # A useful error message is given |
---|
187 | n/a | x = zlib.compress(HAMLET_SCENE) |
---|
188 | n/a | self.assertRaisesRegex(zlib.error, |
---|
189 | n/a | "Error -5 while decompressing data: incomplete or truncated stream", |
---|
190 | n/a | zlib.decompress, x[:-1]) |
---|
191 | n/a | |
---|
192 | n/a | # Memory use of the following functions takes into account overallocation |
---|
193 | n/a | |
---|
194 | n/a | @bigmemtest(size=_1G + 1024 * 1024, memuse=3) |
---|
195 | n/a | def test_big_compress_buffer(self, size): |
---|
196 | n/a | compress = lambda s: zlib.compress(s, 1) |
---|
197 | n/a | self.check_big_compress_buffer(size, compress) |
---|
198 | n/a | |
---|
199 | n/a | @bigmemtest(size=_1G + 1024 * 1024, memuse=2) |
---|
200 | n/a | def test_big_decompress_buffer(self, size): |
---|
201 | n/a | self.check_big_decompress_buffer(size, zlib.decompress) |
---|
202 | n/a | |
---|
203 | n/a | @bigmemtest(size=_4G, memuse=1) |
---|
204 | n/a | def test_large_bufsize(self, size): |
---|
205 | n/a | # Test decompress(bufsize) parameter greater than the internal limit |
---|
206 | n/a | data = HAMLET_SCENE * 10 |
---|
207 | n/a | compressed = zlib.compress(data, 1) |
---|
208 | n/a | self.assertEqual(zlib.decompress(compressed, 15, size), data) |
---|
209 | n/a | |
---|
210 | n/a | def test_custom_bufsize(self): |
---|
211 | n/a | data = HAMLET_SCENE * 10 |
---|
212 | n/a | compressed = zlib.compress(data, 1) |
---|
213 | n/a | self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data) |
---|
214 | n/a | |
---|
215 | n/a | @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') |
---|
216 | n/a | @bigmemtest(size=_4G + 100, memuse=4) |
---|
217 | n/a | def test_64bit_compress(self, size): |
---|
218 | n/a | data = b'x' * size |
---|
219 | n/a | try: |
---|
220 | n/a | comp = zlib.compress(data, 0) |
---|
221 | n/a | self.assertEqual(zlib.decompress(comp), data) |
---|
222 | n/a | finally: |
---|
223 | n/a | comp = data = None |
---|
224 | n/a | |
---|
225 | n/a | |
---|
226 | n/a | class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): |
---|
227 | n/a | # Test compression object |
---|
228 | n/a | def test_pair(self): |
---|
229 | n/a | # straightforward compress/decompress objects |
---|
230 | n/a | datasrc = HAMLET_SCENE * 128 |
---|
231 | n/a | datazip = zlib.compress(datasrc) |
---|
232 | n/a | # should compress both bytes and bytearray data |
---|
233 | n/a | for data in (datasrc, bytearray(datasrc)): |
---|
234 | n/a | co = zlib.compressobj() |
---|
235 | n/a | x1 = co.compress(data) |
---|
236 | n/a | x2 = co.flush() |
---|
237 | n/a | self.assertRaises(zlib.error, co.flush) # second flush should not work |
---|
238 | n/a | self.assertEqual(x1 + x2, datazip) |
---|
239 | n/a | for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))): |
---|
240 | n/a | dco = zlib.decompressobj() |
---|
241 | n/a | y1 = dco.decompress(v1 + v2) |
---|
242 | n/a | y2 = dco.flush() |
---|
243 | n/a | self.assertEqual(data, y1 + y2) |
---|
244 | n/a | self.assertIsInstance(dco.unconsumed_tail, bytes) |
---|
245 | n/a | self.assertIsInstance(dco.unused_data, bytes) |
---|
246 | n/a | |
---|
247 | n/a | def test_keywords(self): |
---|
248 | n/a | level = 2 |
---|
249 | n/a | method = zlib.DEFLATED |
---|
250 | n/a | wbits = -12 |
---|
251 | n/a | memLevel = 9 |
---|
252 | n/a | strategy = zlib.Z_FILTERED |
---|
253 | n/a | co = zlib.compressobj(level=level, |
---|
254 | n/a | method=method, |
---|
255 | n/a | wbits=wbits, |
---|
256 | n/a | memLevel=memLevel, |
---|
257 | n/a | strategy=strategy, |
---|
258 | n/a | zdict=b"") |
---|
259 | n/a | do = zlib.decompressobj(wbits=wbits, zdict=b"") |
---|
260 | n/a | with self.assertRaises(TypeError): |
---|
261 | n/a | co.compress(data=HAMLET_SCENE) |
---|
262 | n/a | with self.assertRaises(TypeError): |
---|
263 | n/a | do.decompress(data=zlib.compress(HAMLET_SCENE)) |
---|
264 | n/a | x = co.compress(HAMLET_SCENE) + co.flush() |
---|
265 | n/a | y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush() |
---|
266 | n/a | self.assertEqual(HAMLET_SCENE, y) |
---|
267 | n/a | |
---|
268 | n/a | def test_compressoptions(self): |
---|
269 | n/a | # specify lots of options to compressobj() |
---|
270 | n/a | level = 2 |
---|
271 | n/a | method = zlib.DEFLATED |
---|
272 | n/a | wbits = -12 |
---|
273 | n/a | memLevel = 9 |
---|
274 | n/a | strategy = zlib.Z_FILTERED |
---|
275 | n/a | co = zlib.compressobj(level, method, wbits, memLevel, strategy) |
---|
276 | n/a | x1 = co.compress(HAMLET_SCENE) |
---|
277 | n/a | x2 = co.flush() |
---|
278 | n/a | dco = zlib.decompressobj(wbits) |
---|
279 | n/a | y1 = dco.decompress(x1 + x2) |
---|
280 | n/a | y2 = dco.flush() |
---|
281 | n/a | self.assertEqual(HAMLET_SCENE, y1 + y2) |
---|
282 | n/a | |
---|
283 | n/a | def test_compressincremental(self): |
---|
284 | n/a | # compress object in steps, decompress object as one-shot |
---|
285 | n/a | data = HAMLET_SCENE * 128 |
---|
286 | n/a | co = zlib.compressobj() |
---|
287 | n/a | bufs = [] |
---|
288 | n/a | for i in range(0, len(data), 256): |
---|
289 | n/a | bufs.append(co.compress(data[i:i+256])) |
---|
290 | n/a | bufs.append(co.flush()) |
---|
291 | n/a | combuf = b''.join(bufs) |
---|
292 | n/a | |
---|
293 | n/a | dco = zlib.decompressobj() |
---|
294 | n/a | y1 = dco.decompress(b''.join(bufs)) |
---|
295 | n/a | y2 = dco.flush() |
---|
296 | n/a | self.assertEqual(data, y1 + y2) |
---|
297 | n/a | |
---|
298 | n/a | def test_decompinc(self, flush=False, source=None, cx=256, dcx=64): |
---|
299 | n/a | # compress object in steps, decompress object in steps |
---|
300 | n/a | source = source or HAMLET_SCENE |
---|
301 | n/a | data = source * 128 |
---|
302 | n/a | co = zlib.compressobj() |
---|
303 | n/a | bufs = [] |
---|
304 | n/a | for i in range(0, len(data), cx): |
---|
305 | n/a | bufs.append(co.compress(data[i:i+cx])) |
---|
306 | n/a | bufs.append(co.flush()) |
---|
307 | n/a | combuf = b''.join(bufs) |
---|
308 | n/a | |
---|
309 | n/a | decombuf = zlib.decompress(combuf) |
---|
310 | n/a | # Test type of return value |
---|
311 | n/a | self.assertIsInstance(decombuf, bytes) |
---|
312 | n/a | |
---|
313 | n/a | self.assertEqual(data, decombuf) |
---|
314 | n/a | |
---|
315 | n/a | dco = zlib.decompressobj() |
---|
316 | n/a | bufs = [] |
---|
317 | n/a | for i in range(0, len(combuf), dcx): |
---|
318 | n/a | bufs.append(dco.decompress(combuf[i:i+dcx])) |
---|
319 | n/a | self.assertEqual(b'', dco.unconsumed_tail, ######## |
---|
320 | n/a | "(A) uct should be b'': not %d long" % |
---|
321 | n/a | len(dco.unconsumed_tail)) |
---|
322 | n/a | self.assertEqual(b'', dco.unused_data) |
---|
323 | n/a | if flush: |
---|
324 | n/a | bufs.append(dco.flush()) |
---|
325 | n/a | else: |
---|
326 | n/a | while True: |
---|
327 | n/a | chunk = dco.decompress(b'') |
---|
328 | n/a | if chunk: |
---|
329 | n/a | bufs.append(chunk) |
---|
330 | n/a | else: |
---|
331 | n/a | break |
---|
332 | n/a | self.assertEqual(b'', dco.unconsumed_tail, ######## |
---|
333 | n/a | "(B) uct should be b'': not %d long" % |
---|
334 | n/a | len(dco.unconsumed_tail)) |
---|
335 | n/a | self.assertEqual(b'', dco.unused_data) |
---|
336 | n/a | self.assertEqual(data, b''.join(bufs)) |
---|
337 | n/a | # Failure means: "decompressobj with init options failed" |
---|
338 | n/a | |
---|
339 | n/a | def test_decompincflush(self): |
---|
340 | n/a | self.test_decompinc(flush=True) |
---|
341 | n/a | |
---|
342 | n/a | def test_decompimax(self, source=None, cx=256, dcx=64): |
---|
343 | n/a | # compress in steps, decompress in length-restricted steps |
---|
344 | n/a | source = source or HAMLET_SCENE |
---|
345 | n/a | # Check a decompression object with max_length specified |
---|
346 | n/a | data = source * 128 |
---|
347 | n/a | co = zlib.compressobj() |
---|
348 | n/a | bufs = [] |
---|
349 | n/a | for i in range(0, len(data), cx): |
---|
350 | n/a | bufs.append(co.compress(data[i:i+cx])) |
---|
351 | n/a | bufs.append(co.flush()) |
---|
352 | n/a | combuf = b''.join(bufs) |
---|
353 | n/a | self.assertEqual(data, zlib.decompress(combuf), |
---|
354 | n/a | 'compressed data failure') |
---|
355 | n/a | |
---|
356 | n/a | dco = zlib.decompressobj() |
---|
357 | n/a | bufs = [] |
---|
358 | n/a | cb = combuf |
---|
359 | n/a | while cb: |
---|
360 | n/a | #max_length = 1 + len(cb)//10 |
---|
361 | n/a | chunk = dco.decompress(cb, dcx) |
---|
362 | n/a | self.assertFalse(len(chunk) > dcx, |
---|
363 | n/a | 'chunk too big (%d>%d)' % (len(chunk), dcx)) |
---|
364 | n/a | bufs.append(chunk) |
---|
365 | n/a | cb = dco.unconsumed_tail |
---|
366 | n/a | bufs.append(dco.flush()) |
---|
367 | n/a | self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved') |
---|
368 | n/a | |
---|
369 | n/a | def test_decompressmaxlen(self, flush=False): |
---|
370 | n/a | # Check a decompression object with max_length specified |
---|
371 | n/a | data = HAMLET_SCENE * 128 |
---|
372 | n/a | co = zlib.compressobj() |
---|
373 | n/a | bufs = [] |
---|
374 | n/a | for i in range(0, len(data), 256): |
---|
375 | n/a | bufs.append(co.compress(data[i:i+256])) |
---|
376 | n/a | bufs.append(co.flush()) |
---|
377 | n/a | combuf = b''.join(bufs) |
---|
378 | n/a | self.assertEqual(data, zlib.decompress(combuf), |
---|
379 | n/a | 'compressed data failure') |
---|
380 | n/a | |
---|
381 | n/a | dco = zlib.decompressobj() |
---|
382 | n/a | bufs = [] |
---|
383 | n/a | cb = combuf |
---|
384 | n/a | while cb: |
---|
385 | n/a | max_length = 1 + len(cb)//10 |
---|
386 | n/a | chunk = dco.decompress(cb, max_length) |
---|
387 | n/a | self.assertFalse(len(chunk) > max_length, |
---|
388 | n/a | 'chunk too big (%d>%d)' % (len(chunk),max_length)) |
---|
389 | n/a | bufs.append(chunk) |
---|
390 | n/a | cb = dco.unconsumed_tail |
---|
391 | n/a | if flush: |
---|
392 | n/a | bufs.append(dco.flush()) |
---|
393 | n/a | else: |
---|
394 | n/a | while chunk: |
---|
395 | n/a | chunk = dco.decompress(b'', max_length) |
---|
396 | n/a | self.assertFalse(len(chunk) > max_length, |
---|
397 | n/a | 'chunk too big (%d>%d)' % (len(chunk),max_length)) |
---|
398 | n/a | bufs.append(chunk) |
---|
399 | n/a | self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved') |
---|
400 | n/a | |
---|
401 | n/a | def test_decompressmaxlenflush(self): |
---|
402 | n/a | self.test_decompressmaxlen(flush=True) |
---|
403 | n/a | |
---|
404 | n/a | def test_maxlenmisc(self): |
---|
405 | n/a | # Misc tests of max_length |
---|
406 | n/a | dco = zlib.decompressobj() |
---|
407 | n/a | self.assertRaises(ValueError, dco.decompress, b"", -1) |
---|
408 | n/a | self.assertEqual(b'', dco.unconsumed_tail) |
---|
409 | n/a | |
---|
410 | n/a | def test_maxlen_large(self): |
---|
411 | n/a | # Sizes up to sys.maxsize should be accepted, although zlib is |
---|
412 | n/a | # internally limited to expressing sizes with unsigned int |
---|
413 | n/a | data = HAMLET_SCENE * 10 |
---|
414 | n/a | self.assertGreater(len(data), zlib.DEF_BUF_SIZE) |
---|
415 | n/a | compressed = zlib.compress(data, 1) |
---|
416 | n/a | dco = zlib.decompressobj() |
---|
417 | n/a | self.assertEqual(dco.decompress(compressed, sys.maxsize), data) |
---|
418 | n/a | |
---|
419 | n/a | def test_maxlen_custom(self): |
---|
420 | n/a | data = HAMLET_SCENE * 10 |
---|
421 | n/a | compressed = zlib.compress(data, 1) |
---|
422 | n/a | dco = zlib.decompressobj() |
---|
423 | n/a | self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100]) |
---|
424 | n/a | |
---|
425 | n/a | def test_clear_unconsumed_tail(self): |
---|
426 | n/a | # Issue #12050: calling decompress() without providing max_length |
---|
427 | n/a | # should clear the unconsumed_tail attribute. |
---|
428 | n/a | cdata = b"x\x9cKLJ\x06\x00\x02M\x01" # "abc" |
---|
429 | n/a | dco = zlib.decompressobj() |
---|
430 | n/a | ddata = dco.decompress(cdata, 1) |
---|
431 | n/a | ddata += dco.decompress(dco.unconsumed_tail) |
---|
432 | n/a | self.assertEqual(dco.unconsumed_tail, b"") |
---|
433 | n/a | |
---|
434 | n/a | def test_flushes(self): |
---|
435 | n/a | # Test flush() with the various options, using all the |
---|
436 | n/a | # different levels in order to provide more variations. |
---|
437 | n/a | sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] |
---|
438 | n/a | sync_opt = [getattr(zlib, opt) for opt in sync_opt |
---|
439 | n/a | if hasattr(zlib, opt)] |
---|
440 | n/a | data = HAMLET_SCENE * 8 |
---|
441 | n/a | |
---|
442 | n/a | for sync in sync_opt: |
---|
443 | n/a | for level in range(10): |
---|
444 | n/a | obj = zlib.compressobj( level ) |
---|
445 | n/a | a = obj.compress( data[:3000] ) |
---|
446 | n/a | b = obj.flush( sync ) |
---|
447 | n/a | c = obj.compress( data[3000:] ) |
---|
448 | n/a | d = obj.flush() |
---|
449 | n/a | self.assertEqual(zlib.decompress(b''.join([a,b,c,d])), |
---|
450 | n/a | data, ("Decompress failed: flush " |
---|
451 | n/a | "mode=%i, level=%i") % (sync, level)) |
---|
452 | n/a | del obj |
---|
453 | n/a | |
---|
454 | n/a | @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'), |
---|
455 | n/a | 'requires zlib.Z_SYNC_FLUSH') |
---|
456 | n/a | def test_odd_flush(self): |
---|
457 | n/a | # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 |
---|
458 | n/a | import random |
---|
459 | n/a | # Testing on 17K of "random" data |
---|
460 | n/a | |
---|
461 | n/a | # Create compressor and decompressor objects |
---|
462 | n/a | co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) |
---|
463 | n/a | dco = zlib.decompressobj() |
---|
464 | n/a | |
---|
465 | n/a | # Try 17K of data |
---|
466 | n/a | # generate random data stream |
---|
467 | n/a | try: |
---|
468 | n/a | # In 2.3 and later, WichmannHill is the RNG of the bug report |
---|
469 | n/a | gen = random.WichmannHill() |
---|
470 | n/a | except AttributeError: |
---|
471 | n/a | try: |
---|
472 | n/a | # 2.2 called it Random |
---|
473 | n/a | gen = random.Random() |
---|
474 | n/a | except AttributeError: |
---|
475 | n/a | # others might simply have a single RNG |
---|
476 | n/a | gen = random |
---|
477 | n/a | gen.seed(1) |
---|
478 | n/a | data = genblock(1, 17 * 1024, generator=gen) |
---|
479 | n/a | |
---|
480 | n/a | # compress, sync-flush, and decompress |
---|
481 | n/a | first = co.compress(data) |
---|
482 | n/a | second = co.flush(zlib.Z_SYNC_FLUSH) |
---|
483 | n/a | expanded = dco.decompress(first + second) |
---|
484 | n/a | |
---|
485 | n/a | # if decompressed data is different from the input data, choke. |
---|
486 | n/a | self.assertEqual(expanded, data, "17K random source doesn't match") |
---|
487 | n/a | |
---|
488 | n/a | def test_empty_flush(self): |
---|
489 | n/a | # Test that calling .flush() on unused objects works. |
---|
490 | n/a | # (Bug #1083110 -- calling .flush() on decompress objects |
---|
491 | n/a | # caused a core dump.) |
---|
492 | n/a | |
---|
493 | n/a | co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) |
---|
494 | n/a | self.assertTrue(co.flush()) # Returns a zlib header |
---|
495 | n/a | dco = zlib.decompressobj() |
---|
496 | n/a | self.assertEqual(dco.flush(), b"") # Returns nothing |
---|
497 | n/a | |
---|
498 | n/a | def test_dictionary(self): |
---|
499 | n/a | h = HAMLET_SCENE |
---|
500 | n/a | # Build a simulated dictionary out of the words in HAMLET. |
---|
501 | n/a | words = h.split() |
---|
502 | n/a | random.shuffle(words) |
---|
503 | n/a | zdict = b''.join(words) |
---|
504 | n/a | # Use it to compress HAMLET. |
---|
505 | n/a | co = zlib.compressobj(zdict=zdict) |
---|
506 | n/a | cd = co.compress(h) + co.flush() |
---|
507 | n/a | # Verify that it will decompress with the dictionary. |
---|
508 | n/a | dco = zlib.decompressobj(zdict=zdict) |
---|
509 | n/a | self.assertEqual(dco.decompress(cd) + dco.flush(), h) |
---|
510 | n/a | # Verify that it fails when not given the dictionary. |
---|
511 | n/a | dco = zlib.decompressobj() |
---|
512 | n/a | self.assertRaises(zlib.error, dco.decompress, cd) |
---|
513 | n/a | |
---|
514 | n/a | def test_dictionary_streaming(self): |
---|
515 | n/a | # This simulates the reuse of a compressor object for compressing |
---|
516 | n/a | # several separate data streams. |
---|
517 | n/a | co = zlib.compressobj(zdict=HAMLET_SCENE) |
---|
518 | n/a | do = zlib.decompressobj(zdict=HAMLET_SCENE) |
---|
519 | n/a | piece = HAMLET_SCENE[1000:1500] |
---|
520 | n/a | d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH) |
---|
521 | n/a | d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH) |
---|
522 | n/a | d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH) |
---|
523 | n/a | self.assertEqual(do.decompress(d0), piece) |
---|
524 | n/a | self.assertEqual(do.decompress(d1), piece[100:]) |
---|
525 | n/a | self.assertEqual(do.decompress(d2), piece[:-100]) |
---|
526 | n/a | |
---|
527 | n/a | def test_decompress_incomplete_stream(self): |
---|
528 | n/a | # This is 'foo', deflated |
---|
529 | n/a | x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' |
---|
530 | n/a | # For the record |
---|
531 | n/a | self.assertEqual(zlib.decompress(x), b'foo') |
---|
532 | n/a | self.assertRaises(zlib.error, zlib.decompress, x[:-5]) |
---|
533 | n/a | # Omitting the stream end works with decompressor objects |
---|
534 | n/a | # (see issue #8672). |
---|
535 | n/a | dco = zlib.decompressobj() |
---|
536 | n/a | y = dco.decompress(x[:-5]) |
---|
537 | n/a | y += dco.flush() |
---|
538 | n/a | self.assertEqual(y, b'foo') |
---|
539 | n/a | |
---|
540 | n/a | def test_decompress_eof(self): |
---|
541 | n/a | x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo' |
---|
542 | n/a | dco = zlib.decompressobj() |
---|
543 | n/a | self.assertFalse(dco.eof) |
---|
544 | n/a | dco.decompress(x[:-5]) |
---|
545 | n/a | self.assertFalse(dco.eof) |
---|
546 | n/a | dco.decompress(x[-5:]) |
---|
547 | n/a | self.assertTrue(dco.eof) |
---|
548 | n/a | dco.flush() |
---|
549 | n/a | self.assertTrue(dco.eof) |
---|
550 | n/a | |
---|
551 | n/a | def test_decompress_eof_incomplete_stream(self): |
---|
552 | n/a | x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo' |
---|
553 | n/a | dco = zlib.decompressobj() |
---|
554 | n/a | self.assertFalse(dco.eof) |
---|
555 | n/a | dco.decompress(x[:-5]) |
---|
556 | n/a | self.assertFalse(dco.eof) |
---|
557 | n/a | dco.flush() |
---|
558 | n/a | self.assertFalse(dco.eof) |
---|
559 | n/a | |
---|
560 | n/a | def test_decompress_unused_data(self): |
---|
561 | n/a | # Repeated calls to decompress() after EOF should accumulate data in |
---|
562 | n/a | # dco.unused_data, instead of just storing the arg to the last call. |
---|
563 | n/a | source = b'abcdefghijklmnopqrstuvwxyz' |
---|
564 | n/a | remainder = b'0123456789' |
---|
565 | n/a | y = zlib.compress(source) |
---|
566 | n/a | x = y + remainder |
---|
567 | n/a | for maxlen in 0, 1000: |
---|
568 | n/a | for step in 1, 2, len(y), len(x): |
---|
569 | n/a | dco = zlib.decompressobj() |
---|
570 | n/a | data = b'' |
---|
571 | n/a | for i in range(0, len(x), step): |
---|
572 | n/a | if i < len(y): |
---|
573 | n/a | self.assertEqual(dco.unused_data, b'') |
---|
574 | n/a | if maxlen == 0: |
---|
575 | n/a | data += dco.decompress(x[i : i + step]) |
---|
576 | n/a | self.assertEqual(dco.unconsumed_tail, b'') |
---|
577 | n/a | else: |
---|
578 | n/a | data += dco.decompress( |
---|
579 | n/a | dco.unconsumed_tail + x[i : i + step], maxlen) |
---|
580 | n/a | data += dco.flush() |
---|
581 | n/a | self.assertTrue(dco.eof) |
---|
582 | n/a | self.assertEqual(data, source) |
---|
583 | n/a | self.assertEqual(dco.unconsumed_tail, b'') |
---|
584 | n/a | self.assertEqual(dco.unused_data, remainder) |
---|
585 | n/a | |
---|
586 | n/a | # issue27164 |
---|
587 | n/a | def test_decompress_raw_with_dictionary(self): |
---|
588 | n/a | zdict = b'abcdefghijklmnopqrstuvwxyz' |
---|
589 | n/a | co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict) |
---|
590 | n/a | comp = co.compress(zdict) + co.flush() |
---|
591 | n/a | dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict) |
---|
592 | n/a | uncomp = dco.decompress(comp) + dco.flush() |
---|
593 | n/a | self.assertEqual(zdict, uncomp) |
---|
594 | n/a | |
---|
595 | n/a | def test_flush_with_freed_input(self): |
---|
596 | n/a | # Issue #16411: decompressor accesses input to last decompress() call |
---|
597 | n/a | # in flush(), even if this object has been freed in the meanwhile. |
---|
598 | n/a | input1 = b'abcdefghijklmnopqrstuvwxyz' |
---|
599 | n/a | input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM' |
---|
600 | n/a | data = zlib.compress(input1) |
---|
601 | n/a | dco = zlib.decompressobj() |
---|
602 | n/a | dco.decompress(data, 1) |
---|
603 | n/a | del data |
---|
604 | n/a | data = zlib.compress(input2) |
---|
605 | n/a | self.assertEqual(dco.flush(), input1[1:]) |
---|
606 | n/a | |
---|
607 | n/a | @bigmemtest(size=_4G, memuse=1) |
---|
608 | n/a | def test_flush_large_length(self, size): |
---|
609 | n/a | # Test flush(length) parameter greater than internal limit UINT_MAX |
---|
610 | n/a | input = HAMLET_SCENE * 10 |
---|
611 | n/a | data = zlib.compress(input, 1) |
---|
612 | n/a | dco = zlib.decompressobj() |
---|
613 | n/a | dco.decompress(data, 1) |
---|
614 | n/a | self.assertEqual(dco.flush(size), input[1:]) |
---|
615 | n/a | |
---|
616 | n/a | def test_flush_custom_length(self): |
---|
617 | n/a | input = HAMLET_SCENE * 10 |
---|
618 | n/a | data = zlib.compress(input, 1) |
---|
619 | n/a | dco = zlib.decompressobj() |
---|
620 | n/a | dco.decompress(data, 1) |
---|
621 | n/a | self.assertEqual(dco.flush(CustomInt()), input[1:]) |
---|
622 | n/a | |
---|
623 | n/a | @requires_Compress_copy |
---|
624 | n/a | def test_compresscopy(self): |
---|
625 | n/a | # Test copying a compression object |
---|
626 | n/a | data0 = HAMLET_SCENE |
---|
627 | n/a | data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") |
---|
628 | n/a | c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) |
---|
629 | n/a | bufs0 = [] |
---|
630 | n/a | bufs0.append(c0.compress(data0)) |
---|
631 | n/a | |
---|
632 | n/a | c1 = c0.copy() |
---|
633 | n/a | bufs1 = bufs0[:] |
---|
634 | n/a | |
---|
635 | n/a | bufs0.append(c0.compress(data0)) |
---|
636 | n/a | bufs0.append(c0.flush()) |
---|
637 | n/a | s0 = b''.join(bufs0) |
---|
638 | n/a | |
---|
639 | n/a | bufs1.append(c1.compress(data1)) |
---|
640 | n/a | bufs1.append(c1.flush()) |
---|
641 | n/a | s1 = b''.join(bufs1) |
---|
642 | n/a | |
---|
643 | n/a | self.assertEqual(zlib.decompress(s0),data0+data0) |
---|
644 | n/a | self.assertEqual(zlib.decompress(s1),data0+data1) |
---|
645 | n/a | |
---|
646 | n/a | @requires_Compress_copy |
---|
647 | n/a | def test_badcompresscopy(self): |
---|
648 | n/a | # Test copying a compression object in an inconsistent state |
---|
649 | n/a | c = zlib.compressobj() |
---|
650 | n/a | c.compress(HAMLET_SCENE) |
---|
651 | n/a | c.flush() |
---|
652 | n/a | self.assertRaises(ValueError, c.copy) |
---|
653 | n/a | |
---|
654 | n/a | @requires_Decompress_copy |
---|
655 | n/a | def test_decompresscopy(self): |
---|
656 | n/a | # Test copying a decompression object |
---|
657 | n/a | data = HAMLET_SCENE |
---|
658 | n/a | comp = zlib.compress(data) |
---|
659 | n/a | # Test type of return value |
---|
660 | n/a | self.assertIsInstance(comp, bytes) |
---|
661 | n/a | |
---|
662 | n/a | d0 = zlib.decompressobj() |
---|
663 | n/a | bufs0 = [] |
---|
664 | n/a | bufs0.append(d0.decompress(comp[:32])) |
---|
665 | n/a | |
---|
666 | n/a | d1 = d0.copy() |
---|
667 | n/a | bufs1 = bufs0[:] |
---|
668 | n/a | |
---|
669 | n/a | bufs0.append(d0.decompress(comp[32:])) |
---|
670 | n/a | s0 = b''.join(bufs0) |
---|
671 | n/a | |
---|
672 | n/a | bufs1.append(d1.decompress(comp[32:])) |
---|
673 | n/a | s1 = b''.join(bufs1) |
---|
674 | n/a | |
---|
675 | n/a | self.assertEqual(s0,s1) |
---|
676 | n/a | self.assertEqual(s0,data) |
---|
677 | n/a | |
---|
678 | n/a | @requires_Decompress_copy |
---|
679 | n/a | def test_baddecompresscopy(self): |
---|
680 | n/a | # Test copying a compression object in an inconsistent state |
---|
681 | n/a | data = zlib.compress(HAMLET_SCENE) |
---|
682 | n/a | d = zlib.decompressobj() |
---|
683 | n/a | d.decompress(data) |
---|
684 | n/a | d.flush() |
---|
685 | n/a | self.assertRaises(ValueError, d.copy) |
---|
686 | n/a | |
---|
687 | n/a | def test_compresspickle(self): |
---|
688 | n/a | for proto in range(pickle.HIGHEST_PROTOCOL + 1): |
---|
689 | n/a | with self.assertRaises((TypeError, pickle.PicklingError)): |
---|
690 | n/a | pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto) |
---|
691 | n/a | |
---|
692 | n/a | def test_decompresspickle(self): |
---|
693 | n/a | for proto in range(pickle.HIGHEST_PROTOCOL + 1): |
---|
694 | n/a | with self.assertRaises((TypeError, pickle.PicklingError)): |
---|
695 | n/a | pickle.dumps(zlib.decompressobj(), proto) |
---|
696 | n/a | |
---|
697 | n/a | # Memory use of the following functions takes into account overallocation |
---|
698 | n/a | |
---|
699 | n/a | @bigmemtest(size=_1G + 1024 * 1024, memuse=3) |
---|
700 | n/a | def test_big_compress_buffer(self, size): |
---|
701 | n/a | c = zlib.compressobj(1) |
---|
702 | n/a | compress = lambda s: c.compress(s) + c.flush() |
---|
703 | n/a | self.check_big_compress_buffer(size, compress) |
---|
704 | n/a | |
---|
705 | n/a | @bigmemtest(size=_1G + 1024 * 1024, memuse=2) |
---|
706 | n/a | def test_big_decompress_buffer(self, size): |
---|
707 | n/a | d = zlib.decompressobj() |
---|
708 | n/a | decompress = lambda s: d.decompress(s) + d.flush() |
---|
709 | n/a | self.check_big_decompress_buffer(size, decompress) |
---|
710 | n/a | |
---|
711 | n/a | @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') |
---|
712 | n/a | @bigmemtest(size=_4G + 100, memuse=4) |
---|
713 | n/a | def test_64bit_compress(self, size): |
---|
714 | n/a | data = b'x' * size |
---|
715 | n/a | co = zlib.compressobj(0) |
---|
716 | n/a | do = zlib.decompressobj() |
---|
717 | n/a | try: |
---|
718 | n/a | comp = co.compress(data) + co.flush() |
---|
719 | n/a | uncomp = do.decompress(comp) + do.flush() |
---|
720 | n/a | self.assertEqual(uncomp, data) |
---|
721 | n/a | finally: |
---|
722 | n/a | comp = uncomp = data = None |
---|
723 | n/a | |
---|
724 | n/a | @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') |
---|
725 | n/a | @bigmemtest(size=_4G + 100, memuse=3) |
---|
726 | n/a | def test_large_unused_data(self, size): |
---|
727 | n/a | data = b'abcdefghijklmnop' |
---|
728 | n/a | unused = b'x' * size |
---|
729 | n/a | comp = zlib.compress(data) + unused |
---|
730 | n/a | do = zlib.decompressobj() |
---|
731 | n/a | try: |
---|
732 | n/a | uncomp = do.decompress(comp) + do.flush() |
---|
733 | n/a | self.assertEqual(unused, do.unused_data) |
---|
734 | n/a | self.assertEqual(uncomp, data) |
---|
735 | n/a | finally: |
---|
736 | n/a | unused = comp = do = None |
---|
737 | n/a | |
---|
738 | n/a | @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') |
---|
739 | n/a | @bigmemtest(size=_4G + 100, memuse=5) |
---|
740 | n/a | def test_large_unconsumed_tail(self, size): |
---|
741 | n/a | data = b'x' * size |
---|
742 | n/a | do = zlib.decompressobj() |
---|
743 | n/a | try: |
---|
744 | n/a | comp = zlib.compress(data, 0) |
---|
745 | n/a | uncomp = do.decompress(comp, 1) + do.flush() |
---|
746 | n/a | self.assertEqual(uncomp, data) |
---|
747 | n/a | self.assertEqual(do.unconsumed_tail, b'') |
---|
748 | n/a | finally: |
---|
749 | n/a | comp = uncomp = data = None |
---|
750 | n/a | |
---|
751 | n/a | def test_wbits(self): |
---|
752 | n/a | # wbits=0 only supported since zlib v1.2.3.5 |
---|
753 | n/a | # Register "1.2.3" as "1.2.3.0" |
---|
754 | n/a | v = (zlib.ZLIB_RUNTIME_VERSION + ".0").split(".", 4) |
---|
755 | n/a | supports_wbits_0 = int(v[0]) > 1 or int(v[0]) == 1 \ |
---|
756 | n/a | and (int(v[1]) > 2 or int(v[1]) == 2 |
---|
757 | n/a | and (int(v[2]) > 3 or int(v[2]) == 3 and int(v[3]) >= 5)) |
---|
758 | n/a | |
---|
759 | n/a | co = zlib.compressobj(level=1, wbits=15) |
---|
760 | n/a | zlib15 = co.compress(HAMLET_SCENE) + co.flush() |
---|
761 | n/a | self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE) |
---|
762 | n/a | if supports_wbits_0: |
---|
763 | n/a | self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE) |
---|
764 | n/a | self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE) |
---|
765 | n/a | with self.assertRaisesRegex(zlib.error, 'invalid window size'): |
---|
766 | n/a | zlib.decompress(zlib15, 14) |
---|
767 | n/a | dco = zlib.decompressobj(wbits=32 + 15) |
---|
768 | n/a | self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE) |
---|
769 | n/a | dco = zlib.decompressobj(wbits=14) |
---|
770 | n/a | with self.assertRaisesRegex(zlib.error, 'invalid window size'): |
---|
771 | n/a | dco.decompress(zlib15) |
---|
772 | n/a | |
---|
773 | n/a | co = zlib.compressobj(level=1, wbits=9) |
---|
774 | n/a | zlib9 = co.compress(HAMLET_SCENE) + co.flush() |
---|
775 | n/a | self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE) |
---|
776 | n/a | self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE) |
---|
777 | n/a | if supports_wbits_0: |
---|
778 | n/a | self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE) |
---|
779 | n/a | self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE) |
---|
780 | n/a | dco = zlib.decompressobj(wbits=32 + 9) |
---|
781 | n/a | self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE) |
---|
782 | n/a | |
---|
783 | n/a | co = zlib.compressobj(level=1, wbits=-15) |
---|
784 | n/a | deflate15 = co.compress(HAMLET_SCENE) + co.flush() |
---|
785 | n/a | self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE) |
---|
786 | n/a | dco = zlib.decompressobj(wbits=-15) |
---|
787 | n/a | self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE) |
---|
788 | n/a | |
---|
789 | n/a | co = zlib.compressobj(level=1, wbits=-9) |
---|
790 | n/a | deflate9 = co.compress(HAMLET_SCENE) + co.flush() |
---|
791 | n/a | self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE) |
---|
792 | n/a | self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE) |
---|
793 | n/a | dco = zlib.decompressobj(wbits=-9) |
---|
794 | n/a | self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE) |
---|
795 | n/a | |
---|
796 | n/a | co = zlib.compressobj(level=1, wbits=16 + 15) |
---|
797 | n/a | gzip = co.compress(HAMLET_SCENE) + co.flush() |
---|
798 | n/a | self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE) |
---|
799 | n/a | self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE) |
---|
800 | n/a | dco = zlib.decompressobj(32 + 15) |
---|
801 | n/a | self.assertEqual(dco.decompress(gzip), HAMLET_SCENE) |
---|
802 | n/a | |
---|
803 | n/a | |
---|
804 | n/a | def genblock(seed, length, step=1024, generator=random): |
---|
805 | n/a | """length-byte stream of random data from a seed (in step-byte blocks).""" |
---|
806 | n/a | if seed is not None: |
---|
807 | n/a | generator.seed(seed) |
---|
808 | n/a | randint = generator.randint |
---|
809 | n/a | if length < step or step < 2: |
---|
810 | n/a | step = length |
---|
811 | n/a | blocks = bytes() |
---|
812 | n/a | for i in range(0, length, step): |
---|
813 | n/a | blocks += bytes(randint(0, 255) for x in range(step)) |
---|
814 | n/a | return blocks |
---|
815 | n/a | |
---|
816 | n/a | |
---|
817 | n/a | |
---|
818 | n/a | def choose_lines(source, number, seed=None, generator=random): |
---|
819 | n/a | """Return a list of number lines randomly chosen from the source""" |
---|
820 | n/a | if seed is not None: |
---|
821 | n/a | generator.seed(seed) |
---|
822 | n/a | sources = source.split('\n') |
---|
823 | n/a | return [generator.choice(sources) for n in range(number)] |
---|
824 | n/a | |
---|
825 | n/a | |
---|
826 | n/a | |
---|
827 | n/a | HAMLET_SCENE = b""" |
---|
828 | n/a | LAERTES |
---|
829 | n/a | |
---|
830 | n/a | O, fear me not. |
---|
831 | n/a | I stay too long: but here my father comes. |
---|
832 | n/a | |
---|
833 | n/a | Enter POLONIUS |
---|
834 | n/a | |
---|
835 | n/a | A double blessing is a double grace, |
---|
836 | n/a | Occasion smiles upon a second leave. |
---|
837 | n/a | |
---|
838 | n/a | LORD POLONIUS |
---|
839 | n/a | |
---|
840 | n/a | Yet here, Laertes! aboard, aboard, for shame! |
---|
841 | n/a | The wind sits in the shoulder of your sail, |
---|
842 | n/a | And you are stay'd for. There; my blessing with thee! |
---|
843 | n/a | And these few precepts in thy memory |
---|
844 | n/a | See thou character. Give thy thoughts no tongue, |
---|
845 | n/a | Nor any unproportioned thought his act. |
---|
846 | n/a | Be thou familiar, but by no means vulgar. |
---|
847 | n/a | Those friends thou hast, and their adoption tried, |
---|
848 | n/a | Grapple them to thy soul with hoops of steel; |
---|
849 | n/a | But do not dull thy palm with entertainment |
---|
850 | n/a | Of each new-hatch'd, unfledged comrade. Beware |
---|
851 | n/a | Of entrance to a quarrel, but being in, |
---|
852 | n/a | Bear't that the opposed may beware of thee. |
---|
853 | n/a | Give every man thy ear, but few thy voice; |
---|
854 | n/a | Take each man's censure, but reserve thy judgment. |
---|
855 | n/a | Costly thy habit as thy purse can buy, |
---|
856 | n/a | But not express'd in fancy; rich, not gaudy; |
---|
857 | n/a | For the apparel oft proclaims the man, |
---|
858 | n/a | And they in France of the best rank and station |
---|
859 | n/a | Are of a most select and generous chief in that. |
---|
860 | n/a | Neither a borrower nor a lender be; |
---|
861 | n/a | For loan oft loses both itself and friend, |
---|
862 | n/a | And borrowing dulls the edge of husbandry. |
---|
863 | n/a | This above all: to thine ownself be true, |
---|
864 | n/a | And it must follow, as the night the day, |
---|
865 | n/a | Thou canst not then be false to any man. |
---|
866 | n/a | Farewell: my blessing season this in thee! |
---|
867 | n/a | |
---|
868 | n/a | LAERTES |
---|
869 | n/a | |
---|
870 | n/a | Most humbly do I take my leave, my lord. |
---|
871 | n/a | |
---|
872 | n/a | LORD POLONIUS |
---|
873 | n/a | |
---|
874 | n/a | The time invites you; go; your servants tend. |
---|
875 | n/a | |
---|
876 | n/a | LAERTES |
---|
877 | n/a | |
---|
878 | n/a | Farewell, Ophelia; and remember well |
---|
879 | n/a | What I have said to you. |
---|
880 | n/a | |
---|
881 | n/a | OPHELIA |
---|
882 | n/a | |
---|
883 | n/a | 'Tis in my memory lock'd, |
---|
884 | n/a | And you yourself shall keep the key of it. |
---|
885 | n/a | |
---|
886 | n/a | LAERTES |
---|
887 | n/a | |
---|
888 | n/a | Farewell. |
---|
889 | n/a | """ |
---|
890 | n/a | |
---|
891 | n/a | |
---|
892 | n/a | class CustomInt: |
---|
893 | n/a | def __int__(self): |
---|
894 | n/a | return 100 |
---|
895 | n/a | |
---|
896 | n/a | |
---|
897 | n/a | if __name__ == "__main__": |
---|
898 | n/a | unittest.main() |
---|