
Python code coverage for Lib/test/test_bz2.py

Each row below shows the source line number, the execution count (n/a throughout, as no hit counts were recorded for this file), and the line content.

#      count     content
1n/afrom test import support
2n/afrom test.support import bigmemtest, _4G
3n/a
4n/aimport unittest
5n/afrom io import BytesIO, DEFAULT_BUFFER_SIZE
6n/aimport os
7n/aimport pickle
8n/aimport glob
9n/aimport pathlib
10n/aimport random
11n/aimport shutil
12n/aimport subprocess
13n/afrom test.support import unlink
14n/aimport _compression
15n/a
16n/atry:
17n/a import threading
18n/aexcept ImportError:
19n/a threading = None
20n/a
21n/a# Skip tests if the bz2 module doesn't exist.
22n/abz2 = support.import_module('bz2')
23n/afrom bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
24n/a
25n/ahas_cmdline_bunzip2 = None
26n/a
27n/adef ext_decompress(data):
28n/a global has_cmdline_bunzip2
29n/a if has_cmdline_bunzip2 is None:
30n/a has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
31n/a if has_cmdline_bunzip2:
32n/a return subprocess.check_output(['bunzip2'], input=data)
33n/a else:
34n/a return bz2.decompress(data)
35n/a
36n/aclass BaseTest(unittest.TestCase):
37n/a "Base for other testcases."
38n/a
39n/a TEXT_LINES = [
40n/a b'root:x:0:0:root:/root:/bin/bash\n',
41n/a b'bin:x:1:1:bin:/bin:\n',
42n/a b'daemon:x:2:2:daemon:/sbin:\n',
43n/a b'adm:x:3:4:adm:/var/adm:\n',
44n/a b'lp:x:4:7:lp:/var/spool/lpd:\n',
45n/a b'sync:x:5:0:sync:/sbin:/bin/sync\n',
46n/a b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n',
47n/a b'halt:x:7:0:halt:/sbin:/sbin/halt\n',
48n/a b'mail:x:8:12:mail:/var/spool/mail:\n',
49n/a b'news:x:9:13:news:/var/spool/news:\n',
50n/a b'uucp:x:10:14:uucp:/var/spool/uucp:\n',
51n/a b'operator:x:11:0:operator:/root:\n',
52n/a b'games:x:12:100:games:/usr/games:\n',
53n/a b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n',
54n/a b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n',
55n/a b'nobody:x:65534:65534:Nobody:/home:\n',
56n/a b'postfix:x:100:101:postfix:/var/spool/postfix:\n',
57n/a b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n',
58n/a b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n',
59n/a b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n',
60n/a b'www:x:103:104::/var/www:/bin/false\n',
61n/a ]
62n/a TEXT = b''.join(TEXT_LINES)
63n/a DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
64n/a EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
65n/a BAD_DATA = b'this is not a valid bzip2 file'
66n/a
67n/a # Some tests need more than one block of uncompressed data. Since one block
68n/a # is at least 100 kB, we gather some data dynamically and compress it.
69n/a # Note that this assumes that compression works correctly, so we cannot
70n/a # simply use the bigger test data for all tests.
71n/a test_size = 0
72n/a BIG_TEXT = bytearray(128*1024)
73n/a for fname in glob.glob(os.path.join(os.path.dirname(__file__), '*.py')):
74n/a with open(fname, 'rb') as fh:
75n/a test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
76n/a if test_size > 128*1024:
77n/a break
78n/a BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)
79n/a
80n/a def setUp(self):
81n/a self.filename = support.TESTFN
82n/a
83n/a def tearDown(self):
84n/a if os.path.isfile(self.filename):
85n/a os.unlink(self.filename)
86n/a
87n/a
88n/aclass BZ2FileTest(BaseTest):
89n/a "Test the BZ2File class."
90n/a
91n/a def createTempFile(self, streams=1, suffix=b""):
92n/a with open(self.filename, "wb") as f:
93n/a f.write(self.DATA * streams)
94n/a f.write(suffix)
95n/a
96n/a def testBadArgs(self):
97n/a self.assertRaises(TypeError, BZ2File, 123.456)
98n/a self.assertRaises(ValueError, BZ2File, os.devnull, "z")
99n/a self.assertRaises(ValueError, BZ2File, os.devnull, "rx")
100n/a self.assertRaises(ValueError, BZ2File, os.devnull, "rbt")
101n/a self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=0)
102n/a self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=10)
103n/a
104n/a def testRead(self):
105n/a self.createTempFile()
106n/a with BZ2File(self.filename) as bz2f:
107n/a self.assertRaises(TypeError, bz2f.read, float())
108n/a self.assertEqual(bz2f.read(), self.TEXT)
109n/a
110n/a def testReadBadFile(self):
111n/a self.createTempFile(streams=0, suffix=self.BAD_DATA)
112n/a with BZ2File(self.filename) as bz2f:
113n/a self.assertRaises(OSError, bz2f.read)
114n/a
115n/a def testReadMultiStream(self):
116n/a self.createTempFile(streams=5)
117n/a with BZ2File(self.filename) as bz2f:
118n/a self.assertRaises(TypeError, bz2f.read, float())
119n/a self.assertEqual(bz2f.read(), self.TEXT * 5)
120n/a
121n/a def testReadMonkeyMultiStream(self):
122n/a # Test BZ2File.read() on a multi-stream archive where a stream
123n/a # boundary coincides with the end of the raw read buffer.
124n/a buffer_size = _compression.BUFFER_SIZE
125n/a _compression.BUFFER_SIZE = len(self.DATA)
126n/a try:
127n/a self.createTempFile(streams=5)
128n/a with BZ2File(self.filename) as bz2f:
129n/a self.assertRaises(TypeError, bz2f.read, float())
130n/a self.assertEqual(bz2f.read(), self.TEXT * 5)
131n/a finally:
132n/a _compression.BUFFER_SIZE = buffer_size
133n/a
134n/a def testReadTrailingJunk(self):
135n/a self.createTempFile(suffix=self.BAD_DATA)
136n/a with BZ2File(self.filename) as bz2f:
137n/a self.assertEqual(bz2f.read(), self.TEXT)
138n/a
139n/a def testReadMultiStreamTrailingJunk(self):
140n/a self.createTempFile(streams=5, suffix=self.BAD_DATA)
141n/a with BZ2File(self.filename) as bz2f:
142n/a self.assertEqual(bz2f.read(), self.TEXT * 5)
143n/a
144n/a def testRead0(self):
145n/a self.createTempFile()
146n/a with BZ2File(self.filename) as bz2f:
147n/a self.assertRaises(TypeError, bz2f.read, float())
148n/a self.assertEqual(bz2f.read(0), b"")
149n/a
150n/a def testReadChunk10(self):
151n/a self.createTempFile()
152n/a with BZ2File(self.filename) as bz2f:
153n/a text = b''
154n/a while True:
155n/a str = bz2f.read(10)
156n/a if not str:
157n/a break
158n/a text += str
159n/a self.assertEqual(text, self.TEXT)
160n/a
161n/a def testReadChunk10MultiStream(self):
162n/a self.createTempFile(streams=5)
163n/a with BZ2File(self.filename) as bz2f:
164n/a text = b''
165n/a while True:
166n/a str = bz2f.read(10)
167n/a if not str:
168n/a break
169n/a text += str
170n/a self.assertEqual(text, self.TEXT * 5)
171n/a
172n/a def testRead100(self):
173n/a self.createTempFile()
174n/a with BZ2File(self.filename) as bz2f:
175n/a self.assertEqual(bz2f.read(100), self.TEXT[:100])
176n/a
177n/a def testPeek(self):
178n/a self.createTempFile()
179n/a with BZ2File(self.filename) as bz2f:
180n/a pdata = bz2f.peek()
181n/a self.assertNotEqual(len(pdata), 0)
182n/a self.assertTrue(self.TEXT.startswith(pdata))
183n/a self.assertEqual(bz2f.read(), self.TEXT)
184n/a
185n/a def testReadInto(self):
186n/a self.createTempFile()
187n/a with BZ2File(self.filename) as bz2f:
188n/a n = 128
189n/a b = bytearray(n)
190n/a self.assertEqual(bz2f.readinto(b), n)
191n/a self.assertEqual(b, self.TEXT[:n])
192n/a n = len(self.TEXT) - n
193n/a b = bytearray(len(self.TEXT))
194n/a self.assertEqual(bz2f.readinto(b), n)
195n/a self.assertEqual(b[:n], self.TEXT[-n:])
196n/a
197n/a def testReadLine(self):
198n/a self.createTempFile()
199n/a with BZ2File(self.filename) as bz2f:
200n/a self.assertRaises(TypeError, bz2f.readline, None)
201n/a for line in self.TEXT_LINES:
202n/a self.assertEqual(bz2f.readline(), line)
203n/a
204n/a def testReadLineMultiStream(self):
205n/a self.createTempFile(streams=5)
206n/a with BZ2File(self.filename) as bz2f:
207n/a self.assertRaises(TypeError, bz2f.readline, None)
208n/a for line in self.TEXT_LINES * 5:
209n/a self.assertEqual(bz2f.readline(), line)
210n/a
211n/a def testReadLines(self):
212n/a self.createTempFile()
213n/a with BZ2File(self.filename) as bz2f:
214n/a self.assertRaises(TypeError, bz2f.readlines, None)
215n/a self.assertEqual(bz2f.readlines(), self.TEXT_LINES)
216n/a
217n/a def testReadLinesMultiStream(self):
218n/a self.createTempFile(streams=5)
219n/a with BZ2File(self.filename) as bz2f:
220n/a self.assertRaises(TypeError, bz2f.readlines, None)
221n/a self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5)
222n/a
223n/a def testIterator(self):
224n/a self.createTempFile()
225n/a with BZ2File(self.filename) as bz2f:
226n/a self.assertEqual(list(iter(bz2f)), self.TEXT_LINES)
227n/a
228n/a def testIteratorMultiStream(self):
229n/a self.createTempFile(streams=5)
230n/a with BZ2File(self.filename) as bz2f:
231n/a self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5)
232n/a
233n/a def testClosedIteratorDeadlock(self):
234n/a # Issue #3309: Iteration on a closed BZ2File should release the lock.
235n/a self.createTempFile()
236n/a bz2f = BZ2File(self.filename)
237n/a bz2f.close()
238n/a self.assertRaises(ValueError, next, bz2f)
239n/a # This call will deadlock if the above call failed to release the lock.
240n/a self.assertRaises(ValueError, bz2f.readlines)
241n/a
242n/a def testWrite(self):
243n/a with BZ2File(self.filename, "w") as bz2f:
244n/a self.assertRaises(TypeError, bz2f.write)
245n/a bz2f.write(self.TEXT)
246n/a with open(self.filename, 'rb') as f:
247n/a self.assertEqual(ext_decompress(f.read()), self.TEXT)
248n/a
249n/a def testWriteChunks10(self):
250n/a with BZ2File(self.filename, "w") as bz2f:
251n/a n = 0
252n/a while True:
253n/a str = self.TEXT[n*10:(n+1)*10]
254n/a if not str:
255n/a break
256n/a bz2f.write(str)
257n/a n += 1
258n/a with open(self.filename, 'rb') as f:
259n/a self.assertEqual(ext_decompress(f.read()), self.TEXT)
260n/a
261n/a def testWriteNonDefaultCompressLevel(self):
262n/a expected = bz2.compress(self.TEXT, compresslevel=5)
263n/a with BZ2File(self.filename, "w", compresslevel=5) as bz2f:
264n/a bz2f.write(self.TEXT)
265n/a with open(self.filename, "rb") as f:
266n/a self.assertEqual(f.read(), expected)
267n/a
268n/a def testWriteLines(self):
269n/a with BZ2File(self.filename, "w") as bz2f:
270n/a self.assertRaises(TypeError, bz2f.writelines)
271n/a bz2f.writelines(self.TEXT_LINES)
272n/a # Issue #1535500: Calling writelines() on a closed BZ2File
273n/a # should raise an exception.
274n/a self.assertRaises(ValueError, bz2f.writelines, ["a"])
275n/a with open(self.filename, 'rb') as f:
276n/a self.assertEqual(ext_decompress(f.read()), self.TEXT)
277n/a
278n/a def testWriteMethodsOnReadOnlyFile(self):
279n/a with BZ2File(self.filename, "w") as bz2f:
280n/a bz2f.write(b"abc")
281n/a
282n/a with BZ2File(self.filename, "r") as bz2f:
283n/a self.assertRaises(OSError, bz2f.write, b"a")
284n/a self.assertRaises(OSError, bz2f.writelines, [b"a"])
285n/a
286n/a def testAppend(self):
287n/a with BZ2File(self.filename, "w") as bz2f:
288n/a self.assertRaises(TypeError, bz2f.write)
289n/a bz2f.write(self.TEXT)
290n/a with BZ2File(self.filename, "a") as bz2f:
291n/a self.assertRaises(TypeError, bz2f.write)
292n/a bz2f.write(self.TEXT)
293n/a with open(self.filename, 'rb') as f:
294n/a self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)
295n/a
296n/a def testSeekForward(self):
297n/a self.createTempFile()
298n/a with BZ2File(self.filename) as bz2f:
299n/a self.assertRaises(TypeError, bz2f.seek)
300n/a bz2f.seek(150)
301n/a self.assertEqual(bz2f.read(), self.TEXT[150:])
302n/a
303n/a def testSeekForwardAcrossStreams(self):
304n/a self.createTempFile(streams=2)
305n/a with BZ2File(self.filename) as bz2f:
306n/a self.assertRaises(TypeError, bz2f.seek)
307n/a bz2f.seek(len(self.TEXT) + 150)
308n/a self.assertEqual(bz2f.read(), self.TEXT[150:])
309n/a
310n/a def testSeekBackwards(self):
311n/a self.createTempFile()
312n/a with BZ2File(self.filename) as bz2f:
313n/a bz2f.read(500)
314n/a bz2f.seek(-150, 1)
315n/a self.assertEqual(bz2f.read(), self.TEXT[500-150:])
316n/a
317n/a def testSeekBackwardsAcrossStreams(self):
318n/a self.createTempFile(streams=2)
319n/a with BZ2File(self.filename) as bz2f:
320n/a readto = len(self.TEXT) + 100
321n/a while readto > 0:
322n/a readto -= len(bz2f.read(readto))
323n/a bz2f.seek(-150, 1)
324n/a self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT)
325n/a
326n/a def testSeekBackwardsFromEnd(self):
327n/a self.createTempFile()
328n/a with BZ2File(self.filename) as bz2f:
329n/a bz2f.seek(-150, 2)
330n/a self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
331n/a
332n/a def testSeekBackwardsFromEndAcrossStreams(self):
333n/a self.createTempFile(streams=2)
334n/a with BZ2File(self.filename) as bz2f:
335n/a bz2f.seek(-1000, 2)
336n/a self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:])
337n/a
338n/a def testSeekPostEnd(self):
339n/a self.createTempFile()
340n/a with BZ2File(self.filename) as bz2f:
341n/a bz2f.seek(150000)
342n/a self.assertEqual(bz2f.tell(), len(self.TEXT))
343n/a self.assertEqual(bz2f.read(), b"")
344n/a
345n/a def testSeekPostEndMultiStream(self):
346n/a self.createTempFile(streams=5)
347n/a with BZ2File(self.filename) as bz2f:
348n/a bz2f.seek(150000)
349n/a self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
350n/a self.assertEqual(bz2f.read(), b"")
351n/a
352n/a def testSeekPostEndTwice(self):
353n/a self.createTempFile()
354n/a with BZ2File(self.filename) as bz2f:
355n/a bz2f.seek(150000)
356n/a bz2f.seek(150000)
357n/a self.assertEqual(bz2f.tell(), len(self.TEXT))
358n/a self.assertEqual(bz2f.read(), b"")
359n/a
360n/a def testSeekPostEndTwiceMultiStream(self):
361n/a self.createTempFile(streams=5)
362n/a with BZ2File(self.filename) as bz2f:
363n/a bz2f.seek(150000)
364n/a bz2f.seek(150000)
365n/a self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
366n/a self.assertEqual(bz2f.read(), b"")
367n/a
368n/a def testSeekPreStart(self):
369n/a self.createTempFile()
370n/a with BZ2File(self.filename) as bz2f:
371n/a bz2f.seek(-150)
372n/a self.assertEqual(bz2f.tell(), 0)
373n/a self.assertEqual(bz2f.read(), self.TEXT)
374n/a
375n/a def testSeekPreStartMultiStream(self):
376n/a self.createTempFile(streams=2)
377n/a with BZ2File(self.filename) as bz2f:
378n/a bz2f.seek(-150)
379n/a self.assertEqual(bz2f.tell(), 0)
380n/a self.assertEqual(bz2f.read(), self.TEXT * 2)
381n/a
382n/a def testFileno(self):
383n/a self.createTempFile()
384n/a with open(self.filename, 'rb') as rawf:
385n/a bz2f = BZ2File(rawf)
386n/a try:
387n/a self.assertEqual(bz2f.fileno(), rawf.fileno())
388n/a finally:
389n/a bz2f.close()
390n/a self.assertRaises(ValueError, bz2f.fileno)
391n/a
392n/a def testSeekable(self):
393n/a bz2f = BZ2File(BytesIO(self.DATA))
394n/a try:
395n/a self.assertTrue(bz2f.seekable())
396n/a bz2f.read()
397n/a self.assertTrue(bz2f.seekable())
398n/a finally:
399n/a bz2f.close()
400n/a self.assertRaises(ValueError, bz2f.seekable)
401n/a
402n/a bz2f = BZ2File(BytesIO(), "w")
403n/a try:
404n/a self.assertFalse(bz2f.seekable())
405n/a finally:
406n/a bz2f.close()
407n/a self.assertRaises(ValueError, bz2f.seekable)
408n/a
409n/a src = BytesIO(self.DATA)
410n/a src.seekable = lambda: False
411n/a bz2f = BZ2File(src)
412n/a try:
413n/a self.assertFalse(bz2f.seekable())
414n/a finally:
415n/a bz2f.close()
416n/a self.assertRaises(ValueError, bz2f.seekable)
417n/a
418n/a def testReadable(self):
419n/a bz2f = BZ2File(BytesIO(self.DATA))
420n/a try:
421n/a self.assertTrue(bz2f.readable())
422n/a bz2f.read()
423n/a self.assertTrue(bz2f.readable())
424n/a finally:
425n/a bz2f.close()
426n/a self.assertRaises(ValueError, bz2f.readable)
427n/a
428n/a bz2f = BZ2File(BytesIO(), "w")
429n/a try:
430n/a self.assertFalse(bz2f.readable())
431n/a finally:
432n/a bz2f.close()
433n/a self.assertRaises(ValueError, bz2f.readable)
434n/a
435n/a def testWritable(self):
436n/a bz2f = BZ2File(BytesIO(self.DATA))
437n/a try:
438n/a self.assertFalse(bz2f.writable())
439n/a bz2f.read()
440n/a self.assertFalse(bz2f.writable())
441n/a finally:
442n/a bz2f.close()
443n/a self.assertRaises(ValueError, bz2f.writable)
444n/a
445n/a bz2f = BZ2File(BytesIO(), "w")
446n/a try:
447n/a self.assertTrue(bz2f.writable())
448n/a finally:
449n/a bz2f.close()
450n/a self.assertRaises(ValueError, bz2f.writable)
451n/a
452n/a def testOpenDel(self):
453n/a self.createTempFile()
454n/a for i in range(10000):
455n/a o = BZ2File(self.filename)
456n/a del o
457n/a
458n/a def testOpenNonexistent(self):
459n/a self.assertRaises(OSError, BZ2File, "/non/existent")
460n/a
461n/a def testReadlinesNoNewline(self):
462n/a # Issue #1191043: readlines() fails on a file containing no newline.
463n/a data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
464n/a with open(self.filename, "wb") as f:
465n/a f.write(data)
466n/a with BZ2File(self.filename) as bz2f:
467n/a lines = bz2f.readlines()
468n/a self.assertEqual(lines, [b'Test'])
469n/a with BZ2File(self.filename) as bz2f:
470n/a xlines = list(bz2f.readlines())
471n/a self.assertEqual(xlines, [b'Test'])
472n/a
473n/a def testContextProtocol(self):
474n/a f = None
475n/a with BZ2File(self.filename, "wb") as f:
476n/a f.write(b"xxx")
477n/a f = BZ2File(self.filename, "rb")
478n/a f.close()
479n/a try:
480n/a with f:
481n/a pass
482n/a except ValueError:
483n/a pass
484n/a else:
485n/a self.fail("__enter__ on a closed file didn't raise an exception")
486n/a try:
487n/a with BZ2File(self.filename, "wb") as f:
488n/a 1/0
489n/a except ZeroDivisionError:
490n/a pass
491n/a else:
492n/a self.fail("1/0 didn't raise an exception")
493n/a
494n/a @unittest.skipUnless(threading, 'Threading required for this test.')
495n/a def testThreading(self):
496n/a # Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
497n/a data = b"1" * 2**20
498n/a nthreads = 10
499n/a with BZ2File(self.filename, 'wb') as f:
500n/a def comp():
501n/a for i in range(5):
502n/a f.write(data)
503n/a threads = [threading.Thread(target=comp) for i in range(nthreads)]
504n/a with support.start_threads(threads):
505n/a pass
506n/a
507n/a def testWithoutThreading(self):
508n/a module = support.import_fresh_module("bz2", blocked=("threading",))
509n/a with module.BZ2File(self.filename, "wb") as f:
510n/a f.write(b"abc")
511n/a with module.BZ2File(self.filename, "rb") as f:
512n/a self.assertEqual(f.read(), b"abc")
513n/a
514n/a def testMixedIterationAndReads(self):
515n/a self.createTempFile()
516n/a linelen = len(self.TEXT_LINES[0])
517n/a halflen = linelen // 2
518n/a with BZ2File(self.filename) as bz2f:
519n/a bz2f.read(halflen)
520n/a self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
521n/a self.assertEqual(bz2f.read(), self.TEXT[linelen:])
522n/a with BZ2File(self.filename) as bz2f:
523n/a bz2f.readline()
524n/a self.assertEqual(next(bz2f), self.TEXT_LINES[1])
525n/a self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
526n/a with BZ2File(self.filename) as bz2f:
527n/a bz2f.readlines()
528n/a self.assertRaises(StopIteration, next, bz2f)
529n/a self.assertEqual(bz2f.readlines(), [])
530n/a
531n/a def testMultiStreamOrdering(self):
532n/a # Test the ordering of streams when reading a multi-stream archive.
533n/a data1 = b"foo" * 1000
534n/a data2 = b"bar" * 1000
535n/a with BZ2File(self.filename, "w") as bz2f:
536n/a bz2f.write(data1)
537n/a with BZ2File(self.filename, "a") as bz2f:
538n/a bz2f.write(data2)
539n/a with BZ2File(self.filename) as bz2f:
540n/a self.assertEqual(bz2f.read(), data1 + data2)
541n/a
542n/a def testOpenBytesFilename(self):
543n/a str_filename = self.filename
544n/a try:
545n/a bytes_filename = str_filename.encode("ascii")
546n/a except UnicodeEncodeError:
547n/a self.skipTest("Temporary file name needs to be ASCII")
548n/a with BZ2File(bytes_filename, "wb") as f:
549n/a f.write(self.DATA)
550n/a with BZ2File(bytes_filename, "rb") as f:
551n/a self.assertEqual(f.read(), self.DATA)
552n/a # Sanity check that we are actually operating on the right file.
553n/a with BZ2File(str_filename, "rb") as f:
554n/a self.assertEqual(f.read(), self.DATA)
555n/a
556n/a def testOpenPathLikeFilename(self):
557n/a filename = pathlib.Path(self.filename)
558n/a with BZ2File(filename, "wb") as f:
559n/a f.write(self.DATA)
560n/a with BZ2File(filename, "rb") as f:
561n/a self.assertEqual(f.read(), self.DATA)
562n/a
563n/a def testDecompressLimited(self):
564n/a """Decompressed data buffering should be limited"""
565n/a bomb = bz2.compress(b'\0' * int(2e6), compresslevel=9)
566n/a self.assertLess(len(bomb), _compression.BUFFER_SIZE)
567n/a
568n/a decomp = BZ2File(BytesIO(bomb))
569n/a self.assertEqual(decomp.read(1), b'\0')
570n/a max_decomp = 1 + DEFAULT_BUFFER_SIZE
571n/a self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
572n/a "Excessive amount of data was decompressed")
573n/a
574n/a
575n/a # Tests for a BZ2File wrapping another file object:
576n/a
577n/a def testReadBytesIO(self):
578n/a with BytesIO(self.DATA) as bio:
579n/a with BZ2File(bio) as bz2f:
580n/a self.assertRaises(TypeError, bz2f.read, float())
581n/a self.assertEqual(bz2f.read(), self.TEXT)
582n/a self.assertFalse(bio.closed)
583n/a
584n/a def testPeekBytesIO(self):
585n/a with BytesIO(self.DATA) as bio:
586n/a with BZ2File(bio) as bz2f:
587n/a pdata = bz2f.peek()
588n/a self.assertNotEqual(len(pdata), 0)
589n/a self.assertTrue(self.TEXT.startswith(pdata))
590n/a self.assertEqual(bz2f.read(), self.TEXT)
591n/a
592n/a def testWriteBytesIO(self):
593n/a with BytesIO() as bio:
594n/a with BZ2File(bio, "w") as bz2f:
595n/a self.assertRaises(TypeError, bz2f.write)
596n/a bz2f.write(self.TEXT)
597n/a self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
598n/a self.assertFalse(bio.closed)
599n/a
600n/a def testSeekForwardBytesIO(self):
601n/a with BytesIO(self.DATA) as bio:
602n/a with BZ2File(bio) as bz2f:
603n/a self.assertRaises(TypeError, bz2f.seek)
604n/a bz2f.seek(150)
605n/a self.assertEqual(bz2f.read(), self.TEXT[150:])
606n/a
607n/a def testSeekBackwardsBytesIO(self):
608n/a with BytesIO(self.DATA) as bio:
609n/a with BZ2File(bio) as bz2f:
610n/a bz2f.read(500)
611n/a bz2f.seek(-150, 1)
612n/a self.assertEqual(bz2f.read(), self.TEXT[500-150:])
613n/a
614n/a def test_read_truncated(self):
615n/a # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
616n/a truncated = self.DATA[:-10]
617n/a with BZ2File(BytesIO(truncated)) as f:
618n/a self.assertRaises(EOFError, f.read)
619n/a with BZ2File(BytesIO(truncated)) as f:
620n/a self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
621n/a self.assertRaises(EOFError, f.read, 1)
622n/a # Incomplete 4-byte file header, and block header of at least 146 bits.
623n/a for i in range(22):
624n/a with BZ2File(BytesIO(truncated[:i])) as f:
625n/a self.assertRaises(EOFError, f.read, 1)
626n/a
627n/a
628n/aclass BZ2CompressorTest(BaseTest):
629n/a def testCompress(self):
630n/a bz2c = BZ2Compressor()
631n/a self.assertRaises(TypeError, bz2c.compress)
632n/a data = bz2c.compress(self.TEXT)
633n/a data += bz2c.flush()
634n/a self.assertEqual(ext_decompress(data), self.TEXT)
635n/a
636n/a def testCompressEmptyString(self):
637n/a bz2c = BZ2Compressor()
638n/a data = bz2c.compress(b'')
639n/a data += bz2c.flush()
640n/a self.assertEqual(data, self.EMPTY_DATA)
641n/a
642n/a def testCompressChunks10(self):
643n/a bz2c = BZ2Compressor()
644n/a n = 0
645n/a data = b''
646n/a while True:
647n/a str = self.TEXT[n*10:(n+1)*10]
648n/a if not str:
649n/a break
650n/a data += bz2c.compress(str)
651n/a n += 1
652n/a data += bz2c.flush()
653n/a self.assertEqual(ext_decompress(data), self.TEXT)
654n/a
655n/a @bigmemtest(size=_4G + 100, memuse=2)
656n/a def testCompress4G(self, size):
657n/a # "Test BZ2Compressor.compress()/flush() with >4GiB input"
658n/a bz2c = BZ2Compressor()
659n/a data = b"x" * size
660n/a try:
661n/a compressed = bz2c.compress(data)
662n/a compressed += bz2c.flush()
663n/a finally:
664n/a data = None # Release memory
665n/a data = bz2.decompress(compressed)
666n/a try:
667n/a self.assertEqual(len(data), size)
668n/a self.assertEqual(len(data.strip(b"x")), 0)
669n/a finally:
670n/a data = None
671n/a
672n/a def testPickle(self):
673n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
674n/a with self.assertRaises(TypeError):
675n/a pickle.dumps(BZ2Compressor(), proto)
676n/a
677n/a
678n/aclass BZ2DecompressorTest(BaseTest):
679n/a def test_Constructor(self):
680n/a self.assertRaises(TypeError, BZ2Decompressor, 42)
681n/a
682n/a def testDecompress(self):
683n/a bz2d = BZ2Decompressor()
684n/a self.assertRaises(TypeError, bz2d.decompress)
685n/a text = bz2d.decompress(self.DATA)
686n/a self.assertEqual(text, self.TEXT)
687n/a
688n/a def testDecompressChunks10(self):
689n/a bz2d = BZ2Decompressor()
690n/a text = b''
691n/a n = 0
692n/a while True:
693n/a str = self.DATA[n*10:(n+1)*10]
694n/a if not str:
695n/a break
696n/a text += bz2d.decompress(str)
697n/a n += 1
698n/a self.assertEqual(text, self.TEXT)
699n/a
700n/a def testDecompressUnusedData(self):
701n/a bz2d = BZ2Decompressor()
702n/a unused_data = b"this is unused data"
703n/a text = bz2d.decompress(self.DATA+unused_data)
704n/a self.assertEqual(text, self.TEXT)
705n/a self.assertEqual(bz2d.unused_data, unused_data)
706n/a
707n/a def testEOFError(self):
708n/a bz2d = BZ2Decompressor()
709n/a text = bz2d.decompress(self.DATA)
710n/a self.assertRaises(EOFError, bz2d.decompress, b"anything")
711n/a self.assertRaises(EOFError, bz2d.decompress, b"")
712n/a
713n/a @bigmemtest(size=_4G + 100, memuse=3.3)
714n/a def testDecompress4G(self, size):
715n/a # "Test BZ2Decompressor.decompress() with >4GiB input"
716n/a blocksize = 10 * 1024 * 1024
717n/a block = random.getrandbits(blocksize * 8).to_bytes(blocksize, 'little')
718n/a try:
719n/a data = block * (size // blocksize + 1)
720n/a compressed = bz2.compress(data)
721n/a bz2d = BZ2Decompressor()
722n/a decompressed = bz2d.decompress(compressed)
723n/a self.assertTrue(decompressed == data)
724n/a finally:
725n/a data = None
726n/a compressed = None
727n/a decompressed = None
728n/a
729n/a def testPickle(self):
730n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
731n/a with self.assertRaises(TypeError):
732n/a pickle.dumps(BZ2Decompressor(), proto)
733n/a
734n/a def testDecompressorChunksMaxsize(self):
735n/a bzd = BZ2Decompressor()
736n/a max_length = 100
737n/a out = []
738n/a
739n/a # Feed some input
740n/a len_ = len(self.BIG_DATA) - 64
741n/a out.append(bzd.decompress(self.BIG_DATA[:len_],
742n/a max_length=max_length))
743n/a self.assertFalse(bzd.needs_input)
744n/a self.assertEqual(len(out[-1]), max_length)
745n/a
746n/a # Retrieve more data without providing more input
747n/a out.append(bzd.decompress(b'', max_length=max_length))
748n/a self.assertFalse(bzd.needs_input)
749n/a self.assertEqual(len(out[-1]), max_length)
750n/a
751n/a # Retrieve more data while providing more input
752n/a out.append(bzd.decompress(self.BIG_DATA[len_:],
753n/a max_length=max_length))
754n/a self.assertLessEqual(len(out[-1]), max_length)
755n/a
756n/a # Retrieve remaining uncompressed data
757n/a while not bzd.eof:
758n/a out.append(bzd.decompress(b'', max_length=max_length))
759n/a self.assertLessEqual(len(out[-1]), max_length)
760n/a
761n/a out = b"".join(out)
762n/a self.assertEqual(out, self.BIG_TEXT)
763n/a self.assertEqual(bzd.unused_data, b"")
764n/a
765n/a def test_decompressor_inputbuf_1(self):
766n/a # Test reusing input buffer after moving existing
767n/a # contents to beginning
768n/a bzd = BZ2Decompressor()
769n/a out = []
770n/a
771n/a # Create input buffer and fill it
772n/a self.assertEqual(bzd.decompress(self.DATA[:100],
773n/a max_length=0), b'')
774n/a
775n/a # Retrieve some results, freeing capacity at beginning
776n/a # of input buffer
777n/a out.append(bzd.decompress(b'', 2))
778n/a
779n/a # Add more data that fits into input buffer after
780n/a # moving existing data to beginning
781n/a out.append(bzd.decompress(self.DATA[100:105], 15))
782n/a
783n/a # Decompress rest of data
784n/a out.append(bzd.decompress(self.DATA[105:]))
785n/a self.assertEqual(b''.join(out), self.TEXT)
786n/a
787n/a def test_decompressor_inputbuf_2(self):
788n/a # Test reusing input buffer by appending data at the
789n/a # end right away
790n/a bzd = BZ2Decompressor()
791n/a out = []
792n/a
793n/a # Create input buffer and empty it
794n/a self.assertEqual(bzd.decompress(self.DATA[:200],
795n/a max_length=0), b'')
796n/a out.append(bzd.decompress(b''))
797n/a
798n/a # Fill buffer with new data
799n/a out.append(bzd.decompress(self.DATA[200:280], 2))
800n/a
801n/a # Append some more data, not enough to require resize
802n/a out.append(bzd.decompress(self.DATA[280:300], 2))
803n/a
804n/a # Decompress rest of data
805n/a out.append(bzd.decompress(self.DATA[300:]))
806n/a self.assertEqual(b''.join(out), self.TEXT)
807n/a
808n/a def test_decompressor_inputbuf_3(self):
809n/a # Test reusing input buffer after extending it
810n/a
811n/a bzd = BZ2Decompressor()
812n/a out = []
813n/a
814n/a # Create almost full input buffer
815n/a out.append(bzd.decompress(self.DATA[:200], 5))
816n/a
817n/a # Add even more data to it, requiring resize
818n/a out.append(bzd.decompress(self.DATA[200:300], 5))
819n/a
820n/a # Decompress rest of data
821n/a out.append(bzd.decompress(self.DATA[300:]))
822n/a self.assertEqual(b''.join(out), self.TEXT)
823n/a
824n/a def test_failure(self):
825n/a bzd = BZ2Decompressor()
826n/a self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30)
827n/a # Previously, a second call could crash due to internal inconsistency
828n/a self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30)
829n/a
830n/aclass CompressDecompressTest(BaseTest):
831n/a def testCompress(self):
832n/a data = bz2.compress(self.TEXT)
833n/a self.assertEqual(ext_decompress(data), self.TEXT)
834n/a
835n/a def testCompressEmptyString(self):
836n/a text = bz2.compress(b'')
837n/a self.assertEqual(text, self.EMPTY_DATA)
838n/a
839n/a def testDecompress(self):
840n/a text = bz2.decompress(self.DATA)
841n/a self.assertEqual(text, self.TEXT)
842n/a
843n/a def testDecompressEmpty(self):
844n/a text = bz2.decompress(b"")
845n/a self.assertEqual(text, b"")
846n/a
847n/a def testDecompressToEmptyString(self):
848n/a text = bz2.decompress(self.EMPTY_DATA)
849n/a self.assertEqual(text, b'')
850n/a
851n/a def testDecompressIncomplete(self):
852n/a self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10])
853n/a
854n/a def testDecompressBadData(self):
855n/a self.assertRaises(OSError, bz2.decompress, self.BAD_DATA)
856n/a
857n/a def testDecompressMultiStream(self):
858n/a text = bz2.decompress(self.DATA * 5)
859n/a self.assertEqual(text, self.TEXT * 5)
860n/a
861n/a def testDecompressTrailingJunk(self):
862n/a text = bz2.decompress(self.DATA + self.BAD_DATA)
863n/a self.assertEqual(text, self.TEXT)
864n/a
865n/a def testDecompressMultiStreamTrailingJunk(self):
866n/a text = bz2.decompress(self.DATA * 5 + self.BAD_DATA)
867n/a self.assertEqual(text, self.TEXT * 5)
868n/a
869n/a
870n/aclass OpenTest(BaseTest):
871n/a "Test the open function."
872n/a
873n/a def open(self, *args, **kwargs):
874n/a return bz2.open(*args, **kwargs)
875n/a
876n/a def test_binary_modes(self):
877n/a for mode in ("wb", "xb"):
878n/a if mode == "xb":
879n/a unlink(self.filename)
880n/a with self.open(self.filename, mode) as f:
881n/a f.write(self.TEXT)
882n/a with open(self.filename, "rb") as f:
883n/a file_data = ext_decompress(f.read())
884n/a self.assertEqual(file_data, self.TEXT)
885n/a with self.open(self.filename, "rb") as f:
886n/a self.assertEqual(f.read(), self.TEXT)
887n/a with self.open(self.filename, "ab") as f:
888n/a f.write(self.TEXT)
889n/a with open(self.filename, "rb") as f:
890n/a file_data = ext_decompress(f.read())
891n/a self.assertEqual(file_data, self.TEXT * 2)
892n/a
893n/a def test_implicit_binary_modes(self):
894n/a # Test implicit binary modes (no "b" or "t" in mode string).
895n/a for mode in ("w", "x"):
896n/a if mode == "x":
897n/a unlink(self.filename)
898n/a with self.open(self.filename, mode) as f:
899n/a f.write(self.TEXT)
900n/a with open(self.filename, "rb") as f:
901n/a file_data = ext_decompress(f.read())
902n/a self.assertEqual(file_data, self.TEXT)
903n/a with self.open(self.filename, "r") as f:
904n/a self.assertEqual(f.read(), self.TEXT)
905n/a with self.open(self.filename, "a") as f:
906n/a f.write(self.TEXT)
907n/a with open(self.filename, "rb") as f:
908n/a file_data = ext_decompress(f.read())
909n/a self.assertEqual(file_data, self.TEXT * 2)
910n/a
911n/a def test_text_modes(self):
912n/a text = self.TEXT.decode("ascii")
913n/a text_native_eol = text.replace("\n", os.linesep)
914n/a for mode in ("wt", "xt"):
915n/a if mode == "xt":
916n/a unlink(self.filename)
917n/a with self.open(self.filename, mode) as f:
918n/a f.write(text)
919n/a with open(self.filename, "rb") as f:
920n/a file_data = ext_decompress(f.read()).decode("ascii")
921n/a self.assertEqual(file_data, text_native_eol)
922n/a with self.open(self.filename, "rt") as f:
923n/a self.assertEqual(f.read(), text)
924n/a with self.open(self.filename, "at") as f:
925n/a f.write(text)
926n/a with open(self.filename, "rb") as f:
927n/a file_data = ext_decompress(f.read()).decode("ascii")
928n/a self.assertEqual(file_data, text_native_eol * 2)
929n/a
930n/a def test_x_mode(self):
931n/a for mode in ("x", "xb", "xt"):
932n/a unlink(self.filename)
933n/a with self.open(self.filename, mode) as f:
934n/a pass
935n/a with self.assertRaises(FileExistsError):
936n/a with self.open(self.filename, mode) as f:
937n/a pass
938n/a
939n/a def test_fileobj(self):
940n/a with self.open(BytesIO(self.DATA), "r") as f:
941n/a self.assertEqual(f.read(), self.TEXT)
942n/a with self.open(BytesIO(self.DATA), "rb") as f:
943n/a self.assertEqual(f.read(), self.TEXT)
944n/a text = self.TEXT.decode("ascii")
945n/a with self.open(BytesIO(self.DATA), "rt") as f:
946n/a self.assertEqual(f.read(), text)
947n/a
948n/a def test_bad_params(self):
949n/a # Test invalid parameter combinations.
950n/a self.assertRaises(ValueError,
951n/a self.open, self.filename, "wbt")
952n/a self.assertRaises(ValueError,
953n/a self.open, self.filename, "xbt")
954n/a self.assertRaises(ValueError,
955n/a self.open, self.filename, "rb", encoding="utf-8")
956n/a self.assertRaises(ValueError,
957n/a self.open, self.filename, "rb", errors="ignore")
958n/a self.assertRaises(ValueError,
959n/a self.open, self.filename, "rb", newline="\n")
960n/a
961n/a def test_encoding(self):
962n/a # Test non-default encoding.
963n/a text = self.TEXT.decode("ascii")
964n/a text_native_eol = text.replace("\n", os.linesep)
965n/a with self.open(self.filename, "wt", encoding="utf-16-le") as f:
966n/a f.write(text)
967n/a with open(self.filename, "rb") as f:
968n/a file_data = ext_decompress(f.read()).decode("utf-16-le")
969n/a self.assertEqual(file_data, text_native_eol)
970n/a with self.open(self.filename, "rt", encoding="utf-16-le") as f:
971n/a self.assertEqual(f.read(), text)
972n/a
973n/a def test_encoding_error_handler(self):
974n/a # Test with non-default encoding error handler.
975n/a with self.open(self.filename, "wb") as f:
976n/a f.write(b"foo\xffbar")
977n/a with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
978n/a as f:
979n/a self.assertEqual(f.read(), "foobar")
980n/a
981n/a def test_newline(self):
982n/a # Test with explicit newline (universal newline mode disabled).
983n/a text = self.TEXT.decode("ascii")
984n/a with self.open(self.filename, "wt", newline="\n") as f:
985n/a f.write(text)
986n/a with self.open(self.filename, "rt", newline="\r") as f:
987n/a self.assertEqual(f.readlines(), [text])
988n/a
989n/a
990n/adef test_main():
991n/a support.run_unittest(
992n/a BZ2FileTest,
993n/a BZ2CompressorTest,
994n/a BZ2DecompressorTest,
995n/a CompressDecompressTest,
996n/a OpenTest,
997n/a )
998n/a support.reap_children()
999n/a
1000n/aif __name__ == '__main__':
1001n/a test_main()
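
The listing above is the complete test module as covered. To exercise it outside the coverage run, the suite can be driven with the standard unittest machinery. A minimal sketch, assuming a CPython checkout or installation where the test package is available and test.test_bz2 is importable; the verbosity level is only illustrative:

    # Minimal sketch: load and run the bz2 test suite shown above.
    # Assumes Lib/test/test_bz2.py is importable as test.test_bz2.
    import unittest
    from test import test_bz2

    # Collect every TestCase class defined in the module into one suite
    # and run it with a plain text runner.
    suite = unittest.defaultTestLoader.loadTestsFromModule(test_bz2)
    unittest.TextTestRunner(verbosity=2).run(suite)

The module's own test_main() (shown above) does the equivalent via test.support.run_unittest when run under the regression-test framework.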