1 | n/a | from unittest import mock |
---|
2 | n/a | from test import support |
---|
3 | n/a | from test.test_httpservers import NoLogRequestHandler |
---|
4 | n/a | from unittest import TestCase |
---|
5 | n/a | from wsgiref.util import setup_testing_defaults |
---|
6 | n/a | from wsgiref.headers import Headers |
---|
7 | n/a | from wsgiref.handlers import BaseHandler, BaseCGIHandler, SimpleHandler |
---|
8 | n/a | from wsgiref import util |
---|
9 | n/a | from wsgiref.validate import validator |
---|
10 | n/a | from wsgiref.simple_server import WSGIServer, WSGIRequestHandler |
---|
11 | n/a | from wsgiref.simple_server import make_server |
---|
12 | n/a | from http.client import HTTPConnection |
---|
13 | n/a | from io import StringIO, BytesIO, BufferedReader |
---|
14 | n/a | from socketserver import BaseServer |
---|
15 | n/a | from platform import python_implementation |
---|
16 | n/a | |
---|
17 | n/a | import os |
---|
18 | n/a | import re |
---|
19 | n/a | import signal |
---|
20 | n/a | import sys |
---|
21 | n/a | import unittest |
---|
22 | n/a | |
---|
23 | n/a | |
---|
class MockServer(WSGIServer):
    """WSGI server stand-in that is configured without binding a socket."""

    def __init__(self, server_address, RequestHandlerClass):
        # Initialise only the BaseServer layer; the socket-creating
        # constructors further down the hierarchy are bypassed on purpose.
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Record the configured (host, port) and build the base environ."""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
36 | n/a | |
---|
37 | n/a | |
---|
class MockHandler(WSGIRequestHandler):
    """Request handler fed by an (rfile, wfile) pair instead of a socket."""

    def setup(self):
        """Use the "request" object directly as the input/output streams."""
        streams = self.connection = self.request
        self.rfile, self.wfile = streams

    def finish(self):
        """Do nothing, so both streams stay open and readable afterwards."""
        return None
46 | n/a | |
---|
47 | n/a | |
---|
def hello_app(environ, start_response):
    """Minimal WSGI app: fixed headers and a 13-byte plain-text greeting."""
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    return [b"Hello, world!"]
54 | n/a | |
---|
55 | n/a | |
---|
def header_app(environ, start_response):
    """WSGI app that echoes selected environ values back in the body.

    The body is the X-Test-Header value, the query string and the path info
    joined with ';' and encoded as ISO-8859-1.
    """
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    echoed = ';'.join(
        environ[key]
        for key in ('HTTP_X_TEST_HEADER', 'QUERY_STRING', 'PATH_INFO')
    )
    return [echoed.encode('iso-8859-1')]
65 | n/a | |
---|
66 | n/a | |
---|
def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
    """Serve one raw request through the mock server, entirely in memory.

    *app* is the WSGI application to run and *data* the raw request bytes.
    Returns a ``(response_bytes, stderr_text)`` tuple; stderr_text is
    whatever was written to sys.stderr while the request was handled.
    """
    mock_server = make_server("", 80, app, MockServer, MockHandler)
    request_stream = BufferedReader(BytesIO(data))
    response_stream = BytesIO()
    captured_log = StringIO()

    # Swap sys.stderr only for the duration of the request and always put
    # the original back, even if the handler raises.
    saved_stderr = sys.stderr
    sys.stderr = captured_log
    try:
        mock_server.finish_request(
            (request_stream, response_stream), ("127.0.0.1", 8888)
        )
    finally:
        sys.stderr = saved_stderr

    return response_stream.getvalue(), captured_log.getvalue()
80 | n/a | |
---|
def compare_generic_iter(make_it, match):
    """Verify that an iterator produces exactly the items in *match*.

    The object under test is exercised through both access styles it is
    expected to support:

    1. the legacy sequence protocol: ``it[0]``, ``it[1]``, ... until
       ``IndexError``;
    2. the iterator protocol: ``iter(it) is it`` followed by ``next(it)``
       until ``StopIteration``.

    *make_it* must be a callable returning a fresh iterator on every call,
    because each pass consumes one.  *match* is traversed once per pass, so
    it must be re-iterable (a list, not a generator).

    Raises AssertionError on any mismatch, including an iterator that stops
    early or yields extra items.
    """
    # Pass 1: index-based access.
    it = make_it()
    count = 0
    for item in match:
        try:
            got = it[count]
        except IndexError:
            # Running out early is a comparison failure, not a crash.
            raise AssertionError("Too few items from __getitem__", it) from None
        if not got == item:
            raise AssertionError(
                "Item %d from __getitem__ is %r, expected %r" % (count, got, item)
            )
        count += 1
    try:
        it[count]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    # Pass 2: iterator protocol, on a fresh object.
    it = make_it()
    if iter(it) is not it:
        raise AssertionError("iter() did not return the iterator itself", it)
    for position, item in enumerate(match):
        try:
            got = next(it)
        except StopIteration:
            raise AssertionError("Too few items from .__next__()", it) from None
        if not got == item:
            raise AssertionError(
                "Item %d from .__next__() is %r, expected %r" % (position, got, item)
            )
    try:
        next(it)
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .__next__()", it)
116 | n/a | |
---|
117 | n/a | |
---|
class IntegrationTests(TestCase):
    """End-to-end tests that push complete HTTP requests through wsgiref.

    Every test except test_interrupted_write goes through run_amock(), so
    requests and responses are plain in-memory byte strings.
    """

    # Body wsgiref sends when the application (or the validator wrapped
    # around it) raises.  Read from the handler class instead of being
    # retyped here, so the comparison cannot break over a spacing or wording
    # difference between this file and the library.
    SERVER_ERROR_BODY = BaseHandler.error_body

    def _assert_server_error(self, out):
        """Assert that the response *out* ends with wsgiref's error page."""
        self.assertTrue(out.endswith(self.SERVER_ERROR_BODY))

    def check_hello(self, out, has_length=True):
        """Assert that *out* is exactly the response expected from hello_app.

        *has_length* says whether a Content-Length header is expected.
        """
        pyver = python_implementation() + "/" + sys.version.split()[0]
        length_header = "Content-Length: 13\r\n" if has_length else ""
        expected = (
            "HTTP/1.0 200 OK\r\n"
            "Server: WSGIServer/0.2 " + pyver + "\r\n"
            "Content-Type: text/plain\r\n"
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n"
            + length_header +
            "\r\n"
            "Hello, world!"
        )
        self.assertEqual(out, expected.encode("iso-8859-1"))

    def test_plain_hello(self):
        out, err = run_amock()
        self.check_hello(out)

    def test_environ(self):
        # The repeated header must be folded into one comma-separated value
        # and the percent-escape in the path must be decoded.
        request = (
            b"GET /p%61th/?query=test HTTP/1.0\n"
            b"X-Test-Header: Python test \n"
            b"X-Test-Header: Python test 2\n"
            b"Content-Length: 0\n\n"
        )
        out, err = run_amock(header_app, request)
        self.assertEqual(
            out.splitlines()[-1],
            b"Python test,Python test 2;query=test;/path/"
        )

    def test_request_length(self):
        # A request line longer than 64 KiB must be rejected with 414.
        oversized = b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n"
        out, err = run_amock(data=oversized)
        self.assertEqual(out.splitlines()[0],
                         b"HTTP/1.0 414 Request-URI Too Long")

    def test_validated_hello(self):
        out, err = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(out, has_length=False)

    def test_simple_validation_error(self):
        def bad_app(environ, start_response):
            # Headers passed as a tuple instead of a list, and a str body.
            start_response("200 OK", ('Content-Type', 'text/plain'))
            return ["Hello, world!"]

        out, err = run_amock(validator(bad_app))
        self._assert_server_error(out)
        self.assertEqual(
            err.splitlines()[-2],
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <class 'tuple'>"
        )

    def test_status_validation_errors(self):
        def create_bad_app(status):
            def bad_app(environ, start_response):
                start_response(status, [("Content-Type", "text/plain; charset=utf-8")])
                return [b"Hello, world!"]
            return bad_app

        tests = [
            ('200', 'AssertionError: Status must be at least 4 characters'),
            ('20X OK', 'AssertionError: Status message must begin w/3-digit code'),
            ('200OK', 'AssertionError: Status message must have a space after code'),
        ]

        for status, exc_message in tests:
            with self.subTest(status=status):
                out, err = run_amock(create_bad_app(status))
                self._assert_server_error(out)
                self.assertEqual(err.splitlines()[-2], exc_message)

    def test_wsgi_input(self):
        def bad_app(e, s):
            # The validator rejects read() without a size argument.
            e["wsgi.input"].read()
            s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
            return [b"data"]

        out, err = run_amock(validator(bad_app))
        self._assert_server_error(out)
        self.assertEqual(
            err.splitlines()[-2], "AssertionError"
        )

    def test_bytes_validation(self):
        def app(e, s):
            s("200 OK", [
                ("Content-Type", "text/plain; charset=utf-8"),
                ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
            ])
            return [b"data"]

        out, err = run_amock(validator(app))
        self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))
        ver = sys.version.split()[0].encode('ascii')
        py = python_implementation().encode('ascii')
        pyver = py + b"/" + ver
        self.assertEqual(
            b"HTTP/1.0 200 OK\r\n"
            b"Server: WSGIServer/0.2 " + pyver + b"\r\n"
            b"Content-Type: text/plain; charset=utf-8\r\n"
            b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
            b"\r\n"
            b"data",
            out)

    def test_cp1252_url(self):
        def app(e, s):
            s("200 OK", [
                ("Content-Type", "text/plain"),
                ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
            ])
            # PEP3333 says environ variables are decoded as latin1.
            # Encode as latin1 to get original bytes
            return [e["PATH_INFO"].encode("latin1")]

        out, err = run_amock(
            validator(app), data=b"GET /\x80%80 HTTP/1.0")
        self.assertEqual(
            [
                b"HTTP/1.0 200 OK",
                mock.ANY,  # Server header varies with the interpreter
                b"Content-Type: text/plain",
                b"Date: Wed, 24 Dec 2008 13:29:32 GMT",
                b"",
                b"/\x80\x80",
            ],
            out.splitlines())

    def test_interrupted_write(self):
        # BaseHandler._write() and _flush() have to write all data, even if
        # it takes multiple send() calls.  Test this by interrupting a send()
        # call with a Unix signal.
        threading = support.import_module("threading")
        pthread_kill = support.get_attribute(signal, "pthread_kill")

        def app(environ, start_response):
            start_response("200 OK", [])
            return [b'\0' * support.SOCK_MAX_SIZE]

        class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
            pass

        server = make_server(support.HOST, 0, app, handler_class=WsgiHandler)
        self.addCleanup(server.server_close)
        interrupted = threading.Event()

        def signal_handler(signum, frame):
            interrupted.set()

        original = signal.signal(signal.SIGUSR1, signal_handler)
        self.addCleanup(signal.signal, signal.SIGUSR1, original)
        received = None
        main_thread = threading.get_ident()

        def run_client():
            nonlocal received
            http = HTTPConnection(*server.server_address)
            http.request("GET", "/")
            with http.getresponse() as response:
                response.read(100)
                # The main thread should now be blocking in a send() system
                # call.  But in theory, it could get interrupted by other
                # signals, and then retried.  So keep sending the signal in a
                # loop, in case an earlier signal happens to be delivered at
                # an inconvenient moment.
                while True:
                    pthread_kill(main_thread, signal.SIGUSR1)
                    if interrupted.wait(timeout=1.0):
                        break
                received = len(response.read())
            http.close()

        background = threading.Thread(target=run_client)
        background.start()
        server.handle_request()
        background.join()
        # 100 bytes were consumed before the signal loop started.
        self.assertEqual(received, support.SOCK_MAX_SIZE - 100)
