»Core Development>Code coverage>Lib/wsgiref/validate.py

Python code coverage for Lib/wsgiref/validate.py

#countcontent
1n/a# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
2n/a# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
3n/a# Also licenced under the Apache License, 2.0: http://opensource.org/licenses/apache2.0.php
4n/a# Licensed to PSF under a Contributor Agreement
5n/a"""
6n/aMiddleware to check for obedience to the WSGI specification.
7n/a
8n/aSome of the things this checks:
9n/a
10n/a* Signature of the application and start_response (including that
11n/a keyword arguments are not used).
12n/a
13n/a* Environment checks:
14n/a
15n/a - Environment is a dictionary (and not a subclass).
16n/a
17n/a - That all the required keys are in the environment: REQUEST_METHOD,
18n/a SERVER_NAME, SERVER_PORT, wsgi.version, wsgi.input, wsgi.errors,
19n/a wsgi.multithread, wsgi.multiprocess, wsgi.run_once
20n/a
21n/a - That HTTP_CONTENT_TYPE and HTTP_CONTENT_LENGTH are not in the
22n/a environment (these headers should appear as CONTENT_LENGTH and
23n/a CONTENT_TYPE).
24n/a
25n/a - Warns if QUERY_STRING is missing, as the cgi module acts
26n/a unpredictably in that case.
27n/a
28n/a - That CGI-style variables (that don't contain a .) have
29n/a (non-unicode) string values
30n/a
31n/a - That wsgi.version is a tuple
32n/a
33n/a - That wsgi.url_scheme is 'http' or 'https' (@@: is this too
34n/a restrictive?)
35n/a
36n/a - Warns if the REQUEST_METHOD is not known (@@: probably too
37n/a restrictive).
38n/a
39n/a - That SCRIPT_NAME and PATH_INFO are empty or start with /
40n/a
41n/a - That at least one of SCRIPT_NAME or PATH_INFO are set.
42n/a
43n/a - That CONTENT_LENGTH is a non-negative integer.
44n/a
45n/a - That SCRIPT_NAME is not '/' (it should be '', and PATH_INFO should
46n/a be '/').
47n/a
48n/a - That wsgi.input has the methods read, readline, readlines, and
49n/a __iter__
50n/a
51n/a - That wsgi.errors has the methods flush, write, writelines
52n/a
53n/a* The status is a string, contains a space, starts with an integer,
54n/a and that integer is in range (>= 100).
55n/a
56n/a* That the headers is a list (not a subclass, not another kind of
57n/a sequence).
58n/a
59n/a* That the items of the headers are tuples of strings.
60n/a
61n/a* That there is no 'status' header (that is used in CGI, but not in
62n/a WSGI).
63n/a
64n/a* That the headers don't contain newlines or colons, end in _ or -, or
65n/a contain character codes of 037 (octal) or below.
66n/a
67n/a* That Content-Type is given if there is content (CGI often has a
68n/a default content type, but WSGI does not).
69n/a
70n/a* That no Content-Type is given when there is no content (@@: is this
71n/a too restrictive?)
72n/a
73n/a* That the exc_info argument to start_response is a tuple or None.
74n/a
75n/a* That all calls to the writer are with strings, and no other methods
76n/a on the writer are accessed.
77n/a
78n/a* That wsgi.input is used properly:
79n/a
80n/a - .read() is called with exactly one argument
81n/a
82n/a - That it returns a string
83n/a
84n/a - That readline, readlines, and __iter__ return strings
85n/a
86n/a - That .close() is not called
87n/a
88n/a - No other methods are provided
89n/a
90n/a* That wsgi.errors is used properly:
91n/a
92n/a - .write() and .writelines() is called with a string
93n/a
94n/a - That .close() is not called, and no other methods are provided.
95n/a
96n/a* The response iterator:
97n/a
98n/a - That it is not a string (it should be a list of a single string; a
99n/a string will work, but perform horribly).
100n/a
101n/a - That .__next__() returns a string
102n/a
103n/a - That the iterator is not iterated over until start_response has
104n/a been called (that can signal either a server or application
105n/a error).
106n/a
107n/a - That .close() is called (doesn't raise exception, only prints to
108n/a sys.stderr, because we only know it isn't called when the object
109n/a is garbage collected).
110n/a"""
111n/a__all__ = ['validator']
112n/a
113n/a
114n/aimport re
115n/aimport sys
116n/aimport warnings
117n/a
# A header name must start with an ASCII letter and may continue with
# letters, digits, '-' or '_'.
header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$')
# Matches any character in the range \000-\037 (octal); such characters
# are rejected in header values.
bad_header_value_re = re.compile(r'[\000-\037]')
120n/a
class WSGIWarning(Warning):
    """
    Raised in response to WSGI-spec-related warnings
    """
