» Core Development > Code coverage > Lib/email/contentmanager.py

Python code coverage for Lib/email/contentmanager.py

# | count | content
1n/aimport binascii
2n/aimport email.charset
3n/aimport email.message
4n/aimport email.errors
5n/afrom email import quoprimime
6n/a
class ContentManager:
    """Registry mapping content types / object types to handler callables.

    ``get_handlers`` is consulted by :meth:`get_content` and is keyed by a
    full content type (``'text/plain'``), a maintype (``'text'``), or the
    empty string as a catch-all.  ``set_handlers`` is consulted by
    :meth:`set_content` and is keyed by a type object, a type name string
    (dotted ``module.qualname``, ``qualname`` or bare ``name``), or ``None``
    as a catch-all.
    """

    def __init__(self):
        self.get_handlers = {}
        self.set_handlers = {}

    def add_get_handler(self, key, handler):
        """Register *handler* as the get-handler for *key*."""
        self.get_handlers[key] = handler

    def get_content(self, msg, *args, **kw):
        """Return the result of calling the best matching get-handler on *msg*.

        Lookup order: the full content type, then the maintype, then the
        ``''`` catch-all.  Extra positional and keyword arguments are passed
        through to the handler.  Raises ``KeyError(content_type)`` when no
        handler matches.
        """
        content_type = msg.get_content_type()
        if content_type in self.get_handlers:
            return self.get_handlers[content_type](msg, *args, **kw)
        maintype = msg.get_content_maintype()
        if maintype in self.get_handlers:
            return self.get_handlers[maintype](msg, *args, **kw)
        if '' in self.get_handlers:
            return self.get_handlers[''](msg, *args, **kw)
        raise KeyError(content_type)

    def add_set_handler(self, typekey, handler):
        """Register *handler* as the set-handler for *typekey*."""
        self.set_handlers[typekey] = handler

    def set_content(self, msg, obj, *args, **kw):
        """Store *obj* as the content of *msg* using the matching set-handler.

        Raises ``TypeError`` if *msg* is currently a multipart.  The handler
        is looked up before the existing content is cleared, so a failed
        lookup (``KeyError``) leaves *msg* untouched.
        """
        if msg.get_content_maintype() == 'multipart':
            # XXX: is this error a good idea or not?  We can remove it later,
            # but we can't add it later, so do it for now.
            raise TypeError("set_content not valid on multipart")
        handler = self._find_set_handler(msg, obj)
        msg.clear_content()
        handler(msg, obj, *args, **kw)

    def _find_set_handler(self, msg, obj):
        """Return the set-handler registered for the type of *obj*.

        Each class in ``type(obj).__mro__`` is tried in order; for every
        class the keys tried are the class object itself, its dotted
        ``module.qualname``, its ``qualname`` and finally its ``name``.  If
        nothing matches, the ``None`` catch-all is used.  Raises ``KeyError``
        carrying the dotted path of ``type(obj)`` when there is no handler.
        """
        full_path_for_error = None
        for typ in type(obj).__mro__:
            if typ in self.set_handlers:
                return self.set_handlers[typ]
            qname = typ.__qualname__
            modname = getattr(typ, '__module__', '')
            full_path = '.'.join((modname, qname)) if modname else qname
            if full_path_for_error is None:
                # Remember the most-derived class for the error message.
                full_path_for_error = full_path
            if full_path in self.set_handlers:
                return self.set_handlers[full_path]
            if qname in self.set_handlers:
                return self.set_handlers[qname]
            name = typ.__name__
            if name in self.set_handlers:
                return self.set_handlers[name]
        if None in self.set_handlers:
            return self.set_handlers[None]
        raise KeyError(full_path_for_error)
59n/a
60n/a
# Module-level ContentManager instance; the handlers defined in this module
# are registered on it.
raw_data_manager = ContentManager()
62n/a
63n/a
def get_text_content(msg, errors='replace'):
    """Return the payload of *msg* decoded to ``str``.

    The transfer-decoded payload bytes are decoded with the message's
    ``charset`` parameter (``'ASCII'`` when absent), using *errors* as the
    codec error handler.
    """
    content = msg.get_payload(decode=True)
    charset = msg.get_param('charset', 'ASCII')
    return content.decode(charset, errors=errors)
