» Core Development > Code coverage > Lib/test/test_urllib2_localnet.py

Python code coverage for Lib/test/test_urllib2_localnet.py

# | count | content
1n/aimport base64
2n/aimport os
3n/aimport email
4n/aimport urllib.parse
5n/aimport urllib.request
6n/aimport http.server
7n/aimport unittest
8n/aimport hashlib
9n/a
10n/afrom test import support
11n/a
12n/athreading = support.import_module('threading')
13n/a
14n/atry:
15n/a import ssl
16n/aexcept ImportError:
17n/a ssl = None
18n/a
# Directory containing this test module; the certificate paths below are
# built relative to it.
here = os.path.split(__file__)[0]
# Self-signed certificate file issued for 'localhost'.
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed certificate file issued for 'fakehostname'.
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
24n/a
25n/a
26n/a# Loopback http server infrastructure
27n/a
class LoopbackHttpServer(http.server.HTTPServer):
    """HTTPServer variant adjusted for loopback testing.

    The listening socket and every accepted connection are given
    timeouts so that the tests can stop the server and avoid hanging.
    """

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass)

        # A very short timeout on the listening socket makes it easy to
        # stop the server.
        self.socket.settimeout(0.1)

    def get_request(self):
        """Accept one connection and return ``(socket, address)``.

        Overrides the HTTPServer method in order to put a 10 second
        timeout on the accepted socket.
        """
        connection, peer = self.socket.accept()

        # This is a loopback connection, so a bounded timeout should not
        # affect anything while making deadlocks less likely.
        connection.settimeout(10.0)

        return connection, peer
53n/a
class LoopbackHttpServerThread(threading.Thread):
    """Thread that serves a loopback HTTP server until it is stopped."""

    def __init__(self, request_handler):
        super().__init__()
        self._stop_server = False
        # Set by run() once the thread has begun serving.
        self.ready = threading.Event()
        # The handler is made to speak HTTP/1.0.
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 lets the OS choose a free port; it is published as
        # ``self.port``.
        self.httpd = LoopbackHttpServer(("127.0.0.1", 0), request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Ask the serving loop to finish, wait for it, then close the server."""
        self._stop_server = True
        self.join()
        self.httpd.server_close()

    def run(self):
        """Signal readiness, then handle requests until stop is requested."""
        self.ready.set()
        while True:
            if self._stop_server:
                break
            self.httpd.handle_request()
78n/a
79n/a# Authentication infrastructure
80n/a
class DigestAuthHandler:
    """Server-side helper performing HTTP digest proxy authentication.

    The handler issues 407 challenges carrying single-use nonces and
    checks the ``Proxy-Authorization`` header of follow-up requests
    against the configured users.
    """

    def __init__(self):
        self._request_num = 0
        # Nonces that were handed out and have not been used yet.
        self._nonces = []
        # Mapping of user name -> password; empty disables authentication.
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the quality-of-protection value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the ``{username: password}`` mapping of accepted users."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create a new nonce, remember it as outstanding and return it."""
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a ``{name: value}`` dict.

        The leading scheme token (everything up to the first space) is
        dropped and the remainder is split on commas.  Surrounding
        whitespace and one pair of enclosing double quotes are removed
        from each value.  Elements without an ``=`` (for example the
        empty element left by a trailing comma) are ignored.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            # Split on the first "=" only: values such as a URI with a
            # query string may legitimately contain "=" themselves.
            name, separator, value = part.partition("=")
            if not separator:
                continue
            value = value.strip()
            if value.startswith('"') and value.endswith('"'):
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if the digest response in *auth_dict* is correct.

        The expected response is recomputed from *password*, *method*
        and *uri* together with the client supplied fields.  A request
        lacking any of the required fields is treated as invalid.
        """
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        try:
            HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
            HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
            HA2_str = "%(method)s:%(uri)s" % final_dict
            HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
            final_dict["HA1"] = HA1
            final_dict["HA2"] = HA2
            response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                           "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        except KeyError:
            # A required digest field is missing from the request.
            return False
        response = hashlib.md5(response_str.encode("ascii")).hexdigest()

        return response == auth_dict.get("response")

    def _return_auth_challenge(self, request_handler):
        """Send a 407 digest challenge with a fresh nonce; return False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        # The header value, including its trailing separator, is kept
        # exactly as it has always been sent.
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if a 'Connection: close' header should be added
        # here as well.
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        Whenever authentication fails a new challenge is sent through
        *request_handler* before False is returned.
        """
        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"]
            )

        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        # Every nonce may be used only once.
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so either of them is
        # accepted here.
        for path in (request_handler.path, request_handler.short_path):
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                return True

        return self._return_auth_challenge(request_handler)
200n/a
201n/a
class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    """Request handler that protects GET with HTTP basic authentication."""
    # Credentials and realm expected by the server side
    USER = 'testUser'
    PASSWD = 'testPass'
    REALM = 'Test'
    USER_PASSWD = ":".join((USER, PASSWD))
    ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Discard log messages so nothing is printed to the console."""

    def do_HEAD(self):
        """Answer with a bare 200 response and an HTML content type."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Send the headers of a 401 basic-auth challenge for REALM."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", 'Basic realm="%s"' % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Serve the page for correct credentials, otherwise challenge."""
        supplied = self.headers.get("Authorization", "")
        if not supplied:
            # No credentials were sent at all.
            self.do_AUTHHEAD()
            self.wfile.write(b"No Auth header received")
            return
        if supplied == "Basic " + self.ENCODED_AUTH:
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"It works")
            return
        # Credentials were sent but are wrong: request unauthorized.
        self.do_AUTHHEAD()