125n/a
def assert_(cond, *args):
    """
    Raise AssertionError(*args) when ``cond`` is false.

    Unlike the ``assert`` statement, this is an ordinary function call,
    so the check is not removed when Python runs with -O.
    """
    if not cond:
        raise AssertionError(*args)
129n/a
def check_string_type(value, title):
    """
    Return ``value`` unchanged if it is exactly of type str.

    Subclasses of str are rejected.  ``title`` names the value in the
    AssertionError message raised for any other type.
    """
    if type(value) is str:
        return value
    raise AssertionError(
        "{0} must be of type str (got {1})".format(title, repr(value)))
135n/a
def validator(application):

    """
    When applied between a WSGI server and a WSGI application, this
    middleware will check for WSGI compliancy on a number of levels.

    The returned callable checks its own call signature and the environ,
    replaces ``wsgi.input`` and ``wsgi.errors`` in the environ with
    checking wrappers, validates the arguments the application passes to
    start_response, and returns the application's iterable wrapped in an
    IteratorWrapper.  It raises an AssertionError if anything seems off
    (except for a failure to close the application iterator, which
    will be printed to stderr -- there's no way to raise an exception
    at that point).
    """

    def lint_app(*args, **kw):
        assert_(len(args) == 2, "Two arguments required")
        assert_(not kw, "No keyword arguments allowed")
        environ, start_response = args

        check_environ(environ)

        # We use this to check if the application returns without
        # calling start_response: the list stays empty (falsy) until
        # start_response_wrapper has run successfully.
        start_response_started = []

        def start_response_wrapper(*args, **kw):
            assert_(len(args) in (2, 3), (
                "Invalid number of arguments: %s" % (args,)))
            assert_(not kw, "No keyword arguments allowed")
            status = args[0]
            headers = args[1]
            exc_info = args[2] if len(args) == 3 else None

            check_status(status)
            check_headers(headers)
            check_content_type(status, headers)
            check_exc_info(exc_info)

            start_response_started.append(None)
            # Wrap the server's write callable so calls to it are checked.
            return WriteWrapper(start_response(*args))

        environ['wsgi.input'] = InputWrapper(environ['wsgi.input'])
        environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors'])

        iterator = application(environ, start_response_wrapper)
        # The equality test (rather than an identity test) is deliberate
        # here: it is kept exactly as the original check was written.
        assert_(iterator is not None and iterator != False,
            "The application must return an iterator, if only an empty list")

        check_iterator(iterator)

        return IteratorWrapper(iterator, start_response_started)

    return lint_app
190n/a
class InputWrapper:
    """
    Checking proxy placed around ``wsgi.input``.

    Each read method checks how many arguments it was called with and
    that the wrapped stream hands back bytes; close() always fails.
    """

    def __init__(self, wsgi_input):
        self.input = wsgi_input

    def read(self, *args):
        # Exactly one argument (the size) is required.
        assert_(len(args) == 1)
        v = self.input.read(*args)
        assert_(type(v) is bytes)
        return v

    def readline(self, *args):
        assert_(len(args) <= 1)
        v = self.input.readline(*args)
        assert_(type(v) is bytes)
        return v

    def readlines(self, *args):
        assert_(len(args) <= 1)
        lines = self.input.readlines(*args)
        assert_(type(lines) is list)
        for line in lines:
            assert_(type(line) is bytes)
        return lines

    def __iter__(self):
        # Go through self.readline() so every line yielded is type-checked;
        # stop at the first empty result.
        while True:
            line = self.readline()
            if not line:
                return
            yield line

    def close(self):
        assert_(0, "input.close() must not be called")
225n/a
class ErrorWrapper:
    """
    Checking proxy placed around ``wsgi.errors``.

    write() insists on a str argument before forwarding it, writelines()
    routes every item through write(), and close() always fails.
    """

    def __init__(self, wsgi_errors):
        self.errors = wsgi_errors

    def write(self, s):
        assert_(type(s) is str)
        self.errors.write(s)

    def flush(self):
        self.errors.flush()

    def writelines(self, seq):
        # Delegate to write() so each line gets the same type check.
        for line in seq:
            self.write(line)

    def close(self):
        assert_(0, "errors.close() must not be called")
244n/a
class WriteWrapper:
    """
    Checking proxy around the callable returned by start_response.

    Calling the wrapper asserts the argument is bytes, then forwards it
    to the wrapped writer.
    """

    def __init__(self, wsgi_writer):
        self.writer = wsgi_writer

    def __call__(self, s):
        assert_(type(s) is bytes)
        self.writer(s)
253n/a
class PartialIteratorWrapper:
    """
    Holds an application iterable and only builds the checking
    IteratorWrapper once __iter__ is actually called.
    """

    def __init__(self, wsgi_iterator):
        self.iterator = wsgi_iterator

    def __iter__(self):
        # We want to make sure __iter__ is called
        return IteratorWrapper(self.iterator, None)
