» Core Development > Code coverage > Lib/email/headerregistry.py

Python code coverage for Lib/email/headerregistry.py

# | count | content
1n/a"""Representing and manipulating email headers via custom objects.
2n/a
3n/aThis module provides an implementation of the HeaderRegistry API.
4n/aThe implementation is designed to flexibly follow RFC5322 rules.
5n/a
6n/aEventually HeaderRegistry will be a public API, but it isn't yet,
7n/aand will probably change some before that happens.
8n/a
9n/a"""
10n/afrom types import MappingProxyType
11n/a
12n/afrom email import utils
13n/afrom email import errors
14n/afrom email import _header_value_parser as parser
15n/a
class Address:
    """A single email address: display name, username and domain.

    All three parts are exposed as read-only properties. The string value
    and the addr_spec property are quoted according to RFC 5322 rules, but
    carry no Content Transfer Encoding.
    """

    def __init__(self, display_name='', username='', domain='', addr_spec=None):
        """Create an object representing a full email address.

        An address can have a 'display_name', a 'username', and a 'domain'.
        Instead of passing username and domain separately, they may be
        passed together through the addr_spec keyword. An addr_spec string
        must be properly quoted according to RFC 5322 rules.

        Raises:
            ValueError: if any string argument contains CR or LF, or if
                addr_spec cannot be parsed completely.
            TypeError: if addr_spec is combined with username and/or domain.
            Any defect recorded while parsing addr_spec is raised as-is.
        """
        # A line break inside any part of an address would let the caller's
        # data terminate the header early and smuggle in extra headers, so
        # refuse it before anything is stored.
        for part in (display_name, username, domain, addr_spec):
            if isinstance(part, str) and ('\r' in part or '\n' in part):
                raise ValueError("invalid arguments; address parts cannot "
                                 "contain CR or LF")

        # This clause with its potential 'raise' may only happen when an
        # application program creates an Address object using an addr_spec
        # keyword.  The email library code itself must always supply
        # username and domain.
        if addr_spec is not None:
            if username or domain:
                raise TypeError("addrspec specified when username and/or "
                                "domain also specified")
            a_s, rest = parser.get_addr_spec(addr_spec)
            if rest:
                raise ValueError("Invalid addr_spec; only '{}' "
                                 "could be parsed from '{}'".format(
                                    a_s, addr_spec))
            if a_s.all_defects:
                raise a_s.all_defects[0]
            username = a_s.local_part
            domain = a_s.domain
        self._display_name = display_name
        self._username = username
        self._domain = domain

    @property
    def display_name(self):
        """The display name given at construction time."""
        return self._display_name

    @property
    def username(self):
        """The local part of the address."""
        return self._username

    @property
    def domain(self):
        """The domain part of the address."""
        return self._domain

    @property
    def addr_spec(self):
        """The addr_spec (username@domain) portion of the address, quoted
        according to RFC 5322 rules, but with no Content Transfer Encoding.

        Returns '<>' when both the username and the domain are empty.
        """
        lp = self.username
        # Quote the local part as soon as it contains any character that
        # cannot appear in a dot-atom.
        if not parser.DOT_ATOM_ENDS.isdisjoint(lp):
            lp = parser.quote_string(lp)
        if self.domain:
            return lp + '@' + self.domain
        if not lp:
            return '<>'
        return lp

    def __repr__(self):
        return "{}(display_name={!r}, username={!r}, domain={!r})".format(
                        self.__class__.__name__,
                        self.display_name, self.username, self.domain)

    def __str__(self):
        disp = self.display_name
        # A display name containing RFC 5322 specials must be a quoted-string.
        if not parser.SPECIALS.isdisjoint(disp):
            disp = parser.quote_string(disp)
        if disp:
            addr_spec = '' if self.addr_spec == '<>' else self.addr_spec
            return "{} <{}>".format(disp, addr_spec)
        return self.addr_spec

    def __eq__(self, other):
        # Returning NotImplemented (rather than False) lets the other
        # operand's reflected __eq__ have a say; plain '==' still ends up
        # False when neither side knows how to compare.
        if type(other) is not type(self):
            return NotImplemented
        return (self.display_name == other.display_name and
                self.username == other.username and
                self.domain == other.domain)
