»Core Development>Code coverage>Lib/test/test_imaplib.py

Python code coverage for Lib/test/test_imaplib.py

#countcontent
1n/afrom test import support
2n/a# If we end up with a significant number of tests that don't require
3n/a# threading, this test module should be split. Right now we skip
4n/a# them all if we don't have threading.
5n/athreading = support.import_module('threading')
6n/a
7n/afrom contextlib import contextmanager
8n/aimport imaplib
9n/aimport os.path
10n/aimport socketserver
11n/aimport time
12n/aimport calendar
13n/aimport inspect
14n/a
15n/afrom test.support import (reap_threads, verbose, transient_internet,
16n/a run_with_tz, run_with_locale)
17n/aimport unittest
18n/afrom unittest import mock
19n/afrom datetime import datetime, timezone, timedelta
20n/atry:
21n/a import ssl
22n/aexcept ImportError:
23n/a ssl = None
24n/a
25n/aCERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
26n/aCAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")
27n/a
28n/a
29n/aclass TestImaplib(unittest.TestCase):
30n/a
31n/a def test_Internaldate2tuple(self):
32n/a t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
33n/a tt = imaplib.Internaldate2tuple(
34n/a b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
35n/a self.assertEqual(time.mktime(tt), t0)
36n/a tt = imaplib.Internaldate2tuple(
37n/a b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
38n/a self.assertEqual(time.mktime(tt), t0)
39n/a tt = imaplib.Internaldate2tuple(
40n/a b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
41n/a self.assertEqual(time.mktime(tt), t0)
42n/a
43n/a @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
44n/a def test_Internaldate2tuple_issue10941(self):
45n/a self.assertNotEqual(imaplib.Internaldate2tuple(
46n/a b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
47n/a imaplib.Internaldate2tuple(
48n/a b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))
49n/a
50n/a def timevalues(self):
51n/a return [2000000000, 2000000000.0, time.localtime(2000000000),
52n/a (2033, 5, 18, 5, 33, 20, -1, -1, -1),
53n/a (2033, 5, 18, 5, 33, 20, -1, -1, 1),
54n/a datetime.fromtimestamp(2000000000,
55n/a timezone(timedelta(0, 2 * 60 * 60))),
56n/a '"18-May-2033 05:33:20 +0200"']
57n/a
58n/a @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
59n/a # DST rules included to work around quirk where the Gnu C library may not
60n/a # otherwise restore the previous time zone
61n/a @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
62n/a def test_Time2Internaldate(self):
63n/a expected = '"18-May-2033 05:33:20 +0200"'
64n/a
65n/a for t in self.timevalues():
66n/a internal = imaplib.Time2Internaldate(t)
67n/a self.assertEqual(internal, expected)
68n/a
69n/a def test_that_Time2Internaldate_returns_a_result(self):
70n/a # Without tzset, we can check only that it successfully
71n/a # produces a result, not the correctness of the result itself,
72n/a # since the result depends on the timezone the machine is in.
73n/a for t in self.timevalues():
74n/a imaplib.Time2Internaldate(t)
75n/a
76n/a
77n/aif ssl:
78n/a class SecureTCPServer(socketserver.TCPServer):
79n/a
80n/a def get_request(self):
81n/a newsocket, fromaddr = self.socket.accept()
82n/a context = ssl.SSLContext()
83n/a context.load_cert_chain(CERTFILE)
84n/a connstream = context.wrap_socket(newsocket, server_side=True)
85n/a return connstream, fromaddr
86n/a
87n/a IMAP4_SSL = imaplib.IMAP4_SSL
88n/a
89n/aelse:
90n/a
91n/a class SecureTCPServer:
92n/a pass
93n/a
94n/a IMAP4_SSL = None
95n/a
96n/a
97n/aclass SimpleIMAPHandler(socketserver.StreamRequestHandler):
98n/a timeout = 1
99n/a continuation = None
100n/a capabilities = ''
101n/a
102n/a def setup(self):
103n/a super().setup()
104n/a self.server.logged = None
105n/a
106n/a def _send(self, message):
107n/a if verbose:
108n/a print("SENT: %r" % message.strip())
109n/a self.wfile.write(message)
110n/a
111n/a def _send_line(self, message):
112n/a self._send(message + b'\r\n')
113n/a
114n/a def _send_textline(self, message):
115n/a self._send_line(message.encode('ASCII'))
116n/a
117n/a def _send_tagged(self, tag, code, message):
118n/a self._send_textline(' '.join((tag, code, message)))
119n/a
120n/a def handle(self):
121n/a # Send a welcome message.
122n/a self._send_textline('* OK IMAP4rev1')
123n/a while 1:
124n/a # Gather up input until we receive a line terminator or we timeout.
125n/a # Accumulate read(1) because it's simpler to handle the differences
126n/a # between naked sockets and SSL sockets.
127n/a line = b''
128n/a while 1:
129n/a try:
130n/a part = self.rfile.read(1)
131n/a if part == b'':
132n/a # Naked sockets return empty strings..
133n/a return
134n/a line += part
135n/a except OSError:
136n/a # ..but SSLSockets raise exceptions.
137n/a return
138n/a if line.endswith(b'\r\n'):
139n/a break
140n/a
141n/a if verbose:
142n/a print('GOT: %r' % line.strip())
143n/a if self.continuation:
144n/a try:
145n/a self.continuation.send(line)
146n/a except StopIteration:
147n/a self.continuation = None
148n/a continue
149n/a splitline = line.decode('ASCII').split()
150n/a tag = splitline[0]
151n/a cmd = splitline[1]
152n/a args = splitline[2:]
153n/a
154n/a if hasattr(self, 'cmd_' + cmd):
155n/a continuation = getattr(self, 'cmd_' + cmd)(tag, args)
156n/a if continuation:
157n/a self.continuation = continuation
158n/a next(continuation)
159n/a else:
160n/a self._send_tagged(tag, 'BAD', cmd + ' unknown')
161n/a
162n/a def cmd_CAPABILITY(self, tag, args):
163n/a caps = ('IMAP4rev1 ' + self.capabilities
164n/a if self.capabilities
165n/a else 'IMAP4rev1')
166n/a self._send_textline('* CAPABILITY ' + caps)
167n/a self._send_tagged(tag, 'OK', 'CAPABILITY completed')
168n/a
169n/a def cmd_LOGOUT(self, tag, args):
170n/a self.server.logged = None
171n/a self._send_textline('* BYE IMAP4ref1 Server logging out')
172n/a self._send_tagged(tag, 'OK', 'LOGOUT completed')
173n/a
174n/a def cmd_LOGIN(self, tag, args):
175n/a self.server.logged = args[0]
176n/a self._send_tagged(tag, 'OK', 'LOGIN completed')
177n/a
178n/a
179n/aclass NewIMAPTestsMixin():
180n/a client = None
181n/a
182n/a def _setup(self, imap_handler, connect=True):
183n/a """
184n/a Sets up imap_handler for tests. imap_handler should inherit from either:
185n/a - SimpleIMAPHandler - for testing IMAP commands,
186n/a - socketserver.StreamRequestHandler - if raw access to stream is needed.
187n/a Returns (client, server).
188n/a """
189n/a class TestTCPServer(self.server_class):
190n/a def handle_error(self, request, client_address):
191n/a """
192n/a End request and raise the error if one occurs.
193n/a """
194n/a self.close_request(request)
195n/a self.server_close()
196n/a raise
197n/a
198n/a self.addCleanup(self._cleanup)
199n/a self.server = self.server_class((support.HOST, 0), imap_handler)
200n/a self.thread = threading.Thread(
201n/a name=self._testMethodName+'-server',
202n/a target=self.server.serve_forever,
203n/a # Short poll interval to make the test finish quickly.
204n/a # Time between requests is short enough that we won't wake
205n/a # up spuriously too many times.
206n/a kwargs={'poll_interval': 0.01})
207n/a self.thread.daemon = True # In case this function raises.
208n/a self.thread.start()
209n/a
210n/a if connect:
211n/a self.client = self.imap_class(*self.server.server_address)
212n/a
213n/a return self.client, self.server
214n/a
215n/a def _cleanup(self):
216n/a """
217n/a Cleans up the test server. This method should not be called manually,
218n/a it is added to the cleanup queue in the _setup method already.
219n/a """
220n/a # if logout was called already we'd raise an exception trying to
221n/a # shutdown the client once again
222n/a if self.client is not None and self.client.state != 'LOGOUT':
223n/a self.client.shutdown()
224n/a # cleanup the server
225n/a self.server.shutdown()
226n/a self.server.server_close()
227n/a self.thread.join(3.0)
228n/a
229n/a def test_EOF_without_complete_welcome_message(self):
230n/a # http://bugs.python.org/issue5949
231n/a class EOFHandler(socketserver.StreamRequestHandler):
232n/a def handle(self):
233n/a self.wfile.write(b'* OK')
234n/a _, server = self._setup(EOFHandler, connect=False)
235n/a self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
236n/a *server.server_address)
237n/a
238n/a def test_line_termination(self):
239n/a class BadNewlineHandler(SimpleIMAPHandler):
240n/a def cmd_CAPABILITY(self, tag, args):
241n/a self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
242n/a self._send_tagged(tag, 'OK', 'CAPABILITY completed')
243n/a _, server = self._setup(BadNewlineHandler, connect=False)
244n/a self.assertRaises(imaplib.IMAP4.abort, self.imap_class,
245n/a *server.server_address)
246n/a
247n/a def test_enable_raises_error_if_not_AUTH(self):
248n/a class EnableHandler(SimpleIMAPHandler):
249n/a capabilities = 'AUTH ENABLE UTF8=ACCEPT'
250n/a client, _ = self._setup(EnableHandler)
251n/a self.assertFalse(client.utf8_enabled)
252n/a with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'):
253n/a client.enable('foo')
254n/a self.assertFalse(client.utf8_enabled)
255n/a
256n/a def test_enable_raises_error_if_no_capability(self):
257n/a client, _ = self._setup(SimpleIMAPHandler)
258n/a with self.assertRaisesRegex(imaplib.IMAP4.error,
259n/a 'does not support ENABLE'):
260n/a client.enable('foo')
261n/a
262n/a def test_enable_UTF8_raises_error_if_not_supported(self):
263n/a client, _ = self._setup(SimpleIMAPHandler)
264n/a typ, data = client.login('user', 'pass')
265n/a self.assertEqual(typ, 'OK')
266n/a with self.assertRaisesRegex(imaplib.IMAP4.error,
267n/a 'does not support ENABLE'):
268n/a client.enable('UTF8=ACCEPT')
269n/a
270n/a def test_enable_UTF8_True_append(self):
271n/a class UTF8AppendServer(SimpleIMAPHandler):
272n/a capabilities = 'ENABLE UTF8=ACCEPT'
273n/a def cmd_ENABLE(self, tag, args):
274n/a self._send_tagged(tag, 'OK', 'ENABLE successful')
275n/a def cmd_AUTHENTICATE(self, tag, args):
276n/a self._send_textline('+')
277n/a self.server.response = yield
278n/a self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
279n/a def cmd_APPEND(self, tag, args):
280n/a self._send_textline('+')
281n/a self.server.response = yield
282n/a self._send_tagged(tag, 'OK', 'okay')
283n/a client, server = self._setup(UTF8AppendServer)
284n/a self.assertEqual(client._encoding, 'ascii')
285n/a code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
286n/a self.assertEqual(code, 'OK')
287n/a self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
288n/a code, _ = client.enable('UTF8=ACCEPT')
289n/a self.assertEqual(code, 'OK')
290n/a self.assertEqual(client._encoding, 'utf-8')
291n/a msg_string = 'Subject: üñí©öðé'
292n/a typ, data = client.append(None, None, None, msg_string.encode('utf-8'))
293n/a self.assertEqual(typ, 'OK')
294n/a self.assertEqual(server.response,
295n/a ('UTF8 (%s)\r\n' % msg_string).encode('utf-8'))
296n/a
297n/a def test_search_disallows_charset_in_utf8_mode(self):
298n/a class UTF8Server(SimpleIMAPHandler):
299n/a capabilities = 'AUTH ENABLE UTF8=ACCEPT'
300n/a def cmd_ENABLE(self, tag, args):
301n/a self._send_tagged(tag, 'OK', 'ENABLE successful')
302n/a def cmd_AUTHENTICATE(self, tag, args):
303n/a self._send_textline('+')
304n/a self.server.response = yield
305n/a self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
306n/a client, _ = self._setup(UTF8Server)
307n/a typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
308n/a self.assertEqual(typ, 'OK')
309n/a typ, _ = client.enable('UTF8=ACCEPT')
310n/a self.assertEqual(typ, 'OK')
311n/a self.assertTrue(client.utf8_enabled)
312n/a with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'):
313n/a client.search('foo', 'bar')
314n/a
315n/a def test_bad_auth_name(self):
316n/a class MyServer(SimpleIMAPHandler):
317n/a def cmd_AUTHENTICATE(self, tag, args):
318n/a self._send_tagged(tag, 'NO',
319n/a 'unrecognized authentication type {}'.format(args[0]))
320n/a client, _ = self._setup(MyServer)
321n/a with self.assertRaisesRegex(imaplib.IMAP4.error,
322n/a 'unrecognized authentication type METHOD'):
323n/a client.authenticate('METHOD', lambda: 1)
324n/a
325n/a def test_invalid_authentication(self):
326n/a class MyServer(SimpleIMAPHandler):
327n/a def cmd_AUTHENTICATE(self, tag, args):
328n/a self._send_textline('+')
329n/a self.response = yield
330n/a self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
331n/a client, _ = self._setup(MyServer)
332n/a with self.assertRaisesRegex(imaplib.IMAP4.error,
333n/a r'\[AUTHENTICATIONFAILED\] invalid'):
334n/a client.authenticate('MYAUTH', lambda x: b'fake')
335n/a
336n/a def test_valid_authentication_bytes(self):
337n/a class MyServer(SimpleIMAPHandler):
338n/a def cmd_AUTHENTICATE(self, tag, args):
339n/a self._send_textline('+')
340n/a self.server.response = yield
341n/a self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
342n/a client, server = self._setup(MyServer)
343n/a code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
344n/a self.assertEqual(code, 'OK')
345n/a self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
346n/a
347n/a def test_valid_authentication_plain_text(self):
348n/a class MyServer(SimpleIMAPHandler):
349n/a def cmd_AUTHENTICATE(self, tag, args):
350n/a self._send_textline('+')
351n/a self.server.response = yield
352n/a self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
353n/a client, server = self._setup(MyServer)
354n/a code, _ = client.authenticate('MYAUTH', lambda x: 'fake')
355n/a self.assertEqual(code, 'OK')
356n/a self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake'
357n/a
358n/a def test_login_cram_md5_bytes(self):
359n/a class AuthHandler(SimpleIMAPHandler):
360n/a capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
361n/a def cmd_AUTHENTICATE(self, tag, args):
362n/a self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
363n/a 'VzdG9uLm1jaS5uZXQ=')
364n/a r = yield
365n/a if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
366n/a b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
367n/a self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
368n/a else:
369n/a self._send_tagged(tag, 'NO', 'No access')
370n/a client, _ = self._setup(AuthHandler)
371n/a self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
372n/a ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf")
373n/a self.assertEqual(ret, "OK")
374n/a
375n/a def test_login_cram_md5_plain_text(self):
376n/a class AuthHandler(SimpleIMAPHandler):
377n/a capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
378n/a def cmd_AUTHENTICATE(self, tag, args):
379n/a self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
380n/a 'VzdG9uLm1jaS5uZXQ=')
381n/a r = yield
382n/a if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
383n/a b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
384n/a self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
385n/a else:
386n/a self._send_tagged(tag, 'NO', 'No access')
387n/a client, _ = self._setup(AuthHandler)
388n/a self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
389n/a ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf")
390n/a self.assertEqual(ret, "OK")
391n/a
392n/a def test_aborted_authentication(self):
393n/a class MyServer(SimpleIMAPHandler):
394n/a def cmd_AUTHENTICATE(self, tag, args):
395n/a self._send_textline('+')
396n/a self.response = yield
397n/a if self.response == b'*\r\n':
398n/a self._send_tagged(
399n/a tag,
400n/a 'NO',
401n/a '[AUTHENTICATIONFAILED] aborted')
402n/a else:
403n/a self._send_tagged(tag, 'OK', 'MYAUTH successful')
404n/a client, _ = self._setup(MyServer)
405n/a with self.assertRaisesRegex(imaplib.IMAP4.error,
406n/a r'\[AUTHENTICATIONFAILED\] aborted'):
407n/a client.authenticate('MYAUTH', lambda x: None)
408n/a
409n/a @mock.patch('imaplib._MAXLINE', 10)
410n/a def test_linetoolong(self):
411n/a class TooLongHandler(SimpleIMAPHandler):
412n/a def handle(self):
413n/a # send response line longer than the limit set in the next line
414n/a self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n')
415n/a _, server = self._setup(TooLongHandler, connect=False)
416n/a with self.assertRaisesRegex(imaplib.IMAP4.error,
417n/a 'got more than 10 bytes'):
418n/a self.imap_class(*server.server_address)
419n/a
420n/a def test_simple_with_statement(self):
421n/a _, server = self._setup(SimpleIMAPHandler, connect=False)
422n/a with self.imap_class(*server.server_address):
423n/a pass
424n/a
425n/a def test_with_statement(self):
426n/a _, server = self._setup(SimpleIMAPHandler, connect=False)
427n/a with self.imap_class(*server.server_address) as imap:
428n/a imap.login('user', 'pass')
429n/a self.assertEqual(server.logged, 'user')
430n/a self.assertIsNone(server.logged)
431n/a
432n/a def test_with_statement_logout(self):
433n/a # It is legal to log out explicitly inside the with block
434n/a _, server = self._setup(SimpleIMAPHandler, connect=False)
435n/a with self.imap_class(*server.server_address) as imap:
436n/a imap.login('user', 'pass')
437n/a self.assertEqual(server.logged, 'user')
438n/a imap.logout()
439n/a self.assertIsNone(server.logged)
440n/a self.assertIsNone(server.logged)
441n/a
442n/a # command tests
443n/a
444n/a def test_login(self):
445n/a client, _ = self._setup(SimpleIMAPHandler)
446n/a typ, data = client.login('user', 'pass')
447n/a self.assertEqual(typ, 'OK')
448n/a self.assertEqual(data[0], b'LOGIN completed')
449n/a self.assertEqual(client.state, 'AUTH')
450n/a
451n/a def test_logout(self):
452n/a client, _ = self._setup(SimpleIMAPHandler)
453n/a typ, data = client.login('user', 'pass')
454n/a self.assertEqual(typ, 'OK')
455n/a self.assertEqual(data[0], b'LOGIN completed')
456n/a typ, data = client.logout()
457n/a self.assertEqual(typ, 'BYE')
458n/a self.assertEqual(data[0], b'IMAP4ref1 Server logging out')
459n/a self.assertEqual(client.state, 'LOGOUT')
460n/a
461n/a def test_lsub(self):
462n/a class LsubCmd(SimpleIMAPHandler):
463n/a def cmd_LSUB(self, tag, args):
464n/a self._send_textline('* LSUB () "." directoryA')
465n/a return self._send_tagged(tag, 'OK', 'LSUB completed')
466n/a client, _ = self._setup(LsubCmd)
467n/a client.login('user', 'pass')
468n/a typ, data = client.lsub()
469n/a self.assertEqual(typ, 'OK')
470n/a self.assertEqual(data[0], b'() "." directoryA')
471n/a
472n/a
473n/aclass NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
474n/a imap_class = imaplib.IMAP4
475n/a server_class = socketserver.TCPServer
476n/a
477n/a
478n/a@unittest.skipUnless(ssl, "SSL not available")
479n/aclass NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
480n/a imap_class = IMAP4_SSL
481n/a server_class = SecureTCPServer
482n/a
483n/a def test_ssl_raises(self):
484n/a ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
485n/a ssl_context.verify_mode = ssl.CERT_REQUIRED
486n/a ssl_context.check_hostname = True
487n/a ssl_context.load_verify_locations(CAFILE)
488n/a
489n/a with self.assertRaisesRegex(ssl.CertificateError,
490n/a "hostname '127.0.0.1' doesn't match 'localhost'"):
491n/a _, server = self._setup(SimpleIMAPHandler)
492n/a client = self.imap_class(*server.server_address,
493n/a ssl_context=ssl_context)
494n/a client.shutdown()
495n/a
496n/a def test_ssl_verified(self):
497n/a ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
498n/a ssl_context.verify_mode = ssl.CERT_REQUIRED
499n/a ssl_context.check_hostname = True
500n/a ssl_context.load_verify_locations(CAFILE)
501n/a
502n/a _, server = self._setup(SimpleIMAPHandler)
503n/a client = self.imap_class("localhost", server.server_address[1],
504n/a ssl_context=ssl_context)
505n/a client.shutdown()
506n/a
507n/aclass ThreadedNetworkedTests(unittest.TestCase):
508n/a server_class = socketserver.TCPServer
509n/a imap_class = imaplib.IMAP4
510n/a
511n/a def make_server(self, addr, hdlr):
512n/a
513n/a class MyServer(self.server_class):
514n/a def handle_error(self, request, client_address):
515n/a self.close_request(request)
516n/a self.server_close()
517n/a raise
518n/a
519n/a if verbose:
520n/a print("creating server")
521n/a server = MyServer(addr, hdlr)
522n/a self.assertEqual(server.server_address, server.socket.getsockname())
523n/a
524n/a if verbose:
525n/a print("server created")
526n/a print("ADDR =", addr)
527n/a print("CLASS =", self.server_class)
528n/a print("HDLR =", server.RequestHandlerClass)
529n/a
530n/a t = threading.Thread(
531n/a name='%s serving' % self.server_class,
532n/a target=server.serve_forever,
533n/a # Short poll interval to make the test finish quickly.
534n/a # Time between requests is short enough that we won't wake
535n/a # up spuriously too many times.
536n/a kwargs={'poll_interval': 0.01})
537n/a t.daemon = True # In case this function raises.
538n/a t.start()
539n/a if verbose:
540n/a print("server running")
541n/a return server, t
542n/a
543n/a def reap_server(self, server, thread):
544n/a if verbose:
545n/a print("waiting for server")
546n/a server.shutdown()
547n/a server.server_close()
548n/a thread.join()
549n/a if verbose:
550n/a print("done")
551n/a
552n/a @contextmanager
553n/a def reaped_server(self, hdlr):
554n/a server, thread = self.make_server((support.HOST, 0), hdlr)
555n/a try:
556n/a yield server
557n/a finally:
558n/a self.reap_server(server, thread)
559n/a
560n/a @contextmanager
561n/a def reaped_pair(self, hdlr):
562n/a with self.reaped_server(hdlr) as server:
563n/a client = self.imap_class(*server.server_address)
564n/a try:
565n/a yield server, client
566n/a finally:
567n/a client.logout()
568n/a
569n/a @reap_threads
570n/a def test_connect(self):
571n/a with self.reaped_server(SimpleIMAPHandler) as server:
572n/a client = self.imap_class(*server.server_address)
573n/a client.shutdown()
574n/a
575n/a @reap_threads
576n/a def test_bracket_flags(self):
577n/a
578n/a # This violates RFC 3501, which disallows ']' characters in tag names,
579n/a # but imaplib has allowed producing such tags forever, other programs
580n/a # also produce them (eg: OtherInbox's Organizer app as of 20140716),
581n/a # and Gmail, for example, accepts them and produces them. So we
582n/a # support them. See issue #21815.
583n/a
584n/a class BracketFlagHandler(SimpleIMAPHandler):
585n/a
586n/a def handle(self):
587n/a self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']
588n/a super().handle()
589n/a
590n/a def cmd_AUTHENTICATE(self, tag, args):
591n/a self._send_textline('+')
592n/a self.server.response = yield
593n/a self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
594n/a
595n/a def cmd_SELECT(self, tag, args):
596n/a flag_msg = ' \\'.join(self.flags)
597n/a self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii'))
598n/a self._send_line(b'* 2 EXISTS')
599n/a self._send_line(b'* 0 RECENT')
600n/a msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.'
601n/a % flag_msg)
602n/a self._send_line(msg.encode('ascii'))
603n/a self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')
604n/a
605n/a def cmd_STORE(self, tag, args):
606n/a new_flags = args[2].strip('(').strip(')').split()
607n/a self.flags.extend(new_flags)
608n/a flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags)
609n/a msg = '* %s FETCH %s' % (args[0], flags_msg)
610n/a self._send_line(msg.encode('ascii'))
611n/a self._send_tagged(tag, 'OK', 'STORE completed.')
612n/a
613n/a with self.reaped_pair(BracketFlagHandler) as (server, client):
614n/a code, data = client.authenticate('MYAUTH', lambda x: b'fake')
615n/a self.assertEqual(code, 'OK')
616n/a self.assertEqual(server.response, b'ZmFrZQ==\r\n')
617n/a client.select('test')
618n/a typ, [data] = client.store(b'1', "+FLAGS", "[test]")
619n/a self.assertIn(b'[test]', data)
620n/a client.select('test')
621n/a typ, [data] = client.response('PERMANENTFLAGS')
622n/a self.assertIn(b'[test]', data)
623n/a
624n/a @reap_threads
625n/a def test_issue5949(self):
626n/a
627n/a class EOFHandler(socketserver.StreamRequestHandler):
628n/a def handle(self):
629n/a # EOF without sending a complete welcome message.
630n/a self.wfile.write(b'* OK')
631n/a
632n/a with self.reaped_server(EOFHandler) as server:
633n/a self.assertRaises(imaplib.IMAP4.abort,
634n/a self.imap_class, *server.server_address)
635n/a
636n/a @reap_threads
637n/a def test_line_termination(self):
638n/a
639n/a class BadNewlineHandler(SimpleIMAPHandler):
640n/a
641n/a def cmd_CAPABILITY(self, tag, args):
642n/a self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
643n/a self._send_tagged(tag, 'OK', 'CAPABILITY completed')
644n/a
645n/a with self.reaped_server(BadNewlineHandler) as server:
646n/a self.assertRaises(imaplib.IMAP4.abort,
647n/a self.imap_class, *server.server_address)
648n/a
649n/a class UTF8Server(SimpleIMAPHandler):
650n/a capabilities = 'AUTH ENABLE UTF8=ACCEPT'
651n/a
652n/a def cmd_ENABLE(self, tag, args):
653n/a self._send_tagged(tag, 'OK', 'ENABLE successful')
654n/a
655n/a def cmd_AUTHENTICATE(self, tag, args):
656n/a self._send_textline('+')
657n/a self.server.response = yield
658n/a self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
659n/a
660n/a @reap_threads
661n/a def test_enable_raises_error_if_not_AUTH(self):
662n/a with self.reaped_pair(self.UTF8Server) as (server, client):
663n/a self.assertFalse(client.utf8_enabled)
664n/a self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
665n/a self.assertFalse(client.utf8_enabled)
666n/a
667n/a # XXX Also need a test that enable after SELECT raises an error.
668n/a
669n/a @reap_threads
670n/a def test_enable_raises_error_if_no_capability(self):
671n/a class NoEnableServer(self.UTF8Server):
672n/a capabilities = 'AUTH'
673n/a with self.reaped_pair(NoEnableServer) as (server, client):
674n/a self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo')
675n/a
676n/a @reap_threads
677n/a def test_enable_UTF8_raises_error_if_not_supported(self):
678n/a class NonUTF8Server(SimpleIMAPHandler):
679n/a pass
680n/a with self.assertRaises(imaplib.IMAP4.error):
681n/a with self.reaped_pair(NonUTF8Server) as (server, client):
682n/a typ, data = client.login('user', 'pass')
683n/a self.assertEqual(typ, 'OK')
684n/a client.enable('UTF8=ACCEPT')
685n/a pass
686n/a
687n/a @reap_threads
688n/a def test_enable_UTF8_True_append(self):
689n/a
690n/a class UTF8AppendServer(self.UTF8Server):
691n/a def cmd_APPEND(self, tag, args):
692n/a self._send_textline('+')
693n/a self.server.response = yield
694n/a self._send_tagged(tag, 'OK', 'okay')
695n/a
696n/a with self.reaped_pair(UTF8AppendServer) as (server, client):
697n/a self.assertEqual(client._encoding, 'ascii')
698n/a code, _ = client.authenticate('MYAUTH', lambda x: b'fake')
699n/a self.assertEqual(code, 'OK')
700n/a self.assertEqual(server.response,
701n/a b'ZmFrZQ==\r\n') # b64 encoded 'fake'
702n/a code, _ = client.enable('UTF8=ACCEPT')
703n/a self.assertEqual(code, 'OK')
704n/a self.assertEqual(client._encoding, 'utf-8')
705n/a msg_string = 'Subject: üñí©öðé'
706n/a typ, data = client.append(
707n/a None, None, None, msg_string.encode('utf-8'))
708n/a self.assertEqual(typ, 'OK')
709n/a self.assertEqual(
710n/a server.response,
711n/a ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')
712n/a )
713n/a
714n/a # XXX also need a test that makes sure that the Literal and Untagged_status
715n/a # regexes uses unicode in UTF8 mode instead of the default ASCII.
716n/a
717n/a @reap_threads
718n/a def test_search_disallows_charset_in_utf8_mode(self):
719n/a with self.reaped_pair(self.UTF8Server) as (server, client):
720n/a typ, _ = client.authenticate('MYAUTH', lambda x: b'fake')
721n/a self.assertEqual(typ, 'OK')
722n/a typ, _ = client.enable('UTF8=ACCEPT')
723n/a self.assertEqual(typ, 'OK')
724n/a self.assertTrue(client.utf8_enabled)
725n/a self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar')
726n/a
727n/a @reap_threads
728n/a def test_bad_auth_name(self):
729n/a
730n/a class MyServer(SimpleIMAPHandler):
731n/a
732n/a def cmd_AUTHENTICATE(self, tag, args):
733n/a self._send_tagged(tag, 'NO', 'unrecognized authentication '
734n/a 'type {}'.format(args[0]))
735n/a
736n/a with self.reaped_pair(MyServer) as (server, client):
737n/a with self.assertRaises(imaplib.IMAP4.error):
738n/a client.authenticate('METHOD', lambda: 1)
739n/a
740n/a @reap_threads
741n/a def test_invalid_authentication(self):
742n/a
743n/a class MyServer(SimpleIMAPHandler):
744n/a
745n/a def cmd_AUTHENTICATE(self, tag, args):
746n/a self._send_textline('+')
747n/a self.response = yield
748n/a self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
749n/a
750n/a with self.reaped_pair(MyServer) as (server, client):
751n/a with self.assertRaises(imaplib.IMAP4.error):
752n/a code, data = client.authenticate('MYAUTH', lambda x: b'fake')
753n/a
754n/a @reap_threads
755n/a def test_valid_authentication(self):
756n/a
757n/a class MyServer(SimpleIMAPHandler):
758n/a
759n/a def cmd_AUTHENTICATE(self, tag, args):
760n/a self._send_textline('+')
761n/a self.server.response = yield
762n/a self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
763n/a
764n/a with self.reaped_pair(MyServer) as (server, client):
765n/a code, data = client.authenticate('MYAUTH', lambda x: b'fake')
766n/a self.assertEqual(code, 'OK')
767n/a self.assertEqual(server.response,
768n/a b'ZmFrZQ==\r\n') # b64 encoded 'fake'
769n/a
770n/a with self.reaped_pair(MyServer) as (server, client):
771n/a code, data = client.authenticate('MYAUTH', lambda x: 'fake')
772n/a self.assertEqual(code, 'OK')
773n/a self.assertEqual(server.response,
774n/a b'ZmFrZQ==\r\n') # b64 encoded 'fake'
775n/a
776n/a @reap_threads
777n/a def test_login_cram_md5(self):
778n/a
779n/a class AuthHandler(SimpleIMAPHandler):
780n/a
781n/a capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
782n/a
783n/a def cmd_AUTHENTICATE(self, tag, args):
784n/a self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
785n/a 'VzdG9uLm1jaS5uZXQ=')
786n/a r = yield
787n/a if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT'
788n/a b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'):
789n/a self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
790n/a else:
791n/a self._send_tagged(tag, 'NO', 'No access')
792n/a
793n/a with self.reaped_pair(AuthHandler) as (server, client):
794n/a self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
795n/a ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
796n/a self.assertEqual(ret, "OK")
797n/a
798n/a with self.reaped_pair(AuthHandler) as (server, client):
799n/a self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
800n/a ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
801n/a self.assertEqual(ret, "OK")
802n/a
803n/a
804n/a @reap_threads
805n/a def test_aborted_authentication(self):
806n/a
807n/a class MyServer(SimpleIMAPHandler):
808n/a
809n/a def cmd_AUTHENTICATE(self, tag, args):
810n/a self._send_textline('+')
811n/a self.response = yield
812n/a
813n/a if self.response == b'*\r\n':
814n/a self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')
815n/a else:
816n/a self._send_tagged(tag, 'OK', 'MYAUTH successful')
817n/a
818n/a with self.reaped_pair(MyServer) as (server, client):
819n/a with self.assertRaises(imaplib.IMAP4.error):
820n/a code, data = client.authenticate('MYAUTH', lambda x: None)
821n/a
822n/a
823n/a def test_linetoolong(self):
824n/a class TooLongHandler(SimpleIMAPHandler):
825n/a def handle(self):
826n/a # Send a very long response line
827n/a self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n')
828n/a
829n/a with self.reaped_server(TooLongHandler) as server:
830n/a self.assertRaises(imaplib.IMAP4.error,
831n/a self.imap_class, *server.server_address)
832n/a
833n/a @reap_threads
834n/a def test_simple_with_statement(self):
835n/a # simplest call
836n/a with self.reaped_server(SimpleIMAPHandler) as server:
837n/a with self.imap_class(*server.server_address):
838n/a pass
839n/a
840n/a @reap_threads
841n/a def test_with_statement(self):
842n/a with self.reaped_server(SimpleIMAPHandler) as server:
843n/a with self.imap_class(*server.server_address) as imap:
844n/a imap.login('user', 'pass')
845n/a self.assertEqual(server.logged, 'user')
846n/a self.assertIsNone(server.logged)
847n/a
848n/a @reap_threads
849n/a def test_with_statement_logout(self):
850n/a # what happens if already logout in the block?
851n/a with self.reaped_server(SimpleIMAPHandler) as server:
852n/a with self.imap_class(*server.server_address) as imap:
853n/a imap.login('user', 'pass')
854n/a self.assertEqual(server.logged, 'user')
855n/a imap.logout()
856n/a self.assertIsNone(server.logged)
857n/a self.assertIsNone(server.logged)
858n/a
859n/a
860n/a@unittest.skipUnless(ssl, "SSL not available")
861n/aclass ThreadedNetworkedTestsSSL(ThreadedNetworkedTests):
862n/a server_class = SecureTCPServer
863n/a imap_class = IMAP4_SSL
864n/a
865n/a @reap_threads
866n/a def test_ssl_verified(self):
867n/a ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
868n/a ssl_context.verify_mode = ssl.CERT_REQUIRED
869n/a ssl_context.check_hostname = True
870n/a ssl_context.load_verify_locations(CAFILE)
871n/a
872n/a with self.assertRaisesRegex(
873n/a ssl.CertificateError,
874n/a "hostname '127.0.0.1' doesn't match 'localhost'"):
875n/a with self.reaped_server(SimpleIMAPHandler) as server:
876n/a client = self.imap_class(*server.server_address,
877n/a ssl_context=ssl_context)
878n/a client.shutdown()
879n/a
880n/a with self.reaped_server(SimpleIMAPHandler) as server:
881n/a client = self.imap_class("localhost", server.server_address[1],
882n/a ssl_context=ssl_context)
883n/a client.shutdown()
884n/a
885n/a
886n/a@unittest.skipUnless(
887n/a support.is_resource_enabled('network'), 'network resource disabled')
888n/aclass RemoteIMAPTest(unittest.TestCase):
889n/a host = 'cyrus.andrew.cmu.edu'
890n/a port = 143
891n/a username = 'anonymous'
892n/a password = 'pass'
893n/a imap_class = imaplib.IMAP4
894n/a
895n/a def setUp(self):
896n/a with transient_internet(self.host):
897n/a self.server = self.imap_class(self.host, self.port)
898n/a
899n/a def tearDown(self):
900n/a if self.server is not None:
901n/a with transient_internet(self.host):
902n/a self.server.logout()
903n/a
904n/a def test_logincapa(self):
905n/a with transient_internet(self.host):
906n/a for cap in self.server.capabilities:
907n/a self.assertIsInstance(cap, str)
908n/a self.assertIn('LOGINDISABLED', self.server.capabilities)
909n/a self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
910n/a rs = self.server.login(self.username, self.password)
911n/a self.assertEqual(rs[0], 'OK')
912n/a
913n/a def test_logout(self):
914n/a with transient_internet(self.host):
915n/a rs = self.server.logout()
916n/a self.server = None
917n/a self.assertEqual(rs[0], 'BYE')
918n/a
919n/a
920n/a@unittest.skipUnless(ssl, "SSL not available")
921n/a@unittest.skipUnless(
922n/a support.is_resource_enabled('network'), 'network resource disabled')
923n/aclass RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
924n/a
925n/a def setUp(self):
926n/a super().setUp()
927n/a with transient_internet(self.host):
928n/a rs = self.server.starttls()
929n/a self.assertEqual(rs[0], 'OK')
930n/a
931n/a def test_logincapa(self):
932n/a for cap in self.server.capabilities:
933n/a self.assertIsInstance(cap, str)
934n/a self.assertNotIn('LOGINDISABLED', self.server.capabilities)
935n/a
936n/a
937n/a@unittest.skipUnless(ssl, "SSL not available")
938n/aclass RemoteIMAP_SSLTest(RemoteIMAPTest):
939n/a port = 993
940n/a imap_class = IMAP4_SSL
941n/a
942n/a def setUp(self):
943n/a pass
944n/a
945n/a def tearDown(self):
946n/a pass
947n/a
948n/a def create_ssl_context(self):
949n/a ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
950n/a ssl_context.load_cert_chain(CERTFILE)
951n/a return ssl_context
952n/a
953n/a def check_logincapa(self, server):
954n/a try:
955n/a for cap in server.capabilities:
956n/a self.assertIsInstance(cap, str)
957n/a self.assertNotIn('LOGINDISABLED', server.capabilities)
958n/a self.assertIn('AUTH=PLAIN', server.capabilities)
959n/a rs = server.login(self.username, self.password)
960n/a self.assertEqual(rs[0], 'OK')
961n/a finally:
962n/a server.logout()
963n/a
964n/a def test_logincapa(self):
965n/a with transient_internet(self.host):
966n/a _server = self.imap_class(self.host, self.port)
967n/a self.check_logincapa(_server)
968n/a
969n/a def test_logincapa_with_client_certfile(self):
970n/a with transient_internet(self.host):
971n/a with support.check_warnings(('', DeprecationWarning)):
972n/a _server = self.imap_class(self.host, self.port,
973n/a certfile=CERTFILE)
974n/a self.check_logincapa(_server)
975n/a
976n/a def test_logincapa_with_client_ssl_context(self):
977n/a with transient_internet(self.host):
978n/a _server = self.imap_class(
979n/a self.host, self.port, ssl_context=self.create_ssl_context())
980n/a self.check_logincapa(_server)
981n/a
982n/a def test_logout(self):
983n/a with transient_internet(self.host):
984n/a _server = self.imap_class(self.host, self.port)
985n/a rs = _server.logout()
986n/a self.assertEqual(rs[0], 'BYE')
987n/a
988n/a def test_ssl_context_certfile_exclusive(self):
989n/a with transient_internet(self.host):
990n/a self.assertRaises(
991n/a ValueError, self.imap_class, self.host, self.port,
992n/a certfile=CERTFILE, ssl_context=self.create_ssl_context())
993n/a
994n/a def test_ssl_context_keyfile_exclusive(self):
995n/a with transient_internet(self.host):
996n/a self.assertRaises(
997n/a ValueError, self.imap_class, self.host, self.port,
998n/a keyfile=CERTFILE, ssl_context=self.create_ssl_context())
999n/a
1000n/a
1001n/aif __name__ == "__main__":
1002n/a unittest.main()