»Core Development>Code coverage>Lib/test/test_ssl.py

Python code coverage for Lib/test/test_ssl.py

# Test the support for SSL and sockets

import sys
import unittest
from test import support
import socket
import select
import time
import datetime
import gc
import os
import errno
import pprint
import urllib.request
import traceback
import asyncore
import weakref
import platform
import functools

# ssl is imported through the test-support helper rather than a plain import.
ssl = support.import_module("ssl")

# Record whether the threading module can be imported at all.
try:
    import threading
except ImportError:
    _have_threads = False
else:
    _have_threads = True

# Protocol constants known to this build of the ssl module, in sorted order.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Flags describing which TLS library the ssl module is linked against.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    *name* is one or more path components; they are joined below the
    directory that contains this file.
    """
    return os.path.join(os.path.dirname(__file__), *name)
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
# Not defined in all versions of OpenSSL; fall back to 0 so the names can
# still be OR-ed into an options mask without changing it.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback to stdout.

    The traceback is always formatted from sys.exc_info(), but it is only
    written when support.verbose is true.
    """
    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
    if support.verbose:
        sys.stdout.write(prefix + exc_format)
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    # 0.9.8m or higher
    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL version is 0.9.7h or higher."""
    # 0.9.7h or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
def have_verify_flags():
    """Return True when the linked OpenSSL version is 0.9.8 or higher."""
    # 0.9.8 or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds (local time = UTC + offset).

    The DST offset (time.altzone) is used when the platform defines a DST
    zone and DST is currently in effect; otherwise time.timezone is used.
    """
    # local time = utc time + utc offset
    if time.daylight and time.localtime().tm_isdst > 0:
        return -time.altzone  # seconds
    return -time.timezone
def asn1time(cert_time):
    """Normalize an expected certificate time string before comparison.

    *cert_time* is a string in "%b %d %H:%M:%S %Y GMT" form.  The result has
    the seconds zeroed for OpenSSL 0.9.8i and a leading zero in the day
    replaced by a space.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
        fmt = "%b %d %H:%M:%S %Y GMT"
        dt = datetime.datetime.strptime(cert_time, fmt)
        dt = dt.replace(second=0)
        cert_time = dt.strftime(fmt)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned unchanged.
    Otherwise the wrapper tries to create an SSLv2 context first; if that
    raises ssl.SSLError on the specific OpenSSL version / distribution
    checked below, unittest.SkipTest is raised instead of running *func*.
    """
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        @functools.wraps(func)
        def f(*args, **kwargs):
            try:
                ssl.SSLContext(ssl.PROTOCOL_SSLv2)
            except ssl.SSLError:
                if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
            # Any other SSLError is ignored here; the test itself runs.
            return func(*args, **kwargs)
        return f
    else:
        return func