105n/a
106n/a
class Group:
    """A named group of addresses, or a wrapper around a lone address."""

    def __init__(self, display_name=None, addresses=None):
        """Create an object representing an address group.

        An address group consists of a display_name followed by colon and a
        list of addresses (see Address) terminated by a semi-colon.  The
        Group is created by specifying a display_name and a possibly empty
        list of Address objects.  A Group can also be used to represent a
        single address that is not in a group, which is convenient when
        manipulating lists that are a combination of Groups and individual
        Addresses.  In this case the display_name should be set to None.
        In particular, the string representation of a Group whose
        display_name is None is the same as the Address object, if there is
        one and only one Address object in the addresses list.
        """
        self._display_name = display_name
        # Stored as a tuple so the read-only property cannot be mutated.
        self._addresses = tuple(addresses) if addresses else ()

    @property
    def display_name(self):
        """The group's display name, or None for a bare address."""
        return self._display_name

    @property
    def addresses(self):
        """Tuple of the addresses that belong to the group."""
        return self._addresses

    def __repr__(self):
        # The closing parenthesis keeps the repr balanced.
        return "{}(display_name={!r}, addresses={!r})".format(
                 self.__class__.__name__,
                 self.display_name, self.addresses)

    def __str__(self):
        if self.display_name is None and len(self.addresses) == 1:
            return str(self.addresses[0])
        disp = self.display_name
        # A display name containing RFC 5322 specials must be a quoted-string.
        if disp is not None and not parser.SPECIALS.isdisjoint(disp):
            disp = parser.quote_string(disp)
        adrstr = ", ".join(str(x) for x in self.addresses)
        adrstr = ' ' + adrstr if adrstr else adrstr
        return "{}:{};".format(disp, adrstr)

    def __eq__(self, other):
        # NotImplemented lets the other operand try its reflected __eq__;
        # plain '==' still evaluates to False for unrelated types.
        if type(other) is not type(self):
            return NotImplemented
        return (self.display_name == other.display_name and
                self.addresses == other.addresses)
157n/a
158n/a
159n/a# Header Classes #
160n/a
class BaseHeader(str):

    """Base class for message headers.

    Supplies the generic machinery; the header-specific work is done by a
    subclass (in practice a mixin listed before this class).

    A subclass must define a classmethod named 'parse' taking an unfolded
    value string and a dictionary.  On entry the dictionary holds a single
    key, 'defects', bound to an empty list.  On return it must also hold
    'parse_tree' (the parse tree produced from the value) and 'decoded'
    (the idealized string form of the value: encoded words decoded,
    canonical representations applied).

    Parsing problems are meant to be appended to the 'defects' list rather
    than raised, so that the message parser can decide what to do with
    them.  The standard header parsers register defects for RFC compliance
    issues, obsolete RFC syntax and unrecoverable parsing errors.

    If 'parse' stores any further keys, the subclass must also define an
    'init' method.  It receives the dictionary as keyword arguments, must
    consume (and remove) the extra keys, and then pass what is left on to
    its parent through super().

    The subclass should also define a 'max_count' attribute that is either
    None or 1.  XXX: need to better define this API.

    """

    def __new__(cls, name, value):
        parsed = {'defects': []}
        cls.parse(value, parsed)
        # 'decoded' becomes the str value itself; everything else goes to
        # init() as keyword arguments.
        text = parsed.pop('decoded')
        if utils._has_surrogates(text):
            text = utils._sanitize(text)
        header = str.__new__(cls, text)
        header.init(name, **parsed)
        return header

    def init(self, name, *, parse_tree, defects):
        """Store the header name, parse tree and defect list."""
        self._name = name
        self._parse_tree = parse_tree
        self._defects = defects

    @property
    def name(self):
        """The header name this object was created with."""
        return self._name

    @property
    def defects(self):
        """The recorded parsing defects, as a tuple."""
        return tuple(self._defects)

    def __reduce__(self):
        # Header classes are built on the fly, so pickling records the class
        # name and bases and lets _reconstruct_header rebuild the class.
        klass = self.__class__
        rebuild_args = (klass.__name__, klass.__bases__, str(self))
        return (_reconstruct_header, rebuild_args, self.__dict__)

    @classmethod
    def _reconstruct(cls, value):
        """Build an instance from *value* without running parse()/init()."""
        return str.__new__(cls, value)

    def fold(self, *, policy):
        """Fold header according to policy.

        The parsed representation of the header is folded according to
        RFC5322 rules, as modified by the policy.  If the parse tree
        contains surrogateescaped bytes, the bytes are CTE encoded using
        the charset 'unknown-8bit'.

        Any non-ASCII characters in the parse tree are CTE encoded using
        charset utf-8. XXX: make this a policy setting.

        The returned value is an ASCII-only string possibly containing
        linesep characters, and ending with a linesep character.  The
        string includes the header name and the ': ' separator.

        """
        label = parser.HeaderLabel([
            parser.ValueTerminal(self.name, 'header-name'),
            parser.ValueTerminal(':', 'header-sep'),
        ])
        # At some point we need to only put fws here if it was in the source.
        separator = parser.CFWSList([parser.WhiteSpaceTerminal(' ', 'fws')])
        whole_header = parser.Header([label, separator, self._parse_tree])
        return whole_header.fold(policy=policy)
