»Core Development>Code coverage>Lib/test/test_mailbox.py

Python code coverage for Lib/test/test_mailbox.py

# | count | content
1n/aimport os
2n/aimport sys
3n/aimport time
4n/aimport stat
5n/aimport socket
6n/aimport email
7n/aimport email.message
8n/aimport re
9n/aimport io
10n/aimport tempfile
11n/afrom test import support
12n/aimport unittest
13n/aimport textwrap
14n/aimport mailbox
15n/aimport glob
16n/a
17n/a
class TestBase:
    """Helper mixin shared by the mailbox test classes in this file."""

    all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage,
                         mailbox.mboxMessage, mailbox.MHMessage,
                         mailbox.BabylMessage, mailbox.MMDFMessage)

    def _check_sample(self, msg):
        # Verify that msg is a mailbox.Message rendering of the module-level
        # sample message: types first, then headers, then every payload part.
        for expected_type in (email.message.Message, mailbox.Message):
            self.assertIsInstance(msg, expected_type)
        for name, expected in _sample_headers.items():
            self.assertIn(expected, msg.get_all(name))
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
        for index, expected in enumerate(_sample_payloads):
            part = msg.get_payload(index)
            # Sub-parts are plain email messages, not mailbox.Message.
            self.assertIsInstance(part, email.message.Message)
            self.assertNotIsInstance(part, mailbox.Message)
            self.assertEqual(part.get_payload(), expected)

    def _delete_recursively(self, target):
        # Remove target whether it is a directory tree or a single file;
        # a path that does not exist is silently ignored.
        if os.path.isdir(target):
            support.rmtree(target)
            return
        if os.path.exists(target):
            support.unlink(target)
44n/a
45n/a
class TestMailbox(TestBase):
    """Format-independent Mailbox tests, reused once per mailbox format.

    This class is a mixin: concrete subclasses also inherit from
    unittest.TestCase, override ``_factory`` with a callable that builds the
    mailbox under test from a path, and provide ``assertMailboxEmpty()``,
    which several tests here call but which is not defined in this class.
    """

    maxDiff = None

    _factory = None     # Overridden by subclasses to reuse tests
    _template = 'From: foo\n\n%s\n'

    def setUp(self):
        self._path = support.TESTFN
        self._delete_recursively(self._path)
        self._box = self._factory(self._path)

    def tearDown(self):
        self._box.close()
        self._delete_recursively(self._path)

    def test_add(self):
        # Add copies of a sample message
        keys = []
        keys.append(self._box.add(self._template % 0))
        self.assertEqual(len(self._box), 1)
        keys.append(self._box.add(mailbox.Message(_sample_message)))
        self.assertEqual(len(self._box), 2)
        keys.append(self._box.add(email.message_from_string(_sample_message)))
        self.assertEqual(len(self._box), 3)
        keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
        self.assertEqual(len(self._box), 4)
        keys.append(self._box.add(_sample_message))
        self.assertEqual(len(self._box), 5)
        keys.append(self._box.add(_bytes_sample_message))
        self.assertEqual(len(self._box), 6)
        with self.assertWarns(DeprecationWarning):
            keys.append(self._box.add(
                io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
        self.assertEqual(len(self._box), 7)
        self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
        # Every key after the first refers to a copy of the sample message.
        for key in keys[1:]:
            self._check_sample(self._box[key])

    _nonascii_msg = textwrap.dedent("""\
            From: foo
            Subject: Falinaptár házhozszállítással. Már rendeltél?

            0
            """)

    def test_add_invalid_8bit_bytes_header(self):
        key = self._box.add(self._nonascii_msg.encode('latin-1'))
        self.assertEqual(len(self._box), 1)
        self.assertEqual(self._box.get_bytes(key),
                         self._nonascii_msg.encode('latin-1'))

    def test_invalid_nonascii_header_as_string(self):
        subj = self._nonascii_msg.splitlines()[1]
        key = self._box.add(subj.encode('latin-1'))
        self.assertEqual(self._box.get_string(key),
            'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
            'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')

    def test_add_nonascii_string_header_raises(self):
        with self.assertRaisesRegex(ValueError, "ASCII-only"):
            self._box.add(self._nonascii_msg)
        self._box.flush()
        self.assertEqual(len(self._box), 0)
        self.assertMailboxEmpty()

    def test_add_that_raises_leaves_mailbox_empty(self):
        def raiser(*args, **kw):
            raise Exception("a fake error")
        support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
        with self.assertRaises(Exception):
            self._box.add(email.message_from_string("From: Alphöso"))
        self.assertEqual(len(self._box), 0)
        self._box.close()
        self.assertMailboxEmpty()

    _non_latin_bin_msg = textwrap.dedent("""\
        From: foo@bar.com
        To: báz
        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
        \tJean de Baddie
        Mime-Version: 1.0
        Content-Type: text/plain; charset="utf-8"
        Content-Transfer-Encoding: 8bit

        Да, они летят.
        """).encode('utf-8')

    def test_add_8bit_body(self):
        key = self._box.add(self._non_latin_bin_msg)
        self.assertEqual(self._box.get_bytes(key),
                         self._non_latin_bin_msg)
        with self._box.get_file(key) as f:
            self.assertEqual(f.read(),
                             self._non_latin_bin_msg.replace(b'\n',
                                os.linesep.encode()))
        self.assertEqual(self._box[key].get_payload(),
                        "Да, они летят.\n")

    def test_add_binary_file(self):
        with tempfile.TemporaryFile('wb+') as f:
            f.write(_bytes_sample_message)
            f.seek(0)
            key = self._box.add(f)
        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
            _bytes_sample_message.split(b'\n'))

    def test_add_binary_nonascii_file(self):
        with tempfile.TemporaryFile('wb+') as f:
            f.write(self._non_latin_bin_msg)
            f.seek(0)
            key = self._box.add(f)
        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
            self._non_latin_bin_msg.split(b'\n'))

    def test_add_text_file_warns(self):
        with tempfile.TemporaryFile('w+') as f:
            f.write(_sample_message)
            f.seek(0)
            with self.assertWarns(DeprecationWarning):
                key = self._box.add(f)
        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
            _bytes_sample_message.split(b'\n'))

    def test_add_StringIO_warns(self):
        with self.assertWarns(DeprecationWarning):
            key = self._box.add(io.StringIO(self._template % "0"))
        self.assertEqual(self._box.get_string(key), self._template % "0")

    def test_add_nonascii_StringIO_raises(self):
        with self.assertWarns(DeprecationWarning):
            with self.assertRaisesRegex(ValueError, "ASCII-only"):
                self._box.add(io.StringIO(self._nonascii_msg))
        self.assertEqual(len(self._box), 0)
        self._box.close()
        self.assertMailboxEmpty()

    def test_remove(self):
        # Remove messages using remove()
        self._test_remove_or_delitem(self._box.remove)

    def test_delitem(self):
        # Remove messages using __delitem__()
        self._test_remove_or_delitem(self._box.__delitem__)

    def _test_remove_or_delitem(self, method):
        # (Used by test_remove() and test_delitem().)
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)
        self.assertEqual(len(self._box), 2)
        method(key0)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key0])
        self.assertRaises(KeyError, lambda: method(key0))
        self.assertEqual(self._box.get_string(key1), self._template % 1)
        key2 = self._box.add(self._template % 2)
        self.assertEqual(len(self._box), 2)
        method(key2)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key2])
        self.assertRaises(KeyError, lambda: method(key2))
        self.assertEqual(self._box.get_string(key1), self._template % 1)
        method(key1)
        self.assertEqual(len(self._box), 0)
        self.assertRaises(KeyError, lambda: self._box[key1])
        self.assertRaises(KeyError, lambda: method(key1))

    def test_discard(self, repetitions=10):
        # Discard messages
        # NOTE: 'repetitions' is unused; it is kept so the signature stays
        # compatible with any caller that passes it.
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(self._template % 1)
        self.assertEqual(len(self._box), 2)
        self._box.discard(key0)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key0])
        # Discarding an already-missing key must be a silent no-op.
        self._box.discard(key0)
        self.assertEqual(len(self._box), 1)
        self.assertRaises(KeyError, lambda: self._box[key0])

    def test_get(self):
        # Retrieve messages using get()
        key0 = self._box.add(self._template % 0)
        msg = self._box.get(key0)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertIsNone(self._box.get('foo'))
        self.assertIs(self._box.get('foo', False), False)
        self._box.close()
        self._box = self._factory(self._path)
        key1 = self._box.add(self._template % 1)
        msg = self._box.get(key1)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '1\n')

    def test_getitem(self):
        # Retrieve message using __getitem__()
        key0 = self._box.add(self._template % 0)
        msg = self._box[key0]
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self._box.discard(key0)
        self.assertRaises(KeyError, lambda: self._box[key0])

    def test_get_message(self):
        # Get Message representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        msg0 = self._box.get_message(key0)
        self.assertIsInstance(msg0, mailbox.Message)
        self.assertEqual(msg0['from'], 'foo')
        self.assertEqual(msg0.get_payload(), '0\n')
        self._check_sample(self._box.get_message(key1))

    def test_get_bytes(self):
        # Get bytes representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        self.assertEqual(self._box.get_bytes(key0),
            (self._template % 0).encode('ascii'))
        self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)

    def test_get_string(self):
        # Get string representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        self.assertEqual(self._box.get_string(key0), self._template % 0)
        self.assertEqual(self._box.get_string(key1).split('\n'),
                         _sample_message.split('\n'))

    def test_get_file(self):
        # Get file representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        with self._box.get_file(key0) as file:
            data0 = file.read()
        with self._box.get_file(key1) as file:
            data1 = file.read()
        self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
                         self._template % 0)
        self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
                         _sample_message)

    def test_get_file_can_be_closed_twice(self):
        # Issue 11700
        key = self._box.add(_sample_message)
        f = self._box.get_file(key)
        f.close()
        f.close()

    def test_iterkeys(self):
        # Get keys using iterkeys()
        self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)

    def test_keys(self):
        # Get keys using keys()
        self._check_iteration(self._box.keys, do_keys=True, do_values=False)

    def test_itervalues(self):
        # Get values using itervalues()
        self._check_iteration(self._box.itervalues, do_keys=False,
                              do_values=True)

    def test_iter(self):
        # Get values using __iter__()
        self._check_iteration(self._box.__iter__, do_keys=False,
                              do_values=True)

    def test_values(self):
        # Get values using values()
        self._check_iteration(self._box.values, do_keys=False, do_values=True)

    def test_iteritems(self):
        # Get keys and values using iteritems()
        self._check_iteration(self._box.iteritems, do_keys=True,
                              do_values=True)

    def test_items(self):
        # Get keys and values using items()
        self._check_iteration(self._box.items, do_keys=True, do_values=True)

    def _check_iteration(self, method, do_keys, do_values, repetitions=10):
        # Shared driver for the iteration tests: 'method' is called with no
        # arguments and must yield keys, values, or (key, value) pairs
        # according to the do_keys / do_values flags.
        for value in method():
            self.fail("Not empty")
        keys, values = [], []
        for i in range(repetitions):
            keys.append(self._box.add(self._template % i))
            values.append(self._template % i)
        if do_keys and not do_values:
            returned_keys = list(method())
        elif do_values and not do_keys:
            returned_values = list(method())
        else:
            returned_keys, returned_values = [], []
            for key, value in method():
                returned_keys.append(key)
                returned_values.append(value)
        if do_keys:
            self.assertEqual(len(keys), len(returned_keys))
            self.assertEqual(set(keys), set(returned_keys))
        if do_values:
            count = 0
            for value in returned_values:
                self.assertEqual(value['from'], 'foo')
                self.assertLess(int(value.get_payload()), repetitions)
                count += 1
            self.assertEqual(len(values), count)

    def test_contains(self):
        # Check existence of keys using __contains__()
        self.assertNotIn('foo', self._box)
        key0 = self._box.add(self._template % 0)
        self.assertIn(key0, self._box)
        self.assertNotIn('foo', self._box)
        key1 = self._box.add(self._template % 1)
        self.assertIn(key1, self._box)
        self.assertIn(key0, self._box)
        self.assertNotIn('foo', self._box)
        self._box.remove(key0)
        self.assertNotIn(key0, self._box)
        self.assertIn(key1, self._box)
        self.assertNotIn('foo', self._box)
        self._box.remove(key1)
        self.assertNotIn(key1, self._box)
        self.assertNotIn(key0, self._box)
        self.assertNotIn('foo', self._box)

    def test_len(self, repetitions=10):
        # Get message count
        keys = []
        for i in range(repetitions):
            self.assertEqual(len(self._box), i)
            keys.append(self._box.add(self._template % i))
            self.assertEqual(len(self._box), i + 1)
        for i in range(repetitions):
            self.assertEqual(len(self._box), repetitions - i)
            self._box.remove(keys[i])
            self.assertEqual(len(self._box), repetitions - i - 1)

    def test_set_item(self):
        # Modify messages using __setitem__()
        key0 = self._box.add(self._template % 'original 0')
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'original 0')
        key1 = self._box.add(self._template % 'original 1')
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'original 1')
        self._box[key0] = self._template % 'changed 0'
        self.assertEqual(self._box.get_string(key0),
                         self._template % 'changed 0')
        self._box[key1] = self._template % 'changed 1'
        self.assertEqual(self._box.get_string(key1),
                         self._template % 'changed 1')
        self._box[key0] = _sample_message
        self._check_sample(self._box[key0])
        self._box[key1] = self._box[key0]
        self._check_sample(self._box[key1])
        self._box[key0] = self._template % 'original 0'
        self.assertEqual(self._box.get_string(key0),
                     self._template % 'original 0')
        self._check_sample(self._box[key1])
        self.assertRaises(KeyError,
                          lambda: self._box.__setitem__('foo', 'bar'))
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self.assertEqual(len(self._box), 2)

    def test_clear(self, iterations=10):
        # Remove all messages using clear()
        # The keys returned by add() must be recorded; otherwise both
        # verification loops below iterate over nothing and check nothing.
        keys = []
        for i in range(iterations):
            keys.append(self._box.add(self._template % i))
        for i, key in enumerate(keys):
            self.assertEqual(self._box.get_string(key), self._template % i)
        self._box.clear()
        self.assertEqual(len(self._box), 0)
        for key in keys:
            self.assertRaises(KeyError, self._box.get_string, key)

    def test_pop(self):
        # Get and remove a message using pop()
        key0 = self._box.add(self._template % 0)
        self.assertIn(key0, self._box)
        key1 = self._box.add(self._template % 1)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
        self.assertNotIn(key0, self._box)
        self.assertIn(key1, self._box)
        key2 = self._box.add(self._template % 2)
        self.assertIn(key2, self._box)
        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
        self.assertNotIn(key2, self._box)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
        self.assertNotIn(key1, self._box)
        self.assertEqual(len(self._box), 0)

    def test_popitem(self, iterations=10):
        # Get and remove an arbitrary (key, message) using popitem()
        # 'iterations' controls how many messages are added and popped.
        keys = []
        for i in range(iterations):
            keys.append(self._box.add(self._template % i))
        seen = []
        for i in range(iterations):
            key, msg = self._box.popitem()
            self.assertIn(key, keys)
            self.assertNotIn(key, seen)
            seen.append(key)
            # Message i was stored under keys[i], so payload and index agree.
            self.assertEqual(int(msg.get_payload()), keys.index(key))
        self.assertEqual(len(self._box), 0)
        for key in keys:
            self.assertRaises(KeyError, lambda: self._box[key])

    def test_update(self):
        # Modify multiple messages using update()
        key0 = self._box.add(self._template % 'original 0')
        key1 = self._box.add(self._template % 'original 1')
        key2 = self._box.add(self._template % 'original 2')
        self._box.update({key0: self._template % 'changed 0',
                          key2: _sample_message})
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                     self._template % 'changed 0')
        self.assertEqual(self._box.get_string(key1),
                     self._template % 'original 1')
        self._check_sample(self._box[key2])
        self._box.update([(key2, self._template % 'changed 2'),
                    (key1, self._template % 'changed 1'),
                    (key0, self._template % 'original 0')])
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                     self._template % 'original 0')
        self.assertEqual(self._box.get_string(key1),
                     self._template % 'changed 1')
        self.assertEqual(self._box.get_string(key2),
                     self._template % 'changed 2')
        # A bad key raises KeyError, but the valid entries in the same
        # update are still expected to be applied (checked below).
        self.assertRaises(KeyError,
                      lambda: self._box.update({'foo': 'bar',
                                          key0: self._template % "changed 0"}))
        self.assertEqual(len(self._box), 3)
        self.assertEqual(self._box.get_string(key0),
                     self._template % "changed 0")
        self.assertEqual(self._box.get_string(key1),
                     self._template % "changed 1")
        self.assertEqual(self._box.get_string(key2),
                     self._template % "changed 2")

    def test_flush(self):
        # Write changes to disk
        self._test_flush_or_close(self._box.flush, True)

    def test_popitem_and_flush_twice(self):
        # See #15036.
        self._box.add(self._template % 0)
        self._box.add(self._template % 1)
        self._box.flush()

        self._box.popitem()
        self._box.flush()
        self._box.popitem()
        self._box.flush()

    def test_lock_unlock(self):
        # Lock and unlock the mailbox
        self.assertFalse(os.path.exists(self._get_lock_path()))
        self._box.lock()
        self.assertTrue(os.path.exists(self._get_lock_path()))
        self._box.unlock()
        self.assertFalse(os.path.exists(self._get_lock_path()))

    def test_close(self):
        # Close mailbox and flush changes to disk
        self._test_flush_or_close(self._box.close, False)

    def _test_flush_or_close(self, method, should_call_close):
        # (Used by test_flush() and test_close().)  Calls 'method', reopens
        # the mailbox from disk and checks that all three messages survived.
        contents = [self._template % i for i in range(3)]
        self._box.add(contents[0])
        self._box.add(contents[1])
        self._box.add(contents[2])
        oldbox = self._box
        method()
        if should_call_close:
            self._box.close()
        try:
            self._box = self._factory(self._path)
            keys = self._box.keys()
            self.assertEqual(len(keys), 3)
            for key in keys:
                self.assertIn(self._box.get_string(key), contents)
        finally:
            # Always release the first mailbox, even when a check above
            # fails; tearDown only closes the box currently in self._box.
            oldbox.close()

    def test_dump_message(self):
        # Write message representations to disk
        for message in (email.message_from_string(_sample_message),
                        _sample_message, io.BytesIO(_bytes_sample_message)):
            output = io.BytesIO()
            self._box._dump_message(message, output)
            self.assertEqual(output.getvalue(),
                _bytes_sample_message.replace(b'\n', os.linesep.encode()))
        output = io.BytesIO()
        self.assertRaises(TypeError,
                          lambda: self._box._dump_message(None, output))

    def _get_lock_path(self):
        # Return the path of the dot lock file. May be overridden.
        return self._path + '.lock'
