»Core Development>Code coverage>Lib/test/test_httpservers.py

Python code coverage for Lib/test/test_httpservers.py

#countcontent
1n/a"""Unittests for the various HTTPServer modules.
2n/a
3n/aWritten by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4n/aJosip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5n/a"""
6n/a
7n/afrom http.server import BaseHTTPRequestHandler, HTTPServer, \
8n/a SimpleHTTPRequestHandler, CGIHTTPRequestHandler
9n/afrom http import server, HTTPStatus
10n/a
11n/aimport os
12n/aimport sys
13n/aimport re
14n/aimport base64
15n/aimport ntpath
16n/aimport shutil
17n/aimport urllib.parse
18n/aimport html
19n/aimport http.client
20n/aimport tempfile
21n/aimport time
22n/afrom io import BytesIO
23n/a
24n/aimport unittest
25n/afrom test import support
26n/athreading = support.import_module('threading')
27n/a
class NoLogRequestHandler:
    """Mixin that keeps request handlers quiet while tests run."""

    def log_message(self, *args):
        """Discard the log record instead of writing it to stderr."""
        return None

    def read(self, n=None):
        """Return an empty string, whatever size *n* is requested."""
        return ''
35n/a
36n/a
class TestServerThread(threading.Thread):
    """Thread that runs an HTTPServer on behalf of one test case.

    run() binds the server to ('localhost', 0), stores the bound address on
    the owning test object as HOST/PORT, sets its ``server_started`` event
    and then serves requests until stop() is called.
    """

    def __init__(self, test_object, request_handler):
        """Store the owning test case and the handler class to serve with."""
        threading.Thread.__init__(self)
        self.request_handler = request_handler
        self.test_object = test_object

    def run(self):
        """Create the server, publish its address and serve until shutdown."""
        self.server = HTTPServer(('localhost', 0), self.request_handler)
        self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
        self.test_object.server_started.set()
        # The test case is not needed once the address has been published.
        self.test_object = None
        try:
            self.server.serve_forever(0.05)
        finally:
            self.server.server_close()

    def stop(self):
        """Shut the server down and wait until the thread has finished."""
        self.server.shutdown()
        # shutdown() only waits for serve_forever() to return; join so that
        # server_close() has run and the thread is gone before the caller
        # continues with its own cleanup.
        self.join()
