» Core Development > Code coverage > Lib/test/test_poplib.py

Python code coverage for Lib/test/test_poplib.py

# | count | content
1n/a"""Test script for poplib module."""
2n/a
3n/a# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
4n/a# a real test suite
5n/a
6n/aimport poplib
7n/aimport asyncore
8n/aimport asynchat
9n/aimport socket
10n/aimport os
11n/aimport errno
12n/a
13n/afrom unittest import TestCase, skipUnless
14n/afrom test import support as test_support
15n/athreading = test_support.import_module('threading')
16n/a
# Address handed to the dummy server; the server reads the real port back
# from its bound socket after binding to PORT.
HOST = test_support.HOST
PORT = 0

# TLS-related tests only make sense when poplib was built with POP3_SSL.
SUPPORTS_SSL = hasattr(poplib, 'POP3_SSL')
if SUPPORTS_SSL:
    import ssl

    # Certificates are looked up next to this file (or in the current
    # directory when __file__ has no directory component).
    _CERT_DIR = os.path.dirname(__file__) or os.curdir
    CERTFILE = os.path.join(_CERT_DIR, "keycert3.pem")
    CAFILE = os.path.join(_CERT_DIR, "pycacert.pem")

# Decorator that skips a test or test class when SSL is unavailable.
requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
29n/a
# Canned multi-line payloads the dummy server sends after the "+OK" status
# line of LIST/UIDL and RETR/TOP.  Each ends with the "." terminator line.
LIST_RESP = (
    b'1 1\r\n'
    b'2 2\r\n'
    b'3 3\r\n'
    b'4 4\r\n'
    b'5 5\r\n'
    b'.\r\n'
)
RETR_RESP = (
    b"From: postmaster@python.org\r\n"
    b"Content-Type: text/plain\r\n"
    b"MIME-Version: 1.0\r\n"
    b"Subject: Dummy\r\n"
    b"\r\n"
    b"line1\r\n"
    b"line2\r\n"
    b"line3\r\n"
    b".\r\n"
)
41n/a
42n/a
class DummyPOP3Handler(asynchat.async_chat):
    """Server-side channel speaking a small, canned subset of POP3.

    Every CRLF-terminated line received from the client is split into a
    command word and an argument string and dispatched to the matching
    ``cmd_<name>`` method.  Replies are fixed strings, which is all the
    poplib client tests need.
    """

    # Capabilities advertised by CAPA: name -> list of parameters.
    CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
    # Tests flip this on the class to make the UTF8 command succeed or fail.
    enable_UTF8 = False

    def __init__(self, conn):
        """Wrap the accepted socket *conn* and send the POP3 greeting."""
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        self.push('+OK dummy pop3 server ready. <timestamp>')
        self.tls_active = False
        self.tls_starting = False

    def collect_incoming_data(self, data):
        """Buffer a chunk of the line currently being received."""
        self.in_buffer.append(data)

    def found_terminator(self):
        """Decode the buffered line and dispatch it to a ``cmd_*`` method."""
        line = str(b''.join(self.in_buffer), 'ISO-8859-1')
        self.in_buffer = []
        # Everything before the first space is the command, the rest (if
        # any) is passed verbatim as the argument.
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('-ERR unrecognized POP3 command "%s".' % cmd)

    def handle_error(self):
        """Re-raise the exception being handled instead of swallowing it."""
        raise

    def push(self, data):
        """Send the str *data* as one ISO-8859-1 encoded, CRLF-ended line."""
        asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')

    def cmd_echo(self, arg):
        """Send back the received string (used by the test suite)."""
        self.push(arg)

    def cmd_user(self, arg):
        """Accept only the user name "guido"."""
        # Exactly one status line per command: a rejected user must not
        # also receive a "+OK", or the stray reply would be read as the
        # answer to the client's next command.
        if arg != "guido":
            self.push("-ERR no such user")
        else:
            self.push('+OK password required')

    def cmd_pass(self, arg):
        """Accept only the password "python"."""
        # Same single-reply rule as cmd_user.
        if arg != "python":
            self.push("-ERR wrong password")
        else:
            self.push('+OK 10 messages')

    def cmd_stat(self, arg):
        """Report a fixed mailbox status."""
        self.push('+OK 10 100')

    def cmd_list(self, arg):
        """Answer LIST/UIDL: one line with an argument, a listing without."""
        if arg:
            self.push('+OK %s %s' % (arg, arg))
        else:
            self.push('+OK')
            # LIST_RESP is already bytes with CRLFs and the terminator, so
            # bypass our own str-encoding push().
            asynchat.async_chat.push(self, LIST_RESP)

    cmd_uidl = cmd_list

    def cmd_retr(self, arg):
        """Answer RETR/TOP with the canned message, whatever *arg* is."""
        self.push('+OK %s bytes' % len(RETR_RESP))
        asynchat.async_chat.push(self, RETR_RESP)

    cmd_top = cmd_retr

    def cmd_dele(self, arg):
        """Pretend to mark a message for deletion."""
        self.push('+OK message marked for deletion.')

    def cmd_noop(self, arg):
        """Reply positively without doing anything."""
        self.push('+OK done nothing.')

    def cmd_rpop(self, arg):
        """Reply positively without doing anything."""
        self.push('+OK done nothing.')

    def cmd_apop(self, arg):
        """Reply positively without checking the digest."""
        self.push('+OK done nothing.')

    def cmd_quit(self, arg):
        """Say goodbye and close once the reply has been flushed."""
        self.push('+OK closing.')
        self.close_when_done()

    def _get_capas(self):
        """Return a copy of CAPAS, plus STLS while TLS can still be started."""
        _capas = dict(self.CAPAS)
        if not self.tls_active and SUPPORTS_SSL:
            _capas['STLS'] = []
        return _capas

    def cmd_capa(self, arg):
        """Send the capability list, one capability per line."""
        self.push('+OK Capability list follows')
        # Build the dict once; an empty dict simply yields no lines.
        for cap, params in self._get_capas().items():
            self.push(' '.join([cap] + list(params)))
        self.push('.')

    def cmd_utf8(self, arg):
        """Accept or reject UTF8 depending on ``enable_UTF8``."""
        self.push('+OK I know RFC6856'
                  if self.enable_UTF8
                  else '-ERR What is UTF8?!')

    if SUPPORTS_SSL:

        def cmd_stls(self, arg):
            """Upgrade the connection to TLS, unless TLS is already active."""
            if self.tls_active is False:
                self.push('+OK Begin TLS negotiation')
                context = ssl.SSLContext()
                context.load_cert_chain(CERTFILE)
                tls_sock = context.wrap_socket(self.socket,
                                               server_side=True,
                                               do_handshake_on_connect=False,
                                               suppress_ragged_eofs=False)
                # Swap the plain socket for the TLS one in this channel.
                self.del_channel()
                self.set_socket(tls_sock)
                self.tls_active = True
                self.tls_starting = True
                self.in_buffer = []
                self._do_tls_handshake()
            else:
                self.push('-ERR Command not permitted when TLS active')

        def _do_tls_handshake(self):
            """Advance the TLS handshake; clear ``tls_starting`` when done."""
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # Handshake not finished yet; handle_read() retries.
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                raise
            except OSError as err:
                # NOTE(review): any OSError other than ECONNABORTED is
                # ignored here and leaves tls_starting set -- confirm this
                # is intended.
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self.tls_active = True
                self.tls_starting = False

        def handle_read(self):
            """Continue the handshake if one is pending, else read normally."""
            if self.tls_starting:
                self._do_tls_handshake()
            else:
                try:
                    asynchat.async_chat.handle_read(self)
                except ssl.SSLEOFError:
                    self.handle_close()
