| 1 | n/a | import io |
|---|
| 2 | n/a | import email |
|---|
| 3 | n/a | import unittest |
|---|
| 4 | n/a | from email.message import Message, EmailMessage |
|---|
| 5 | n/a | from email.policy import default |
|---|
| 6 | n/a | from test.test_email import TestEmailBase |
|---|
| 7 | n/a | |
|---|
| 8 | n/a | |
|---|
class TestCustomMessage(TestEmailBase):
    """Check that a custom message class handed to the module-level parsing
    helpers is instantiated with the policy that was passed to the parser."""

    class MyMessage(Message):
        """Message subclass that remembers the policy it was created with."""

        def __init__(self, policy):
            # Record the policy first so the tests can inspect exactly what
            # the parser supplied to the factory.
            self.check_policy = policy
            super().__init__()

    # A policy that is distinguishable from the base one by identity.
    MyPolicy = TestEmailBase.policy.clone(linesep='boo')

    def test_custom_message_gets_policy_if_possible_from_string(self):
        # Parse straight from a str.
        parsed = email.message_from_string(
            "Subject: bogus\n\nmsg\n",
            self.MyMessage,
            policy=self.MyPolicy,
        )
        self.assertIsInstance(parsed, self.MyMessage)
        self.assertIs(parsed.check_policy, self.MyPolicy)

    def test_custom_message_gets_policy_if_possible_from_file(self):
        # Same check, but the source is a text-mode file object.
        text_stream = io.StringIO("Subject: bogus\n\nmsg\n")
        parsed = email.message_from_file(
            text_stream,
            self.MyMessage,
            policy=self.MyPolicy,
        )
        self.assertIsInstance(parsed, self.MyMessage)
        self.assertIs(parsed.check_policy, self.MyPolicy)

    # XXX add tests for other functions that take Message arg.
| 34 | n/a | |
|---|
| 35 | n/a | |
|---|
class TestParserBase:
    """Parser tests shared between the str and bytes entry points.

    Concrete subclasses provide ``parsers``: an iterable of callables that
    take the message source as a str, optionally followed by a message
    class, plus a ``policy`` keyword argument.
    """

    class MyMessage(EmailMessage):
        pass

    def test_only_split_on_cr_lf(self):
        # Unicode-aware line splitting recognises many more line boundaries
        # than the email RFCs permit.  Header values containing those other
        # characters must come through intact: only CR and LF may split.
        expected_headers = [
            ("Next-Line", "not\x85broken"),
            ("Null", "not\x00broken"),
            ("Vertical-Tab", "not\vbroken"),
            ("Form-Feed", "not\fbroken"),
            ("File-Separator", "not\x1Cbroken"),
            ("Group-Separator", "not\x1Dbroken"),
            ("Record-Separator", "not\x1Ebroken"),
            ("Line-Separator", "not\u2028broken"),
            ("Paragraph-Separator", "not\u2029broken"),
        ]
        # One CRLF-terminated header line per pair, then the blank line that
        # ends the header section.
        header_lines = [
            f"{name}: {value}\r\n" for name, value in expected_headers
        ]
        source = "".join(header_lines) + "\r\n"
        for parse in self.parsers:
            with self.subTest(parser=parse.__name__):
                parsed = parse(source, policy=default)
                self.assertEqual(parsed.items(), expected_headers)
                self.assertEqual(parsed.get_payload(), "")

    def test_custom_message_factory_on_policy(self):
        for parse in self.parsers:
            with self.subTest(parser=parse.__name__):
                factory_policy = default.clone(
                    message_factory=self.MyMessage)
                parsed = parse("To: foo\n\ntest", policy=factory_policy)
                self.assertIsInstance(parsed, self.MyMessage)

    def test_factory_arg_overrides_policy(self):
        for parse in self.parsers:
            with self.subTest(parser=parse.__name__):
                factory_policy = default.clone(
                    message_factory=self.MyMessage)
                # An explicit message class must win over the policy's
                # message_factory.
                parsed = parse(
                    "To: foo\n\ntest", Message, policy=factory_policy)
                self.assertNotIsInstance(parsed, self.MyMessage)
                self.assertIsInstance(parsed, Message)
| 87 | n/a | |
|---|
| 88 | n/a | # Play some games to get nice output in subTest. This code could be clearer |
|---|
| 89 | n/a | # if staticmethod supported __name__. |
|---|
| 90 | n/a | |
|---|
def message_from_file(s, *args, **kw):
    """Parse the str *s* through email.message_from_file.

    The text is wrapped in an in-memory text stream; any extra positional
    and keyword arguments are forwarded unchanged.
    """
    return email.message_from_file(io.StringIO(s), *args, **kw)
| 94 | n/a | |
|---|
class TestParser(TestParserBase, TestEmailBase):
    """Run the shared parser tests against the str-based entry points."""

    parsers = (
        email.message_from_string,
        message_from_file,
    )
| 97 | n/a | |
|---|
def message_from_bytes(s, *args, **kw):
    """Parse the str *s* through email.message_from_bytes.

    The text is encoded with str.encode() defaults first; any extra
    positional and keyword arguments are forwarded unchanged.
    """
    encoded = s.encode()
    return email.message_from_bytes(encoded, *args, **kw)
| 100 | n/a | |
|---|
def message_from_binary_file(s, *args, **kw):
    """Parse the str *s* through email.message_from_binary_file.

    The text is encoded with str.encode() defaults and wrapped in an
    in-memory binary stream; any extra positional and keyword arguments are
    forwarded unchanged.
    """
    return email.message_from_binary_file(io.BytesIO(s.encode()), *args, **kw)
| 104 | n/a | |
|---|
class TestBytesParser(TestParserBase, TestEmailBase):
    """Run the shared parser tests against the bytes-based entry points."""

    parsers = (
        message_from_bytes,
        message_from_binary_file,
    )
| 107 | n/a | |
|---|
| 108 | n/a | |
|---|
# Run the tests in this module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()