» Core Development > Code coverage > Lib/email/header.py

Python code coverage for Lib/email/header.py

# | count | content
# Copyright (C) 2002-2007 Python Software Foundation
# Author: Ben Gertzfield, Barry Warsaw
# Contact: email-sig@python.org

"""Header encoding and decoding functionality."""

# Public API of this module; everything else is an implementation detail.
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]
12n/a
13n/aimport re
14n/aimport binascii
15n/a
16n/aimport email.quoprimime
17n/aimport email.base64mime
18n/a
19n/afrom email.errors import HeaderParseError
20n/afrom email import charset as _charset
21n/aCharset = _charset.Charset
22n/a
# Small string/bytes constants shared by the functions and classes below.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''
# Default maximum line length used by Header when maxlinelen is not given.
MAXLINELEN = 78
# Characters treated as folding white space when splitting header text.
FWS = ' \t'

# Pre-built Charset instances: the default charset for Header, and the
# fallback used when text cannot be encoded as us-ascii.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')
33n/a
# Match encoded-word strings in the form =?charset?q?Hello_World?=
# The three named groups mean that ecre.split() returns a flat list of the
# form [text, charset, encoding, encoded, text, ...].
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (octal 041)
# to tilde (octal 176), i.e. the printable, non-space ASCII characters.
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack: a newline followed directly by non-whitespace
# text and a colon looks like the start of a new header field.
_embedded_header = re.compile(r'\n[^ \t]+:')
53n/a
54n/a
55n/a
# Helpers
# Module-level alias for the quoted-printable module's private helper.
_max_append = email.quoprimime._max_append
58n/a
59n/a
60n/a
def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (string, charset) pairs containing each of the decoded
    parts of the header.  Charset is None for non-encoded parts of the header,
    otherwise a lower-case string containing the name of the character set
    specified in the encoded string.

    header may be a string that may or may not contain RFC2047 encoded words,
    or it may be a Header object.

    When header is a string containing no encoded words at all, the single
    pair returned holds the original str unchanged; when at least one encoded
    word is present, every returned string is a bytes object.

    An email.errors.HeaderParseError may be raised when certain decoding error
    occurs (e.g. a base64 decoding exception).
    """
    # If it is a Header object, we can just return the encoded chunks.
    if hasattr(header, '_chunks'):
        return [(_charset._encode(string, str(charset)), str(charset))
                for string, charset in header._chunks]
    # If no encoding, just return the header with no charset.
    if not ecre.search(header):
        return [(header, None)]
    # First step is to parse all the encoded parts into triplets of the form
    # (encoded_string, encoding, charset).  For unencoded strings, the last
    # two parts will be None.
    words = []
    for line in header.splitlines():
        parts = ecre.split(line)
        # ecre has three capturing groups, so split() yields
        # [text, charset, encoding, encoded, text, ...]: plain text sits at
        # every fourth index and each one but the last is followed by an
        # encoded-word triple.  Walking by index avoids repeated pop(0).
        for start in range(0, len(parts), 4):
            unencoded = parts[start]
            if start == 0:
                # Only the leading text of each line loses its indentation.
                unencoded = unencoded.lstrip()
            if unencoded:
                words.append((unencoded, None, None))
            if start + 3 < len(parts):
                charset, encoding, encoded = parts[start + 1:start + 4]
                words.append((encoded, encoding.lower(), charset.lower()))
    # Now loop over words and remove words that consist of whitespace
    # between two encoded strings.  Indices are collected first and deleted
    # back to front so that earlier deletions do not shift later ones.
    droplist = []
    for n, w in enumerate(words):
        if n > 1 and w[1] and words[n - 2][1] and words[n - 1][0].isspace():
            droplist.append(n - 1)
    for d in reversed(droplist):
        del words[d]

    # The next step is to decode each encoded word by applying the reverse
    # base64 or quopri transformation.  decoded_words is now a list of the
    # form (decoded_word, charset).
    decoded_words = []
    for encoded_string, encoding, charset in words:
        if encoding is None:
            # This is an unencoded word.
            decoded_words.append((encoded_string, charset))
        elif encoding == 'q':
            word = email.quoprimime.header_decode(encoded_string)
            decoded_words.append((word, charset))
        elif encoding == 'b':
            # Postel's law: add missing padding
            paderr = len(encoded_string) % 4
            if paderr:
                encoded_string += '==='[:4 - paderr]
            try:
                word = email.base64mime.decode(encoded_string)
            except binascii.Error as err:
                # Keep the low-level error reachable via __cause__.
                raise HeaderParseError('Base64 decoding error') from err
            decoded_words.append((word, charset))
        else:
            raise AssertionError('Unexpected encoding: ' + encoding)
    # Now convert all words to bytes and collapse consecutive runs of
    # similarly encoded words.
    collapsed = []
    last_word = last_charset = None
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(word, 'raw-unicode-escape')
        if last_word is None:
            last_word = word
            last_charset = charset
        elif charset != last_charset:
            collapsed.append((last_word, last_charset))
            last_word = word
            last_charset = charset
        elif last_charset is None:
            # Adjacent unencoded words are rejoined with a single space.
            last_word += BSPACE + word
        else:
            last_word += word
    collapsed.append((last_word, last_charset))
    return collapsed
153n/a
154n/a
155n/a
def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Create a Header from a sequence of pairs as returned by decode_header()

    decode_header() takes a header value string and returns a sequence of
    pairs of the format (decoded_string, charset) where charset is the string
    name of the character set.

    This function takes one of those sequence of pairs and returns a Header
    instance.  Optional maxlinelen, header_name, and continuation_ws are as in
    the Header constructor.
    """
    h = Header(maxlinelen=maxlinelen, header_name=header_name,
               continuation_ws=continuation_ws)
    for s, charset in decoded_seq:
        # None means us-ascii but we can simply pass it on to h.append()
        if charset is not None and not isinstance(charset, Charset):
            # Charset names are wrapped; Charset instances pass through as-is.
            charset = Charset(charset)
        h.append(s, charset)
    return h