55n/a
56n/a
class BaseTestCase(unittest.TestCase):
    """Fixture that starts a TestServerThread for every test.

    The server is created with ``self.request_handler``, which this class
    does not define. HOST and PORT are assigned by the server thread before
    ``server_started`` is set.
    """

    def setUp(self):
        self._threads = support.threading_setup()
        # Replace os.environ with a guard; tearDown() calls its __exit__().
        os.environ = support.EnvironmentVarGuard()
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        self.server_started.wait()

    def tearDown(self):
        self.thread.stop()
        self.thread = None
        os.environ.__exit__()
        support.threading_cleanup(*self._threads)

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request over a new connection and return the response.

        *headers* is an optional mapping of request headers; when omitted an
        empty dict is used.  The connection is kept as ``self.connection``.
        """
        # A fresh dict per call avoids sharing one mutable default object
        # between all invocations.
        if headers is None:
            headers = {}
        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
76n/a
77n/a
class BaseHTTPServerTestCase(BaseTestCase):
    """Tests for BaseHTTPRequestHandler driven over a live connection."""

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        """Handler whose do_* methods each produce one response under test."""

        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def _empty_html_reply(self, code, connection):
            # Status line plus Content-Type and Connection headers, no body.
            self.send_response(code)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', connection)
            self.end_headers()

        def _send_error_from_path(self):
            # The path is '/<code>'; reply with that numeric status.
            self.send_error(int(self.path[1:]))

        def do_TEST(self):
            self._empty_html_reply(HTTPStatus.NO_CONTENT, 'close')

        def do_KEEP(self):
            self._empty_html_reply(HTTPStatus.NO_CONTENT, 'keep-alive')

        def do_KEYERROR(self):
            self.send_error(999)

        def do_NOTFOUND(self):
            self.send_error(HTTPStatus.NOT_FOUND)

        def do_EXPLAINERROR(self):
            self.send_error(999, "Short Message",
                            "This is a long \n explanation")

        def do_CUSTOM(self):
            self._empty_html_reply(999, 'close')

        def do_LATINONEHEADER(self):
            self.send_response(999)
            self.send_header('X-Special', 'Dängerous Mind')
            self.send_header('Connection', 'close')
            self.end_headers()
            # Echo the incoming header value back as the UTF-8 encoded body.
            body = self.headers['x-special-incoming'].encode('utf-8')
            self.wfile.write(body)

        def do_SEND_ERROR(self):
            self._send_error_from_path()

        def do_HEAD(self):
            self._send_error_from_path()

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        self.con.connect()

    def _roundtrip(self, method, url='/', **kwargs):
        # Issue a complete request on the shared connection.
        self.con.request(method, url, **kwargs)
        return self.con.getresponse()

    def _bare_request(self, method, url, version_str):
        # Send a request line carrying a forged HTTP version string and no
        # headers, then return the response.
        self.con._http_vsn_str = version_str
        self.con.putrequest(method, url)
        self.con.endheaders()
        return self.con.getresponse()

    def test_command(self):
        response = self._roundtrip('GET')
        self.assertEqual(response.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_request_line_trimming(self):
        response = self._bare_request('XYZBOGUS', '/', 'HTTP/1.1\n')
        self.assertEqual(response.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_bogus(self):
        response = self._bare_request('GET', '/', 'FUBAR')
        self.assertEqual(response.status, HTTPStatus.BAD_REQUEST)

    def test_version_digits(self):
        response = self._bare_request('GET', '/', 'HTTP/9.9.9')
        self.assertEqual(response.status, HTTPStatus.BAD_REQUEST)

    def test_version_none_get(self):
        response = self._bare_request('GET', '/', '')
        self.assertEqual(response.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_none(self):
        # Test that a valid method is rejected when not HTTP/1.x
        response = self._bare_request('CUSTOM', '/', '')
        self.assertEqual(response.status, HTTPStatus.BAD_REQUEST)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        response = self._bare_request('GET', '/', 'HTTP/9.9')
        self.assertEqual(response.status,
                         HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)

    def test_send_blank(self):
        response = self._bare_request('', '', '')
        self.assertEqual(response.status, HTTPStatus.BAD_REQUEST)

    def test_header_close(self):
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'close')
        self.con.endheaders()
        response = self.con.getresponse()
        self.assertEqual(response.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_header_keep_alive(self):
        self.con._http_vsn_str = 'HTTP/1.1'
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'keep-alive')
        self.con.endheaders()
        response = self.con.getresponse()
        self.assertEqual(response.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_handler(self):
        response = self._roundtrip('TEST')
        self.assertEqual(response.status, HTTPStatus.NO_CONTENT)

    def test_return_header_keep_alive(self):
        response = self._roundtrip('KEEP')
        self.assertEqual(response.getheader('Connection'), 'keep-alive')
        # Send a second request on the same connection.
        self.con.request('TEST', '/')
        self.addCleanup(self.con.close)

    def test_internal_key_error(self):
        response = self._roundtrip('KEYERROR')
        self.assertEqual(response.status, 999)

    def test_return_custom_status(self):
        response = self._roundtrip('CUSTOM')
        self.assertEqual(response.status, 999)

    def test_return_explain_error(self):
        response = self._roundtrip('EXPLAINERROR')
        self.assertEqual(response.status, 999)
        self.assertTrue(int(response.getheader('Content-Length')))

    def test_latin1_header(self):
        incoming = {'X-Special-Incoming': 'Ärger mit Unicode'}
        response = self._roundtrip('LATINONEHEADER', headers=incoming)
        self.assertEqual(response.getheader('X-Special'), 'Dängerous Mind')
        self.assertEqual(response.read(), 'Ärger mit Unicode'.encode('utf-8'))

    def test_error_content_length(self):
        # Issue #16088: standard error responses should have a content-length
        response = self._roundtrip('NOTFOUND')
        self.assertEqual(response.status, HTTPStatus.NOT_FOUND)

        payload = response.read()
        self.assertEqual(int(response.getheader('Content-Length')),
                         len(payload))

    def test_send_error(self):
        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
                                         HTTPStatus.RESET_CONTENT)
        codes = (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
                 HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
                 HTTPStatus.SWITCHING_PROTOCOLS)
        for code in codes:
            response = self._roundtrip('SEND_ERROR', '/{}'.format(code))
            self.assertEqual(code, response.status)
            self.assertIsNone(response.getheader('Content-Length'))
            self.assertIsNone(response.getheader('Content-Type'))
            if code not in allow_transfer_encoding_codes:
                self.assertIsNone(response.getheader('Transfer-Encoding'))

            self.assertEqual(b'', response.read())

    def test_head_via_send_error(self):
        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
                                         HTTPStatus.RESET_CONTENT)
        codes = (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
                 HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
                 HTTPStatus.SWITCHING_PROTOCOLS)
        for code in codes:
            response = self._roundtrip('HEAD', '/{}'.format(code))
            self.assertEqual(code, response.status)
            if code == HTTPStatus.OK:
                self.assertGreater(
                    int(response.getheader('Content-Length')), 0)
                self.assertIn('text/html',
                              response.getheader('Content-Type'))
            else:
                self.assertIsNone(response.getheader('Content-Length'))
                self.assertIsNone(response.getheader('Content-Type'))
            if code not in allow_transfer_encoding_codes:
                self.assertIsNone(response.getheader('Transfer-Encoding'))

            self.assertEqual(b'', response.read())
283n/a
284n/a
class RequestHandlerLoggingTestCase(BaseTestCase):
    """Check the lines BaseHTTPRequestHandler writes to stderr."""

    class request_handler(BaseHTTPRequestHandler):
        """Handler that keeps the default (stderr) logging enabled."""

        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_GET(self):
            self.send_response(HTTPStatus.OK)
            self.end_headers()

        def do_ERROR(self):
            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')

    def _captured_log(self, method):
        # Open a fresh connection, perform one request for '/' while stderr
        # is captured, and return everything that was written to it.
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        self.con.connect()

        with support.captured_stderr() as captured:
            self.con.request(method, '/')
            self.con.getresponse()

        return captured.getvalue()

    def test_get(self):
        logged = self._captured_log('GET')
        self.assertTrue(logged.endswith('"GET / HTTP/1.1" 200 -\n'))

    def test_err(self):
        log_lines = self._captured_log('ERROR').split('\n')
        first, second = log_lines[0], log_lines[1]
        self.assertTrue(first.endswith('code 404, message File not found'))
        self.assertTrue(second.endswith('"ERROR / HTTP/1.1" 404 -'))
319n/a
320n/a
class SimpleHTTPServerTestCase(BaseTestCase):
    """Tests for SimpleHTTPRequestHandler serving a temporary directory.

    setUp() changes the working directory to the system temp directory and
    creates a scratch directory inside it that contains one file, 'test'.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = b'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        self.base_url = '/' + self.tempdir_name
        with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
            temp.write(self.data)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            # Removing the scratch directory is best effort, but only file
            # system errors are ignored; a bare ``except`` would also hide
            # KeyboardInterrupt and SystemExit.
            try:
                shutil.rmtree(self.tempdir)
            except OSError:
                pass
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        """Read *response*, check its status and return the body.

        Also asserts that the reply is HTTP/1.0, is not keep-alive and that
        nothing is left to read after the body.  When *data* is truthy the
        body must equal it.
        """
        def close_conn():
            """Don't close reader yet so we can check if there was leftover
            buffered input"""
            nonlocal reader
            reader = response.fp
            response.fp = None
        reader = None
        response._close_conn = close_conn

        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertIsNotNone(response.reason)
        if data:
            self.assertEqual(data, body)
        # Ensure the server has not set up a persistent connection, and has
        # not sent any extra data
        self.assertEqual(response.version, 10)
        self.assertEqual(response.msg.get("Connection", "close"), "close")
        self.assertEqual(reader.read(30), b'', 'Connection should be closed')

        reader.close()
        return body

    @support.requires_mac_ver(10, 5)
    @unittest.skipIf(sys.platform == 'win32',
                     'undecodable name cannot be decoded on win32')
    @unittest.skipUnless(support.TESTFN_UNDECODABLE,
                         'need support.TESTFN_UNDECODABLE')
    def test_undecodable_filename(self):
        enc = sys.getfilesystemencoding()
        filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt'
        with open(os.path.join(self.tempdir, filename), 'wb') as f:
            f.write(support.TESTFN_UNDECODABLE)
        response = self.request(self.base_url + '/')
        if sys.platform == 'darwin':
            # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
            # UTF-8 into a percent-encoded value.
            for name in os.listdir(self.tempdir):
                if name != 'test':  # Ignore a filename created in setUp().
                    filename = name
                    break
        body = self.check_status_and_reason(response, HTTPStatus.OK)
        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
        self.assertIn(('href="%s"' % quotedname)
                      .encode(enc, 'surrogateescape'), body)
        self.assertIn(('>%s<' % html.escape(filename, quote=False))
                      .encode(enc, 'surrogateescape'), body)
        response = self.request(self.base_url + '/' + quotedname)
        self.check_status_and_reason(response, HTTPStatus.OK,
                                     data=support.TESTFN_UNDECODABLE)

    def test_get(self):
        # constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.base_url + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        # check for trailing "/" which should return 404. See Issue17324
        response = self.request(self.base_url + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.base_url + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.base_url + "/?hi=1")
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)

        # The path below is relative: setUp() made the parent of the scratch
        # directory the current working directory.
        data = b"Dummy index file\r\n"
        with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
            f.write(data)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK, data)

        # chmod() doesn't work as expected on Windows, and filesystem
        # permissions are ignored by root on Unix.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.base_url + '/')
                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
            finally:
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.base_url + '/test', method='HEAD')
        self.check_status_and_reason(response, HTTPStatus.OK)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        # requests must be case sensitive, so this should fail too
        response = self.request('/', method='custom')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)

    def test_path_without_leading_slash(self):
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        response = self.request(self.tempdir_name + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.tempdir_name + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.tempdir_name + "/?hi=1")

    def test_html_escape_filename(self):
        filename = '<test&>.txt'
        fullpath = os.path.join(self.tempdir, filename)

        try:
            open(fullpath, 'w').close()
        except OSError:
            raise unittest.SkipTest('Can not create file %s on current file '
                                    'system' % filename)

        try:
            response = self.request(self.base_url + '/')
            body = self.check_status_and_reason(response, HTTPStatus.OK)
            enc = response.headers.get_content_charset()
        finally:
            os.unlink(fullpath)  # avoid affecting test_undecodable_filename

        self.assertIsNotNone(enc)
        html_text = '>%s<' % html.escape(filename, quote=False)
        self.assertIn(html_text.encode(enc), body)