241n/a
242n/a
243n/a
244n/a# Proxy test infrastructure
245n/a
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """A 'fake proxy' that answers every request itself, pretending the
    entire internet has gone down due to a sudden zombie invasion.  Its
    main utility is in providing authentication support for testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # Must be stored before the parent's __init__() runs, because
        # that call will already try to invoke do_GET().
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Discard log messages.

        When debugging, ``format % args`` can be written to stderr here.
        """

    def do_GET(self):
        """Authenticate the request and, on success, send the canned page."""
        parsed = urllib.parse.urlparse(self.path, "http")
        # The digest handler may validate against the path component alone.
        self.short_path = parsed.path
        if not self.digest_auth_handler.handle_request(self):
            return
        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        greeting = "You've reached %s!<BR>" % self.path
        self.wfile.write(greeting.encode("ascii"))
        self.wfile.write(b"Our apologies, but our server is down due to "
                         b"a sudden zombie invasion.")
276n/a
277n/a# Test cases
278n/a
@unittest.skipUnless(threading, "Threading required for this test.")
class BasicAuthTests(unittest.TestCase):
    """Exercise HTTPBasicAuthHandler against a local BasicAuthHandler server."""
    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super(BasicAuthTests, self).setUp()
        # With Basic Authentication
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.addCleanup(self.server.stop)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()

    def tearDown(self):
        super(BasicAuthTests, self).tearDown()

    def _install_opener(self, password):
        """Install a global opener that sends USER with *password*.

        The default opener is restored when the test finishes so the
        installed one cannot leak into other tests.
        """
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, password)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        self.addCleanup(urllib.request.install_opener, None)

    def test_basic_auth_success(self):
        self._install_opener(self.PASSWD)
        try:
            response = urllib.request.urlopen(self.server_url)
        except urllib.error.HTTPError:
            # fail() takes a single message argument, so the URL has to
            # be formatted into the message.
            self.fail("Basic auth failed for the url: %s" % self.server_url)
        with response:
            self.assertTrue(response)

    def test_basic_auth_httperror(self):
        self._install_opener(self.INCORRECT_PASSWD)
        with self.assertRaises(urllib.error.HTTPError) as cm:
            urllib.request.urlopen(self.server_url)
        # The HTTPError wraps the open response; release it.
        cm.exception.close()
314n/a
315n/a
@unittest.skipUnless(threading, "Threading required for this test.")
class ProxyAuthTests(unittest.TestCase):
    """Exercise ProxyDigestAuthHandler against a local FakeProxyHandler."""
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        # Registered as a cleanup (rather than in tearDown) so the server
        # thread is stopped even when the rest of setUp() fails.
        self.addCleanup(self.server.stop)
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def _assert_open_raises_httperror(self):
        """Check that opening URL raises HTTPError, then close the error."""
        with self.assertRaises(urllib.error.HTTPError) as cm:
            self.opener.open(self.URL)
        # The HTTPError wraps the open response; release it.
        cm.exception.close()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self._assert_open_raises_httperror()

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self._assert_open_raises_httperror()

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        with self.opener.open(self.URL) as result:
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            with result:
                while result.read():
                    pass