256n/a
257n/a
def _reconstruct_header(cls_name, bases, value):
    """Rebuild a header object from the arguments stored by __reduce__.

    A class named cls_name is created from bases, and its _reconstruct
    classmethod is asked to produce the instance for value.
    """
    header_cls = type(cls_name, bases, {})
    return header_cls._reconstruct(value)
260n/a
261n/a
class UnstructuredHeader:
    """Mixin for headers whose value is parsed as unstructured text."""

    max_count = None
    value_parser = staticmethod(parser.get_unstructured)

    @classmethod
    def parse(cls, value, kwds):
        """Store the parse tree of *value* and its string form in *kwds*."""
        tree = cls.value_parser(value)
        kwds['parse_tree'] = tree
        kwds['decoded'] = str(tree)
271n/a
272n/a
class UniqueUnstructuredHeader(UnstructuredHeader):
    """UnstructuredHeader variant whose max_count is 1."""

    max_count = 1
276n/a
277n/a
class DateHeader:

    """Header whose value consists of a single timestamp.

    Provides an additional attribute, datetime, which is either an aware
    datetime using a timezone, or a naive datetime if the timezone
    in the input string is -0000.  Also accepts a datetime as input.
    The 'value' attribute is the normalized form of the timestamp,
    which means it is the output of format_datetime on the datetime.

    If the value is a string that cannot be parsed as a date, a defect is
    recorded, the datetime attribute is None and the header keeps the
    original text as its value.
    """

    max_count = None

    # This is used only for folding, not for creating 'decoded'.
    value_parser = staticmethod(parser.get_unstructured)

    @classmethod
    def parse(cls, value, kwds):
        """Fill *kwds* with 'datetime', 'decoded' and 'parse_tree'.

        *value* may be a date string or a datetime object.  Problems are
        appended to kwds['defects'] instead of being raised.
        """
        if not value:
            kwds['defects'].append(errors.HeaderMissingRequiredValue())
            kwds['datetime'] = None
            kwds['decoded'] = ''
            kwds['parse_tree'] = parser.TokenList()
            return
        if isinstance(value, str):
            # Keep the raw text as the header value in case it turns out
            # not to be a date.
            kwds['decoded'] = value
            try:
                value = utils.parsedate_to_datetime(value)
            except (ValueError, TypeError):
                # Header parsers are expected to report bad input as a
                # defect, not to raise.  InvalidDateDefect only exists in
                # newer versions of email.errors, hence the fallback.
                defect_cls = getattr(errors, 'InvalidDateDefect',
                                     errors.InvalidHeaderDefect)
                kwds['defects'].append(
                    defect_cls('Invalid date value or format'))
                kwds['datetime'] = None
                kwds['parse_tree'] = parser.TokenList()
                return
        kwds['datetime'] = value
        kwds['decoded'] = utils.format_datetime(kwds['datetime'])
        kwds['parse_tree'] = cls.value_parser(kwds['decoded'])

    def init(self, *args, **kw):
        """Consume the 'datetime' keyword and delegate the rest upward."""
        self._datetime = kw.pop('datetime')
        super().init(*args, **kw)

    @property
    def datetime(self):
        """The parsed datetime, or None if there was no usable value."""
        return self._datetime
