1 | n/a | import base64 |
---|
2 | n/a | import os |
---|
3 | n/a | import email |
---|
4 | n/a | import urllib.parse |
---|
5 | n/a | import urllib.request |
---|
6 | n/a | import http.server |
---|
7 | n/a | import unittest |
---|
8 | n/a | import hashlib |
---|
9 | n/a | |
---|
10 | n/a | from test import support |
---|
11 | n/a | |
---|
12 | n/a | threading = support.import_module('threading') |
---|
13 | n/a | |
---|
14 | n/a | try: |
---|
15 | n/a | import ssl |
---|
16 | n/a | except ImportError: |
---|
17 | n/a | ssl = None |
---|
18 | n/a | |
---|
# Directory containing this module; the certificate paths below are built
# relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
24 | n/a | |
---|
25 | n/a | |
---|
26 | n/a | # Loopback http server infrastructure |
---|
27 | n/a | |
---|
class LoopbackHttpServer(http.server.HTTPServer):
    """HTTPServer variant adjusted for loopback testing.

    The listening socket and every accepted connection are given timeouts
    so that the code driving the server never blocks indefinitely.
    """

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass)

        # Keep the listening socket's timeout very short so the server can
        # be stopped easily.
        self.socket.settimeout(0.1)

    def get_request(self):
        """HTTPServer method, overridden.

        Accepts one connection, puts a 10 second timeout on it and returns
        the ``(connection, client_address)`` pair.
        """
        connection, peer = self.socket.accept()

        # It's a loopback connection, so a bounded timeout shouldn't affect
        # anything, but should make deadlocks less likely to occur.
        connection.settimeout(10.0)
        return (connection, peer)
53 | n/a | |
---|
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        """Create the server on an ephemeral 127.0.0.1 port.

        *request_handler* is the handler factory given to the server; its
        ``protocol_version`` attribute is set to "HTTP/1.0".  The port that
        was actually bound is exposed as ``self.port``.
        """
        super().__init__()
        self._stop_server = False
        # Set by run() as soon as the thread has started executing.
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        address = ("127.0.0.1", 0)
        self.httpd = LoopbackHttpServer(address, request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""
        # run() polls this flag between requests.
        self._stop_server = True

        self.join()
        self.httpd.server_close()

    def run(self):
        """Signal readiness, then handle requests until stop() is called."""
        self.ready.set()
        while True:
            if self._stop_server:
                break
            self.httpd.handle_request()
78 | n/a | |
---|
79 | n/a | # Authentication infrastructure |
---|
80 | n/a | |
---|
class DigestAuthHandler:
    """Handler for performing digest authentication.

    This is the server side of proxy digest authentication for the test
    proxy: it issues 407 challenges carrying single-use nonces and checks
    the ``Proxy-Authorization`` header of follow-up requests against the
    configured users.
    """

    def __init__(self):
        self._request_num = 0      # counter used to derive nonces
        self._nonces = []          # nonces issued and not yet consumed
        self._users = {}           # username -> password
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the username -> password mapping; an empty dict disables auth."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create, remember and return a new nonce.

        The nonce is the MD5 hex digest of an incrementing request counter.
        """
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a dict.

        Everything up to the first space (the scheme token) is discarded and
        the remainder is split on commas into ``name=value`` pairs.  Names
        and values are stripped of surrounding whitespace and one pair of
        enclosing double quotes is removed from a value.

        Pieces without an "=" (for example the empty piece left by a
        trailing comma) are ignored.  Commas inside quoted values are not
        supported by this simple parser.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            # Split on the first "=" only: a value such as a uri with a
            # query string may itself contain "=".
            name, sep, value = part.partition("=")
            if not sep:
                continue
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] is the expected digest.

        The expected value is MD5(HA1:nonce:nc:cnonce:qop:HA2) where HA1 is
        MD5(username:realm:password) and HA2 is MD5(method:uri).  The *uri*
        argument is used for HA2, not any uri field sent by the client.
        Returns False when a field needed for the computation is missing.
        """
        required = ("username", "realm", "nonce", "nc", "cnonce", "qop",
                    "response")
        if any(field not in auth_dict for field in required):
            return False

        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str.encode("ascii")).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 digest challenge with a fresh nonce; return False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add a 'Connection: close'
        # header here or not.
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        Whenever False is returned a 407 challenge has already been written
        to the request handler.
        """
        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"]
            )
        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        # A nonce may be used only once.
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so we're going to see if
        # either of them works here.
        for path in (request_handler.path, request_handler.short_path):
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                return True

        return self._return_auth_challenge(request_handler)
200 | n/a | |
---|
201 | n/a | |
---|
class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    """Request handler that protects GET with HTTP basic authentication."""
    # Server side values
    USER = 'testUser'
    PASSWD = 'testPass'
    REALM = 'Test'
    USER_PASSWD = "%s:%s" % (USER, PASSWD)
    # Base64 form of "USER:PASSWD", compared against the Authorization
    # header in do_GET().
    ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        # Suppress console log message
        return None

    def do_HEAD(self):
        """Answer HEAD with a bare 200 response."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Send the 401 headers that ask for basic credentials for REALM."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", 'Basic realm="%s"' % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Serve the page only when the expected credentials are supplied."""
        credentials = self.headers.get("Authorization", "")
        if not credentials:
            self.do_AUTHHEAD()
            self.wfile.write(b"No Auth header received")
            return
        if credentials == "Basic " + self.ENCODED_AUTH:
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"It works")
            return
        # Request Unauthorized
        self.do_AUTHHEAD()