550n/a
551n/a
class TestMailboxSuperclass(TestBase, unittest.TestCase):

    def test_notimplemented(self):
        # Every operation on the abstract mailbox.Mailbox base class must
        # raise NotImplementedError.
        box = mailbox.Mailbox('path')
        unimplemented = NotImplementedError
        self.assertRaises(unimplemented, box.add, '')
        self.assertRaises(unimplemented, box.remove, '')
        self.assertRaises(unimplemented, box.__delitem__, '')
        self.assertRaises(unimplemented, box.discard, '')
        self.assertRaises(unimplemented, box.__setitem__, '', '')
        self.assertRaises(unimplemented, box.iterkeys)
        self.assertRaises(unimplemented, box.keys)
        # The iterator-returning methods are advanced one step inside the
        # callable, so both the call and the first step are under test.
        self.assertRaises(unimplemented, lambda: next(box.itervalues()))
        self.assertRaises(unimplemented, lambda: next(box.__iter__()))
        self.assertRaises(unimplemented, box.values)
        self.assertRaises(unimplemented, lambda: next(box.iteritems()))
        self.assertRaises(unimplemented, box.items)
        self.assertRaises(unimplemented, box.get, '')
        self.assertRaises(unimplemented, box.__getitem__, '')
        self.assertRaises(unimplemented, box.get_message, '')
        self.assertRaises(unimplemented, box.get_string, '')
        self.assertRaises(unimplemented, box.get_bytes, '')
        self.assertRaises(unimplemented, box.get_file, '')
        self.assertRaises(unimplemented, lambda: '' in box)
        self.assertRaises(unimplemented, box.__contains__, '')
        self.assertRaises(unimplemented, box.__len__)
        self.assertRaises(unimplemented, box.clear)
        self.assertRaises(unimplemented, box.pop, '')
        self.assertRaises(unimplemented, box.popitem)
        self.assertRaises(unimplemented, box.update, (('', ''),))
        self.assertRaises(unimplemented, box.flush)
        self.assertRaises(unimplemented, box.lock)
        self.assertRaises(unimplemented, box.unlock)
        self.assertRaises(unimplemented, box.close)
586n/a
587n/a
588n/aclass TestMaildir(TestMailbox, unittest.TestCase):
589n/a
590n/a _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
591n/a
def setUp(self):
    # Build the mailbox through the shared fixture, then swap the info
    # separator to '!' on Windows and Cygwin.
    TestMailbox.setUp(self)
    needs_alternate_colon = os.name == 'nt' or sys.platform == 'cygwin'
    if needs_alternate_colon:
        self._box.colon = '!'
596n/a
def assertMailboxEmpty(self):
    # The mailbox counts as empty when its 'tmp' subdirectory holds no files.
    tmp_dir = os.path.join(self._path, 'tmp')
    leftovers = os.listdir(tmp_dir)
    self.assertEqual(leftovers, [])
599n/a
def test_add_MM(self):
    # Add a MaildirMessage instance: the file must land in the message's
    # subdirectory, named "<key><colon><info>".
    message = mailbox.MaildirMessage(self._template % 0)
    message.set_subdir('cur')
    message.set_info('foo')
    key = self._box.add(message)
    filename = '%s%sfoo' % (key, self._box.colon)
    stored_path = os.path.join(self._path, 'cur', filename)
    self.assertTrue(os.path.exists(stored_path))
608n/a
def test_get_MM(self):
    # Get a MaildirMessage instance back with its subdir and flags intact.
    original = mailbox.MaildirMessage(self._template % 0)
    original.set_subdir('cur')
    original.set_flags('RF')
    key = self._box.add(original)
    fetched = self._box.get_message(key)
    self.assertIsInstance(fetched, mailbox.MaildirMessage)
    self.assertEqual(fetched.get_subdir(), 'cur')
    # Flags were set as 'RF' but are expected back as 'FR'.
    self.assertEqual(fetched.get_flags(), 'FR')