315n/a
316n/a
class UniqueDateHeader(DateHeader):
    """DateHeader variant whose max_count is 1."""

    max_count = 1
320n/a
321n/a
class AddressHeader:
    """Mixin for headers whose value is a list of addresses and groups.

    Adds a 'groups' attribute (tuple of Group objects) and an 'addresses'
    attribute (flat tuple of every address in those groups).
    """

    max_count = None

    @staticmethod
    def value_parser(value):
        """Return the address-list parse tree for *value*."""
        tree, leftover = parser.get_address_list(value)
        assert not leftover, 'this should not happen'
        return tree

    @classmethod
    def parse(cls, value, kwds):
        """Fill *kwds* from a header string or from Address/Group objects.

        A string is run through value_parser; anything else is taken to be
        a Group, an Address, or an iterable of them.
        """
        if isinstance(value, str):
            # We are translating here from the RFC language (address/mailbox)
            # to our API language (group/address).
            tree = cls.value_parser(value)
            kwds['parse_tree'] = tree
            groups = [
                Group(entry.display_name,
                      [Address(mailbox.display_name or '',
                               mailbox.local_part or '',
                               mailbox.domain or '')
                       for mailbox in entry.all_mailboxes])
                for entry in tree.addresses
            ]
            defects = list(tree.all_defects)
        else:
            # Assume it is Address/Group stuff
            if not hasattr(value, '__iter__'):
                value = [value]
            # Anything without an 'addresses' attribute is treated as a
            # lone address and wrapped in an anonymous group.
            groups = [item if hasattr(item, 'addresses')
                      else Group(None, [item])
                      for item in value]
            defects = []
        kwds['groups'] = groups
        kwds['defects'] = defects
        kwds['decoded'] = ', '.join(str(group) for group in groups)
        if 'parse_tree' not in kwds:
            # Built from objects: derive the tree from the rendered text.
            kwds['parse_tree'] = cls.value_parser(kwds['decoded'])

    def init(self, *args, **kw):
        """Consume the 'groups' keyword and delegate the rest upward."""
        groups = kw.pop('groups')
        self._groups = tuple(groups)
        # Filled in lazily by the 'addresses' property.
        self._addresses = None
        super().init(*args, **kw)

    @property
    def groups(self):
        """Tuple of the Group objects making up the header value."""
        return self._groups

    @property
    def addresses(self):
        """Flat tuple of all addresses, computed on first access."""
        cached = self._addresses
        if cached is None:
            cached = tuple(address
                           for group in self._groups
                           for address in group.addresses)
            self._addresses = cached
        return cached
375n/a
376n/a
class UniqueAddressHeader(AddressHeader):
    """AddressHeader variant whose max_count is 1."""

    max_count = 1
380n/a
381n/a
class SingleAddressHeader(AddressHeader):
    """AddressHeader that additionally exposes its one address."""

    @property
    def address(self):
        """The header's only address.

        Raises ValueError unless the header holds exactly one address.
        """
        found = self.addresses
        if len(found) == 1:
            return found[0]
        raise ValueError(("value of single address header {} is not "
                          "a single address").format(self.name))