# Handle every text/* message with get_text_content.
raw_data_manager.add_get_handler('text', get_text_content)
69n/a
70n/a
def get_non_text_content(msg):
    """Return the transfer-decoded payload of *msg* (``get_payload(decode=True)``)."""
    return msg.get_payload(decode=True)
# These maintypes are all returned as raw bytes.
for maintype in 'audio image video application'.split():
    raw_data_manager.add_get_handler(maintype, get_non_text_content)
75n/a
76n/a
def get_message_content(msg):
    """Return the first (index 0) sub-payload of *msg*."""
    return msg.get_payload(0)
# message/rfc822 and message/external-body are returned as their sub-payload.
for subtype in 'rfc822 external-body'.split():
    raw_data_manager.add_get_handler('message/'+subtype, get_message_content)
81n/a
82n/a
def get_and_fixup_unknown_message_content(msg):
    """Return the first sub-payload of *msg* converted to ``bytes``."""
    # If we don't understand a message subtype, we are supposed to treat it as
    # if it were application/octet-stream, per
    # tools.ietf.org/html/rfc2046#section-5.2.4.  Feedparser doesn't do that,
    # so do our best to fix things up.  Note that it is *not* appropriate to
    # model message/partial content as Message objects, so they are handled
    # here as well.  (How to reassemble them is out of scope for this comment :)
    return bytes(msg.get_payload(0))
# Maintype-level handler for 'message'; ContentManager.get_content tries the
# full content type before the maintype.
raw_data_manager.add_get_handler('message',
                                 get_and_fixup_unknown_message_content)
93n/a
94n/a
def _prepare_set(msg, maintype, subtype, headers):
    """Set the Content-Type of *msg* and add any extra *headers*.

    *headers* may be empty/None, a sequence of header objects (anything with
    a ``name`` attribute), or a sequence of header source strings, which are
    parsed with the message's policy.  Only the first element is inspected
    to decide which of the two forms was supplied.

    Raises ``ValueError`` (chained from the defect) when a header carries a
    defect that is an ``email.errors.HeaderDefect``.
    """
    msg['Content-Type'] = '/'.join((maintype, subtype))
    if headers:
        if not hasattr(headers[0], 'name'):
            mp = msg.policy
            headers = [mp.header_factory(*mp.header_source_parse([header]))
                       for header in headers]
        try:
            for header in headers:
                if header.defects:
                    raise header.defects[0]
                msg[header.name] = header
        except email.errors.HeaderDefect as exc:
            # 'header' is still bound to the header that carried the defect.
            raise ValueError("Invalid header: {}".format(
                                header.fold(policy=msg.policy))) from exc
110n/a
111n/a
def _finalize_set(msg, disposition, filename, cid, params):
    """Apply the optional disposition, filename, Content-ID and parameters.

    A *filename* without an explicit *disposition* implies ``'attachment'``.
    Each non-None argument is written to *msg*; *params* is a mapping whose
    items are set one by one with ``msg.set_param``.
    """
    if disposition is None and filename is not None:
        disposition = 'attachment'
    if disposition is not None:
        msg['Content-Disposition'] = disposition
    if filename is not None:
        msg.set_param('filename',
                      filename,
                      header='Content-Disposition',
                      replace=True)
    if cid is not None:
        msg['Content-ID'] = cid
    if params is not None:
        for key, value in params.items():
            msg.set_param(key, value)