262n/a
class IteratorWrapper:
    """
    Checking wrapper around the iterable returned by the application.

    ``check_start_response`` is either None (no check) or an object whose
    truthiness says whether start_response has been called; it is
    examined once, when the first item is produced.  Every item must be
    of type bytes, and nothing may be read after close().
    """

    def __init__(self, wsgi_iterator, check_start_response):
        self.original_iterator = wsgi_iterator
        self.iterator = iter(wsgi_iterator)
        self.closed = False
        self.check_start_response = check_start_response

    def __iter__(self):
        return self

    def __next__(self):
        assert_(not self.closed,
            "Iterator read after closed")
        v = next(self.iterator)
        if type(v) is not bytes:
            assert_(False, "Iterator yielded non-bytestring (%r)" % (v,))
        if self.check_start_response is not None:
            assert_(self.check_start_response,
                "The application returns and we started iterating over its body, but start_response has not yet been called")
            # Only needs checking on the first item.
            self.check_start_response = None
        return v

    def close(self):
        self.closed = True
        if hasattr(self.original_iterator, 'close'):
            self.original_iterator.close()

    def __del__(self):
        # If iter() raised in __init__, ``closed`` was never set and there
        # is no iterator that could have been closed; treat that as
        # nothing to report rather than failing with AttributeError.
        closed = getattr(self, 'closed', True)
        if not closed:
            sys.stderr.write(
                "Iterator garbage collected without being closed")
        assert_(closed,
            "Iterator garbage collected without being closed")