392n/a
393n/a
def GetRequestHandler(responses):
    """Build a request handler class that replays canned *responses*.

    *responses* is a list of ``(status_code, headers, body)`` tuples; one
    entry is removed from the front of the list for every request.  The
    returned class records the requests it has seen in ``requests`` and
    the headers of the latest request in ``headers_received``.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            remaining = self.send_head()
            # write() may accept only part of the data; continue until
            # everything has been written.
            while remaining:
                written = self.wfile.write(remaining)
                remaining = remaining[written:]

        def do_POST(self):
            length = int(self.headers["Content-Length"])
            payload = self.rfile.read(length)
            self.do_GET()
            # Recorded after the path that do_GET() logs for this request.
            self.requests.append(payload)

        def send_head(self):
            """Send status line and headers; return the body, if any."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, header_items, payload = responses.pop(0)

            self.send_response(status)

            # Header values may contain a %(port)s placeholder.
            substitutions = {'port': self.port}
            for name, template in header_items:
                self.send_header(name, template % substitutions)
            if payload:
                self.send_header("Content-type", "text/plain")
            self.end_headers()
            return payload if payload else None

        def log_message(self, *args):
            """Discard log messages."""

    return FakeHTTPRequestHandler
435n/a
436n/a
@unittest.skipUnless(threading, "Threading required for this test.")
class TestUrlopen(unittest.TestCase):
    """Tests urllib.request.urlopen against local loopback servers.

    These tests are not exhaustive; the file based tests are assumed to
    do a good job overall of covering the basic interface features.
    Covered here are redirection, chunked responses, error statuses,
    POST data, the HTTPS related options, request headers and the basic
    API of the returned response object.
    """

    def setUp(self):
        super(TestUrlopen, self).setUp()

        # Ignore proxies for localhost tests.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = '*'
        os.environ['no_proxy'] = '*'

    def urlopen(self, url, data=None, **kwargs):
        """Open *url* and return the complete response body as bytes.

        The body is gathered through readlines(), readline() and read()
        in order to exercise several reading methods.  The response is
        always closed.
        """
        chunks = []
        f = urllib.request.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            chunks.extend(f.readlines(200))
            chunks.append(f.readline())
            chunks.append(f.read(1024))
            chunks.append(f.read())
        finally:
            f.close()
        return b"".join(chunks)

    def start_server(self, responses=None):
        """Start a loopback HTTP server replaying *responses*.

        Returns the handler class, whose ``port`` attribute is set to
        the port the server listens on.  The server is stopped by a
        registered cleanup.
        """
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.addCleanup(self.server.stop)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server replaying *responses*; skip without ssl.

        Extra keyword arguments are passed on to make_https_server().
        Returns the handler class with its ``port`` attribute set.
        """
        if not hasattr(urllib.request, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
                        b'a\r\n'
                        b'hello worl\r\n'
                        b'1\r\n'
                        b'd\r\n'
                        b'0\r\n'
                        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            data = f.read()
            f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                             b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        with support.check_warnings(('', DeprecationWarning)):
            # Good cert
            data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                                cafile=CERT_localhost)
            self.assertEqual(data, b"we care a bit")
            # Bad cert
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)
            # Good cert, but mismatching hostname
            handler = self.start_https_server(certfile=CERT_fakehostname)
            with self.assertRaises(ssl.CertificateError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with support.check_warnings(('', DeprecationWarning)):
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name, "localhost")

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        # The context manager closes the response even when one of the
        # attribute checks fails.
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as open_url:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_info(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen(
            "http://localhost:%s" % handler.port)
        with open_url:
            info_obj = open_url.info()
        self.assertIsInstance(info_obj, email.message.Message,
                              "object returned by 'info' is not an "
                              "instance of email.message.Message")
        self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        with open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as data:
            for line in data:
                self.assertEqual(line, expected_response)

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Start below zero so that an empty response produces a clean
        # assertion failure instead of a NameError on ``index``.
        index = -1
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as data:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        self.assertEqual(index + 1, len(lines))
655n/a
656n/a
# Value returned by support.threading_setup(); stays None until
# setUpModule() has run.
threads_key = None

def setUpModule():
    """Save the result of support.threading_setup() for tearDownModule()."""
    global threads_key
    threads_key = support.threading_setup()
664n/a
def tearDownModule():
    """Run support.threading_cleanup() with the state saved at module setup.

    Nothing is done when no state was recorded.
    """
    if threads_key:
        # threading_cleanup() takes the saved values as separate
        # positional arguments, so the stored tuple has to be unpacked;
        # passed as a single argument it can never match the current
        # state.
        support.threading_cleanup(*threads_key)
668n/a
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()