195n/a
class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    Each accepted connection is handed to ``handler`` (a class attribute
    tests may override per instance); the most recent handler object is
    kept in ``handler_instance``.
    """

    handler = DummyPOP3Handler

    def __init__(self, address, af=socket.AF_INET):
        """Bind and listen on *address* using address family *af*."""
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        # Read the address back from the socket so callers learn the port
        # that was actually bound.
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None

    def start(self):
        """Start the server thread and wait until run() has begun."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        """Poll asyncore until stop() is called or no sockets remain."""
        self.active = True
        self.__flag.set()
        try:
            while self.active and asyncore.socket_map:
                # "with" guarantees the lock is released even when a
                # handler raises out of the loop.
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            # Always tear down the channels, including on error.
            asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop to finish and wait for the thread to exit."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        """Create a handler for the newly accepted connection."""
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        """Close the listening socket; also used as handle_read."""
        self.close()
    handle_read = handle_connect

    def writable(self):
        """The listening socket never has anything to write."""
        return False

    def handle_error(self):
        """Re-raise the exception being handled instead of swallowing it."""
        raise
243n/a
244n/a
class TestPOP3Class(TestCase):
    """Exercise poplib.POP3 against the in-process dummy server."""

    def assertOK(self, resp):
        """Assert that *resp* is a positive (+OK) server response."""
        self.assertTrue(resp.startswith(b"+OK"))

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(),
                         b'+OK dummy pop3 server ready. <timestamp>')

    def test_exceptions(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')

    def test_user(self):
        self.assertOK(self.client.user('guido'))
        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')

    def test_pass_(self):
        self.assertOK(self.client.pass_('python'))
        # Check the PASS failure path (this used to re-test USER).
        self.assertRaises(poplib.error_proto, self.client.pass_, 'invalid')

    def test_stat(self):
        self.assertEqual(self.client.stat(), (10, 100))

    def test_list(self):
        self.assertEqual(self.client.list()[1:],
                         ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
                          25))
        self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))

    def test_retr(self):
        expected = (b'+OK 116 bytes',
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
                     b'MIME-Version: 1.0', b'Subject: Dummy',
                     b'', b'line1', b'line2', b'line3'],
                    113)
        foo = self.client.retr('foo')
        self.assertEqual(foo, expected)

    def test_too_long_lines(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd,
                          'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))

    def test_dele(self):
        self.assertOK(self.client.dele('foo'))

    def test_noop(self):
        self.assertOK(self.client.noop())

    def test_rpop(self):
        self.assertOK(self.client.rpop('foo'))

    def test_apop(self):
        self.assertOK(self.client.apop('foo', 'dummypassword'))

    def test_top(self):
        expected = (b'+OK 116 bytes',
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
                     b'MIME-Version: 1.0', b'Subject: Dummy', b'',
                     b'line1', b'line2', b'line3'],
                    113)
        self.assertEqual(self.client.top(1, 1), expected)

    def test_uidl(self):
        self.client.uidl()
        self.client.uidl('foo')

    def test_utf8_raises_if_unsupported(self):
        self.server.handler.enable_UTF8 = False
        self.assertRaises(poplib.error_proto, self.client.utf8)

    def test_utf8(self):
        self.server.handler.enable_UTF8 = True
        expected = b'+OK I know RFC6856'
        result = self.client.utf8()
        self.assertEqual(result, expected)

    def test_capa(self):
        capa = self.client.capa()
        self.assertIn('IMPLEMENTATION', capa)

    def test_quit(self):
        resp = self.client.quit()
        self.assertTrue(resp)
        self.assertIsNone(self.client.sock)
        self.assertIsNone(self.client.file)

    @requires_ssl
    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertIn('STLS', capa)

    @requires_ssl
    def test_stls(self):
        expected = b'+OK Begin TLS negotiation'
        resp = self.client.stls()
        self.assertEqual(resp, expected)

    @requires_ssl
    def test_stls_context(self):
        expected = b'+OK Begin TLS negotiation'
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_verify_locations(CAFILE)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        # The client from setUp() connected via self.server.host; with
        # hostname checking on, STLS must fail certificate validation.
        with self.assertRaises(ssl.CertificateError):
            self.client.stls(context=ctx)
        # Connecting as "localhost" is expected to validate.
        self.client = poplib.POP3("localhost", self.server.port, timeout=3)
        resp = self.client.stls(context=ctx)
        self.assertEqual(resp, expected)
362n/a
363n/a
if SUPPORTS_SSL:
    # SSLConnection is borrowed from the ftplib tests; this handler relies on
    # its secure_connection() method (presumably it wraps the accepted socket
    # in TLS -- confirm against test_ftplib).
    from test.test_ftplib import SSLConnection

    class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler):
        """POP3 handler for connections that are TLS from the first byte.

        It starts with ``tls_active`` set, so the inherited handler neither
        advertises STLS in CAPA nor accepts the STLS command.
        """

        def __init__(self, conn):
            # Calls async_chat.__init__ directly and then repeats the
            # DummyPOP3Handler setup, with secure_connection() run before
            # the greeting is pushed.
            asynchat.async_chat.__init__(self, conn)
            self.secure_connection()
            self.set_terminator(b"\r\n")
            self.in_buffer = []
            self.push('+OK dummy pop3 server ready. <timestamp>')
            self.tls_active = True
            self.tls_starting = False
377n/a
378n/a
@requires_ssl
class TestPOP3_SSLClass(TestPOP3Class):
    """Repeat the TestPOP3Class tests using poplib.POP3_SSL."""

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        # Override the handler on this instance only.
        self.server.handler = DummyPOP3_SSLHandler
        self.server.start()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port)

    def test__all__(self):
        self.assertIn('POP3_SSL', poplib.__all__)

    def test_context(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # keyfile/certfile must be rejected when a context is also given.
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, keyfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, certfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, keyfile=CERTFILE,
                          certfile=CERTFILE, context=ctx)

        # Replace the setUp() client with one built from the context.
        self.client.quit()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                      context=ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.assertIs(self.client.sock.context, ctx)
        self.assertOK(self.client.noop())

    def test_stls(self):
        # The connection is already TLS, so STLS must be refused.
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertNotIn('STLS', capa)
417n/a
418n/a
@requires_ssl
class TestPOP3_TLSClass(TestPOP3Class):
    """Repeat the TestPOP3Class tests after upgrading with POP3.stls()."""

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
        self.client.stls()

    def tearDown(self):
        if self.client.file is not None and self.client.sock is not None:
            try:
                self.client.quit()
            except poplib.error_proto:
                # happens in the test_too_long_lines case; the overlong
                # response will be treated as response to QUIT and raise
                # this exception
                self.client.close()
        self.server.stop()

    def test_stls(self):
        # TLS is already active after setUp(), so a second STLS must fail.
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        # capa() keys are str; the old bytes key b'STLS' could never be
        # present, so the check passed vacuously.
        self.assertNotIn('STLS', capa)
448n/a
449n/a
class TestTimeouts(TestCase):
    """Check how poplib.POP3 applies socket timeouts.

    A throw-away server thread accepts a single connection, sends one
    "+" greeting line and closes.
    """

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(60)  # Safety net. Look issue 11812
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=self.server,
                                       args=(self.evt, self.sock))
        # The attribute replaces the deprecated Thread.setDaemon().
        self.thread.daemon = True
        self.thread.start()
        # Wait until the server thread is listening.
        self.evt.wait()

    def tearDown(self):
        self.thread.join()
        del self.thread  # Clear out any dangling Thread objects.

    def server(self, evt, serv):
        """Thread body: greet one client on *serv*, then shut down."""
        serv.listen()
        evt.set()
        try:
            conn, addr = serv.accept()
            conn.send(b"+ Hola mundo\n")
            conn.close()
        except socket.timeout:
            pass
        finally:
            serv.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            pop = poplib.POP3(HOST, self.port)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(pop.sock.gettimeout(), 30)
        # close() releases both the socket and the client's file object.
        pop.close()

    def testTimeoutNone(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            pop = poplib.POP3(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(pop.sock.gettimeout())
        pop.close()

    def testTimeoutValue(self):
        pop = poplib.POP3(HOST, self.port, timeout=30)
        self.assertEqual(pop.sock.gettimeout(), 30)
        pop.close()
502n/a
503n/a
def test_main():
    """Run all POP3 test cases between threading_setup() and threading_cleanup()."""
    saved_threads = test_support.threading_setup()
    try:
        test_support.run_unittest(
            TestPOP3Class,
            TestTimeouts,
            TestPOP3_SSLClass,
            TestPOP3_TLSClass,
        )
    finally:
        # Runs even when the test run raises.
        test_support.threading_cleanup(*saved_threads)
512n/a
513n/a
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()