241 | n/a | |
---|
242 | n/a | |
---|
243 | n/a | |
---|
244 | n/a | # Proxy test infrastructure |
---|
245 | n/a | |
---|
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # Store the auth handler first: the parent's __init__() will try to
        # call do_GET(), which needs it.
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        # sys.stderr.write(format % args)
        return None

    def do_GET(self):
        """Authenticate the request and, on success, send the canned page."""
        parsed = urllib.parse.urlparse(self.path, "http")
        # The digest handler checks both self.path and this shorter form.
        self.short_path = parsed.path
        if not self.digest_auth_handler.handle_request(self):
            return
        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        greeting = "You've reached %s!<BR>" % self.path
        self.wfile.write(greeting.encode("ascii"))
        self.wfile.write(b"Our apologies, but our server is down due to "
                         b"a sudden zombie invasion.")
276 | n/a | |
---|
277 | n/a | # Test cases |
---|
278 | n/a | |
---|
@unittest.skipUnless(threading, "Threading required for this test.")
class BasicAuthTests(unittest.TestCase):
    """Check urllib.request's basic-auth handling against a local server."""
    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super(BasicAuthTests, self).setUp()
        # With Basic Authentication
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.addCleanup(self.server.stop)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()

    def tearDown(self):
        super(BasicAuthTests, self).tearDown()

    def test_basic_auth_success(self):
        """Correct credentials must let urlopen() succeed."""
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        try:
            response = urllib.request.urlopen(self.server_url)
        except urllib.error.HTTPError:
            # fail() accepts a single message, so format the URL into it.
            self.fail("Basic auth failed for the url: %s" % self.server_url)
        # Close the response so the connection is not leaked.
        with response:
            self.assertTrue(response)

    def test_basic_auth_httperror(self):
        """Wrong credentials must surface as an HTTPError."""
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
314 | n/a | |
---|
315 | n/a | |
---|
@unittest.skipUnless(threading, "Threading required for this test.")
class ProxyAuthTests(unittest.TestCase):
    """Check urllib.request's proxy digest authentication against a fake
    local proxy.
    """
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        # Register the shutdown as a cleanup (rather than in tearDown) so the
        # server thread is stopped even if the rest of setUp() fails.
        self.addCleanup(self.server.stop)
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def tearDown(self):
        # The server is stopped by the cleanup registered in setUp().
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        # The context manager closes the response even if read() raises.
        with self.opener.open(self.URL) as result:
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            with result:
                while result.read():
                    pass
