»Core Development>Code coverage>Lib/test/test_xmlrpc.py

Python code coverage for Lib/test/test_xmlrpc.py

1n/aimport base64
2n/aimport datetime
3n/aimport decimal
4n/aimport sys
5n/aimport time
6n/aimport unittest
7n/afrom unittest import mock
8n/aimport xmlrpc.client as xmlrpclib
9n/aimport xmlrpc.server
10n/aimport http.client
11n/aimport http, http.server
12n/aimport socket
13n/aimport re
14n/aimport io
15n/aimport contextlib
16n/afrom test import support
19n/a import gzip
20n/aexcept ImportError:
21n/a gzip = None
23n/a import threading
24n/aexcept ImportError:
25n/a threading = None
# Sample payload: a one-element list holding a struct with one member of each
# value type that the dump/load tests push through the marshaller.
alist = [
    {
        'astring': 'foo@bar.baz.spam',
        'afloat': 7283.43,
        'anint': 1 << 20,
        'ashortlong': 2,
        'anotherlist': ['.zyx.41'],
        'abase64': xmlrpclib.Binary(b"my dog has fleas"),
        'b64bytes': b"my dog has fleas",
        'b64bytearray': bytearray(b"my dog has fleas"),
        'boolean': False,
        'unicode': '\u4000\u6000\u8000',
        'ukey\u4000': 'regular value',
        # The same instant expressed as a string, a time tuple and a datetime.
        'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
        'datetime2': xmlrpclib.DateTime((2005, 2, 10, 11, 41, 23, 0, 1, -1)),
        'datetime3': xmlrpclib.DateTime(
            datetime.datetime(2005, 2, 10, 11, 41, 23)),
    },
]
class XMLRPCTestCase(unittest.TestCase):
    """Marshalling, unmarshalling and client-side tests for xmlrpc.client."""

    def _assert_single_load(self, xml, expected, expected_type, **load_kwargs):
        """Load *xml* and verify its single parameter.

        Checks the decoded value, its exact type, and that no method name
        was decoded alongside it.
        """
        params, methodname = xmlrpclib.loads(xml, **load_kwargs)
        (loaded,) = params
        self.assertEqual(loaded, expected)
        self.assertIs(type(loaded), expected_type)
        self.assertIsNone(methodname)

    def test_dump_load(self):
        # A dump followed by a load must give back the sample payload.
        params, _ = xmlrpclib.loads(xmlrpclib.dumps((alist,)))
        self.assertEqual(alist, params[0])

    def test_dump_bare_datetime(self):
        # This checks that an unwrapped datetime.date object can be handled
        # by the marshalling code.  This can't be done via test_dump_load()
        # since with use_builtin_types set to 1 the unmarshaller would create
        # datetime objects for the 'datetime[123]' keys as well
        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        # Each loader flag selects which type the value comes back as.
        expectations = (
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
            ({'use_datetime': True}, datetime.datetime),
            ({'use_datetime': False}, xmlrpclib.DateTime),
        )
        for load_kwargs, expected_type in expectations:
            self._assert_single_load(s, dt, expected_type, **load_kwargs)

    def test_datetime_before_1900(self):
        # same as before but with a date before 1900
        dt = datetime.datetime(1, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        expectations = (
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
        )
        for load_kwargs, expected_type in expectations:
            self._assert_single_load(s, dt, expected_type, **load_kwargs)

    def test_bug_1164912(self):
        original = xmlrpclib.DateTime()
        response = xmlrpclib.dumps((original,), methodresponse=True)
        (decoded,), _ = xmlrpclib.loads(response)
        self.assertIsInstance(decoded.value, str)

        # dumps() must still hand back a str for the decoded value.
        redumped = xmlrpclib.dumps((decoded,), methodresponse=True)
        self.assertIsInstance(redumped, str)

    def test_newstyle_class(self):
        class T(object):
            pass

        instance = T()
        instance.x = 100
        instance.y = "Hello"
        # An arbitrary instance is marshalled through its attribute dict.
        (loaded,), _ = xmlrpclib.loads(xmlrpclib.dumps((instance,)))
        self.assertEqual(loaded, instance.__dict__)

    def test_dump_big_long(self):
        with self.assertRaises(OverflowError):
            xmlrpclib.dumps((2**99,))

    def test_dump_bad_dict(self):
        # Struct member names must be strings, so a tuple key is rejected.
        with self.assertRaises(TypeError):
            xmlrpclib.dumps(({(1,2,3): 1},))

    def test_dump_recursive_seq(self):
        outer = [1,2,3]
        inner = [3,4,5,outer]
        outer.append(inner)
        with self.assertRaises(TypeError):
            xmlrpclib.dumps((outer,))

    def test_dump_recursive_dict(self):
        outer = {'1':1, '2':1}
        inner = {'3':3, 'd':outer}
        outer['t'] = inner
        with self.assertRaises(TypeError):
            xmlrpclib.dumps((outer,))

    def test_dump_big_int(self):
        if sys.maxsize > 2**31-1:
            with self.assertRaises(OverflowError):
                xmlrpclib.dumps((int(2**34),))

        # The limits themselves are accepted; one step beyond is not.
        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
        for out_of_range in (xmlrpclib.MAXINT+1, xmlrpclib.MININT-1):
            with self.assertRaises(OverflowError):
                xmlrpclib.dumps((out_of_range,))

        # Same checks directly against Marshaller.dump_int().
        discard = lambda s: None
        m = xmlrpclib.Marshaller()
        m.dump_int(xmlrpclib.MAXINT, discard)
        m.dump_int(xmlrpclib.MININT, discard)
        for out_of_range in (xmlrpclib.MAXINT+1, xmlrpclib.MININT-1):
            with self.assertRaises(OverflowError):
                m.dump_int(out_of_range, discard)

    def test_dump_double(self):
        # None of these may raise: doubles are not range-limited like ints.
        xmlrpclib.dumps((float(2 ** 34),))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
                         float(xmlrpclib.MININT)))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
                         float(xmlrpclib.MININT - 42)))

        discard = lambda s: None
        m = xmlrpclib.Marshaller()
        for number in (xmlrpclib.MAXINT, xmlrpclib.MININT,
                       xmlrpclib.MAXINT + 42, xmlrpclib.MININT - 42):
            m.dump_double(number, discard)

    def test_dump_none(self):
        value = alist + [None]
        arg1 = (alist + [None],)
        strg = xmlrpclib.dumps(arg1, allow_none=True)
        params, _ = xmlrpclib.loads(strg)
        self.assertEqual(value, params[0])
        # Without allow_none the same data is refused.
        with self.assertRaises(TypeError):
            xmlrpclib.dumps((arg1,))

    def test_dump_encoding(self):
        def first_param(document):
            return xmlrpclib.loads(document)[0][0]

        encoding = 'iso-8859-15'
        value = {'key\u20ac\xa4':
                 'value\u20ac\xa4'}

        # Plain parameter dump, loaded both as text and as encoded bytes.
        text = xmlrpclib.dumps((value,), encoding=encoding)
        text = "<?xml version='1.0' encoding='iso-8859-15'?>" + text
        self.assertEqual(first_param(text), value)
        raw = text.encode(encoding, 'xmlcharrefreplace')
        self.assertEqual(first_param(raw), value)

        # Method response, again as text and as encoded bytes.
        text = xmlrpclib.dumps((value,), encoding=encoding,
                               methodresponse=True)
        self.assertEqual(first_param(text), value)
        raw = text.encode(encoding, 'xmlcharrefreplace')
        self.assertEqual(first_param(raw), value)

        # Method call carrying a non-ASCII method name.
        methodname = 'method\u20ac\xa4'
        text = xmlrpclib.dumps((value,), encoding=encoding,
                               methodname=methodname)
        self.assertEqual(first_param(text), value)
        self.assertEqual(xmlrpclib.loads(text)[1], methodname)

    def test_dump_bytes(self):
        sample = b"my dog has fleas"
        self.assertEqual(sample, xmlrpclib.Binary(sample))
        for type_ in bytes, bytearray, xmlrpclib.Binary:
            s = xmlrpclib.dumps((type_(sample),))
            # Whatever was dumped, the loader flag decides the result type.
            self._assert_single_load(s, sample, bytes,
                                     use_builtin_types=True)
            self._assert_single_load(s, sample, xmlrpclib.Binary,
                                     use_builtin_types=False)

    def test_loads_unsupported(self):
        # An unknown <spam/> element is rejected at top level, inside an
        # array, and inside a struct.
        documents = (
            '<params><param><value><spam/></value></param></params>',
            ('<params><param><value><array>'
             '<value><spam/></value>'
             '</array></value></param></params>'),
            ('<params><param><value><struct>'
             '<member><name>a</name><value><spam/></value></member>'
             '<member><name>b</name><value><spam/></value></member>'
             '</struct></value></param></params>'),
        )
        for data in documents:
            with self.assertRaises(xmlrpclib.ResponseError):
                xmlrpclib.loads(data)

    def check_loads(self, s, value, **kwargs):
        """Wrap the value markup *s* in a params document and check it loads as *value*."""
        dump = '<params><param><value>%s</value></param></params>' % s
        self._assert_single_load(dump, value, type(value), **kwargs)

    def test_load_standard_types(self):
        self.check_loads('string', 'string')
        self.check_loads('<string>string</string>', 'string')
        self.check_loads('<string>𝔘𝔫𝔦𝔠𝔬𝔡𝔢 string</string>', '𝔘𝔫𝔦𝔠𝔬𝔡𝔢 string')
        self.check_loads('<int>2056183947</int>', 2056183947)
        self.check_loads('<int>-2056183947</int>', -2056183947)
        self.check_loads('<i4>2056183947</i4>', 2056183947)
        self.check_loads('<double>46093.78125</double>', 46093.78125)
        self.check_loads('<boolean>0</boolean>', False)
        self.check_loads('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
                         xmlrpclib.Binary(b'\x00byte string\xff'))
        self.check_loads('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
                         b'\x00byte string\xff', use_builtin_types=True)
        self.check_loads(
            '<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
            xmlrpclib.DateTime('20050210T11:41:23'))
        self.check_loads(
            '<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
            datetime.datetime(2005, 2, 10, 11, 41, 23),
            use_builtin_types=True)
        self.check_loads(
            '<array><data>'
            '<value><int>1</int></value><value><int>2</int></value>'
            '</data></array>', [1, 2])
        self.check_loads(
            '<struct>'
            '<member><name>b</name><value><int>2</int></value></member>'
            '<member><name>a</name><value><int>1</int></value></member>'
            '</struct>', {'a': 1, 'b': 2})

    def test_load_extension_types(self):
        self.check_loads('<nil/>', None)
        self.check_loads('<ex:nil/>', None)
        self.check_loads('<i1>205</i1>', 205)
        self.check_loads('<i2>20561</i2>', 20561)
        self.check_loads('<i8>9876543210</i8>', 9876543210)
        self.check_loads('<biginteger>98765432100123456789</biginteger>',
                         98765432100123456789)
        self.check_loads('<float>93.78125</float>', 93.78125)
        self.check_loads('<bigdecimal>9876543210.0123456789</bigdecimal>',
                         decimal.Decimal('9876543210.0123456789'))

    def test_get_host_info(self):
        # see bug #3613, this raised a TypeError
        transport = xmlrpc.client.Transport()
        expected = ('host.tld',
                    [('Authorization', 'Basic dXNlcg==')], {})
        self.assertEqual(transport.get_host_info("user@host.tld"), expected)

    def test_ssl_presence(self):
        try:
            import ssl
        except ImportError:
            has_ssl = False
        else:
            has_ssl = True
        # Which exception the https call raises must match whether the ssl
        # module could be imported.
        proxy = xmlrpc.client.ServerProxy('https://localhost:9999')
        try:
            proxy.bad_function()
        except NotImplementedError:
            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
        except OSError:
            self.assertTrue(has_ssl)

    @unittest.skipUnless(threading, "Threading required for this test.")
    def test_keepalive_disconnect(self):
        class RequestHandler(http.server.BaseHTTPRequestHandler):
            protocol_version = "HTTP/1.1"
            handled = False

            def do_POST(self):
                body_size = int(self.headers.get("Content-Length"))
                self.rfile.read(body_size)
                if self.handled:
                    # This handler already answered once: close the
                    # connection without sending a response.
                    self.close_connection = True
                    return
                payload = xmlrpclib.dumps((5,), methodresponse=True).encode()
                self.send_response(http.HTTPStatus.OK)
                self.send_header("Content-Length", len(payload))
                self.end_headers()
                self.wfile.write(payload)
                self.handled = True
                self.close_connection = False

        server = http.server.HTTPServer((support.HOST, 0), RequestHandler)
        self.addCleanup(server.server_close)

        def run_server():
            server.socket.settimeout(1.0)  # Don't hang if client fails
            server.handle_request()  # First request and attempt at second
            server.handle_request()  # Retried second request

        thread = threading.Thread(target=run_server)
        thread.start()
        self.addCleanup(thread.join)
        url = "http://{}:{}/".format(*server.server_address)
        with xmlrpclib.ServerProxy(url) as proxy:
            self.assertEqual(proxy.method(), 5)
            self.assertEqual(proxy.method(), 5)
