» Core Development > Code coverage > Lib/email/utils.py

Python code coverage for Lib/email/utils.py

#  count  content
1n/a# Copyright (C) 2001-2010 Python Software Foundation
2n/a# Author: Barry Warsaw
3n/a# Contact: email-sig@python.org
4n/a
5n/a"""Miscellaneous utilities."""
6n/a
# Names made available by ``from email.utils import *``.
__all__ = [
    'collapse_rfc2231_value',
    'decode_params',
    'decode_rfc2231',
    'encode_rfc2231',
    'formataddr',
    'formatdate',
    'format_datetime',
    'getaddresses',
    'make_msgid',
    'mktime_tz',
    'parseaddr',
    'parsedate',
    'parsedate_tz',
    'parsedate_to_datetime',
    'unquote',
    ]
24n/a
25n/aimport os
26n/aimport re
27n/aimport time
28n/aimport random
29n/aimport socket
30n/aimport datetime
31n/aimport urllib.parse
32n/a
33n/afrom email._parseaddr import quote
34n/afrom email._parseaddr import AddressList as _AddressList
35n/afrom email._parseaddr import mktime_tz
36n/a
37n/afrom email._parseaddr import parsedate, parsedate_tz, _parsedate_tz
38n/a
39n/a# Intrapackage imports
40n/afrom email.charset import Charset
41n/a
# String constants shared by the helpers in this module.
COMMASPACE = ', '
EMPTYSTRING = ''
UEMPTYSTRING = ''
CRLF = '\r\n'
TICK = "'"

# Matches any one of the characters ] [ \ ( ) < > @ , : ; " and .
# formataddr() wraps the display name in double quotes when this matches.
specialsre = re.compile(r'[][\\()<>@,:;".]')
# Matches a backslash or a double quote; formataddr() prefixes each match
# with a backslash.
escapesre = re.compile(r'[\\"]')
50n/a
51n/adef _has_surrogates(s):
52n/a """Return True if s contains surrogate-escaped binary data."""
53n/a # This check is based on the fact that unless there are surrogates, utf8
54n/a # (Python's default encoding) can encode any string. This is the fastest
55n/a # way to check for surrogates, see issue 11454 for timings.
56n/a try:
57n/a s.encode()
58n/a return False
59n/a except UnicodeEncodeError:
60n/a return True
61n/a
62n/a# How to deal with a string containing bytes before handing it to the
63n/a# application through the 'normal' interface.
64n/adef _sanitize(string):
65n/a # Turn any escaped bytes into unicode 'unknown' char. If the escaped
66n/a # bytes happen to be utf-8 they will instead get decoded, even if they
67n/a # were invalid in the charset the source was supposed to be in. This
68n/a # seems like it is not a bad thing; a defect was still registered.
69n/a original_bytes = string.encode('utf-8', 'surrogateescape')
70n/a return original_bytes.decode('utf-8', 'replace')
71n/a
72n/a
73n/a
74n/a# Helpers
75n/a
def formataddr(pair, charset='utf-8'):
    """Build a header-ready address string from (realname, email_address).

    This is the inverse of parseaddr(): the result is suitable for an
    RFC 2822 From, To or Cc header.  When the realname element is false the
    address is returned on its own, unmodified.

    charset names the character set used to encode realname when it is not
    ASCII safe.  It may be a str or a Charset-like object exposing a
    header_encode() method.  Default is 'utf-8'.

    A UnicodeError is raised if the address itself is not ASCII.
    """
    realname, addr = pair
    # The address MUST (per RFC) be ascii; let the UnicodeError propagate.
    addr.encode('ascii')
    if not realname:
        return addr
    try:
        realname.encode('ascii')
    except UnicodeEncodeError:
        # Non-ASCII display name: hand it to the charset's header encoder.
        if isinstance(charset, str):
            charset = Charset(charset)
        return "%s <%s>" % (charset.header_encode(realname), addr)
    # ASCII display name: quote it only if it contains a special character,
    # but always backslash-escape embedded backslashes and double quotes.
    wrap = '"' if specialsre.search(realname) else ''
    escaped = escapesre.sub(r'\\\g<0>', realname)
    return '%s%s%s <%s>' % (wrap, escaped, wrap, addr)