619n/a
def test_set_MM(self):
    # Set with a MaildirMessage instance
    flagged = mailbox.MaildirMessage(self._template % 0)
    flagged.set_flags('TP')
    key = self._box.add(flagged)
    stored = self._box.get_message(key)
    self.assertEqual(stored.get_subdir(), 'new')
    self.assertEqual(stored.get_flags(), 'PT')

    # Replacing it with a flagless MaildirMessage drops the flags.
    unflagged = mailbox.MaildirMessage(self._template % 1)
    self._box[key] = unflagged
    stored = self._box.get_message(key)
    self.assertEqual(stored.get_subdir(), 'new')
    self.assertEqual(stored.get_flags(), '')
    self.assertEqual(stored.get_payload(), '1\n')

    # After storing a message with info '2,S', a plain-string replacement
    # is expected to keep that 'S' flag while taking the new payload.
    with_info = mailbox.MaildirMessage(self._template % 2)
    with_info.set_info('2,S')
    self._box[key] = with_info
    self._box[key] = self._template % 3
    stored = self._box.get_message(key)
    self.assertEqual(stored.get_subdir(), 'new')
    self.assertEqual(stored.get_flags(), 'S')
    self.assertEqual(stored.get_payload(), '3\n')
642n/a
def test_consistent_factory(self):
    # Add a message through the fixture mailbox.
    seed = mailbox.MaildirMessage(self._template % 0)
    seed.set_subdir('cur')
    seed.set_flags('RF')
    key = self._box.add(seed)

    # Open the same path with a custom message factory; get_message()
    # must return instances of that factory class.
    class FakeMessage(mailbox.MaildirMessage):
        pass

    second_box = mailbox.Maildir(self._path, factory=FakeMessage)
    second_box.colon = self._box.colon
    fetched = second_box.get_message(key)
    self.assertIsInstance(fetched, FakeMessage)
657n/a
def test_initialize_new(self):
    # Initialize a non-existent mailbox, first by calling mailbox.Maildir
    # directly and then through the test's own factory.
    self.tearDown()
    self._box = mailbox.Maildir(self._path)
    self._check_basics()

    self._delete_recursively(self._path)
    self._box = self._factory(self._path, factory=None)
    self._check_basics()
666n/a
def test_initialize_existing(self):
    # Initialize an existing mailbox: create the directory layout by hand
    # and then open it.
    self.tearDown()
    for subdir in ('', 'tmp', 'new', 'cur'):
        directory = os.path.normpath(os.path.join(self._path, subdir))
        os.mkdir(directory)
    self._box = mailbox.Maildir(self._path)
    self._check_basics()
674n/a
def _check_basics(self, factory=None):
    # (Used by test_initialize_new() and test_initialize_existing().)
    # Check the box's recorded path and factory, and that the mailbox root
    # plus its tmp/new/cur subdirectories exist as directories.
    expected_path = os.path.abspath(self._path)
    self.assertEqual(self._box._path, expected_path)
    self.assertEqual(self._box._factory, factory)
    for subdir in ('', 'tmp', 'new', 'cur'):
        path = os.path.join(self._path, subdir)
        mode = os.stat(path).st_mode
        self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
683n/a
def test_list_folders(self):
    # List folders: every added folder is reported exactly once.
    for folder_name in ('one', 'two', 'three'):
        self._box.add_folder(folder_name)
    self.assertEqual(len(self._box.list_folders()), 3)
    self.assertEqual(set(self._box.list_folders()),
                     {'one', 'two', 'three'})
692n/a
def test_get_folder(self):
    # Open folders: a message added through one handle is visible through
    # a second handle on the same folder.
    self._box.add_folder('foo.bar')
    writer = self._box.get_folder('foo.bar')
    writer.add(self._template % 'bar')
    # The folder is stored on disk with a leading dot.
    folder_dir = os.path.join(self._path, '.foo.bar')
    self.assertTrue(os.path.isdir(folder_dir))
    reader = self._box.get_folder('foo.bar')
    first_key = reader.keys()[0]
    self.assertEqual(reader.get_string(first_key),
                     self._template % 'bar')
702n/a
def test_add_and_remove_folders(self):
    # Add and delete folders, checking the listing after every step.
    box = self._box
    box.add_folder('one')
    box.add_folder('two')
    self.assertEqual(len(box.list_folders()), 2)
    self.assertEqual(set(box.list_folders()), {'one', 'two'})

    box.remove_folder('one')
    self.assertEqual(len(box.list_folders()), 1)
    self.assertEqual(set(box.list_folders()), {'two'})

    box.add_folder('three')
    self.assertEqual(len(box.list_folders()), 2)
    self.assertEqual(set(box.list_folders()), {'two', 'three'})

    box.remove_folder('three')
    self.assertEqual(len(box.list_folders()), 1)
    self.assertEqual(set(box.list_folders()), {'two'})

    box.remove_folder('two')
    self.assertEqual(len(box.list_folders()), 0)
    self.assertEqual(box.list_folders(), [])
721n/a
def test_clean(self):
    # Remove old files from 'tmp'
    tmp_dir = os.path.join(self._path, 'tmp')
    foo_path = os.path.join(tmp_dir, 'foo')
    bar_path = os.path.join(tmp_dir, 'bar')
    for path in (foo_path, bar_path):
        with open(path, 'w') as f:
            f.write("@")

    # Freshly written files must survive a clean().
    self._box.clean()
    self.assertTrue(os.path.exists(foo_path))
    self.assertTrue(os.path.exists(bar_path))

    # Backdate only foo's access time to just over 129600 seconds
    # (36 hours) ago, keeping its modification time; clean() is then
    # expected to delete foo and leave bar alone.
    foo_stat = os.stat(foo_path)
    stale_atime = time.time() - 129600 - 2
    os.utime(foo_path, (stale_atime, foo_stat.st_mtime))
    self._box.clean()
    self.assertFalse(os.path.exists(foo_path))
    self.assertTrue(os.path.exists(bar_path))
739n/a
def test_create_tmp(self, repetitions=10):
    # Create files in tmp directory and validate their generated names:
    # "<time>.M<usec>P<pid>Q<counter>.<host>".
    # '/' and ':' in the host name are expected in octal-escaped form.
    hostname = socket.gethostname()
    hostname = hostname.replace('/', r'\057').replace(':', r'\072')
    pid = os.getpid()
    pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
                         r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
    tmp_dir = os.path.abspath(os.path.join(self._path, "tmp"))
    previous = None
    for _ in range(repetitions):
        # The context manager closes the file even if an assertion below
        # fails, so no open handle is left behind for tearDown.
        with self._box._create_tmp() as tmp_file:
            head, tail = os.path.split(tmp_file.name)
            self.assertEqual(head, tmp_dir,
                             "File in wrong location: '%s'" % head)
            match = pattern.match(tail)
            self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
            fields = match.groupdict()
            if previous is not None:
                self.assertGreaterEqual(
                    int(fields['time']), int(previous['time']),
                    "Non-monotonic seconds: '%s' before '%s'" %
                    (previous['time'], fields['time']))
                if int(fields['time']) == int(previous['time']):
                    # The M field has up to six digits: microseconds.
                    self.assertGreaterEqual(
                        int(fields['M']), int(previous['M']),
                        "Non-monotonic microseconds: '%s' before '%s'" %
                        (previous['M'], fields['M']))
                self.assertEqual(int(fields['P']), pid,
                                 "Process ID mismatch: '%s' should be '%s'" %
                                 (fields['P'], pid))
                self.assertEqual(int(fields['Q']), int(previous['Q']) + 1,
                                 "Non-sequential counter: '%s' before '%s'" %
                                 (previous['Q'], fields['Q']))
                self.assertEqual(fields['host'], hostname,
                                 "Host name mismatch: '%s' should be '%s'" %
                                 (fields['host'], hostname))
            previous = fields
            # The returned file must be both writable and readable.
            tmp_file.write(_bytes_sample_message)
            tmp_file.seek(0)
            self.assertEqual(tmp_file.read(), _bytes_sample_message)
    file_count = len(os.listdir(os.path.join(self._path, "tmp")))
    self.assertEqual(file_count, repetitions,
                     "Wrong file count: '%s' should be '%s'" %
                     (file_count, repetitions))
786n/a
def test_refresh(self):
    # Update the table of contents: _toc only changes when _refresh()
    # is called, never as a side effect of add().
    def new_entry(key):
        return os.path.join('new', key)

    self.assertEqual(self._box._toc, {})
    key0 = self._box.add(self._template % 0)
    key1 = self._box.add(self._template % 1)
    self.assertEqual(self._box._toc, {})

    self._box._refresh()
    expected = {key0: new_entry(key0), key1: new_entry(key1)}
    self.assertEqual(self._box._toc, expected)

    key2 = self._box.add(self._template % 2)
    self.assertEqual(self._box._toc, expected)

    self._box._refresh()
    expected_after = dict(expected)
    expected_after[key2] = new_entry(key2)
    self.assertEqual(self._box._toc, expected_after)
803n/a
def test_refresh_after_safety_period(self):
    # Issue #13254: the very first _refresh() on a mailbox object must
    # populate _toc even when it happens after the two second "file
    # system safety period" has elapsed.
    added = [self._box.add(self._template % n) for n in (0, 1)]

    # Re-open the mailbox so that no refresh has happened on it yet.
    self._box = self._factory(self._path)
    self.assertEqual(self._box._toc, {})

    # Rather than really sleeping for two seconds, skew the clock used
    # by _refresh so that it believes the safety period is over and a
    # re-read is only needed when the directory mtimes differ.
    self._box._skewfactor = -3

    self._box._refresh()
    self.assertEqual(sorted(self._box._toc.keys()), sorted(added))
822n/a
def test_lookup(self):
    # _lookup() maps a key to its subpath through the TOC
    with self.assertRaises(KeyError):
        self._box._lookup('foo')

    key = self._box.add(self._template % 0)
    subpath = os.path.join('new', key)
    self.assertEqual(self._box._lookup(key), subpath)

    # Removing the file behind the mailbox's back leaves the cached
    # TOC entry in place...
    os.remove(os.path.join(self._path, 'new', key))
    self.assertEqual(self._box._toc, {key: subpath})

    # ...until flush() forces the TOC to be read back from disk (see
    # issue #6896 about bad mtime behaviour on some systems).
    self._box.flush()
    with self.assertRaises(KeyError):
        self._box._lookup(key)
    self.assertEqual(self._box._toc, {})
835n/a
def test_lock_unlock(self):
    # Locking is a no-op for Maildir; just check that both calls
    # complete without raising.
    box = self._box
    box.lock()
    box.unlock()
840n/a
def test_folder(self):
    # Bug #1569790: folders handed out by add_folder() and get_folder()
    # must carry the same message factory as the parent mailbox.
    def dummy_factory(s):
        return None

    parent = self._factory(self._path, factory=dummy_factory)

    created = parent.add_folder('folder1')
    self.assertIs(created._factory, dummy_factory)

    reopened = parent.get_folder('folder1')
    self.assertIs(reopened._factory, dummy_factory)
852n/a
def test_directory_in_folder(self):
    # A stray extra directory inside a folder must not break iteration.
    for _ in range(10):
        self._box.add(mailbox.Message(_sample_message))

    # Drop an unexpected directory next to the messages.
    stray = os.path.join(self._path, 'cur', 'stray-dir')
    os.mkdir(stray)

    # Walking the mailbox must still work with the directory present.
    for _message in self._box:
        pass
