ยปCore Development>Code coverage>Lib/mimetools.py

Python code coverage for Lib/mimetools.py

#countcontent
11"""Various tools used by MIME-reading or MIME-writing programs."""
2n/a
3n/a
41import os
51import sys
61import tempfile
71from warnings import filterwarnings, catch_warnings
81with catch_warnings():
91 if sys.py3kwarning:
100 filterwarnings("ignore", ".*rfc822 has been removed", DeprecationWarning)
111 import rfc822
12n/a
131from warnings import warnpy3k
141warnpy3k("in 3.x, mimetools has been removed in favor of the email package",
151 stacklevel=2)
16n/a
171__all__ = ["Message","choose_boundary","encode","decode","copyliteral",
181 "copybinary"]
19n/a
202class Message(rfc822.Message):
21n/a """A derived class of rfc822.Message that knows about MIME headers and
221 contains some hooks for decoding encoded and multipart messages."""
23n/a
241 def __init__(self, fp, seekable = 1):
25437 rfc822.Message.__init__(self, fp, seekable)
26n/a self.encodingheader = \
27437 self.getheader('content-transfer-encoding')
28n/a self.typeheader = \
29437 self.getheader('content-type')
30437 self.parsetype()
31437 self.parseplist()
32n/a
331 def parsetype(self):
34437 str = self.typeheader
35437 if str is None:
36252 str = 'text/plain'
37437 if ';' in str:
3816 i = str.index(';')
3916 self.plisttext = str[i:]
4016 str = str[:i]
41n/a else:
42421 self.plisttext = ''
43437 fields = str.split('/')
441311 for i in range(len(fields)):
45874 fields[i] = fields[i].strip().lower()
46437 self.type = '/'.join(fields)
47437 self.maintype = fields[0]
48437 self.subtype = '/'.join(fields[1:])
49n/a
501 def parseplist(self):
51437 str = self.plisttext
52437 self.plist = []
53456 while str[:1] == ';':
5419 str = str[1:]
5519 if ';' in str:
56n/a # XXX Should parse quotes!
573 end = str.index(';')
58n/a else:
5916 end = len(str)
6019 f = str[:end]
6119 if '=' in f:
6219 i = f.index('=')
63n/a f = f[:i].strip().lower() + \
6419 '=' + f[i+1:].strip()
6519 self.plist.append(f.strip())
6619 str = str[end:]
67n/a
681 def getplist(self):
691 return self.plist
70n/a
711 def getparam(self, name):
725 name = name.lower() + '='
735 n = len(name)
748 for p in self.plist:
757 if p[:n] == name:
764 return rfc822.unquote(p[n:])
771 return None
78n/a
791 def getparamnames(self):
801 result = []
813 for p in self.plist:
822 i = p.find('=')
832 if i >= 0:
842 result.append(p[:i].lower())
851 return result
86n/a
871 def getencoding(self):
883 if self.encodingheader is None:
892 return '7bit'
901 return self.encodingheader.lower()
91n/a
921 def gettype(self):
931 return self.type
94n/a
951 def getmaintype(self):
966 return self.maintype
97n/a
981 def getsubtype(self):
993 return self.subtype
100n/a
101n/a
102n/a
103n/a
104n/a# Utility functions
105n/a# -----------------
106n/a
1071try:
1081 import thread
1090except ImportError:
1100 import dummy_thread as thread
1111_counter_lock = thread.allocate_lock()
1121del thread
113n/a
1141_counter = 0
1151def _get_next_counter():
116n/a global _counter
117100 _counter_lock.acquire()
118100 _counter += 1
119100 result = _counter
120100 _counter_lock.release()
121100 return result
122n/a
1231_prefix = None
124n/a
1251def choose_boundary():
126n/a """Return a string usable as a multipart boundary.
127n/a
128n/a The string chosen is unique within a single program run, and
129n/a incorporates the user id (if available), process id (if available),
130n/a and current time. So it's very unlikely the returned string appears
131n/a in message text, but there's no guarantee.
132n/a
133n/a The boundary contains dots so you have to quote it in the header."""
134n/a
135n/a global _prefix
136100 import time
137100 if _prefix is None:
1381 import socket
1391 try:
1401 hostid = socket.gethostbyname(socket.gethostname())
1410 except socket.gaierror:
1420 hostid = '127.0.0.1'
1431 try:
1441 uid = repr(os.getuid())
1450 except AttributeError:
1460 uid = '1'
1471 try:
1481 pid = repr(os.getpid())
1490 except AttributeError:
1500 pid = '1'
1511 _prefix = hostid + '.' + uid + '.' + pid
152100 return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())
153n/a
154n/a
155n/a# Subroutines for decoding some common content-transfer-types
156n/a
1571def decode(input, output, encoding):
158n/a """Decode common content-transfer-encodings (base64, quopri, uuencode)."""
1598 if encoding == 'base64':
1601 import base64
1611 return base64.decode(input, output)
1627 if encoding == 'quoted-printable':
1631 import quopri
1641 return quopri.decode(input, output)
1656 if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
1664 import uu
1674 return uu.decode(input, output)
1682 if encoding in ('7bit', '8bit'):
1692 return output.write(input.read())
1700 if encoding in decodetab:
1710 pipethrough(input, decodetab[encoding], output)
172n/a else:
1730 raise ValueError, \
1740 'unknown Content-Transfer-Encoding: %s' % encoding
175n/a
1761def encode(input, output, encoding):
177n/a """Encode common content-transfer-encodings (base64, quopri, uuencode)."""
1788 if encoding == 'base64':
1791 import base64
1801 return base64.encode(input, output)
1817 if encoding == 'quoted-printable':
1821 import quopri
1831 return quopri.encode(input, output, 0)
1846 if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
1854 import uu
1864 return uu.encode(input, output)
1872 if encoding in ('7bit', '8bit'):
1882 return output.write(input.read())
1890 if encoding in encodetab:
1900 pipethrough(input, encodetab[encoding], output)
191n/a else:
1920 raise ValueError, \
1930 'unknown Content-Transfer-Encoding: %s' % encoding
194n/a
195n/a# The following is no longer used for standard encodings
196n/a
197n/a# XXX This requires that uudecode and mmencode are in $PATH
198n/a
199n/auudecode_pipe = '''(
200n/aTEMP=/tmp/@uu.$$
201n/ased "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
202n/acat $TEMP
203n/arm $TEMP
2041)'''
205n/a
2061decodetab = {
2071 'uuencode': uudecode_pipe,
2081 'x-uuencode': uudecode_pipe,
2091 'uue': uudecode_pipe,
2101 'x-uue': uudecode_pipe,
2111 'quoted-printable': 'mmencode -u -q',
2121 'base64': 'mmencode -u -b',
213n/a}
214n/a
2151encodetab = {
2161 'x-uuencode': 'uuencode tempfile',
2171 'uuencode': 'uuencode tempfile',
2181 'x-uue': 'uuencode tempfile',
2191 'uue': 'uuencode tempfile',
2201 'quoted-printable': 'mmencode -q',
2211 'base64': 'mmencode -b',
222n/a}
223n/a
2241def pipeto(input, command):
2250 pipe = os.popen(command, 'w')
2260 copyliteral(input, pipe)
2270 pipe.close()
228n/a
2291def pipethrough(input, command, output):
2300 (fd, tempname) = tempfile.mkstemp()
2310 temp = os.fdopen(fd, 'w')
2320 copyliteral(input, temp)
2330 temp.close()
2340 pipe = os.popen(command + ' <' + tempname, 'r')
2350 copybinary(pipe, output)
2360 pipe.close()
2370 os.unlink(tempname)
238n/a
2391def copyliteral(input, output):
2400 while 1:
2410 line = input.readline()
2420 if not line: break
2430 output.write(line)
244n/a
2451def copybinary(input, output):
2460 BUFSIZE = 8192
2470 while 1:
2480 line = input.read(BUFSIZE)
2490 if not line: break
2500 output.write(line)