»Core Development>Code coverage>Lib/gzip.py

Python code coverage for Lib/gzip.py

#countcontent
1n/a"""Functions that read and write gzipped files.
2n/a
3n/aThe user of the file doesn't have to worry about the compression,
4n/abut random access is not allowed."""
5n/a
6n/a# based on Andrew Kuchling's minigzip.py distributed with the zlib module
7n/a
8n/aimport struct, sys, time, os
9n/aimport zlib
10n/aimport builtins
11n/aimport io
12n/aimport _compression
13n/a
14n/a__all__ = ["GzipFile", "open", "compress", "decompress"]
15n/a
# Bit masks tested against / written into the flag byte of a gzip header.
FTEXT = 1
FHCRC = 2
FEXTRA = 4
FNAME = 8
FCOMMENT = 16

# Values stored in GzipFile.mode to record the direction of the stream.
READ = 1
WRITE = 2
19n/a
def open(filename, mode="rb", compresslevel=9,
         encoding=None, errors=None, newline=None):
    """Open a gzip-compressed file in binary or text mode.

    *filename* is either a path (str, bytes or os.PathLike object) or an
    already open file object, recognised by having a read() or write()
    attribute.

    *mode* is "r", "rb", "w", "wb", "x", "xb", "a" or "ab" for binary
    access, or "rt", "wt", "xt" or "at" for text access. It defaults to
    "rb"; *compresslevel* defaults to 9.

    In binary mode the result is simply
    GzipFile(filename, mode, compresslevel), and passing *encoding*,
    *errors* or *newline* raises ValueError.

    In text mode the GzipFile is wrapped in an io.TextIOWrapper built with
    the given encoding, error handling behavior and line ending(s).

    Raises ValueError for a mode containing both "t" and "b", and TypeError
    when *filename* is neither a path nor a file object.
    """
    text_mode = "t" in mode
    if text_mode and "b" in mode:
        raise ValueError("Invalid mode: %r" % (mode,))
    if not text_mode:
        # Text-layer options make no sense without a TextIOWrapper.
        if encoding is not None:
            raise ValueError("Argument 'encoding' not supported in binary mode")
        if errors is not None:
            raise ValueError("Argument 'errors' not supported in binary mode")
        if newline is not None:
            raise ValueError("Argument 'newline' not supported in binary mode")

    # GzipFile itself only understands binary modes, so drop the "t".
    gz_mode = mode.replace("t", "")
    if isinstance(filename, (str, bytes, os.PathLike)):
        binary_file = GzipFile(filename, gz_mode, compresslevel)
    elif hasattr(filename, "read") or hasattr(filename, "write"):
        binary_file = GzipFile(None, gz_mode, compresslevel, filename)
    else:
        raise TypeError("filename must be a str or bytes object, or a file")

    if not text_mode:
        return binary_file
    return io.TextIOWrapper(binary_file, encoding, errors, newline)
63n/a
def write32u(output, value):
    """Write *value* to *output* as four little-endian bytes.

    The integer is packed with the struct format "<L" and handed to
    output.write() in a single call.
    """
    packed = struct.pack("<L", value)
    output.write(packed)