390n/a
391n/a
class UniqueSingleAddressHeader(SingleAddressHeader):
    """SingleAddressHeader variant whose max_count is 1."""

    max_count = 1
395n/a
396n/a
class MIMEVersionHeader:
    """Mixin for the MIME-Version header.

    Adds 'major', 'minor' and 'version' attributes.  All three are None
    when the parse tree reports no minor version.
    """

    max_count = 1

    value_parser = staticmethod(parser.parse_mime_version)

    @classmethod
    def parse(cls, value, kwds):
        """Parse *value* and store the version information in *kwds*."""
        tree = cls.value_parser(value)
        kwds['parse_tree'] = tree
        kwds['decoded'] = str(tree)
        kwds['defects'].extend(tree.all_defects)
        minor = tree.minor
        if minor is None:
            # Without a minor number no version at all is reported.
            major = None
            version = None
        else:
            major = tree.major
            version = '{}.{}'.format(major, minor)
        kwds['major'] = major
        kwds['minor'] = minor
        kwds['version'] = version

    def init(self, *args, **kw):
        """Consume the version keywords and delegate the rest upward."""
        self._version = kw.pop('version')
        self._major = kw.pop('major')
        self._minor = kw.pop('minor')
        super().init(*args, **kw)

    @property
    def major(self):
        """Major version number, or None."""
        return self._major

    @property
    def minor(self):
        """Minor version number, or None."""
        return self._minor

    @property
    def version(self):
        """The version as a 'major.minor' string, or None."""
        return self._version
432n/a
433n/a
class ParameterizedMIMEHeader:
    """Mixin that handles the params dict.

    Must be subclassed, and the subclass must provide a value_parser for
    the specific header.
    """

    max_count = 1

    @classmethod
    def parse(cls, value, kwds):
        """Parse *value* and store tree, text, defects and params in *kwds*.

        Parameter names are sanitized and lower-cased; parameter values
        are sanitized.
        """
        tree = cls.value_parser(value)
        kwds['parse_tree'] = tree
        kwds['decoded'] = str(tree)
        kwds['defects'].extend(tree.all_defects)
        raw_params = tree.params
        if raw_params is None:
            kwds['params'] = {}
            return
        # The MIME RFCs specify that parameter ordering is arbitrary.
        cleaned = {}
        for name, value in raw_params:
            cleaned[utils._sanitize(name).lower()] = utils._sanitize(value)
        kwds['params'] = cleaned

    def init(self, *args, **kw):
        """Consume the 'params' keyword and delegate the rest upward."""
        self._params = kw.pop('params')
        super().init(*args, **kw)

    @property
    def params(self):
        """Read-only mapping view of the header's parameters."""
        return MappingProxyType(self._params)
461n/a
462n/a
class ContentTypeHeader(ParameterizedMIMEHeader):
    """Content-Type header: adds maintype, subtype and content_type."""

    value_parser = staticmethod(parser.parse_content_type_header)

    def init(self, *args, **kw):
        """Run the parent init, then copy the sanitized type parts."""
        super().init(*args, **kw)
        tree = self._parse_tree
        self._maintype = utils._sanitize(tree.maintype)
        self._subtype = utils._sanitize(tree.subtype)

    @property
    def maintype(self):
        """The main type taken from the parse tree."""
        return self._maintype

    @property
    def subtype(self):
        """The subtype taken from the parse tree."""
        return self._subtype

    @property
    def content_type(self):
        """The value in 'maintype/subtype' form."""
        return '/'.join((self.maintype, self.subtype))
483n/a
484n/a
class ContentDispositionHeader(ParameterizedMIMEHeader):
    """Content-Disposition header: adds content_disposition."""

    value_parser = staticmethod(parser.parse_content_disposition_header)

    def init(self, *args, **kw):
        """Run the parent init, then copy the disposition if there is one."""
        super().init(*args, **kw)
        disposition = self._parse_tree.content_disposition
        if disposition is not None:
            disposition = utils._sanitize(disposition)
        self._content_disposition = disposition

    @property
    def content_disposition(self):
        """The disposition value, or None if the parse tree has none."""
        return self._content_disposition