127n/a
128n/a
# XXX: This is a cleaned-up version of base64mime.body_encode (including a bug
# fix in the calculation of unencoded_bytes_per_line).  It would be nice to
# drop both this and quoprimime.body_encode in favor of enhanced binascii
# routines that accepted a max_line_length parameter.
def _encode_base64(data, max_line_length):
    """Return *data* base64-encoded as ``str``, one newline-ended line per chunk.

    Each output line encodes ``max_line_length // 4 * 3`` input bytes, so
    encoded lines are at most *max_line_length* characters long.  A falsy
    *max_line_length* (``0`` or ``None``) means "do not wrap": all of *data*
    goes on a single line.  Values from 1 to 3 cannot hold a single
    4-character base64 group, so one group per line is used instead.
    Empty *data* yields ``''``.
    """
    if not max_line_length:
        # No wrapping requested; a step of at least 1 keeps range() valid
        # and still produces no output at all for empty data.
        unencoded_bytes_per_line = max(len(data), 1)
    else:
        unencoded_bytes_per_line = max(3, max_line_length // 4 * 3)
    encoded_lines = []
    for i in range(0, len(data), unencoded_bytes_per_line):
        thisline = data[i:i+unencoded_bytes_per_line]
        # b2a_base64 appends the trailing newline itself.
        encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii'))
    return ''.join(encoded_lines)
140n/a
141n/a
def _encode_text(string, charset, cte, policy):
    """Encode *string* for use as a text payload.

    Returns a ``(cte, data)`` pair, where *data* is the ``str`` payload and
    *cte* the content transfer encoding actually used.  When *cte* is None
    one is chosen heuristically:

    * ``'7bit'`` if the encoded text is pure ASCII;
    * ``'8bit'`` if the policy's ``cte_type`` is ``'8bit'`` and no line is
      longer than ``policy.max_line_length``;
    * otherwise whichever of quoted-printable and base64 is shorter for
      the first ten lines.

    Raises ``ValueError`` for an unrecognized explicit *cte*.
    """
    lines = string.encode(charset).splitlines()
    linesep = policy.linesep.encode('ascii')

    def embedded_body(lines):
        return linesep.join(lines) + linesep

    def normal_body(lines):
        return b'\n'.join(lines) + b'\n'

    if cte is None:
        # Use heuristics to decide on the "best" encoding.
        try:
            return '7bit', normal_body(lines).decode('ascii')
        except UnicodeDecodeError:
            pass
        if (policy.cte_type == '8bit' and
                max(len(x) for x in lines) <= policy.max_line_length):
            return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
        sniff = embedded_body(lines[:10])
        sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
                                          policy.max_line_length)
        sniff_base64 = binascii.b2a_base64(sniff)
        # This is a little unfair to qp; it includes lineseps, base64 doesn't.
        if len(sniff_qp) > len(sniff_base64):
            cte = 'base64'
        else:
            cte = 'quoted-printable'
            if len(lines) <= 10:
                # The sniffed sample already covers the whole body.
                return cte, sniff_qp
    if cte == '7bit':
        data = normal_body(lines).decode('ascii')
    elif cte == '8bit':
        data = normal_body(lines).decode('ascii', 'surrogateescape')
    elif cte == 'quoted-printable':
        data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
                                      policy.max_line_length)
    elif cte == 'base64':
        data = _encode_base64(embedded_body(lines), policy.max_line_length)
    else:
        raise ValueError("Unknown content transfer encoding {}".format(cte))
    return cte, data
179n/a
180n/a
def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None,
                     disposition=None, filename=None, cid=None,
                     params=None, headers=None):
    """Store *string* in *msg* as ``text/<subtype>`` content.

    The text is encoded with *charset*; when *cte* is None the transfer
    encoding is picked by ``_encode_text``.  The ``charset`` parameter
    written to the message is the ``email.charset.ALIASES`` entry for
    *charset* when one exists, otherwise *charset* itself.  *disposition*,
    *filename*, *cid* and *params* are applied by ``_finalize_set`` and
    *headers* by ``_prepare_set``.
    """
    _prepare_set(msg, 'text', subtype, headers)
    cte, payload = _encode_text(string, charset, cte, msg.policy)
    msg.set_payload(payload)
    msg.set_param('charset',
                  email.charset.ALIASES.get(charset, charset),
                  replace=True)
    msg['Content-Transfer-Encoding'] = cte
    _finalize_set(msg, disposition, filename, cid, params)