68n/a
69n/aclass _PaddedFile:
70n/a """Minimal read-only file object that prepends a string to the contents
71n/a of an actual file. Shouldn't be used outside of gzip.py, as it lacks
72n/a essential functionality."""
73n/a
74n/a def __init__(self, f, prepend=b''):
75n/a self._buffer = prepend
76n/a self._length = len(prepend)
77n/a self.file = f
78n/a self._read = 0
79n/a
80n/a def read(self, size):
81n/a if self._read is None:
82n/a return self.file.read(size)
83n/a if self._read + size <= self._length:
84n/a read = self._read
85n/a self._read += size
86n/a return self._buffer[read:self._read]
87n/a else:
88n/a read = self._read
89n/a self._read = None
90n/a return self._buffer[read:] + \
91n/a self.file.read(size-self._length+read)
92n/a
93n/a def prepend(self, prepend=b''):
94n/a if self._read is None:
95n/a self._buffer = prepend
96n/a else: # Assume data was read since the last prepend() call
97n/a self._read -= len(prepend)
98n/a return
99n/a self._length = len(self._buffer)
100n/a self._read = 0
101n/a
102n/a def seek(self, off):
103n/a self._read = None
104n/a self._buffer = None
105n/a return self.file.seek(off)
106n/a
107n/a def seekable(self):
108n/a return True # Allows fast-forwarding even in unseekable streams
109n/a
class GzipFile(_compression.BaseStream):
    """The GzipFile class simulates most of the methods of a file object with
    the exception of the truncate() method.

    This class only supports opening files in binary mode. If you need to open a
    compressed file in text mode, use the gzip.open() function.

    """

    # Overridden with internal file object to be closed, if only a filename
    # is passed in
    myfileobj = None

    # Compression level recorded for the header's XFL byte; overridden per
    # instance when the file is opened for writing.
    _compresslevel = 9

    def __init__(self, filename=None, mode=None,
                 compresslevel=9, fileobj=None, mtime=None):
        """Constructor for the GzipFile class.

        At least one of fileobj and filename must be given a
        non-trivial value.

        The new class instance is based on fileobj, which can be a regular
        file, an io.BytesIO object, or any other object which simulates a file.
        It defaults to None, in which case filename is opened to provide
        a file object.

        When fileobj is not None, the filename argument is only used to be
        included in the gzip file header, which may include the original
        filename of the uncompressed file.  It defaults to the filename of
        fileobj, if discernible; otherwise, it defaults to the empty string,
        and in this case the original filename is not included in the header.

        The mode argument can be any of 'r', 'rb', 'a', 'ab', 'w', 'wb', 'x', or
        'xb' depending on whether the file will be read or written.  The default
        is the mode of fileobj if discernible; otherwise, the default is 'rb'.
        A mode of 'r' is equivalent to one of 'rb', and similarly for 'w' and
        'wb', 'a' and 'ab', and 'x' and 'xb'.

        The compresslevel argument is an integer from 0 to 9 controlling the
        level of compression; 1 is fastest and produces the least compression,
        and 9 is slowest and produces the most compression. 0 is no compression
        at all. The default is 9. The level is also reflected in the
        "extra flags" byte of the header that is written.

        The mtime argument is an optional numeric timestamp to be written
        to the last modification time field in the stream when compressing.
        If omitted or None, the current time is used.

        Raises ValueError if mode contains 't' or 'U', or does not start
        with one of 'r', 'w', 'a' or 'x'.

        """

        if mode and ('t' in mode or 'U' in mode):
            raise ValueError("Invalid mode: {!r}".format(mode))
        if mode and 'b' not in mode:
            mode += 'b'
        if fileobj is None:
            fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
        if filename is None:
            filename = getattr(fileobj, 'name', '')
            if not isinstance(filename, (str, bytes)):
                filename = ''
        else:
            filename = os.fspath(filename)
        if mode is None:
            mode = getattr(fileobj, 'mode', 'rb')

        if mode.startswith('r'):
            self.mode = READ
            raw = _GzipReader(fileobj)
            self._buffer = io.BufferedReader(raw)
            self.name = filename

        elif mode.startswith(('w', 'a', 'x')):
            self.mode = WRITE
            self._init_write(filename)
            # Remembered so that the header can describe the level used.
            self._compresslevel = compresslevel
            self.compress = zlib.compressobj(compresslevel,
                                             zlib.DEFLATED,
                                             -zlib.MAX_WBITS,
                                             zlib.DEF_MEM_LEVEL,
                                             0)
            self._write_mtime = mtime
        else:
            raise ValueError("Invalid mode: {!r}".format(mode))

        self.fileobj = fileobj

        if self.mode == WRITE:
            self._write_gzip_header()

    @property
    def filename(self):
        """Deprecated alias of the name attribute (with '.gz' appended in
        write mode when the name does not already end with it)."""
        import warnings
        warnings.warn("use the name attribute", DeprecationWarning, 2)
        if self.mode == WRITE and self.name[-3:] != ".gz":
            return self.name + ".gz"
        return self.name

    @property
    def mtime(self):
        """Last modification time read from stream, or None"""
        return self._buffer.raw._last_mtime

    def __repr__(self):
        s = repr(self.fileobj)
        return '<gzip ' + s[1:-1] + ' ' + hex(id(self)) + '>'

    def _init_write(self, filename):
        """Reset the bookkeeping used while writing a gzip member."""
        self.name = filename
        self.crc = zlib.crc32(b"")
        self.size = 0
        self.writebuf = []
        self.bufsize = 0
        self.offset = 0  # Current file offset for seek(), tell(), etc

    def _write_gzip_header(self):
        """Write the gzip member header to the underlying file object."""
        self.fileobj.write(b'\037\213')             # magic header
        self.fileobj.write(b'\010')                 # compression method
        try:
            # RFC 1952 requires the FNAME field to be Latin-1. Do not
            # include filenames that cannot be represented that way.
            fname = os.path.basename(self.name)
            if not isinstance(fname, bytes):
                fname = fname.encode('latin-1')
            if fname.endswith(b'.gz'):
                fname = fname[:-3]
        except UnicodeEncodeError:
            fname = b''
        flags = 0
        if fname:
            flags = FNAME
        self.fileobj.write(chr(flags).encode('latin-1'))
        mtime = self._write_mtime
        if mtime is None:
            mtime = time.time()
        write32u(self.fileobj, int(mtime))
        # XFL ("extra flags"): RFC 1952 defines 2 for maximum compression
        # and 4 for the fastest algorithm; anything else is reported as 0
        # instead of unconditionally claiming maximum compression.
        if self._compresslevel == 9:
            xfl = b'\002'
        elif self._compresslevel == 1:
            xfl = b'\004'
        else:
            xfl = b'\000'
        self.fileobj.write(xfl)
        self.fileobj.write(b'\377')                 # OS byte: 255
        if fname:
            self.fileobj.write(fname + b'\000')

    def write(self,data):
        """Compress *data* and write it to the underlying file object.

        Returns the number of uncompressed bytes consumed. Raises OSError
        (EBADF) when the object was not opened for writing and ValueError
        when it has been closed.
        """
        self._check_not_closed()
        if self.mode != WRITE:
            import errno
            raise OSError(errno.EBADF, "write() on read-only GzipFile object")

        if self.fileobj is None:
            raise ValueError("write() on closed GzipFile object")

        if isinstance(data, bytes):
            length = len(data)
        else:
            # accept any data that supports the buffer protocol
            data = memoryview(data)
            length = data.nbytes

        if length > 0:
            self.fileobj.write(self.compress.compress(data))
            self.size += length
            self.crc = zlib.crc32(data, self.crc)
            self.offset += length

        return length

    def read(self, size=-1):
        """Read from the decompressed stream via the internal buffered
        reader, passing *size* through unchanged.

        Raises OSError (EBADF) when the object was opened for writing.
        """
        self._check_not_closed()
        if self.mode != READ:
            import errno
            raise OSError(errno.EBADF, "read() on write-only GzipFile object")
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Implements BufferedIOBase.read1()

        Reads up to a buffer's worth of data if size is negative."""
        self._check_not_closed()
        if self.mode != READ:
            import errno
            raise OSError(errno.EBADF, "read1() on write-only GzipFile object")

        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def peek(self, n):
        """Delegate to the internal buffered reader's peek(n).

        Raises OSError (EBADF) when the object was opened for writing.
        """
        self._check_not_closed()
        if self.mode != READ:
            import errno
            raise OSError(errno.EBADF, "peek() on write-only GzipFile object")
        return self._buffer.peek(n)

    @property
    def closed(self):
        """True once close() has detached the underlying file object."""
        return self.fileobj is None

    def close(self):
        """Finish the stream and release the underlying file.

        In write mode the remaining compressed data and the CRC/size trailer
        are written first. A file opened by the constructor itself is closed
        even if that fails; a caller-supplied fileobj is left open. Calling
        close() more than once is harmless.
        """
        fileobj = self.fileobj
        if fileobj is None:
            return
        self.fileobj = None
        try:
            if self.mode == WRITE:
                fileobj.write(self.compress.flush())
                write32u(fileobj, self.crc)
                # self.size may exceed 2GB, or even 4GB
                write32u(fileobj, self.size & 0xffffffff)
            elif self.mode == READ:
                self._buffer.close()
        finally:
            myfileobj = self.myfileobj
            if myfileobj:
                self.myfileobj = None
                myfileobj.close()

    def flush(self,zlib_mode=zlib.Z_SYNC_FLUSH):
        """In write mode, flush the compressor with *zlib_mode* and then
        flush the underlying file object; a no-op in read mode."""
        self._check_not_closed()
        if self.mode == WRITE:
            # Ensure the compressor's buffer is flushed
            self.fileobj.write(self.compress.flush(zlib_mode))
            self.fileobj.flush()

    def fileno(self):
        """Invoke the underlying file object's fileno() method.

        This will raise AttributeError if the underlying file object
        doesn't support fileno().
        """
        return self.fileobj.fileno()

    def rewind(self):
        '''Return the uncompressed stream file position indicator to the
        beginning of the file'''
        if self.mode != READ:
            raise OSError("Can't rewind in write mode")
        self._buffer.seek(0)

    def readable(self):
        return self.mode == READ

    def writable(self):
        return self.mode == WRITE

    def seekable(self):
        return True

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the position in the uncompressed stream.

        In write mode only forward seeks are possible; the gap is filled by
        writing zero bytes, and the new offset is returned. In read mode the
        request is delegated to the internal buffered reader.

        Raises ValueError for a seek relative to the end in write mode and
        OSError for a backward seek in write mode.
        """
        if self.mode == WRITE:
            if whence != io.SEEK_SET:
                if whence == io.SEEK_CUR:
                    offset = self.offset + offset
                else:
                    raise ValueError('Seek from end not supported')
            if offset < self.offset:
                raise OSError('Negative seek in write mode')
            count = offset - self.offset
            chunk = b'\0' * 1024
            for i in range(count // 1024):
                self.write(chunk)
            self.write(b'\0' * (count % 1024))
        elif self.mode == READ:
            self._check_not_closed()
            return self._buffer.seek(offset, whence)

        return self.offset

    def readline(self, size=-1):
        """Delegate to the internal buffered reader's readline(size)."""
        self._check_not_closed()
        return self._buffer.readline(size)