865n/a
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_permissions(self):
    # Message files must never be created with execute permission,
    # even when the umask masks nothing out.
    message = mailbox.MaildirMessage(self._template % 0)
    saved_umask = os.umask(0)
    try:
        key = self._box.add(message)
    finally:
        os.umask(saved_umask)

    message_path = os.path.join(self._path, self._box._lookup(key))
    execute_bits = os.stat(message_path).st_mode & 0o111
    self.assertFalse(execute_bits)
879n/a
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_folder_file_perms(self):
    # Bug #3228: the 'maildirfolder' marker file created inside a
    # Maildir subfolder must not be marked as executable.
    saved_umask = os.umask(0)
    try:
        subfolder = self._box.add_folder('subfolder')
    finally:
        os.umask(saved_umask)

    marker = os.path.join(subfolder._path, 'maildirfolder')
    mode = os.stat(marker).st_mode
    # Execute bits should all be off.
    self.assertFalse(mode & 0o111)
895n/a
def test_reread(self):
    # Start with an unconditional refresh.
    self._box._refresh()

    # Push the directory mtimes more than two seconds into the past
    # (mtime may only have a two second granularity).
    for subdir in ('cur', 'new'):
        stamp = time.time() - 5
        os.utime(os.path.join(self._box._path, subdir), (stamp, stamp))

    # With a worst-case mtime granularity of two seconds (FAT), a
    # refresh requested within two-seconds-plus-a-bit of the previous
    # one is always carried out, in case the mailbox changed.  We would
    # have to wait for that interval to expire; since this is a test,
    # emulate the sleep by skewing the clock so _refresh believes two
    # seconds have passed and only re-reads when mtimes differ.
    self._box._skewfactor = -3

    # A re-read binds ._toc to a brand new dictionary, so object
    # identity tells us whether one took place.
    toc_before = self._box._toc

    self._box._refresh()
    self.assertIs(self._box._toc, toc_before)

    # Create and delete a file in cur.  That bumps the directory mtime
    # and must trigger a re-read.  The "sleep emulation" is still in
    # effect because skewfactor is still -3.
    stray = os.path.join(self._path, 'cur', 'stray-file')
    support.create_empty_file(stray)
    os.unlink(stray)
    self._box._refresh()
    self.assertIsNot(self._box._toc, toc_before)
935n/a
936n/a
class _TestSingleFile(TestMailbox):
    '''Tests shared by every mailbox format stored in a single file'''

    def test_add_doesnt_rewrite(self):
        # Issue #9559: when messages are only added, flush() should
        # append in place instead of rewriting the mailbox file.
        #
        # A rewrite goes through a temporary file that is renamed over
        # the original, which changes the inode number.  An unchanged
        # inode therefore shows that no rewrite took place.
        before = os.stat(self._path).st_ino

        self._box.add(self._template % 0)
        self._box.flush()

        after = os.stat(self._path).st_ino
        self.assertEqual(before, after)

        # Re-open the mailbox to confirm the message really landed.
        self._box.close()
        self._box = self._factory(self._path)
        self.assertEqual(len(self._box), 1)

    def test_permissions_after_flush(self):
        # See issue #5346

        # Make the mailbox world writable.  A freshly created file is
        # unlikely to get these permissions after flush(), because the
        # umask usually prevents it.
        wanted_mode = os.stat(self._path).st_mode | 0o666
        os.chmod(self._path, wanted_mode)

        self._box.add(self._template % 0)
        doomed = self._box.add(self._template % 1)
        # A removal is needed to make flush() create a new file.
        self._box.remove(doomed)
        self._box.flush()

        self.assertEqual(os.stat(self._path).st_mode, wanted_mode)
976n/a
977n/a
class _TestMboxMMDF(_TestSingleFile):
    # Tests shared by the mbox and MMDF single-file formats.

    def tearDown(self):
        # The inherited tearDown() already closes the mailbox and deletes
        # self._path, so only the lock remnants are left to clean up.
        super().tearDown()
        # Escape the path: it must be matched literally even if it
        # contains glob metacharacters such as '[' or '*'.
        for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
            support.unlink(lock_remnant)

    def assertMailboxEmpty(self):
        # An empty mailbox is a file with no lines at all.
        with open(self._path) as f:
            self.assertEqual(f.readlines(), [])

    def test_add_from_string(self):
        # Add a string starting with 'From ' to the mailbox
        key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
        self.assertEqual(self._box[key].get_payload(), '0\n')

    def test_add_from_bytes(self):
        # Add a byte string starting with 'From ' to the mailbox
        key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n')
        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
        self.assertEqual(self._box[key].get_payload(), '0\n')

    def test_add_mbox_or_mmdf_message(self):
        # Add an mboxMessage or MMDFMessage; passes as long as add()
        # accepts both classes without raising.
        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
            msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
            self._box.add(msg)

    def test_open_close_open(self):
        # Open and inspect previously-created mailbox
        values = [self._template % i for i in range(3)]
        for value in values:
            self._box.add(value)
        self._box.close()
        mtime = os.path.getmtime(self._path)
        self._box = self._factory(self._path)
        self.assertEqual(len(self._box), 3)
        for key in self._box.iterkeys():
            self.assertIn(self._box.get_string(key), values)
        self._box.close()
        # Merely reading the mailbox must not have touched the file.
        self.assertEqual(mtime, os.path.getmtime(self._path))

    def test_add_and_close(self):
        # Verifying that closing a mailbox doesn't change added items
        self._box.add(_sample_message)
        for i in range(3):
            self._box.add(self._template % i)
        self._box.add(_sample_message)
        # Snapshot the contents through the mailbox's own file object
        # before closing, then compare with what ends up on disk.
        self._box._file.flush()
        self._box._file.seek(0)
        contents = self._box._file.read()
        self._box.close()
        with open(self._path, 'rb') as f:
            self.assertEqual(contents, f.read())
        # Leave an open mailbox behind for tearDown() to close.
        self._box = self._factory(self._path)

    @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().")
    @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
    def test_lock_conflict(self):
        # Fork off a child process that will lock the mailbox temporarily,
        # unlock it and exit.
        c, p = socket.socketpair()
        self.addCleanup(c.close)
        self.addCleanup(p.close)

        pid = os.fork()
        if pid == 0:
            # child
            try:
                # lock the mailbox, and signal the parent it can proceed
                self._box.lock()
                c.send(b'c')

                # wait until the parent is done, and unlock the mailbox
                c.recv(1)
                self._box.unlock()
            finally:
                # Never let the child fall through into the test runner.
                os._exit(0)

        # In the parent, wait until the child signals it locked the mailbox.
        p.recv(1)
        try:
            self.assertRaises(mailbox.ExternalClashError,
                              self._box.lock)
        finally:
            # Signal the child it can now release the lock and exit.
            p.send(b'p')
            # Wait for child to exit.  Locking should now succeed.
            os.waitpid(pid, 0)

        self._box.lock()
        self._box.unlock()

    def test_relock(self):
        # Test case for bug #1575506: the mailbox class was locking the
        # wrong file object in its flush() method.
        msg = "Subject: sub\n\nbody\n"
        self._box.add(msg)
        self._box.flush()
        self._box.close()

        self._box = self._factory(self._path)
        self._box.lock()
        self._box.add(msg)
        self._box.flush()
        # flush() must leave the mailbox locked.
        self.assertTrue(self._box._locked)
        self._box.close()
1088n/a
1089n/a
class TestMbox(_TestMboxMMDF, unittest.TestCase):
    # Runs the shared mailbox tests against mailbox.mbox, plus a few
    # mbox-specific checks.

    def _factory(self, path, factory=None):
        # Build the mailbox under test; 'factory' is the message factory.
        return mailbox.mbox(path, factory)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def test_file_perms(self):
        # From bug #3228, we want to verify that the mailbox file isn't executable,
        # even if the umask is set to something that would leave executable bits set.
        # We only run this test on platforms that support umask.
        #
        # Set the umask before entering the try block so that the finally
        # clause never runs with old_umask unbound.
        old_umask = os.umask(0o077)
        try:
            self._box.close()
            os.unlink(self._path)
            self._box = mailbox.mbox(self._path, create=True)
            self._box.add('')
            self._box.close()
        finally:
            os.umask(old_umask)

        perms = os.stat(self._path).st_mode
        self.assertFalse((perms & 0o111))   # Execute bits should all be off.

    def test_terminating_newline(self):
        message = email.message.Message()
        message['From'] = 'john@example.com'
        message.set_payload('No newline at the end')
        i = self._box.add(message)

        # A newline should have been appended to the payload
        message = self._box.get(i)
        self.assertEqual(message.get_payload(), 'No newline at the end\n')

    def test_message_separator(self):
        # Check there's always a single blank line after each message
        self._box.add('From: foo\n\n0')  # No newline at the end
        with open(self._path) as f:
            data = f.read()
            self.assertEqual(data[-3:], '0\n\n')

        self._box.add('From: foo\n\n0\n')  # Newline at the end
        with open(self._path) as f:
            data = f.read()
            self.assertEqual(data[-3:], '0\n\n')
1135n/a
1136n/a
class TestMMDF(_TestMboxMMDF, unittest.TestCase):
    # Runs the shared mbox/MMDF tests against mailbox.MMDF.

    def _factory(self, path, factory=None):
        return mailbox.MMDF(path, factory)
