» Core Development > Code coverage > Lib/test/test_wsgiref.py

Python code coverage for Lib/test/test_wsgiref.py

# | count | content
1n/afrom unittest import mock
2n/afrom test import support
3n/afrom test.test_httpservers import NoLogRequestHandler
4n/afrom unittest import TestCase
5n/afrom wsgiref.util import setup_testing_defaults
6n/afrom wsgiref.headers import Headers
7n/afrom wsgiref.handlers import BaseHandler, BaseCGIHandler, SimpleHandler
8n/afrom wsgiref import util
9n/afrom wsgiref.validate import validator
10n/afrom wsgiref.simple_server import WSGIServer, WSGIRequestHandler
11n/afrom wsgiref.simple_server import make_server
12n/afrom http.client import HTTPConnection
13n/afrom io import StringIO, BytesIO, BufferedReader
14n/afrom socketserver import BaseServer
15n/afrom platform import python_implementation
16n/a
17n/aimport os
18n/aimport re
19n/aimport signal
20n/aimport sys
21n/aimport unittest
22n/a
23n/a
class MockServer(WSGIServer):
    """WSGI server stand-in that works without a real socket."""

    def __init__(self, server_address, RequestHandlerClass):
        # Call the BaseServer initialiser directly rather than going through
        # the regular constructor chain, then do the name/port bookkeeping
        # ourselves in server_bind().
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Record the configured host/port and build the base environ."""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
36n/a
37n/a
class MockHandler(WSGIRequestHandler):
    """Request handler fed by an (rfile, wfile) pair instead of a socket."""

    def setup(self):
        # The "request" handed over by the server is a two-item pair of
        # file-like objects: the first is read from, the second written to.
        self.connection = self.request
        reader, writer = self.connection
        self.rfile = reader
        self.wfile = writer

    def finish(self):
        # Intentionally a no-op: the streams are left untouched.
        pass
46n/a
47n/a
def hello_app(environ, start_response):
    """Minimal WSGI app: fixed status and headers, constant body."""
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    return [b"Hello, world!"]
54n/a
55n/a
def header_app(environ, start_response):
    """WSGI app echoing selected environ values, joined with ';'.

    The body is the X-Test-Header value, the query string and the path
    info, in that order, encoded as ISO-8859-1.
    """
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    echoed_keys = ('HTTP_X_TEST_HEADER', 'QUERY_STRING', 'PATH_INFO')
    payload = ';'.join(environ[key] for key in echoed_keys)
    return [payload.encode('iso-8859-1')]
65n/a
66n/a
def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
    """Push one raw request through *app* using the mock server classes.

    *data* is the raw request.  sys.stderr is replaced by an in-memory
    buffer while the request is processed and restored afterwards.

    Returns a ``(response_bytes, stderr_text)`` tuple.
    """
    server = make_server("", 80, app, MockServer, MockHandler)
    request_stream = BufferedReader(BytesIO(data))
    response_stream = BytesIO()
    captured_stderr = StringIO()

    saved_stderr = sys.stderr
    sys.stderr = captured_stderr
    try:
        server.finish_request(
            (request_stream, response_stream), ("127.0.0.1", 8888))
    finally:
        # Always put the real stderr back, even if the request blew up.
        sys.stderr = saved_stderr

    return response_stream.getvalue(), captured_stderr.getvalue()
80n/a
def compare_generic_iter(make_it, match):
    """Check that objects built by *make_it* yield exactly the items of *match*.

    The object is exercised through two protocols:

    * the sequence protocol: ``obj[0]``, ``obj[1]``, ... must equal the
      items of *match*, and the next index must raise IndexError;
    * the iterator protocol: ``iter(obj)`` must return *obj* itself,
      ``next(obj)`` must produce the items of *match*, and the following
      ``next()`` call must raise StopIteration.

    *make_it* must be a callable returning a fresh object on every call,
    because one object is consumed per protocol.  *match* is iterated
    twice, so it must be re-iterable (e.g. a list).

    Raises AssertionError on any mismatch.
    """
    # Pass 1: sequence protocol (__getitem__ with increasing indexes).
    it = make_it()
    count = 0
    for count, item in enumerate(match, 1):
        if not it[count - 1] == item:
            raise AssertionError(
                "Unexpected item from __getitem__ at index", count - 1, it)
    try:
        it[count]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    # Pass 2: iterator protocol.  (The former Python 2.1 availability check
    # for iter/StopIteration could never fail on Python 3 and was dropped.)
    it = make_it()
    if iter(it) is not it:
        raise AssertionError("iter() did not return the object itself", it)
    for item in match:
        if not next(it) == item:
            raise AssertionError("Unexpected item from .__next__()", it)
    try:
        next(it)
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .__next__()", it)
116n/a
117n/a
class IntegrationTests(TestCase):
    """End-to-end tests that drive wsgiref's simple server.

    Most tests push a raw request through run_amock() and inspect the
    bytes written to the response stream and the text sent to sys.stderr.
    """

    def check_hello(self, out, has_length=True):
        """Assert that *out* is exactly the raw response of hello_app.

        *has_length* tells whether a ``Content-Length: 13`` header is
        expected in the response.
        """
        pyver = python_implementation() + "/" + sys.version.split()[0]
        length_header = "Content-Length: 13\r\n" if has_length else ""
        expected = (
            "HTTP/1.0 200 OK\r\n"
            "Server: WSGIServer/0.2 " + pyver + "\r\n"
            "Content-Type: text/plain\r\n"
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
            length_header +
            "\r\n"
            "Hello, world!"
        ).encode("iso-8859-1")
        self.assertEqual(out, expected)

    def check_server_error(self, out):
        """Assert that the response *out* ends with the generic error body.

        The expected text is read from BaseHandler.error_body instead of
        being spelled out, so the check cannot drift from the library.
        """
        self.assertTrue(out.endswith(BaseHandler.error_body))

    def test_plain_hello(self):
        out, err = run_amock()
        self.check_hello(out)

    def test_environ(self):
        request = (
            b"GET /p%61th/?query=test HTTP/1.0\n"
            b"X-Test-Header: Python test \n"
            b"X-Test-Header: Python test 2\n"
            b"Content-Length: 0\n\n"
        )
        out, err = run_amock(header_app, request)
        self.assertEqual(
            out.splitlines()[-1],
            b"Python test,Python test 2;query=test;/path/"
        )

    def test_request_length(self):
        out, err = run_amock(data=b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n")
        self.assertEqual(out.splitlines()[0],
                         b"HTTP/1.0 414 Request-URI Too Long")

    def test_validated_hello(self):
        out, err = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(out, has_length=False)

    def test_simple_validation_error(self):
        def bad_app(environ, start_response):
            # Headers are deliberately passed as a tuple instead of a list.
            start_response("200 OK", ('Content-Type', 'text/plain'))
            return ["Hello, world!"]

        out, err = run_amock(validator(bad_app))
        self.check_server_error(out)
        self.assertEqual(
            err.splitlines()[-2],
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <class 'tuple'>"
        )

    def test_status_validation_errors(self):
        def create_bad_app(status):
            def bad_app(environ, start_response):
                start_response(
                    status, [("Content-Type", "text/plain; charset=utf-8")])
                return [b"Hello, world!"]
            return bad_app

        tests = [
            ('200', 'AssertionError: Status must be at least 4 characters'),
            ('20X OK',
             'AssertionError: Status message must begin w/3-digit code'),
            ('200OK',
             'AssertionError: Status message must have a space after code'),
        ]

        for status, exc_message in tests:
            with self.subTest(status=status):
                out, err = run_amock(create_bad_app(status))
                self.check_server_error(out)
                self.assertEqual(err.splitlines()[-2], exc_message)

    def test_wsgi_input(self):
        def bad_app(e, s):
            # Reading wsgi.input without a size is what the validator
            # is expected to reject here.
            e["wsgi.input"].read()
            s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
            return [b"data"]

        out, err = run_amock(validator(bad_app))
        self.check_server_error(out)
        self.assertEqual(err.splitlines()[-2], "AssertionError")

    def test_bytes_validation(self):
        def app(e, s):
            s("200 OK", [
                ("Content-Type", "text/plain; charset=utf-8"),
                ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
            ])
            return [b"data"]

        out, err = run_amock(validator(app))
        self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))
        ver = sys.version.split()[0].encode('ascii')
        py = python_implementation().encode('ascii')
        pyver = py + b"/" + ver
        self.assertEqual(
            b"HTTP/1.0 200 OK\r\n"
            b"Server: WSGIServer/0.2 " + pyver + b"\r\n"
            b"Content-Type: text/plain; charset=utf-8\r\n"
            b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
            b"\r\n"
            b"data",
            out)

    def test_cp1252_url(self):
        def app(e, s):
            s("200 OK", [
                ("Content-Type", "text/plain"),
                ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
            ])
            # PEP3333 says environ variables are decoded as latin1.
            # Encode as latin1 to get original bytes
            return [e["PATH_INFO"].encode("latin1")]

        out, err = run_amock(
            validator(app), data=b"GET /\x80%80 HTTP/1.0")
        self.assertEqual(
            [
                b"HTTP/1.0 200 OK",
                mock.ANY,  # the Server header varies with the interpreter
                b"Content-Type: text/plain",
                b"Date: Wed, 24 Dec 2008 13:29:32 GMT",
                b"",
                b"/\x80\x80",
            ],
            out.splitlines())

    def test_interrupted_write(self):
        # BaseHandler._write() and _flush() have to write all data, even if
        # it takes multiple send() calls.  Test this by interrupting a send()
        # call with a Unix signal.
        threading = support.import_module("threading")
        pthread_kill = support.get_attribute(signal, "pthread_kill")

        def app(environ, start_response):
            start_response("200 OK", [])
            return [b'\0' * support.SOCK_MAX_SIZE]

        class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
            pass

        server = make_server(support.HOST, 0, app, handler_class=WsgiHandler)
        self.addCleanup(server.server_close)
        interrupted = threading.Event()

        def signal_handler(signum, frame):
            interrupted.set()

        original = signal.signal(signal.SIGUSR1, signal_handler)
        self.addCleanup(signal.signal, signal.SIGUSR1, original)
        received = None
        main_thread = threading.get_ident()

        def run_client():
            nonlocal received
            http = HTTPConnection(*server.server_address)
            http.request("GET", "/")
            with http.getresponse() as response:
                response.read(100)
                # The main thread should now be blocking in a send() system
                # call.  But in theory, it could get interrupted by other
                # signals, and then retried.  So keep sending the signal in a
                # loop, in case an earlier signal happens to be delivered at
                # an inconvenient moment.
                while True:
                    pthread_kill(main_thread, signal.SIGUSR1)
                    if interrupted.wait(timeout=1.0):
                        break
                received = len(response.read())
            http.close()

        background = threading.Thread(target=run_client)
        background.start()
        server.handle_request()
        background.join()
        # 100 bytes were consumed before the signal loop; the rest of the
        # body must still arrive in full.
        self.assertEqual(received, support.SOCK_MAX_SIZE - 100)
301n/a
302n/a
class UtilityTests(TestCase):
    """Tests for the helper functions in wsgiref.util."""

    def checkShift(self, sn_in, pi_in, part, sn_out, pi_out):
        """Shift one path segment and verify the result.

        Builds an environ from SCRIPT_NAME *sn_in* and PATH_INFO *pi_in*,
        calls shift_path_info() and checks the returned segment (*part*)
        and the updated SCRIPT_NAME / PATH_INFO.  Returns the environ.
        """
        environ = {'SCRIPT_NAME': sn_in, 'PATH_INFO': pi_in}
        util.setup_testing_defaults(environ)
        shifted = util.shift_path_info(environ)
        self.assertEqual(shifted, part)
        self.assertEqual(environ['PATH_INFO'], pi_out)
        self.assertEqual(environ['SCRIPT_NAME'], sn_out)
        return environ

    def checkDefault(self, key, value, alt=None):
        """Check the default for *key* and that a preset value survives."""
        # An empty environ must receive the default.  Stream defaults are
        # compared by type only, everything else by value.
        defaults = {}
        util.setup_testing_defaults(defaults)
        for stream_type in (StringIO, BytesIO):
            if isinstance(value, stream_type):
                self.assertIsInstance(defaults[key], stream_type)
                break
        else:
            self.assertEqual(defaults[key], value)

        # A value that is already present must be left alone.
        preset = {key: alt}
        util.setup_testing_defaults(preset)
        self.assertIs(preset[key], alt)

    def checkCrossDefault(self, key, value, **kw):
        """Check the default for *key* derived from the settings in *kw*."""
        util.setup_testing_defaults(kw)
        self.assertEqual(kw[key], value)

    def checkAppURI(self, uri, **kw):
        """Check application_uri() for an environ built from *kw*."""
        util.setup_testing_defaults(kw)
        actual = util.application_uri(kw)
        self.assertEqual(actual, uri)

    def checkReqURI(self, uri, query=1, **kw):
        """Check request_uri() for an environ built from *kw*."""
        util.setup_testing_defaults(kw)
        actual = util.request_uri(kw, query)
        self.assertEqual(actual, uri)

    def checkFW(self, text, size, match):
        """Check FileWrapper chunking of *text* and its close() behaviour."""

        def make_it(text=text, size=size):
            return util.FileWrapper(StringIO(text), size)

        compare_generic_iter(make_it, match)

        wrapper = make_it()
        self.assertFalse(wrapper.filelike.closed)

        # Exhausting the wrapper must not close the underlying file ...
        for _chunk in wrapper:
            pass
        self.assertFalse(wrapper.filelike.closed)

        # ... only an explicit close() does.
        wrapper.close()
        self.assertTrue(wrapper.filelike.closed)

    def testSimpleShifts(self):
        cases = [
            ('', '/', '', '/', ''),
            ('', '/x', 'x', '/x', ''),
            ('/', '', None, '/', ''),
            ('/a', '/x/y', 'x', '/a/x', '/y'),
            ('/a', '/x/', 'x', '/a/x', '/'),
        ]
        for case in cases:
            self.checkShift(*case)

    def testNormalizedShifts(self):
        cases = [
            ('/a/b', '/../y', '..', '/a', '/y'),
            ('', '/../y', '..', '', '/y'),
            ('/a/b', '//y', 'y', '/a/b/y', ''),
            ('/a/b', '//y/', 'y', '/a/b/y', '/'),
            ('/a/b', '/./y', 'y', '/a/b/y', ''),
            ('/a/b', '/./y/', 'y', '/a/b/y', '/'),
            ('/a/b', '///./..//y/.//', '..', '/a', '/y/'),
            ('/a/b', '///', '', '/a/b/', ''),
            ('/a/b', '/.//', '', '/a/b/', ''),
            ('/a/b', '/x//', 'x', '/a/b/x', '/'),
            ('/a/b', '/.', None, '/a/b', ''),
        ]
        for case in cases:
            self.checkShift(*case)

    def testDefaults(self):
        expected_defaults = [
            ('SERVER_NAME', '127.0.0.1'),
            ('SERVER_PORT', '80'),
            ('SERVER_PROTOCOL', 'HTTP/1.0'),
            ('HTTP_HOST', '127.0.0.1'),
            ('REQUEST_METHOD', 'GET'),
            ('SCRIPT_NAME', ''),
            ('PATH_INFO', '/'),
            ('wsgi.version', (1, 0)),
            ('wsgi.run_once', 0),
            ('wsgi.multithread', 0),
            ('wsgi.multiprocess', 0),
            ('wsgi.input', BytesIO()),
            ('wsgi.errors', StringIO()),
            ('wsgi.url_scheme', 'http'),
        ]
        for key, value in expected_defaults:
            self.checkDefault(key, value)

    def testCrossDefaults(self):
        cases = [
            ('HTTP_HOST', "foo.bar", {'SERVER_NAME': "foo.bar"}),
            ('wsgi.url_scheme', "https", {'HTTPS': "on"}),
            ('wsgi.url_scheme', "https", {'HTTPS': "1"}),
            ('wsgi.url_scheme', "https", {'HTTPS': "yes"}),
            ('wsgi.url_scheme', "http", {'HTTPS': "foo"}),
            ('SERVER_PORT', "80", {'HTTPS': "foo"}),
            ('SERVER_PORT', "443", {'HTTPS': "on"}),
        ]
        for key, value, settings in cases:
            self.checkCrossDefault(key, value, **settings)

    def testGuessScheme(self):
        cases = [
            ({}, "http"),
            ({'HTTPS': "foo"}, "http"),
            ({'HTTPS': "on"}, "https"),
            ({'HTTPS': "yes"}, "https"),
            ({'HTTPS': "1"}, "https"),
        ]
        for environ, scheme in cases:
            self.assertEqual(util.guess_scheme(environ), scheme)

    def testAppURIs(self):
        cases = [
            ("http://127.0.0.1/", {}),
            ("http://127.0.0.1/spam", {'SCRIPT_NAME': "/spam"}),
            ("http://127.0.0.1/sp%E4m", {'SCRIPT_NAME': "/sp\xe4m"}),
            ("http://spam.example.com:2071/",
             {'HTTP_HOST': "spam.example.com:2071", 'SERVER_PORT': "2071"}),
            ("http://spam.example.com/",
             {'SERVER_NAME': "spam.example.com"}),
            ("http://127.0.0.1/",
             {'HTTP_HOST': "127.0.0.1", 'SERVER_NAME': "spam.example.com"}),
            ("https://127.0.0.1/", {'HTTPS': "on"}),
            ("http://127.0.0.1:8000/",
             {'SERVER_PORT': "8000", 'HTTP_HOST': None}),
        ]
        for uri, settings in cases:
            self.checkAppURI(uri, **settings)

    def testReqURIs(self):
        with_query = [
            ("http://127.0.0.1/", {}),
            ("http://127.0.0.1/spam", {'SCRIPT_NAME': "/spam"}),
            ("http://127.0.0.1/sp%E4m", {'SCRIPT_NAME': "/sp\xe4m"}),
            ("http://127.0.0.1/spammity/spam",
             {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam"}),
            ("http://127.0.0.1/spammity/sp%E4m",
             {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/sp\xe4m"}),
            ("http://127.0.0.1/spammity/spam;ham",
             {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam;ham"}),
            ("http://127.0.0.1/spammity/spam;cookie=1234,5678",
             {'SCRIPT_NAME': "/spammity",
              'PATH_INFO': "/spam;cookie=1234,5678"}),
            ("http://127.0.0.1/spammity/spam?say=ni",
             {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam",
              'QUERY_STRING': "say=ni"}),
            ("http://127.0.0.1/spammity/spam?s%E4y=ni",
             {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam",
              'QUERY_STRING': "s%E4y=ni"}),
        ]
        for uri, settings in with_query:
            self.checkReqURI(uri, **settings)

        # With the query flag switched off the query string is left out.
        self.checkReqURI(
            "http://127.0.0.1/spammity/spam", 0,
            SCRIPT_NAME="/spammity", PATH_INFO="/spam", QUERY_STRING="say=ni")

    def testFileWrapper(self):
        self.checkFW("xyz" * 50, 120, ["xyz" * 40, "xyz" * 10])

    def testHopByHop(self):
        def spellings(name):
            return name, name.title(), name.upper(), name.lower()

        hop_by_hop = (
            "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
            "TE Trailers Transfer-Encoding Upgrade"
        ).split()
        for header in hop_by_hop:
            for alt in spellings(header):
                self.assertTrue(util.is_hop_by_hop(alt))

        # Not comprehensive, just a few random header names
        end_to_end = (
            "Accept Cache-Control Date Pragma Trailer Via Warning"
        ).split()
        for header in end_to_end:
            for alt in spellings(header):
                self.assertFalse(util.is_hop_by_hop(alt))
464n/a
class HeaderTests(TestCase):
    """Tests for wsgiref.headers.Headers."""

    def testMappingInterface(self):
        pairs = [('x', 'y')]
        self.assertEqual(len(Headers()), 0)
        self.assertEqual(len(Headers([])), 0)

        single = Headers(pairs[:])
        self.assertEqual(len(single), 1)
        self.assertEqual(Headers(pairs[:]).keys(), ['x'])
        self.assertEqual(Headers(pairs[:]).values(), ['y'])
        self.assertEqual(Headers(pairs[:]).items(), pairs)
        self.assertIsNot(Headers(pairs).items(), pairs)  # must be copy!

        headers = Headers()
        del headers['foo']  # should not raise an error

        headers['Foo'] = 'bar'
        lookups = (
            headers.__contains__,
            headers.get,
            headers.get_all,
            headers.__getitem__,
        )
        for lookup in lookups:
            # Header names are matched regardless of case.
            for spelling in ('foo', 'Foo', 'FOO'):
                self.assertTrue(lookup(spelling))
            self.assertFalse(lookup('bar'))

        self.assertEqual(headers['foo'], 'bar')
        headers['foo'] = 'baz'
        self.assertEqual(headers['FOO'], 'baz')
        self.assertEqual(headers.get_all('foo'), ['baz'])

        self.assertEqual(headers.get("foo", "whee"), "baz")
        self.assertEqual(headers.get("zoo", "whee"), "whee")
        self.assertEqual(headers.setdefault("foo", "whee"), "baz")
        self.assertEqual(headers.setdefault("zoo", "whee"), "whee")
        self.assertEqual(headers["foo"], "baz")
        self.assertEqual(headers["zoo"], "whee")

    def testRequireList(self):
        with self.assertRaises(TypeError):
            Headers("foo")

    def testExtras(self):
        headers = Headers()
        self.assertEqual(str(headers), '\r\n')

        headers.add_header('foo', 'bar', baz="spam")
        self.assertEqual(headers['foo'], 'bar; baz="spam"')
        self.assertEqual(str(headers), 'foo: bar; baz="spam"\r\n\r\n')

        headers.add_header('Foo', 'bar', cheese=None)
        self.assertEqual(
            headers.get_all('foo'),
            ['bar; baz="spam"', 'bar; cheese'],
        )

        expected_text = (
            'foo: bar; baz="spam"\r\n'
            'Foo: bar; cheese\r\n'
            '\r\n'
        )
        self.assertEqual(str(headers), expected_text)
519n/a
520n/aclass ErrorHandler(BaseCGIHandler):
521n/a """Simple handler subclass for testing BaseHandler"""
522n/a
523n/a # BaseHandler records the OS environment at import time, but envvars
524n/a # might have been changed later by other tests, which trips up
525n/a # HandlerTests.testEnviron().
526n/a os_environ = dict(os.environ.items())
527n/a
528n/a def __init__(self,**kw):
529n/a setup_testing_defaults(kw)
530n/a BaseCGIHandler.__init__(
531n/a self, BytesIO(), BytesIO(), StringIO(), kw,
532n/a multithread=True, multiprocess=True
533n/a )
534n/a
class TestHandler(ErrorHandler):
    """Simple handler subclass for testing BaseHandler, w/error passthru"""

    def handle_error(self):
        # A bare ``raise`` re-raises the exception that is currently being
        # handled, so failures propagate to the test instead of being
        # turned into an error response.
        raise   # for testing, we want to see what's happening
540n/a
541n/a
class HandlerTests(TestCase):
    """Tests for the handler classes in wsgiref.handlers."""

    def checkEnvironAttrs(self, handler):
        """Check that each ``wsgi_*`` attribute matches its environ entry."""
        env = handler.environ
        for attr in [
            'version', 'multithread', 'multiprocess', 'run_once',
            'file_wrapper',
        ]:
            # No environ entry is checked when no file wrapper is set.
            if attr == 'file_wrapper' and handler.wsgi_file_wrapper is None:
                continue
            self.assertEqual(
                getattr(handler, 'wsgi_' + attr), env['wsgi.' + attr])

    def checkOSEnviron(self, handler):
        """Check that the handler environ holds the OS env and defaults."""
        empty = {}
        setup_testing_defaults(empty)
        env = handler.environ
        # Keys supplied by the testing defaults may legitimately differ
        # from the process environment, so only the others are compared.
        for k, v in os.environ.items():
            if k not in empty:
                self.assertEqual(env[k], v)
        for k in empty:
            self.assertIn(k, env)

    def testEnviron(self):
        h = TestHandler(X="Y")
        h.setup_environ()
        self.checkEnvironAttrs(h)
        self.checkOSEnviron(h)
        self.assertEqual(h.environ["X"], "Y")

    def testCGIEnviron(self):
        h = BaseCGIHandler(None, None, None, {})
        h.setup_environ()
        for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
            self.assertIn(key, h.environ)

    def testScheme(self):
        h = TestHandler(HTTPS="on")
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'https')

        h = TestHandler()
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'http')

    def testAbstractMethods(self):
        h = BaseHandler()
        for name in [
            '_flush', 'get_stdin', 'get_stderr', 'add_cgi_vars'
        ]:
            self.assertRaises(NotImplementedError, getattr(h, name))
        self.assertRaises(NotImplementedError, h._write, "test")

    def testContentLength(self):
        # Demo one reason iteration is better than write()... ;)

        def trivial_app1(e, s):
            s('200 OK', [])
            return [e['wsgi.url_scheme'].encode('iso-8859-1')]

        def trivial_app2(e, s):
            s('200 OK', [])(e['wsgi.url_scheme'].encode('iso-8859-1'))
            return []

        def trivial_app3(e, s):
            s('200 OK', [])
            return ['\u0442\u0435\u0441\u0442'.encode("utf-8")]

        def trivial_app4(e, s):
            # Simulate a response to a HEAD request
            s('200 OK', [('Content-Length', '12345')])
            return []

        h = TestHandler()
        h.run(trivial_app1)
        self.assertEqual(h.stdout.getvalue(),
            ("Status: 200 OK\r\n"
             "Content-Length: 4\r\n"
             "\r\n"
             "http").encode("iso-8859-1"))

        h = TestHandler()
        h.run(trivial_app2)
        self.assertEqual(h.stdout.getvalue(),
            ("Status: 200 OK\r\n"
             "\r\n"
             "http").encode("iso-8859-1"))

        h = TestHandler()
        h.run(trivial_app3)
        self.assertEqual(h.stdout.getvalue(),
            b'Status: 200 OK\r\n'
            b'Content-Length: 8\r\n'
            b'\r\n'
            b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82')

        h = TestHandler()
        h.run(trivial_app4)
        self.assertEqual(h.stdout.getvalue(),
            b'Status: 200 OK\r\n'
            b'Content-Length: 12345\r\n'
            b'\r\n')

    def testBasicErrorOutput(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        def error_app(e, s):
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(non_error_app)
        self.assertEqual(h.stdout.getvalue(),
            ("Status: 200 OK\r\n"
             "Content-Length: 0\r\n"
             "\r\n").encode("iso-8859-1"))
        self.assertEqual(h.stderr.getvalue(), "")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            ("Status: %s\r\n"
             "Content-Type: text/plain\r\n"
             "Content-Length: %d\r\n"
             "\r\n" % (h.error_status, len(h.error_body))
             ).encode('iso-8859-1')
            + h.error_body)

        self.assertIn("AssertionError", h.stderr.getvalue())

    def testErrorAfterOutput(self):
        MSG = b"Some output has been sent"

        def error_app(e, s):
            s("200 OK", [])(MSG)
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(error_app)
        # Output was already sent, so no error response is expected.
        self.assertEqual(h.stdout.getvalue(),
            ("Status: 200 OK\r\n"
             "\r\n").encode("iso-8859-1") + MSG)
        self.assertIn("AssertionError", h.stderr.getvalue())

    def testHeaderFormats(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        stdpat = (
            r"HTTP/%s 200 OK\r\n"
            r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
            r"%s" r"Content-Length: 0\r\n" r"\r\n"
        )
        shortpat = (
            "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
        ).encode("iso-8859-1")

        for ssw in "FooBar/1.0", None:
            sw = "Server: %s\r\n" % ssw if ssw else ""

            for version in "1.0", "1.1":
                for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":

                    # Not an origin server: CGI-style "Status:" output.
                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = False
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    self.assertEqual(shortpat, h.stdout.getvalue())

                    # Origin server: a full status line plus Date/Server
                    # headers, except for HTTP/0.9 requests.
                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = True
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    output = h.stdout.getvalue()
                    if proto == "HTTP/0.9":
                        self.assertEqual(output, b"")
                    else:
                        pattern = (stdpat % (version, sw)).encode("iso-8859-1")
                        self.assertTrue(
                            re.match(pattern, output), (pattern, output))

    def testBytesData(self):
        def app(e, s):
            s("200 OK", [
                ("Content-Type", "text/plain; charset=utf-8"),
            ])
            return [b"data"]

        h = TestHandler()
        h.run(app)
        self.assertEqual(b"Status: 200 OK\r\n"
            b"Content-Type: text/plain; charset=utf-8\r\n"
            b"Content-Length: 4\r\n"
            b"\r\n"
            b"data",
            h.stdout.getvalue())

    def testCloseOnError(self):
        side_effects = {'close_called': False}
        MSG = b"Some output has been sent"

        def error_app(e, s):
            s("200 OK", [])(MSG)

            class CrashyIterable(object):
                def __iter__(self):
                    while True:
                        yield b'blah'
                        raise AssertionError(
                            "This should be caught by handler")

                def close(self):
                    side_effects['close_called'] = True

            return CrashyIterable()

        h = ErrorHandler()
        h.run(error_app)
        # close() must be called even though iteration raised.
        self.assertTrue(side_effects['close_called'])

    def testPartialWrite(self):
        written = bytearray()

        class PartialWriter:
            def write(self, b):
                # Accept at most 7 bytes per call and report how many.
                partial = b[:7]
                written.extend(partial)
                return len(partial)

            def flush(self):
                pass

        environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
        h = SimpleHandler(BytesIO(), PartialWriter(), sys.stderr, environ)
        msg = "should not do partial writes"
        with self.assertWarnsRegex(DeprecationWarning, msg):
            h.run(hello_app)
        self.assertEqual(b"HTTP/1.0 200 OK\r\n"
            b"Content-Type: text/plain\r\n"
            b"Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n"
            b"Content-Length: 13\r\n"
            b"\r\n"
            b"Hello, world!",
            written)
782n/a
783n/a
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()