»Core Development>Code coverage>Lib/test/test_email/test_contentmanager.py

Python code coverage for Lib/test/test_email/test_contentmanager.py

#countcontent
1n/aimport unittest
2n/afrom test.test_email import TestEmailBase, parameterize
3n/aimport textwrap
4n/afrom email import policy
5n/afrom email.message import EmailMessage
6n/afrom email.contentmanager import ContentManager, raw_data_manager
7n/a
8n/a
@parameterize
class TestContentManager(TestEmailBase):
    """Exercise ContentManager's handler registration and lookup rules.

    The ``*_params`` dictionaries map a case name to ``(order, key)``, where
    ``order`` is the expected lookup precedence of ``key`` (lower wins).  The
    ``<prefix>_as_<name>`` methods are templates taking those two values;
    the ``parameterize`` decorator is presumably what expands them into
    concrete ``test_*`` methods (defined in ``test.test_email``).
    """

    policy = policy.default
    message = EmailMessage

    # Keys under which a get handler may be registered for a text/plain
    # message, listed from most to least specific.
    get_key_params = {
        'full_type': (1, 'text/plain',),
        'maintype_only': (2, 'text',),
        'null_key': (3, '',),
        }

    def get_key_as_get_content_key(self, order, key):
        # A handler registered under ``key`` must be found for a text/plain
        # message, and extra keyword arguments must be passed through to it.
        def foo_getter(msg, foo=None):
            bar = msg['X-Bar-Header']
            return foo, bar
        cm = ContentManager()
        cm.add_get_handler(key, foo_getter)
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        m['X-Bar-Header'] = 'foo'
        self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))

    def get_key_as_get_content_key_order(self, order, key):
        # The handler registered under ``key`` must win over handlers
        # registered under every lower-precedence key.
        def bar_getter(msg):
            return msg['X-Bar-Header']
        def foo_getter(msg):
            return msg['X-Foo-Header']
        cm = ContentManager()
        cm.add_get_handler(key, foo_getter)
        for precedence, other_key in self.get_key_params.values():
            if precedence > order:
                cm.add_get_handler(other_key, bar_getter)
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        m['X-Bar-Header'] = 'bar'
        m['X-Foo-Header'] = 'foo'
        self.assertEqual(cm.get_content(m), 'foo')

    def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
        # With no handlers at all, the lookup fails and names the MIME type.
        cm = ContentManager()
        m = self._make_message()
        m['Content-Type'] = 'text/plain'
        with self.assertRaisesRegex(KeyError, 'text/plain'):
            cm.get_content(m)

    # A small str-based hierarchy so that set-handler lookup can be tested
    # against the type itself, its base class, and the builtin str.
    class BaseThing(str):
        pass
    baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
    class Thing(BaseThing):
        pass
    testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'

    # Keys under which a set handler may be registered for a Thing instance,
    # listed from most to least specific.
    set_key_params = {
        'type': (0, Thing,),
        'full_path': (1, testobject_full_path,),
        'qualname': (2, 'TestContentManager.Thing',),
        'name': (3, 'Thing',),
        'base_type': (4, BaseThing,),
        'base_full_path': (5, baseobject_full_path,),
        'base_qualname': (6, 'TestContentManager.BaseThing',),
        'base_name': (7, 'BaseThing',),
        'str_type': (8, str,),
        'str_full_path': (9, 'builtins.str',),
        'str_name': (10, 'str',),  # str name and qualname are the same
        'null_key': (11, None,),
        }

    def set_key_as_set_content_key(self, order, key):
        # A handler registered under ``key`` must be found for a Thing
        # instance, and extra keyword arguments must be passed through to it.
        def foo_setter(msg, obj, foo=None):
            msg['X-Foo-Header'] = foo
            msg.set_payload(obj)
        cm = ContentManager()
        cm.add_set_handler(key, foo_setter)
        m = self._make_message()
        msg_obj = self.Thing()
        cm.set_content(m, msg_obj, foo='bar')
        self.assertEqual(m['X-Foo-Header'], 'bar')
        self.assertEqual(m.get_payload(), msg_obj)

    def set_key_as_set_content_key_order(self, order, key):
        # The handler registered under ``key`` must win over handlers
        # registered under every lower-precedence key.
        def foo_setter(msg, obj):
            msg['X-FooBar-Header'] = 'foo'
            msg.set_payload(obj)
        def bar_setter(msg, obj):
            msg['X-FooBar-Header'] = 'bar'
        cm = ContentManager()
        cm.add_set_handler(key, foo_setter)
        # Competing handlers must come from set_key_params: keys taken from
        # get_key_params are MIME types and could never match a Thing, which
        # would make this precedence check pass vacuously.
        for precedence, other_key in self.set_key_params.values():
            if precedence > order:
                cm.add_set_handler(other_key, bar_setter)
        m = self._make_message()
        msg_obj = self.Thing()
        cm.set_content(m, msg_obj)
        self.assertEqual(m['X-FooBar-Header'], 'foo')
        self.assertEqual(m.get_payload(), msg_obj)

    def test_set_content_raises_if_unknown_type_and_no_default(self):
        # With no handlers at all, the lookup fails and names the object's
        # fully qualified type.
        cm = ContentManager()
        m = self._make_message()
        msg_obj = self.Thing()
        with self.assertRaisesRegex(KeyError, self.testobject_full_path):
            cm.set_content(m, msg_obj)

    def test_set_content_raises_if_called_on_multipart(self):
        cm = ContentManager()
        m = self._make_message()
        m['Content-Type'] = 'multipart/foo'
        with self.assertRaises(TypeError):
            cm.set_content(m, 'test')

    def test_set_content_calls_clear_content(self):
        # The registered handler is a no-op, so whatever is missing from the
        # message afterwards was removed before the handler ran: every
        # Content-* header and the payload go, other headers stay.
        m = self._make_message()
        m['Content-Foo'] = 'bar'
        m['Content-Type'] = 'text/html'
        m['To'] = 'test'
        m.set_payload('abc')
        cm = ContentManager()
        cm.add_set_handler(str, lambda *args, **kw: None)
        m.set_content('xyz', content_manager=cm)
        self.assertIsNone(m['Content-Foo'])
        self.assertIsNone(m['Content-Type'])
        self.assertEqual(m['To'], 'test')
        self.assertIsNone(m.get_payload())