1140n/a
1141n/a
class TestMH(TestMailbox, unittest.TestCase):
    # Runs the shared mailbox tests against mailbox.MH, plus MH-specific
    # folder, sequence and pack() checks.

    def _factory(self, path, factory=None):
        # Build the mailbox under test; 'factory' is the message factory.
        return mailbox.MH(path, factory)

    def assertMailboxEmpty(self):
        # An empty MH mailbox holds nothing but its sequences file.
        self.assertEqual(os.listdir(self._path), ['.mh_sequences'])

    def test_list_folders(self):
        # List folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 3)
        self.assertEqual(set(self._box.list_folders()),
                         {'one', 'two', 'three'})

    def test_get_folder(self):
        # Open folders
        def dummy_factory(s):
            return None
        self._box = self._factory(self._path, dummy_factory)

        new_folder = self._box.add_folder('foo.bar')
        folder0 = self._box.get_folder('foo.bar')
        folder0.add(self._template % 'bar')
        self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
        folder1 = self._box.get_folder('foo.bar')
        self.assertEqual(folder1.get_string(folder1.keys()[0]),
                         self._template % 'bar')

        # Test for bug #1569790: verify that folders returned by .get_folder()
        # use the same factory function.
        self.assertIs(new_folder._factory, self._box._factory)
        self.assertIs(folder0._factory, self._box._factory)

    def test_add_and_remove_folders(self):
        # Delete folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), {'one', 'two'})
        self._box.remove_folder('one')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), {'two'})
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), {'two', 'three'})
        self._box.remove_folder('three')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), {'two'})
        self._box.remove_folder('two')
        self.assertEqual(len(self._box.list_folders()), 0)
        self.assertEqual(self._box.list_folders(), [])

    def test_sequences(self):
        # Get and set sequences
        self.assertEqual(self._box.get_sequences(), {})
        msg0 = mailbox.MHMessage(self._template % 0)
        msg0.add_sequence('foo')
        key0 = self._box.add(msg0)
        self.assertEqual(self._box.get_sequences(), {'foo': [key0]})
        msg1 = mailbox.MHMessage(self._template % 1)
        msg1.set_sequences(['bar', 'replied', 'foo'])
        key1 = self._box.add(msg1)
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1], 'bar': [key1],
                          'replied': [key1]})
        # Storing msg0 again with new sequences replaces its old ones.
        msg0.set_sequences(['flagged'])
        self._box[key0] = msg0
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key1], 'bar': [key1], 'replied': [key1],
                          'flagged': [key0]})
        self._box.remove(key1)
        self.assertEqual(self._box.get_sequences(), {'flagged': [key0]})

    def test_issue2625(self):
        # Passes as long as get_message() on a message that belongs to a
        # sequence does not raise.
        msg0 = mailbox.MHMessage(self._template % 0)
        msg0.add_sequence('foo')
        key0 = self._box.add(msg0)
        self._box.get_message(key0)

    def test_issue7627(self):
        # Passes as long as removing a message while holding the lock
        # does not raise.
        msg0 = mailbox.MHMessage(self._template % 0)
        key0 = self._box.add(msg0)
        self._box.lock()
        self._box.remove(key0)
        self._box.unlock()

    def test_pack(self):
        # Pack the contents of the mailbox
        msg0 = mailbox.MHMessage(self._template % 0)
        msg1 = mailbox.MHMessage(self._template % 1)
        msg2 = mailbox.MHMessage(self._template % 2)
        msg3 = mailbox.MHMessage(self._template % 3)
        msg0.set_sequences(['foo', 'unseen'])
        msg1.set_sequences(['foo'])
        msg2.set_sequences(['foo', 'flagged'])
        msg3.set_sequences(['foo', 'bar', 'replied'])
        key0 = self._box.add(msg0)
        key1 = self._box.add(msg1)
        key2 = self._box.add(msg2)
        key3 = self._box.add(msg3)
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1, key2, key3], 'unseen': [key0],
                          'flagged': [key2], 'bar': [key3],
                          'replied': [key3]})
        self._box.remove(key2)
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1, key3], 'unseen': [key0],
                          'bar': [key3], 'replied': [key3]})
        # Packing renumbers the remaining messages without gaps.
        self._box.pack()
        self.assertEqual(self._box.keys(), [1, 2, 3])
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [1, 2, 3], 'unseen': [1], 'bar': [3],
                          'replied': [3]})

        # Test case for packing while holding the mailbox locked.
        # Four more copies of msg1 are added; the first and third of
        # them are removed again before packing.
        first = self._box.add(msg1)
        self._box.add(msg1)
        third = self._box.add(msg1)
        self._box.add(msg1)

        self._box.remove(first)
        self._box.remove(third)
        self._box.lock()
        self._box.pack()
        self._box.unlock()
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [1, 2, 3, 4, 5],
                          'unseen': [1], 'bar': [3], 'replied': [3]})

    def _get_lock_path(self):
        # Path of the lock file that sits next to the sequences file.
        return os.path.join(self._path, '.mh_sequences.lock')
1275n/a
1276n/a
class TestBabyl(_TestSingleFile, unittest.TestCase):
    # Runs the shared single-file tests against mailbox.Babyl, plus a
    # Babyl-specific label check.

    def _factory(self, path, factory=None):
        # Build the mailbox under test; 'factory' is the message factory.
        return mailbox.Babyl(path, factory)

    def assertMailboxEmpty(self):
        # An empty mailbox is a file with no lines at all.
        with open(self._path) as f:
            self.assertEqual(f.readlines(), [])

    def tearDown(self):
        # The inherited tearDown() already closes the mailbox and deletes
        # self._path, so only the lock remnants are left to clean up.
        super().tearDown()
        # Escape the path: it must be matched literally even if it
        # contains glob metacharacters such as '[' or '*'.
        for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
            support.unlink(lock_remnant)

    def test_labels(self):
        # Get labels from the mailbox
        self.assertEqual(self._box.get_labels(), [])
        msg0 = mailbox.BabylMessage(self._template % 0)
        msg0.add_label('foo')
        key0 = self._box.add(msg0)
        self.assertEqual(self._box.get_labels(), ['foo'])
        msg1 = mailbox.BabylMessage(self._template % 1)
        msg1.set_labels(['bar', 'answered', 'foo'])
        key1 = self._box.add(msg1)
        # 'answered' is set on msg1 but is not expected back from
        # get_labels().
        self.assertEqual(set(self._box.get_labels()), {'foo', 'bar'})
        msg0.set_labels(['blah', 'filed'])
        self._box[key0] = msg0
        # Likewise 'filed' is not expected back.
        self.assertEqual(set(self._box.get_labels()),
                         {'foo', 'bar', 'blah'})
        self._box.remove(key1)
        self.assertEqual(set(self._box.get_labels()), {'blah'})
1309n/a
1310n/a
class FakeFileLikeObject:
    # Stand-in for a file object that only records whether close()
    # has been called on it.

    def __init__(self):
        # Starts out open.
        self.closed = False

    def close(self):
        # Remember that the object has been closed.
        self.closed = True
1318n/a
1319n/a
class FakeMailBox(mailbox.Mailbox):
    # Mailbox whose "files" are ten FakeFileLikeObject instances,
    # addressed by list index.

    def __init__(self):
        # Empty path, and a message factory that ignores the file it is
        # given and returns None.
        super().__init__('', lambda file: None)
        self.files = [FakeFileLikeObject() for _ in range(10)]

    def get_file(self, key):
        # The key is simply an index into self.files.
        return self.files[key]
1328n/a
1329n/a
class TestFakeMailBox(unittest.TestCase):

    def test_closing_fd(self):
        # Fetching a message with box[key] must close the file object
        # that get_file() handed out for that key.
        box = FakeMailBox()
        for fake_file in box.files:
            self.assertFalse(fake_file.closed)
        for key in range(10):
            box[key]
        for fake_file in box.files:
            self.assertTrue(fake_file.closed)
1340n/a
1341n/a
class TestMessage(TestBase, unittest.TestCase):
    # Tests for mailbox.Message; subclasses swap in their own message
    # class through _factory and re-run the same tests.

    _factory = mailbox.Message      # Overridden by subclasses to reuse tests

    def setUp(self):
        self._path = support.TESTFN

    def tearDown(self):
        self._delete_recursively(self._path)

    def test_initialize_with_eMM(self):
        # Build from an email.message.Message instance
        source = email.message_from_string(_sample_message)
        built = self._factory(source)
        self._post_initialize_hook(built)
        self._check_sample(built)

    def test_initialize_with_string(self):
        # Build from a str
        built = self._factory(_sample_message)
        self._post_initialize_hook(built)
        self._check_sample(built)

    def test_initialize_with_file(self):
        # Build from the contents of a text-mode file
        with open(self._path, 'w+') as stream:
            stream.write(_sample_message)
            stream.seek(0)
            built = self._factory(stream)
            self._post_initialize_hook(built)
            self._check_sample(built)

    def test_initialize_with_binary_file(self):
        # Build from the contents of a binary-mode file
        with open(self._path, 'wb+') as stream:
            stream.write(_bytes_sample_message)
            stream.seek(0)
            built = self._factory(stream)
            self._post_initialize_hook(built)
            self._check_sample(built)

    def test_initialize_with_nothing(self):
        # Build without any argument: an empty, non-multipart message
        built = self._factory()
        self._post_initialize_hook(built)
        for expected_type in (email.message.Message, mailbox.Message,
                              self._factory):
            self.assertIsInstance(built, expected_type)
        self.assertEqual(built.keys(), [])
        self.assertFalse(built.is_multipart())
        self.assertIsNone(built.get_payload())

    def test_initialize_incorrectly(self):
        # An argument of an unsupported type is rejected
        with self.assertRaises(TypeError):
            self._factory(object())

    def test_all_eMM_attribues_exist(self):
        # Issue 12537: every instance attribute of a plain
        # email.message.Message must exist on our message too.
        reference = email.message_from_string(_sample_message)
        built = self._factory(_sample_message)
        for attr in reference.__dict__:
            self.assertIn(attr, built.__dict__,
                '{} attribute does not exist'.format(attr))

    def test_become_message(self):
        # Take on the state of another message
        source = email.message_from_string(_sample_message)
        target = self._factory()
        target._become_message(source)
        self._check_sample(target)

    def test_explain_to(self):
        # Copy self's format-specific data to other message formats.
        # This test is superficial; better ones are in TestMessageConversion.
        origin = self._factory()
        for class_ in self.all_mailbox_types:
            origin._explain_to(class_())
        # A plain email message is not an acceptable target.
        plain = email.message.Message()
        with self.assertRaises(TypeError):
            origin._explain_to(plain)

    def _post_initialize_hook(self, msg):
        # Overridden by subclasses to check extra things after initialization
        pass
1426n/a
1427n/a
class TestMaildirMessage(TestMessage, unittest.TestCase):
    # Re-runs the TestMessage tests with MaildirMessage and adds checks
    # for its subdir, flags, date and info accessors.

    _factory = mailbox.MaildirMessage

    def _post_initialize_hook(self, msg):
        # A fresh message lives in 'new' and carries no info.
        self.assertEqual(msg._subdir, 'new')
        self.assertEqual(msg._info, '')

    def test_subdir(self):
        # Use get_subdir() and set_subdir()
        message = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(message.get_subdir(), 'new')
        message.set_subdir('cur')
        self.assertEqual(message.get_subdir(), 'cur')
        message.set_subdir('new')
        self.assertEqual(message.get_subdir(), 'new')
        # 'tmp' is rejected and leaves the current value alone.
        with self.assertRaises(ValueError):
            message.set_subdir('tmp')
        self.assertEqual(message.get_subdir(), 'new')
        message.set_subdir('new')
        self.assertEqual(message.get_subdir(), 'new')
        self._check_sample(message)

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        message = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(message.get_flags(), '')
        self.assertEqual(message.get_subdir(), 'new')
        message.set_flags('F')
        self.assertEqual(message.get_subdir(), 'new')
        self.assertEqual(message.get_flags(), 'F')
        # Flags are reported in sorted order, whatever order they were
        # set in.
        message.set_flags('SDTP')
        self.assertEqual(message.get_flags(), 'DPST')
        message.add_flag('FT')
        self.assertEqual(message.get_flags(), 'DFPST')
        message.remove_flag('TDRP')
        self.assertEqual(message.get_flags(), 'FS')
        self.assertEqual(message.get_subdir(), 'new')
        self._check_sample(message)

    def test_date(self):
        # Use get_date() and set_date()
        message = mailbox.MaildirMessage(_sample_message)
        # The default date is within a minute of "now".
        drift = abs(message.get_date() - time.time())
        self.assertLess(drift, 60)
        message.set_date(0.0)
        self.assertEqual(message.get_date(), 0.0)

    def test_info(self):
        # Use get_info() and set_info()
        message = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(message.get_info(), '')
        message.set_info('1,foo=bar')
        self.assertEqual(message.get_info(), '1,foo=bar')
        with self.assertRaises(TypeError):
            message.set_info(None)
        self._check_sample(message)

    def test_info_and_flags(self):
        # Test interaction of info and flag methods
        message = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(message.get_info(), '')
        message.set_flags('SF')
        self.assertEqual(message.get_flags(), 'FS')
        self.assertEqual(message.get_info(), '2,FS')
        # Info that does not start with '2,' exposes no flags.
        message.set_info('1,')
        self.assertEqual(message.get_flags(), '')
        self.assertEqual(message.get_info(), '1,')
        message.remove_flag('RPT')
        self.assertEqual(message.get_flags(), '')
        self.assertEqual(message.get_info(), '1,')
        message.add_flag('D')
        self.assertEqual(message.get_flags(), 'D')
        self.assertEqual(message.get_info(), '2,D')
        self._check_sample(message)
