ยปCore Development>Code coverage>Lib/CGIHTTPServer.py

Python code coverage for Lib/CGIHTTPServer.py

#countcontent
1n/a"""CGI-savvy HTTP Server.
2n/a
3n/aThis module builds on SimpleHTTPServer by implementing GET and POST
4n/arequests to cgi-bin scripts.
5n/a
6n/aIf the os.fork() function is not present (e.g. on Windows),
7n/aos.popen2() is used as a fallback, with slightly altered semantics; if
8n/athat function is not present either (e.g. on Macintosh), only Python
9n/ascripts are supported, and they are executed by the current process.
10n/a
11n/aIn all cases, the implementation is intentionally naive -- all
12n/arequests are executed sychronously.
13n/a
14n/aSECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
15n/a-- it may execute arbitrary Python code or external programs.
16n/a
17n/aNote that status code 200 is sent prior to execution of a CGI script, so
18n/ascripts cannot send other status codes such as 302 (redirect).
191"""
20n/a
21n/a
221__version__ = "0.4"
23n/a
241__all__ = ["CGIHTTPRequestHandler"]
25n/a
261import os
271import sys
281import urllib
291import BaseHTTPServer
301import SimpleHTTPServer
311import select
32n/a
33n/a
342class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
35n/a
36n/a """Complete HTTP server with GET, HEAD and POST commands.
37n/a
38n/a GET and HEAD also support running CGI scripts.
39n/a
40n/a The POST command is *only* implemented for CGI scripts.
41n/a
421 """
43n/a
44n/a # Determine platform specifics
451 have_fork = hasattr(os, 'fork')
461 have_popen2 = hasattr(os, 'popen2')
471 have_popen3 = hasattr(os, 'popen3')
48n/a
49n/a # Make rfile unbuffered -- we need to read one line and then pass
50n/a # the rest to a subprocess, so we can't use buffered input.
511 rbufsize = 0
52n/a
531 def do_POST(self):
54n/a """Serve a POST request.
55n/a
56n/a This is only implemented for CGI scripts.
57n/a
58n/a """
59n/a
601 if self.is_cgi():
611 self.run_cgi()
62n/a else:
630 self.send_error(501, "Can only POST to CGI scripts")
64n/a
651 def send_head(self):
66n/a """Version of send_head that support CGI scripts"""
674 if self.is_cgi():
684 return self.run_cgi()
69n/a else:
700 return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)
71n/a
721 def is_cgi(self):
73n/a """Test whether self.path corresponds to a CGI script.
74n/a
75n/a Returns True and updates the cgi_info attribute to the tuple
76n/a (dir, rest) if self.path requires running a CGI script.
77n/a Returns False otherwise.
78n/a
79n/a If any exception is raised, the caller should assume that
80n/a self.path was rejected as invalid and act accordingly.
81n/a
82n/a The default implementation tests whether the normalized url
83n/a path begins with one of the strings in self.cgi_directories
84n/a (and the next character is a '/' or the end of the string).
85n/a """
865 splitpath = _url_collapse_path_split(self.path)
875 if splitpath[0] in self.cgi_directories:
885 self.cgi_info = splitpath
895 return True
900 return False
91n/a
921 cgi_directories = ['/cgi-bin', '/htbin']
93n/a
941 def is_executable(self, path):
95n/a """Test whether argument path is an executable file."""
960 return executable(path)
97n/a
981 def is_python(self, path):
99n/a """Test whether argument path is a Python script."""
1004 head, tail = os.path.splitext(path)
1014 return tail.lower() in (".py", ".pyw")
102n/a
1031 def run_cgi(self):
104n/a """Execute a CGI script."""
1055 path = self.path
1065 dir, rest = self.cgi_info
107n/a
1085 i = path.find('/', len(dir) + 1)
1095 while i >= 0:
1100 nextdir = path[:i]
1110 nextrest = path[i+1:]
112n/a
1130 scriptdir = self.translate_path(nextdir)
1140 if os.path.isdir(scriptdir):
1150 dir, rest = nextdir, nextrest
1160 i = path.find('/', len(dir) + 1)
117n/a else:
1180 break
119n/a
120n/a # find an explicit query string, if present.
1215 i = rest.rfind('?')
1225 if i >= 0:
1230 rest, query = rest[:i], rest[i+1:]
124n/a else:
1255 query = ''
126n/a
127n/a # dissect the part after the directory name into a script name &
128n/a # a possible additional path, to be stored in PATH_INFO.
1295 i = rest.find('/')
1305 if i >= 0:
1310 script, rest = rest[:i], rest[i:]
132n/a else:
1335 script, rest = rest, ''
134n/a
1355 scriptname = dir + '/' + script
1365 scriptfile = self.translate_path(scriptname)
1375 if not os.path.exists(scriptfile):
1381 self.send_error(404, "No such CGI script (%r)" % scriptname)
1391 return
1404 if not os.path.isfile(scriptfile):
1410 self.send_error(403, "CGI script is not a plain file (%r)" %
1420 scriptname)
1430 return
1444 ispy = self.is_python(scriptname)
1454 if not ispy:
1460 if not (self.have_fork or self.have_popen2 or self.have_popen3):
1470 self.send_error(403, "CGI script is not a Python script (%r)" %
1480 scriptname)
1490 return
1500 if not self.is_executable(scriptfile):
1510 self.send_error(403, "CGI script is not executable (%r)" %
1520 scriptname)
1530 return
154n/a
155n/a # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
156n/a # XXX Much of the following could be prepared ahead of time!
1574 env = {}
1584 env['SERVER_SOFTWARE'] = self.version_string()
1594 env['SERVER_NAME'] = self.server.server_name
1604 env['GATEWAY_INTERFACE'] = 'CGI/1.1'
1614 env['SERVER_PROTOCOL'] = self.protocol_version
1624 env['SERVER_PORT'] = str(self.server.server_port)
1634 env['REQUEST_METHOD'] = self.command
1644 uqrest = urllib.unquote(rest)
1654 env['PATH_INFO'] = uqrest
1664 env['PATH_TRANSLATED'] = self.translate_path(uqrest)
1674 env['SCRIPT_NAME'] = scriptname
1684 if query:
1690 env['QUERY_STRING'] = query
1704 host = self.address_string()
1714 if host != self.client_address[0]:
1724 env['REMOTE_HOST'] = host
1734 env['REMOTE_ADDR'] = self.client_address[0]
1744 authorization = self.headers.getheader("authorization")
1754 if authorization:
1761 authorization = authorization.split()
1771 if len(authorization) == 2:
1781 import base64, binascii
1791 env['AUTH_TYPE'] = authorization[0]
1801 if authorization[0].lower() == "basic":
1811 try:
1821 authorization = base64.decodestring(authorization[1])
1830 except binascii.Error:
1840 pass
185n/a else:
1861 authorization = authorization.split(':')
1871 if len(authorization) == 2:
1881 env['REMOTE_USER'] = authorization[0]
189n/a # XXX REMOTE_IDENT
1904 if self.headers.typeheader is None:
1913 env['CONTENT_TYPE'] = self.headers.type
192n/a else:
1931 env['CONTENT_TYPE'] = self.headers.typeheader
1944 length = self.headers.getheader('content-length')
1954 if length:
1961 env['CONTENT_LENGTH'] = length
1974 referer = self.headers.getheader('referer')
1984 if referer:
1990 env['HTTP_REFERER'] = referer
2004 accept = []
2014 for line in self.headers.getallmatchingheaders('accept'):
2020 if line[:1] in "\t\n\r ":
2030 accept.append(line.strip())
204n/a else:
2050 accept = accept + line[7:].split(',')
2064 env['HTTP_ACCEPT'] = ','.join(accept)
2074 ua = self.headers.getheader('user-agent')
2084 if ua:
2090 env['HTTP_USER_AGENT'] = ua
2104 co = filter(None, self.headers.getheaders('cookie'))
2114 if co:
2120 env['HTTP_COOKIE'] = ', '.join(co)
213n/a # XXX Other HTTP_* headers
214n/a # Since we're setting the env in the parent, provide empty
215n/a # values to override previously set values
2164 for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
21728 'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
21824 env.setdefault(k, "")
2194 os.environ.update(env)
220n/a
2214 self.send_response(200, "Script output follows")
222n/a
2234 decoded_query = query.replace('+', ' ')
224n/a
2254 if self.have_fork:
226n/a # Unix -- fork as we should
2274 args = [script]
2284 if '=' not in decoded_query:
2294 args.append(decoded_query)
2304 nobody = nobody_uid()
2314 self.wfile.flush() # Always flush before forking
2324 pid = os.fork()
2334 if pid != 0:
234n/a # Parent
2354 pid, sts = os.waitpid(pid, 0)
236n/a # throw away additional data [see bug #427345]
2374 while select.select([self.rfile], [], [], 0)[0]:
2380 if not self.rfile.read(1):
2390 break
2404 if sts:
2410 self.log_error("CGI script exit status %#x", sts)
2424 return
243n/a # Child
2440 try:
2450 try:
2460 os.setuid(nobody)
2470 except os.error:
2480 pass
2490 os.dup2(self.rfile.fileno(), 0)
2500 os.dup2(self.wfile.fileno(), 1)
2510 os.execve(scriptfile, args, os.environ)
2520 except:
2530 self.server.handle_error(self.request, self.client_address)
2540 os._exit(127)
255n/a
256n/a else:
257n/a # Non Unix - use subprocess
2580 import subprocess
2590 cmdline = [scriptfile]
2600 if self.is_python(scriptfile):
2610 interp = sys.executable
2620 if interp.lower().endswith("w.exe"):
263n/a # On Windows, use python.exe, not pythonw.exe
2640 interp = interp[:-5] + interp[-4:]
2650 cmdline = [interp, '-u'] + cmdline
2660 if '=' not in query:
2670 cmdline.append(query)
268n/a
2690 self.log_message("command: %s", subprocess.list2cmdline(cmdline))
2700 try:
2710 nbytes = int(length)
2720 except (TypeError, ValueError):
2730 nbytes = 0
2740 p = subprocess.Popen(cmdline,
2750 stdin = subprocess.PIPE,
2760 stdout = subprocess.PIPE,
2770 stderr = subprocess.PIPE
278n/a )
2790 if self.command.lower() == "post" and nbytes > 0:
2800 data = self.rfile.read(nbytes)
281n/a else:
2820 data = None
283n/a # throw away additional data [see bug #427345]
2840 while select.select([self.rfile._sock], [], [], 0)[0]:
2850 if not self.rfile._sock.recv(1):
2860 break
2870 stdout, stderr = p.communicate(data)
2880 self.wfile.write(stdout)
2890 if stderr:
2900 self.log_error('%s', stderr)
2910 status = p.returncode
2920 if status:
2930 self.log_error("CGI script exit status %#x", status)
294n/a else:
2950 self.log_message("CGI script exited OK")
296n/a
297n/a
298n/a# TODO(gregory.p.smith): Move this into an appropriate library.
2991def _url_collapse_path_split(path):
300n/a """
301n/a Given a URL path, remove extra '/'s and '.' path elements and collapse
302n/a any '..' references.
303n/a
304n/a Implements something akin to RFC-2396 5.2 step 6 to parse relative paths.
305n/a
306n/a Returns: A tuple of (head, tail) where tail is everything after the final /
307n/a and head is everything before it. Head will always start with a '/' and,
308n/a if it contains anything else, never have a trailing '/'.
309n/a
310n/a Raises: IndexError if too many '..' occur within the path.
311n/a """
312n/a # Similar to os.path.split(os.path.normpath(path)) but specific to URL
313n/a # path semantics rather than local operating system semantics.
31431 path_parts = []
315192 for part in path.split('/'):
316161 if part == '.':
31710 path_parts.append('')
318n/a else:
319151 path_parts.append(part)
320n/a # Filter out blank non trailing parts before consuming the '..'.
321161 path_parts = [part for part in path_parts[:-1] if part] + path_parts[-1:]
32231 if path_parts:
32331 tail_part = path_parts.pop()
324n/a else:
3250 tail_part = ''
32631 head_parts = []
327112 for part in path_parts:
32883 if part == '..':
32930 head_parts.pop()
330n/a else:
33153 head_parts.append(part)
33229 if tail_part and tail_part == '..':
3334 head_parts.pop()
3342 tail_part = ''
33527 return ('/' + '/'.join(head_parts), tail_part)
336n/a
337n/a
3381nobody = None
339n/a
3401def nobody_uid():
341n/a """Internal routine to get nobody's uid"""
342n/a global nobody
3434 if nobody:
3443 return nobody
3451 try:
3461 import pwd
3470 except ImportError:
3480 return -1
3491 try:
3501 nobody = pwd.getpwnam('nobody')[2]
3510 except KeyError:
3520 nobody = 1 + max(map(lambda x: x[2], pwd.getpwall()))
3531 return nobody
354n/a
355n/a
3561def executable(path):
357n/a """Test for executable file."""
3580 try:
3590 st = os.stat(path)
3600 except os.error:
3610 return False
3620 return st.st_mode & 0111 != 0
363n/a
364n/a
3651def test(HandlerClass = CGIHTTPRequestHandler,
3661 ServerClass = BaseHTTPServer.HTTPServer):
3670 SimpleHTTPServer.test(HandlerClass, ServerClass)
368n/a
369n/a
3701if __name__ == '__main__':
3710 test()