301 | n/a | |
---|
302 | n/a | |
---|
class UtilityTests(TestCase):
    """Tests for the helper functions in wsgiref.util."""

    def checkShift(self, sn_in, pi_in, part, sn_out, pi_out):
        """Run shift_path_info() once and check its result and side effects.

        Starts from SCRIPT_NAME=*sn_in* / PATH_INFO=*pi_in*, expects *part*
        to be returned and the environ to end up with *sn_out* / *pi_out*.
        Returns the environ that was used.
        """
        environ = {'SCRIPT_NAME': sn_in, 'PATH_INFO': pi_in}
        util.setup_testing_defaults(environ)
        shifted = util.shift_path_info(environ)
        self.assertEqual(shifted, part)
        self.assertEqual(environ['PATH_INFO'], pi_out)
        self.assertEqual(environ['SCRIPT_NAME'], sn_out)
        return environ

    def checkDefault(self, key, value, alt=None):
        """Check that *key* defaults to *value* and that *alt* is preserved."""
        # Check defaulting when empty
        defaults = {}
        util.setup_testing_defaults(defaults)
        for stream_type in (StringIO, BytesIO):
            if isinstance(value, stream_type):
                # Stream defaults are fresh objects; only the type can match.
                self.assertIsInstance(defaults[key], stream_type)
                break
        else:
            self.assertEqual(defaults[key], value)

        # Check existing value
        preset = {key: alt}
        util.setup_testing_defaults(preset)
        self.assertIs(preset[key], alt)

    def checkCrossDefault(self, key, value, **kw):
        """Check the default chosen for *key* when *kw* is already present."""
        util.setup_testing_defaults(kw)
        self.assertEqual(kw[key], value)

    def checkAppURI(self, uri, **kw):
        """Check application_uri() for an environ seeded with *kw*."""
        util.setup_testing_defaults(kw)
        self.assertEqual(util.application_uri(kw), uri)

    def checkReqURI(self, uri, query=1, **kw):
        """Check request_uri() for an environ seeded with *kw*."""
        util.setup_testing_defaults(kw)
        self.assertEqual(util.request_uri(kw, query), uri)

    def checkFW(self, text, size, match):
        """Check FileWrapper chunking of *text* and its close() behaviour."""

        def make_it():
            return util.FileWrapper(StringIO(text), size)

        compare_generic_iter(make_it, match)

        wrapper = make_it()
        self.assertFalse(wrapper.filelike.closed)

        # Exhausting the wrapper must not close the underlying file ...
        for _ in wrapper:
            pass
        self.assertFalse(wrapper.filelike.closed)

        # ... only an explicit close() does.
        wrapper.close()
        self.assertTrue(wrapper.filelike.closed)

    def testSimpleShifts(self):
        cases = (
            ('', '/', '', '/', ''),
            ('', '/x', 'x', '/x', ''),
            ('/', '', None, '/', ''),
            ('/a', '/x/y', 'x', '/a/x', '/y'),
            ('/a', '/x/', 'x', '/a/x', '/'),
        )
        for case in cases:
            self.checkShift(*case)

    def testNormalizedShifts(self):
        cases = (
            ('/a/b', '/../y', '..', '/a', '/y'),
            ('', '/../y', '..', '', '/y'),
            ('/a/b', '//y', 'y', '/a/b/y', ''),
            ('/a/b', '//y/', 'y', '/a/b/y', '/'),
            ('/a/b', '/./y', 'y', '/a/b/y', ''),
            ('/a/b', '/./y/', 'y', '/a/b/y', '/'),
            ('/a/b', '///./..//y/.//', '..', '/a', '/y/'),
            ('/a/b', '///', '', '/a/b/', ''),
            ('/a/b', '/.//', '', '/a/b/', ''),
            ('/a/b', '/x//', 'x', '/a/b/x', '/'),
            ('/a/b', '/.', None, '/a/b', ''),
        )
        for case in cases:
            self.checkShift(*case)

    def testDefaults(self):
        expected_defaults = [
            ('SERVER_NAME', '127.0.0.1'),
            ('SERVER_PORT', '80'),
            ('SERVER_PROTOCOL', 'HTTP/1.0'),
            ('HTTP_HOST', '127.0.0.1'),
            ('REQUEST_METHOD', 'GET'),
            ('SCRIPT_NAME', ''),
            ('PATH_INFO', '/'),
            ('wsgi.version', (1, 0)),
            ('wsgi.run_once', 0),
            ('wsgi.multithread', 0),
            ('wsgi.multiprocess', 0),
            ('wsgi.input', BytesIO()),
            ('wsgi.errors', StringIO()),
            ('wsgi.url_scheme', 'http'),
        ]
        for key, value in expected_defaults:
            self.checkDefault(key, value)

    def testCrossDefaults(self):
        cases = [
            ('HTTP_HOST', "foo.bar", dict(SERVER_NAME="foo.bar")),
            ('wsgi.url_scheme', "https", dict(HTTPS="on")),
            ('wsgi.url_scheme', "https", dict(HTTPS="1")),
            ('wsgi.url_scheme', "https", dict(HTTPS="yes")),
            ('wsgi.url_scheme', "http", dict(HTTPS="foo")),
            ('SERVER_PORT', "80", dict(HTTPS="foo")),
            ('SERVER_PORT', "443", dict(HTTPS="on")),
        ]
        for key, value, seed in cases:
            self.checkCrossDefault(key, value, **seed)

    def testGuessScheme(self):
        cases = [
            ({}, "http"),
            ({'HTTPS': "foo"}, "http"),
            ({'HTTPS': "on"}, "https"),
            ({'HTTPS': "yes"}, "https"),
            ({'HTTPS': "1"}, "https"),
        ]
        for environ, scheme in cases:
            self.assertEqual(util.guess_scheme(environ), scheme)

    def testAppURIs(self):
        cases = [
            ("http://127.0.0.1/", dict()),
            ("http://127.0.0.1/spam", dict(SCRIPT_NAME="/spam")),
            ("http://127.0.0.1/sp%E4m", dict(SCRIPT_NAME="/sp\xe4m")),
            ("http://spam.example.com:2071/",
             dict(HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")),
            ("http://spam.example.com/",
             dict(SERVER_NAME="spam.example.com")),
            ("http://127.0.0.1/",
             dict(HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")),
            ("https://127.0.0.1/", dict(HTTPS="on")),
            ("http://127.0.0.1:8000/",
             dict(SERVER_PORT="8000", HTTP_HOST=None)),
        ]
        for uri, seed in cases:
            self.checkAppURI(uri, **seed)

    def testReqURIs(self):
        cases = [
            ("http://127.0.0.1/", dict()),
            ("http://127.0.0.1/spam", dict(SCRIPT_NAME="/spam")),
            ("http://127.0.0.1/sp%E4m", dict(SCRIPT_NAME="/sp\xe4m")),
            ("http://127.0.0.1/spammity/spam",
             dict(SCRIPT_NAME="/spammity", PATH_INFO="/spam")),
            ("http://127.0.0.1/spammity/sp%E4m",
             dict(SCRIPT_NAME="/spammity", PATH_INFO="/sp\xe4m")),
            ("http://127.0.0.1/spammity/spam;ham",
             dict(SCRIPT_NAME="/spammity", PATH_INFO="/spam;ham")),
            ("http://127.0.0.1/spammity/spam;cookie=1234,5678",
             dict(SCRIPT_NAME="/spammity", PATH_INFO="/spam;cookie=1234,5678")),
            ("http://127.0.0.1/spammity/spam?say=ni",
             dict(SCRIPT_NAME="/spammity", PATH_INFO="/spam",
                  QUERY_STRING="say=ni")),
            ("http://127.0.0.1/spammity/spam?s%E4y=ni",
             dict(SCRIPT_NAME="/spammity", PATH_INFO="/spam",
                  QUERY_STRING="s%E4y=ni")),
        ]
        for uri, seed in cases:
            self.checkReqURI(uri, **seed)

        # With the query flag off, the query string must be left out.
        self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
                         SCRIPT_NAME="/spammity", PATH_INFO="/spam",
                         QUERY_STRING="say=ni")

    def testFileWrapper(self):
        self.checkFW("xyz" * 50, 120, ["xyz" * 40, "xyz" * 10])

    def testHopByHop(self):
        hop_by_hop = (
            "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
            "TE Trailers Transfer-Encoding Upgrade"
        ).split()
        for header in hop_by_hop:
            for spelling in (header, header.title(), header.upper(), header.lower()):
                self.assertTrue(util.is_hop_by_hop(spelling))

        # Not comprehensive, just a few random header names
        end_to_end = (
            "Accept Cache-Control Date Pragma Trailer Via Warning"
        ).split()
        for header in end_to_end:
            for spelling in (header, header.title(), header.upper(), header.lower()):
                self.assertFalse(util.is_hop_by_hop(spelling))
464 | n/a | |
---|
class HeaderTests(TestCase):
    """Tests for wsgiref.headers.Headers."""

    def testMappingInterface(self):
        pairs = [('x', 'y')]
        self.assertEqual(len(Headers()), 0)
        self.assertEqual(len(Headers([])), 0)
        self.assertEqual(len(Headers(list(pairs))), 1)
        self.assertEqual(Headers(list(pairs)).keys(), ['x'])
        self.assertEqual(Headers(list(pairs)).values(), ['y'])
        self.assertEqual(Headers(list(pairs)).items(), pairs)
        self.assertIsNot(Headers(pairs).items(), pairs)  # must be copy!

        headers = Headers()
        del headers['foo']   # should not raise an error

        headers['Foo'] = 'bar'
        lookups = (
            headers.__contains__, headers.get,
            headers.get_all, headers.__getitem__,
        )
        for lookup in lookups:
            # Header names are matched case-insensitively.
            for spelling in ('foo', 'Foo', 'FOO'):
                self.assertTrue(lookup(spelling))
            self.assertFalse(lookup('bar'))

        self.assertEqual(headers['foo'], 'bar')
        headers['foo'] = 'baz'
        self.assertEqual(headers['FOO'], 'baz')
        self.assertEqual(headers.get_all('foo'), ['baz'])

        self.assertEqual(headers.get("foo", "whee"), "baz")
        self.assertEqual(headers.get("zoo", "whee"), "whee")
        self.assertEqual(headers.setdefault("foo", "whee"), "baz")
        self.assertEqual(headers.setdefault("zoo", "whee"), "whee")
        self.assertEqual(headers["foo"], "baz")
        self.assertEqual(headers["zoo"], "whee")

    def testRequireList(self):
        with self.assertRaises(TypeError):
            Headers("foo")

    def testExtras(self):
        headers = Headers()
        self.assertEqual(str(headers), '\r\n')

        headers.add_header('foo', 'bar', baz="spam")
        self.assertEqual(headers['foo'], 'bar; baz="spam"')
        self.assertEqual(str(headers), 'foo: bar; baz="spam"\r\n\r\n')

        # A parameter whose value is None is emitted as a bare keyword.
        headers.add_header('Foo', 'bar', cheese=None)
        self.assertEqual(
            headers.get_all('foo'),
            ['bar; baz="spam"', 'bar; cheese'],
        )

        self.assertEqual(
            str(headers),
            'foo: bar; baz="spam"\r\n'
            'Foo: bar; cheese\r\n'
            '\r\n'
        )
519 | n/a | |
---|
520 | n/a | class ErrorHandler(BaseCGIHandler): |
---|
521 | n/a | """Simple handler subclass for testing BaseHandler""" |
---|
522 | n/a | |
---|
523 | n/a | # BaseHandler records the OS environment at import time, but envvars |
---|
524 | n/a | # might have been changed later by other tests, which trips up |
---|
525 | n/a | # HandlerTests.testEnviron(). |
---|
526 | n/a | os_environ = dict(os.environ.items()) |
---|
527 | n/a | |
---|
528 | n/a | def __init__(self,**kw): |
---|
529 | n/a | setup_testing_defaults(kw) |
---|
530 | n/a | BaseCGIHandler.__init__( |
---|
531 | n/a | self, BytesIO(), BytesIO(), StringIO(), kw, |
---|
532 | n/a | multithread=True, multiprocess=True |
---|
533 | n/a | ) |
---|
534 | n/a | |
---|
class TestHandler(ErrorHandler):
    """Simple handler subclass for testing BaseHandler, w/error passthru"""

    def handle_error(self):
        # Bare raise: re-raise whatever exception is currently being handled
        # instead of turning it into an error response, so a failing
        # application shows up directly in the test that ran it.
        raise   # for testing, we want to see what's happening
540 | n/a | |
---|
541 | n/a | |
---|
542 | n/a | class HandlerTests(TestCase): |
---|
543 | n/a | |
---|
def checkEnvironAttrs(self, handler):
    """Assert each ``wsgi_*`` handler attribute equals its ``wsgi.*`` environ key.

    ``file_wrapper`` is skipped when the handler has no file wrapper set.
    """
    environ = handler.environ
    names = ('version', 'multithread', 'multiprocess', 'run_once', 'file_wrapper')
    for name in names:
        unset_wrapper = name == 'file_wrapper' and handler.wsgi_file_wrapper is None
        if not unset_wrapper:
            self.assertEqual(getattr(handler, 'wsgi_' + name), environ['wsgi.' + name])
552 | n/a | |
---|
def checkOSEnviron(self, handler):
    """Assert the handler environ carries the OS environment and WSGI defaults.

    Every OS variable must appear unchanged in ``handler.environ`` unless a
    testing default of the same name exists; every testing-default key must
    be present.
    """
    from os import environ as process_environ

    wsgi_defaults = {}
    setup_testing_defaults(wsgi_defaults)
    handler_environ = handler.environ

    for name, value in process_environ.items():
        if name in wsgi_defaults:
            # Keys covered by the testing defaults are not compared by value.
            continue
        self.assertEqual(handler_environ[name], value)
    for name in wsgi_defaults:
        self.assertIn(name, handler_environ)
562 | n/a | |
---|
def testEnviron(self):
    """setup_environ() merges OS variables, WSGI keys and caller extras."""
    handler = TestHandler(X="Y")
    handler.setup_environ()
    self.checkEnvironAttrs(handler)
    self.checkOSEnviron(handler)
    self.assertEqual(handler.environ["X"], "Y")
569 | n/a | |
---|
570 | n/a | def testCGIEnviron(self): |
---|
571 | n/a | h = BaseCGIHandler(None,None,None,{}) |
---|
572 | n/a | h.setup_environ() |
---|
573 | n/a | for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors': |
---|
574 | n/a | self.assertIn(key, h.environ) |
---|
575 | n/a | |
---|
def testScheme(self):
    """wsgi.url_scheme is 'https' when HTTPS is on and 'http' otherwise."""
    cases = (
        ({'HTTPS': "on"}, 'https'),
        ({}, 'http'),
    )
    for seed, scheme in cases:
        handler = TestHandler(**seed)
        handler.setup_environ()
        self.assertEqual(handler.environ['wsgi.url_scheme'], scheme)
581 | n/a | |
---|
582 | n/a | def testAbstractMethods(self): |
---|
583 | n/a | h = BaseHandler() |
---|
584 | n/a | for name in [ |
---|
585 | n/a | '_flush','get_stdin','get_stderr','add_cgi_vars' |
---|
586 | n/a | ]: |
---|
587 | n/a | self.assertRaises(NotImplementedError, getattr(h,name)) |
---|
588 | n/a | self.assertRaises(NotImplementedError, h._write, "test") |
---|
589 | n/a | |
---|
def testContentLength(self):
    """Check when the handler adds, omits or keeps a Content-Length header."""
    # Demo one reason iteration is better than write()...  ;)

    def trivial_app1(e, s):
        # Body returned as a one-element iterable.
        s('200 OK', [])
        return [e['wsgi.url_scheme'].encode('iso-8859-1')]

    def trivial_app2(e, s):
        # Same body, but pushed through the write() callable.
        s('200 OK', [])(e['wsgi.url_scheme'].encode('iso-8859-1'))
        return []

    def trivial_app3(e, s):
        # Non-ASCII body: the length must be counted in bytes.
        s('200 OK', [])
        return ['\u0442\u0435\u0441\u0442'.encode("utf-8")]

    def trivial_app4(e, s):
        # Simulate a response to a HEAD request
        s('200 OK', [('Content-Length', '12345')])
        return []

    cases = [
        (trivial_app1,
         ("Status: 200 OK\r\n"
          "Content-Length: 4\r\n"
          "\r\n"
          "http").encode("iso-8859-1")),
        (trivial_app2,
         ("Status: 200 OK\r\n"
          "\r\n"
          "http").encode("iso-8859-1")),
        (trivial_app3,
         b'Status: 200 OK\r\n'
         b'Content-Length: 8\r\n'
         b'\r\n'
         b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82'),
        (trivial_app4,
         b'Status: 200 OK\r\n'
         b'Content-Length: 12345\r\n'
         b'\r\n'),
    ]
    for app, expected in cases:
        handler = TestHandler()
        handler.run(app)
        self.assertEqual(handler.stdout.getvalue(), expected)
639 | n/a | |
---|
def testBasicErrorOutput(self):
    """A failing app yields the handler's error page and a logged traceback."""

    def non_error_app(e, s):
        s('200 OK', [])
        return []

    def error_app(e, s):
        raise AssertionError("This should be caught by handler")

    # Control case: a well-behaved app writes nothing to stderr.
    ok_handler = ErrorHandler()
    ok_handler.run(non_error_app)
    expected_ok = (
        "Status: 200 OK\r\n"
        "Content-Length: 0\r\n"
        "\r\n"
    ).encode("iso-8859-1")
    self.assertEqual(ok_handler.stdout.getvalue(), expected_ok)
    self.assertEqual(ok_handler.stderr.getvalue(), "")

    # Failure case: the response is built from the handler's own
    # error_status and error_body attributes.
    failing_handler = ErrorHandler()
    failing_handler.run(error_app)
    error_head = (
        "Status: %s\r\n"
        "Content-Type: text/plain\r\n"
        "Content-Length: %d\r\n"
        "\r\n" % (failing_handler.error_status, len(failing_handler.error_body))
    ).encode('iso-8859-1')
    self.assertEqual(
        failing_handler.stdout.getvalue(),
        error_head + failing_handler.error_body,
    )

    self.assertIn("AssertionError", failing_handler.stderr.getvalue())
667 | n/a | |
---|
def testErrorAfterOutput(self):
    """An error raised after output was written leaves that output intact."""
    MSG = b"Some output has been sent"

    def error_app(e, s):
        s("200 OK", [])(MSG)
        raise AssertionError("This should be caught by handler")

    handler = ErrorHandler()
    handler.run(error_app)
    # Only the headers and the bytes already written are expected; no error
    # page follows them.
    already_sent = ("Status: 200 OK\r\n" "\r\n").encode("iso-8859-1") + MSG
    self.assertEqual(handler.stdout.getvalue(), already_sent)
    self.assertIn("AssertionError", handler.stderr.getvalue())
680 | n/a | |
---|
def testHeaderFormats(self):
    """Check the status-line/header layout for every server configuration.

    Covers each combination of server software, handler HTTP version and
    request protocol, once as a gateway (origin_server False) and once as
    an origin server.
    """

    def non_error_app(e, s):
        s('200 OK', [])
        return []

    stdpat = (
        r"HTTP/%s 200 OK\r\n"
        r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
        r"%s" r"Content-Length: 0\r\n" r"\r\n"
    )
    shortpat = (
        "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
    ).encode("iso-8859-1")

    def run_handler(proto, version, software, origin_server):
        # Run the app on a freshly configured handler; return what it wrote.
        handler = TestHandler(SERVER_PROTOCOL=proto)
        handler.origin_server = origin_server
        handler.http_version = version
        handler.server_software = software
        handler.run(non_error_app)
        return handler.stdout.getvalue()

    for ssw in ("FooBar/1.0", None):
        sw = "Server: %s\r\n" % ssw if ssw else ""

        for version in ("1.0", "1.1"):
            full_pattern = (stdpat % (version, sw)).encode("iso-8859-1")

            for proto in ("HTTP/0.9", "HTTP/1.0", "HTTP/1.1"):
                # Gateway mode: CGI-style "Status:" output in every case.
                self.assertEqual(
                    shortpat, run_handler(proto, version, ssw, False))

                # Origin-server mode: full HTTP headers, except that an
                # HTTP/0.9 request gets no headers at all.
                output = run_handler(proto, version, ssw, True)
                if proto == "HTTP/0.9":
                    self.assertEqual(output, b"")
                else:
                    self.assertTrue(
                        re.match(full_pattern, output),
                        (full_pattern, output)
                    )
723 | n/a | |
---|
def testBytesData(self):
    # An app returning a single bytes chunk: the handler output must
    # contain the app's Content-Type header, a Content-Length of 4 and
    # the chunk itself.
    def app(environ, start_response):
        response_headers = [
            ("Content-Type", "text/plain; charset=utf-8"),
        ]
        start_response("200 OK", response_headers)
        return [b"data"]

    handler = TestHandler()
    handler.run(app)

    expected = (
        b"Status: 200 OK\r\n"
        b"Content-Type: text/plain; charset=utf-8\r\n"
        b"Content-Length: 4\r\n"
        b"\r\n"
        b"data"
    )
    self.assertEqual(expected, handler.stdout.getvalue())
---|
739 | n/a | |
---|
def testCloseOnError(self):
    # The app returns an iterable that yields one chunk and then raises;
    # the test passes only if the iterable's close() was called by the
    # time run() returns.
    state = {'close_called': False}
    MSG = b"Some output has been sent"

    class CrashyIterable(object):
        def __iter__(self):
            # First step yields a chunk; resuming the generator raises.
            yield b'blah'
            raise AssertionError("This should be caught by handler")

        def close(self):
            state['close_called'] = True

    def error_app(environ, start_response):
        write = start_response("200 OK", [])
        write(MSG)
        return CrashyIterable()

    handler = ErrorHandler()
    handler.run(error_app)
    self.assertEqual(state['close_called'], True)
---|
757 | n/a | |
---|
def testPartialWrite(self):
    # Uses a stdout object whose write() accepts at most 7 bytes per call
    # and returns how many it took.  The run must raise a
    # DeprecationWarning about partial writes, and the accepted bytes
    # must still add up to the complete response.
    written = bytearray()

    class PartialWriter:
        def write(self, b):
            accepted = b[:7]
            written.extend(accepted)
            return len(accepted)

        def flush(self):
            pass

    environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
    handler = SimpleHandler(BytesIO(), PartialWriter(), sys.stderr, environ)
    msg = "should not do partial writes"
    with self.assertWarnsRegex(DeprecationWarning, msg):
        handler.run(hello_app)

    expected = (
        b"HTTP/1.0 200 OK\r\n"
        b"Content-Type: text/plain\r\n"
        b"Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n"
        b"Content-Length: 13\r\n"
        b"\r\n"
        b"Hello, world!"
    )
    self.assertEqual(expected, written)
---|
782 | n/a | |
---|
783 | n/a | |
---|
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
---|