1500n/a
1501n/a
class _TestMboxMMDFMessage:
    # Mixin with the tests shared by mboxMessage and MMDFMessage.  It
    # relies on the assert* methods and _check_sample() of the test case
    # class it is combined with.

    _factory = mailbox._mboxMMDFMessage

    def _post_initialize_hook(self, msg):
        # A freshly built message must carry a default "From " line.
        self._check_from(msg)

    def test_initialize_with_unixfrom(self):
        # Initialize with a message that already has a _unixfrom attribute
        msg = mailbox.Message(_sample_message)
        msg.set_unixfrom('From foo@bar blah')
        msg = mailbox.mboxMessage(msg)
        self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())

    def test_from(self):
        # Get and set "From " line
        msg = mailbox.mboxMessage(_sample_message)
        self._check_from(msg)
        msg.set_from('foo bar')
        self.assertEqual(msg.get_from(), 'foo bar')
        # With a second argument (True or a time struct) a timestamp is
        # appended after the sender, which _check_from() verifies.
        msg.set_from('foo@bar', True)
        self._check_from(msg, 'foo@bar')
        msg.set_from('blah@temp', time.localtime())
        self._check_from(msg, 'blah@temp')

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        msg = mailbox.mboxMessage(_sample_message)
        self.assertEqual(msg.get_flags(), '')
        msg.set_flags('F')
        self.assertEqual(msg.get_flags(), 'F')
        msg.set_flags('XODR')
        self.assertEqual(msg.get_flags(), 'RODX')
        msg.add_flag('FA')
        self.assertEqual(msg.get_flags(), 'RODFAX')
        msg.remove_flag('FDXA')
        self.assertEqual(msg.get_flags(), 'RO')
        self._check_sample(msg)

    def _check_from(self, msg, sender=None):
        # Check contents of "From " line: the sender followed by a
        # timestamp.  'sender' defaults to "MAILER-DAEMON".
        if sender is None:
            sender = "MAILER-DAEMON"
        # The sender is literal text, so escape it before splicing it
        # into the pattern; otherwise characters such as '.' or '+' in
        # it would be read as regex syntax.
        pattern = (re.escape(sender) +
                   r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}")
        self.assertIsNotNone(re.match(pattern, msg.get_from()))
1548n/a
1549n/a
class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
    # Runs the inherited message tests against mailbox.mboxMessage.

    _factory = mailbox.mboxMessage
1553n/a
1554n/a
class TestMHMessage(TestMessage, unittest.TestCase):
    # Re-runs the TestMessage tests with MHMessage and adds a check of
    # its sequence accessors.

    _factory = mailbox.MHMessage

    def _post_initialize_hook(self, msg):
        # A fresh message belongs to no sequence.
        self.assertEqual(msg._sequences, [])

    def test_sequences(self):
        # Get, set, join, and leave sequences
        message = mailbox.MHMessage(_sample_message)
        self.assertEqual(message.get_sequences(), [])
        message.set_sequences(['foobar'])
        self.assertEqual(message.get_sequences(), ['foobar'])
        message.set_sequences([])
        self.assertEqual(message.get_sequences(), [])
        message.add_sequence('unseen')
        self.assertEqual(message.get_sequences(), ['unseen'])
        message.add_sequence('flagged')
        self.assertEqual(message.get_sequences(), ['unseen', 'flagged'])
        # Joining a sequence twice changes nothing.
        message.add_sequence('flagged')
        self.assertEqual(message.get_sequences(), ['unseen', 'flagged'])
        message.remove_sequence('unseen')
        self.assertEqual(message.get_sequences(), ['flagged'])
        message.add_sequence('foobar')
        self.assertEqual(message.get_sequences(), ['flagged', 'foobar'])
        # Leaving a sequence the message is not in changes nothing.
        message.remove_sequence('replied')
        self.assertEqual(message.get_sequences(), ['flagged', 'foobar'])
        message.set_sequences(['foobar', 'replied'])
        self.assertEqual(message.get_sequences(), ['foobar', 'replied'])
1584n/a
1585n/a
class TestBabylMessage(TestMessage, unittest.TestCase):
    # Re-runs the TestMessage tests with BabylMessage and adds checks of
    # its label and visible-header accessors.

    _factory = mailbox.BabylMessage

    def _post_initialize_hook(self, msg):
        # A fresh message carries no labels.
        self.assertEqual(msg._labels, [])

    def test_labels(self):
        # Get, set, join, and leave labels
        message = mailbox.BabylMessage(_sample_message)
        self.assertEqual(message.get_labels(), [])
        message.set_labels(['foobar'])
        self.assertEqual(message.get_labels(), ['foobar'])
        message.set_labels([])
        self.assertEqual(message.get_labels(), [])
        message.add_label('filed')
        self.assertEqual(message.get_labels(), ['filed'])
        message.add_label('resent')
        self.assertEqual(message.get_labels(), ['filed', 'resent'])
        # Adding a label twice changes nothing.
        message.add_label('resent')
        self.assertEqual(message.get_labels(), ['filed', 'resent'])
        message.remove_label('filed')
        self.assertEqual(message.get_labels(), ['resent'])
        message.add_label('foobar')
        self.assertEqual(message.get_labels(), ['resent', 'foobar'])
        # Removing a label the message does not have changes nothing.
        message.remove_label('unseen')
        self.assertEqual(message.get_labels(), ['resent', 'foobar'])
        message.set_labels(['foobar', 'answered'])
        self.assertEqual(message.get_labels(), ['foobar', 'answered'])

    def test_visible(self):
        # Get, set, and update visible headers
        message = mailbox.BabylMessage(_sample_message)
        expected_custom = ['User-Agent', 'X-Whatever']
        expected_updated = ['User-Agent', 'Date', 'From', 'To', 'Subject']

        # Initially there are no visible headers and no payload.
        draft = message.get_visible()
        self.assertEqual(draft.keys(), [])
        self.assertIsNone(draft.get_payload())

        # Editing the returned object does not touch the message until
        # set_visible() is called.
        draft['User-Agent'] = 'FooBar 1.0'
        draft['X-Whatever'] = 'Blah'
        self.assertEqual(message.get_visible().keys(), [])
        message.set_visible(draft)

        stored = message.get_visible()
        self.assertEqual(stored.keys(), expected_custom)
        self.assertEqual(stored['User-Agent'], 'FooBar 1.0')
        self.assertEqual(stored['X-Whatever'], 'Blah')
        self.assertIsNone(stored.get_payload())

        # update_visible() leaves a previously fetched object alone...
        message.update_visible()
        self.assertEqual(stored.keys(), expected_custom)
        self.assertIsNone(stored.get_payload())

        # ...while a newly fetched one mirrors the message's own headers.
        refreshed = message.get_visible()
        self.assertEqual(refreshed.keys(), expected_updated)
        for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
            self.assertEqual(refreshed[header], message[header])
1639n/a
1640n/a
class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
    """Run the inherited test methods with mailbox.MMDFMessage as the factory."""

    _factory = mailbox.MMDFMessage