375n/a
376n/a
377n/aclass _GzipReader(_compression.DecompressReader):
378n/a def __init__(self, fp):
379n/a super().__init__(_PaddedFile(fp), zlib.decompressobj,
380n/a wbits=-zlib.MAX_WBITS)
381n/a # Set flag indicating start of a new member
382n/a self._new_member = True
383n/a self._last_mtime = None
384n/a
385n/a def _init_read(self):
386n/a self._crc = zlib.crc32(b"")
387n/a self._stream_size = 0 # Decompressed size of unconcatenated stream
388n/a
389n/a def _read_exact(self, n):
390n/a '''Read exactly *n* bytes from `self._fp`
391n/a
392n/a This method is required because self._fp may be unbuffered,
393n/a i.e. return short reads.
394n/a '''
395n/a
396n/a data = self._fp.read(n)
397n/a while len(data) < n:
398n/a b = self._fp.read(n - len(data))
399n/a if not b:
400n/a raise EOFError("Compressed file ended before the "
401n/a "end-of-stream marker was reached")
402n/a data += b
403n/a return data
404n/a
405n/a def _read_gzip_header(self):
406n/a magic = self._fp.read(2)
407n/a if magic == b'':
408n/a return False
409n/a
410n/a if magic != b'\037\213':
411n/a raise OSError('Not a gzipped file (%r)' % magic)
412n/a
413n/a (method, flag,
414n/a self._last_mtime) = struct.unpack("<BBIxx", self._read_exact(8))
415n/a if method != 8:
416n/a raise OSError('Unknown compression method')
417n/a
418n/a if flag & FEXTRA:
419n/a # Read & discard the extra field, if present
420n/a extra_len, = struct.unpack("<H", self._read_exact(2))
421n/a self._read_exact(extra_len)
422n/a if flag & FNAME:
423n/a # Read and discard a null-terminated string containing the filename
424n/a while True:
425n/a s = self._fp.read(1)
426n/a if not s or s==b'\000':
427n/a break
428n/a if flag & FCOMMENT:
429n/a # Read and discard a null-terminated string containing a comment
430n/a while True:
431n/a s = self._fp.read(1)
432n/a if not s or s==b'\000':
433n/a break
434n/a if flag & FHCRC:
435n/a self._read_exact(2) # Read & discard the 16-bit header CRC
436n/a return True
437n/a
438n/a def read(self, size=-1):
439n/a if size < 0:
440n/a return self.readall()
441n/a # size=0 is special because decompress(max_length=0) is not supported
442n/a if not size:
443n/a return b""
444n/a
445n/a # For certain input data, a single
446n/a # call to decompress() may not return
447n/a # any data. In this case, retry until we get some data or reach EOF.
448n/a while True:
449n/a if self._decompressor.eof:
450n/a # Ending case: we've come to the end of a member in the file,
451n/a # so finish up this member, and read a new gzip header.
452n/a # Check the CRC and file size, and set the flag so we read
453n/a # a new member
454n/a self._read_eof()
455n/a self._new_member = True
456n/a self._decompressor = self._decomp_factory(
457n/a **self._decomp_args)
458n/a
459n/a if self._new_member:
460n/a # If the _new_member flag is set, we have to
461n/a # jump to the next member, if there is one.
462n/a self._init_read()
463n/a if not self._read_gzip_header():
464n/a self._size = self._pos
465n/a return b""
466n/a self._new_member = False
467n/a
468n/a # Read a chunk of data from the file
469n/a buf = self._fp.read(io.DEFAULT_BUFFER_SIZE)
470n/a
471n/a uncompress = self._decompressor.decompress(buf, size)
472n/a if self._decompressor.unconsumed_tail != b"":
473n/a self._fp.prepend(self._decompressor.unconsumed_tail)
474n/a elif self._decompressor.unused_data != b"":
475n/a # Prepend the already read bytes to the fileobj so they can
476n/a # be seen by _read_eof() and _read_gzip_header()
477n/a self._fp.prepend(self._decompressor.unused_data)
478n/a
479n/a if uncompress != b"":
480n/a break
481n/a if buf == b"":
482n/a raise EOFError("Compressed file ended before the "
483n/a "end-of-stream marker was reached")
484n/a
485n/a self._add_read_data( uncompress )
486n/a self._pos += len(uncompress)
487n/a return uncompress
488n/a
489n/a def _add_read_data(self, data):
490n/a self._crc = zlib.crc32(data, self._crc)
491n/a self._stream_size = self._stream_size + len(data)
492n/a
493n/a def _read_eof(self):
494n/a # We've read to the end of the file
495n/a # We check the that the computed CRC and size of the
496n/a # uncompressed data matches the stored values. Note that the size
497n/a # stored is the true file size mod 2**32.
498n/a crc32, isize = struct.unpack("<II", self._read_exact(8))
499n/a if crc32 != self._crc:
500n/a raise OSError("CRC check failed %s != %s" % (hex(crc32),
501n/a hex(self._crc)))
502n/a elif isize != (self._stream_size & 0xffffffff):
503n/a raise OSError("Incorrect length of data produced")
504n/a
505n/a # Gzip files can be padded with zeroes and still have archives.
506n/a # Consume all zero bytes and set the file position to the first
507n/a # non-zero byte. See http://www.gzip.org/#faq8
508n/a c = b"\x00"
509n/a while c == b"\x00":
510n/a c = self._fp.read(1)
511n/a if c:
512n/a self._fp.prepend(c)
513n/a
514n/a def _rewind(self):
515n/a super()._rewind()
516n/a self._new_member = True
517n/a
def compress(data, compresslevel=9, *, mtime=None):
    """Compress data in one shot and return the compressed string.

    *compresslevel* is the compression level, in range of 0-9.

    *mtime* is a keyword-only, optional numeric timestamp stored in the
    gzip header. When omitted or None the current time is used, so pass a
    fixed value to obtain reproducible output.
    """
    buf = io.BytesIO()
    with GzipFile(fileobj=buf, mode='wb', compresslevel=compresslevel,
                  mtime=mtime) as f:
        f.write(data)
    return buf.getvalue()
