» Core Development > Code coverage > Lib/test/multibytecodec_support.py

Python code coverage for Lib/test/multibytecodec_support.py

# | count | content
1n/a#
2n/a# multibytecodec_support.py
3n/a# Common Unittest Routines for CJK codecs
4n/a#
5n/a
6n/aimport codecs
7n/aimport os
8n/aimport re
9n/aimport sys
10n/aimport unittest
11n/afrom http.client import HTTPException
12n/afrom test import support
13n/afrom io import BytesIO
14n/a
15n/aclass TestBase:
16n/a encoding = '' # codec name
17n/a codec = None # codec tuple (with 4 elements)
18n/a tstring = None # must set. 2 strings to test StreamReader
19n/a
20n/a codectests = None # must set. codec test tuple
21n/a roundtriptest = 1 # set if roundtrip is possible with unicode
22n/a has_iso10646 = 0 # set if this encoding contains whole iso10646 map
23n/a xmlcharnametest = None # string to test xmlcharrefreplace
24n/a unmappedunicode = '\udeee' # a unicode code point that is not mapped.
25n/a
26n/a def setUp(self):
27n/a if self.codec is None:
28n/a self.codec = codecs.lookup(self.encoding)
29n/a self.encode = self.codec.encode
30n/a self.decode = self.codec.decode
31n/a self.reader = self.codec.streamreader
32n/a self.writer = self.codec.streamwriter
33n/a self.incrementalencoder = self.codec.incrementalencoder
34n/a self.incrementaldecoder = self.codec.incrementaldecoder
35n/a
36n/a def test_chunkcoding(self):
37n/a tstring_lines = []
38n/a for b in self.tstring:
39n/a lines = b.split(b"\n")
40n/a last = lines.pop()
41n/a assert last == b""
42n/a lines = [line + b"\n" for line in lines]
43n/a tstring_lines.append(lines)
44n/a for native, utf8 in zip(*tstring_lines):
45n/a u = self.decode(native)[0]
46n/a self.assertEqual(u, utf8.decode('utf-8'))
47n/a if self.roundtriptest:
48n/a self.assertEqual(native, self.encode(u)[0])
49n/a
50n/a def test_errorhandle(self):
51n/a for source, scheme, expected in self.codectests:
52n/a if isinstance(source, bytes):
53n/a func = self.decode
54n/a else:
55n/a func = self.encode
56n/a if expected:
57n/a result = func(source, scheme)[0]
58n/a if func is self.decode:
59n/a self.assertTrue(type(result) is str, type(result))
60n/a self.assertEqual(result, expected,
61n/a '%a.decode(%r, %r)=%a != %a'
62n/a % (source, self.encoding, scheme, result,
63n/a expected))
64n/a else:
65n/a self.assertTrue(type(result) is bytes, type(result))
66n/a self.assertEqual(result, expected,
67n/a '%a.encode(%r, %r)=%a != %a'
68n/a % (source, self.encoding, scheme, result,
69n/a expected))
70n/a else:
71n/a self.assertRaises(UnicodeError, func, source, scheme)
72n/a
73n/a def test_xmlcharrefreplace(self):
74n/a if self.has_iso10646:
75n/a self.skipTest('encoding contains full ISO 10646 map')
76n/a
77n/a s = "\u0b13\u0b23\u0b60 nd eggs"
78n/a self.assertEqual(
79n/a self.encode(s, "xmlcharrefreplace")[0],
80n/a b"ଓଣୠ nd eggs"
81n/a )
82n/a
83n/a def test_customreplace_encode(self):
84n/a if self.has_iso10646:
85n/a self.skipTest('encoding contains full ISO 10646 map')
86n/a
87n/a from html.entities import codepoint2name
88n/a
89n/a def xmlcharnamereplace(exc):
90n/a if not isinstance(exc, UnicodeEncodeError):
91n/a raise TypeError("don't know how to handle %r" % exc)
92n/a l = []
93n/a for c in exc.object[exc.start:exc.end]:
94n/a if ord(c) in codepoint2name:
95n/a l.append("&%s;" % codepoint2name[ord(c)])
96n/a else:
97n/a l.append("&#%d;" % ord(c))
98n/a return ("".join(l), exc.end)
99n/a
100n/a codecs.register_error("test.xmlcharnamereplace", xmlcharnamereplace)
101n/a
102n/a if self.xmlcharnametest:
103n/a sin, sout = self.xmlcharnametest
104n/a else:
105n/a sin = "\xab\u211c\xbb = \u2329\u1234\u232a"
106n/a sout = b"«ℜ» = ⟨ሴ⟩"
107n/a self.assertEqual(self.encode(sin,
108n/a "test.xmlcharnamereplace")[0], sout)
109n/a
110n/a def test_callback_returns_bytes(self):
111n/a def myreplace(exc):
112n/a return (b"1234", exc.end)
113n/a codecs.register_error("test.cjktest", myreplace)
114n/a enc = self.encode("abc" + self.unmappedunicode + "def", "test.cjktest")[0]
115n/a self.assertEqual(enc, b"abc1234def")
116n/a
117n/a def test_callback_wrong_objects(self):
118n/a def myreplace(exc):
119n/a return (ret, exc.end)
120n/a codecs.register_error("test.cjktest", myreplace)
121n/a
122n/a for ret in ([1, 2, 3], [], None, object()):
123n/a self.assertRaises(TypeError, self.encode, self.unmappedunicode,
124n/a 'test.cjktest')
125n/a
126n/a def test_callback_long_index(self):
127n/a def myreplace(exc):
128n/a return ('x', int(exc.end))
129n/a codecs.register_error("test.cjktest", myreplace)
130n/a self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
131n/a 'test.cjktest'), (b'abcdxefgh', 9))
132n/a
133n/a def myreplace(exc):
134n/a return ('x', sys.maxsize + 1)
135n/a codecs.register_error("test.cjktest", myreplace)
136n/a self.assertRaises(IndexError, self.encode, self.unmappedunicode,
137n/a 'test.cjktest')
138n/a
139n/a def test_callback_None_index(self):
140n/a def myreplace(exc):
141n/a return ('x', None)
142n/a codecs.register_error("test.cjktest", myreplace)
143n/a self.assertRaises(TypeError, self.encode, self.unmappedunicode,
144n/a 'test.cjktest')
145n/a
146n/a def test_callback_backward_index(self):
147n/a def myreplace(exc):
148n/a if myreplace.limit > 0:
149n/a myreplace.limit -= 1
150n/a return ('REPLACED', 0)
151n/a else:
152n/a return ('TERMINAL', exc.end)
153n/a myreplace.limit = 3
154n/a codecs.register_error("test.cjktest", myreplace)
155n/a self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
156n/a 'test.cjktest'),
157n/a (b'abcdREPLACEDabcdREPLACEDabcdREPLACEDabcdTERMINALefgh', 9))
158n/a
159n/a def test_callback_forward_index(self):
160n/a def myreplace(exc):
161n/a return ('REPLACED', exc.end + 2)
162n/a codecs.register_error("test.cjktest", myreplace)
163n/a self.assertEqual(self.encode('abcd' + self.unmappedunicode + 'efgh',
164n/a 'test.cjktest'), (b'abcdREPLACEDgh', 9))
165n/a
166n/a def test_callback_index_outofbound(self):
167n/a def myreplace(exc):
168n/a return ('TERM', 100)
169n/a codecs.register_error("test.cjktest", myreplace)
170n/a self.assertRaises(IndexError, self.encode, self.unmappedunicode,
171n/a 'test.cjktest')
172n/a
173n/a def test_incrementalencoder(self):
174n/a UTF8Reader = codecs.getreader('utf-8')
175n/a for sizehint in [None] + list(range(1, 33)) + \
176n/a [64, 128, 256, 512, 1024]:
177n/a istream = UTF8Reader(BytesIO(self.tstring[1]))
178n/a ostream = BytesIO()
179n/a encoder = self.incrementalencoder()
180n/a while 1:
181n/a if sizehint is not None:
182n/a data = istream.read(sizehint)
183n/a else:
184n/a data = istream.read()
185n/a
186n/a if not data:
187n/a break
188n/a e = encoder.encode(data)
189n/a ostream.write(e)
190n/a
191n/a self.assertEqual(ostream.getvalue(), self.tstring[0])
192n/a
193n/a def test_incrementaldecoder(self):
194n/a UTF8Writer = codecs.getwriter('utf-8')
195n/a for sizehint in [None, -1] + list(range(1, 33)) + \
196n/a [64, 128, 256, 512, 1024]:
197n/a istream = BytesIO(self.tstring[0])
198n/a ostream = UTF8Writer(BytesIO())
199n/a decoder = self.incrementaldecoder()
200n/a while 1:
201n/a data = istream.read(sizehint)
202n/a if not data:
203n/a break
204n/a else:
205n/a u = decoder.decode(data)
206n/a ostream.write(u)
207n/a
208n/a self.assertEqual(ostream.getvalue(), self.tstring[1])
209n/a
210n/a def test_incrementalencoder_error_callback(self):
211n/a inv = self.unmappedunicode
212n/a
213n/a e = self.incrementalencoder()
214n/a self.assertRaises(UnicodeEncodeError, e.encode, inv, True)
215n/a
216n/a e.errors = 'ignore'
217n/a self.assertEqual(e.encode(inv, True), b'')
218n/a
219n/a e.reset()
220n/a def tempreplace(exc):
221n/a return ('called', exc.end)
222n/a codecs.register_error('test.incremental_error_callback', tempreplace)
223n/a e.errors = 'test.incremental_error_callback'
224n/a self.assertEqual(e.encode(inv, True), b'called')
225n/a
226n/a # again
227n/a e.errors = 'ignore'
228n/a self.assertEqual(e.encode(inv, True), b'')
229n/a
230n/a def test_streamreader(self):
231n/a UTF8Writer = codecs.getwriter('utf-8')
232n/a for name in ["read", "readline", "readlines"]:
233n/a for sizehint in [None, -1] + list(range(1, 33)) + \
234n/a [64, 128, 256, 512, 1024]:
235n/a istream = self.reader(BytesIO(self.tstring[0]))
236n/a ostream = UTF8Writer(BytesIO())
237n/a func = getattr(istream, name)
238n/a while 1:
239n/a data = func(sizehint)
240n/a if not data:
241n/a break
242n/a if name == "readlines":
243n/a ostream.writelines(data)
244n/a else:
245n/a ostream.write(data)
246n/a
247n/a self.assertEqual(ostream.getvalue(), self.tstring[1])
248n/a
249n/a def test_streamwriter(self):
250n/a readfuncs = ('read', 'readline', 'readlines')
251n/a UTF8Reader = codecs.getreader('utf-8')
252n/a for name in readfuncs:
253n/a for sizehint in [None] + list(range(1, 33)) + \
254n/a [64, 128, 256, 512, 1024]:
255n/a istream = UTF8Reader(BytesIO(self.tstring[1]))
256n/a ostream = self.writer(BytesIO())
257n/a func = getattr(istream, name)
258n/a while 1:
259n/a if sizehint is not None:
260n/a data = func(sizehint)
261n/a else:
262n/a data = func()
263n/a
264n/a if not data:
265n/a break
266n/a if name == "readlines":
267n/a ostream.writelines(data)
268n/a else:
269n/a ostream.write(data)
270n/a
271n/a self.assertEqual(ostream.getvalue(), self.tstring[0])
272n/a
273n/a def test_streamwriter_reset_no_pending(self):
274n/a # Issue #23247: Calling reset() on a fresh StreamWriter instance
275n/a # (without pending data) must not crash
276n/a stream = BytesIO()
277n/a writer = self.writer(stream)
278n/a writer.reset()
279n/a
280n/a
281n/aclass TestBase_Mapping(unittest.TestCase):
282n/a pass_enctest = []
283n/a pass_dectest = []
284n/a supmaps = []
285n/a codectests = []
286n/a
287n/a def setUp(self):
288n/a try:
289n/a self.open_mapping_file().close() # test it to report the error early
290n/a except (OSError, HTTPException):
291n/a self.skipTest("Could not retrieve "+self.mapfileurl)
292n/a
293n/a def open_mapping_file(self):
294n/a return support.open_urlresource(self.mapfileurl)
295n/a
296n/a def test_mapping_file(self):
297n/a if self.mapfileurl.endswith('.xml'):
298n/a self._test_mapping_file_ucm()
299n/a else:
300n/a self._test_mapping_file_plain()
301n/a
302n/a def _test_mapping_file_plain(self):
303n/a unichrs = lambda s: ''.join(map(chr, map(eval, s.split('+'))))
304n/a urt_wa = {}
305n/a
306n/a with self.open_mapping_file() as f:
307n/a for line in f:
308n/a if not line:
309n/a break
310n/a data = line.split('#')[0].strip().split()
311n/a if len(data) != 2:
312n/a continue
313n/a
314n/a csetval = eval(data[0])
315n/a if csetval <= 0x7F:
316n/a csetch = bytes([csetval & 0xff])
317n/a elif csetval >= 0x1000000:
318n/a csetch = bytes([(csetval >> 24), ((csetval >> 16) & 0xff),
319n/a ((csetval >> 8) & 0xff), (csetval & 0xff)])
320n/a elif csetval >= 0x10000:
321n/a csetch = bytes([(csetval >> 16), ((csetval >> 8) & 0xff),
322n/a (csetval & 0xff)])
323n/a elif csetval >= 0x100:
324n/a csetch = bytes([(csetval >> 8), (csetval & 0xff)])
325n/a else:
326n/a continue
327n/a
328n/a unich = unichrs(data[1])
329n/a if ord(unich) == 0xfffd or unich in urt_wa:
330n/a continue
331n/a urt_wa[unich] = csetch
332n/a
333n/a self._testpoint(csetch, unich)
334n/a
335n/a def _test_mapping_file_ucm(self):
336n/a with self.open_mapping_file() as f:
337n/a ucmdata = f.read()
338n/a uc = re.findall('<a u="([A-F0-9]{4})" b="([0-9A-F ]+)"/>', ucmdata)
339n/a for uni, coded in uc:
340n/a unich = chr(int(uni, 16))
341n/a codech = bytes.fromhex(coded)
342n/a self._testpoint(codech, unich)
343n/a
344n/a def test_mapping_supplemental(self):
345n/a for mapping in self.supmaps:
346n/a self._testpoint(*mapping)
347n/a
348n/a def _testpoint(self, csetch, unich):
349n/a if (csetch, unich) not in self.pass_enctest:
350n/a self.assertEqual(unich.encode(self.encoding), csetch)
351n/a if (csetch, unich) not in self.pass_dectest:
352n/a self.assertEqual(str(csetch, self.encoding), unich)
353n/a
354n/a def test_errorhandle(self):
355n/a for source, scheme, expected in self.codectests:
356n/a if isinstance(source, bytes):
357n/a func = source.decode
358n/a else:
359n/a func = source.encode
360n/a if expected:
361n/a if isinstance(source, bytes):
362n/a result = func(self.encoding, scheme)
363n/a self.assertTrue(type(result) is str, type(result))
364n/a self.assertEqual(result, expected,
365n/a '%a.decode(%r, %r)=%a != %a'
366n/a % (source, self.encoding, scheme, result,
367n/a expected))
368n/a else:
369n/a result = func(self.encoding, scheme)
370n/a self.assertTrue(type(result) is bytes, type(result))
371n/a self.assertEqual(result, expected,
372n/a '%a.encode(%r, %r)=%a != %a'
373n/a % (source, self.encoding, scheme, result,
374n/a expected))
375n/a else:
376n/a self.assertRaises(UnicodeError, func, self.encoding, scheme)
377n/a
def load_teststring(name):
    """Return the sample text *name* as an ``(encoded, utf8)`` bytes pair.

    Both files are read in binary mode from the ``cjkencodings``
    directory next to this module: ``<name>.txt`` and
    ``<name>-utf8.txt``.
    """
    # Named ``testdir`` rather than ``dir`` so the builtin is not shadowed.
    testdir = os.path.join(os.path.dirname(__file__), 'cjkencodings')
    with open(os.path.join(testdir, name + '.txt'), 'rb') as f:
        encoded = f.read()
    with open(os.path.join(testdir, name + '-utf8.txt'), 'rb') as f:
        utf8 = f.read()
    return encoded, utf8