» Core Development > Code coverage > Lib/lzma.py

Python code coverage for Lib/lzma.py

# | count | content
1n/a"""Interface to the liblzma compression library.
2n/a
3n/aThis module provides a class for reading and writing compressed files,
4n/aclasses for incremental (de)compression, and convenience functions for
5n/aone-shot (de)compression.
6n/a
7n/aThese classes and functions support both the XZ and legacy LZMA
8n/acontainer formats, as well as raw compressed data streams.
9n/a"""
10n/a
# Public names exported by ``from lzma import *``: the constants come from
# the ``_lzma`` extension module (star-imported below); the classes and
# helper functions at the end of the list are either re-exported from
# ``_lzma`` or defined in this file.
__all__ = [
    "CHECK_NONE", "CHECK_CRC32", "CHECK_CRC64", "CHECK_SHA256",
    "CHECK_ID_MAX", "CHECK_UNKNOWN",
    "FILTER_LZMA1", "FILTER_LZMA2", "FILTER_DELTA", "FILTER_X86", "FILTER_IA64",
    "FILTER_ARM", "FILTER_ARMTHUMB", "FILTER_POWERPC", "FILTER_SPARC",
    "FORMAT_AUTO", "FORMAT_XZ", "FORMAT_ALONE", "FORMAT_RAW",
    "MF_HC3", "MF_HC4", "MF_BT2", "MF_BT3", "MF_BT4",
    "MODE_FAST", "MODE_NORMAL", "PRESET_DEFAULT", "PRESET_EXTREME",

    "LZMACompressor", "LZMADecompressor", "LZMAFile", "LZMAError",
    "open", "compress", "decompress", "is_check_supported",
]
23n/a
24n/aimport builtins
25n/aimport io
26n/aimport os
27n/afrom _lzma import *
28n/afrom _lzma import _encode_filter_properties, _decode_filter_properties
29n/aimport _compression
30n/a
31n/a
# State codes stored in LZMAFile._mode.
_MODE_CLOSED = 0    # not open (initial state, and state after close())
_MODE_READ = 1      # opened for reading / decompression
# Value 2 no longer used
_MODE_WRITE = 3     # opened for writing / compression ("w", "a" or "x")
36n/a
37n/a
class LZMAFile(_compression.BaseStream):

    """A file object providing transparent LZMA (de)compression.

    An LZMAFile can act as a wrapper for an existing file object, or
    refer directly to a named file on disk.

    Note that LZMAFile provides a *binary* file interface - data read
    is returned as bytes, and data to be written must be given as bytes.
    """

    def __init__(self, filename=None, mode="r", *,
                 format=None, check=-1, preset=None, filters=None):
        """Open an LZMA-compressed file in binary mode.

        filename can be either an actual file name (given as a str,
        bytes, or PathLike object), in which case the named file is
        opened, or it can be an existing file object to read from or
        write to.

        mode can be "r" for reading (default), "w" for (over)writing,
        "x" for creating exclusively, or "a" for appending. These can
        equivalently be given as "rb", "wb", "xb" and "ab" respectively.

        format specifies the container format to use for the file.
        If mode is "r", this defaults to FORMAT_AUTO. Otherwise, the
        default is FORMAT_XZ.

        check specifies the integrity check to use. This argument can
        only be used when opening a file for writing. For FORMAT_XZ,
        the default is CHECK_CRC64. FORMAT_ALONE and FORMAT_RAW do not
        support integrity checks - for these formats, check must be
        omitted, or be CHECK_NONE.

        When opening a file for reading, the *preset* argument is not
        meaningful, and should be omitted. The *filters* argument should
        also be omitted, except when format is FORMAT_RAW (in which case
        it is required).

        When opening a file for writing, the settings used by the
        compressor can be specified either as a preset compression
        level (with the *preset* argument), or in detail as a custom
        filter chain (with the *filters* argument). For FORMAT_XZ and
        FORMAT_ALONE, the default is to use the PRESET_DEFAULT preset
        level. For FORMAT_RAW, the caller must always specify a filter
        chain; the raw compressor does not support preset compression
        levels.

        preset (if provided) should be an integer in the range 0-9,
        optionally OR-ed with the constant PRESET_EXTREME.

        filters (if provided) should be a sequence of dicts. Each dict
        should have an entry for "id" indicating ID of the filter, plus
        additional entries for options to the filter.

        Raises ValueError for an unrecognised mode, or when *check* or
        *preset* is given together with a read mode, and TypeError when
        *filename* is neither a path nor an object with a read() or
        write() method.
        """
        # Start in the "closed" state so that close() is a harmless no-op
        # if anything below raises before the file is fully set up.
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED

        if mode in ("r", "rb"):
            if check != -1:
                raise ValueError("Cannot specify an integrity check "
                                 "when opening a file for reading")
            if preset is not None:
                raise ValueError("Cannot specify a preset compression "
                                 "level when opening a file for reading")
            if format is None:
                format = FORMAT_AUTO
            mode_code = _MODE_READ
        elif mode in ("w", "wb", "a", "ab", "x", "xb"):
            if format is None:
                format = FORMAT_XZ
            mode_code = _MODE_WRITE
            # The compressor is built before the target file is opened.
            self._compressor = LZMACompressor(format=format, check=check,
                                              preset=preset, filters=filters)
            # Count of uncompressed bytes written so far; reported by tell().
            self._pos = 0
        else:
            raise ValueError("Invalid mode: {!r}".format(mode))

        if isinstance(filename, (str, bytes, os.PathLike)):
            # The underlying file is always opened in binary mode.
            if "b" not in mode:
                mode += "b"
            self._fp = builtins.open(filename, mode)
            # We opened it, so close() must close it too.
            self._closefp = True
            self._mode = mode_code
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            # Caller-supplied file object: used as-is and left open on close().
            self._fp = filename
            self._mode = mode_code
        else:
            raise TypeError("filename must be a str, bytes, file or PathLike object")

        if self._mode == _MODE_READ:
            raw = _compression.DecompressReader(self._fp, LZMADecompressor,
                trailing_error=LZMAError, format=format, filters=filters)
            self._buffer = io.BufferedReader(raw)

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode == _MODE_READ:
                self._buffer.close()
                self._buffer = None
            elif self._mode == _MODE_WRITE:
                # Emit whatever the compressor is still holding.
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            # Always release the underlying file and reset our state, even
            # if flushing or closing the buffer raised.
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor for the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether the file supports seeking."""
        return self.readable() and self._buffer.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode == _MODE_READ

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    def peek(self, size=-1):
        """Return buffered data without advancing the file position.

        Always returns at least one byte of data, unless at EOF.
        The exact number of bytes returned is unspecified.
        """
        self._check_can_read()
        # Relies on the undocumented fact that BufferedReader.peek() always
        # returns at least one byte (except at EOF)
        return self._buffer.peek(size)

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative or omitted, read until EOF is reached.
        Returns b"" if the file is already at EOF.
        """
        self._check_can_read()
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Read up to size uncompressed bytes, while trying to avoid
        making multiple reads from the underlying stream. Reads up to a
        buffer's worth of data if size is negative.

        Returns b"" if the file is at EOF.
        """
        self._check_can_read()
        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        self._check_can_read()
        return self._buffer.readline(size)

    def write(self, data):
        """Write a bytes-like object to the file.

        Returns the number of uncompressed bytes written, which is
        always the length of data in bytes. Note that due to buffering,
        the file on disk may not reflect the data written until close()
        is called.
        """
        self._check_can_write()
        if isinstance(data, (bytes, bytearray)):
            length = len(data)
        else:
            # Accept any object supporting the buffer protocol.  len() on
            # such an object counts items, not bytes (e.g. array.array('I')),
            # so take the byte size from a memoryview instead.
            data = memoryview(data)
            length = data.nbytes

        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        self._pos += length
        return length

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Possible values for whence are:

            0: start of stream (default): offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Returns the new file position.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        self._check_can_seek()
        return self._buffer.seek(offset, whence)

    def tell(self):
        """Return the current file position."""
        self._check_not_closed()
        if self._mode == _MODE_READ:
            return self._buffer.tell()
        # Write mode: position is the count of uncompressed bytes written.
        return self._pos
261n/a
262n/a
def open(filename, mode="rb", *,
         format=None, check=-1, preset=None, filters=None,
         encoding=None, errors=None, newline=None):
    """Open an LZMA-compressed file in binary or text mode.

    filename is either a path (str, bytes or PathLike object) naming the
    file to open, or an already-open file object to read from or write
    to.

    mode is one of "r", "rb" (default), "w", "wb", "x", "xb", "a" or
    "ab" for binary mode, or "rt", "wt", "xt" or "at" for text mode.

    format, check, preset and filters select the compression settings
    and are passed straight through to LZMAFile.

    In binary mode the result is the LZMAFile itself, and encoding,
    errors and newline must all be left as None (ValueError otherwise).

    In text mode the LZMAFile is wrapped in an io.TextIOWrapper built
    with the given encoding, errors and newline.  Combining "t" and "b"
    in mode raises ValueError.
    """
    wants_text = "t" in mode

    if wants_text:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    elif encoding is not None:
        raise ValueError("Argument 'encoding' not supported in binary mode")
    elif errors is not None:
        raise ValueError("Argument 'errors' not supported in binary mode")
    elif newline is not None:
        raise ValueError("Argument 'newline' not supported in binary mode")

    # LZMAFile only understands binary modes, so drop the "t" flag.
    stream = LZMAFile(filename, mode.replace("t", ""),
                      format=format, check=check,
                      preset=preset, filters=filters)

    if not wants_text:
        return stream
    return io.TextIOWrapper(stream, encoding, errors, newline)
308n/a
309n/a
def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None):
    """Compress *data* in one shot and return the compressed bytes.

    The optional arguments *format*, *check*, *preset* and *filters*
    are handed to LZMACompressor; see its docstring for their meaning.

    Use an LZMACompressor directly when the input arrives in pieces.
    """
    encoder = LZMACompressor(format, check, preset, filters)
    body = encoder.compress(data)
    # flush() returns whatever the compressor is still holding.
    tail = encoder.flush()
    return body + tail
320n/a
321n/a
def decompress(data, format=FORMAT_AUTO, memlimit=None, filters=None):
    """Decompress a block of data.

    Refer to LZMADecompressor's docstring for a description of the
    optional arguments *format*, *memlimit* and *filters*.

    *data* may contain several compressed streams back to back; each
    one is decoded with a fresh LZMADecompressor and the outputs are
    concatenated.  If the bytes following a complete stream cannot be
    decoded, they are ignored.

    Raises LZMAError if the first stream cannot be decoded, or if any
    stream ends before its end-of-stream marker.

    For incremental decompression, use an LZMADecompressor instead.
    """
    results = []
    while True:
        # A decompressor handles exactly one stream, so make a new one
        # for every stream found in the input.
        decomp = LZMADecompressor(format, memlimit, filters)
        try:
            res = decomp.decompress(data)
        except LZMAError:
            if results:
                break  # Leftover data is not a valid LZMA/XZ stream; ignore it.
            else:
                raise  # Error on the first iteration; bail out.
        results.append(res)
        if not decomp.eof:
            raise LZMAError("Compressed data ended before the "
                            "end-of-stream marker was reached")
        # Anything after the end of this stream is the next candidate stream.
        data = decomp.unused_data
        if not data:
            break
    return b"".join(results)