176n/a
177n/a
178n/a
class Header:
    """A header value built from (string, Charset) chunks.

    Chunks are collected with append() and can be rendered either as a plain
    str (str(header)) or as an encoded, folded header value (encode()).
    """

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, the initial header
        value is not set.  You can later append to the header with .append()
        method calls.  s may be a byte string or a Unicode string, but see the
        .append() documentation for semantics.

        Optional charset serves two purposes: it has the same meaning as the
        charset argument to the .append() method.  It also sets the default
        character set for all subsequent .append() calls that omit the charset
        argument.  If charset is not provided in the constructor, the us-ascii
        charset is used both as s's initial charset and as the default for
        subsequent .append() calls.

        The maximum line length can be specified explicitly via maxlinelen. For
        splitting the first line to a shorter value (to account for the field
        header which isn't included in s, e.g. `Subject') pass in the name of
        the field in header_name.  The default maxlinelen is 78 as recommended
        by RFC 2822.

        continuation_ws must be RFC 2822 compliant folding whitespace (usually
        either a space or a hard tab) which will be prepended to continuation
        lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        # List of (str, Charset) pairs, in the order they were appended.
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        if maxlinelen is None:
            maxlinelen = MAXLINELEN
        self._maxlinelen = maxlinelen
        if header_name is None:
            self._headerlen = 0
        else:
            # Take the separating colon and space into account.
            self._headerlen = len(header_name) + 2

    def __str__(self):
        """Return the string value of the header."""
        self._normalize()
        uchunks = []
        lastcs = None
        lastspace = None
        for string, charset in self._chunks:
            # We must preserve spaces between encoded and non-encoded word
            # boundaries, which means for us we need to add a space when we go
            # from a charset to None/us-ascii, or from None/us-ascii to a
            # charset.  Only do this for the second and subsequent chunks.
            # Don't add a space if the None/us-ascii string already has
            # a space (trailing or leading depending on transition)
            nextcs = charset
            if nextcs == _charset.UNKNOWN8BIT:
                # Round-trip through bytes so that surrogate-escaped
                # characters are rendered by the 'replace' error handler.
                original_bytes = string.encode('ascii', 'surrogateescape')
                string = original_bytes.decode('ascii', 'replace')
            if uchunks:
                hasspace = string and self._nonctext(string[0])
                if lastcs not in (None, 'us-ascii'):
                    if nextcs in (None, 'us-ascii') and not hasspace:
                        uchunks.append(SPACE)
                        nextcs = None
                elif nextcs not in (None, 'us-ascii') and not lastspace:
                    uchunks.append(SPACE)
            lastspace = string and self._nonctext(string[-1])
            lastcs = nextcs
            uchunks.append(string)
        return EMPTYSTRING.join(uchunks)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        """Compare the unencoded string value of this header with other."""
        # other may be a Header or a string.  Both are fine so coerce
        # ourselves to a unicode (of the unencoded header value), swap the
        # args and do another comparison.
        return other == str(self)

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).  A
        value of None (the default) means that the charset given in the
        constructor is used.

        s may be a byte string or a Unicode string.  If it is a byte string
        (i.e. isinstance(s, str) is false), then charset is the encoding of
        that byte string, and a UnicodeError will be raised if the string
        cannot be decoded with that charset.  If s is a Unicode string, then
        charset is a hint specifying the character set of the characters in
        the string.  In either case, when producing an RFC 2822 compliant
        header using RFC 2047 rules, the string will be encoded using the
        output codec of the charset.  If the string cannot be encoded to the
        output codec, a UnicodeError will be raised.

        As a special case, a string that cannot be encoded with a us-ascii
        output codec is stored with the utf-8 charset instead of raising.

        Optional `errors' is passed as the errors argument to the decode
        call if s is a byte string.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if not isinstance(s, str):
            input_charset = charset.input_codec or 'us-ascii'
            if input_charset == _charset.UNKNOWN8BIT:
                s = s.decode('us-ascii', 'surrogateescape')
            else:
                s = s.decode(input_charset, errors)
        # Ensure that the bytes we're storing can be decoded to the output
        # character set, otherwise an early error is raised.
        output_charset = charset.output_codec or 'us-ascii'
        if output_charset != _charset.UNKNOWN8BIT:
            try:
                s.encode(output_charset, errors)
            except UnicodeEncodeError:
                if output_charset != 'us-ascii':
                    raise
                charset = UTF8
        self._chunks.append((s, charset))

    def _nonctext(self, s):
        """True if string s is not a ctext character of RFC822.
        """
        return s.isspace() or s in ('(', ')', '\\')

    def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
        r"""Encode a message header into an RFC-compliant format.

        There are many issues involved in converting a given string for use in
        an email header.  Only certain character sets are readable in most
        email clients, and as header strings can only contain a subset of
        7-bit ASCII, care must be taken to properly convert and encode (with
        Base64 or quoted-printable) header strings.  In addition, there is a
        75-character length limit on any given encoded header field, so
        line-wrapping must be performed, even with double-byte character sets.

        Optional maxlinelen specifies the maximum length of each generated
        line, exclusive of the linesep string.  Individual lines may be longer
        than maxlinelen if a folding point cannot be found.  The first line
        will be shorter by the length of the header name plus ": " if a header
        name was specified at Header construction time.  The default value for
        maxlinelen is determined at header construction time.

        Optional splitchars is a string containing characters which should be
        given extra weight by the splitting algorithm during normal header
        wrapping.  This is in very rough support of RFC 2822's `higher level
        syntactic breaks':  split points preceded by a splitchar are preferred
        during line splitting, with the characters preferred in the order in
        which they appear in the string.  Space and tab may be included in the
        string to indicate whether preference should be given to one over the
        other as a split point when other split chars do not appear in the line
        being split.  Splitchars does not affect RFC 2047 encoded lines.

        Optional linesep is a string to be used to separate the lines of
        the value.  The default value is the most useful for typical
        Python applications, but it can be set to \r\n to produce RFC-compliant
        line separators when needed.

        Raises HeaderParseError if the encoded value appears to contain an
        embedded header (a newline followed by non-whitespace text and a
        colon).
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # A maxlinelen of 0 means don't wrap.  For all practical purposes,
        # choosing a huge number here accomplishes that and makes the
        # _ValueFormatter algorithm much simpler.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        lastcs = None
        # hasspace stays None until the first chunk has been processed, so
        # no transition is ever inserted before the first chunk.
        hasspace = lastspace = None
        for string, charset in self._chunks:
            if hasspace is not None:
                hasspace = string and self._nonctext(string[0])
                if lastcs not in (None, 'us-ascii'):
                    if not hasspace or charset not in (None, 'us-ascii'):
                        formatter.add_transition()
                elif charset not in (None, 'us-ascii') and not lastspace:
                    formatter.add_transition()
            lastspace = string and self._nonctext(string[-1])
            lastcs = charset
            hasspace = False
            lines = string.splitlines()
            if lines:
                formatter.feed('', lines[0], charset)
            else:
                formatter.feed('', '', charset)
            for line in lines[1:]:
                formatter.newline()
                if charset.header_encoding is not None:
                    formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
                                   charset)
                else:
                    # Keep the line's own leading whitespace as its folding
                    # white space instead of substituting continuation_ws.
                    sline = line.lstrip()
                    fws = line[:len(line)-len(sline)]
                    formatter.feed(fws, sline, charset)
            if len(lines) > 1:
                formatter.newline()
        if self._chunks:
            formatter.add_transition()
        value = formatter._str(linesep)
        if _embedded_header.search(value):
            raise HeaderParseError("header value appears to contain "
                "an embedded header: {!r}".format(value))
        return value

    def _normalize(self):
        """Merge adjacent chunks that share a charset, joined by a space."""
        # Step 1: Normalize the chunks so that all runs of identical charsets
        # get collapsed into a single unicode string.
        chunks = []
        last_charset = None
        last_chunk = []
        for string, charset in self._chunks:
            if charset == last_charset:
                last_chunk.append(string)
            else:
                if last_charset is not None:
                    chunks.append((SPACE.join(last_chunk), last_charset))
                last_chunk = [string]
                last_charset = charset
        if last_chunk:
            chunks.append((SPACE.join(last_chunk), last_charset))
        self._chunks = chunks