297n/a
def check_environ(environ):
    """
    Validate the WSGI environ dictionary.

    Raises AssertionError when the environ is not exactly a dict, when a
    required key is missing, when a forbidden key is present, or when a
    value fails one of the checks below.  Issues a WSGIWarning when
    QUERY_STRING is missing or REQUEST_METHOD is not a known method.
    """
    assert_(type(environ) is dict,
        "Environment is not of the right type: %r (environment: %r)"
        % (type(environ), environ))

    # 'wsgi.url_scheme' is listed here because it is indexed
    # unconditionally further down; without this a missing key would
    # surface as a KeyError instead of an AssertionError.
    for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
                'wsgi.version', 'wsgi.url_scheme', 'wsgi.input',
                'wsgi.errors', 'wsgi.multithread', 'wsgi.multiprocess',
                'wsgi.run_once']:
        assert_(key in environ,
            "Environment missing required key: %r" % (key,))

    for key in ['HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH']:
        # key[5:] strips the 'HTTP_' prefix to name the correct variable.
        assert_(key not in environ,
            "Environment should not have the key: %s "
            "(use %s instead)" % (key, key[5:]))

    if 'QUERY_STRING' not in environ:
        warnings.warn(
            'QUERY_STRING is not in the WSGI environment; the cgi '
            'module will use sys.argv when this variable is missing, '
            'so application errors are more likely',
            WSGIWarning)

    for key in environ:
        if '.' in key:
            # Extension, we don't care about its type
            continue
        assert_(type(environ[key]) is str,
            "Environmental variable %s is not a string: %r (value: %r)"
            % (key, type(environ[key]), environ[key]))

    assert_(type(environ['wsgi.version']) is tuple,
        "wsgi.version should be a tuple (%r)" % (environ['wsgi.version'],))
    assert_(environ['wsgi.url_scheme'] in ('http', 'https'),
        "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])

    check_input(environ['wsgi.input'])
    check_errors(environ['wsgi.errors'])

    # @@: these need filling out:
    if environ['REQUEST_METHOD'] not in (
        'GET', 'HEAD', 'POST', 'OPTIONS', 'PATCH', 'PUT', 'DELETE', 'TRACE'):
        warnings.warn(
            "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'],
            WSGIWarning)

    assert_(not environ.get('SCRIPT_NAME')
            or environ['SCRIPT_NAME'].startswith('/'),
        "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME'])
    assert_(not environ.get('PATH_INFO')
            or environ['PATH_INFO'].startswith('/'),
        "PATH_INFO doesn't start with /: %r" % environ['PATH_INFO'])
    if environ.get('CONTENT_LENGTH'):
        # A non-numeric value makes int() raise ValueError here.
        assert_(int(environ['CONTENT_LENGTH']) >= 0,
            "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH'])

    if not environ.get('SCRIPT_NAME'):
        assert_('PATH_INFO' in environ,
            "One of SCRIPT_NAME or PATH_INFO are required (PATH_INFO "
            "should at least be '/' if SCRIPT_NAME is empty)")
    assert_(environ.get('SCRIPT_NAME') != '/',
        "SCRIPT_NAME cannot be '/'; it should instead be '', and "
        "PATH_INFO should be '/'")
362n/a
def check_input(wsgi_input):
    """
    Assert that ``wsgi_input`` has the attributes read, readline,
    readlines and __iter__.
    """
    for attr in ['read', 'readline', 'readlines', '__iter__']:
        assert_(hasattr(wsgi_input, attr),
            "wsgi.input (%r) doesn't have the attribute %s"
            % (wsgi_input, attr))
368n/a
def check_errors(wsgi_errors):
    """
    Assert that ``wsgi_errors`` has the attributes flush, write and
    writelines.
    """
    for attr in ['flush', 'write', 'writelines']:
        assert_(hasattr(wsgi_errors, attr),
            "wsgi.errors (%r) doesn't have the attribute %s"
            % (wsgi_errors, attr))
374n/a
def check_status(status):
    """
    Validate the status string passed to start_response.

    Raises AssertionError if ``status`` is not a str, is empty or
    whitespace-only, does not begin with a three-character code, or the
    code is below 100.  A code that is not an integer makes int() raise
    ValueError.  Issues a WSGIWarning when the code is not followed by a
    space.
    """
    status = check_string_type(status, "Status")
    parts = status.split(None, 1)
    # An empty or whitespace-only status has no first field; report it as
    # a validation failure rather than letting the indexing below fail.
    assert_(parts, "Status string must not be empty: %r" % status)
    # Implicitly check that we can turn it into an integer:
    status_code = parts[0]
    assert_(len(status_code) == 3,
        "Status codes must be three characters: %r" % status_code)
    status_int = int(status_code)
    assert_(status_int >= 100, "Status code is invalid: %r" % status_int)
    if len(status) < 4 or status[3] != ' ':
        warnings.warn(
            "The status string (%r) should be a three-digit integer "
            "followed by a single space and a status explanation"
            % status, WSGIWarning)
388n/a
def check_headers(headers):
    """
    Validate the header list passed to start_response.

    ``headers`` must be exactly a list whose items are exactly 2-tuples
    of str.  Raises AssertionError for a header named 'status' (in any
    case), for a name that contains ':' or a newline, does not match
    header_re, or ends in '-' or '_', and for a value containing a
    character matched by bad_header_value_re.
    """
    assert_(type(headers) is list,
        "Headers (%r) must be of type list: %r"
        % (headers, type(headers)))
    for item in headers:
        assert_(type(item) is tuple,
            "Individual headers (%r) must be of type tuple: %r"
            % (item, type(item)))
        assert_(len(item) == 2)
        name, value = item
        name = check_string_type(name, "Header name")
        value = check_string_type(value, "Header value")
        assert_(name.lower() != 'status',
            "The Status header cannot be used; it conflicts with CGI "
            "script, and HTTP status is not given through headers "
            "(value: %r)." % value)
        assert_('\n' not in name and ':' not in name,
            "Header names may not contain ':' or '\\n': %r" % name)
        assert_(header_re.search(name), "Bad header name: %r" % name)
        assert_(not name.endswith('-') and not name.endswith('_'),
            "Names may not end in '-' or '_': %r" % name)
        # Search once and reuse the match for the error message.
        bad_char = bad_header_value_re.search(value)
        if bad_char:
            assert_(0, "Bad header value: %r (bad char: %r)"
            % (value, bad_char.group(0)))
415n/a
def check_content_type(status, headers):
    """
    Check that a Content-Type header is present exactly when expected.

    For status codes 204 and 304 a Content-Type header is an error; for
    every other code a missing Content-Type header is an error.  Errors
    are raised as AssertionError.
    """
    status = check_string_type(status, "Status")
    code = int(status.split(None, 1)[0])
    # @@: need one more person to verify this interpretation of RFC 2616
    #     http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
    NO_MESSAGE_BODY = (204, 304)
    for name, value in headers:
        name = check_string_type(name, "Header name")
        if name.lower() == 'content-type':
            if code not in NO_MESSAGE_BODY:
                # The header is present and allowed: nothing more to check.
                return
            assert_(0, ("Content-Type header found in a %s response, "
                        "which must not return content.") % code)
    if code not in NO_MESSAGE_BODY:
        assert_(0, "No Content-Type header found in headers (%s)" % headers)
431n/a
def check_exc_info(exc_info):
    """
    Assert that ``exc_info`` is None or exactly a tuple.
    """
    assert_(exc_info is None or type(exc_info) is tuple,
        "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
    # More exc_info checks?
436n/a
def check_iterator(iterator):
    """
    Assert that the application did not return a str or bytes object
    (or a subclass of either) as its response iterable.
    """
    # Technically a bytestring is legal, which is why it's a really bad
    # idea, because it may cause the response to be returned
    # character-by-character
    assert_(not isinstance(iterator, (str, bytes)),
        "You should not return a string as your application iterator, "
        "instead return a single-item list containing a bytestring.")