133n/a
134n/a
@parameterize
class TestRawDataManager(TestEmailBase):
    """Round-trip tests for ``raw_data_manager``'s get/set content handlers.

    The ``content_object_params`` dictionary maps a case name to
    ``(obj, mimetype_args)``; the ``content_object_as_*`` methods are
    templates taking those two values, presumably expanded into concrete
    ``test_*`` methods by the ``parameterize`` decorator (defined in
    ``test.test_email``).
    """
    # Note: these tests are dependent on the order in which headers are added
    # to the message objects by the code.  There's no defined ordering in
    # RFC5322/MIME, so this makes the tests more fragile than the standards
    # require.  However, if the header order changes it is best to understand
    # *why*, and make sure it isn't a subtle bug in whatever change was
    # applied.

    policy = policy.default.clone(max_line_length=60,
                                  content_manager=raw_data_manager)
    message = EmailMessage

    # ---- get_content: text parts -----------------------------------------

    def test_get_text_plain(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain

            Basic text.
            """))
        self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")

    def test_get_text_html(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/html

            <p>Basic text.</p>
            """))
        self.assertEqual(raw_data_manager.get_content(m),
                         "<p>Basic text.</p>\n")

    def test_get_text_plain_latin1(self):
        m = self._bytes_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset=latin1

            Basìc tëxt.
            """).encode('latin1'))
        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")

    def test_get_text_plain_latin1_quoted_printable(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="latin-1"
            Content-Transfer-Encoding: quoted-printable

            Bas=ECc t=EBxt.
            """))
        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")

    def test_get_text_plain_utf8_base64(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="utf8"
            Content-Transfer-Encoding: base64

            QmFzw6xjIHTDq3h0Lgo=
            """))
        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")

    def test_get_text_plain_bad_utf8_quoted_printable(self):
        # The stray =fd byte is not valid utf-8; by default it is expected to
        # come back as the replacement character.
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="utf8"
            Content-Transfer-Encoding: quoted-printable

            Bas=c3=acc t=c3=abxt=fd.
            """))
        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n")

    def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
        # Same input as above, but errors='ignore' drops the bad byte.
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="utf8"
            Content-Transfer-Encoding: quoted-printable

            Bas=c3=acc t=c3=abxt=fd.
            """))
        self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
                         "Basìc tëxt.\n")

    def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
        # A non-base64 character inside the encoded body must not prevent
        # the rest of the data from being decoded.
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain; charset="utf8"
            Content-Transfer-Encoding: base64

            QmFzw6xjIHTDq3h0Lgo\xFF=
            """))
        self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
                         "Basìc tëxt.\n")

    def test_get_text_invalid_keyword(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: text/plain

            Basic text.
            """))
        with self.assertRaises(TypeError):
            raw_data_manager.get_content(m, foo='ignore')

    # ---- get_content: non-text, multipart and message parts --------------

    def test_get_non_text(self):
        template = textwrap.dedent("""\
            Content-Type: {}
            Content-Transfer-Encoding: base64

            Ym9ndXMgZGF0YQ==
            """)
        for maintype in 'audio image video application'.split():
            with self.subTest(maintype=maintype):
                m = self._str_msg(template.format(maintype+'/foo'))
                self.assertEqual(raw_data_manager.get_content(m),
                                 b"bogus data")

    def test_get_non_text_invalid_keyword(self):
        # 'errors' is only meaningful for text; binary getters reject it.
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: image/jpg
            Content-Transfer-Encoding: base64

            Ym9ndXMgZGF0YQ==
            """))
        with self.assertRaises(TypeError):
            raw_data_manager.get_content(m, errors='ignore')

    def test_get_raises_on_multipart(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: multipart/mixed; boundary="==="

            --===
            --===--
            """))
        with self.assertRaises(KeyError):
            raw_data_manager.get_content(m)

    def test_get_message_rfc822_and_external_body(self):
        template = textwrap.dedent("""\
            Content-Type: message/{}

            To: foo@example.com
            From: bar@example.com
            Subject: example

            an example message
            """)
        for subtype in 'rfc822 external-body'.split():
            with self.subTest(subtype=subtype):
                m = self._str_msg(template.format(subtype))
                sub_msg = raw_data_manager.get_content(m)
                self.assertIsInstance(sub_msg, self.message)
                self.assertEqual(raw_data_manager.get_content(sub_msg),
                                 "an example message\n")
                self.assertEqual(sub_msg['to'], 'foo@example.com')
                self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')

    def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
        m = self._str_msg(textwrap.dedent("""\
            Content-Type: message/partial

            To: foo@example.com
            From: bar@example.com
            Subject: example

            The real body is in another message.
            """))
        self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')

    # ---- set_content: text -----------------------------------------------

    def test_set_text_plain(self):
        m = self._make_message()
        content = "Simple message.\n"
        raw_data_manager.set_content(m, content)
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 7bit

            Simple message.
            """))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_html(self):
        m = self._make_message()
        content = "<p>Simple message.</p>\n"
        raw_data_manager.set_content(m, content, subtype='html')
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/html; charset="utf-8"
            Content-Transfer-Encoding: 7bit

            <p>Simple message.</p>
            """))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_charset_latin_1(self):
        m = self._make_message()
        content = "Simple message.\n"
        raw_data_manager.set_content(m, content, charset='latin-1')
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="iso-8859-1"
            Content-Transfer-Encoding: 7bit

            Simple message.
            """))
        # Decode with the charset that was actually requested; the content
        # is pure ASCII so the resulting text is the same either way.
        self.assertEqual(m.get_payload(decode=True).decode('latin-1'),
                         content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_short_line_minimal_non_ascii_heuristics(self):
        m = self._make_message()
        content = "et là il est monté sur moi et il commence à m'éto.\n"
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit

            et là il est monté sur moi et il commence à m'éto.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_long_line_minimal_non_ascii_heuristics(self):
        m = self._make_message()
        content = ("j'ai un problème de python. il est sorti de son"
                   " vivarium.  et là il est monté sur moi et il commence"
                   " à m'éto.\n")
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: quoted-printable

            j'ai un probl=C3=A8me de python. il est sorti de son vivari=
            um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
            =C3=A0 m'=C3=A9to.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
        m = self._make_message()
        content = '\n'*10 + (
                  "j'ai un problème de python. il est sorti de son"
                  " vivarium.  et là il est monté sur moi et il commence"
                  " à m'éto.\n")
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: quoted-printable
            """ + '\n'*10 + """
            j'ai un probl=C3=A8me de python. il est sorti de son vivari=
            um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
            =C3=A0 m'=C3=A9to.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_maximal_non_ascii_heuristics(self):
        m = self._make_message()
        content = "áàäéèęöő.\n"
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit

            áàäéèęöő.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
        m = self._make_message()
        content = '\n'*10 + "áàäéèęöő.\n"
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            """ + '\n'*10 + """
            áàäéèęöő.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_long_line_maximal_non_ascii_heuristics(self):
        m = self._make_message()
        content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
            tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
            xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
            qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
            w6TDqcOoxJnDtsWRLgo=
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
        # Yes, it chooses "wrong" here.  It's a heuristic.  So this result
        # could change if we come up with a better heuristic.
        m = self._make_message()
        content = ('\n'*10 +
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
        # Pass the same object that the assertions below compare against
        # rather than a second copy of the literal.
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: quoted-printable
            """ + '\n'*10 + """
            =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
            =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
            =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
            =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
            =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
            =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
            =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
            =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
            =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
            =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
            =C5=91.
            """).encode('utf-8'))
        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
        self.assertEqual(m.get_content(), content)

    def test_set_text_non_ascii_with_cte_7bit_raises(self):
        m = self._make_message()
        with self.assertRaises(UnicodeError):
            raw_data_manager.set_content(m, "áàäéèęöő.\n", cte='7bit')

    def test_set_text_non_ascii_with_charset_ascii_raises(self):
        m = self._make_message()
        with self.assertRaises(UnicodeError):
            raw_data_manager.set_content(m, "áàäéèęöő.\n", charset='ascii')

    def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
        m = self._make_message()
        with self.assertRaises(UnicodeError):
            raw_data_manager.set_content(m, "áàäéèęöő.\n",
                                         cte='7bit', charset='ascii')

    # ---- set_content: message objects ------------------------------------

    def test_set_message(self):
        m = self._make_message()
        m['Subject'] = "Forwarded message"
        content = self._make_message()
        content['To'] = 'python@vivarium.org'
        content['From'] = 'police@monty.org'
        content['Subject'] = "get back in your box"
        content.set_content("Or face the comfy chair.")
        raw_data_manager.set_content(m, content)
        self.assertEqual(str(m), textwrap.dedent("""\
            Subject: Forwarded message
            Content-Type: message/rfc822
            Content-Transfer-Encoding: 8bit

            To: python@vivarium.org
            From: police@monty.org
            Subject: get back in your box
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 7bit
            MIME-Version: 1.0

            Or face the comfy chair.
            """))
        payload = m.get_payload(0)
        self.assertIsInstance(payload, self.message)
        self.assertEqual(str(payload), str(content))
        self.assertIsInstance(m.get_content(), self.message)
        self.assertEqual(str(m.get_content()), str(content))

    def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
        m = self._make_message()
        m['Subject'] = "Escape report"
        content = self._make_message()
        content['To'] = 'police@monty.org'
        content['From'] = 'victim@monty.org'
        content['Subject'] = "Help"
        content.set_content("j'ai un problème de python. il est sorti de son"
                            " vivarium.")
        raw_data_manager.set_content(m, content)
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Subject: Escape report
            Content-Type: message/rfc822
            Content-Transfer-Encoding: 8bit

            To: police@monty.org
            From: victim@monty.org
            Subject: Help
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            j'ai un problème de python. il est sorti de son vivarium.
            """).encode('utf-8'))
        # The choice of base64 for the body encoding is because generator
        # doesn't bother with heuristics and uses it unconditionally for utf-8
        # text.
        # XXX: the first cte should be 7bit, too...that's a generator bug.
        # XXX: the line length in the body also looks like a generator bug.
        self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
                         textwrap.dedent("""\
            Subject: Escape report
            Content-Type: message/rfc822
            Content-Transfer-Encoding: 8bit

            To: police@monty.org
            From: victim@monty.org
            Subject: Help
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64
            MIME-Version: 1.0

            aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
            Lgo=
            """))
        self.assertIsInstance(m.get_content(), self.message)
        self.assertEqual(str(m.get_content()), str(content))

    def test_set_message_invalid_cte_raises(self):
        # The error text must mention both the offending cte and the subtype.
        m = self._make_message()
        content = self._make_message()
        for cte in 'quoted-printable base64'.split():
            for subtype in 'rfc822 external-body'.split():
                with self.subTest(cte=cte, subtype=subtype):
                    with self.assertRaises(ValueError) as ar:
                        m.set_content(content, subtype, cte=cte)
                    exc = str(ar.exception)
                    self.assertIn(cte, exc)
                    self.assertIn(subtype, exc)
        subtype = 'external-body'
        for cte in '8bit binary'.split():
            with self.subTest(cte=cte, subtype=subtype):
                with self.assertRaises(ValueError) as ar:
                    m.set_content(content, subtype, cte=cte)
                exc = str(ar.exception)
                self.assertIn(cte, exc)
                self.assertIn(subtype, exc)

    # ---- set_content: binary data ----------------------------------------

    def test_set_image_jpg(self):
        for content in (b"bogus content",
                        bytearray(b"bogus content"),
                        memoryview(b"bogus content")):
            with self.subTest(content=content):
                m = self._make_message()
                raw_data_manager.set_content(m, content, 'image', 'jpeg')
                self.assertEqual(str(m), textwrap.dedent("""\
                    Content-Type: image/jpeg
                    Content-Transfer-Encoding: base64

                    Ym9ndXMgY29udGVudA==
                    """))
                self.assertEqual(m.get_payload(decode=True), content)
                self.assertEqual(m.get_content(), content)

    def test_set_audio_aif_with_quoted_printable_cte(self):
        # Why you would use qp, I don't know, but it is technically supported.
        # XXX: the incorrect line length is because binascii.b2a_qp doesn't
        # support a line length parameter, but we must use it to get newline
        # encoding.
        # XXX: what about that lack of tailing newline?  Do we actually handle
        # that correctly in all cases?  That is, if the *source* has an
        # unencoded newline, do we add an extra newline to the returned payload
        # or not?  And can that actually be disambiguated based on the RFC?
        m = self._make_message()
        content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
        m.set_content(content, 'audio', 'aif', cte='quoted-printable')
        expected_headers = textwrap.dedent("""\
            Content-Type: audio/aif
            Content-Transfer-Encoding: quoted-printable
            MIME-Version: 1.0

            """).encode('latin-1')
        # The 100 z's are split 49/51 by a soft line break ("=" + newline)
        # that fills the first body line to 76 columns; there is no trailing
        # newline after the second line.
        expected_body = (b'b=FFgus=09con=0At=0Dent=20' + b'z'*49 + b'=\n' +
                         b'z'*51)
        self.assertEqual(bytes(m), expected_headers + expected_body)
        self.assertEqual(m.get_payload(decode=True), content)
        self.assertEqual(m.get_content(), content)

    def test_set_video_mpeg_with_binary_cte(self):
        m = self._make_message()
        content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
        m.set_content(content, 'video', 'mpeg', cte='binary')
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: video/mpeg
            Content-Transfer-Encoding: binary
            MIME-Version: 1.0

            """).encode('ascii') +
            # XXX: the second \n ought to be a \r, but generator gets it wrong.
            # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
            b'b\xFFgus\tcon\nt\nent ' + b'z'*100)
        self.assertEqual(m.get_payload(decode=True), content)
        self.assertEqual(m.get_content(), content)

    def test_set_application_octet_stream_with_8bit_cte(self):
        # In 8bit mode, universal line end logic applies.  It is up to the
        # application to make sure the lines are short enough; we don't check.
        m = self._make_message()
        content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
        m.set_content(content, 'application', 'octet-stream', cte='8bit')
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: application/octet-stream
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            """).encode('ascii') +
            b'b\xFFgus\tcon\nt\nent\n' +
            b'z'*60 + b'\n')
        self.assertEqual(m.get_payload(decode=True), content)
        self.assertEqual(m.get_content(), content)

    # ---- set_content: extra headers --------------------------------------

    def test_set_headers_from_header_objects(self):
        m = self._make_message()
        content = "Simple message.\n"
        header_factory = self.policy.header_factory
        raw_data_manager.set_content(m, content, headers=(
            header_factory("To", "foo@example.com"),
            header_factory("From", "foo@example.com"),
            header_factory("Subject", "I'm talking to myself.")))
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            To: foo@example.com
            From: foo@example.com
            Subject: I'm talking to myself.
            Content-Transfer-Encoding: 7bit

            Simple message.
            """))

    def test_set_headers_from_strings(self):
        m = self._make_message()
        content = "Simple message.\n"
        raw_data_manager.set_content(m, content, headers=(
            "X-Foo-Header: foo",
            "X-Bar-Header: bar",))
        self.assertEqual(str(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            X-Foo-Header: foo
            X-Bar-Header: bar
            Content-Transfer-Encoding: 7bit

            Simple message.
            """))

    def test_set_headers_with_invalid_duplicate_string_header_raises(self):
        m = self._make_message()
        content = "Simple message.\n"
        with self.assertRaisesRegex(ValueError, 'Content-Type'):
            raw_data_manager.set_content(m, content, headers=(
                "Content-Type: foo/bar",)
                )

    def test_set_headers_with_invalid_duplicate_header_header_raises(self):
        m = self._make_message()
        content = "Simple message.\n"
        header_factory = self.policy.header_factory
        with self.assertRaisesRegex(ValueError, 'Content-Type'):
            raw_data_manager.set_content(m, content, headers=(
                header_factory("Content-Type", " foo/bar"),)
                )

    def test_set_headers_with_defective_string_header_raises(self):
        # Nothing may follow set_content inside the ``with`` block: the call
        # is expected to raise, so any later statement would never run.
        m = self._make_message()
        content = "Simple message.\n"
        with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
            raw_data_manager.set_content(m, content, headers=(
                'To: a@fairly@@invalid@address',)
                )

    def test_set_headers_with_defective_header_header_raises(self):
        # Nothing may follow set_content inside the ``with`` block: the call
        # is expected to raise, so any later statement would never run.
        m = self._make_message()
        content = "Simple message.\n"
        header_factory = self.policy.header_factory
        with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
            raw_data_manager.set_content(m, content, headers=(
                header_factory('To', 'a@fairly@@invalid@address'),)
                )

    # ---- set_content: disposition and filename ---------------------------

    def test_set_disposition_inline(self):
        m = self._make_message()
        m.set_content('foo', disposition='inline')
        self.assertEqual(m['Content-Disposition'], 'inline')

    def test_set_disposition_attachment(self):
        m = self._make_message()
        m.set_content('foo', disposition='attachment')
        self.assertEqual(m['Content-Disposition'], 'attachment')

    def test_set_disposition_foo(self):
        m = self._make_message()
        m.set_content('foo', disposition='foo')
        self.assertEqual(m['Content-Disposition'], 'foo')

    # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
    # would cause 'foo' above to raise.

    def test_set_filename(self):
        # Supplying only a filename implies the 'attachment' disposition.
        m = self._make_message()
        m.set_content('foo', filename='bar.txt')
        self.assertEqual(m['Content-Disposition'],
                         'attachment; filename="bar.txt"')

    def test_set_filename_and_disposition_inline(self):
        m = self._make_message()
        m.set_content('foo', disposition='inline', filename='bar.txt')
        self.assertEqual(m['Content-Disposition'],
                         'inline; filename="bar.txt"')

    def test_set_non_ascii_filename(self):
        m = self._make_message()
        m.set_content('foo', filename='ábárî.txt')
        self.assertEqual(bytes(m), textwrap.dedent("""\
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 7bit
            Content-Disposition: attachment;
             filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
            MIME-Version: 1.0

            foo
            """).encode('ascii'))

    # ---- parameterized: keyword handling common to all content types -----

    # case name -> (content object, positional MIME type arguments)
    content_object_params = {
        'text_plain': ('content', ()),
        'text_html': ('content', ('html',)),
        'application_octet_stream': (b'content',
                                     ('application', 'octet-stream')),
        'image_jpeg': (b'content', ('image', 'jpeg')),
        'message_rfc822': (message(), ()),
        'message_external_body': (message(), ('external-body',)),
        }

    def content_object_as_header_receiver(self, obj, mimetype):
        m = self._make_message()
        m.set_content(obj, *mimetype, headers=(
            'To: foo@example.com',
            'From: bar@simple.net'))
        self.assertEqual(m['to'], 'foo@example.com')
        self.assertEqual(m['from'], 'bar@simple.net')

    def content_object_as_disposition_inline_receiver(self, obj, mimetype):
        m = self._make_message()
        m.set_content(obj, *mimetype, disposition='inline')
        self.assertEqual(m['Content-Disposition'], 'inline')

    def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
        m = self._make_message()
        m.set_content(obj, *mimetype, disposition='inline',
                      filename='bár.txt')
        self.assertEqual(m['Content-Disposition'],
                         'inline; filename="bár.txt"')
        self.assertEqual(m.get_filename(), "bár.txt")
        self.assertEqual(m['Content-Disposition'].params['filename'],
                         "bár.txt")

    def content_object_as_cid_receiver(self, obj, mimetype):
        m = self._make_message()
        m.set_content(obj, *mimetype, cid='some_random_stuff')
        self.assertEqual(m['Content-ID'], 'some_random_stuff')

    def content_object_as_params_receiver(self, obj, mimetype):
        m = self._make_message()
        params = {'foo': 'bár', 'abc': 'xyz'}
        m.set_content(obj, *mimetype, params=params)
        # Text content additionally gets a charset parameter.
        if isinstance(obj, str):
            params['charset'] = 'utf-8'
        self.assertEqual(m['Content-Type'].params, params)
793n/a
794n/a
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()