526n/a
def decompress(data):
    """Decompress a gzip compressed string in one shot.

    *data* is wrapped in an in-memory stream, read back through GzipFile,
    and the decompressed string is returned.
    """
    source = io.BytesIO(data)
    with GzipFile(fileobj=source) as reader:
        payload = reader.read()
    return payload
533n/a
534n/a
def _test():
    """Command-line driver: compress each argument, or decompress with -d.

    Acts like gzip; with -d as the first argument, acts like gunzip.
    The input file is not deleted, however, nor are any other gzip
    options or features supported. "-" (also the default when no file is
    named) stands for standard input/output.
    """
    args = sys.argv[1:]
    unpack = bool(args) and args[0] == "-d"
    if unpack:
        args = args[1:]
    if not args:
        args = ["-"]
    for arg in args:
        if unpack and arg == "-":
            src = GzipFile(filename="", mode="rb", fileobj=sys.stdin.buffer)
            dst = sys.stdout.buffer
        elif unpack:
            if arg[-3:] != ".gz":
                print("filename doesn't end in .gz:", repr(arg))
                continue
            src = open(arg, "rb")
            dst = builtins.open(arg[:-3], "wb")
        elif arg == "-":
            src = sys.stdin.buffer
            dst = GzipFile(filename="", mode="wb", fileobj=sys.stdout.buffer)
        else:
            src = builtins.open(arg, "rb")
            dst = open(arg + ".gz", "wb")
        # Copy in 1 KiB pieces until the source is exhausted.
        while True:
            chunk = src.read(1024)
            if not chunk:
                break
            dst.write(chunk)
        # The standard streams are left open for the interpreter.
        if dst is not sys.stdout.buffer:
            dst.close()
        if src is not sys.stdin.buffer:
            src.close()
572n/a
# Run the command-line driver only when executed as a script.
if __name__ == "__main__":
    _test()