1645n/a
class TestMessageConversion(TestBase, unittest.TestCase):
    """Check what state survives when one message class is built from another."""

    # Fixtures shared by several of the conversion tests below.
    _mbox_family = (mailbox.mboxMessage, mailbox.MMDFMessage)
    _mh_sequences = ('unseen', 'replied', 'flagged')
    _babyl_labels = ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
                     'edited', 'resent')

    def _mh_with(self, *sequences):
        # Return a fresh MHMessage of the sample with the sequences added
        # in the order given.
        mh_msg = mailbox.MHMessage(_sample_message)
        for name in sequences:
            mh_msg.add_sequence(name)
        return mh_msg

    def _babyl_with(self, *labels):
        # Return a fresh BabylMessage of the sample with the labels added
        # in the order given.
        babyl_msg = mailbox.BabylMessage(_sample_message)
        for name in labels:
            babyl_msg.add_label(name)
        return babyl_msg

    def test_plain_to_x(self):
        # Convert Message to all formats
        for target_cls in self.all_mailbox_types:
            plain = mailbox.Message(_sample_message)
            self._check_sample(target_cls(plain))

    def test_x_to_plain(self):
        # Convert all formats to Message
        for source_cls in self.all_mailbox_types:
            typed = source_cls(_sample_message)
            self._check_sample(mailbox.Message(typed))

    def test_x_from_bytes(self):
        # Build every format directly from the bytes form of the sample
        for target_cls in self.all_mailbox_types:
            self._check_sample(target_cls(_bytes_sample_message))

    def test_x_to_invalid(self):
        # Constructing any format from an unsupported object raises TypeError
        for target_cls in self.all_mailbox_types:
            self.assertRaises(TypeError, target_cls, False)

    def test_type_specific_attributes_removed_on_conversion(self):
        # Attributes that only the source class defines must not leak into
        # the converted instance's __dict__.
        attrs_by_class = {}
        for cls in self.all_mailbox_types:
            attrs_by_class[cls] = cls(_sample_message).__dict__
        for src_cls in self.all_mailbox_types:
            for dst_cls in self.all_mailbox_types:
                if src_cls is not dst_cls:
                    converted = dst_cls(src_cls(_sample_message))
                    context = "while converting {} to {}".format(src_cls,
                                                                 dst_cls)
                    for attr in attrs_by_class[src_cls]:
                        if attr not in attrs_by_class[dst_cls]:
                            self.assertNotIn(attr, converted.__dict__,
                                             context)

    def test_maildir_to_maildir(self):
        # Convert MaildirMessage to MaildirMessage
        original = mailbox.MaildirMessage(_sample_message)
        original.set_flags('DFPRST')
        original.set_subdir('cur')
        stamp = original.get_date()
        duplicate = mailbox.MaildirMessage(original)
        self._check_sample(duplicate)
        self.assertEqual(duplicate.get_flags(), 'DFPRST')
        self.assertEqual(duplicate.get_subdir(), 'cur')
        self.assertEqual(duplicate.get_date(), stamp)

    def test_maildir_to_mboxmmdf(self):
        # Convert MaildirMessage to mboxmessage and MMDFMessage
        expectations = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'),
                        ('S', 'R'), ('T', 'D'), ('DFPRST', 'RDFA'))
        epoch_from = 'MAILER-DAEMON %s' % time.asctime(time.gmtime(0.0))
        for target_cls in self._mbox_family:
            maildir_msg = mailbox.MaildirMessage(_sample_message)
            maildir_msg.set_date(0.0)
            for flags, expected in expectations:
                maildir_msg.set_flags(flags)
                converted = target_cls(maildir_msg)
                self.assertEqual(converted.get_flags(), expected)
                self.assertEqual(converted.get_from(), epoch_from)
            # The message still carries 'DFPRST' here; moving it to 'cur'
            # is expected to add the 'O' flag on conversion.
            maildir_msg.set_subdir('cur')
            self.assertEqual(target_cls(maildir_msg).get_flags(), 'RODFA')

    def test_maildir_to_mh(self):
        # Convert MaildirMessage to MHMessage
        maildir_msg = mailbox.MaildirMessage(_sample_message)
        expectations = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
                        ('P', ['unseen']), ('R', ['unseen', 'replied']),
                        ('S', []), ('T', ['unseen']),
                        ('DFPRST', ['replied', 'flagged']))
        for flags, expected in expectations:
            maildir_msg.set_flags(flags)
            converted = mailbox.MHMessage(maildir_msg)
            self.assertEqual(converted.get_sequences(), expected)

    def test_maildir_to_babyl(self):
        # Convert MaildirMessage to Babyl
        maildir_msg = mailbox.MaildirMessage(_sample_message)
        expectations = (('D', ['unseen']), ('F', ['unseen']),
                        ('P', ['unseen', 'forwarded']),
                        ('R', ['unseen', 'answered']),
                        ('S', []), ('T', ['unseen', 'deleted']),
                        ('DFPRST', ['deleted', 'answered', 'forwarded']))
        for flags, expected in expectations:
            maildir_msg.set_flags(flags)
            converted = mailbox.BabylMessage(maildir_msg)
            self.assertEqual(converted.get_labels(), expected)

    def test_mboxmmdf_to_maildir(self):
        # Convert mboxMessage and MMDFMessage to MaildirMessage
        for source_cls in self._mbox_family:
            mbox_msg = source_cls(_sample_message)
            mbox_msg.set_from('foo@bar', time.gmtime(0.0))
            expectations = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'),
                            ('A', 'R'), ('RODFA', 'FRST'))
            for flags, expected in expectations:
                mbox_msg.set_flags(flags)
                converted = mailbox.MaildirMessage(mbox_msg)
                self.assertEqual(converted.get_flags(), expected)
                self.assertEqual(converted.get_date(), 0.0)
            mbox_msg.set_flags('O')
            converted = mailbox.MaildirMessage(mbox_msg)
            self.assertEqual(converted.get_subdir(), 'cur')

    def test_mboxmmdf_to_mboxmmdf(self):
        # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
        for source_cls in self._mbox_family:
            mbox_msg = source_cls(_sample_message)
            mbox_msg.set_flags('RODFA')
            mbox_msg.set_from('foo@bar')
            for target_cls in self._mbox_family:
                converted = target_cls(mbox_msg)
                self.assertEqual(converted.get_flags(), 'RODFA')
                self.assertEqual(converted.get_from(), 'foo@bar')

    def test_mboxmmdf_to_mh(self):
        # Convert mboxMessage and MMDFMessage to MHMessage
        for source_cls in self._mbox_family:
            mbox_msg = source_cls(_sample_message)
            expectations = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
                            ('F', ['unseen', 'flagged']),
                            ('A', ['unseen', 'replied']),
                            ('RODFA', ['replied', 'flagged']))
            for flags, expected in expectations:
                mbox_msg.set_flags(flags)
                converted = mailbox.MHMessage(mbox_msg)
                self.assertEqual(converted.get_sequences(), expected)

    def test_mboxmmdf_to_babyl(self):
        # Convert mboxMessage and MMDFMessage to BabylMessage
        for source_cls in self._mbox_family:
            mbox_msg = source_cls(_sample_message)
            expectations = (('R', []), ('O', ['unseen']),
                            ('D', ['unseen', 'deleted']), ('F', ['unseen']),
                            ('A', ['unseen', 'answered']),
                            ('RODFA', ['deleted', 'answered']))
            for flags, expected in expectations:
                mbox_msg.set_flags(flags)
                converted = mailbox.BabylMessage(mbox_msg)
                self.assertEqual(converted.get_labels(), expected)

    def test_mh_to_maildir(self):
        # Convert MHMessage to MaildirMessage
        expectations = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
        for sequence, expected in expectations:
            converted = mailbox.MaildirMessage(self._mh_with(sequence))
            self.assertEqual(converted.get_flags(), expected)
            self.assertEqual(converted.get_subdir(), 'cur')
        converted = mailbox.MaildirMessage(
            self._mh_with(*self._mh_sequences))
        self.assertEqual(converted.get_flags(), 'FR')
        self.assertEqual(converted.get_subdir(), 'cur')

    def test_mh_to_mboxmmdf(self):
        # Convert MHMessage to mboxMessage and MMDFMessage
        expectations = (('unseen', 'O'), ('replied', 'ROA'),
                        ('flagged', 'ROF'))
        for sequence, expected in expectations:
            mh_msg = self._mh_with(sequence)
            for target_cls in self._mbox_family:
                self.assertEqual(target_cls(mh_msg).get_flags(), expected)
        mh_msg = self._mh_with(*self._mh_sequences)
        for target_cls in self._mbox_family:
            self.assertEqual(target_cls(mh_msg).get_flags(), 'OFA')

    def test_mh_to_mh(self):
        # Convert MHMessage to MHMessage
        mh_msg = self._mh_with(*self._mh_sequences)
        self.assertEqual(mailbox.MHMessage(mh_msg).get_sequences(),
                         ['unseen', 'replied', 'flagged'])

    def test_mh_to_babyl(self):
        # Convert MHMessage to BabylMessage
        expectations = (('unseen', ['unseen']), ('replied', ['answered']),
                        ('flagged', []))
        for sequence, expected in expectations:
            converted = mailbox.BabylMessage(self._mh_with(sequence))
            self.assertEqual(converted.get_labels(), expected)
        converted = mailbox.BabylMessage(self._mh_with(*self._mh_sequences))
        self.assertEqual(converted.get_labels(), ['unseen', 'answered'])

    def test_babyl_to_maildir(self):
        # Convert BabylMessage to MaildirMessage
        expectations = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
                        ('answered', 'RS'), ('forwarded', 'PS'),
                        ('edited', 'S'), ('resent', 'PS'))
        for label, expected in expectations:
            converted = mailbox.MaildirMessage(self._babyl_with(label))
            self.assertEqual(converted.get_flags(), expected)
            self.assertEqual(converted.get_subdir(), 'cur')
        converted = mailbox.MaildirMessage(
            self._babyl_with(*self._babyl_labels))
        self.assertEqual(converted.get_flags(), 'PRT')
        self.assertEqual(converted.get_subdir(), 'cur')

    def test_babyl_to_mboxmmdf(self):
        # Convert BabylMessage to mboxMessage and MMDFMessage
        expectations = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
                        ('answered', 'ROA'), ('forwarded', 'RO'),
                        ('edited', 'RO'), ('resent', 'RO'))
        for label, expected in expectations:
            for target_cls in self._mbox_family:
                babyl_msg = self._babyl_with(label)
                self.assertEqual(target_cls(babyl_msg).get_flags(), expected)
        babyl_msg = self._babyl_with(*self._babyl_labels)
        for target_cls in self._mbox_family:
            self.assertEqual(target_cls(babyl_msg).get_flags(), 'ODA')

    def test_babyl_to_mh(self):
        # Convert BabylMessage to MHMessage
        expectations = (('unseen', ['unseen']), ('deleted', []),
                        ('filed', []), ('answered', ['replied']),
                        ('forwarded', []), ('edited', []), ('resent', []))
        for label, expected in expectations:
            converted = mailbox.MHMessage(self._babyl_with(label))
            self.assertEqual(converted.get_sequences(), expected)
        converted = mailbox.MHMessage(self._babyl_with(*self._babyl_labels))
        self.assertEqual(converted.get_sequences(), ['unseen', 'replied'])

    def test_babyl_to_babyl(self):
        # Convert BabylMessage to BabylMessage
        original = mailbox.BabylMessage(_sample_message)
        original.update_visible()
        for label in self._babyl_labels:
            original.add_label(label)
        duplicate = mailbox.BabylMessage(original)
        self.assertEqual(duplicate.get_labels(), list(self._babyl_labels))
        # The visible headers must be carried over name for name.
        self.assertEqual(original.get_visible().keys(),
                         duplicate.get_visible().keys())
        for name in original.get_visible().keys():
            self.assertEqual(original.get_visible()[name],
                             duplicate.get_visible()[name])
1910n/a
class TestProxyFileBase(TestBase):
    """Checks shared by the proxy-file tests.

    Each ``_test_*`` helper receives an already constructed proxy whose
    visible content was prepared by the calling test method.
    """

    def _test_read(self, proxy):
        # Read by byte
        # (seek offset, arguments for read(), expected bytes)
        cases = (
            (0, (), b'bar'),
            (1, (), b'ar'),
            (0, (2,), b'ba'),
            (1, (-1,), b'ar'),
            (2, (1000,), b'r'),
        )
        for offset, read_args, expected in cases:
            proxy.seek(offset)
            self.assertEqual(proxy.read(*read_args), expected)

    def _test_readline(self, proxy):
        # Read by line
        eol = os.linesep.encode()
        proxy.seek(0)
        for expected in (b'foo' + eol, b'bar' + eol, b'fred' + eol, b'bob'):
            self.assertEqual(proxy.readline(), expected)
        proxy.seek(2)
        self.assertEqual(proxy.readline(), b'o' + eol)
        # Offset of the third line: two 3-byte words plus two separators.
        third_line = 6 + 2 * len(os.linesep)
        proxy.seek(third_line)
        self.assertEqual(proxy.readline(), b'fred' + eol)
        proxy.seek(third_line)
        self.assertEqual(proxy.readline(2), b'fr')
        self.assertEqual(proxy.readline(-10), b'ed' + eol)

    def _test_readlines(self, proxy):
        # Read multiple lines
        eol = os.linesep.encode()
        every_line = [b'foo' + eol, b'bar' + eol, b'fred' + eol, b'bob']
        proxy.seek(0)
        self.assertEqual(proxy.readlines(), every_line)
        proxy.seek(0)
        self.assertEqual(proxy.readlines(2), every_line[:1])
        proxy.seek(3 + len(eol))
        self.assertEqual(proxy.readlines(4 + len(eol)), every_line[1:3])
        # Starting just before the first separator yields it as its own line.
        proxy.seek(3)
        self.assertEqual(proxy.readlines(1000), [eol] + every_line[1:])

    def _test_iteration(self, proxy):
        # Iterate by line
        eol = os.linesep.encode()
        proxy.seek(0)
        lines = iter(proxy)
        for expected in (b'foo' + eol, b'bar' + eol, b'fred' + eol, b'bob'):
            self.assertEqual(next(lines), expected)
        self.assertRaises(StopIteration, next, lines)

    def _test_seek_and_tell(self, proxy):
        # Seek and use tell to check position
        eol = os.linesep.encode()
        eol_len = len(eol)
        proxy.seek(3)
        self.assertEqual(proxy.tell(), 3)
        self.assertEqual(proxy.read(eol_len), eol)
        proxy.seek(2, os.SEEK_CUR)
        self.assertEqual(proxy.read(1 + eol_len), b'r' + eol)
        proxy.seek(-3 - eol_len, os.SEEK_END)
        self.assertEqual(proxy.read(3), b'bar')
        proxy.seek(2, os.SEEK_SET)
        self.assertEqual(proxy.read(), b'o' + eol + b'bar' + eol)
        # Seeking past the end must make read() return an empty result.
        proxy.seek(100)
        self.assertFalse(proxy.read())

    def _test_close(self, proxy):
        # Close a file
        self.assertFalse(proxy.closed)
        # Issue 11700 subsequent closes should be a no-op, so close twice
        # and expect the proxy to report closed after each call.
        for _ in range(2):
            proxy.close()
            self.assertTrue(proxy.closed)