492n/a
493n/a
# Templates for the CGI scripts written by CGIHTTPServerTestCase.setUp().
# The leading '%s' of every template receives the interpreter path for the
# shebang line.

# Prints a fixed "Hello World" page.
cgi_file1 = (
    '#!%s\n'
    '\n'
    'print("Content-type: text/html")\n'
    'print()\n'
    'print("Hello World")\n'
)

# Echoes the "spam", "eggs" and "bacon" form fields.  The doubled percent
# signs survive the template substitution as single ones.
cgi_file2 = (
    '#!%s\n'
    'import cgi\n'
    '\n'
    'print("Content-type: text/html")\n'
    'print()\n'
    '\n'
    'form = cgi.FieldStorage()\n'
    'print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\n'
    ' form.getfirst("bacon")))\n'
)

# Prints one environment variable; its name is the second substitution.
cgi_file4 = (
    '#!%s\n'
    'import os\n'
    '\n'
    'print("Content-type: text/html")\n'
    'print()\n'
    '\n'
    'print(os.environ["%s"])\n'
)
523n/a
524n/a
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                 "This test can't be run reliably as root (issue #13308).")
class CGIHTTPServerTestCase(BaseTestCase):
    """Tests for CGIHTTPRequestHandler against scripts in a temp directory.

    setUp() builds <parent_dir>/cgi-bin (with a child-dir below it), writes
    the scripts used by the tests and makes parent_dir the working directory.
    """

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        pass

    linesep = os.linesep.encode('ascii')

    def _write_script(self, path, source):
        # Write *source* to *path* and set mode 0o777.  Every script is
        # written as UTF-8, the encoding the shebang path is checked against
        # in setUp().
        with open(path, 'w', encoding='utf-8') as script:
            script.write(source)
        os.chmod(path, 0o777)

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
        os.mkdir(self.cgi_dir)
        os.mkdir(self.cgi_child_dir)
        # tearDown() only removes the paths that are no longer None.
        self.nocgi_path = None
        self.file1_path = None
        self.file2_path = None
        self.file3_path = None
        self.file4_path = None

        # The shebang line should be pure ASCII: use symlink if possible.
        # See issue #7668.
        if support.can_symlink():
            self.pythonexe = os.path.join(self.parent_dir, 'python')
            os.symlink(sys.executable, self.pythonexe)
        else:
            self.pythonexe = sys.executable

        try:
            # The python executable path is written as the first line of the
            # CGI Python script. The encoding cookie cannot be used, and so the
            # path should be encodable to the default script encoding (utf-8)
            self.pythonexe.encode('utf-8')
        except UnicodeEncodeError:
            self.tearDown()
            self.skipTest("Python executable path is not encodable to utf-8")

        self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
        self._write_script(self.nocgi_path, cgi_file1 % self.pythonexe)

        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
        self._write_script(self.file1_path, cgi_file1 % self.pythonexe)

        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
        self._write_script(self.file2_path, cgi_file2 % self.pythonexe)

        self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
        self._write_script(self.file3_path, cgi_file1 % self.pythonexe)

        self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
        self._write_script(self.file4_path,
                           cgi_file4 % (self.pythonexe, 'QUERY_STRING'))

        os.chdir(self.parent_dir)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            if self.pythonexe != sys.executable:
                os.remove(self.pythonexe)
            script_paths = (self.nocgi_path, self.file1_path, self.file2_path,
                            self.file3_path, self.file4_path)
            for path in script_paths:
                if path:
                    os.remove(path)
            os.rmdir(self.cgi_child_dir)
            os.rmdir(self.cgi_dir)
            os.rmdir(self.parent_dir)
        finally:
            BaseTestCase.tearDown(self)

    def test_url_collapse_path(self):
        # verify tail is the last portion and head is the rest on proper urls
        test_vectors = {
            '': '//',
            '..': IndexError,
            '/.//..': IndexError,
            '/': '//',
            '//': '//',
            '/\\': '//\\',
            '/.//': '//',
            'cgi-bin/file1.py': '/cgi-bin/file1.py',
            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
            'a': '//a',
            '/a': '//a',
            '//a': '//a',
            './a': '//a',
            './C:/': '/C:/',
            '/a/b': '/a/b',
            '/a/b/': '/a/b/',
            '/a/b/.': '/a/b/',
            '/a/b/c/..': '/a/b/',
            '/a/b/c/../d': '/a/b/d',
            '/a/b/c/../d/e/../f': '/a/b/d/f',
            '/a/b/c/../d/e/../../f': '/a/b/f',
            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
            '../a/b/c/../d/e/.././././..//f': IndexError,
            '/a/b/c/../d/e/../../../f': '/a/f',
            '/a/b/c/../d/e/../../../../f': '//f',
            '/a/b/c/../d/e/../../../../../f': IndexError,
            '/a/b/c/../d/e/../../../../f/..': '//',
            '/a/b/c/../d/e/../../../../f/../.': '//',
        }
        for path, expected in test_vectors.items():
            if isinstance(expected, type) and issubclass(expected, Exception):
                self.assertRaises(expected,
                                  server._url_collapse_path, path)
            else:
                actual = server._url_collapse_path(path)
                self.assertEqual(expected, actual,
                                 msg='path = %r\nGot:    %r\nWanted: %r' %
                                 (path, actual, expected))

    def test_headers_and_content(self):
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            (res.read(), res.getheader('Content-type'), res.status),
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))

    def test_issue19435(self):
        res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

    def test_post(self):
        params = urllib.parse.urlencode(
            {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

    def test_authorization(self):
        headers = {b'Authorization' : b'Basic ' +
                   base64.b64encode(b'username:pass')}
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_no_leading_slash(self):
        # http://bugs.python.org/issue2254
        res = self.request('cgi-bin/file1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_os_environ_is_not_altered(self):
        signature = "Test CGI Server"
        os.environ['SERVER_SOFTWARE'] = signature
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))
        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)

    def test_urlquote_decoding_in_cgi_check(self):
        res = self.request('/cgi-bin%2ffile1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_nested_cgi_path_issue21323(self):
        res = self.request('/cgi-bin/child-dir/file3.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_query_with_multiple_question_mark(self):
        res = self.request('/cgi-bin/file4.py?a=b?c=d')
        self.assertEqual(
            (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_query_with_continuous_slashes(self):
        res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
        self.assertEqual(
            (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
             'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))
725n/a
726n/a
class SocketlessRequestHandler(SimpleHTTPRequestHandler):
    """Request handler built without a socket or server.

    The base class initializer is not called; rfile and wfile have to be
    assigned by the caller before a request is handled.
    """

    def __init__(self):
        self.protocol_version = "HTTP/1.1"
        self.get_called = False

    def do_GET(self):
        """Record the call and answer with a small fixed HTML document."""
        self.get_called = True
        page = b'<html><body>Data</body></html>\r\n'
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-Type', 'text/html')
        self.end_headers()
        self.wfile.write(page)

    def log_message(self, format, *args):
        """Discard log output."""
        return None
741n/a
class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
    """Socketless handler that turns down every Expect: 100-continue."""

    def handle_expect_100(self):
        """Send an EXPECTATION_FAILED error and return False."""
        rejection = HTTPStatus.EXPECTATION_FAILED
        self.send_error(rejection)
        return False
746n/a
747n/a
class AuditableBytesIO:
    """Write-only sink that remembers each write() call separately."""

    def __init__(self):
        # One entry per write() call, in call order.
        self.datas = []

    def write(self, data):
        """Record *data* as one more chunk."""
        self.datas.append(data)

    def getData(self):
        """Return all recorded chunks joined into a single bytes object."""
        joined = b''.join(self.datas)
        return joined

    @property
    def numWrites(self):
        """Number of write() calls made so far."""
        return len(self.datas)
762n/a
763n/a
764n/aclass BaseHTTPRequestHandlerTestCase(unittest.TestCase):
765n/a """Test the functionality of the BaseHTTPServer.
766n/a
767n/a Test the support for the Expect 100-continue header.
768n/a """
769n/a
770n/a HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
771n/a
772n/a def setUp (self):
773n/a self.handler = SocketlessRequestHandler()
774n/a
775n/a def send_typical_request(self, message):
776n/a input = BytesIO(message)
777n/a output = BytesIO()
778n/a self.handler.rfile = input
779n/a self.handler.wfile = output
780n/a self.handler.handle_one_request()
781n/a output.seek(0)
782n/a return output.readlines()
783n/a
784n/a def verify_get_called(self):
785n/a self.assertTrue(self.handler.get_called)
786n/a
787n/a def verify_expected_headers(self, headers):
788n/a for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
789n/a self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
790n/a
791n/a def verify_http_server_response(self, response):
792n/a match = self.HTTPResponseMatch.search(response)
793n/a self.assertIsNotNone(match)
794n/a
795n/a def test_http_1_1(self):
796n/a result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
797n/a self.verify_http_server_response(result[0])
798n/a self.verify_expected_headers(result[1:-1])
799n/a self.verify_get_called()
800n/a self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
801n/a self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
802n/a self.assertEqual(self.handler.command, 'GET')
803n/a self.assertEqual(self.handler.path, '/')
804n/a self.assertEqual(self.handler.request_version, 'HTTP/1.1')
805n/a self.assertSequenceEqual(self.handler.headers.items(), ())
806n/a
807n/a def test_http_1_0(self):
808n/a result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
809n/a self.verify_http_server_response(result[0])
810n/a self.verify_expected_headers(result[1:-1])
811n/a self.verify_get_called()
812n/a self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
813n/a self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
814n/a self.assertEqual(self.handler.command, 'GET')
815n/a self.assertEqual(self.handler.path, '/')
816n/a self.assertEqual(self.handler.request_version, 'HTTP/1.0')
817n/a self.assertSequenceEqual(self.handler.headers.items(), ())
818n/a
819n/a def test_http_0_9(self):
820n/a result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
821n/a self.assertEqual(len(result), 1)
822n/a self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
823n/a self.verify_get_called()
824n/a
825n/a def test_extra_space(self):
826n/a result = self.send_typical_request(
827n/a b'GET /spaced out HTTP/1.1\r\n'
828n/a b'Host: dummy\r\n'
829n/a b'\r\n'
830n/a )
831n/a self.assertTrue(result[0].startswith(b'HTTP/1.1 400 '))
832n/a self.verify_expected_headers(result[1:result.index(b'\r\n')])
833n/a self.assertFalse(self.handler.get_called)
834n/a
835n/a def test_with_continue_1_0(self):
836n/a result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
837n/a self.verify_http_server_response(result[0])
838n/a self.verify_expected_headers(result[1:-1])
839n/a self.verify_get_called()
840n/a self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
841n/a self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
842n/a self.assertEqual(self.handler.command, 'GET')
843n/a self.assertEqual(self.handler.path, '/')
844n/a self.assertEqual(self.handler.request_version, 'HTTP/1.0')
845n/a headers = (("Expect", "100-continue"),)
846n/a self.assertSequenceEqual(self.handler.headers.items(), headers)
847n/a
def test_with_continue_1_1(self):
    # With HTTP/1.1 the response must open with an interim
    # "100 Continue" line and a blank line, followed by the real 200
    # response.
    response = self.send_typical_request(
        b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
    leading_lines = (
        b'HTTP/1.1 100 Continue\r\n',
        b'\r\n',
        b'HTTP/1.1 200 OK\r\n',
    )
    for position, expected_line in enumerate(leading_lines):
        self.assertEqual(response[position], expected_line)
    self.verify_expected_headers(response[2:-1])
    self.verify_get_called()
    self.assertEqual(response[-1], b'<html><body>Data</body></html>\r\n')

    parsed_fields = (
        ('requestline', 'GET / HTTP/1.1'),
        ('command', 'GET'),
        ('path', '/'),
        ('request_version', 'HTTP/1.1'),
    )
    for attribute, expected in parsed_fields:
        self.assertEqual(getattr(self.handler, attribute), expected)
    expected_headers = (("Expect", "100-continue"),)
    self.assertSequenceEqual(self.handler.headers.items(), expected_headers)
862n/a
def test_header_buffering_of_send_error(self):
    # send_error() must reach the output stream in exactly two writes,
    # as counted by the auditable stream's numWrites.
    request_stream = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    response_stream = AuditableBytesIO()

    handler = SocketlessRequestHandler()
    handler.rfile, handler.wfile = request_stream, response_stream
    # Fill in the request state by hand; no request is parsed here.
    handler.request_version = 'HTTP/1.1'
    handler.requestline = ''
    handler.command = None

    handler.send_error(418)
    self.assertEqual(response_stream.numWrites, 2)
876n/a
def test_header_buffering_of_send_response_only(self):
    # send_response_only() must not write anything by itself; the
    # single write is expected only once end_headers() is called.
    request_stream = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    response_stream = AuditableBytesIO()

    handler = SocketlessRequestHandler()
    handler.rfile, handler.wfile = request_stream, response_stream
    handler.request_version = 'HTTP/1.1'

    handler.send_response_only(418)
    writes_before_flush = response_stream.numWrites
    self.assertEqual(writes_before_flush, 0)
    handler.end_headers()
    self.assertEqual(response_stream.numWrites, 1)
890n/a
def test_header_buffering_of_send_header(self):
    # Headers queued with send_header() must stay unwritten until
    # end_headers(), which emits them all in one write.
    request_stream = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
    response_stream = AuditableBytesIO()

    handler = SocketlessRequestHandler()
    handler.rfile, handler.wfile = request_stream, response_stream
    handler.request_version = 'HTTP/1.1'

    for field_name, field_value in (('Foo', 'foo'), ('bar', 'bar')):
        handler.send_header(field_name, field_value)
    self.assertEqual(response_stream.numWrites, 0)

    handler.end_headers()
    self.assertEqual(response_stream.getData(),
                     b'Foo: foo\r\nbar: bar\r\n\r\n')
    self.assertEqual(response_stream.numWrites, 1)
906n/a
def test_header_unbuffered_when_continue(self):
    # After handling a request with "Expect: 100-continue", the output
    # must start with the interim 100 response, then the 200 response.

    def _snapshot(stream):
        # Return everything written so far, leaving the stream position
        # where it was.
        saved_position = stream.tell()
        stream.seek(0)
        contents = stream.read()
        stream.seek(saved_position)
        return contents

    request_stream = BytesIO(
        b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
    response_stream = BytesIO()
    self.handler.rfile = request_stream
    self.handler.wfile = response_stream
    self.handler.request_version = 'HTTP/1.1'

    self.handler.handle_one_request()
    written = _snapshot(response_stream)
    self.assertNotEqual(written, b'')
    lines = written.split(b'\r\n')
    expected_prefix = (b'HTTP/1.1 100 Continue', b'', b'HTTP/1.1 200 OK')
    for position, expected_line in enumerate(expected_prefix):
        self.assertEqual(lines[position], expected_line)
928n/a
def test_with_continue_rejected(self):
    # Temporarily swap in a handler that rejects "Expect: 100-continue"
    # and check that the request is answered with 417 and a single
    # "Connection: close" header.
    usual_handler = self.handler  # Save to avoid breaking any subsequent tests.
    self.handler = RejectingSocketlessRequestHandler()
    try:
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
        self.verify_expected_headers(result[1:-1])
        # The expect handler should short circuit the usual get method by
        # returning false here, so get_called should be false
        self.assertFalse(self.handler.get_called)
        self.assertEqual(
            sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
    finally:
        # Restore the handler even if an assertion above failed, so a
        # failure here cannot leave the rejecting handler in place.
        self.handler = usual_handler
940n/a
def test_request_length(self):
    # Issue #10714: huge request lines are discarded, to avoid Denial
    # of Service attacks.
    oversized_target = b'x' * 65537
    response = self.send_typical_request(b'GET ' + oversized_target)
    status_line = response[0]
    self.assertEqual(status_line, b'HTTP/1.1 414 Request-URI Too Long\r\n')
    self.assertFalse(self.handler.get_called)
    # The handler must still expose requestline as a str.
    self.assertIsInstance(self.handler.requestline, str)
948n/a
def test_header_length(self):
    # Issue #6791: an over-long header line is rejected as well, after
    # the request line itself has been recorded.
    long_header = b'X-Foo: bar' + b'r' * 65537
    request = b'GET / HTTP/1.1\r\n' + long_header + b'\r\n\r\n'
    response = self.send_typical_request(request)
    status_line = response[0]
    self.assertEqual(status_line, b'HTTP/1.1 431 Line too long\r\n')
    self.assertFalse(self.handler.get_called)
    self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
956n/a
def test_too_many_headers(self):
    # A request carrying 101 header lines must be refused with 431
    # before the GET handler runs.
    header_block = b'X-Foo: bar\r\n' * 101
    request = b'GET / HTTP/1.1\r\n' + header_block + b'\r\n'
    response = self.send_typical_request(request)
    status_line = response[0]
    self.assertEqual(status_line, b'HTTP/1.1 431 Too many headers\r\n')
    self.assertFalse(self.handler.get_called)
    self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
963n/a
def test_html_escape_on_error(self):
    # Markup sent in place of the request method must come back
    # HTML-escaped somewhere in the response.
    text = '<script>alert("hello")</script>'
    escaped = html.escape(text, quote=False).encode('ascii')
    response_lines = self.send_typical_request(
        b'<script>alert("hello")</script> / HTTP/1.1')
    response = b''.join(response_lines)
    self.assertIn(escaped, response)
970n/a
def test_close_connection(self):
    # handle_one_request() should be repeatedly called until
    # it sets close_connection
    def stub_handle_one_request():
        # `pending_flags` is looked up at call time, so each rebinding
        # in the loop below is picked up by this stub.
        self.handler.close_connection = next(pending_flags)
    self.handler.handle_one_request = stub_handle_one_request

    for flag_sequence in ((True,), (False, False, True)):
        pending_flags = iter(flag_sequence)
        self.handler.handle()
        # handle() must have consumed every flag, stopping on the True.
        self.assertRaises(StopIteration, next, pending_flags)
985n/a
def test_date_time_string(self):
    # Compare date_time_string() against a timestamp formatted the old
    # way, using the handler's weekday and month name tables.
    now = time.time()
    utc = time.gmtime(now)
    expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
        self.handler.weekdayname[utc.tm_wday],
        utc.tm_mday,
        self.handler.monthname[utc.tm_mon],
        utc.tm_year, utc.tm_hour, utc.tm_min, utc.tm_sec
    )
    actual = self.handler.date_time_string(timestamp=now)
    self.assertEqual(actual, expected)
997n/a
998n/a
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """Check that translate_path() maps request paths to <cwd>/filename."""

    def setUp(self):
        # Every request path used below must translate to this location.
        self.translated = os.path.join(os.getcwd(), 'filename')
        self.handler = SocketlessRequestHandler()

    def test_query_arguments(self):
        # A query string or fragment must not change the translated path.
        request_paths = (
            '/filename',
            '/filename?foo=bar',
            '/filename?a=b&spam=eggs#zot',
        )
        for request_path in request_paths:
            translated = self.handler.translate_path(request_path)
            self.assertEqual(translated, self.translated)

    def test_start_with_double_slash(self):
        # A leading double slash must translate like a single one.
        for request_path in ('//filename', '//filename?foo=bar'):
            translated = self.handler.translate_path(request_path)
            self.assertEqual(translated, self.translated)

    def test_windows_colon(self):
        # With the server module's os.path swapped for ntpath, drive-like
        # "c:" components and ".." must not change the translated path.
        request_paths = (
            'c:c:c:foo/filename',
            '\\c:../filename',
            'c:\\c:..\\foo/filename',
            'c:c:foo\\c:c:bar/filename',
        )
        with support.swap_attr(server.os, 'path', ntpath):
            for request_path in request_paths:
                translated = self.handler.translate_path(request_path)
                # Normalise ntpath separators before comparing.
                translated = translated.replace(ntpath.sep, os.sep)
                self.assertEqual(translated, self.translated)
1037n/a
1038n/a
class MiscTestCase(unittest.TestCase):
    def test_all(self):
        # __all__ must list exactly the public names defined in
        # http.server itself, apart from the excluded ones below.
        blacklist = {'executable', 'nobody_uid', 'test'}
        expected = [
            name
            for name in dir(server)
            if not (name.startswith('_') or name in blacklist)
            and getattr(getattr(server, name), '__module__', None)
            == 'http.server'
        ]
        self.assertCountEqual(server.__all__, expected)
1050n/a
1051n/a
def test_main(verbose=None):
    """Run every test case class in this module.

    The working directory in effect on entry is restored afterwards,
    whether or not the run raises. *verbose* is accepted but not used.
    """
    starting_dir = os.getcwd()
    try:
        test_classes = (
            RequestHandlerLoggingTestCase,
            BaseHTTPRequestHandlerTestCase,
            BaseHTTPServerTestCase,
            SimpleHTTPServerTestCase,
            CGIHTTPServerTestCase,
            SimpleHTTPRequestHandlerTestCase,
            MiscTestCase,
        )
        support.run_unittest(*test_classes)
    finally:
        os.chdir(starting_dir)
1066n/a
# Run the full suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()