ยปCore Development>Code coverage>Lib/test/test_ftplib.py

Python code coverage for Lib/test/test_ftplib.py

#countcontent
1n/a"""Test script for ftplib module."""
2n/a
3n/a# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
4n/a# environment
5n/a
6n/aimport ftplib
7n/aimport asyncore
8n/aimport asynchat
9n/aimport socket
10n/aimport io
11n/aimport errno
12n/aimport os
13n/aimport time
14n/atry:
15n/a import ssl
16n/aexcept ImportError:
17n/a ssl = None
18n/a
19n/afrom unittest import TestCase, skipUnless
20n/afrom test import support
21n/afrom test.support import HOST, HOSTv6
22n/athreading = support.import_module('threading')
23n/a
24n/aTIMEOUT = 3
25n/a# the dummy data returned by server over the data channel when
26n/a# RETR, LIST, NLST, MLSD commands are issued
27n/aRETR_DATA = 'abcde12345\r\n' * 1000
28n/aLIST_DATA = 'foo\r\nbar\r\n'
29n/aNLST_DATA = 'foo\r\nbar\r\n'
30n/aMLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
31n/a "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
32n/a "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
33n/a "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
34n/a "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
35n/a "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
36n/a "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
37n/a "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
38n/a "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
39n/a "type=file;perm=r;unique==keVO1+IH4; leading space\r\n"
40n/a "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
41n/a "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
42n/a "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
43n/a "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
44n/a "type=file;perm=r;unique==keVO1+1G4; file4\r\n")
45n/a
46n/a
47n/aclass DummyDTPHandler(asynchat.async_chat):
48n/a dtp_conn_closed = False
49n/a
50n/a def __init__(self, conn, baseclass):
51n/a asynchat.async_chat.__init__(self, conn)
52n/a self.baseclass = baseclass
53n/a self.baseclass.last_received_data = ''
54n/a
55n/a def handle_read(self):
56n/a self.baseclass.last_received_data += self.recv(1024).decode('ascii')
57n/a
58n/a def handle_close(self):
59n/a # XXX: this method can be called many times in a row for a single
60n/a # connection, including in clear-text (non-TLS) mode.
61n/a # (behaviour witnessed with test_data_connection)
62n/a if not self.dtp_conn_closed:
63n/a self.baseclass.push('226 transfer complete')
64n/a self.close()
65n/a self.dtp_conn_closed = True
66n/a
67n/a def push(self, what):
68n/a if self.baseclass.next_data is not None:
69n/a what = self.baseclass.next_data
70n/a self.baseclass.next_data = None
71n/a if not what:
72n/a return self.close_when_done()
73n/a super(DummyDTPHandler, self).push(what.encode('ascii'))
74n/a
75n/a def handle_error(self):
76n/a raise Exception
77n/a
78n/a
79n/aclass DummyFTPHandler(asynchat.async_chat):
80n/a
81n/a dtp_handler = DummyDTPHandler
82n/a
83n/a def __init__(self, conn):
84n/a asynchat.async_chat.__init__(self, conn)
85n/a # tells the socket to handle urgent data inline (ABOR command)
86n/a self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
87n/a self.set_terminator(b"\r\n")
88n/a self.in_buffer = []
89n/a self.dtp = None
90n/a self.last_received_cmd = None
91n/a self.last_received_data = ''
92n/a self.next_response = ''
93n/a self.next_data = None
94n/a self.rest = None
95n/a self.next_retr_data = RETR_DATA
96n/a self.push('220 welcome')
97n/a
98n/a def collect_incoming_data(self, data):
99n/a self.in_buffer.append(data)
100n/a
101n/a def found_terminator(self):
102n/a line = b''.join(self.in_buffer).decode('ascii')
103n/a self.in_buffer = []
104n/a if self.next_response:
105n/a self.push(self.next_response)
106n/a self.next_response = ''
107n/a cmd = line.split(' ')[0].lower()
108n/a self.last_received_cmd = cmd
109n/a space = line.find(' ')
110n/a if space != -1:
111n/a arg = line[space + 1:]
112n/a else:
113n/a arg = ""
114n/a if hasattr(self, 'cmd_' + cmd):
115n/a method = getattr(self, 'cmd_' + cmd)
116n/a method(arg)
117n/a else:
118n/a self.push('550 command "%s" not understood.' %cmd)
119n/a
120n/a def handle_error(self):
121n/a raise Exception
122n/a
123n/a def push(self, data):
124n/a asynchat.async_chat.push(self, data.encode('ascii') + b'\r\n')
125n/a
126n/a def cmd_port(self, arg):
127n/a addr = list(map(int, arg.split(',')))
128n/a ip = '%d.%d.%d.%d' %tuple(addr[:4])
129n/a port = (addr[4] * 256) + addr[5]
130n/a s = socket.create_connection((ip, port), timeout=TIMEOUT)
131n/a self.dtp = self.dtp_handler(s, baseclass=self)
132n/a self.push('200 active data connection established')
133n/a
134n/a def cmd_pasv(self, arg):
135n/a with socket.socket() as sock:
136n/a sock.bind((self.socket.getsockname()[0], 0))
137n/a sock.listen()
138n/a sock.settimeout(TIMEOUT)
139n/a ip, port = sock.getsockname()[:2]
140n/a ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
141n/a self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
142n/a conn, addr = sock.accept()
143n/a self.dtp = self.dtp_handler(conn, baseclass=self)
144n/a
145n/a def cmd_eprt(self, arg):
146n/a af, ip, port = arg.split(arg[0])[1:-1]
147n/a port = int(port)
148n/a s = socket.create_connection((ip, port), timeout=TIMEOUT)
149n/a self.dtp = self.dtp_handler(s, baseclass=self)
150n/a self.push('200 active data connection established')
151n/a
152n/a def cmd_epsv(self, arg):
153n/a with socket.socket(socket.AF_INET6) as sock:
154n/a sock.bind((self.socket.getsockname()[0], 0))
155n/a sock.listen()
156n/a sock.settimeout(TIMEOUT)
157n/a port = sock.getsockname()[1]
158n/a self.push('229 entering extended passive mode (|||%d|)' %port)
159n/a conn, addr = sock.accept()
160n/a self.dtp = self.dtp_handler(conn, baseclass=self)
161n/a
162n/a def cmd_echo(self, arg):
163n/a # sends back the received string (used by the test suite)
164n/a self.push(arg)
165n/a
166n/a def cmd_noop(self, arg):
167n/a self.push('200 noop ok')
168n/a
169n/a def cmd_user(self, arg):
170n/a self.push('331 username ok')
171n/a
172n/a def cmd_pass(self, arg):
173n/a self.push('230 password ok')
174n/a
175n/a def cmd_acct(self, arg):
176n/a self.push('230 acct ok')
177n/a
178n/a def cmd_rnfr(self, arg):
179n/a self.push('350 rnfr ok')
180n/a
181n/a def cmd_rnto(self, arg):
182n/a self.push('250 rnto ok')
183n/a
184n/a def cmd_dele(self, arg):
185n/a self.push('250 dele ok')
186n/a
187n/a def cmd_cwd(self, arg):
188n/a self.push('250 cwd ok')
189n/a
190n/a def cmd_size(self, arg):
191n/a self.push('250 1000')
192n/a
193n/a def cmd_mkd(self, arg):
194n/a self.push('257 "%s"' %arg)
195n/a
196n/a def cmd_rmd(self, arg):
197n/a self.push('250 rmd ok')
198n/a
199n/a def cmd_pwd(self, arg):
200n/a self.push('257 "pwd ok"')
201n/a
202n/a def cmd_type(self, arg):
203n/a self.push('200 type ok')
204n/a
205n/a def cmd_quit(self, arg):
206n/a self.push('221 quit ok')
207n/a self.close()
208n/a
209n/a def cmd_abor(self, arg):
210n/a self.push('226 abor ok')
211n/a
212n/a def cmd_stor(self, arg):
213n/a self.push('125 stor ok')
214n/a
215n/a def cmd_rest(self, arg):
216n/a self.rest = arg
217n/a self.push('350 rest ok')
218n/a
219n/a def cmd_retr(self, arg):
220n/a self.push('125 retr ok')
221n/a if self.rest is not None:
222n/a offset = int(self.rest)
223n/a else:
224n/a offset = 0
225n/a self.dtp.push(self.next_retr_data[offset:])
226n/a self.dtp.close_when_done()
227n/a self.rest = None
228n/a
229n/a def cmd_list(self, arg):
230n/a self.push('125 list ok')
231n/a self.dtp.push(LIST_DATA)
232n/a self.dtp.close_when_done()
233n/a
234n/a def cmd_nlst(self, arg):
235n/a self.push('125 nlst ok')
236n/a self.dtp.push(NLST_DATA)
237n/a self.dtp.close_when_done()
238n/a
239n/a def cmd_opts(self, arg):
240n/a self.push('200 opts ok')
241n/a
242n/a def cmd_mlsd(self, arg):
243n/a self.push('125 mlsd ok')
244n/a self.dtp.push(MLSD_DATA)
245n/a self.dtp.close_when_done()
246n/a
247n/a def cmd_setlongretr(self, arg):
248n/a # For testing. Next RETR will return long line.
249n/a self.next_retr_data = 'x' * int(arg)
250n/a self.push('125 setlongretr ok')
251n/a
252n/a
253n/aclass DummyFTPServer(asyncore.dispatcher, threading.Thread):
254n/a
255n/a handler = DummyFTPHandler
256n/a
257n/a def __init__(self, address, af=socket.AF_INET):
258n/a threading.Thread.__init__(self)
259n/a asyncore.dispatcher.__init__(self)
260n/a self.create_socket(af, socket.SOCK_STREAM)
261n/a self.bind(address)
262n/a self.listen(5)
263n/a self.active = False
264n/a self.active_lock = threading.Lock()
265n/a self.host, self.port = self.socket.getsockname()[:2]
266n/a self.handler_instance = None
267n/a
268n/a def start(self):
269n/a assert not self.active
270n/a self.__flag = threading.Event()
271n/a threading.Thread.start(self)
272n/a self.__flag.wait()
273n/a
274n/a def run(self):
275n/a self.active = True
276n/a self.__flag.set()
277n/a while self.active and asyncore.socket_map:
278n/a self.active_lock.acquire()
279n/a asyncore.loop(timeout=0.1, count=1)
280n/a self.active_lock.release()
281n/a asyncore.close_all(ignore_all=True)
282n/a
283n/a def stop(self):
284n/a assert self.active
285n/a self.active = False
286n/a self.join()
287n/a
288n/a def handle_accepted(self, conn, addr):
289n/a self.handler_instance = self.handler(conn)
290n/a
291n/a def handle_connect(self):
292n/a self.close()
293n/a handle_read = handle_connect
294n/a
295n/a def writable(self):
296n/a return 0
297n/a
298n/a def handle_error(self):
299n/a raise Exception
300n/a
301n/a
302n/aif ssl is not None:
303n/a
304n/a CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
305n/a CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
306n/a
307n/a class SSLConnection(asyncore.dispatcher):
308n/a """An asyncore.dispatcher subclass supporting TLS/SSL."""
309n/a
310n/a _ssl_accepting = False
311n/a _ssl_closing = False
312n/a
313n/a def secure_connection(self):
314n/a context = ssl.SSLContext()
315n/a context.load_cert_chain(CERTFILE)
316n/a socket = context.wrap_socket(self.socket,
317n/a suppress_ragged_eofs=False,
318n/a server_side=True,
319n/a do_handshake_on_connect=False)
320n/a self.del_channel()
321n/a self.set_socket(socket)
322n/a self._ssl_accepting = True
323n/a
324n/a def _do_ssl_handshake(self):
325n/a try:
326n/a self.socket.do_handshake()
327n/a except ssl.SSLError as err:
328n/a if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
329n/a ssl.SSL_ERROR_WANT_WRITE):
330n/a return
331n/a elif err.args[0] == ssl.SSL_ERROR_EOF:
332n/a return self.handle_close()
333n/a raise
334n/a except OSError as err:
335n/a if err.args[0] == errno.ECONNABORTED:
336n/a return self.handle_close()
337n/a else:
338n/a self._ssl_accepting = False
339n/a
340n/a def _do_ssl_shutdown(self):
341n/a self._ssl_closing = True
342n/a try:
343n/a self.socket = self.socket.unwrap()
344n/a except ssl.SSLError as err:
345n/a if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
346n/a ssl.SSL_ERROR_WANT_WRITE):
347n/a return
348n/a except OSError as err:
349n/a # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
350n/a # from OpenSSL's SSL_shutdown(), corresponding to a
351n/a # closed socket condition. See also:
352n/a # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
353n/a pass
354n/a self._ssl_closing = False
355n/a if getattr(self, '_ccc', False) is False:
356n/a super(SSLConnection, self).close()
357n/a else:
358n/a pass
359n/a
360n/a def handle_read_event(self):
361n/a if self._ssl_accepting:
362n/a self._do_ssl_handshake()
363n/a elif self._ssl_closing:
364n/a self._do_ssl_shutdown()
365n/a else:
366n/a super(SSLConnection, self).handle_read_event()
367n/a
368n/a def handle_write_event(self):
369n/a if self._ssl_accepting:
370n/a self._do_ssl_handshake()
371n/a elif self._ssl_closing:
372n/a self._do_ssl_shutdown()
373n/a else:
374n/a super(SSLConnection, self).handle_write_event()
375n/a
376n/a def send(self, data):
377n/a try:
378n/a return super(SSLConnection, self).send(data)
379n/a except ssl.SSLError as err:
380n/a if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
381n/a ssl.SSL_ERROR_WANT_READ,
382n/a ssl.SSL_ERROR_WANT_WRITE):
383n/a return 0
384n/a raise
385n/a
386n/a def recv(self, buffer_size):
387n/a try:
388n/a return super(SSLConnection, self).recv(buffer_size)
389n/a except ssl.SSLError as err:
390n/a if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
391n/a ssl.SSL_ERROR_WANT_WRITE):
392n/a return b''
393n/a if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
394n/a self.handle_close()
395n/a return b''
396n/a raise
397n/a
398n/a def handle_error(self):
399n/a raise Exception
400n/a
401n/a def close(self):
402n/a if (isinstance(self.socket, ssl.SSLSocket) and
403n/a self.socket._sslobj is not None):
404n/a self._do_ssl_shutdown()
405n/a else:
406n/a super(SSLConnection, self).close()
407n/a
408n/a
409n/a class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
410n/a """A DummyDTPHandler subclass supporting TLS/SSL."""
411n/a
412n/a def __init__(self, conn, baseclass):
413n/a DummyDTPHandler.__init__(self, conn, baseclass)
414n/a if self.baseclass.secure_data_channel:
415n/a self.secure_connection()
416n/a
417n/a
418n/a class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
419n/a """A DummyFTPHandler subclass supporting TLS/SSL."""
420n/a
421n/a dtp_handler = DummyTLS_DTPHandler
422n/a
423n/a def __init__(self, conn):
424n/a DummyFTPHandler.__init__(self, conn)
425n/a self.secure_data_channel = False
426n/a self._ccc = False
427n/a
428n/a def cmd_auth(self, line):
429n/a """Set up secure control channel."""
430n/a self.push('234 AUTH TLS successful')
431n/a self.secure_connection()
432n/a
433n/a def cmd_ccc(self, line):
434n/a self.push('220 Reverting back to clear-text')
435n/a self._ccc = True
436n/a self._do_ssl_shutdown()
437n/a
438n/a def cmd_pbsz(self, line):
439n/a """Negotiate size of buffer for secure data transfer.
440n/a For TLS/SSL the only valid value for the parameter is '0'.
441n/a Any other value is accepted but ignored.
442n/a """
443n/a self.push('200 PBSZ=0 successful.')
444n/a
445n/a def cmd_prot(self, line):
446n/a """Setup un/secure data channel."""
447n/a arg = line.upper()
448n/a if arg == 'C':
449n/a self.push('200 Protection set to Clear')
450n/a self.secure_data_channel = False
451n/a elif arg == 'P':
452n/a self.push('200 Protection set to Private')
453n/a self.secure_data_channel = True
454n/a else:
455n/a self.push("502 Unrecognized PROT type (use C or P).")
456n/a
457n/a
458n/a class DummyTLS_FTPServer(DummyFTPServer):
459n/a handler = DummyTLS_FTPHandler
460n/a
461n/a
462n/aclass TestFTPClass(TestCase):
463n/a
464n/a def setUp(self):
465n/a self.server = DummyFTPServer((HOST, 0))
466n/a self.server.start()
467n/a self.client = ftplib.FTP(timeout=TIMEOUT)
468n/a self.client.connect(self.server.host, self.server.port)
469n/a
470n/a def tearDown(self):
471n/a self.client.close()
472n/a self.server.stop()
473n/a
474n/a def check_data(self, received, expected):
475n/a self.assertEqual(len(received), len(expected))
476n/a self.assertEqual(received, expected)
477n/a
478n/a def test_getwelcome(self):
479n/a self.assertEqual(self.client.getwelcome(), '220 welcome')
480n/a
481n/a def test_sanitize(self):
482n/a self.assertEqual(self.client.sanitize('foo'), repr('foo'))
483n/a self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
484n/a self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
485n/a
486n/a def test_exceptions(self):
487n/a self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
488n/a self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
489n/a self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
490n/a self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
491n/a self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
492n/a
493n/a def test_all_errors(self):
494n/a exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
495n/a ftplib.error_proto, ftplib.Error, OSError, EOFError)
496n/a for x in exceptions:
497n/a try:
498n/a raise x('exception not included in all_errors set')
499n/a except ftplib.all_errors:
500n/a pass
501n/a
502n/a def test_set_pasv(self):
503n/a # passive mode is supposed to be enabled by default
504n/a self.assertTrue(self.client.passiveserver)
505n/a self.client.set_pasv(True)
506n/a self.assertTrue(self.client.passiveserver)
507n/a self.client.set_pasv(False)
508n/a self.assertFalse(self.client.passiveserver)
509n/a
510n/a def test_voidcmd(self):
511n/a self.client.voidcmd('echo 200')
512n/a self.client.voidcmd('echo 299')
513n/a self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
514n/a self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
515n/a
516n/a def test_login(self):
517n/a self.client.login()
518n/a
519n/a def test_acct(self):
520n/a self.client.acct('passwd')
521n/a
522n/a def test_rename(self):
523n/a self.client.rename('a', 'b')
524n/a self.server.handler_instance.next_response = '200'
525n/a self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
526n/a
527n/a def test_delete(self):
528n/a self.client.delete('foo')
529n/a self.server.handler_instance.next_response = '199'
530n/a self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
531n/a
532n/a def test_size(self):
533n/a self.client.size('foo')
534n/a
535n/a def test_mkd(self):
536n/a dir = self.client.mkd('/foo')
537n/a self.assertEqual(dir, '/foo')
538n/a
539n/a def test_rmd(self):
540n/a self.client.rmd('foo')
541n/a
542n/a def test_cwd(self):
543n/a dir = self.client.cwd('/foo')
544n/a self.assertEqual(dir, '250 cwd ok')
545n/a
546n/a def test_pwd(self):
547n/a dir = self.client.pwd()
548n/a self.assertEqual(dir, 'pwd ok')
549n/a
550n/a def test_quit(self):
551n/a self.assertEqual(self.client.quit(), '221 quit ok')
552n/a # Ensure the connection gets closed; sock attribute should be None
553n/a self.assertEqual(self.client.sock, None)
554n/a
555n/a def test_abort(self):
556n/a self.client.abort()
557n/a
558n/a def test_retrbinary(self):
559n/a def callback(data):
560n/a received.append(data.decode('ascii'))
561n/a received = []
562n/a self.client.retrbinary('retr', callback)
563n/a self.check_data(''.join(received), RETR_DATA)
564n/a
565n/a def test_retrbinary_rest(self):
566n/a def callback(data):
567n/a received.append(data.decode('ascii'))
568n/a for rest in (0, 10, 20):
569n/a received = []
570n/a self.client.retrbinary('retr', callback, rest=rest)
571n/a self.check_data(''.join(received), RETR_DATA[rest:])
572n/a
573n/a def test_retrlines(self):
574n/a received = []
575n/a self.client.retrlines('retr', received.append)
576n/a self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
577n/a
578n/a def test_storbinary(self):
579n/a f = io.BytesIO(RETR_DATA.encode('ascii'))
580n/a self.client.storbinary('stor', f)
581n/a self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
582n/a # test new callback arg
583n/a flag = []
584n/a f.seek(0)
585n/a self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
586n/a self.assertTrue(flag)
587n/a
588n/a def test_storbinary_rest(self):
589n/a f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
590n/a for r in (30, '30'):
591n/a f.seek(0)
592n/a self.client.storbinary('stor', f, rest=r)
593n/a self.assertEqual(self.server.handler_instance.rest, str(r))
594n/a
595n/a def test_storlines(self):
596n/a f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
597n/a self.client.storlines('stor', f)
598n/a self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
599n/a # test new callback arg
600n/a flag = []
601n/a f.seek(0)
602n/a self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
603n/a self.assertTrue(flag)
604n/a
605n/a f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
606n/a # storlines() expects a binary file, not a text file
607n/a with support.check_warnings(('', BytesWarning), quiet=True):
608n/a self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
609n/a
610n/a def test_nlst(self):
611n/a self.client.nlst()
612n/a self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
613n/a
614n/a def test_dir(self):
615n/a l = []
616n/a self.client.dir(lambda x: l.append(x))
617n/a self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
618n/a
619n/a def test_mlsd(self):
620n/a list(self.client.mlsd())
621n/a list(self.client.mlsd(path='/'))
622n/a list(self.client.mlsd(path='/', facts=['size', 'type']))
623n/a
624n/a ls = list(self.client.mlsd())
625n/a for name, facts in ls:
626n/a self.assertIsInstance(name, str)
627n/a self.assertIsInstance(facts, dict)
628n/a self.assertTrue(name)
629n/a self.assertIn('type', facts)
630n/a self.assertIn('perm', facts)
631n/a self.assertIn('unique', facts)
632n/a
633n/a def set_data(data):
634n/a self.server.handler_instance.next_data = data
635n/a
636n/a def test_entry(line, type=None, perm=None, unique=None, name=None):
637n/a type = 'type' if type is None else type
638n/a perm = 'perm' if perm is None else perm
639n/a unique = 'unique' if unique is None else unique
640n/a name = 'name' if name is None else name
641n/a set_data(line)
642n/a _name, facts = next(self.client.mlsd())
643n/a self.assertEqual(_name, name)
644n/a self.assertEqual(facts['type'], type)
645n/a self.assertEqual(facts['perm'], perm)
646n/a self.assertEqual(facts['unique'], unique)
647n/a
648n/a # plain
649n/a test_entry('type=type;perm=perm;unique=unique; name\r\n')
650n/a # "=" in fact value
651n/a test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
652n/a test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
653n/a test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
654n/a test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
655n/a # spaces in name
656n/a test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
657n/a test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
658n/a test_entry('type=type;perm=perm;unique=unique; name\r\n', name=" name")
659n/a test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e")
660n/a # ";" in name
661n/a test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
662n/a test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
663n/a test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
664n/a test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
665n/a # case sensitiveness
666n/a set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
667n/a _name, facts = next(self.client.mlsd())
668n/a for x in facts:
669n/a self.assertTrue(x.islower())
670n/a # no data (directory empty)
671n/a set_data('')
672n/a self.assertRaises(StopIteration, next, self.client.mlsd())
673n/a set_data('')
674n/a for x in self.client.mlsd():
675n/a self.fail("unexpected data %s" % x)
676n/a
677n/a def test_makeport(self):
678n/a with self.client.makeport():
679n/a # IPv4 is in use, just make sure send_eprt has not been used
680n/a self.assertEqual(self.server.handler_instance.last_received_cmd,
681n/a 'port')
682n/a
683n/a def test_makepasv(self):
684n/a host, port = self.client.makepasv()
685n/a conn = socket.create_connection((host, port), timeout=TIMEOUT)
686n/a conn.close()
687n/a # IPv4 is in use, just make sure send_epsv has not been used
688n/a self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
689n/a
690n/a def test_with_statement(self):
691n/a self.client.quit()
692n/a
693n/a def is_client_connected():
694n/a if self.client.sock is None:
695n/a return False
696n/a try:
697n/a self.client.sendcmd('noop')
698n/a except (OSError, EOFError):
699n/a return False
700n/a return True
701n/a
702n/a # base test
703n/a with ftplib.FTP(timeout=TIMEOUT) as self.client:
704n/a self.client.connect(self.server.host, self.server.port)
705n/a self.client.sendcmd('noop')
706n/a self.assertTrue(is_client_connected())
707n/a self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
708n/a self.assertFalse(is_client_connected())
709n/a
710n/a # QUIT sent inside the with block
711n/a with ftplib.FTP(timeout=TIMEOUT) as self.client:
712n/a self.client.connect(self.server.host, self.server.port)
713n/a self.client.sendcmd('noop')
714n/a self.client.quit()
715n/a self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
716n/a self.assertFalse(is_client_connected())
717n/a
718n/a # force a wrong response code to be sent on QUIT: error_perm
719n/a # is expected and the connection is supposed to be closed
720n/a try:
721n/a with ftplib.FTP(timeout=TIMEOUT) as self.client:
722n/a self.client.connect(self.server.host, self.server.port)
723n/a self.client.sendcmd('noop')
724n/a self.server.handler_instance.next_response = '550 error on quit'
725n/a except ftplib.error_perm as err:
726n/a self.assertEqual(str(err), '550 error on quit')
727n/a else:
728n/a self.fail('Exception not raised')
729n/a # needed to give the threaded server some time to set the attribute
730n/a # which otherwise would still be == 'noop'
731n/a time.sleep(0.1)
732n/a self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
733n/a self.assertFalse(is_client_connected())
734n/a
735n/a def test_source_address(self):
736n/a self.client.quit()
737n/a port = support.find_unused_port()
738n/a try:
739n/a self.client.connect(self.server.host, self.server.port,
740n/a source_address=(HOST, port))
741n/a self.assertEqual(self.client.sock.getsockname()[1], port)
742n/a self.client.quit()
743n/a except OSError as e:
744n/a if e.errno == errno.EADDRINUSE:
745n/a self.skipTest("couldn't bind to port %d" % port)
746n/a raise
747n/a
748n/a def test_source_address_passive_connection(self):
749n/a port = support.find_unused_port()
750n/a self.client.source_address = (HOST, port)
751n/a try:
752n/a with self.client.transfercmd('list') as sock:
753n/a self.assertEqual(sock.getsockname()[1], port)
754n/a except OSError as e:
755n/a if e.errno == errno.EADDRINUSE:
756n/a self.skipTest("couldn't bind to port %d" % port)
757n/a raise
758n/a
759n/a def test_parse257(self):
760n/a self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
761n/a self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
762n/a self.assertEqual(ftplib.parse257('257 ""'), '')
763n/a self.assertEqual(ftplib.parse257('257 "" created'), '')
764n/a self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
765n/a # The 257 response is supposed to include the directory
766n/a # name and in case it contains embedded double-quotes
767n/a # they must be doubled (see RFC-959, chapter 7, appendix 2).
768n/a self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
769n/a self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
770n/a
771n/a def test_line_too_long(self):
772n/a self.assertRaises(ftplib.Error, self.client.sendcmd,
773n/a 'x' * self.client.maxline * 2)
774n/a
775n/a def test_retrlines_too_long(self):
776n/a self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
777n/a received = []
778n/a self.assertRaises(ftplib.Error,
779n/a self.client.retrlines, 'retr', received.append)
780n/a
781n/a def test_storlines_too_long(self):
782n/a f = io.BytesIO(b'x' * self.client.maxline * 2)
783n/a self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
784n/a
785n/a
786n/a@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
787n/aclass TestIPv6Environment(TestCase):
788n/a
789n/a def setUp(self):
790n/a self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
791n/a self.server.start()
792n/a self.client = ftplib.FTP(timeout=TIMEOUT)
793n/a self.client.connect(self.server.host, self.server.port)
794n/a
795n/a def tearDown(self):
796n/a self.client.close()
797n/a self.server.stop()
798n/a
799n/a def test_af(self):
800n/a self.assertEqual(self.client.af, socket.AF_INET6)
801n/a
802n/a def test_makeport(self):
803n/a with self.client.makeport():
804n/a self.assertEqual(self.server.handler_instance.last_received_cmd,
805n/a 'eprt')
806n/a
807n/a def test_makepasv(self):
808n/a host, port = self.client.makepasv()
809n/a conn = socket.create_connection((host, port), timeout=TIMEOUT)
810n/a conn.close()
811n/a self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')
812n/a
813n/a def test_transfer(self):
814n/a def retr():
815n/a def callback(data):
816n/a received.append(data.decode('ascii'))
817n/a received = []
818n/a self.client.retrbinary('retr', callback)
819n/a self.assertEqual(len(''.join(received)), len(RETR_DATA))
820n/a self.assertEqual(''.join(received), RETR_DATA)
821n/a self.client.set_pasv(True)
822n/a retr()
823n/a self.client.set_pasv(False)
824n/a retr()
825n/a
826n/a
827n/a@skipUnless(ssl, "SSL not available")
828n/aclass TestTLS_FTPClassMixin(TestFTPClass):
829n/a """Repeat TestFTPClass tests starting the TLS layer for both control
830n/a and data connections first.
831n/a """
832n/a
833n/a def setUp(self):
834n/a self.server = DummyTLS_FTPServer((HOST, 0))
835n/a self.server.start()
836n/a self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
837n/a self.client.connect(self.server.host, self.server.port)
838n/a # enable TLS
839n/a self.client.auth()
840n/a self.client.prot_p()
841n/a
842n/a
843n/a@skipUnless(ssl, "SSL not available")
844n/aclass TestTLS_FTPClass(TestCase):
845n/a """Specific TLS_FTP class tests."""
846n/a
847n/a def setUp(self):
848n/a self.server = DummyTLS_FTPServer((HOST, 0))
849n/a self.server.start()
850n/a self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
851n/a self.client.connect(self.server.host, self.server.port)
852n/a
853n/a def tearDown(self):
854n/a self.client.close()
855n/a self.server.stop()
856n/a
857n/a def test_control_connection(self):
858n/a self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
859n/a self.client.auth()
860n/a self.assertIsInstance(self.client.sock, ssl.SSLSocket)
861n/a
862n/a def test_data_connection(self):
863n/a # clear text
864n/a with self.client.transfercmd('list') as sock:
865n/a self.assertNotIsInstance(sock, ssl.SSLSocket)
866n/a self.assertEqual(self.client.voidresp(), "226 transfer complete")
867n/a
868n/a # secured, after PROT P
869n/a self.client.prot_p()
870n/a with self.client.transfercmd('list') as sock:
871n/a self.assertIsInstance(sock, ssl.SSLSocket)
872n/a self.assertEqual(self.client.voidresp(), "226 transfer complete")
873n/a
874n/a # PROT C is issued, the connection must be in cleartext again
875n/a self.client.prot_c()
876n/a with self.client.transfercmd('list') as sock:
877n/a self.assertNotIsInstance(sock, ssl.SSLSocket)
878n/a self.assertEqual(self.client.voidresp(), "226 transfer complete")
879n/a
880n/a def test_login(self):
881n/a # login() is supposed to implicitly secure the control connection
882n/a self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
883n/a self.client.login()
884n/a self.assertIsInstance(self.client.sock, ssl.SSLSocket)
885n/a # make sure that AUTH TLS doesn't get issued again
886n/a self.client.login()
887n/a
888n/a def test_auth_issued_twice(self):
889n/a self.client.auth()
890n/a self.assertRaises(ValueError, self.client.auth)
891n/a
892n/a def test_auth_ssl(self):
893n/a try:
894n/a self.client.ssl_version = ssl.PROTOCOL_SSLv23
895n/a self.client.auth()
896n/a self.assertRaises(ValueError, self.client.auth)
897n/a finally:
898n/a self.client.ssl_version = ssl.PROTOCOL_TLSv1
899n/a
900n/a def test_context(self):
901n/a self.client.quit()
902n/a ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
903n/a self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
904n/a context=ctx)
905n/a self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
906n/a context=ctx)
907n/a self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
908n/a keyfile=CERTFILE, context=ctx)
909n/a
910n/a self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
911n/a self.client.connect(self.server.host, self.server.port)
912n/a self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
913n/a self.client.auth()
914n/a self.assertIs(self.client.sock.context, ctx)
915n/a self.assertIsInstance(self.client.sock, ssl.SSLSocket)
916n/a
917n/a self.client.prot_p()
918n/a with self.client.transfercmd('list') as sock:
919n/a self.assertIs(sock.context, ctx)
920n/a self.assertIsInstance(sock, ssl.SSLSocket)
921n/a
922n/a def test_ccc(self):
923n/a self.assertRaises(ValueError, self.client.ccc)
924n/a self.client.login(secure=True)
925n/a self.assertIsInstance(self.client.sock, ssl.SSLSocket)
926n/a self.client.ccc()
927n/a self.assertRaises(ValueError, self.client.sock.unwrap)
928n/a
929n/a def test_check_hostname(self):
930n/a self.client.quit()
931n/a ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
932n/a ctx.verify_mode = ssl.CERT_REQUIRED
933n/a ctx.check_hostname = True
934n/a ctx.load_verify_locations(CAFILE)
935n/a self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
936n/a
937n/a # 127.0.0.1 doesn't match SAN
938n/a self.client.connect(self.server.host, self.server.port)
939n/a with self.assertRaises(ssl.CertificateError):
940n/a self.client.auth()
941n/a # exception quits connection
942n/a
943n/a self.client.connect(self.server.host, self.server.port)
944n/a self.client.prot_p()
945n/a with self.assertRaises(ssl.CertificateError):
946n/a with self.client.transfercmd("list") as sock:
947n/a pass
948n/a self.client.quit()
949n/a
950n/a self.client.connect("localhost", self.server.port)
951n/a self.client.auth()
952n/a self.client.quit()
953n/a
954n/a self.client.connect("localhost", self.server.port)
955n/a self.client.prot_p()
956n/a with self.client.transfercmd("list") as sock:
957n/a pass
958n/a
959n/a
960n/aclass TestTimeouts(TestCase):
961n/a
962n/a def setUp(self):
963n/a self.evt = threading.Event()
964n/a self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
965n/a self.sock.settimeout(20)
966n/a self.port = support.bind_port(self.sock)
967n/a self.server_thread = threading.Thread(target=self.server)
968n/a self.server_thread.start()
969n/a # Wait for the server to be ready.
970n/a self.evt.wait()
971n/a self.evt.clear()
972n/a self.old_port = ftplib.FTP.port
973n/a ftplib.FTP.port = self.port
974n/a
975n/a def tearDown(self):
976n/a ftplib.FTP.port = self.old_port
977n/a self.server_thread.join()
978n/a
979n/a def server(self):
980n/a # This method sets the evt 3 times:
981n/a # 1) when the connection is ready to be accepted.
982n/a # 2) when it is safe for the caller to close the connection
983n/a # 3) when we have closed the socket
984n/a self.sock.listen()
985n/a # (1) Signal the caller that we are ready to accept the connection.
986n/a self.evt.set()
987n/a try:
988n/a conn, addr = self.sock.accept()
989n/a except socket.timeout:
990n/a pass
991n/a else:
992n/a conn.sendall(b"1 Hola mundo\n")
993n/a conn.shutdown(socket.SHUT_WR)
994n/a # (2) Signal the caller that it is safe to close the socket.
995n/a self.evt.set()
996n/a conn.close()
997n/a finally:
998n/a self.sock.close()
999n/a
1000n/a def testTimeoutDefault(self):
1001n/a # default -- use global socket timeout
1002n/a self.assertIsNone(socket.getdefaulttimeout())
1003n/a socket.setdefaulttimeout(30)
1004n/a try:
1005n/a ftp = ftplib.FTP(HOST)
1006n/a finally:
1007n/a socket.setdefaulttimeout(None)
1008n/a self.assertEqual(ftp.sock.gettimeout(), 30)
1009n/a self.evt.wait()
1010n/a ftp.close()
1011n/a
1012n/a def testTimeoutNone(self):
1013n/a # no timeout -- do not use global socket timeout
1014n/a self.assertIsNone(socket.getdefaulttimeout())
1015n/a socket.setdefaulttimeout(30)
1016n/a try:
1017n/a ftp = ftplib.FTP(HOST, timeout=None)
1018n/a finally:
1019n/a socket.setdefaulttimeout(None)
1020n/a self.assertIsNone(ftp.sock.gettimeout())
1021n/a self.evt.wait()
1022n/a ftp.close()
1023n/a
1024n/a def testTimeoutValue(self):
1025n/a # a value
1026n/a ftp = ftplib.FTP(HOST, timeout=30)
1027n/a self.assertEqual(ftp.sock.gettimeout(), 30)
1028n/a self.evt.wait()
1029n/a ftp.close()
1030n/a
1031n/a def testTimeoutConnect(self):
1032n/a ftp = ftplib.FTP()
1033n/a ftp.connect(HOST, timeout=30)
1034n/a self.assertEqual(ftp.sock.gettimeout(), 30)
1035n/a self.evt.wait()
1036n/a ftp.close()
1037n/a
1038n/a def testTimeoutDifferentOrder(self):
1039n/a ftp = ftplib.FTP(timeout=30)
1040n/a ftp.connect(HOST)
1041n/a self.assertEqual(ftp.sock.gettimeout(), 30)
1042n/a self.evt.wait()
1043n/a ftp.close()
1044n/a
1045n/a def testTimeoutDirectAccess(self):
1046n/a ftp = ftplib.FTP()
1047n/a ftp.timeout = 30
1048n/a ftp.connect(HOST)
1049n/a self.assertEqual(ftp.sock.gettimeout(), 30)
1050n/a self.evt.wait()
1051n/a ftp.close()
1052n/a
1053n/a
1054n/aclass MiscTestCase(TestCase):
1055n/a def test__all__(self):
1056n/a blacklist = {'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF',
1057n/a 'Error', 'parse150', 'parse227', 'parse229', 'parse257',
1058n/a 'print_line', 'ftpcp', 'test'}
1059n/a support.check__all__(self, ftplib, blacklist=blacklist)
1060n/a
1061n/a
1062n/adef test_main():
1063n/a tests = [TestFTPClass, TestTimeouts,
1064n/a TestIPv6Environment,
1065n/a TestTLS_FTPClassMixin, TestTLS_FTPClass,
1066n/a MiscTestCase]
1067n/a
1068n/a thread_info = support.threading_setup()
1069n/a try:
1070n/a support.run_unittest(*tests)
1071n/a finally:
1072n/a support.threading_cleanup(*thread_info)
1073n/a
1074n/a
1075n/aif __name__ == '__main__':
1076n/a test_main()