class HelperTestCase(unittest.TestCase):
    """Tests for the module-level helper functions of xmlrpc.client."""

    def test_escape(self):
        # Each XML-special character maps to its entity reference.
        expectations = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in expectations:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
class FaultTestCase(unittest.TestCase):
    """Tests for xmlrpc.client.Fault and the server's dotted-name resolver."""

    def test_repr(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')
        rendered = repr(fault)
        self.assertEqual(rendered, "<Fault 42: 'Test Fault'>")
        self.assertEqual(rendered, str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Dumped as a parameter, a Fault loads back as a plain struct.
        params, methodname = xmlrpclib.loads(xmlrpclib.dumps((fault,)))
        (loaded,) = params
        self.assertEqual(loaded,
                         {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertIsNone(methodname)

        # Dumped by the Marshaller itself, loading the result raises Fault.
        fault_document = xmlrpclib.Marshaller().dumps(fault)
        with self.assertRaises(xmlrpclib.Fault):
            xmlrpclib.loads(fault_document)

    def test_dotted_attribute(self):
        # Raises AttributeError because names starting with an underscore
        # may not be resolved.
        resolve = xmlrpc.server.resolve_dotted_attribute
        with self.assertRaises(AttributeError):
            resolve(str, '__add')
        self.assertTrue(resolve(str, 'title'))
375n/aclass DateTimeTestCase(unittest.TestCase):
376n/a def test_default(self):
377n/a with mock.patch('time.localtime') as localtime_mock:
378n/a time_struct = time.struct_time(
379n/a [2013, 7, 15, 0, 24, 49, 0, 196, 0])
380n/a localtime_mock.return_value = time_struct
381n/a localtime = time.localtime()
382n/a t = xmlrpclib.DateTime()
383n/a self.assertEqual(str(t),
384n/a time.strftime("%Y%m%dT%H:%M:%S", localtime))
386n/a def test_time(self):
387n/a d = 1181399930.036952
388n/a t = xmlrpclib.DateTime(d)
389n/a self.assertEqual(str(t),
390n/a time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
392n/a def test_time_tuple(self):
393n/a d = (2007,6,9,10,38,50,5,160,0)
394n/a t = xmlrpclib.DateTime(d)
395n/a self.assertEqual(str(t), '20070609T10:38:50')
397n/a def test_time_struct(self):
398n/a d = time.localtime(1181399930.036952)
399n/a t = xmlrpclib.DateTime(d)
400n/a self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
402n/a def test_datetime_datetime(self):
403n/a d = datetime.datetime(2007,1,2,3,4,5)
404n/a t = xmlrpclib.DateTime(d)
405n/a self.assertEqual(str(t), '20070102T03:04:05')
407n/a def test_repr(self):
408n/a d = datetime.datetime(2007,1,2,3,4,5)
409n/a t = xmlrpclib.DateTime(d)
410n/a val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
411n/a self.assertEqual(repr(t), val)
413n/a def test_decode(self):
414n/a d = ' 20070908T07:11:13 '
415n/a t1 = xmlrpclib.DateTime()
416n/a t1.decode(d)
417n/a tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
418n/a self.assertEqual(t1, tref)
420n/a t2 = xmlrpclib._datetime(d)
421n/a self.assertEqual(t2, tref)
423n/a def test_comparison(self):
424n/a now = datetime.datetime.now()
425n/a dtime = xmlrpclib.DateTime(now.timetuple())
427n/a # datetime vs. DateTime
428n/a self.assertTrue(dtime == now)
429n/a self.assertTrue(now == dtime)
430n/a then = now + datetime.timedelta(seconds=4)
431n/a self.assertTrue(then >= dtime)
432n/a self.assertTrue(dtime < then)
434n/a # str vs. DateTime
435n/a dstr = now.strftime("%Y%m%dT%H:%M:%S")
436n/a self.assertTrue(dtime == dstr)
437n/a self.assertTrue(dstr == dtime)
438n/a dtime_then = xmlrpclib.DateTime(then.timetuple())
439n/a self.assertTrue(dtime_then >= dstr)
440n/a self.assertTrue(dstr < dtime_then)
442n/a # some other types
443n/a dbytes = dstr.encode('ascii')
444n/a dtuple = now.timetuple()
445n/a with self.assertRaises(TypeError):
446n/a dtime == 1970
447n/a with self.assertRaises(TypeError):
448n/a dtime != dbytes
449n/a with self.assertRaises(TypeError):
450n/a dtime == bytearray(dbytes)
451n/a with self.assertRaises(TypeError):
452n/a dtime != dtuple
453n/a with self.assertRaises(TypeError):
454n/a dtime < float(1970)
455n/a with self.assertRaises(TypeError):
456n/a dtime > dbytes
457n/a with self.assertRaises(TypeError):
458n/a dtime <= bytearray(dbytes)
459n/a with self.assertRaises(TypeError):
460n/a dtime >= dtuple
class BinaryTestCase(unittest.TestCase):
    """Tests for xmlrpc.client.Binary."""

    # XXX What should str(Binary(b"\xff")) return?  I'm choosing "\xff"
    # for now (i.e. interpreting the binary data as Latin-1-encoded
    # text).  But this feels very unsatisfactory.  Perhaps we should
    # only define repr(), and return r"Binary(b'\xff')" instead?

    _RAW = b'\x01\x02\x03abc123\xff\xfe'

    def test_default(self):
        self.assertEqual(str(xmlrpclib.Binary()), '')

    def test_string(self):
        wrapped = xmlrpclib.Binary(self._RAW)
        self.assertEqual(str(wrapped), self._RAW.decode("latin-1"))

    def test_decode(self):
        as_text = self._RAW.decode("latin-1")
        encoded = base64.encodebytes(self._RAW)

        # Decoding into an existing instance...
        t1 = xmlrpclib.Binary()
        t1.decode(encoded)
        self.assertEqual(str(t1), as_text)

        # ...and through the module-level _binary() helper.
        t2 = xmlrpclib._binary(encoded)
        self.assertEqual(str(t2), as_text)
# Address, port and base URL of the test server; all unset at import time.
ADDR = None
PORT = None
URL = None
491n/a# The evt is set twice. First when the server is ready to serve.
492n/a# Second when the server has been shutdown. The user must clear
493n/a# the event after it has been set the first time to catch the second set.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Serve up to *numrequests* XML-RPC requests on ("localhost", 0).

    The bound address is published through the module globals ADDR, PORT
    and URL.  *evt* is set once the server is ready, and set again after the
    listening socket has been closed (PORT is reset to None at that point).
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

        class Fixture:
            @staticmethod
            def getData():
                return '42'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode.  On Linux,
            # socket attributes are not inherited like they are on *BSD and
            # Windows.
            conn, peer = self.socket.accept()
            conn.setblocking(True)
            return conn, peer

    handler_class = requestHandler or xmlrpc.server.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(("localhost", 0), handler_class,
                          encoding=encoding,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on Vista, which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()

        # Everything the client-side tests call or introspect.
        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x,y: x+y, 'add')
        serv.register_function(lambda x: x, 'têšt')
        serv.register_function(my_function)
        serv.register_instance(TestInstanceClass(), allow_dotted_names=True)
        evt.set()

        # handle up to 'numrequests' requests
        for _ in range(numrequests):
            serv.handle_request()
    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
def http_multi_server(evt, numrequests, requestHandler=None):
    """Serve up to *numrequests* requests from a multi-path XML-RPC server.

    Dispatchers are installed at "/foo" (pow), "/foo/bar" (add) and
    "/is/broken" (always raises).  The bound address is published through
    the module globals ADDR, PORT and URL.  *evt* is set once the server is
    ready, and set again after the listening socket has been closed.
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode.  On Linux,
            # socket attributes are not inherited like they are on *BSD and
            # Windows.
            conn, peer = self.socket.accept()
            conn.setblocking(True)
            return conn, peer

    base_handler = requestHandler or xmlrpc.server.SimpleXMLRPCRequestHandler

    class MyRequestHandler(base_handler):
        rpc_paths = []

    class BrokenDispatcher:
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    serv.socket.settimeout(3)
    serv.server_bind()
    try:
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on Vista, which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()

        foo_path, foo_bar_path = paths = ["/foo", "/foo/bar"]
        for path in paths:
            dispatcher = serv.add_dispatcher(
                path, xmlrpc.server.SimpleXMLRPCDispatcher())
            dispatcher.register_introspection_functions()
            dispatcher.register_multicall_functions()
        # Each path exposes a different function.
        serv.get_dispatcher(foo_path).register_function(pow)
        serv.get_dispatcher(foo_bar_path).register_function(
            lambda x,y: x+y, 'add')
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
        evt.set()

        # handle up to 'numrequests' requests
        for _ in range(numrequests):
            serv.handle_request()
    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
622n/a# This function prevents errors like:
623n/a# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Return True if the given ProtocolError is the product of a server-side
    exception caused by the 'temporarily unavailable' response sometimes
    given by operations on non-blocking sockets, and False otherwise.

    Exceptions without ``errcode``/``headers`` attributes (such as a plain
    OSError) are judged by their string form instead.
    '''
    try:
        # sometimes we get a -1 error code and/or empty headers
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError (e.g. an OSError): fall back to its message.
        exc_mess = str(e)

    # Always answer with a real bool rather than falling off the end.
    return bool(exc_mess) and 'temporarily unavailable' in exc_mess.lower()
def make_request_and_skipIf(condition, reason):
    """Return a decorator that skips a server test when *condition* is true.

    If we skip the test, we still have to make a request, because the
    server created in setUp blocks expecting one to come in.  The returned
    decorator therefore replaces the test with a function that issues one
    ``my_function`` call against URL and then raises unittest.SkipTest
    with *reason*.  When *condition* is false the test is returned unchanged.
    """
    import functools

    if not condition:
        return lambda func: func

    def decorator(func):
        # Keep the original test's name and docstring on the replacement.
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, OSError) as e:
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
@unittest.skipUnless(threading, 'Threading required for this test.')
class BaseServerTestCase(unittest.TestCase):
    """Base class that runs a server thread around every test method."""

    # Subclasses override these to pick the server function, the number of
    # requests it is asked to handle, and the request handler class.
    threadFunc = staticmethod(http_server)
    request_count = 1
    requestHandler = None

    def setUp(self):
        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        signal = threading.Event()
        self.evt = signal
        # start server thread to handle requests
        worker = threading.Thread(
            target=self.threadFunc,
            args=(signal, self.request_count, self.requestHandler))
        worker.start()

        # wait for the server to be ready, then re-arm the event so that
        # tearDown can wait for its second set
        signal.wait()
        signal.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()

        # disable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
class SimpleServerTestCase(BaseServerTestCase):
    """Client round-trips against the server started by BaseServerTestCase."""

    @contextlib.contextmanager
    def _tolerating_unavailable(self):
        """Run the enclosed calls, tolerating 'unavailable' socket failures.

        A ProtocolError or OSError accepted by is_unavailable_exception() is
        swallowed; any other one fails the test, reporting the error and its
        headers (when it has any).
        """
        try:
            yield
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_simple1(self):
        with self._tolerating_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)

    def test_nonascii(self):
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        with self._tolerating_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)

    def test_client_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        with self._tolerating_unavailable():
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)

    def test_nonascii_methodname(self):
        with self._tolerating_unavailable():
            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
            self.assertEqual(p.têšt(42), 42)

    # [ch] The test 404 is causing lots of false alarms.
    def XXXtest_404(self):
        # send POST with http.client, it should return 404 header and
        # 'Not Found' message.
        conn = http.client.HTTPConnection(ADDR, PORT)
        conn.request('POST', '/this-is-not-valid')
        response = conn.getresponse()
        conn.close()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        expected_methods = {'pow', 'div', 'my_function', 'add', 'têšt',
                            'system.listMethods', 'system.methodHelp',
                            'system.methodSignature', 'system.multicall',
                            'Fixture'}
        with self._tolerating_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            self.assertEqual(set(meth), expected_methods)

    def test_introspection2(self):
        with self._tolerating_unavailable():
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')

    @make_request_and_skipIf(sys.flags.optimize >= 2,
                             "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        with self._tolerating_unavailable():
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        with self._tolerating_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')

    def test_multicall(self):
        with self._tolerating_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)

    def test_non_existing_multicall(self):
        with self._tolerating_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        conn = http.client.HTTPConnection(ADDR, PORT)
        conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
        conn.close()

    def test_context_manager(self):
        # The transport holds a connection inside the with block and has
        # dropped it once the block exits.
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        # The connection is dropped even when the call raises a Fault.
        try:
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        except xmlrpclib.Fault:
            pass
        self.assertEqual(server('transport')._connection,
                         (None, None))
class SimpleServerEncodingTestCase(BaseServerTestCase):
    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The encoding argument is accepted but not used: the server is
        # always started with 'iso-8859-15'.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        first = '\u20ac'
        second = '\xa4'
        expected = first + second

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.add(first, second), expected)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket unavailable errors.
            if is_unavailable_exception(exc):
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))
class MultiPathServerTestCase(BaseServerTestCase):
    threadFunc = staticmethod(http_multi_server)
    request_count = 2

    def test_path1(self):
        # Under "/foo" pow() answers and add() faults.
        proxy = xmlrpclib.ServerProxy(URL+"/foo")
        self.assertEqual(proxy.pow(6,8), 6**8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path2(self):
        # Under "/foo/bar" add() answers and pow() faults.
        proxy = xmlrpclib.ServerProxy(URL+"/foo/bar")
        self.assertEqual(proxy.add(6,8), 6+8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.pow(6, 8)

    def test_path3(self):
        # Under "/is/broken" even add() faults.
        proxy = xmlrpclib.ServerProxy(URL+"/is/broken")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)
#Common fixture for the tests verifying that a server using the HTTP/1.1
#keep-alive mechanism serves subsequent requests on the same connection
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    #a request handler that supports keep-alive and logs requests into a
    #class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        #one inner list per handle() call, holding the raw request lines
        #recorded by handle_one_request()
        myRequests = []

        def handle(self):
            log = self.myRequests
            log.append([])
            #remember which inner list belongs to this handler
            self.reqidx = len(log)-1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            outcome = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        #clear request log
        self.RequestHandler.myRequests = []
        return BaseServerTestCase.setUp(self)
#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #do three requests.
        for _attempt in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")()

        handler_logs = self.RequestHandler.myRequests

        #they should have all been handled by a single request handler
        self.assertEqual(len(handler_logs), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(handler_logs[-1]), 2)
#test special attribute access on the serverproxy, through the __call__
#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    #ask for two keepalive requests to be handled.
    request_count=2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #two rounds of three requests, each round ended by a close
        for _round in range(2):
            for _attempt in range(3):
                self.assertEqual(proxy.pow(6,8), 6**8)
            #after the first round this should trigger a new keep-alive request
            proxy("close")()

        handler_logs = self.RequestHandler.myRequests

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        self.assertEqual(len(handler_logs), 2)
        self.assertGreaterEqual(len(handler_logs[-1]), 2)
        self.assertGreaterEqual(len(handler_logs[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(proxy.pow(6,8), 6**8)
        #closing through the transport object; same as above, really.
        transport = proxy("transport")
        transport.close()
        self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
#A test case that verifies that gzip encoding works in both directions
#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    #a request handler that speaks HTTP/1.1 and records the Content-Length
    #of the most recent POST in a class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store the announced body length of the last request in the class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False

        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        try:
            self.assertEqual(p.pow(6,8), 6**8)
            plain_length = self.RequestHandler.content_length
            t.encode_threshold = 0 #turn on request encoding
            self.assertEqual(p.pow(6,8), 6**8)
            encoded_length = self.RequestHandler.content_length
            #assertGreater reports both lengths when the check fails
            self.assertGreater(plain_length, encoded_length)
        finally:
            #always release the connection, even when an assertion fails
            p("close")()

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        try:
            #the error text must contain 400 as a standalone number
            with self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                        re.compile(r"\b400\b")):
                p.pow(6, 8)
        finally:
            #always release the connection, even when the check fails
            p("close")()

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        handler = self.requestHandler
        old = handler.encode_threshold
        try:
            handler.encode_threshold = None #no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            plain_length = t.response_length
            handler.encode_threshold = 0 #always encode
            self.assertEqual(p.pow(6,8), 6**8)
            encoded_length = t.response_length
        finally:
            #encode_threshold lives on the handler class, so put the old
            #value back even if a request or assertion above failed
            handler.encode_threshold = old
            p("close")()
        self.assertGreater(plain_length, encoded_length)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):

    def test_gzip_decode_limit(self):
        limit = 20 * 1024 * 1024

        #a payload of exactly `limit` bytes decodes in full
        at_limit = xmlrpclib.gzip_encode(b'\0' * limit)
        self.assertEqual(len(xmlrpclib.gzip_decode(at_limit)), limit)

        #one byte more is rejected when no max_decode is given
        over_limit = xmlrpclib.gzip_encode(b'\0' * (limit + 1))
        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(over_limit)

        #with max_decode=-1 the same payload decodes without raising
        xmlrpclib.gzip_decode(over_limit, max_decode=-1)
#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        unittest.TestCase.setUp(self)
        # Without threading, http_server() and http_multi_server() will not
        # be executed and URL is still equal to None. 'http://' is a just
        # enough to choose the scheme (HTTP)
        self.url = URL if threading else 'http://'

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(self.url)
        outcome = proxy('close')()
        self.assertEqual(outcome, None)

    def test_transport(self):
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        # The transport handed to the constructor is the one exposed.
        self.assertEqual(proxy('transport'), transport)
# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    def get(self, key, failobj=None):
        lowered = key.lower()
        # Every other header is looked up normally (by its lowercased name);
        # only Content-Length is replaced by a value that is not a number.
        if lowered != 'content-length':
            return super().get(lowered, failobj)
        return 'I am broken'
@unittest.skipUnless(threading, 'Threading required for this test.')
class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        worker = threading.Thread(target=http_server, args=(self.evt, 1))
        worker.start()

        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        handler_cls = xmlrpc.server.SimpleXMLRPCRequestHandler
        handler_cls.MessageClass = http.client.HTTPMessage

    def test_basic(self):
        server_cls = xmlrpc.server.SimpleXMLRPCServer

        # check that flag is false by default
        self.assertEqual(server_cls._send_traceback_header, False)

        # enable traceback reporting
        server_cls._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if is_unavailable_exception(exc):
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket 'unavailable' errors
            # and exceptions that carry no headers
            if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
                return
            # The two server-side error headers shouldn't be sent back in this case
            self.assertIsNone(exc.headers.get("X-exception"))
            self.assertIsNone(exc.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket 'unavailable' errors
            # and exceptions that carry no headers
            if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
                return
            # We should get error info in the response
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(exc.headers.get("X-exception"), expected_err)
            self.assertIsNotNone(exc.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')
@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """A variation on support.captured_stdout() which gives a text stream
    having a `buffer` attribute.

    While the context is active, sys.stdout is an io.TextIOWrapper over an
    io.BytesIO using *encoding*, and that wrapper is the value bound by
    ``with ... as``.  The previous sys.stdout is put back on exit, even if
    the body raises.
    """
    orig_stdout = sys.stdout
    sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    try:
        yield sys.stdout
    finally:
        sys.stdout = orig_stdout
class CGIHandlerTestCase(unittest.TestCase):
    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs handle_get
            # get sysout output
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header: the second whitespace-separated token is
            # the status code, the next two form the reason phrase
            data_out.seek(0)
            tokens = data_out.read().split()
            status = tokens[1]
            message = ' '.join(tokens[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')

    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with support.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # Skip the HTTP headers by locating the XML declaration instead of
        # relying on a fixed 44-character header length, which only holds
        # while Content-Length has exactly three digits.
        self.assertIn("<?xml", handle)
        content = handle[handle.find("<?xml"):]

        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned by handle_request
        # Using the same test method in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040
        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))
class UseBuiltinTypesTestCase(unittest.TestCase):

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
        # makes all dispatch of binary data as bytes instances, and all
        # dispatch of datetime argument as datetime.datetime instances.
        self.log = []
        expected_bytes = b"my dog has fleas"
        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
        request = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')

        def record(*args):
            self.log.extend(args)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        # Registered under the method name used in the marshalled request.
        dispatcher.register_function(record, 'foobar')
        dispatcher._marshaled_dispatch(request)

        self.assertEqual(len(self.log), 2)
        got_bytes, got_date = self.log
        self.assertEqual(self.log, [expected_bytes, expected_date])
        self.assertIs(type(got_date), datetime.datetime)
        self.assertIs(type(got_bytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        cgi_handler = xmlrpc.server.CGIXMLRPCRequestHandler(
            use_builtin_types=True)
        self.assertTrue(cgi_handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        address = ("localhost", 0)
        server = xmlrpc.server.SimpleXMLRPCServer(address,
                                                  use_builtin_types=True)
        # Only the flag is of interest, so the socket is closed right away.
        server.server_close()
        self.assertTrue(server.use_builtin_types)
def test_main():
    # Test case classes handed to support.run_unittest(), in this order.
    test_classes = (
        XMLRPCTestCase,
        HelperTestCase,
        DateTimeTestCase,
        BinaryTestCase,
        FaultTestCase,
        UseBuiltinTypesTestCase,
        SimpleServerTestCase,
        SimpleServerEncodingTestCase,
        KeepaliveServerTestCase1,
        KeepaliveServerTestCase2,
        GzipServerTestCase,
        GzipUtilTestCase,
        MultiPathServerTestCase,
        ServerProxyTestCase,
        FailingServerTestCase,
        CGIHandlerTestCase,
    )
    support.run_unittest(*test_classes)
# Call test_main() only when this file is executed as a script.
if __name__ == "__main__":
    test_main()