ยปCore Development>Code coverage>Lib/_compression.py

Python code coverage for Lib/_compression.py

#countcontent
1n/a"""Internal classes used by the gzip, lzma and bz2 modules"""
2n/a
3n/aimport io
4n/a
5n/a
6n/aBUFFER_SIZE = io.DEFAULT_BUFFER_SIZE # Compressed data read chunk size
7n/a
8n/a
9n/aclass BaseStream(io.BufferedIOBase):
10n/a """Mode-checking helper functions."""
11n/a
12n/a def _check_not_closed(self):
13n/a if self.closed:
14n/a raise ValueError("I/O operation on closed file")
15n/a
16n/a def _check_can_read(self):
17n/a if not self.readable():
18n/a raise io.UnsupportedOperation("File not open for reading")
19n/a
20n/a def _check_can_write(self):
21n/a if not self.writable():
22n/a raise io.UnsupportedOperation("File not open for writing")
23n/a
24n/a def _check_can_seek(self):
25n/a if not self.readable():
26n/a raise io.UnsupportedOperation("Seeking is only supported "
27n/a "on files open for reading")
28n/a if not self.seekable():
29n/a raise io.UnsupportedOperation("The underlying file object "
30n/a "does not support seeking")
31n/a
32n/a
33n/aclass DecompressReader(io.RawIOBase):
34n/a """Adapts the decompressor API to a RawIOBase reader API"""
35n/a
36n/a def readable(self):
37n/a return True
38n/a
39n/a def __init__(self, fp, decomp_factory, trailing_error=(), **decomp_args):
40n/a self._fp = fp
41n/a self._eof = False
42n/a self._pos = 0 # Current offset in decompressed stream
43n/a
44n/a # Set to size of decompressed stream once it is known, for SEEK_END
45n/a self._size = -1
46n/a
47n/a # Save the decompressor factory and arguments.
48n/a # If the file contains multiple compressed streams, each
49n/a # stream will need a separate decompressor object. A new decompressor
50n/a # object is also needed when implementing a backwards seek().
51n/a self._decomp_factory = decomp_factory
52n/a self._decomp_args = decomp_args
53n/a self._decompressor = self._decomp_factory(**self._decomp_args)
54n/a
55n/a # Exception class to catch from decompressor signifying invalid
56n/a # trailing data to ignore
57n/a self._trailing_error = trailing_error
58n/a
59n/a def close(self):
60n/a self._decompressor = None
61n/a return super().close()
62n/a
63n/a def seekable(self):
64n/a return self._fp.seekable()
65n/a
66n/a def readinto(self, b):
67n/a with memoryview(b) as view, view.cast("B") as byte_view:
68n/a data = self.read(len(byte_view))
69n/a byte_view[:len(data)] = data
70n/a return len(data)
71n/a
72n/a def read(self, size=-1):
73n/a if size < 0:
74n/a return self.readall()
75n/a
76n/a if not size or self._eof:
77n/a return b""
78n/a data = None # Default if EOF is encountered
79n/a # Depending on the input data, our call to the decompressor may not
80n/a # return any data. In this case, try again after reading another block.
81n/a while True:
82n/a if self._decompressor.eof:
83n/a rawblock = (self._decompressor.unused_data or
84n/a self._fp.read(BUFFER_SIZE))
85n/a if not rawblock:
86n/a break
87n/a # Continue to next stream.
88n/a self._decompressor = self._decomp_factory(
89n/a **self._decomp_args)
90n/a try:
91n/a data = self._decompressor.decompress(rawblock, size)
92n/a except self._trailing_error:
93n/a # Trailing data isn't a valid compressed stream; ignore it.
94n/a break
95n/a else:
96n/a if self._decompressor.needs_input:
97n/a rawblock = self._fp.read(BUFFER_SIZE)
98n/a if not rawblock:
99n/a raise EOFError("Compressed file ended before the "
100n/a "end-of-stream marker was reached")
101n/a else:
102n/a rawblock = b""
103n/a data = self._decompressor.decompress(rawblock, size)
104n/a if data:
105n/a break
106n/a if not data:
107n/a self._eof = True
108n/a self._size = self._pos
109n/a return b""
110n/a self._pos += len(data)
111n/a return data
112n/a
113n/a # Rewind the file to the beginning of the data stream.
114n/a def _rewind(self):
115n/a self._fp.seek(0)
116n/a self._eof = False
117n/a self._pos = 0
118n/a self._decompressor = self._decomp_factory(**self._decomp_args)
119n/a
120n/a def seek(self, offset, whence=io.SEEK_SET):
121n/a # Recalculate offset as an absolute file position.
122n/a if whence == io.SEEK_SET:
123n/a pass
124n/a elif whence == io.SEEK_CUR:
125n/a offset = self._pos + offset
126n/a elif whence == io.SEEK_END:
127n/a # Seeking relative to EOF - we need to know the file's size.
128n/a if self._size < 0:
129n/a while self.read(io.DEFAULT_BUFFER_SIZE):
130n/a pass
131n/a offset = self._size + offset
132n/a else:
133n/a raise ValueError("Invalid value for whence: {}".format(whence))
134n/a
135n/a # Make it so that offset is the number of bytes to skip forward.
136n/a if offset < self._pos:
137n/a self._rewind()
138n/a else:
139n/a offset -= self._pos
140n/a
141n/a # Read and discard data until we reach the desired position.
142n/a while offset > 0:
143n/a data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
144n/a if not data:
145n/a break
146n/a offset -= len(data)
147n/a
148n/a return self._pos
149n/a
150n/a def tell(self):
151n/a """Return the current file position."""
152n/a return self._pos