# Decorator for tests that can only run when the ssl module reports SNI support.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly built SSLContext.

    A context is created for *ssl_version* and configured from the keyword
    arguments that are not None: *cert_reqs* sets verify_mode, *ca_certs* is
    loaded as verify locations, *certfile*/*keyfile* are loaded as the
    certificate chain and *ciphers* restricts the cipher list.  Remaining
    keyword arguments are passed through to SSLContext.wrap_socket(), whose
    result is returned.
    """
    context = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        context.verify_mode = cert_reqs
    if ca_certs is not None:
        context.load_verify_locations(ca_certs)
    if certfile is not None or keyfile is not None:
        context.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        context.set_ciphers(ciphers)
    return context.wrap_socket(sock, **kwargs)
class BasicSocketTests(unittest.TestCase):
    # Basic checks of ssl module constants, helper functions, certificate
    # parsing and wrapped-but-unconnected sockets.

    def test_constants(self):
        # Accessing the attributes is the test: a missing constant raises
        # AttributeError.
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED
        ssl.OP_SINGLE_DH_USE
        if ssl.HAS_ECDH:
            ssl.OP_SINGLE_ECDH_USE
        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
            ssl.OP_NO_COMPRESSION
        self.assertIn(ssl.HAS_SNI, {True, False})
        self.assertIn(ssl.HAS_ECDH, {True, False})

    def test_str_for_enums(self):
        # Make sure that the PROTOCOL_* constants have enum-like string
        # reprs.
        proto = ssl.PROTOCOL_TLS
        self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
        ctx = ssl.SSLContext(proto)
        self.assertIs(ctx.protocol, proto)

    def test_random(self):
        # Exercise the RAND_* functions with valid and invalid arguments.
        v = ssl.RAND_status()
        if support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, (v and "sufficient randomness") or
                                "insufficient randomness"))

        data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
        self.assertEqual(len(data), 16)
        self.assertEqual(is_cryptographic, v == 1)
        if v:
            data = ssl.RAND_bytes(16)
            self.assertEqual(len(data), 16)
        else:
            self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)

        # negative num is invalid
        self.assertRaises(ValueError, ssl.RAND_bytes, -5)
        self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)

        if hasattr(ssl, 'RAND_egd'):
            self.assertRaises(TypeError, ssl.RAND_egd, 1)
            self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
        ssl.RAND_add("this is a random string", 75.0)
        ssl.RAND_add(b"this is a random bytes object", 75.0)
        ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)

    @unittest.skipUnless(os.name == 'posix', 'requires posix')
    def test_random_fork(self):
        # A forked child and its parent must not produce the same
        # pseudo-random bytes.
        status = ssl.RAND_status()
        if not status:
            self.fail("OpenSSL's PRNG has insufficient randomness")

        rfd, wfd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: send 16 random bytes through the pipe and exit without
            # returning into the test runner.
            try:
                os.close(rfd)
                child_random = ssl.RAND_pseudo_bytes(16)[0]
                self.assertEqual(len(child_random), 16)
                os.write(wfd, child_random)
                os.close(wfd)
            except BaseException:
                os._exit(1)
            else:
                os._exit(0)
        else:
            os.close(wfd)
            self.addCleanup(os.close, rfd)
            _, status = os.waitpid(pid, 0)
            self.assertEqual(status, 0)

            child_random = os.read(rfd, 16)
            self.assertEqual(len(child_random), 16)
            parent_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(parent_random), 16)

            self.assertNotEqual(child_random, parent_random)

    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['issuer'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        # Note the next three asserts will fail if the keys are regenerated
        # Single-digit days are space-padded, the form asn1time() itself
        # produces for ASN1_TIME_print() output.
        self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
        self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
        self.assertEqual(p['subject'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
        # Issue #13034: the subjectAltName in some certificates
        # (notably projects.developer.nokia.com:443) wasn't parsed
        p = ssl._ssl._test_decode_cert(NOKIACERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['subjectAltName'],
                         (('DNS', 'projects.developer.nokia.com'),
                          ('DNS', 'projects.forum.nokia.com'))
                        )
        # extra OCSP and AIA fields
        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
        self.assertEqual(p['caIssuers'],
                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
        self.assertEqual(p['crlDistributionPoints'],
                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))

    def test_parse_cert_CVE_2013_4238(self):
        # Embedded NUL bytes in certificate fields must be preserved by the
        # parser rather than truncating the value.
        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'Oregon'),),
                   (('localityName', 'Beaverton'),),
                   (('organizationName', 'Python Software Foundation'),),
                   (('organizationalUnitName', 'Python Core Development'),),
                   (('commonName', 'null.python.org\x00example.org'),),
                   (('emailAddress', 'python-dev@python.org'),))
        self.assertEqual(p['subject'], subject)
        self.assertEqual(p['issuer'], subject)
        # NOTE(review): the first 'IP Address' value below is an empty string
        # in this listing; it looks like an IPv4 literal was lost -- confirm
        # against nullbytecert.pem.
        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', ''),
                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
        else:
            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', ''),
                   ('IP Address', '<invalid>'))

        self.assertEqual(p['subjectAltName'], san)

    def test_parse_all_sans(self):
        # Every kind of subjectAltName entry in ALLSANFILE must be decoded.
        # NOTE(review): the IPv4 'IP Address' and the 'Registered ID' values
        # are empty strings in this listing; they look like literals that were
        # lost -- confirm against allsans.pem.
        p = ssl._ssl._test_decode_cert(ALLSANFILE)
        self.assertEqual(p['subjectAltName'],
            (
                ('DNS', 'allsans'),
                ('othername', '<unsupported>'),
                ('othername', '<unsupported>'),
                ('email', 'user@example.org'),
                ('DNS', 'www.example.org'),
                ('DirName',
                    ((('countryName', 'XY'),),
                    (('localityName', 'Castle Anthrax'),),
                    (('organizationName', 'Python Software Foundation'),),
                    (('commonName', 'dirname example'),))),
                ('URI', 'https://www.python.org/'),
                ('IP Address', ''),
                ('IP Address', '0:0:0:0:0:0:0:1\n'),
                ('Registered ID', '')
            )
        )

    def test_DER_to_PEM(self):
        # PEM -> DER -> PEM -> DER must round-trip, and the generated PEM must
        # carry the standard header and footer.
        with open(CAFILE_CACERT, 'r') as f:
            pem = f.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        self.assertEqual(d1, d2)
        if not p2.startswith(ssl.PEM_HEADER + '\n'):
            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)

    def test_openssl_version(self):
        # The three version attributes must have the expected types and be
        # mutually consistent.
        n = ssl.OPENSSL_VERSION_NUMBER
        t = ssl.OPENSSL_VERSION_INFO
        s = ssl.OPENSSL_VERSION
        self.assertIsInstance(n, int)
        self.assertIsInstance(t, tuple)
        self.assertIsInstance(s, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(n, 0x900000)
        # < 3.0
        self.assertLess(n, 0x30000000)
        major, minor, fix, patch, status = t
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 3)
        self.assertGreaterEqual(minor, 0)
        self.assertLess(minor, 256)
        self.assertGreaterEqual(fix, 0)
        self.assertLess(fix, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 63)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by {Open,Libre}SSL, the format might change
        if IS_LIBRESSL:
            self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
                            (s, t, hex(n)))
        else:
            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                            (s, t, hex(n)))

    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = test_wrap_socket(s)
        wr = weakref.ref(ss)
        with support.check_warnings(("", ResourceWarning)):
            del ss
        self.assertEqual(wr(), None)

    def test_wrapped_unconnected(self):
        # Methods on an unconnected SSLSocket propagate the original
        # OSError raise by the underlying socket object.
        s = socket.socket(socket.AF_INET)
        with test_wrap_socket(s) as ss:
            self.assertRaises(OSError, ss.recv, 1)
            self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
            self.assertRaises(OSError, ss.recvfrom, 1)
            self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
            self.assertRaises(OSError, ss.send, b'x')
            self.assertRaises(OSError, ss.sendto, b'x', ('', 0))

    def test_timeout(self):
        # Issue #8524: when creating an SSL socket, the timeout of the
        # original socket should be retained.
        for timeout in (None, 0.0, 5.0):
            s = socket.socket(socket.AF_INET)
            s.settimeout(timeout)
            with test_wrap_socket(s) as ss:
                self.assertEqual(timeout, ss.gettimeout())

    def test_errors_sslwrap(self):
        # Invalid argument combinations and missing files must be rejected
        # by ssl.wrap_socket().
        sock = socket.socket()
        self.assertRaisesRegex(ValueError,
                        "certfile must be specified",
                        ssl.wrap_socket, sock, keyfile=CERTFILE)
        self.assertRaisesRegex(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True)
        self.assertRaisesRegex(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True, certfile="")
        with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
            self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
                                    s.connect, (HOST, 8080))
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock,
                    certfile=CERTFILE, keyfile=NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        # NOTE(review): the argument line of this call is missing from the
        # listing; "both files missing" completes the pattern of the two
        # cases above -- confirm.
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock,
                    certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)

    def bad_cert_test(self, certfile):
        """Check that trying to use the given client certificate fails"""
        certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
                                   certfile)
        sock = socket.socket()
        self.addCleanup(sock.close)
        with self.assertRaises(ssl.SSLError):
            test_wrap_socket(sock,
                             certfile=certfile,
                             ssl_version=ssl.PROTOCOL_TLSv1)

    def test_empty_cert(self):
        """Wrapping with an empty cert file"""
        self.bad_cert_test("nullcert.pem")

    def test_malformed_cert(self):
        """Wrapping with a badly formatted certificate (syntax error)"""
        self.bad_cert_test("badcert.pem")

    def test_malformed_key(self):
        """Wrapping with a badly formatted key (syntax error)"""
        self.bad_cert_test("badkey.pem")

    def test_match_hostname(self):
        # ok() expects the hostname to match the cert, fail() expects
        # ssl.CertificateError.
        def ok(cert, hostname):
            ssl.match_hostname(cert, hostname)
        def fail(cert, hostname):
            self.assertRaises(ssl.CertificateError,
                              ssl.match_hostname, cert, hostname)

        # -- Hostname matching --

        cert = {'subject': ((('commonName', 'example.com'),),)}
        ok(cert, 'example.com')
        ok(cert, 'ExAmple.cOm')
        fail(cert, 'www.example.com')
        fail(cert, '.example.com')
        fail(cert, 'example.org')
        fail(cert, 'exampleXcom')

        cert = {'subject': ((('commonName', '*.a.com'),),)}
        ok(cert, 'foo.a.com')
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        # only match one left-most wildcard
        cert = {'subject': ((('commonName', 'f*.com'),),)}
        ok(cert, 'foo.com')
        ok(cert, 'f.com')
        fail(cert, 'bar.com')
        fail(cert, 'foo.a.com')
        fail(cert, 'bar.foo.com')

        # NULL bytes are bad, CVE-2013-4073
        cert = {'subject': ((('commonName',
                              'null.python.org\x00example.org'),),)}
        ok(cert, 'null.python.org\x00example.org') # or raise an error?
        fail(cert, 'example.org')
        fail(cert, 'null.python.org')

        # error cases with wildcards
        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        cert = {'subject': ((('commonName', 'a.*.com'),),)}
        fail(cert, 'a.foo.com')
        fail(cert, 'a..com')
        fail(cert, 'a.com')

        # wildcard doesn't match IDNA prefix 'xn--'
        idna = 'püthon.python.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, idna)
        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
        fail(cert, idna)
        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
        fail(cert, idna)

        # wildcard in first fragment and  IDNA A-labels in sequent fragments
        # are supported.
        idna = 'www*.pythön.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
        ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, 'pythön.org'.encode("idna").decode("ascii"))

        # Slightly fake real-world example
        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
                'subject': ((('commonName', 'linuxfrz.org'),),),
                'subjectAltName': (('DNS', 'linuxfr.org'),
                                   ('DNS', 'linuxfr.com'),
                                   ('othername', '<unsupported>'))}
        ok(cert, 'linuxfr.org')
        ok(cert, 'linuxfr.com')
        # Not a "DNS" entry
        fail(cert, '<unsupported>')
        # When there is a subjectAltName, commonName isn't used
        fail(cert, 'linuxfrz.org')

        # A pristine real-world example
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),),
                            (('commonName', 'mail.google.com'),))}
        ok(cert, 'mail.google.com')
        fail(cert, 'gmail.com')
        # Only commonName is considered
        fail(cert, 'California')

        # -- IPv4 matching --
        # NOTE(review): the address literals were empty strings in the
        # listing, which made the ok()/fail() calls contradict each other.
        # Two distinct addresses plus a third, unlisted one restore a
        # coherent check -- confirm the exact values.
        cert = {'subject': ((('commonName', 'example.com'),),),
                'subjectAltName': (('DNS', 'example.com'),
                                   ('IP Address', '10.11.12.13'),
                                   ('IP Address', '14.15.16.17'))}
        ok(cert, '10.11.12.13')
        ok(cert, '14.15.16.17')
        fail(cert, '14.15.16.18')
        fail(cert, 'example.net')

        # -- IPv6 matching --
        cert = {'subject': ((('commonName', 'example.com'),),),
                'subjectAltName': (('DNS', 'example.com'),
                                   ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
                                   ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
        ok(cert, '2001::cafe')
        ok(cert, '2003::baba')
        fail(cert, '2003::bebe')
        fail(cert, 'example.net')

        # -- Miscellaneous --

        # Neither commonName nor subjectAltName
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),))}
        fail(cert, 'mail.google.com')

        # No DNS entry in subjectAltName but a commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('commonName', 'mail.google.com'),)),
                'subjectAltName': (('othername', 'blabla'), )}
        ok(cert, 'mail.google.com')

        # No DNS entry subjectAltName and no commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),)),
                'subjectAltName': (('othername', 'blabla'),)}
        fail(cert, 'google.com')

        # Empty cert / no cert
        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

        # Issue #17980: avoid denials of service by refusing more than one
        # wildcard per fragment.
        cert = {'subject': ((('commonName', 'a*b.com'),),)}
        ok(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
        fail(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
        with self.assertRaises(ssl.CertificateError) as cm:
            ssl.match_hostname(cert, 'axxbxxc.com')
        self.assertIn("too many wildcards", str(cm.exception))

    def test_server_side(self):
        # server_hostname doesn't work for server sockets
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with socket.socket() as sock:
            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
                              server_hostname="some.hostname")

    def test_unknown_channel_binding(self):
        # should raise ValueError for unknown type
        s = socket.socket(socket.AF_INET)
        # Bind to loopback: the listing had an empty host here, which would
        # make the client below connect to the wildcard address returned by
        # getsockname(), and that is not portable.
        s.bind(('127.0.0.1', 0))
        s.listen()
        c = socket.socket(socket.AF_INET)
        c.connect(s.getsockname())
        with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
            with self.assertRaises(ValueError):
                ss.get_channel_binding("unknown-type")
        s.close()

    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                         "'tls-unique' channel binding not available")
    def test_tls_unique_channel_binding(self):
        # unconnected should return None for known type
        s = socket.socket(socket.AF_INET)
        with test_wrap_socket(s) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
        # the same for server-side
        s = socket.socket(socket.AF_INET)
        with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))

    def test_dealloc_warn(self):
        # Dropping the last reference to an unclosed SSL socket must emit a
        # ResourceWarning that mentions the socket's repr.
        ss = test_wrap_socket(socket.socket(socket.AF_INET))
        r = repr(ss)
        with self.assertWarns(ResourceWarning) as cm:
            ss = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_get_default_verify_paths(self):
        # The result is a 6-field DefaultVerifyPaths, and the SSL_CERT_DIR /
        # SSL_CERT_FILE environment variables are reflected in it.
        paths = ssl.get_default_verify_paths()
        self.assertEqual(len(paths), 6)
        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)

        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            paths = ssl.get_default_verify_paths()
            self.assertEqual(paths.cafile, CERTFILE)
            self.assertEqual(paths.capath, CAPATH)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_certificates(self):
        # Check the shape of what enum_certificates() returns for the "CA"
        # and "ROOT" stores.
        self.assertTrue(ssl.enum_certificates("CA"))
        self.assertTrue(ssl.enum_certificates("ROOT"))

        self.assertRaises(TypeError, ssl.enum_certificates)
        self.assertRaises(WindowsError, ssl.enum_certificates, "")

        trust_oids = set()
        for storename in ("CA", "ROOT"):
            store = ssl.enum_certificates(storename)
            self.assertIsInstance(store, list)
            for element in store:
                self.assertIsInstance(element, tuple)
                self.assertEqual(len(element), 3)
                cert, enc, trust = element
                self.assertIsInstance(cert, bytes)
                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
                self.assertIsInstance(trust, (set, bool))
                if isinstance(trust, set):
                    trust_oids.update(trust)

        # id-kp-serverAuth (RFC 5280); the literal was empty in the listing.
        serverAuth = "1.3.6.1.5.5.7.3.1"
        self.assertIn(serverAuth, trust_oids)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_crls(self):
        # Check the shape of what enum_crls() returns for the "CA" store.
        self.assertTrue(ssl.enum_crls("CA"))
        self.assertRaises(TypeError, ssl.enum_crls)
        self.assertRaises(WindowsError, ssl.enum_crls, "")

        crls = ssl.enum_crls("CA")
        self.assertIsInstance(crls, list)
        for element in crls:
            self.assertIsInstance(element, tuple)
            self.assertEqual(len(element), 2)
            self.assertIsInstance(element[0], bytes)
            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})


    def test_asn1object(self):
        # Look the serverAuth object up by OID, NID and name; all three must
        # give the same (nid, shortname, longname, oid) tuple.  The OID
        # literals (id-kp-serverAuth, RFC 5280) were empty in the listing.
        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                    '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertEqual(val, expected)
        self.assertEqual(val.nid, 129)
        self.assertEqual(val.shortname, 'serverAuth')
        self.assertEqual(val.longname, 'TLS Web Server Authentication')
        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')

        val = ssl._ASN1Object.fromnid(129)
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
        with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
            ssl._ASN1Object.fromnid(100000)
        for i in range(1000):
            try:
                obj = ssl._ASN1Object.fromnid(i)
            except ValueError:
                pass
            else:
                self.assertIsInstance(obj.nid, int)
                self.assertIsInstance(obj.shortname, str)
                self.assertIsInstance(obj.longname, str)
                self.assertIsInstance(obj.oid, (str, type(None)))

        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
                         expected)
        with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
            ssl._ASN1Object.fromname('serverauth')

    def test_purpose_enum(self):
        # Purpose members must equal the _ASN1Object built from the matching
        # extended-key-usage OID.  The OID literals (id-kp-serverAuth and
        # id-kp-clientAuth, RFC 5280) were empty in the listing.
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
                              '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
                              '1.3.6.1.5.5.7.3.2')

    def test_unsupported_dtls(self):
        # Wrapping a datagram socket must raise NotImplementedError through
        # both the helper and SSLContext.wrap_socket().
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        with self.assertRaises(NotImplementedError) as cx:
            test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with self.assertRaises(NotImplementedError) as cx:
            ctx.wrap_socket(s)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")

    def cert_time_ok(self, timestring, timestamp):
        # Helper: *timestring* must convert to exactly *timestamp*.
        self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)

    def cert_time_fail(self, timestring):
        # Helper: *timestring* must be rejected with ValueError.
        with self.assertRaises(ValueError):
            ssl.cert_time_to_seconds(timestring)

    @unittest.skipUnless(utc_offset(),
                         'local time needs to be different from UTC')
    def test_cert_time_to_seconds_timezone(self):
        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
        #               results if local timezone is not UTC
        self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
        self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)

    def test_cert_time_to_seconds(self):
        timestring = "Jan 5 09:34:43 2018 GMT"
        ts = 1515144883.0
        self.cert_time_ok(timestring, ts)
        # accept keyword parameter, assert its name
        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
        # accept both %e and %d (space or zero generated by strftime)
        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
        # case-insensitive
        self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
        self.cert_time_fail("Jan 5 09:34 2018 GMT")     # no seconds
        self.cert_time_fail("Jan 5 09:34:43 2018")      # no GMT
        self.cert_time_fail("Jan 5 09:34:43 2018 UTC")  # not GMT timezone
        self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
        self.cert_time_fail("Jon 5 09:34:43 2018 GMT")  # invalid month
        self.cert_time_fail("Jan 5 24:00:00 2018 GMT")  # invalid hour
        self.cert_time_fail("Jan 5 09:60:43 2018 GMT")  # invalid minute

        newyear_ts = 1230768000.0
        # leap seconds
        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
        # same timestamp
        self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

        self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
        #  allow 60th second (even if it is not a leap second)
        self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
        #  allow 2nd leap second for compatibility with time.strptime()
        self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
        self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

        # no special treatment for the special value:
        #   99991231235959Z (rfc 5280)
        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)

    @support.run_with_locale('LC_ALL', '')
    def test_cert_time_to_seconds_locale(self):
        # `cert_time_to_seconds()` should be locale independent

        def local_february_name():
            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

        if local_february_name().lower() == 'feb':
            self.skipTest("locale-specific month name needs to be "
                          "different from C locale")

        # locale-independent
        self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
        self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")

    def test_connect_ex_error(self):
        # connect_ex() to a port nobody listens on must return an error code
        # instead of raising.
        server = socket.socket(socket.AF_INET)
        self.addCleanup(server.close)
        port = support.bind_port(server)  # Reserve port but don't listen
        s = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        self.addCleanup(s.close)
        rc = s.connect_ex((HOST, port))
        # Issue #19919: Windows machines or VMs hosted on Windows
        # machines sometimes return EWOULDBLOCK.
        # NOTE(review): the first line of this tuple is missing from the
        # listing; the connection-failure codes below were re-supplied --
        # confirm the exact list.
        errors = (
            errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
            errno.EWOULDBLOCK,
        )
        self.assertIn(rc, errors)
class ContextTests(unittest.TestCase):
    # Tests for ssl.SSLContext that do not need a network peer:
    # construction, option/flag properties, loading of certificates,
    # keys and DH parameters, and the default-context factories.

    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every known protocol constant must be accepted.
        for protocol in PROTOCOLS:
            ssl.SSLContext(protocol)
        # Without an argument the context defaults to PROTOCOL_TLS.
        ctx = ssl.SSLContext()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
        self.assertRaises(ValueError, ssl.SSLContext, -1)
        self.assertRaises(ValueError, ssl.SSLContext, 42)

    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # The protocol attribute reflects the constructor argument.
        for proto in PROTOCOLS:
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.protocol, proto)

    def test_ciphers(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_ciphers("ALL")
        ctx.set_ciphers("DEFAULT")
        # A cipher string matching nothing must be rejected.
        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
            ctx.set_ciphers("^$:,;?*'dorothyx")

    @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
    def test_get_ciphers(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_ciphers('AESGCM')
        names = {d['name'] for d in ctx.get_ciphers()}
        self.assertIn('AES256-GCM-SHA384', names)
        self.assertIn('AES128-GCM-SHA256', names)

    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
        default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
        # SSLContext also enables these by default
        default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                    OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
        self.assertEqual(default, ctx.options)
        ctx.options |= ssl.OP_NO_TLSv1
        self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
        if can_clear_options():
            ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
            self.assertEqual(default, ctx.options)
            ctx.options = 0
            # Ubuntu has OP_NO_SSLv3 forced on by default
            self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
        else:
            # Options can only be added, never removed.
            with self.assertRaises(ValueError):
                ctx.options = 0

    def test_verify_mode(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        ctx.verify_mode = ssl.CERT_OPTIONAL
        self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
        ctx.verify_mode = ssl.CERT_REQUIRED
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        ctx.verify_mode = ssl.CERT_NONE
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        # Invalid type and invalid value are rejected differently.
        with self.assertRaises(TypeError):
            ctx.verify_mode = None
        with self.assertRaises(ValueError):
            ctx.verify_mode = 42

    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value
        tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
        ctx.verify_flags = ssl.VERIFY_DEFAULT
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
        # supports any value
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
        self.assertEqual(ctx.verify_flags,
                         ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
        with self.assertRaises(TypeError):
            ctx.verify_flags = None

    def test_load_cert_chain(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Combined key and cert in a single file
        ctx.load_cert_chain(CERTFILE, keyfile=None)
        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
        with self.assertRaises(OSError) as cm:
            ctx.load_cert_chain(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(BADCERT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(EMPTYCERT)
        # Separate key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYCERT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYKEY)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
        # Mismatching key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
            ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
        # Password protected key and cert
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=bytearray(KEY_PASSWORD.encode()))
        # Same three password types, passed positionally with a separate key
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                            bytearray(KEY_PASSWORD.encode()))
        with self.assertRaisesRegex(TypeError, "should be a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegex(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Return a string larger than this.
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
        # Password callback
        def getpass_unicode():
            return KEY_PASSWORD
        def getpass_bytes():
            return KEY_PASSWORD.encode()
        def getpass_bytearray():
            return bytearray(KEY_PASSWORD.encode())
        def getpass_badpass():
            return "badpass"
        def getpass_huge():
            return b'a' * (1024 * 1024)
        def getpass_bad_type():
            return 9
        def getpass_exception():
            raise Exception('getpass error')
        class GetPassCallable:
            def __call__(self):
                return KEY_PASSWORD
            def getpass(self):
                return KEY_PASSWORD
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=GetPassCallable().getpass)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
        with self.assertRaisesRegex(ValueError, "cannot be longer"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
        with self.assertRaisesRegex(TypeError, "must return a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
        with self.assertRaisesRegex(Exception, "getpass error"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
        # Make sure the password function isn't called if it isn't needed
        ctx.load_cert_chain(CERTFILE, password=getpass_exception)

    def test_load_verify_locations(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # str and bytes paths, positional and keyword
        ctx.load_verify_locations(CERTFILE)
        ctx.load_verify_locations(cafile=CERTFILE, capath=None)
        ctx.load_verify_locations(BYTES_CERTFILE)
        ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
        # At least one location must be given
        self.assertRaises(TypeError, ctx.load_verify_locations)
        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
        with self.assertRaises(OSError) as cm:
            ctx.load_verify_locations(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_verify_locations(BADCERT)
        ctx.load_verify_locations(CERTFILE, CAPATH)
        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

        # Issue #10989: crash if the second argument type is invalid
        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)

    def test_load_verify_cadata(self):
        # test cadata
        with open(CAFILE_CACERT) as f:
            cacert_pem = f.read()
        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
        with open(CAFILE_NEURONIO) as f:
            neuronio_pem = f.read()
        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

        # test PEM
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
        ctx.load_verify_locations(cadata=cacert_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = "\n".join((cacert_pem, neuronio_pem))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # with junk around the certs
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
                    neuronio_pem, "tail"]
        ctx.load_verify_locations(cadata="\n".join(combined))
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # test DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_verify_locations(cadata=cacert_der)
        ctx.load_verify_locations(cadata=neuronio_der)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=cacert_der)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = b"".join((cacert_der, neuronio_der))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # error cases
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)

        with self.assertRaisesRegex(ssl.SSLError, "no start line"):
            ctx.load_verify_locations(cadata="broken")
        with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
            ctx.load_verify_locations(cadata=b"broken")

    def test_load_dh_params(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_dh_params(DHFILE)
        # The bytes path variant is not exercised on Windows.
        if os.name != 'nt':
            ctx.load_dh_params(BYTES_DHFILE)
        self.assertRaises(TypeError, ctx.load_dh_params)
        self.assertRaises(TypeError, ctx.load_dh_params, None)
        with self.assertRaises(FileNotFoundError) as cm:
            ctx.load_dh_params(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        # A certificate file does not contain DH parameters.
        with self.assertRaises(ssl.SSLError):
            ctx.load_dh_params(CERTFILE)

    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        # A fresh context reports all-zero session statistics.
        for proto in PROTOCOLS:
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.session_stats(), {
                'number': 0,
                'connect': 0,
                'connect_good': 0,
                'connect_renegotiate': 0,
                'accept': 0,
                'accept_good': 0,
                'accept_renegotiate': 0,
                'hits': 0,
                'misses': 0,
                'timeouts': 0,
                'cache_full': 0,
            })

    def test_set_default_verify_paths(self):
        # There's not much we can do to test that it acts as expected,
        # so just check it doesn't crash or raise an exception.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_default_verify_paths()

    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
    def test_set_ecdh_curve(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Curve names are accepted as str or bytes.
        ctx.set_ecdh_curve("prime256v1")
        ctx.set_ecdh_curve(b"prime256v1")
        self.assertRaises(TypeError, ctx.set_ecdh_curve)
        self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
        self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
        self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")

    @needs_sni
    def test_sni_callback(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # set_servername_callback expects a callable, or None
        self.assertRaises(TypeError, ctx.set_servername_callback)
        self.assertRaises(TypeError, ctx.set_servername_callback, 4)
        self.assertRaises(TypeError, ctx.set_servername_callback, "")
        self.assertRaises(TypeError, ctx.set_servername_callback, ctx)

        def dummycallback(sock, servername, ctx):
            pass
        ctx.set_servername_callback(None)
        ctx.set_servername_callback(dummycallback)

    @needs_sni
    def test_sni_callback_refcycle(self):
        # Reference cycles through the servername callback are detected
        # and cleared.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # The default argument makes the callback reference the context,
        # and the context references the callback: a cycle.
        def dummycallback(sock, servername, ctx, cycle=ctx):
            pass
        ctx.set_servername_callback(dummycallback)
        wr = weakref.ref(ctx)
        del ctx, dummycallback
        gc.collect()
        self.assertIs(wr(), None)

    def test_cert_store_stats(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 0})
        # Loading the context's own cert chain does not touch the store.
        ctx.load_cert_chain(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 0})
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 1})
        ctx.load_verify_locations(CAFILE_CACERT)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 1, 'crl': 0, 'x509': 2})

    def test_get_ca_certs(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.get_ca_certs(), [])
        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.get_ca_certs(), [])
        # but CAFILE_CACERT is a CA cert
        ctx.load_verify_locations(CAFILE_CACERT)
        self.assertEqual(ctx.get_ca_certs(),
            [{'issuer': ((('organizationName', 'Root CA'),),
                         (('organizationalUnitName', 'http://www.cacert.org'),),
                         (('commonName', 'CA Cert Signing Authority'),),
                         (('emailAddress', 'support@cacert.org'),)),
              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
              'serialNumber': '00',
              'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
              'subject': ((('organizationName', 'Root CA'),),
                          (('organizationalUnitName', 'http://www.cacert.org'),),
                          (('commonName', 'CA Cert Signing Authority'),),
                          (('emailAddress', 'support@cacert.org'),)),
              'version': 3}])

        # Passing True requests the certificates in DER form.
        with open(CAFILE_CACERT) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        self.assertEqual(ctx.get_ca_certs(True), [der])

    def test_load_default_certs(self):
        # Only checks that the calls are accepted or rejected; the set of
        # certificates actually loaded depends on the host.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_default_certs, None)
        self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')

    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
    @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
    def test_load_default_certs_env(self):
        # SSL_CERT_FILE / SSL_CERT_DIR redirect the default locations.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            ctx.load_default_certs()
            self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_load_default_certs_env_windows(self):
        # Baseline: whatever the default load provides on this machine.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()
        stats = ctx.cert_store_stats()

        # With the environment variables set, exactly one more certificate
        # (CERTFILE) is expected on top of the baseline.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            ctx.load_default_certs()
            stats["x509"] += 1
            self.assertEqual(ctx.cert_store_stats(), stats)

    def _assert_context_options(self, ctx):
        # Helper: check that *ctx* has the hardening options set that the
        # context factories are expected to enable.
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        # A flag equal to 0 is skipped, as there is no bit to check.
        for flag in (OP_NO_COMPRESSION, OP_SINGLE_DH_USE,
                     OP_SINGLE_ECDH_USE, OP_CIPHER_SERVER_PREFERENCE):
            if flag != 0:
                self.assertEqual(ctx.options & flag, flag)

    def test_create_default_context(self):
        ctx = ssl.create_default_context()

        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self._assert_context_options(ctx)

        # Explicit cafile / capath / cadata
        with open(SIGNING_CA) as f:
            cadata = f.read()
        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self._assert_context_options(ctx)

        # Server-side purpose: peers are not verified by default
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self._assert_context_options(ctx)

    def test__create_stdlib_context(self):
        # Defaults: no verification, no hostname check
        ctx = ssl._create_stdlib_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertFalse(ctx.check_hostname)
        self._assert_context_options(ctx)

        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self._assert_context_options(ctx)

        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self._assert_context_options(ctx)

        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self._assert_context_options(ctx)

    def test_check_hostname(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertFalse(ctx.check_hostname)

        # Requires CERT_REQUIRED or CERT_OPTIONAL
        with self.assertRaises(ValueError):
            ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        self.assertFalse(ctx.check_hostname)
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        ctx.verify_mode = ssl.CERT_OPTIONAL
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        # Cannot set CERT_NONE with check_hostname enabled
        with self.assertRaises(ValueError):
            ctx.verify_mode = ssl.CERT_NONE
        ctx.check_hostname = False
        self.assertFalse(ctx.check_hostname)

    def test_context_client_server(self):
        # PROTOCOL_TLS_CLIENT has sane defaults
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)

        # PROTOCOL_TLS_SERVER has different but also sane defaults
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        self.assertFalse(ctx.check_hostname)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
class SSLErrorTests(unittest.TestCase):
    # Tests for the ssl.SSLError exception hierarchy and its attributes.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # A certificate file holds no DH parameters, so loading must fail.
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        s = str(cm.exception)
        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with socket.socket() as listener:
            # Bind to loopback: connecting to the wildcard address that
            # getsockname() would report for "" is not portable.
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            raw = socket.socket()
            # Covers the window before the SSL wrapper takes ownership.
            self.addCleanup(raw.close)
            raw.connect(listener.getsockname())
            raw.setblocking(False)
            # The listener never accepts or answers, so a non-blocking
            # handshake has nothing to read yet.
            with ctx.wrap_socket(raw, False, do_handshake_on_connect=False) as c:
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    c.do_handshake()
                msg = str(cm.exception)
                self.assertTrue(msg.startswith("The operation did not complete (read)"), msg)
                # For compatibility
                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
class MemoryBIOTests(unittest.TestCase):
    # Tests for ssl.MemoryBIO, the in-memory buffer object.

    def test_read_write(self):
        bio = ssl.MemoryBIO()
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        self.assertEqual(bio.read(), b'')
        # Consecutive writes are concatenated
        bio.write(b'foo')
        bio.write(b'bar')
        self.assertEqual(bio.read(), b'foobar')
        self.assertEqual(bio.read(), b'')
        # Sized reads consume only what was asked for
        bio.write(b'baz')
        self.assertEqual(bio.read(2), b'ba')
        self.assertEqual(bio.read(1), b'z')
        self.assertEqual(bio.read(1), b'')

    def test_eof(self):
        bio = ssl.MemoryBIO()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertFalse(bio.eof)
        bio.write(b'foo')
        self.assertFalse(bio.eof)
        bio.write_eof()
        # eof only becomes true once the buffered data has been drained
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(2), b'fo')
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(1), b'o')
        self.assertTrue(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertTrue(bio.eof)

    def test_pending(self):
        bio = ssl.MemoryBIO()
        self.assertEqual(bio.pending, 0)
        bio.write(b'foo')
        self.assertEqual(bio.pending, 3)
        # pending shrinks by one per byte read ...
        for i in range(3):
            bio.read(1)
            self.assertEqual(bio.pending, 3-i-1)
        # ... and grows by one per byte written
        for i in range(3):
            bio.write(b'x')
            self.assertEqual(bio.pending, i+1)
        bio.read()
        self.assertEqual(bio.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview
        bio = ssl.MemoryBIO()
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        bio.write(bytearray(b'bar'))
        self.assertEqual(bio.read(), b'bar')
        bio.write(memoryview(b'baz'))
        self.assertEqual(bio.read(), b'baz')

    def test_error_types(self):
        # write() rejects str, None, bool and int arguments
        bio = ssl.MemoryBIO()
        self.assertRaises(TypeError, bio.write, 'foo')
        self.assertRaises(TypeError, bio.write, None)
        self.assertRaises(TypeError, bio.write, True)
        self.assertRaises(TypeError, bio.write, 1)
1460n/a@unittest.skipUnless(_have_threads, "Needs threading module")
1461n/aclass SimpleBackgroundTests(unittest.TestCase):
1463n/a """Tests that connect to a simple server running in the background"""
1465n/a def setUp(self):
1466n/a server = ThreadedEchoServer(SIGNED_CERTFILE)
1467n/a self.server_addr = (HOST, server.port)
1468n/a server.__enter__()
1469n/a self.addCleanup(server.__exit__, None, None, None)
1471n/a def test_connect(self):
1472n/a with test_wrap_socket(socket.socket(socket.AF_INET),
1473n/a cert_reqs=ssl.CERT_NONE) as s:
1474n/a s.connect(self.server_addr)
1475n/a self.assertEqual({}, s.getpeercert())
1476n/a self.assertFalse(s.server_side)
1478n/a # this should succeed because we specify the root cert
1479n/a with test_wrap_socket(socket.socket(socket.AF_INET),
1480n/a cert_reqs=ssl.CERT_REQUIRED,
1481n/a ca_certs=SIGNING_CA) as s:
1482n/a s.connect(self.server_addr)
1483n/a self.assertTrue(s.getpeercert())
1484n/a self.assertFalse(s.server_side)
1486n/a def test_connect_fail(self):
1487n/a # This should fail because we have no verification certs. Connection
1488n/a # failure crashes ThreadedEchoServer, so run this in an independent
1489n/a # test method.
1490n/a s = test_wrap_socket(socket.socket(socket.AF_INET),
1491n/a cert_reqs=ssl.CERT_REQUIRED)
1492n/a self.addCleanup(s.close)
1493n/a self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1494n/a s.connect, self.server_addr)
1496n/a def test_connect_ex(self):
1497n/a # Issue #11326: check connect_ex() implementation
1498n/a s = test_wrap_socket(socket.socket(socket.AF_INET),
1499n/a cert_reqs=ssl.CERT_REQUIRED,
1500n/a ca_certs=SIGNING_CA)
1501n/a self.addCleanup(s.close)
1502n/a self.assertEqual(0, s.connect_ex(self.server_addr))
1503n/a self.assertTrue(s.getpeercert())
1505n/a def test_non_blocking_connect_ex(self):
1506n/a # Issue #11326: non-blocking connect_ex() should allow handshake
1507n/a # to proceed after the socket gets ready.
1508n/a s = test_wrap_socket(socket.socket(socket.AF_INET),
1509n/a cert_reqs=ssl.CERT_REQUIRED,
1510n/a ca_certs=SIGNING_CA,
1511n/a do_handshake_on_connect=False)
1512n/a self.addCleanup(s.close)
1513n/a s.setblocking(False)
1514n/a rc = s.connect_ex(self.server_addr)
1515n/a # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1516n/a self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1517n/a # Wait for connect to finish
1518n/a select.select([], [s], [], 5.0)
1519n/a # Non-blocking handshake
1520n/a while True:
1521n/a try:
1522n/a s.do_handshake()
1523n/a break
1524n/a except ssl.SSLWantReadError:
1525n/a select.select([s], [], [], 5.0)
1526n/a except ssl.SSLWantWriteError:
1527n/a select.select([], [s], [], 5.0)
1528n/a # SSL established
1529n/a self.assertTrue(s.getpeercert())
1531n/a def test_connect_with_context(self):
1532n/a # Same as test_connect, but with a separately created context
1533n/a ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1534n/a with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1535n/a s.connect(self.server_addr)
1536n/a self.assertEqual({}, s.getpeercert())
1537n/a # Same with a server hostname
1538n/a with ctx.wrap_socket(socket.socket(socket.AF_INET),
1539n/a server_hostname="dummy") as s:
1540n/a s.connect(self.server_addr)
1541n/a ctx.verify_mode = ssl.CERT_REQUIRED
1542n/a # This should succeed because we specify the root cert
1543n/a ctx.load_verify_locations(SIGNING_CA)
1544n/a with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1545n/a s.connect(self.server_addr)
1546n/a cert = s.getpeercert()
1547n/a self.assertTrue(cert)
1549n/a def test_connect_with_context_fail(self):
1550n/a # This should fail because we have no verification certs. Connection
1551n/a # failure crashes ThreadedEchoServer, so run this in an independent
1552n/a # test method.
1553n/a ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1554n/a ctx.verify_mode = ssl.CERT_REQUIRED
1555n/a s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1556n/a self.addCleanup(s.close)
1557n/a self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1558n/a s.connect, self.server_addr)
1560n/a def test_connect_capath(self):
1561n/a # Verify server certificates using the `capath` argument
1562n/a # NOTE: the subject hashing algorithm has been changed between
1563n/a # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1564n/a # contain both versions of each certificate (same content, different
1565n/a # filename) for this test to be portable across OpenSSL releases.
1566n/a ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1567n/a ctx.verify_mode = ssl.CERT_REQUIRED
1568n/a ctx.load_verify_locations(capath=CAPATH)
1569n/a with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1570n/a s.connect(self.server_addr)
1571n/a cert = s.getpeercert()
1572n/a self.assertTrue(cert)
1573n/a # Same with a bytes `capath` argument
1574n/a ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1575n/a ctx.verify_mode = ssl.CERT_REQUIRED
1576n/a ctx.load_verify_locations(capath=BYTES_CAPATH)
1577n/a with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1578n/a s.connect(self.server_addr)
1579n/a cert = s.getpeercert()
1580n/a self.assertTrue(cert)
1582n/a def test_connect_cadata(self):
1583n/a with open(SIGNING_CA) as f:
1584n/a pem = f.read()
1585n/a der = ssl.PEM_cert_to_DER_cert(pem)
1586n/a ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1587n/a ctx.verify_mode = ssl.CERT_REQUIRED
1588n/a ctx.load_verify_locations(cadata=pem)
1589n/a with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1590n/a s.connect(self.server_addr)
1591n/a cert = s.getpeercert()
1592n/a self.assertTrue(cert)
1594n/a # same with DER
1595n/a ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1596n/a ctx.verify_mode = ssl.CERT_REQUIRED
1597n/a ctx.load_verify_locations(cadata=der)
1598n/a with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1599n/a s.connect(self.server_addr)
1600n/a cert = s.getpeercert()
1601n/a self.assertTrue(cert)
1603n/a @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1604n/a def test_makefile_close(self):
1605n/a # Issue #5238: creating a file-like object with makefile() shouldn't
1606n/a # delay closing the underlying "real socket" (here tested with its
1607n/a # file descriptor, hence skipping the test under Windows).
1608n/a ss = test_wrap_socket(socket.socket(socket.AF_INET))
1609n/a ss.connect(self.server_addr)
1610n/a fd = ss.fileno()
1611n/a f = ss.makefile()
1612n/a f.close()
1613n/a # The fd is still open
1614n/a os.read(fd, 0)
1615n/a # Closing the SSL socket should close the fd too
1616n/a ss.close()
1617n/a gc.collect()
1618n/a with self.assertRaises(OSError) as e:
1619n/a os.read(fd, 0)
1620n/a self.assertEqual(e.exception.errno, errno.EBADF)
def test_non_blocking_handshake(self):
    """Drive a handshake on a non-blocking socket, waiting with select()."""
    plain = socket.socket(socket.AF_INET)
    plain.connect(self.server_addr)
    plain.setblocking(False)
    tls = test_wrap_socket(plain,
                           cert_reqs=ssl.CERT_NONE,
                           do_handshake_on_connect=False)
    self.addCleanup(tls.close)
    attempts = 0
    finished = False
    while not finished:
        attempts += 1
        try:
            tls.do_handshake()
        except ssl.SSLWantReadError:
            # Handshake needs more input: wait until the socket is readable.
            select.select([tls], [], [])
        except ssl.SSLWantWriteError:
            # Handshake needs to send: wait until the socket is writable.
            select.select([], [tls], [])
        else:
            finished = True
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
def test_get_server_certificate(self):
    """Fetch the background server's certificate, verifying against SIGNING_CA."""
    address = self.server_addr
    _test_get_server_certificate(self, *address, cert=SIGNING_CA)
def test_get_server_certificate_fail(self):
    """Check that fetching the certificate with a wrong CA file fails."""
    # Kept as its own test method: a connection failure crashes
    # ThreadedEchoServer.
    address = self.server_addr
    _test_get_server_certificate_fail(self, *address)
def test_ciphers(self):
    """Connect with valid cipher strings and reject an unusable one."""
    for cipher_spec in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_spec) as conn:
            conn.connect(self.server_addr)
    # The bad cipher string may be rejected either when the socket is
    # wrapped or when it connects, so both happen inside assertRaisesRegex.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as plain:
            conn = test_wrap_socket(plain,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers="^$:,;?*'dorothyx")
            conn.connect(self.server_addr)
def test_get_ca_certs_capath(self):
    """Certificates from a capath are only loaded once they are needed."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(capath=CAPATH)
    # Nothing has been requested yet, so no CA cert is reported.
    self.assertEqual(context.get_ca_certs(), [])
    with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
    # After a verified connection exactly one CA cert is reported.
    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
@needs_sni
def test_context_setget(self):
    """The context of a connected socket can be read and replaced."""
    original_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    plain = socket.socket(socket.AF_INET)
    with original_ctx.wrap_socket(plain) as conn:
        conn.connect(self.server_addr)
        # Both the Python-level socket and the low-level object agree.
        self.assertIs(conn.context, original_ctx)
        self.assertIs(conn._sslobj.context, original_ctx)
        conn.context = replacement_ctx
        self.assertIs(conn.context, replacement_ctx)
        self.assertIs(conn._sslobj.context, replacement_ctx)
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run func(*args) to completion, shuttling data between sock and BIOs.

    A simple IO loop.  func is called repeatedly; when it raises an
    SSLError whose errno is SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE,
    data is moved between the socket and the memory BIOs and the call is
    retried.

    sock     -- connected socket carrying the TLS records
    incoming -- BIO that receives bytes read from sock
    outgoing -- BIO whose pending bytes are sent to sock
    func     -- callable to drive (e.g. sslobj.do_handshake)
    args     -- positional arguments passed to func
    timeout  -- keyword only; seconds allowed for the whole loop
                (default 10).  The deadline is checked between
                iterations, so a blocking sock.recv() or sock.sendall()
                is not interrupted by it.

    Returns the value returned by the successful call to func.  Any other
    SSLError is re-raised; exceeding the deadline fails the test.
    """
    timeout = kwargs.get('timeout', 10)
    deadline = time.monotonic() + timeout
    count = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        # Named 'want' (not 'errno') so the errno module is not shadowed.
        want = None
        count += 1
        try:
            ret = func(*args)
        except ssl.SSLError as e:
            if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                               ssl.SSL_ERROR_WANT_WRITE):
                raise
            want = e.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        buf = outgoing.read()
        sock.sendall(buf)
        # If there's no error, we're done. For WANT_READ, we need to get
        # data from the socket and put it in the incoming BIO.
        if want is None:
            break
        elif want == ssl.SSL_ERROR_WANT_READ:
            buf = sock.recv(32768)
            if buf:
                incoming.write(buf)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (count, func.__name__))
    return ret
def test_bio_handshake(self):
    """Handshake and shut down through memory BIOs, checking state changes."""
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.check_hostname = True
    tls_obj = context.wrap_bio(bio_in, bio_out, False, 'localhost')
    self.assertIs(tls_obj._sslobj.owner, tls_obj)

    # State before the handshake.
    self.assertIsNone(tls_obj.cipher())
    self.assertIsNotNone(tls_obj.shared_ciphers())
    with self.assertRaises(ValueError):
        tls_obj.getpeercert()
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertIsNone(tls_obj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, bio_in, bio_out, tls_obj.do_handshake)

    # State after the handshake.
    self.assertTrue(tls_obj.cipher())
    self.assertIsNotNone(tls_obj.shared_ciphers())
    self.assertTrue(tls_obj.getpeercert())
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertTrue(tls_obj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, bio_in, bio_out, tls_obj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    with self.assertRaises(ssl.SSLError):
        tls_obj.write(b'foo')
def test_bio_read_write_data(self):
    """Send a request and read the echoed reply through memory BIOs."""
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_NONE
    tls_obj = context.wrap_bio(bio_in, bio_out, False)

    def drive(func, *args):
        # Every step uses the same socket and BIO pair.
        return self.ssl_io_loop(sock, bio_in, bio_out, func, *args)

    drive(tls_obj.do_handshake)
    request = b'FOO\n'
    drive(tls_obj.write, request)
    reply = drive(tls_obj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    drive(tls_obj.unwrap)
class NetworkedTests(unittest.TestCase):
    """Tests that talk to remote hosts under support.transient_internet()."""

    def test_timeout_connect_ex(self):
        """connect_ex() returns the original errno on a timeout.

        Issue #12065: this mimics the behaviour of non-SSL sockets.
        """
        with support.transient_internet(REMOTE_HOST):
            conn = test_wrap_socket(socket.socket(socket.AF_INET),
                                    cert_reqs=ssl.CERT_REQUIRED,
                                    do_handshake_on_connect=False)
            self.addCleanup(conn.close)
            conn.settimeout(0.0000001)
            result = conn.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        """Run the get_server_certificate() checks against an IPv6 host."""
        with support.transient_internet('ipv6.google.com'):
            _test_get_server_certificate(self, 'ipv6.google.com', 443)
            _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)

    def test_algorithms(self):
        """All algorithms should be available when verifying a certificate.

        Issue #8484.
        """
        # SHA256 was added in OpenSSL 0.9.8
        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
        # sha256.tbs-internet.com needs SNI to use the correct certificate
        if not ssl.HAS_SNI:
            self.skipTest("SNI needed for this test")
        # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
        remote = ("sha256.tbs-internet.com", 443)
        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
        with support.transient_internet("sha256.tbs-internet.com"):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(sha256_cert)
            # The with block closes the socket whether or not connect() works.
            with context.wrap_socket(
                    socket.socket(socket.AF_INET),
                    server_hostname="sha256.tbs-internet.com") as conn:
                conn.connect(remote)
                if support.verbose:
                    sys.stdout.write("\nCipher with %r is %r\n" %
                                     (remote, conn.cipher()))
                    sys.stdout.write("Certificate is:\n%s\n" %
                                     pprint.pformat(conn.getpeercert()))
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port twice and fail *test* if empty.

    The first fetch passes no ca_certs; the second passes ca_certs=cert.
    """
    address = (host, port)
    for extra_kwargs in ({}, {"ca_certs": cert}):
        pem = ssl.get_server_certificate(address, **extra_kwargs)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
def _test_get_server_certificate_fail(test, host, port):
    """Fail *test* unless fetching with CERTFILE as CA raises ssl.SSLError."""
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1846n/aif _have_threads:
1847n/a from test.ssl_servers import make_https_server
class ThreadedEchoServer(threading.Thread):
    """Echo server thread used by the tests.

    Accepts connections on a locally bound port and hands each one to a
    ConnectionHandler thread, which is joined before the next accept().
    Usable as a context manager: entering starts the thread and waits until
    it is listening, leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap self.sock with the server context.

            Returns True on success.  On SSLError/ConnectionResetError the
            error is recorded in server.conn_errors, the server is stopped,
            the connection is closed and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                self.server.conn_errors.append(e)
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                                     + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the SSL connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close whichever of the SSL connection or raw socket is active."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection: echo lowercased data, obey commands.

            Recognised messages (after stripping): empty (EOF), b'over',
            b'STARTTLS', b'ENDTLS', b'CB tls-unique'; anything else is
            echoed back lowercased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # Only attempt an SSL shutdown when the connection
                        # is actually wrapped; on a STARTTLS server the
                        # peer may disconnect while still unencrypted.
                        if self.sslconn is not None:
                            try:
                                self.sock = self.sslconn.unwrap()
                            except OSError:
                                # Many tests shut the TCP connection down
                                # without an SSL shutdown. This causes
                                # unwrap() to raise OSError with errno=0!
                                pass
                            else:
                                self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                                self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Build (or adopt) the SSL context and bind the listening socket.

        When *context* is given it is used as is; otherwise a context is
        created from ssl_version (default PROTOCOL_TLSv1), certreqs
        (default CERT_NONE), cacerts and certificate.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLSv1)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            self.context.set_alpn_protocols(alpn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Per-connection results appended by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and block until run() signals it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, then close the socket."""
        # Short accept() timeout so self.active is re-checked regularly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Connections are served one at a time.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Ask the accept loop to exit after its current iteration."""
        self.active = False
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread that runs an asyncore loop.

    Usable as a context manager: entering starts the thread and waits for
    it to signal readiness, leaving stops and joins it.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher; creates a ConnectionHandler per connection."""

        class ConnectionHandler (asyncore.dispatcher_with_send):
            """Per-connection dispatcher: SSL handshake, then lowercase echo."""

            def __init__(self, conn, certfile):
                # Wrap without handshaking; the handshake is driven by
                # _do_ssl_handshake() from here and from handle_read().
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Handle data already pending in the SSL layer before
                # reporting readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Attempt the handshake; clear _ssl_accepting on success."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on a later handle_read().
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    # NOTE(review): any other OSError is silently ignored
                    # and _ssl_accepting stays True.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and block until run() sets the event.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set when run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # NOTE(review): every exception from the loop, including
                # those re-raised by handle_error(), is swallowed here.
                pass

    def stop(self):
        """Stop the run() loop and close the listening dispatcher."""
        self.active = False
        self.server.close()
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview; each reply must be
    the lowercased data, otherwise AssertionError is raised.  Returns a dict
    of client-side connection details plus the protocol and cipher lists
    collected by the server.
    """
    stats = {}
    expected = indata.lower()
    # The server is always created with connectionchatty=False; the
    # connectionchatty argument only controls the client-side output below.
    echo_server = ThreadedEchoServer(context=server_context,
                                     chatty=chatty,
                                     connectionchatty=False)
    with echo_server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as client:
            client.connect((HOST, echo_server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for payload in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                client.write(payload)
                reply = client.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % reply)
                if reply != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (reply[:20], len(reply),
                           indata[:20].lower(), len(indata)))
            client.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats.update({
                'compression': client.compression(),
                'cipher': client.cipher(),
                'peercert': client.getpeercert(),
                'client_alpn_protocol': client.selected_alpn_protocol(),
                'client_npn_protocol': client.selected_npn_protocol(),
                'version': client.version(),
                'session_reused': client.session_reused,
                'session': client.session,
            })
            client.close()
        stats['server_alpn_protocols'] = echo_server.selected_alpn_protocols
        stats['server_npn_protocols'] = echo_server.selected_npn_protocols
        stats['server_shared_ciphers'] = echo_server.shared_ciphers
    return stats
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.

    *certsreqs* is the verify mode set on both contexts (default
    ssl.CERT_NONE); *server_options* and *client_options* are OR-ed into
    the respective context options.  Raises AssertionError when the
    outcome does not match *expect_success*.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    # Display name for the verbose line only.  All three verify modes are
    # listed so CERT_OPTIONAL / CERT_REQUIRED do not raise KeyError.
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]
    if support.verbose:
        # Braces mark combinations that are expected to fail.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        elif (expect_success is not True
              and expect_success != stats['version']):
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
2287n/a class ThreadedTests(unittest.TestCase):
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # Every protocol except the client-only / server-only ones is tested
    # with one context shared by both ends.
    for protocol in PROTOCOLS:
        if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # server_context.load_verify_locations(SIGNING_CA)
    server_context.load_cert_chain(SIGNED_CERTFILE2)

    # Matching roles: expected to succeed.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name='fakehostname')

    # The remaining sub-tests use a context in the wrong role and expect
    # an SSLError mentioning 'called a function you should not call'.
    client_context.check_hostname = False
    with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
        with self.assertRaises(ssl.SSLError) as e:
            server_params_test(client_context=server_context,
                               server_context=client_context,
                               chatty=True, connectionchatty=True,
                               sni_name='fakehostname')
        self.assertIn('called a function you should not call',
                      str(e.exception))

    with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
        with self.assertRaises(ssl.SSLError) as e:
            server_params_test(client_context=server_context,
                               server_context=server_context,
                               chatty=True, connectionchatty=True)
        self.assertIn('called a function you should not call',
                      str(e.exception))

    # NOTE(review): the subTest label says client and server both use
    # PROTOCOL_TLS_CLIENT, but the call passes server_context as the
    # client, the same pairing as two sub-tests above.  Confirm intent.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
        with self.assertRaises(ssl.SSLError) as e:
            server_params_test(client_context=server_context,
                               server_context=client_context,
                               chatty=True, connectionchatty=True)
        self.assertIn('called a function you should not call',
                      str(e.exception))
def test_getpeercert(self):
    """Check getpeercert() before and after the handshake.

    Before the handshake it must raise ValueError; afterwards the returned
    dict must carry the expected subject and a consistent validity period.
    """
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # The with block closes the client socket even when one of the
        # assertions below fails.
        with context.wrap_socket(socket.socket(),
                                 do_handshake_on_connect=False) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(SIGNING_CA)
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    def new_server():
        # Each phase gets its own freshly started server.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # Phase 1: VERIFY_DEFAULT should pass
    echo_server = new_server()
    with echo_server:
        with client_ctx.wrap_socket(socket.socket()) as conn:
            conn.connect((HOST, echo_server.port))
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # Phase 2: VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_ctx.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    echo_server = new_server()
    with echo_server:
        with client_ctx.wrap_socket(socket.socket()) as conn:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                conn.connect((HOST, echo_server.port))

    # Phase 3: now load a CRL file. The CRL file is signed by the CA.
    client_ctx.load_verify_locations(CRLFILE)

    echo_server = new_server()
    with echo_server:
        with client_ctx.wrap_socket(socket.socket()) as conn:
            conn.connect((HOST, echo_server.port))
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")
def test_check_hostname(self):
    """check_hostname accepts a matching name and rejects bad/missing ones."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.check_hostname = True
    client_ctx.load_verify_locations(SIGNING_CA)

    def new_server():
        # Each scenario gets its own freshly started server.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    echo_server = new_server()
    with echo_server:
        with client_ctx.wrap_socket(socket.socket(),
                                    server_hostname="localhost") as conn:
            conn.connect((HOST, echo_server.port))
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    echo_server = new_server()
    with echo_server:
        with client_ctx.wrap_socket(socket.socket(),
                                    server_hostname="invalid") as conn:
            with self.assertRaisesRegex(
                    ssl.CertificateError,
                    "hostname 'invalid' doesn't match 'localhost'"):
                conn.connect((HOST, echo_server.port))

    # missing server_hostname arg should cause an exception, too
    echo_server = new_server()
    with echo_server:
        with socket.socket() as plain:
            with self.assertRaisesRegex(
                    ValueError,
                    "check_hostname requires server_hostname"):
                client_ctx.wrap_socket(plain)
def test_wrong_cert(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    here = os.path.dirname(__file__) or os.curdir
    bad_certfile = os.path.join(here, "wrongcert.pem")
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_REQUIRED,
                                     cacerts=CERTFILE, chatty=False,
                                     connectionchatty=False)
    with echo_server:
        with socket.socket() as plain:
            with test_wrap_socket(plain,
                                  certfile=bad_certfile,
                                  ssl_version=ssl.PROTOCOL_TLSv1) as client:
                try:
                    # Expect either an SSL error about the server rejecting
                    # the connection, or a low-level connection reset (which
                    # sometimes happens on Windows)
                    client.connect((HOST, echo_server.port))
                except ssl.SSLError as exc:
                    if support.verbose:
                        sys.stdout.write("\nSSLError is %r\n" % exc)
                except OSError as exc:
                    if exc.errno != errno.ECONNRESET:
                        raise
                    if support.verbose:
                        sys.stdout.write("\nsocket.error is %r\n" % exc)
                else:
                    self.fail("Use of invalid cert should have failed!")
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    ready = threading.Event()
    gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def accept_then_drop():
        # Runs in a helper thread: accept one connection, close it and
        # the listening socket right away, then signal `gone`.
        listen_sock.listen()
        ready.set()
        accepted, _addr = listen_sock.accept()
        accepted.close()
        listen_sock.close()
        gone.set()

    def handshake_after_drop():
        # Connect in clear text, wait until the peer has gone away, and
        # only then attempt the TLS handshake.
        ready.wait()
        with socket.socket() as client:
            client.connect((HOST, port))
            gone.wait()
            try:
                test_wrap_socket(client)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=accept_then_drop)
    worker.start()
    try:
        handshake_after_drop()
    finally:
        worker.join()
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2

    # SSLv2 clients are accepted, whatever the certificate requirement.
    try_protocol_combo(sslv2, sslv2, True)
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_reqs)

    # Every other client protocol is expected to be refused.
    refused_clients = [ssl.PROTOCOL_SSLv23]
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        refused_clients.append(ssl.PROTOCOL_SSLv3)
    refused_clients.append(ssl.PROTOCOL_TLSv1)
    for client_protocol in refused_clients:
        try_protocol_combo(sslv2, client_protocol, False)

    # SSLv23 client with specific SSL options
    sslv23 = ssl.PROTOCOL_SSLv23
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    try_protocol_combo(sslv2, sslv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv2, sslv23, False,
                       client_options=ssl.OP_NO_TLSv1)
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    have_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except OSError as exc:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(exc))

    # Same three client protocols, first without an explicit certificate
    # requirement, then with CERT_OPTIONAL, then with CERT_REQUIRED.
    for extra_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        if have_sslv3:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False, *extra_args)
        try_protocol_combo(sslv23, sslv23, True, *extra_args)
        try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, 'TLSv1', *extra_args)

    # Server with specific SSL options
    if have_sslv3:
        try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    sslv23 = ssl.PROTOCOL_SSLv23

    # An SSLv3 client negotiates 'SSLv3' under every certificate setting.
    for extra_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', *extra_args)

    # The remaining client configurations are expected to fail.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, sslv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, sslv23,
                           False, client_options=ssl.OP_NO_SSLv2)
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # A TLSv1 client negotiates 'TLSv1' under every certificate setting.
    for extra_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', *extra_args)

    # Legacy client protocols, when compiled in, must be refused.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1, getattr(ssl, legacy_name), False)

    # An SSLv23 client that forbids TLSv1 cannot talk to this server.
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')

    # Legacy client protocols, when compiled in, must be refused.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy_name), False)

    # An SSLv23 client that forbids TLSv1.1 cannot talk to this server.
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # SSLv23 server accepts TLSv1.1; TLSv1 and TLSv1.1 do not interoperate.
    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2,
                       client_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2)

    # Legacy client protocols, when compiled in, must be refused.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy_name), False)

    # An SSLv23 client that forbids TLSv1.2 cannot talk to this server.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # SSLv23 server accepts TLSv1.2; fixed TLS versions do not mix.
    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_2, 'TLSv1.2')
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tlsv1_2, False)
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    The messages in ``msgs`` are sent one by one. Until the server
    answers "ok" to b"STARTTLS" they travel over the plain socket ``s``;
    afterwards over the TLS wrapper ``conn``; and after an "ok" reply to
    b"ENDTLS" over the plain socket returned by ``conn.unwrap()``.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # Tracks which of `s` (plain) / `conn` (TLS) is currently in use.
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for indata in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            # `conn` is only bound once STARTTLS has been accepted below,
            # which is also the only place `wrapped` becomes True.
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            if indata == b"STARTTLS" and msg.startswith(b"ok"):
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                conn = test_wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                # unwrap() hands back a plain socket; rebind `s` to it so
                # the remaining messages use the clear-text path.
                s = conn.unwrap()
                wrapped = False
            else:
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server\n" % msg)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Send the final b"over\n" message on whichever channel is active.
        if wrapped:
            conn.write(b"over\n")
        else:
            s.send(b"over\n")
        # Close whichever object currently owns the connection.
        if wrapped:
            conn.close()
        else:
            s.close()
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections.

    The certificate file is read from disk and then fetched again over
    HTTPS from the test server; both copies must be equal.
    """
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as local_file:
        expected = local_file.read()
    fetched = ''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.basename(CERTFILE))
    context = ssl.create_default_context(cafile=CERTFILE)
    response = urllib.request.urlopen(url, context=context)
    try:
        content_length = response.info().get("content-length")
        # Only read a body when the server announced a positive length.
        if content_length and (int(content_length) > 0):
            fetched = response.read(int(content_length))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(fetched), server))
    finally:
        response.close()
    self.assertEqual(expected, fetched)
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message through a TLS-wrapped client socket to an
    AsyncoreEchoServer and checks that the lower-cased message comes back.
    """
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = test_wrap_socket(socket.socket())
        # Connect to HOST like the other tests in this file; an empty
        # host string is not a portable connect() target.
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        # The reply is expected to be the lower-cased request.
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send-style and recv-style method of the TLS socket is driven
    once against the echo server. Methods flagged as "expect success"
    must round-trip the lower-cased payload; the others must raise a
    ValueError whose message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    # Use the !r conversion: bytes reject an "r" format
                    # spec, which would hide the real failure behind a
                    # TypeError.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    # !s converts the exception to text first; exception
                    # objects do not accept an "s" format spec.
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))

        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
def test_recv_zero(self):
    """Zero-length reads return nothing and never block."""
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)

    plain_sock = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(plain_sock.close)
    tls_sock = test_wrap_socket(plain_sock, suppress_ragged_eofs=False)
    self.addCleanup(tls_sock.close)

    # recv/read(0) should return no data
    tls_sock.send(b"data")
    for zero_read in (tls_sock.recv, tls_sock.read):
        self.assertEqual(zero_read(0), b"")
    self.assertEqual(tls_sock.read(), b"data")

    # Should not block if the other end sends no data
    tls_sock.setblocking(False)
    self.assertEqual(tls_sock.recv(0), b"")
    self.assertEqual(tls_sock.recv_into(bytearray()), 0)
def test_nonblocking_send(self):
    """A non-blocking send() eventually raises a want-read/write error."""
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        tls_sock = test_wrap_socket(socket.socket(),
                                    server_side=False,
                                    certfile=CERTFILE,
                                    ca_certs=CERTFILE,
                                    cert_reqs=ssl.CERT_NONE,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
        tls_sock.connect((HOST, echo_server.port))
        tls_sock.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        payload = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                tls_sock.send(payload)

        # Switch back to blocking mode before closing the connection.
        tls_sock.setblocking(True)
        tls_sock.close()
def test_handshake_timeout(self):
    """Issue #5103: SSL handshake must respect the socket timeout.

    A plain TCP server accepts connections but never speaks TLS, so a
    client handshake can only end by timing out. Both handshake entry
    points are checked: wrapping an already connected socket, and
    connecting an already wrapped socket.
    """
    server = socket.socket(socket.AF_INET)
    # Connect to HOST like the other tests in this file; an empty host
    # string is not a portable connect() target.
    host = HOST
    port = support.bind_port(server)
    started = threading.Event()
    finish = False

    def serve():
        # Accept connections until `finish` is set by the main thread,
        # never sending anything back.
        server.listen()
        started.set()
        conns = []
        while not finish:
            r, w, e = select.select([server], [], [], 0.1)
            if server in r:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                conns.append(server.accept()[0])
        for sock in conns:
            sock.close()

    t = threading.Thread(target=serve)
    t.start()
    started.wait()

    try:
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            self.assertRaisesRegex(socket.timeout, "timed out",
                                   test_wrap_socket, c)
        finally:
            c.close()
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            self.assertRaisesRegex(socket.timeout, "timed out",
                                   c.connect, (host, port))
        finally:
            c.close()
    finally:
        # Tell the serving thread to leave its loop, then clean up.
        finish = True
        t.join()
        server.close()
def test_server_accept(self):
    """Issue #16357: accept() on a SSLSocket created through
    SSLContext.wrap_socket().

    The accepted socket must itself be an SSLSocket, and the peer
    address reported by accept() must equal the client's own address.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = socket.socket(socket.AF_INET)
    # Connect to HOST like the other tests in this file; an empty host
    # string is not a portable connect() target.
    host = HOST
    port = support.bind_port(server)
    server = context.wrap_socket(server, server_side=True)
    self.assertTrue(server.server_side)

    evt = threading.Event()
    remote = None
    peer = None

    def serve():
        # Publishes the accepted socket and peer address to the test.
        nonlocal remote, peer
        server.listen()
        # Block on the accept and wait on the connection to close.
        evt.set()
        remote, peer = server.accept()
        remote.recv(1)

    t = threading.Thread(target=serve)
    t.start()
    # Client wait until server setup and perform a connect.
    evt.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    t.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
def test_getpeercert_enotconn(self):
    """getpeercert() on an unconnected socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
def test_do_handshake_enotconn(self):
    """do_handshake() on an unconnected socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
def test_default_ciphers(self):
    """A client restricted to DES ciphers shares none with the server."""
    client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_context.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_SSLv23,
                                     chatty=False)
    with echo_server as server:
        with client_context.wrap_socket(socket.socket()) as client:
            with self.assertRaises(OSError):
                client.connect((HOST, server.port))
    # Inspected once the server context has exited.
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     chatty=False)
    with echo_server as server:
        client = client_context.wrap_socket(socket.socket())
        with client:
            # No version before the handshake...
            self.assertIs(client.version(), None)
            client.connect((HOST, server.port))
            # ...the negotiated one while connected...
            self.assertEqual(client.version(), 'TLSv1')
        # ...and none again once the socket has been closed.
        self.assertIs(client.version(), None)
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """Issue #21015: ECDH key exchange is enabled by default."""
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    shared_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    shared_context.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        shared_context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=shared_context) as server:
        with shared_context.wrap_socket(socket.socket()) as client:
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two consecutive connections are made to the same server. For each,
    the client's channel-binding data must be sane and must match what
    the server reports for its side; the two connections must yield
    different data.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12)  # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the data of the second connection, not the first.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
def test_compression(self):
    """The negotiated compression is None, 'ZLIB' or 'RLE'."""
    shared_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_context.load_cert_chain(CERTFILE)
    stats = server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed)
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION set, no compression is negotiated."""
    shared_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_context.load_cert_chain(CERTFILE)
    shared_context.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    self.assertIs(negotiated, None)
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The context is restricted to "kEDH" ciphers; the negotiated cipher
    name must contain an "ADH", "EDH" or "DHE" component.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # First element of the cipher tuple is the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name, not just its first character.
        self.fail("Non-DH cipher: " + cipher)
def test_selected_alpn_protocol(self):
    """Without ALPN configured, the client reports no ALPN protocol."""
    # selected_alpn_protocol() is None unless ALPN is used.
    shared_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_context.load_cert_chain(CERTFILE)
    stats = server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    """Server-only ALPN leaves the client's selected protocol as None."""
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.load_verify_locations(CERTFILE)

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.load_cert_chain(CERTFILE)
    server_ctx.set_alpn_protocols(['foo', 'bar'])

    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    """ALPN negotiation picks the expected protocol on both sides."""
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, protocol expected to be chosen)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        server_ctx.load_cert_chain(CERTFILE)
        server_ctx.set_alpn_protocols(server_protocols)
        client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        client_ctx.load_cert_chain(CERTFILE)
        client_ctx.set_alpn_protocols(client_protocols)

        # A handshake failure is kept as the "result" so that it can be
        # inspected below instead of aborting the loop.
        try:
            stats = server_params_test(client_ctx,
                                       server_ctx,
                                       chatty=True,
                                       connectionchatty=True)
        except ssl.SSLError as handshake_error:
            stats = handshake_error

        if expected is None and IS_OPENSSL_1_1:
            # OpenSSL 1.1.0 raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The two remaining placeholders are filled in per assertion.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_alpn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
def test_selected_npn_protocol(self):
    """Without NPN configured, the client reports no NPN protocol."""
    # selected_npn_protocol() is None unless NPN is used
    shared_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_context.load_cert_chain(CERTFILE)
    stats = server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """NPN negotiation picks the expected protocol on both sides."""
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected to be chosen)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_ctx.load_cert_chain(CERTFILE)
        server_ctx.set_npn_protocols(server_protocols)
        client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_ctx.load_cert_chain(CERTFILE)
        client_ctx.set_npn_protocols(client_protocols)
        stats = server_params_test(client_ctx, server_ctx,
                                   chatty=True, connectionchatty=True)

        # The two remaining placeholders are filled in per assertion.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected, msg % (client_result, "client"))
        seen_by_server = stats['server_npn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected, msg % (server_result, "server"))
def sni_contexts(self):
    # Build the contexts shared by the SNI tests and return them as
    # (server_context, other_context, client_context).
    def with_cert(certfile):
        # Server-side TLSv1 context presenting the given certificate.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = with_cert(SIGNED_CERTFILE)
    other_context = with_cert(SIGNED_CERTFILE2)

    # The client requires a certificate and verifies it against SIGNING_CA.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)

    return server_context, other_context, client_context
def check_common_name(self, stats, name):
    # Assert that the peer certificate stored in stats['peercert'] lists
    # *name* as a commonName entry of its subject.
    subject = stats['peercert']['subject']
    expected_rdn = (('commonName', name),)
    self.assertIn(expected_rdn, subject)
@needs_sni
def test_sni_callback(self):
    # The servername callback receives the requested name and may switch
    # the connection to another context (and thus another certificate).
    server_context, other_context, client_context = self.sni_contexts()
    seen = []

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name='supermessage')
    # The hostname reached the callback, and the connection was switched
    # to the certificate loaded into other_context (SIGNED_CERTFILE2).
    self.assertEqual(seen, [("supermessage", server_context)])
    self.check_common_name(stats, 'fakehostname')

    # Without an SNI name the callback still runs, with server_name=None,
    # and the server's own certificate is presented.
    seen.clear()
    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Once the callback is removed it is no longer invoked and the
    # certificate does not change.
    seen.clear()
    server_context.set_servername_callback(None)
    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name='notfunny')
    self.check_common_name(stats, 'localhost')
    self.assertEqual(seen, [])
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert from the servername callback is reflected to
    # the connecting client as an SSLError carrying that alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        # Abort the handshake with "access denied"; this is the alert the
        # client-side assertion below expects to observe.
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as cm:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback makes the client see a
    # handshake failure alert; the exception name shows up on stderr.
    server_context, _unused_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately raise ZeroDivisionError.
        1 / 0
    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A callback returning a value of the wrong type terminates the TLS
    # connection with an internal error alert; a TypeError is reported on
    # stderr.
    server_context, _unused_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
def test_shared_ciphers(self):
    # Every cipher the server reports as shared with the client must match
    # the cipher family the server was restricted to.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # alg1 is matched against the "-"-separated parts of a cipher name,
    # alg2 as a plain substring of it.
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_spec, server_spec = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_spec, server_spec = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_spec)
    server_context.set_ciphers(server_spec)

    stats = server_params_test(client_context, server_context)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        if alg1 in cipher_name.split("-") or alg2 in cipher_name:
            continue
        self.fail(cipher_name)
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that was connected and then
    # closed must raise ValueError.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=context, chatty=False)

    with echo_server:
        conn = context.wrap_socket(socket.socket())
        conn.connect((HOST, echo_server.port))
        conn.close()

        with self.assertRaises(ValueError):
            conn.read(1024)
        with self.assertRaises(ValueError):
            conn.write(b'hello')
def test_sendfile(self):
    # Push a file through SSLSocket.sendfile() and expect to read the
    # same bytes back from the echo server.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)

    echo_server = ThreadedEchoServer(context=context, chatty=False)
    with echo_server:
        with context.wrap_socket(socket.socket()) as conn:
            conn.connect((HOST, echo_server.port))
            with open(support.TESTFN, 'rb') as src:
                conn.sendfile(src)
                self.assertEqual(conn.recv(1024), payload)
def test_session(self):
    # Exercise TLS session creation and reuse, checking the server's
    # 'accept' and 'hits' session counters after every connection.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)

    def check_server_counters(accept, hits):
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # 1) First connection: no session is offered, a fresh one is created.
    stats = server_params_test(client_context, server_context)
    first = stats['session']
    self.assertTrue(first.id)
    self.assertGreater(first.time, 0)
    self.assertGreater(first.timeout, 0)
    self.assertTrue(first.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(first.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    check_server_counters(accept=1, hits=0)

    # 2) Second connection: offer the first session and expect reuse.
    stats = server_params_test(client_context, server_context,
                               session=first)
    check_server_counters(accept=2, hits=1)
    self.assertTrue(stats['session_reused'])
    second = stats['session']
    self.assertEqual(second.id, first.id)
    # Equal in value, yet a distinct object.
    self.assertEqual(second, first)
    self.assertIsNot(second, first)
    self.assertGreaterEqual(second.time, first.time)
    self.assertGreaterEqual(second.timeout, first.timeout)

    # 3) Third connection: no session offered, so a different one results.
    stats = server_params_test(client_context, server_context)
    self.assertFalse(stats['session_reused'])
    third = stats['session']
    self.assertNotEqual(third.id, first.id)
    self.assertNotEqual(third, first)
    check_server_counters(accept=3, hits=1)

    # 4) Fourth connection: the first session can be reused once more.
    stats = server_params_test(client_context, server_context,
                               session=first)
    self.assertTrue(stats['session_reused'])
    fourth = stats['session']
    self.assertEqual(fourth.id, first.id)
    self.assertEqual(fourth, first)
    self.assertGreaterEqual(fourth.time, first.time)
    self.assertGreaterEqual(fourth.timeout, first.timeout)
    check_server_counters(accept=4, hits=2)
def test_session_handling(self):
    # Check when SSLSocket.session may be read and assigned, and the
    # errors raised for invalid assignments.
    def make_context():
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(CERTFILE)
        ctx.load_cert_chain(CERTFILE)
        return ctx

    context = make_context()
    # Identically configured, but a distinct SSLContext object.
    context2 = make_context()

    echo_server = ThreadedEchoServer(context=context, chatty=False)
    with echo_server:
        address = (HOST, echo_server.port)

        with context.wrap_socket(socket.socket()) as s:
            # session is None before handshake
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect(address)
            session = s.session
            self.assertTrue(session)
            # only SSLSession objects may be assigned
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception),
                             'Value is not a SSLSession.')

        with context.wrap_socket(socket.socket()) as s:
            s.connect(address)
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with context.wrap_socket(socket.socket()) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect(address)
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with context2.wrap_socket(socket.socket()) as s:
            # cannot re-use session with a different SSLContext
            with self.assertRaises(ValueError) as e:
                s.session = session
                s.connect(address)
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
def test_main(verbose=False):
    # Entry point for the test suite: optionally print environment details,
    # check that the certificate files exist, then run the test classes
    # that apply to this build.
    if support.verbose:
        import warnings
        plats = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                PendingDeprecationWarning,
            )
            # Use the first platform probe that returns a non-empty result.
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                # No probe matched; fall back to the generic description.
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # OP_NO_TLSv1_1 is not available on this build; skip it.
            pass

    # NOTE(review): the original list of files was lost from this listing;
    # it is rebuilt here from the certificate constants visible in this
    # module. Confirm against the upstream file and extend if needed.
    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
    ]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SimpleBackgroundTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info only exists when threading support was detected.
        if _have_threads:
            support.threading_cleanup(*thread_info)
# Run the whole suite when this module is executed as a script.
if __name__ == '__main__':
    test_main()