ยปCore Development>Code coverage>Lib/test/test_email/test_policy.py

Python code coverage for Lib/test/test_email/test_policy.py

#countcontent
1n/aimport io
2n/aimport types
3n/aimport textwrap
4n/aimport unittest
5n/aimport email.policy
6n/aimport email.parser
7n/aimport email.generator
8n/aimport email.message
9n/afrom email import headerregistry
10n/a
11n/adef make_defaults(base_defaults, differences):
12n/a defaults = base_defaults.copy()
13n/a defaults.update(differences)
14n/a return defaults
15n/a
16n/aclass PolicyAPITests(unittest.TestCase):
17n/a
18n/a longMessage = True
19n/a
20n/a # Base default values.
21n/a compat32_defaults = {
22n/a 'max_line_length': 78,
23n/a 'linesep': '\n',
24n/a 'cte_type': '8bit',
25n/a 'raise_on_defect': False,
26n/a 'mangle_from_': True,
27n/a 'message_factory': None,
28n/a }
29n/a # These default values are the ones set on email.policy.default.
30n/a # If any of these defaults change, the docs must be updated.
31n/a policy_defaults = compat32_defaults.copy()
32n/a policy_defaults.update({
33n/a 'utf8': False,
34n/a 'raise_on_defect': False,
35n/a 'header_factory': email.policy.EmailPolicy.header_factory,
36n/a 'refold_source': 'long',
37n/a 'content_manager': email.policy.EmailPolicy.content_manager,
38n/a 'mangle_from_': False,
39n/a 'message_factory': email.message.EmailMessage,
40n/a })
41n/a
42n/a # For each policy under test, we give here what we expect the defaults to
43n/a # be for that policy. The second argument to make defaults is the
44n/a # difference between the base defaults and that for the particular policy.
45n/a new_policy = email.policy.EmailPolicy()
46n/a policies = {
47n/a email.policy.compat32: make_defaults(compat32_defaults, {}),
48n/a email.policy.default: make_defaults(policy_defaults, {}),
49n/a email.policy.SMTP: make_defaults(policy_defaults,
50n/a {'linesep': '\r\n'}),
51n/a email.policy.SMTPUTF8: make_defaults(policy_defaults,
52n/a {'linesep': '\r\n',
53n/a 'utf8': True}),
54n/a email.policy.HTTP: make_defaults(policy_defaults,
55n/a {'linesep': '\r\n',
56n/a 'max_line_length': None}),
57n/a email.policy.strict: make_defaults(policy_defaults,
58n/a {'raise_on_defect': True}),
59n/a new_policy: make_defaults(policy_defaults, {}),
60n/a }
61n/a # Creating a new policy creates a new header factory. There is a test
62n/a # later that proves this.
63n/a policies[new_policy]['header_factory'] = new_policy.header_factory
64n/a
65n/a def test_defaults(self):
66n/a for policy, expected in self.policies.items():
67n/a for attr, value in expected.items():
68n/a with self.subTest(policy=policy, attr=attr):
69n/a self.assertEqual(getattr(policy, attr), value,
70n/a ("change {} docs/docstrings if defaults have "
71n/a "changed").format(policy))
72n/a
73n/a def test_all_attributes_covered(self):
74n/a for policy, expected in self.policies.items():
75n/a for attr in dir(policy):
76n/a with self.subTest(policy=policy, attr=attr):
77n/a if (attr.startswith('_') or
78n/a isinstance(getattr(email.policy.EmailPolicy, attr),
79n/a types.FunctionType)):
80n/a continue
81n/a else:
82n/a self.assertIn(attr, expected,
83n/a "{} is not fully tested".format(attr))
84n/a
85n/a def test_abc(self):
86n/a with self.assertRaises(TypeError) as cm:
87n/a email.policy.Policy()
88n/a msg = str(cm.exception)
89n/a abstract_methods = ('fold',
90n/a 'fold_binary',
91n/a 'header_fetch_parse',
92n/a 'header_source_parse',
93n/a 'header_store_parse')
94n/a for method in abstract_methods:
95n/a self.assertIn(method, msg)
96n/a
97n/a def test_policy_is_immutable(self):
98n/a for policy, defaults in self.policies.items():
99n/a for attr in defaults:
100n/a with self.assertRaisesRegex(AttributeError, attr+".*read-only"):
101n/a setattr(policy, attr, None)
102n/a with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
103n/a policy.foo = None
104n/a
105n/a def test_set_policy_attrs_when_cloned(self):
106n/a # None of the attributes has a default value of None, so we set them
107n/a # all to None in the clone call and check that it worked.
108n/a for policyclass, defaults in self.policies.items():
109n/a testattrdict = {attr: None for attr in defaults}
110n/a policy = policyclass.clone(**testattrdict)
111n/a for attr in defaults:
112n/a self.assertIsNone(getattr(policy, attr))
113n/a
114n/a def test_reject_non_policy_keyword_when_called(self):
115n/a for policyclass in self.policies:
116n/a with self.assertRaises(TypeError):
117n/a policyclass(this_keyword_should_not_be_valid=None)
118n/a with self.assertRaises(TypeError):
119n/a policyclass(newtline=None)
120n/a
121n/a def test_policy_addition(self):
122n/a expected = self.policy_defaults.copy()
123n/a p1 = email.policy.default.clone(max_line_length=100)
124n/a p2 = email.policy.default.clone(max_line_length=50)
125n/a added = p1 + p2
126n/a expected.update(max_line_length=50)
127n/a for attr, value in expected.items():
128n/a self.assertEqual(getattr(added, attr), value)
129n/a added = p2 + p1
130n/a expected.update(max_line_length=100)
131n/a for attr, value in expected.items():
132n/a self.assertEqual(getattr(added, attr), value)
133n/a added = added + email.policy.default
134n/a for attr, value in expected.items():
135n/a self.assertEqual(getattr(added, attr), value)
136n/a
137n/a def test_register_defect(self):
138n/a class Dummy:
139n/a def __init__(self):
140n/a self.defects = []
141n/a obj = Dummy()
142n/a defect = object()
143n/a policy = email.policy.EmailPolicy()
144n/a policy.register_defect(obj, defect)
145n/a self.assertEqual(obj.defects, [defect])
146n/a defect2 = object()
147n/a policy.register_defect(obj, defect2)
148n/a self.assertEqual(obj.defects, [defect, defect2])
149n/a
150n/a class MyObj:
151n/a def __init__(self):
152n/a self.defects = []
153n/a
154n/a class MyDefect(Exception):
155n/a pass
156n/a
157n/a def test_handle_defect_raises_on_strict(self):
158n/a foo = self.MyObj()
159n/a defect = self.MyDefect("the telly is broken")
160n/a with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
161n/a email.policy.strict.handle_defect(foo, defect)
162n/a
163n/a def test_handle_defect_registers_defect(self):
164n/a foo = self.MyObj()
165n/a defect1 = self.MyDefect("one")
166n/a email.policy.default.handle_defect(foo, defect1)
167n/a self.assertEqual(foo.defects, [defect1])
168n/a defect2 = self.MyDefect("two")
169n/a email.policy.default.handle_defect(foo, defect2)
170n/a self.assertEqual(foo.defects, [defect1, defect2])
171n/a
172n/a class MyPolicy(email.policy.EmailPolicy):
173n/a defects = None
174n/a def __init__(self, *args, **kw):
175n/a super().__init__(*args, defects=[], **kw)
176n/a def register_defect(self, obj, defect):
177n/a self.defects.append(defect)
178n/a
179n/a def test_overridden_register_defect_still_raises(self):
180n/a foo = self.MyObj()
181n/a defect = self.MyDefect("the telly is broken")
182n/a with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
183n/a self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)
184n/a
185n/a def test_overridden_register_defect_works(self):
186n/a foo = self.MyObj()
187n/a defect1 = self.MyDefect("one")
188n/a my_policy = self.MyPolicy()
189n/a my_policy.handle_defect(foo, defect1)
190n/a self.assertEqual(my_policy.defects, [defect1])
191n/a self.assertEqual(foo.defects, [])
192n/a defect2 = self.MyDefect("two")
193n/a my_policy.handle_defect(foo, defect2)
194n/a self.assertEqual(my_policy.defects, [defect1, defect2])
195n/a self.assertEqual(foo.defects, [])
196n/a
197n/a def test_default_header_factory(self):
198n/a h = email.policy.default.header_factory('Test', 'test')
199n/a self.assertEqual(h.name, 'Test')
200n/a self.assertIsInstance(h, headerregistry.UnstructuredHeader)
201n/a self.assertIsInstance(h, headerregistry.BaseHeader)
202n/a
203n/a class Foo:
204n/a parse = headerregistry.UnstructuredHeader.parse
205n/a
206n/a def test_each_Policy_gets_unique_factory(self):
207n/a policy1 = email.policy.EmailPolicy()
208n/a policy2 = email.policy.EmailPolicy()
209n/a policy1.header_factory.map_to_type('foo', self.Foo)
210n/a h = policy1.header_factory('foo', 'test')
211n/a self.assertIsInstance(h, self.Foo)
212n/a self.assertNotIsInstance(h, headerregistry.UnstructuredHeader)
213n/a h = policy2.header_factory('foo', 'test')
214n/a self.assertNotIsInstance(h, self.Foo)
215n/a self.assertIsInstance(h, headerregistry.UnstructuredHeader)
216n/a
217n/a def test_clone_copies_factory(self):
218n/a policy1 = email.policy.EmailPolicy()
219n/a policy2 = policy1.clone()
220n/a policy1.header_factory.map_to_type('foo', self.Foo)
221n/a h = policy1.header_factory('foo', 'test')
222n/a self.assertIsInstance(h, self.Foo)
223n/a h = policy2.header_factory('foo', 'test')
224n/a self.assertIsInstance(h, self.Foo)
225n/a
226n/a def test_new_factory_overrides_default(self):
227n/a mypolicy = email.policy.EmailPolicy()
228n/a myfactory = mypolicy.header_factory
229n/a newpolicy = mypolicy + email.policy.strict
230n/a self.assertEqual(newpolicy.header_factory, myfactory)
231n/a newpolicy = email.policy.strict + mypolicy
232n/a self.assertEqual(newpolicy.header_factory, myfactory)
233n/a
234n/a def test_adding_default_policies_preserves_default_factory(self):
235n/a newpolicy = email.policy.default + email.policy.strict
236n/a self.assertEqual(newpolicy.header_factory,
237n/a email.policy.EmailPolicy.header_factory)
238n/a self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True})
239n/a
240n/a # XXX: Need subclassing tests.
241n/a # For adding subclassed objects, make sure the usual rules apply (subclass
242n/a # wins), but that the order still works (right overrides left).
243n/a
244n/a
245n/aclass TestException(Exception):
246n/a pass
247n/a
248n/aclass TestPolicyPropagation(unittest.TestCase):
249n/a
250n/a # The abstract methods are used by the parser but not by the wrapper
251n/a # functions that call it, so if the exception gets raised we know that the
252n/a # policy was actually propagated all the way to feedparser.
253n/a class MyPolicy(email.policy.Policy):
254n/a def badmethod(self, *args, **kw):
255n/a raise TestException("test")
256n/a fold = fold_binary = header_fetch_parser = badmethod
257n/a header_source_parse = header_store_parse = badmethod
258n/a
259n/a def test_message_from_string(self):
260n/a with self.assertRaisesRegex(TestException, "^test$"):
261n/a email.message_from_string("Subject: test\n\n",
262n/a policy=self.MyPolicy)
263n/a
264n/a def test_message_from_bytes(self):
265n/a with self.assertRaisesRegex(TestException, "^test$"):
266n/a email.message_from_bytes(b"Subject: test\n\n",
267n/a policy=self.MyPolicy)
268n/a
269n/a def test_message_from_file(self):
270n/a f = io.StringIO('Subject: test\n\n')
271n/a with self.assertRaisesRegex(TestException, "^test$"):
272n/a email.message_from_file(f, policy=self.MyPolicy)
273n/a
274n/a def test_message_from_binary_file(self):
275n/a f = io.BytesIO(b'Subject: test\n\n')
276n/a with self.assertRaisesRegex(TestException, "^test$"):
277n/a email.message_from_binary_file(f, policy=self.MyPolicy)
278n/a
279n/a # These are redundant, but we need them for black-box completeness.
280n/a
281n/a def test_parser(self):
282n/a p = email.parser.Parser(policy=self.MyPolicy)
283n/a with self.assertRaisesRegex(TestException, "^test$"):
284n/a p.parsestr('Subject: test\n\n')
285n/a
286n/a def test_bytes_parser(self):
287n/a p = email.parser.BytesParser(policy=self.MyPolicy)
288n/a with self.assertRaisesRegex(TestException, "^test$"):
289n/a p.parsebytes(b'Subject: test\n\n')
290n/a
291n/a # Now that we've established that all the parse methods get the
292n/a # policy in to feedparser, we can use message_from_string for
293n/a # the rest of the propagation tests.
294n/a
295n/a def _make_msg(self, source='Subject: test\n\n', policy=None):
296n/a self.policy = email.policy.default.clone() if policy is None else policy
297n/a return email.message_from_string(source, policy=self.policy)
298n/a
299n/a def test_parser_propagates_policy_to_message(self):
300n/a msg = self._make_msg()
301n/a self.assertIs(msg.policy, self.policy)
302n/a
303n/a def test_parser_propagates_policy_to_sub_messages(self):
304n/a msg = self._make_msg(textwrap.dedent("""\
305n/a Subject: mime test
306n/a MIME-Version: 1.0
307n/a Content-Type: multipart/mixed, boundary="XXX"
308n/a
309n/a --XXX
310n/a Content-Type: text/plain
311n/a
312n/a test
313n/a --XXX
314n/a Content-Type: text/plain
315n/a
316n/a test2
317n/a --XXX--
318n/a """))
319n/a for part in msg.walk():
320n/a self.assertIs(part.policy, self.policy)
321n/a
322n/a def test_message_policy_propagates_to_generator(self):
323n/a msg = self._make_msg("Subject: test\nTo: foo\n\n",
324n/a policy=email.policy.default.clone(linesep='X'))
325n/a s = io.StringIO()
326n/a g = email.generator.Generator(s)
327n/a g.flatten(msg)
328n/a self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")
329n/a
330n/a def test_message_policy_used_by_as_string(self):
331n/a msg = self._make_msg("Subject: test\nTo: foo\n\n",
332n/a policy=email.policy.default.clone(linesep='X'))
333n/a self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
334n/a
335n/a
336n/aclass TestConcretePolicies(unittest.TestCase):
337n/a
338n/a def test_header_store_parse_rejects_newlines(self):
339n/a instance = email.policy.EmailPolicy()
340n/a self.assertRaises(ValueError,
341n/a instance.header_store_parse,
342n/a 'From', 'spam\negg@foo.py')
343n/a
344n/a
345n/aif __name__ == '__main__':
346n/a unittest.main()