ยปCore Development>Code coverage>Lib/packaging/tests/pypi_server.py

Python code coverage for Lib/packaging/tests/pypi_server.py

#countcontent
1n/a"""Mock PyPI Server implementation, to use in tests.
2n/a
3n/aThis module also provides a simple test case to extend if you need to use
4n/athe PyPIServer all along your test case. Be sure to read the documentation
5n/abefore any use.
6n/a
7n/aXXX TODO:
8n/a
9n/aThe mock server can handle simple HTTP request (to simulate a simple index) or
10n/aXMLRPC requests, over HTTP. Both does not have the same intergface to deal
11n/awith, and I think it's a pain.
12n/a
13n/aA good idea could be to re-think a bit the way dstributions are handled in the
14n/amock server. As it should return malformed HTML pages, we need to keep the
15n/astatic behavior.
16n/a
17n/aI think of something like that:
18n/a
19n/a >>> server = PyPIMockServer()
20n/a >>> server.startHTTP()
21n/a >>> server.startXMLRPC()
22n/a
23n/aThen, the server must have only one port to rely on, eg.
24n/a
25n/a >>> server.fulladdress()
26n/a "http://ip:port/"
27n/a
28n/aIt could be simple to have one HTTP server, relaying the requests to the two
29n/aimplementations (static HTTP and XMLRPC over HTTP).
30n/a"""
31n/a
32n/aimport os
33n/aimport queue
34n/aimport select
35n/aimport threading
36n/afrom functools import wraps
37n/afrom http.server import HTTPServer, SimpleHTTPRequestHandler
38n/afrom xmlrpc.server import SimpleXMLRPCServer
39n/a
40n/afrom packaging.tests import unittest
41n/a
42n/a
43n/aPYPI_DEFAULT_STATIC_PATH = os.path.join(
44n/a os.path.dirname(os.path.abspath(__file__)), 'pypiserver')
45n/a
46n/a
47n/adef use_xmlrpc_server(*server_args, **server_kwargs):
48n/a server_kwargs['serve_xmlrpc'] = True
49n/a return use_pypi_server(*server_args, **server_kwargs)
50n/a
51n/a
52n/adef use_http_server(*server_args, **server_kwargs):
53n/a server_kwargs['serve_xmlrpc'] = False
54n/a return use_pypi_server(*server_args, **server_kwargs)
55n/a
56n/a
57n/adef use_pypi_server(*server_args, **server_kwargs):
58n/a """Decorator to make use of the PyPIServer for test methods,
59n/a just when needed, and not for the entire duration of the testcase.
60n/a """
61n/a def wrapper(func):
62n/a @wraps(func)
63n/a def wrapped(*args, **kwargs):
64n/a server = PyPIServer(*server_args, **server_kwargs)
65n/a server.start()
66n/a try:
67n/a func(server=server, *args, **kwargs)
68n/a finally:
69n/a server.stop()
70n/a return wrapped
71n/a return wrapper
72n/a
73n/a
74n/aclass PyPIServerTestCase(unittest.TestCase):
75n/a
76n/a def setUp(self):
77n/a super(PyPIServerTestCase, self).setUp()
78n/a self.pypi = PyPIServer()
79n/a self.pypi.start()
80n/a self.addCleanup(self.pypi.stop)
81n/a
82n/a
83n/aclass PyPIServer(threading.Thread):
84n/a """PyPI Mocked server.
85n/a Provides a mocked version of the PyPI API's, to ease tests.
86n/a
87n/a Support serving static content and serving previously given text.
88n/a """
89n/a
90n/a def __init__(self, test_static_path=None,
91n/a static_filesystem_paths=None,
92n/a static_uri_paths=["simple", "packages"], serve_xmlrpc=False):
93n/a """Initialize the server.
94n/a
95n/a Default behavior is to start the HTTP server. You can either start the
96n/a xmlrpc server by setting xmlrpc to True. Caution: Only one server will
97n/a be started.
98n/a
99n/a static_uri_paths and static_base_path are parameters used to provides
100n/a respectively the http_paths to serve statically, and where to find the
101n/a matching files on the filesystem.
102n/a """
103n/a # we want to launch the server in a new dedicated thread, to not freeze
104n/a # tests.
105n/a super(PyPIServer, self).__init__()
106n/a self._run = True
107n/a self._serve_xmlrpc = serve_xmlrpc
108n/a if static_filesystem_paths is None:
109n/a static_filesystem_paths = ["default"]
110n/a
111n/a #TODO allow to serve XMLRPC and HTTP static files at the same time.
112n/a if not self._serve_xmlrpc:
113n/a self.server = HTTPServer(('127.0.0.1', 0), PyPIRequestHandler)
114n/a self.server.RequestHandlerClass.pypi_server = self
115n/a
116n/a self.request_queue = queue.Queue()
117n/a self._requests = []
118n/a self.default_response_status = 404
119n/a self.default_response_headers = [('Content-type', 'text/plain')]
120n/a self.default_response_data = "The page does not exists"
121n/a
122n/a # initialize static paths / filesystems
123n/a self.static_uri_paths = static_uri_paths
124n/a
125n/a # append the static paths defined locally
126n/a if test_static_path is not None:
127n/a static_filesystem_paths.append(test_static_path)
128n/a self.static_filesystem_paths = [
129n/a PYPI_DEFAULT_STATIC_PATH + "/" + path
130n/a for path in static_filesystem_paths]
131n/a else:
132n/a # XMLRPC server
133n/a self.server = PyPIXMLRPCServer(('127.0.0.1', 0))
134n/a self.xmlrpc = XMLRPCMockIndex()
135n/a # register the xmlrpc methods
136n/a self.server.register_introspection_functions()
137n/a self.server.register_instance(self.xmlrpc)
138n/a
139n/a self.address = ('127.0.0.1', self.server.server_port)
140n/a # to not have unwanted outputs.
141n/a self.server.RequestHandlerClass.log_request = lambda *_: None
142n/a
143n/a def run(self):
144n/a # loop because we can't stop it otherwise, for python < 2.6
145n/a while self._run:
146n/a r, w, e = select.select([self.server], [], [], 0.5)
147n/a if r:
148n/a self.server.handle_request()
149n/a
150n/a def stop(self):
151n/a """self shutdown is not supported for python < 2.6"""
152n/a self._run = False
153n/a if self.is_alive():
154n/a self.join()
155n/a self.server.server_close()
156n/a
157n/a def get_next_response(self):
158n/a return (self.default_response_status,
159n/a self.default_response_headers,
160n/a self.default_response_data)
161n/a
162n/a @property
163n/a def requests(self):
164n/a """Use this property to get all requests that have been made
165n/a to the server
166n/a """
167n/a while True:
168n/a try:
169n/a self._requests.append(self.request_queue.get_nowait())
170n/a except queue.Empty:
171n/a break
172n/a return self._requests
173n/a
174n/a @property
175n/a def full_address(self):
176n/a return "http://%s:%s" % self.address
177n/a
178n/a
179n/aclass PyPIRequestHandler(SimpleHTTPRequestHandler):
180n/a # we need to access the pypi server while serving the content
181n/a pypi_server = None
182n/a
183n/a def serve_request(self):
184n/a """Serve the content.
185n/a
186n/a Also record the requests to be accessed later. If trying to access an
187n/a url matching a static uri, serve static content, otherwise serve
188n/a what is provided by the `get_next_response` method.
189n/a
190n/a If nothing is defined there, return a 404 header.
191n/a """
192n/a # record the request. Read the input only on PUT or POST requests
193n/a if self.command in ("PUT", "POST"):
194n/a if 'content-length' in self.headers:
195n/a request_data = self.rfile.read(
196n/a int(self.headers['content-length']))
197n/a else:
198n/a request_data = self.rfile.read()
199n/a
200n/a elif self.command in ("GET", "DELETE"):
201n/a request_data = ''
202n/a
203n/a self.pypi_server.request_queue.put((self, request_data))
204n/a
205n/a # serve the content from local disc if we request an URL beginning
206n/a # by a pattern defined in `static_paths`
207n/a url_parts = self.path.split("/")
208n/a if (len(url_parts) > 1 and
209n/a url_parts[1] in self.pypi_server.static_uri_paths):
210n/a data = None
211n/a # always take the last first.
212n/a fs_paths = []
213n/a fs_paths.extend(self.pypi_server.static_filesystem_paths)
214n/a fs_paths.reverse()
215n/a relative_path = self.path
216n/a for fs_path in fs_paths:
217n/a try:
218n/a if self.path.endswith("/"):
219n/a relative_path += "index.html"
220n/a
221n/a if relative_path.endswith('.tar.gz'):
222n/a with open(fs_path + relative_path, 'rb') as file:
223n/a data = file.read()
224n/a headers = [('Content-type', 'application/x-gtar')]
225n/a else:
226n/a with open(fs_path + relative_path) as file:
227n/a data = file.read().encode()
228n/a headers = [('Content-type', 'text/html')]
229n/a
230n/a headers.append(('Content-Length', len(data)))
231n/a self.make_response(data, headers=headers)
232n/a
233n/a except IOError:
234n/a pass
235n/a
236n/a if data is None:
237n/a self.make_response("Not found", 404)
238n/a
239n/a # otherwise serve the content from get_next_response
240n/a else:
241n/a # send back a response
242n/a status, headers, data = self.pypi_server.get_next_response()
243n/a self.make_response(data, status, headers)
244n/a
245n/a do_POST = do_GET = do_DELETE = do_PUT = serve_request
246n/a
247n/a def make_response(self, data, status=200,
248n/a headers=[('Content-type', 'text/html')]):
249n/a """Send the response to the HTTP client"""
250n/a if not isinstance(status, int):
251n/a try:
252n/a status = int(status)
253n/a except ValueError:
254n/a # we probably got something like YYY Codename.
255n/a # Just get the first 3 digits
256n/a status = int(status[:3])
257n/a
258n/a self.send_response(status)
259n/a for header, value in headers:
260n/a self.send_header(header, value)
261n/a self.end_headers()
262n/a
263n/a if isinstance(data, str):
264n/a data = data.encode('utf-8')
265n/a
266n/a self.wfile.write(data)
267n/a
268n/a
269n/aclass PyPIXMLRPCServer(SimpleXMLRPCServer):
270n/a def server_bind(self):
271n/a """Override server_bind to store the server name."""
272n/a super(PyPIXMLRPCServer, self).server_bind()
273n/a host, port = self.socket.getsockname()[:2]
274n/a self.server_port = port
275n/a
276n/a
277n/aclass MockDist:
278n/a """Fake distribution, used in the Mock PyPI Server"""
279n/a
280n/a def __init__(self, name, version="1.0", hidden=False, url="http://url/",
281n/a type="sdist", filename="", size=10000,
282n/a digest="123456", downloads=7, has_sig=False,
283n/a python_version="source", comment="comment",
284n/a author="John Doe", author_email="john@doe.name",
285n/a maintainer="Main Tayner", maintainer_email="maintainer_mail",
286n/a project_url="http://project_url/", homepage="http://homepage/",
287n/a keywords="", platform="UNKNOWN", classifiers=[], licence="",
288n/a description="Description", summary="Summary", stable_version="",
289n/a ordering="", documentation_id="", code_kwalitee_id="",
290n/a installability_id="", obsoletes=[], obsoletes_dist=[],
291n/a provides=[], provides_dist=[], requires=[], requires_dist=[],
292n/a requires_external=[], requires_python=""):
293n/a
294n/a # basic fields
295n/a self.name = name
296n/a self.version = version
297n/a self.hidden = hidden
298n/a
299n/a # URL infos
300n/a self.url = url
301n/a self.digest = digest
302n/a self.downloads = downloads
303n/a self.has_sig = has_sig
304n/a self.python_version = python_version
305n/a self.comment = comment
306n/a self.type = type
307n/a
308n/a # metadata
309n/a self.author = author
310n/a self.author_email = author_email
311n/a self.maintainer = maintainer
312n/a self.maintainer_email = maintainer_email
313n/a self.project_url = project_url
314n/a self.homepage = homepage
315n/a self.keywords = keywords
316n/a self.platform = platform
317n/a self.classifiers = classifiers
318n/a self.licence = licence
319n/a self.description = description
320n/a self.summary = summary
321n/a self.stable_version = stable_version
322n/a self.ordering = ordering
323n/a self.cheesecake_documentation_id = documentation_id
324n/a self.cheesecake_code_kwalitee_id = code_kwalitee_id
325n/a self.cheesecake_installability_id = installability_id
326n/a
327n/a self.obsoletes = obsoletes
328n/a self.obsoletes_dist = obsoletes_dist
329n/a self.provides = provides
330n/a self.provides_dist = provides_dist
331n/a self.requires = requires
332n/a self.requires_dist = requires_dist
333n/a self.requires_external = requires_external
334n/a self.requires_python = requires_python
335n/a
336n/a def url_infos(self):
337n/a return {
338n/a 'url': self.url,
339n/a 'packagetype': self.type,
340n/a 'filename': 'filename.tar.gz',
341n/a 'size': '6000',
342n/a 'md5_digest': self.digest,
343n/a 'downloads': self.downloads,
344n/a 'has_sig': self.has_sig,
345n/a 'python_version': self.python_version,
346n/a 'comment_text': self.comment,
347n/a }
348n/a
349n/a def metadata(self):
350n/a return {
351n/a 'maintainer': self.maintainer,
352n/a 'project_url': [self.project_url],
353n/a 'maintainer_email': self.maintainer_email,
354n/a 'cheesecake_code_kwalitee_id': self.cheesecake_code_kwalitee_id,
355n/a 'keywords': self.keywords,
356n/a 'obsoletes_dist': self.obsoletes_dist,
357n/a 'requires_external': self.requires_external,
358n/a 'author': self.author,
359n/a 'author_email': self.author_email,
360n/a 'download_url': self.url,
361n/a 'platform': self.platform,
362n/a 'version': self.version,
363n/a 'obsoletes': self.obsoletes,
364n/a 'provides': self.provides,
365n/a 'cheesecake_documentation_id': self.cheesecake_documentation_id,
366n/a '_pypi_hidden': self.hidden,
367n/a 'description': self.description,
368n/a '_pypi_ordering': 19,
369n/a 'requires_dist': self.requires_dist,
370n/a 'requires_python': self.requires_python,
371n/a 'classifiers': [],
372n/a 'name': self.name,
373n/a 'licence': self.licence, # XXX licence or license?
374n/a 'summary': self.summary,
375n/a 'home_page': self.homepage,
376n/a 'stable_version': self.stable_version,
377n/a # FIXME doesn't that reproduce the bug from 6527d3106e9f?
378n/a 'provides_dist': (self.provides_dist or
379n/a "%s (%s)" % (self.name, self.version)),
380n/a 'requires': self.requires,
381n/a 'cheesecake_installability_id': self.cheesecake_installability_id,
382n/a }
383n/a
384n/a def search_result(self):
385n/a return {
386n/a '_pypi_ordering': 0,
387n/a 'version': self.version,
388n/a 'name': self.name,
389n/a 'summary': self.summary,
390n/a }
391n/a
392n/a
393n/aclass XMLRPCMockIndex:
394n/a """Mock XMLRPC server"""
395n/a
396n/a def __init__(self, dists=[]):
397n/a self._dists = dists
398n/a self._search_result = []
399n/a
400n/a def add_distributions(self, dists):
401n/a for dist in dists:
402n/a self._dists.append(MockDist(**dist))
403n/a
404n/a def set_distributions(self, dists):
405n/a self._dists = []
406n/a self.add_distributions(dists)
407n/a
408n/a def set_search_result(self, result):
409n/a """set a predefined search result"""
410n/a self._search_result = result
411n/a
412n/a def _get_search_results(self):
413n/a results = []
414n/a for name in self._search_result:
415n/a found_dist = [d for d in self._dists if d.name == name]
416n/a if found_dist:
417n/a results.append(found_dist[0])
418n/a else:
419n/a dist = MockDist(name)
420n/a results.append(dist)
421n/a self._dists.append(dist)
422n/a return [r.search_result() for r in results]
423n/a
424n/a def list_packages(self):
425n/a return [d.name for d in self._dists]
426n/a
427n/a def package_releases(self, package_name, show_hidden=False):
428n/a if show_hidden:
429n/a # return all
430n/a return [d.version for d in self._dists if d.name == package_name]
431n/a else:
432n/a # return only un-hidden
433n/a return [d.version for d in self._dists if d.name == package_name
434n/a and not d.hidden]
435n/a
436n/a def release_urls(self, package_name, version):
437n/a return [d.url_infos() for d in self._dists
438n/a if d.name == package_name and d.version == version]
439n/a
440n/a def release_data(self, package_name, version):
441n/a release = [d for d in self._dists
442n/a if d.name == package_name and d.version == version]
443n/a if release:
444n/a return release[0].metadata()
445n/a else:
446n/a return {}
447n/a
448n/a def search(self, spec, operator="and"):
449n/a return self._get_search_results()