ยปCore Development>Code coverage>Lib/test/ssl_servers.py

Python code coverage for Lib/test/ssl_servers.py

#countcontent
1n/aimport os
2n/aimport sys
3n/aimport ssl
4n/aimport pprint
5n/aimport socket
6n/aimport urllib.parse
7n/a# Rename HTTPServer to _HTTPServer so as to avoid confusion with HTTPSServer.
8n/afrom http.server import (HTTPServer as _HTTPServer,
9n/a SimpleHTTPRequestHandler, BaseHTTPRequestHandler)
10n/a
11n/afrom test import support
12n/athreading = support.import_module("threading")
13n/a
14n/ahere = os.path.dirname(__file__)
15n/a
16n/aHOST = support.HOST
17n/aCERTFILE = os.path.join(here, 'keycert.pem')
18n/a
19n/a# This one's based on HTTPServer, which is based on socketserver
20n/a
21n/aclass HTTPSServer(_HTTPServer):
22n/a
23n/a def __init__(self, server_address, handler_class, context):
24n/a _HTTPServer.__init__(self, server_address, handler_class)
25n/a self.context = context
26n/a
27n/a def __str__(self):
28n/a return ('<%s %s:%s>' %
29n/a (self.__class__.__name__,
30n/a self.server_name,
31n/a self.server_port))
32n/a
33n/a def get_request(self):
34n/a # override this to wrap socket with SSL
35n/a try:
36n/a sock, addr = self.socket.accept()
37n/a sslconn = self.context.wrap_socket(sock, server_side=True)
38n/a except OSError as e:
39n/a # socket errors are silenced by the caller, print them here
40n/a if support.verbose:
41n/a sys.stderr.write("Got an error:\n%s\n" % e)
42n/a raise
43n/a return sslconn, addr
44n/a
45n/aclass RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
46n/a # need to override translate_path to get a known root,
47n/a # instead of using os.curdir, since the test could be
48n/a # run from anywhere
49n/a
50n/a server_version = "TestHTTPS/1.0"
51n/a root = here
52n/a # Avoid hanging when a request gets interrupted by the client
53n/a timeout = 5
54n/a
55n/a def translate_path(self, path):
56n/a """Translate a /-separated PATH to the local filename syntax.
57n/a
58n/a Components that mean special things to the local file system
59n/a (e.g. drive or directory names) are ignored. (XXX They should
60n/a probably be diagnosed.)
61n/a
62n/a """
63n/a # abandon query parameters
64n/a path = urllib.parse.urlparse(path)[2]
65n/a path = os.path.normpath(urllib.parse.unquote(path))
66n/a words = path.split('/')
67n/a words = filter(None, words)
68n/a path = self.root
69n/a for word in words:
70n/a drive, word = os.path.splitdrive(word)
71n/a head, word = os.path.split(word)
72n/a path = os.path.join(path, word)
73n/a return path
74n/a
75n/a def log_message(self, format, *args):
76n/a # we override this to suppress logging unless "verbose"
77n/a if support.verbose:
78n/a sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
79n/a (self.server.server_address,
80n/a self.server.server_port,
81n/a self.request.cipher(),
82n/a self.log_date_time_string(),
83n/a format%args))
84n/a
85n/a
86n/aclass StatsRequestHandler(BaseHTTPRequestHandler):
87n/a """Example HTTP request handler which returns SSL statistics on GET
88n/a requests.
89n/a """
90n/a
91n/a server_version = "StatsHTTPS/1.0"
92n/a
93n/a def do_GET(self, send_body=True):
94n/a """Serve a GET request."""
95n/a sock = self.rfile.raw._sock
96n/a context = sock.context
97n/a stats = {
98n/a 'session_cache': context.session_stats(),
99n/a 'cipher': sock.cipher(),
100n/a 'compression': sock.compression(),
101n/a }
102n/a body = pprint.pformat(stats)
103n/a body = body.encode('utf-8')
104n/a self.send_response(200)
105n/a self.send_header("Content-type", "text/plain; charset=utf-8")
106n/a self.send_header("Content-Length", str(len(body)))
107n/a self.end_headers()
108n/a if send_body:
109n/a self.wfile.write(body)
110n/a
111n/a def do_HEAD(self):
112n/a """Serve a HEAD request."""
113n/a self.do_GET(send_body=False)
114n/a
115n/a def log_request(self, format, *args):
116n/a if support.verbose:
117n/a BaseHTTPRequestHandler.log_request(self, format, *args)
118n/a
119n/a
120n/aclass HTTPSServerThread(threading.Thread):
121n/a
122n/a def __init__(self, context, host=HOST, handler_class=None):
123n/a self.flag = None
124n/a self.server = HTTPSServer((host, 0),
125n/a handler_class or RootedHTTPRequestHandler,
126n/a context)
127n/a self.port = self.server.server_port
128n/a threading.Thread.__init__(self)
129n/a self.daemon = True
130n/a
131n/a def __str__(self):
132n/a return "<%s %s>" % (self.__class__.__name__, self.server)
133n/a
134n/a def start(self, flag=None):
135n/a self.flag = flag
136n/a threading.Thread.start(self)
137n/a
138n/a def run(self):
139n/a if self.flag:
140n/a self.flag.set()
141n/a try:
142n/a self.server.serve_forever(0.05)
143n/a finally:
144n/a self.server.server_close()
145n/a
146n/a def stop(self):
147n/a self.server.shutdown()
148n/a
149n/a
150n/adef make_https_server(case, *, context=None, certfile=CERTFILE,
151n/a host=HOST, handler_class=None):
152n/a if context is None:
153n/a context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
154n/a # We assume the certfile contains both private key and certificate
155n/a context.load_cert_chain(certfile)
156n/a server = HTTPSServerThread(context, host, handler_class)
157n/a flag = threading.Event()
158n/a server.start(flag)
159n/a flag.wait()
160n/a def cleanup():
161n/a if support.verbose:
162n/a sys.stdout.write('stopping HTTPS server\n')
163n/a server.stop()
164n/a if support.verbose:
165n/a sys.stdout.write('joining HTTPS thread\n')
166n/a server.join()
167n/a case.addCleanup(cleanup)
168n/a return server
169n/a
170n/a
171n/aif __name__ == "__main__":
172n/a import argparse
173n/a parser = argparse.ArgumentParser(
174n/a description='Run a test HTTPS server. '
175n/a 'By default, the current directory is served.')
176n/a parser.add_argument('-p', '--port', type=int, default=4433,
177n/a help='port to listen on (default: %(default)s)')
178n/a parser.add_argument('-q', '--quiet', dest='verbose', default=True,
179n/a action='store_false', help='be less verbose')
180n/a parser.add_argument('-s', '--stats', dest='use_stats_handler', default=False,
181n/a action='store_true', help='always return stats page')
182n/a parser.add_argument('--curve-name', dest='curve_name', type=str,
183n/a action='store',
184n/a help='curve name for EC-based Diffie-Hellman')
185n/a parser.add_argument('--ciphers', dest='ciphers', type=str,
186n/a help='allowed cipher list')
187n/a parser.add_argument('--dh', dest='dh_file', type=str, action='store',
188n/a help='PEM file containing DH parameters')
189n/a args = parser.parse_args()
190n/a
191n/a support.verbose = args.verbose
192n/a if args.use_stats_handler:
193n/a handler_class = StatsRequestHandler
194n/a else:
195n/a handler_class = RootedHTTPRequestHandler
196n/a handler_class.root = os.getcwd()
197n/a context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
198n/a context.load_cert_chain(CERTFILE)
199n/a if args.curve_name:
200n/a context.set_ecdh_curve(args.curve_name)
201n/a if args.dh_file:
202n/a context.load_dh_params(args.dh_file)
203n/a if args.ciphers:
204n/a context.set_ciphers(args.ciphers)
205n/a
206n/a server = HTTPSServer(("", args.port), handler_class, context)
207n/a if args.verbose:
208n/a print("Listening on https://localhost:{0.port}".format(args))
209n/a server.serve_forever(0.1)