1993n/a
class TestProxyFile(TestProxyFileBase, unittest.TestCase):
    """Run the shared proxy checks against mailbox._ProxyFile."""

    def setUp(self):
        self._path = support.TESTFN
        self._file = open(self._path, 'wb+')

    def tearDown(self):
        self._file.close()
        self._delete_recursively(self._path)

    def _proxy_after_writing(self, text):
        # Write text (ASCII-encoded) to the backing file, then wrap the
        # file in a proxy created with the default position argument.
        self._file.write(text.encode('ascii'))
        return mailbox._ProxyFile(self._file)

    def _four_lines(self):
        # 'foo', 'bar', 'fred' and 'bob' joined by the platform separator.
        return os.linesep.join(('foo', 'bar', 'fred', 'bob'))

    def _two_terminated_lines(self):
        # 'foo' and 'bar', each followed by the platform separator.
        return 'foo%sbar%s' % (os.linesep, os.linesep)

    def test_initialize(self):
        # Initialize and check position
        self._file.write(b'foo')
        written_to = self._file.tell()
        # Without an explicit position the proxy starts where the file is.
        default_proxy = mailbox._ProxyFile(self._file)
        self.assertEqual(default_proxy.tell(), written_to)
        self.assertEqual(self._file.tell(), written_to)
        # An explicit position applies to the proxy, not to the file.
        rewound_proxy = mailbox._ProxyFile(self._file, 0)
        self.assertEqual(rewound_proxy.tell(), 0)
        self.assertEqual(self._file.tell(), written_to)

    def test_read(self):
        self._test_read(self._proxy_after_writing('bar'))

    def test_readline(self):
        self._test_readline(self._proxy_after_writing(self._four_lines()))

    def test_readlines(self):
        self._test_readlines(self._proxy_after_writing(self._four_lines()))

    def test_iteration(self):
        self._test_iteration(self._proxy_after_writing(self._four_lines()))

    def test_seek_and_tell(self):
        self._test_seek_and_tell(
            self._proxy_after_writing(self._two_terminated_lines()))

    def test_close(self):
        self._test_close(
            self._proxy_after_writing(self._two_terminated_lines()))
2042n/a
class TestPartialFile(TestProxyFileBase, unittest.TestCase):
    """Run the shared proxy checks against mailbox._PartialFile.

    Every test surrounds the interesting content with padding characters
    and passes explicit start/stop offsets to the partial file.
    """

    def setUp(self):
        self._path = support.TESTFN
        self._file = open(self._path, 'wb+')

    def tearDown(self):
        self._file.close()
        self._delete_recursively(self._path)

    def _partial_after_writing(self, text, start, stop):
        # Write text (ASCII-encoded) to the backing file and return a
        # partial file constructed with the given start and stop offsets.
        self._file.write(text.encode('ascii'))
        return mailbox._PartialFile(self._file, start, stop)

    def test_initialize(self):
        # Initialize and check position
        self._file.write(('foo' + os.linesep + 'bar').encode('ascii'))
        written_to = self._file.tell()
        partial = mailbox._PartialFile(self._file, 2, 5)
        self.assertEqual(partial.tell(), 0)
        self.assertEqual(self._file.tell(), written_to)

    def test_read(self):
        self._test_read(self._partial_after_writing('***bar***', 3, 6))

    def test_readline(self):
        sep = os.linesep
        text = '!!!!!foo%sbar%sfred%sbob!!!!!' % (sep, sep, sep)
        self._test_readline(
            self._partial_after_writing(text, 5, 18 + 3 * len(sep)))

    def test_readlines(self):
        sep = os.linesep
        text = 'foo%sbar%sfred%sbob?????' % (sep, sep, sep)
        self._test_readlines(
            self._partial_after_writing(text, 0, 13 + 3 * len(sep)))

    def test_iteration(self):
        sep = os.linesep
        text = '____foo%sbar%sfred%sbob####' % (sep, sep, sep)
        self._test_iteration(
            self._partial_after_writing(text, 4, 17 + 3 * len(sep)))

    def test_seek_and_tell(self):
        sep = os.linesep
        text = '(((foo%sbar%s$$$' % (sep, sep)
        self._test_seek_and_tell(
            self._partial_after_writing(text, 3, 9 + 2 * len(sep)))

    def test_close(self):
        sep = os.linesep
        text = '&foo%sbar%s^' % (sep, sep)
        self._test_close(
            self._partial_after_writing(text, 1, 6 + 3 * len(sep)))
2093n/a
2094n/a## Start: tests from the original module (for backward compatibility).
2095n/a
# Envelope line that createMessage() writes ahead of the message when it is
# called with mbox=True.
FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n"

# Three header lines, a blank separator line, and a one-line body.
DUMMY_MESSAGE = (
    "From: some.body@dummy.domain\n"
    "To: me@my.domain\n"
    "Subject: Simple Test\n"
    "\n"
    "This is a dummy message.\n"
)
2104n/a
class MaildirTestCase(unittest.TestCase):
    """Iteration tests for mailbox.Maildir over a hand-built maildir tree.

    setUp() creates the directory with its ``cur``, ``tmp`` and ``new``
    subdirectories; createMessage() drops message files into it and records
    them so tearDown() can remove everything again.
    """

    _SUBDIRS = ("cur", "tmp", "new")

    def setUp(self):
        # create a new maildir mailbox to work with:
        self._dir = support.TESTFN
        if os.path.isdir(self._dir):
            support.rmtree(self._dir)
        elif os.path.isfile(self._dir):
            support.unlink(self._dir)
        os.mkdir(self._dir)
        for subdir in self._SUBDIRS:
            os.mkdir(os.path.join(self._dir, subdir))
        self._counter = 1
        self._msgfiles = []

    def tearDown(self):
        # Remove the recorded message files first so the directories are
        # empty by the time they are removed.
        for path in self._msgfiles:
            os.unlink(path)
        for subdir in self._SUBDIRS:
            support.rmdir(os.path.join(self._dir, subdir))
        support.rmdir(self._dir)

    def createMessage(self, dir, mbox=False):
        """Create one message file in ``tmp`` and in subdirectory *dir*.

        The file holds DUMMY_MESSAGE, preceded by FROM_ when *mbox* is
        true.  The copy in *dir* is a hard link where possible and an
        identical regular file otherwise.  Both paths are recorded in
        self._msgfiles; the ``tmp`` path is returned.
        """
        t = int(time.time() % 1000000)
        # The per-test counter keeps names unique within the same second.
        pid = self._counter
        self._counter += 1
        filename = ".".join((str(t), str(pid), "myhostname", "mydomain"))
        tmpname = os.path.join(self._dir, "tmp", filename)
        newname = os.path.join(self._dir, dir, filename)
        content = (FROM_ if mbox else "") + DUMMY_MESSAGE
        with open(tmpname, "w") as fp:
            self._msgfiles.append(tmpname)
            fp.write(content)
        try:
            os.link(tmpname, newname)
        except (AttributeError, PermissionError):
            # Hard links are unavailable: write a copy with the same
            # content, including the FROM_ line when mbox is true, so the
            # result matches what the link would have exposed.
            with open(newname, "w") as fp:
                fp.write(content)
        self._msgfiles.append(newname)
        return tmpname

    def _check_iteration(self, expected_messages):
        # Open the maildir and verify that next() yields exactly
        # expected_messages messages and then keeps returning None.
        self.mbox = mailbox.Maildir(self._dir)
        for _ in range(expected_messages):
            self.assertIsNotNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())

    def test_empty_maildir(self):
        """Test an empty maildir mailbox"""
        # Test for regression on bug #117490.  The earlier checks on the
        # `boxes` attribute were disabled; only iteration is verified.
        self._check_iteration(0)

    def test_nonempty_maildir_cur(self):
        self.createMessage("cur")
        self._check_iteration(1)

    def test_nonempty_maildir_new(self):
        self.createMessage("new")
        self._check_iteration(1)

    def test_nonempty_maildir_both(self):
        self.createMessage("cur")
        self.createMessage("new")
        self._check_iteration(2)
2184n/a## End: tests from the original module (for backward compatibility).
2185n/a
2186n/a
2187n/a_sample_message = """\
2188n/aReturn-Path: <gkj@gregorykjohnson.com>
2189n/aX-Original-To: gkj+person@localhost
2190n/aDelivered-To: gkj+person@localhost
2191n/aReceived: from localhost (localhost [127.0.0.1])
2192n/a by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
2193n/a for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
2194n/aDelivered-To: gkj@sundance.gregorykjohnson.com
2195n/aReceived: from localhost [127.0.0.1]
2196n/a by localhost with POP3 (fetchmail-6.2.5)
2197n/a for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
2198n/aReceived: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
2199n/a by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
2200n/a for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
2201n/aReceived: by andy.gregorykjohnson.com (Postfix, from userid 1000)
2202n/a id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
2203n/aDate: Wed, 13 Jul 2005 17:23:11 -0400
2204n/aFrom: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
2205n/aTo: gkj@gregorykjohnson.com
2206n/aSubject: Sample message
2207n/aMessage-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
2208n/aMime-Version: 1.0
2209n/aContent-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
2210n/aContent-Disposition: inline
2211n/aUser-Agent: Mutt/1.5.9i
2212n/a
2213n/a
2214n/a--NMuMz9nt05w80d4+
2215n/aContent-Type: text/plain; charset=us-ascii
2216n/aContent-Disposition: inline
2217n/a
2218n/aThis is a sample message.
2219n/a
2220n/a--
2221n/aGregory K. Johnson
2222n/a
2223n/a--NMuMz9nt05w80d4+
2224n/aContent-Type: application/octet-stream
2225n/aContent-Disposition: attachment; filename="text.gz"
2226n/aContent-Transfer-Encoding: base64
2227n/a
2228n/aH4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
2229n/a3FYlAAAA
2230n/a
2231n/a--NMuMz9nt05w80d4+--
2232n/a"""
2233n/a
2234n/a_bytes_sample_message = _sample_message.encode('ascii')
2235n/a
2236n/a_sample_headers = {
2237n/a "Return-Path":"<gkj@gregorykjohnson.com>",
2238n/a "X-Original-To":"gkj+person@localhost",
2239n/a "Delivered-To":"gkj+person@localhost",
2240n/a "Received":"""from localhost (localhost [127.0.0.1])
2241n/a by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
2242n/a for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
2243n/a "Delivered-To":"gkj@sundance.gregorykjohnson.com",
2244n/a "Received":"""from localhost [127.0.0.1]
2245n/a by localhost with POP3 (fetchmail-6.2.5)
2246n/a for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
2247n/a "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
2248n/a by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
2249n/a for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
2250n/a "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
2251n/a id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
2252n/a "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
2253n/a "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
2254n/a "To":"gkj@gregorykjohnson.com",
2255n/a "Subject":"Sample message",
2256n/a "Mime-Version":"1.0",
2257n/a "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
2258n/a "Content-Disposition":"inline",
2259n/a "User-Agent": "Mutt/1.5.9i" }
2260n/a
2261n/a_sample_payloads = ("""This is a sample message.
2262n/a
2263n/a--
2264n/aGregory K. Johnson
2265n/a""",
2266n/a"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
2267n/a3FYlAAAA
2268n/a""")
2269n/a
2270n/a
2271n/aclass MiscTestCase(unittest.TestCase):
2272n/a def test__all__(self):
2273n/a blacklist = {"linesep", "fcntl"}
2274n/a support.check__all__(self, mailbox, blacklist=blacklist)
2275n/a
2276n/a
def test_main():
    """Run the listed test classes, then reap any child processes."""
    test_classes = [
        TestMailboxSuperclass,
        TestMaildir,
        TestMbox,
        TestMMDF,
        TestMH,
        TestBabyl,
        TestMessage,
        TestMaildirMessage,
        TestMboxMessage,
        TestMHMessage,
        TestBabylMessage,
        TestMMDFMessage,
        TestMessageConversion,
        TestProxyFile,
        TestPartialFile,
        MaildirTestCase,
        TestFakeMailBox,
        MiscTestCase,
    ]
    support.run_unittest(*test_classes)
    support.reap_children()


if __name__ == '__main__':
    test_main()