410n/a
411n/a
412n/a
413n/aclass _ValueFormatter:
414n/a def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
415n/a self._maxlen = maxlen
416n/a self._continuation_ws = continuation_ws
417n/a self._continuation_ws_len = len(continuation_ws)
418n/a self._splitchars = splitchars
419n/a self._lines = []
420n/a self._current_line = _Accumulator(headerlen)
421n/a
422n/a def _str(self, linesep):
423n/a self.newline()
424n/a return linesep.join(self._lines)
425n/a
426n/a def __str__(self):
427n/a return self._str(NL)
428n/a
429n/a def newline(self):
430n/a end_of_line = self._current_line.pop()
431n/a if end_of_line != (' ', ''):
432n/a self._current_line.push(*end_of_line)
433n/a if len(self._current_line) > 0:
434n/a if self._current_line.is_onlyws():
435n/a self._lines[-1] += str(self._current_line)
436n/a else:
437n/a self._lines.append(str(self._current_line))
438n/a self._current_line.reset()
439n/a
440n/a def add_transition(self):
441n/a self._current_line.push(' ', '')
442n/a
443n/a def feed(self, fws, string, charset):
444n/a # If the charset has no header encoding (i.e. it is an ASCII encoding)
445n/a # then we must split the header at the "highest level syntactic break"
446n/a # possible. Note that we don't have a lot of smarts about field
447n/a # syntax; we just try to break on semi-colons, then commas, then
448n/a # whitespace. Eventually, this should be pluggable.
449n/a if charset.header_encoding is None:
450n/a self._ascii_split(fws, string, self._splitchars)
451n/a return
452n/a # Otherwise, we're doing either a Base64 or a quoted-printable
453n/a # encoding which means we don't need to split the line on syntactic
454n/a # breaks. We can basically just find enough characters to fit on the
455n/a # current line, minus the RFC 2047 chrome. What makes this trickier
456n/a # though is that we have to split at octet boundaries, not character
457n/a # boundaries but it's only safe to split at character boundaries so at
458n/a # best we can only get close.
459n/a encoded_lines = charset.header_encode_lines(string, self._maxlengths())
460n/a # The first element extends the current line, but if it's None then
461n/a # nothing more fit on the current line so start a new line.
462n/a try:
463n/a first_line = encoded_lines.pop(0)
464n/a except IndexError:
465n/a # There are no encoded lines, so we're done.
466n/a return
467n/a if first_line is not None:
468n/a self._append_chunk(fws, first_line)
469n/a try:
470n/a last_line = encoded_lines.pop()
471n/a except IndexError:
472n/a # There was only one line.
473n/a return
474n/a self.newline()
475n/a self._current_line.push(self._continuation_ws, last_line)
476n/a # Everything else are full lines in themselves.
477n/a for line in encoded_lines:
478n/a self._lines.append(self._continuation_ws + line)
479n/a
480n/a def _maxlengths(self):
481n/a # The first line's length.
482n/a yield self._maxlen - len(self._current_line)
483n/a while True:
484n/a yield self._maxlen - self._continuation_ws_len
485n/a
486n/a def _ascii_split(self, fws, string, splitchars):
487n/a # The RFC 2822 header folding algorithm is simple in principle but
488n/a # complex in practice. Lines may be folded any place where "folding
489n/a # white space" appears by inserting a linesep character in front of the
490n/a # FWS. The complication is that not all spaces or tabs qualify as FWS,
491n/a # and we are also supposed to prefer to break at "higher level
492n/a # syntactic breaks". We can't do either of these without intimate
493n/a # knowledge of the structure of structured headers, which we don't have
494n/a # here. So the best we can do here is prefer to break at the specified
495n/a # splitchars, and hope that we don't choose any spaces or tabs that
496n/a # aren't legal FWS. (This is at least better than the old algorithm,
497n/a # where we would sometimes *introduce* FWS after a splitchar, or the
498n/a # algorithm before that, where we would turn all white space runs into
499n/a # single spaces or tabs.)
500n/a parts = re.split("(["+FWS+"]+)", fws+string)
501n/a if parts[0]:
502n/a parts[:0] = ['']
503n/a else:
504n/a parts.pop(0)
505n/a for fws, part in zip(*[iter(parts)]*2):
506n/a self._append_chunk(fws, part)
507n/a
508n/a def _append_chunk(self, fws, string):
509n/a self._current_line.push(fws, string)
510n/a if len(self._current_line) > self._maxlen:
511n/a # Find the best split point, working backward from the end.
512n/a # There might be none, on a long first line.
513n/a for ch in self._splitchars:
514n/a for i in range(self._current_line.part_count()-1, 0, -1):
515n/a if ch.isspace():
516n/a fws = self._current_line[i][0]
517n/a if fws and fws[0]==ch:
518n/a break
519n/a prevpart = self._current_line[i-1][1]
520n/a if prevpart and prevpart[-1]==ch:
521n/a break
522n/a else:
523n/a continue
524n/a break
525n/a else:
526n/a fws, part = self._current_line.pop()
527n/a if self._current_line._initial_size > 0:
528n/a # There will be a header, so leave it on a line by itself.
529n/a self.newline()
530n/a if not fws:
531n/a # We don't use continuation_ws here because the whitespace
532n/a # after a header should always be a space.
533n/a fws = ' '
534n/a self._current_line.push(fws, part)
535n/a return
536n/a remainder = self._current_line.pop_from(i)
537n/a self._lines.append(str(self._current_line))
538n/a self._current_line.reset(remainder)
539n/a
540n/a
541n/aclass _Accumulator(list):
542n/a
543n/a def __init__(self, initial_size=0):
544n/a self._initial_size = initial_size
545n/a super().__init__()
546n/a
547n/a def push(self, fws, string):
548n/a self.append((fws, string))
549n/a
550n/a def pop_from(self, i=0):
551n/a popped = self[i:]
552n/a self[i:] = []
553n/a return popped
554n/a
555n/a def pop(self):
556n/a if self.part_count()==0:
557n/a return ('', '')
558n/a return super().pop()
559n/a
560n/a def __len__(self):
561n/a return sum((len(fws)+len(part) for fws, part in self),
562n/a self._initial_size)
563n/a
564n/a def __str__(self):
565n/a return EMPTYSTRING.join((EMPTYSTRING.join((fws, part))
566n/a for fws, part in self))
567n/a
568n/a def reset(self, startval=None):
569n/a if startval is None:
570n/a startval = []
571n/a self[:] = startval
572n/a self._initial_size = 0
573n/a
574n/a def is_onlyws(self):
575n/a return self._initial_size==0 and (not self or str(self).isspace())
576n/a
577n/a def part_count(self):
578n/a return super().__len__()