392 | n/a | |
---|
393 | n/a | |
---|
def GetRequestHandler(responses):
    """Build a request handler class that replays canned responses.

    *responses* is a list of ``(status_code, headers, body)`` tuples; each
    handled request removes and serves the first remaining entry.  *headers*
    is a sequence of ``(name, value)`` pairs whose values are %-formatted
    with a mapping containing ``port``.  The returned class records request
    paths and POST bodies in its ``requests`` list and the headers of the
    most recent request in ``headers_received``.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            remaining = self.send_head()
            # write() may accept only part of the data; keep sending the
            # unwritten tail until nothing is left.
            while remaining:
                written = self.wfile.write(remaining)
                remaining = remaining[written:]

        def do_POST(self):
            length = int(self.headers["Content-Length"])
            payload = self.rfile.read(length)
            self.do_GET()
            self.requests.append(payload)

        def send_head(self):
            """Send status and headers; return the body, or None if empty."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, header_list, payload = responses.pop(0)

            self.send_response(status)

            for name, template in header_list:
                self.send_header(name, template % {'port': self.port})
            if not payload:
                self.end_headers()
                return None
            self.send_header("Content-type", "text/plain")
            self.end_headers()
            return payload

        def log_message(self, *args):
            return None

    return FakeHTTPRequestHandler
435 | n/a | |
---|
436 | n/a | |
---|
@unittest.skipUnless(threading, "Threading required for this test.")
class TestUrlopen(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def setUp(self):
        super(TestUrlopen, self).setUp()

        # Ignore proxies for localhost tests.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = '*'
        os.environ['no_proxy'] = '*'

    def urlopen(self, url, data=None, **kwargs):
        """Open *url*, read it through several read methods and return the
        concatenated bytes.  The response is always closed.
        """
        chunks = []
        f = urllib.request.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            chunks.extend(f.readlines(200))
            chunks.append(f.readline())
            chunks.append(f.read(1024))
            chunks.append(f.read())
        finally:
            f.close()
        return b"".join(chunks)

    def start_server(self, responses=None):
        """Start a loopback HTTP server replaying *responses*.

        Returns the handler class, whose ``port`` attribute is set to the
        server's port.  The server is stopped by a registered cleanup.
        """
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.addCleanup(self.server.stop)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server replaying *responses*; return the handler
        class.  Skips the test when urllib.request has no HTTPSHandler.
        """
        if not hasattr(urllib.request, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
                        b'a\r\n'
                        b'hello worl\r\n'
                        b'1\r\n'
                        b'd\r\n'
                        b'0\r\n'
                        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            data = f.read()
            f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                             b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        with support.check_warnings(('', DeprecationWarning)):
            # Good cert
            data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                                cafile=CERT_localhost)
            self.assertEqual(data, b"we care a bit")
            # Bad cert
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)
            # Good cert, but mismatching hostname
            handler = self.start_https_server(certfile=CERT_fakehostname)
            with self.assertRaises(ssl.CertificateError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with support.check_warnings(('', DeprecationWarning)):
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name, "localhost")

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        # Everything after the open is inside the try block so the response
        # is closed even when an attribute check fails.
        try:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")
        finally:
            open_url.close()

    def test_info(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen(
            "http://localhost:%s" % handler.port)
        with open_url:
            info_obj = open_url.info()
        self.assertIsInstance(info_obj, email.message.Message,
                              "object returned by 'info' is not an "
                              "instance of email.message.Message")
        self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        with open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        # Close the response once iteration is finished.
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for line in data:
                self.assertEqual(line, expected_response)

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Count the lines explicitly so an empty response is reported as an
        # assertion failure instead of a NameError on an unbound loop index.
        fetched = 0
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 " Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
                fetched += 1
        self.assertEqual(fetched, len(lines))
655 | n/a | |
---|
656 | n/a | |
---|
# Value returned by support.threading_setup() in setUpModule(); consumed by
# tearDownModule().
threads_key = None


def setUpModule():
    """Record the threading state before any test in this module runs."""
    # Store the threading_setup in a key and ensure that it is cleaned up
    # in the tearDown
    global threads_key
    threads_key = support.threading_setup()


def tearDownModule():
    """Run the threading cleanup against the state recorded at setup."""
    if threads_key:
        # threading_cleanup() takes the original values as separate
        # positional arguments, so the stored tuple must be unpacked.
        support.threading_cleanup(*threads_key)
668 | n/a | |
---|
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()