# str objects are stored as text content.
raw_data_manager.add_set_handler(str, set_text_content)
193n/a
194n/a
def set_message_content(msg, message, subtype="rfc822", cte=None,
                        disposition=None, filename=None, cid=None,
                        params=None, headers=None):
    """Store *message* in *msg* as ``message/<subtype>`` content.

    The payload becomes the one-element list ``[message]``.  Raises
    ``ValueError`` for ``subtype='partial'`` and for a *cte* that the chosen
    subtype does not allow.  When *cte* is None it defaults to ``'8bit'``
    for rfc822 and to ``'7bit'`` for every other subtype.
    """
    if subtype == 'partial':
        raise ValueError("message/partial is not supported for Message objects")
    if subtype == 'rfc822':
        if cte not in (None, '7bit', '8bit', 'binary'):
            # http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate.
            raise ValueError(
                "message/rfc822 parts do not support cte={}".format(cte))
        # 8bit will get coerced on serialization if policy.cte_type='7bit'.  We
        # may end up claiming 8bit when it isn't needed, but the only negative
        # result of that should be a gateway that needs to coerce to 7bit
        # having to look through the whole embedded message to discover whether
        # or not it actually has to do anything.
        cte = '8bit' if cte is None else cte
    elif subtype == 'external-body':
        if cte not in (None, '7bit'):
            # http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate.
            raise ValueError(
                "message/external-body parts do not support cte={}".format(cte))
        cte = '7bit'
    elif cte is None:
        # http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future
        # subtypes should be restricted to 7bit, so assume that.
        cte = '7bit'
    _prepare_set(msg, 'message', subtype, headers)
    msg.set_payload([message])
    msg['Content-Transfer-Encoding'] = cte
    _finalize_set(msg, disposition, filename, cid, params)
# Message objects (and subclasses, via the MRO lookup) are stored as message/*.
raw_data_manager.add_set_handler(email.message.Message, set_message_content)
226n/a
227n/a
def set_bytes_content(msg, data, maintype, subtype, cte='base64',
                      disposition=None, filename=None, cid=None,
                      params=None, headers=None):
    """Store the bytes-like *data* in *msg* as ``<maintype>/<subtype>`` content.

    *cte* selects how *data* is turned into the ``str`` payload:

    * ``'base64'`` -- base64, wrapped at ``msg.policy.max_line_length``;
    * ``'quoted-printable'`` -- ``binascii.b2a_qp`` (line length not limited);
    * ``'7bit'`` -- strict ASCII decode; raises ``UnicodeDecodeError`` if
      *data* contains a non-ASCII byte;
    * ``'8bit'`` / ``'binary'`` -- ASCII decode with ``surrogateescape``.

    Any other *cte* value leaves *data* unconverted.  *disposition*,
    *filename*, *cid* and *params* are applied by ``_finalize_set`` and
    *headers* by ``_prepare_set``.
    """
    _prepare_set(msg, maintype, subtype, headers)
    if cte == 'base64':
        data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
    elif cte == 'quoted-printable':
        # XXX: quoprimime.body_encode won't encode newline characters in data,
        # so we can't use it.  This means max_line_length is ignored.  Another
        # bug to fix later.  (Note: encoders.quopri is broken on line ends.)
        data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
        data = data.decode('ascii')
    elif cte == '7bit':
        # Make sure it really is only ASCII.  The early warning here seems
        # worth the overhead...if you care write your own content manager :).
        # str(obj, encoding) is used rather than obj.decode() so that
        # memoryview, which has no decode method, works too.
        data = str(data, 'ascii')
    elif cte in ('8bit', 'binary'):
        data = str(data, 'ascii', 'surrogateescape')
    msg.set_payload(data)
    msg['Content-Transfer-Encoding'] = cte
    _finalize_set(msg, disposition, filename, cid, params)
# All three bytes-like types share the same handler.
for typ in (bytes, bytearray, memoryview):
    raw_data_manager.add_set_handler(typ, set_bytes_content)