497n/a
498n/a
class ContentTransferEncodingHeader:
    """Mixin for the Content-Transfer-Encoding header: adds cte."""

    max_count = 1

    value_parser = staticmethod(parser.parse_content_transfer_encoding_header)

    @classmethod
    def parse(cls, value, kwds):
        """Parse *value* and store tree, text and defects in *kwds*."""
        tree = cls.value_parser(value)
        kwds['parse_tree'] = tree
        kwds['decoded'] = str(tree)
        kwds['defects'].extend(tree.all_defects)

    def init(self, *args, **kw):
        """Run the parent init, then copy the sanitized cte value."""
        super().init(*args, **kw)
        encoding = self._parse_tree.cte
        self._cte = utils._sanitize(encoding)

    @property
    def cte(self):
        """The content transfer encoding taken from the parse tree."""
        return self._cte
518n/a
519n/a
520n/a# The header factory #
521n/a
# Lower-cased header name -> specialized header class.  HeaderRegistry
# copies this mapping into its registry unless told otherwise.
_default_header_map = {
    # Unstructured text
    'subject':                      UniqueUnstructuredHeader,
    # Dates
    'date':                         UniqueDateHeader,
    'resent-date':                  DateHeader,
    'orig-date':                    UniqueDateHeader,
    # Addresses
    'sender':                       UniqueSingleAddressHeader,
    'resent-sender':                SingleAddressHeader,
    'to':                           UniqueAddressHeader,
    'resent-to':                    AddressHeader,
    'cc':                           UniqueAddressHeader,
    'resent-cc':                    AddressHeader,
    'bcc':                          UniqueAddressHeader,
    'resent-bcc':                   AddressHeader,
    'from':                         UniqueAddressHeader,
    'resent-from':                  AddressHeader,
    'reply-to':                     UniqueAddressHeader,
    # MIME
    'mime-version':                 MIMEVersionHeader,
    'content-type':                 ContentTypeHeader,
    'content-disposition':          ContentDispositionHeader,
    'content-transfer-encoding':    ContentTransferEncodingHeader,
    }
543n/a
544n/aclass HeaderRegistry:
545n/a
546n/a """A header_factory and header registry."""
547n/a
548n/a def __init__(self, base_class=BaseHeader, default_class=UnstructuredHeader,
549n/a use_default_map=True):
550n/a """Create a header_factory that works with the Policy API.
551n/a
552n/a base_class is the class that will be the last class in the created
553n/a header class's __bases__ list. default_class is the class that will be
554n/a used if "name" (see __call__) does not appear in the registry.
555n/a use_default_map controls whether or not the default mapping of names to
556n/a specialized classes is copied in to the registry when the factory is
557n/a created. The default is True.
558n/a
559n/a """
560n/a self.registry = {}
561n/a self.base_class = base_class
562n/a self.default_class = default_class
563n/a if use_default_map:
564n/a self.registry.update(_default_header_map)
565n/a
566n/a def map_to_type(self, name, cls):
567n/a """Register cls as the specialized class for handling "name" headers.
568n/a
569n/a """
570n/a self.registry[name.lower()] = cls
571n/a
572n/a def __getitem__(self, name):
573n/a cls = self.registry.get(name.lower(), self.default_class)
574n/a return type('_'+cls.__name__, (cls, self.base_class), {})
575n/a
576n/a def __call__(self, name, value):
577n/a """Create a header instance for header 'name' from 'value'.
578n/a
579n/a Creates a header instance by creating a specialized class for parsing
580n/a and representing the specified header by combining the factory
581n/a base_class with a specialized class from the registry or the
582n/a default_class, and passing the name and value to the constructed
583n/a class's constructor.
584n/a
585n/a """
586n/a return self[name](name, value)