107n/a
108n/a
109n/a
def getaddresses(fieldvalues):
    """Return a list of (REALNAME, EMAIL) for each fieldvalue."""
    # All field values are joined into one comma-separated string and parsed
    # in a single pass.
    return _AddressList(COMMASPACE.join(fieldvalues)).addresslist
115n/a
116n/a
117n/a
# Matches text of the form =?charset?encoding?atom?= where encoding is a
# single "q" or "b" in either case.  The named groups are charset, encoding
# and atom.
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<atom>.*?)         # non-greedy up to the next ?= is the atom
  \?=                   # literal ?=
  ''', re.VERBOSE | re.IGNORECASE)
127n/a
128n/a
129n/adef _format_timetuple_and_zone(timetuple, zone):
130n/a return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
131n/a ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]],
132n/a timetuple[2],
133n/a ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
134n/a 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1],
135n/a timetuple[0], timetuple[3], timetuple[4], timetuple[5],
136n/a zone)
137n/a
def formatdate(timeval=None, localtime=False, usegmt=False):
    """Returns a date string as specified by RFC 2822, e.g.:

    Fri, 09 Nov 2001 01:08:47 -0000

    Optional timeval if given is a floating point time value as accepted by
    gmtime() and localtime(), otherwise the current time is used.

    Optional localtime is a flag that when True, interprets timeval, and
    returns a date relative to the local timezone instead of UTC, properly
    taking daylight savings time into account.

    Optional argument usegmt means that the timezone is written out as
    an ascii string, not numeric one (so "GMT" instead of "+0000"). This
    is needed for HTTP, and is only used when localtime==False.
    """
    # Note: we cannot use strftime() because that honors the locale and RFC
    # 2822 requires that day and month names be the English abbreviations.
    if timeval is None:
        timeval = time.time()
    # Always start from an aware UTC datetime; this avoids the deprecated
    # datetime.utcfromtimestamp().
    dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc)
    if localtime:
        dt = dt.astimezone()
        # "GMT" only makes sense for UTC output, so localtime wins.
        usegmt = False
    elif not usegmt:
        # A naive datetime makes format_datetime() emit the "-0000" zone.
        dt = dt.replace(tzinfo=None)
    return format_datetime(dt, usegmt)
166n/a
def format_datetime(dt, usegmt=False):
    """Turn a datetime into a date string as specified in RFC 2822.

    If usegmt is True, dt must be an aware datetime with an offset of zero.  In
    this case 'GMT' will be rendered instead of the normal +0000 required by
    RFC2822.  This is to support HTTP headers involving date stamps.

    A naive dt is rendered with the '-0000' zone; an aware dt uses its own
    numeric offset.  ValueError is raised when usegmt is requested for a
    datetime whose tzinfo is not datetime.timezone.utc.
    """
    fields = dt.timetuple()
    tzinfo = dt.tzinfo
    if usegmt:
        if tzinfo is None or tzinfo != datetime.timezone.utc:
            raise ValueError("usegmt option requires a UTC datetime")
        zone = 'GMT'
    else:
        zone = '-0000' if tzinfo is None else dt.strftime("%z")
    return _format_timetuple_and_zone(fields, zone)
184n/a
185n/a
def make_msgid(idstring=None, domain=None):
    """Returns a string suitable for RFC 2822 compliant Message-ID, e.g:

    <142480216486.20800.16526388040877946887@nightshade.la.mastaler.com>

    Optional idstring if given is a string used to strengthen the
    uniqueness of the message id.  Optional domain if given provides the
    portion of the message id after the '@'.  It defaults to the locally
    defined hostname.
    """
    # The left-hand side is built from the current time in hundredths of a
    # second, the process id and 64 random bits.
    stamp = int(time.time()*100)
    process_id = os.getpid()
    nonce = random.getrandbits(64)
    suffix = '' if idstring is None else '.' + idstring
    host = socket.getfqdn() if domain is None else domain
    return '<%d.%d.%d%s@%s>' % (stamp, process_id, nonce, suffix, host)
207n/a
208n/a
def parsedate_to_datetime(data):
    """Parse an RFC 2822 style date string into a datetime.

    The string is parsed with _parsedate_tz().  If it yields no timezone
    offset the result is a naive datetime; otherwise the result is an aware
    datetime whose tzinfo is a fixed-offset datetime.timezone.

    Raises ValueError if data cannot be parsed as a date, or if the parsed
    fields are rejected by the datetime constructor.
    """
    parsed = _parsedate_tz(data)
    if parsed is None:
        # Without this check the unpacking below fails with an opaque
        # "cannot unpack non-iterable NoneType" TypeError.
        raise ValueError('Invalid date value or format "%s"' % str(data))
    *dtuple, tz = parsed
    if tz is None:
        return datetime.datetime(*dtuple[:6])
    return datetime.datetime(*dtuple[:6],
            tzinfo=datetime.timezone(datetime.timedelta(seconds=tz)))
215n/a
216n/a
def parseaddr(addr):
    """Parse addr into a (realname, email_address) pair.

    Only the first address found is returned.  If nothing can be parsed the
    result is the pair ('', '').
    """
    parsed = _AddressList(addr).addresslist
    return parsed[0] if parsed else ('', '')
222n/a
223n/a
224n/a# rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
def unquote(str):
    """Remove quotes from a string.

    A string wrapped in double quotes loses them and has its backslash
    escapes (\\\\ and \\") undone; a string wrapped in angle brackets just
    loses the brackets.  Anything else, including strings shorter than two
    characters, is returned unchanged.
    """
    if len(str) <= 1:
        return str
    if str.startswith('"') and str.endswith('"'):
        inner = str[1:-1]
        return inner.replace('\\\\', '\\').replace('\\"', '"')
    if str.startswith('<') and str.endswith('>'):
        return str[1:-1]
    return str
233n/a
234n/a
235n/a
236n/a# RFC2231-related functions - parameter encoding and decoding
def decode_rfc2231(s):
    """Decode string according to RFC 2231.

    s is split on the first two single quotes into charset, language and
    value.  When fewer than two single quotes are present the result is
    (None, None, s); otherwise the three pieces are returned as a list.
    """
    pieces = s.split(TICK, 2)
    return pieces if len(pieces) > 2 else (None, None, s)
243n/a
244n/a
def encode_rfc2231(s, charset=None, language=None):
    """Encode string according to RFC 2231.

    s is always percent-encoded (with no characters treated as safe) using
    charset, or 'ascii' when charset is not given.  If neither charset nor
    language is given, only that percent-encoded text is returned.
    Otherwise the result is "charset'language'text"; a missing language is
    written as the empty string.
    """
    quoted = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
    if charset is None and language is None:
        return quoted
    lang = '' if language is None else language
    return "%s'%s'%s" % (charset, lang, quoted)
258n/a
259n/a
# Matches a parameter name of the form NAME*, NAME*N or NAME*N* where N is a
# run of ASCII digits.  decode_params() uses the 'name' and 'num' groups to
# separate the base name from the continuation number.
rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
    re.ASCII)
262n/a
def decode_params(params):
    """Decode parameters list according to RFC 2231.

    params is a non-empty sequence of 2-tuples containing (param name,
    string value).  It is not modified.

    Returns a new list.  The first entry of params is passed through as-is.
    Every other ordinary parameter is unquoted, re-escaped and wrapped in
    double quotes.  Parameters whose names carry an RFC 2231 marker
    (NAME*, NAME*N, NAME*N*) are grouped by NAME, joined in numerical order
    and appended after the ordinary ones: as (NAME, '"value"') when no
    segment was %-encoded, or as (NAME, (charset, language, '"value"'))
    when at least one was.

    Raises IndexError if params is empty.
    """
    # Index/slice instead of copy-and-pop(0): this accepts any sequence (not
    # only lists), never touches the caller's object and avoids the
    # quadratic cost of repeatedly popping from the front.
    name, value = params[0]
    new_params = [(name, value)]
    # Map parameter's name to a list of continuations.  The values are a
    # 3-tuple of the continuation number, the string value, and a flag
    # specifying whether a particular segment is %-encoded.
    rfc2231_params = {}
    for name, value in params[1:]:
        encoded = name.endswith('*')
        value = unquote(value)
        mo = rfc2231_continuation.match(name)
        if mo:
            name, num = mo.group('name', 'num')
            if num is not None:
                num = int(num)
            rfc2231_params.setdefault(name, []).append((num, value, encoded))
        else:
            new_params.append((name, '"%s"' % quote(value)))
    for name, continuations in rfc2231_params.items():
        segments = []
        extended = False
        # Sort by number
        continuations.sort()
        # And now append all values in numerical order, converting
        # %-encodings for the encoded segments.  If any of the
        # continuation names ends in a *, then the entire string, after
        # decoding segments and concatenating, must have the charset and
        # language specifiers at the beginning of the string.
        for num, s, encoded in continuations:
            if encoded:
                # Decode as "latin-1", so the characters in s directly
                # represent the percent-encoded octet values.
                # collapse_rfc2231_value treats this as an octet sequence.
                s = urllib.parse.unquote(s, encoding="latin-1")
                extended = True
            segments.append(s)
        value = quote(EMPTYSTRING.join(segments))
        if extended:
            charset, language, value = decode_rfc2231(value)
            new_params.append((name, (charset, language, '"%s"' % value)))
        else:
            new_params.append((name, '"%s"' % value))
    return new_params
318n/a
319n/adef collapse_rfc2231_value(value, errors='replace',
320n/a fallback_charset='us-ascii'):
321n/a if not isinstance(value, tuple) or len(value) != 3:
322n/a return unquote(value)
323n/a # While value comes to us as a unicode string, we need it to be a bytes
324n/a # object. We do not want bytes() normal utf-8 decoder, we want a straight
325n/a # interpretation of the string as character bytes.
326n/a charset, language, text = value
327n/a if charset is None:
328n/a # Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse
329n/a # the value, so use the fallback_charset.
330n/a charset = fallback_charset
331n/a rawbytes = bytes(text, 'raw-unicode-escape')
332n/a try:
333n/a return str(rawbytes, charset, errors)
334n/a except LookupError:
335n/a # charset is not a known codec.
336n/a return unquote(text)
337n/a
338n/a
339n/a#
340n/a# datetime doesn't provide a localtime function yet, so provide one. Code
341n/a# adapted from the patch in issue 9527. This may not be perfect, but it is
342n/a# better than not having it.
343n/a#
344n/a
def localtime(dt=None, isdst=-1):
    """Return local time as an aware datetime object.

    If called without arguments, return current time.  Otherwise *dt*
    argument should be a datetime instance, and it is converted to the
    local time zone according to the system time zone database.  If *dt* is
    naive (that is, dt.tzinfo is None), it is assumed to be in local time.
    In this case, a positive or zero value for *isdst* causes localtime to
    presume initially that summer time (for example, Daylight Saving Time)
    is or is not (respectively) in effect for the specified time.  A
    negative value for *isdst* causes the localtime() function to attempt
    to divine whether summer time is in effect for the specified time.

    """
    if dt is None:
        # Current time: take an aware UTC "now" and convert it.
        return datetime.datetime.now(datetime.timezone.utc).astimezone()
    if dt.tzinfo is not None:
        # Already aware: astimezone() with no argument does the conversion.
        return dt.astimezone()
    # We have a naive datetime.  Convert to a (localtime) timetuple and pass to
    # system mktime together with the isdst hint.  System mktime will return
    # seconds since epoch.
    tm = dt.timetuple()[:-1] + (isdst,)
    seconds = time.mktime(tm)
    localtm = time.localtime(seconds)
    try:
        # Preferred path: use the offset and zone name carried by the
        # struct_time, when it has tm_gmtoff / tm_zone attributes.
        delta = datetime.timedelta(seconds=localtm.tm_gmtoff)
        tz = datetime.timezone(delta, localtm.tm_zone)
    except AttributeError:
        # Compute UTC offset and compare with the value implied by tm_isdst.
        # If the values match, use the zone name implied by tm_isdst.
        delta = dt - datetime.datetime(*time.gmtime(seconds)[:6])
        dst = time.daylight and localtm.tm_isdst > 0
        gmtoff = -(time.altzone if dst else time.timezone)
        if delta == datetime.timedelta(seconds=gmtoff):
            tz = datetime.timezone(delta, time.tzname[dst])
        else:
            # Offsets disagree: keep the computed offset, without a name.
            tz = datetime.timezone(delta)
    # Attach the zone without shifting the wall-clock fields of dt.
    return dt.replace(tzinfo=tz)