ยปCore Development>Code coverage>Lib/test/test_io.py

Python code coverage for Lib/test/test_io.py

#countcontent
1n/a"""Unit tests for the io module."""
2n/a
3n/a# Tests of io are scattered over the test suite:
4n/a# * test_bufio - tests file buffering
5n/a# * test_memoryio - tests BytesIO and StringIO
6n/a# * test_fileio - tests FileIO
7n/a# * test_file - tests the file interface
8n/a# * test_io - tests everything else in the io module
9n/a# * test_univnewlines - tests universal newline support
10n/a# * test_largefile - tests operations on a file greater than 2**32 bytes
11n/a# (only enabled with -ulargefile)
12n/a
13n/a################################################################################
14n/a# ATTENTION TEST WRITERS!!!
15n/a################################################################################
16n/a# When writing tests for io, it's important to test both the C and Python
17n/a# implementations. This is usually done by writing a base test that refers to
18n/a# the type it is testing as an attribute. Then it provides custom subclasses to
19n/a# test both implementations. This file has lots of examples.
20n/a################################################################################
21n/a
22n/aimport abc
23n/aimport array
24n/aimport errno
25n/aimport locale
26n/aimport os
27n/aimport pickle
28n/aimport random
29n/aimport signal
30n/aimport sys
31n/aimport time
32n/aimport unittest
33n/aimport warnings
34n/aimport weakref
35n/afrom collections import deque, UserList
36n/afrom itertools import cycle, count
37n/afrom test import support
38n/afrom test.support.script_helper import assert_python_ok, run_python_until_end
39n/a
40n/aimport codecs
41n/aimport io # C implementation of io
42n/aimport _pyio as pyio # Python implementation of io
43n/atry:
44n/a import threading
45n/aexcept ImportError:
46n/a threading = None
47n/a
48n/atry:
49n/a import ctypes
50n/aexcept ImportError:
51n/a def byteslike(*pos, **kw):
52n/a return array.array("b", bytes(*pos, **kw))
53n/aelse:
54n/a def byteslike(*pos, **kw):
55n/a """Create a bytes-like object having no string or sequence methods"""
56n/a data = bytes(*pos, **kw)
57n/a obj = EmptyStruct()
58n/a ctypes.resize(obj, len(data))
59n/a memoryview(obj).cast("B")[:] = data
60n/a return obj
61n/a class EmptyStruct(ctypes.Structure):
62n/a pass
63n/a
64n/adef _default_chunk_size():
65n/a """Get the default TextIOWrapper chunk size"""
66n/a with open(__file__, "r", encoding="latin-1") as f:
67n/a return f._CHUNK_SIZE
68n/a
69n/a
70n/aclass MockRawIOWithoutRead:
71n/a """A RawIO implementation without read(), so as to exercise the default
72n/a RawIO.read() which calls readinto()."""
73n/a
74n/a def __init__(self, read_stack=()):
75n/a self._read_stack = list(read_stack)
76n/a self._write_stack = []
77n/a self._reads = 0
78n/a self._extraneous_reads = 0
79n/a
80n/a def write(self, b):
81n/a self._write_stack.append(bytes(b))
82n/a return len(b)
83n/a
84n/a def writable(self):
85n/a return True
86n/a
87n/a def fileno(self):
88n/a return 42
89n/a
90n/a def readable(self):
91n/a return True
92n/a
93n/a def seekable(self):
94n/a return True
95n/a
96n/a def seek(self, pos, whence):
97n/a return 0 # wrong but we gotta return something
98n/a
99n/a def tell(self):
100n/a return 0 # same comment as above
101n/a
102n/a def readinto(self, buf):
103n/a self._reads += 1
104n/a max_len = len(buf)
105n/a try:
106n/a data = self._read_stack[0]
107n/a except IndexError:
108n/a self._extraneous_reads += 1
109n/a return 0
110n/a if data is None:
111n/a del self._read_stack[0]
112n/a return None
113n/a n = len(data)
114n/a if len(data) <= max_len:
115n/a del self._read_stack[0]
116n/a buf[:n] = data
117n/a return n
118n/a else:
119n/a buf[:] = data[:max_len]
120n/a self._read_stack[0] = data[max_len:]
121n/a return max_len
122n/a
123n/a def truncate(self, pos=None):
124n/a return pos
125n/a
126n/aclass CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
127n/a pass
128n/a
129n/aclass PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
130n/a pass
131n/a
132n/a
133n/aclass MockRawIO(MockRawIOWithoutRead):
134n/a
135n/a def read(self, n=None):
136n/a self._reads += 1
137n/a try:
138n/a return self._read_stack.pop(0)
139n/a except:
140n/a self._extraneous_reads += 1
141n/a return b""
142n/a
143n/aclass CMockRawIO(MockRawIO, io.RawIOBase):
144n/a pass
145n/a
146n/aclass PyMockRawIO(MockRawIO, pyio.RawIOBase):
147n/a pass
148n/a
149n/a
150n/aclass MisbehavedRawIO(MockRawIO):
151n/a def write(self, b):
152n/a return super().write(b) * 2
153n/a
154n/a def read(self, n=None):
155n/a return super().read(n) * 2
156n/a
157n/a def seek(self, pos, whence):
158n/a return -123
159n/a
160n/a def tell(self):
161n/a return -456
162n/a
163n/a def readinto(self, buf):
164n/a super().readinto(buf)
165n/a return len(buf) * 5
166n/a
167n/aclass CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
168n/a pass
169n/a
170n/aclass PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
171n/a pass
172n/a
173n/a
174n/aclass CloseFailureIO(MockRawIO):
175n/a closed = 0
176n/a
177n/a def close(self):
178n/a if not self.closed:
179n/a self.closed = 1
180n/a raise OSError
181n/a
182n/aclass CCloseFailureIO(CloseFailureIO, io.RawIOBase):
183n/a pass
184n/a
185n/aclass PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
186n/a pass
187n/a
188n/a
189n/aclass MockFileIO:
190n/a
191n/a def __init__(self, data):
192n/a self.read_history = []
193n/a super().__init__(data)
194n/a
195n/a def read(self, n=None):
196n/a res = super().read(n)
197n/a self.read_history.append(None if res is None else len(res))
198n/a return res
199n/a
200n/a def readinto(self, b):
201n/a res = super().readinto(b)
202n/a self.read_history.append(res)
203n/a return res
204n/a
205n/aclass CMockFileIO(MockFileIO, io.BytesIO):
206n/a pass
207n/a
208n/aclass PyMockFileIO(MockFileIO, pyio.BytesIO):
209n/a pass
210n/a
211n/a
212n/aclass MockUnseekableIO:
213n/a def seekable(self):
214n/a return False
215n/a
216n/a def seek(self, *args):
217n/a raise self.UnsupportedOperation("not seekable")
218n/a
219n/a def tell(self, *args):
220n/a raise self.UnsupportedOperation("not seekable")
221n/a
222n/a def truncate(self, *args):
223n/a raise self.UnsupportedOperation("not seekable")
224n/a
225n/aclass CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
226n/a UnsupportedOperation = io.UnsupportedOperation
227n/a
228n/aclass PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
229n/a UnsupportedOperation = pyio.UnsupportedOperation
230n/a
231n/a
232n/aclass MockNonBlockWriterIO:
233n/a
234n/a def __init__(self):
235n/a self._write_stack = []
236n/a self._blocker_char = None
237n/a
238n/a def pop_written(self):
239n/a s = b"".join(self._write_stack)
240n/a self._write_stack[:] = []
241n/a return s
242n/a
243n/a def block_on(self, char):
244n/a """Block when a given char is encountered."""
245n/a self._blocker_char = char
246n/a
247n/a def readable(self):
248n/a return True
249n/a
250n/a def seekable(self):
251n/a return True
252n/a
253n/a def writable(self):
254n/a return True
255n/a
256n/a def write(self, b):
257n/a b = bytes(b)
258n/a n = -1
259n/a if self._blocker_char:
260n/a try:
261n/a n = b.index(self._blocker_char)
262n/a except ValueError:
263n/a pass
264n/a else:
265n/a if n > 0:
266n/a # write data up to the first blocker
267n/a self._write_stack.append(b[:n])
268n/a return n
269n/a else:
270n/a # cancel blocker and indicate would block
271n/a self._blocker_char = None
272n/a return None
273n/a self._write_stack.append(b)
274n/a return len(b)
275n/a
276n/aclass CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
277n/a BlockingIOError = io.BlockingIOError
278n/a
279n/aclass PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
280n/a BlockingIOError = pyio.BlockingIOError
281n/a
282n/a
283n/aclass IOTest(unittest.TestCase):
284n/a
285n/a def setUp(self):
286n/a support.unlink(support.TESTFN)
287n/a
288n/a def tearDown(self):
289n/a support.unlink(support.TESTFN)
290n/a
291n/a def write_ops(self, f):
292n/a self.assertEqual(f.write(b"blah."), 5)
293n/a f.truncate(0)
294n/a self.assertEqual(f.tell(), 5)
295n/a f.seek(0)
296n/a
297n/a self.assertEqual(f.write(b"blah."), 5)
298n/a self.assertEqual(f.seek(0), 0)
299n/a self.assertEqual(f.write(b"Hello."), 6)
300n/a self.assertEqual(f.tell(), 6)
301n/a self.assertEqual(f.seek(-1, 1), 5)
302n/a self.assertEqual(f.tell(), 5)
303n/a buffer = bytearray(b" world\n\n\n")
304n/a self.assertEqual(f.write(buffer), 9)
305n/a buffer[:] = b"*" * 9 # Overwrite our copy of the data
306n/a self.assertEqual(f.seek(0), 0)
307n/a self.assertEqual(f.write(b"h"), 1)
308n/a self.assertEqual(f.seek(-1, 2), 13)
309n/a self.assertEqual(f.tell(), 13)
310n/a
311n/a self.assertEqual(f.truncate(12), 12)
312n/a self.assertEqual(f.tell(), 13)
313n/a self.assertRaises(TypeError, f.seek, 0.0)
314n/a
315n/a def read_ops(self, f, buffered=False):
316n/a data = f.read(5)
317n/a self.assertEqual(data, b"hello")
318n/a data = byteslike(data)
319n/a self.assertEqual(f.readinto(data), 5)
320n/a self.assertEqual(bytes(data), b" worl")
321n/a data = bytearray(5)
322n/a self.assertEqual(f.readinto(data), 2)
323n/a self.assertEqual(len(data), 5)
324n/a self.assertEqual(data[:2], b"d\n")
325n/a self.assertEqual(f.seek(0), 0)
326n/a self.assertEqual(f.read(20), b"hello world\n")
327n/a self.assertEqual(f.read(1), b"")
328n/a self.assertEqual(f.readinto(byteslike(b"x")), 0)
329n/a self.assertEqual(f.seek(-6, 2), 6)
330n/a self.assertEqual(f.read(5), b"world")
331n/a self.assertEqual(f.read(0), b"")
332n/a self.assertEqual(f.readinto(byteslike()), 0)
333n/a self.assertEqual(f.seek(-6, 1), 5)
334n/a self.assertEqual(f.read(5), b" worl")
335n/a self.assertEqual(f.tell(), 10)
336n/a self.assertRaises(TypeError, f.seek, 0.0)
337n/a if buffered:
338n/a f.seek(0)
339n/a self.assertEqual(f.read(), b"hello world\n")
340n/a f.seek(6)
341n/a self.assertEqual(f.read(), b"world\n")
342n/a self.assertEqual(f.read(), b"")
343n/a f.seek(0)
344n/a data = byteslike(5)
345n/a self.assertEqual(f.readinto1(data), 5)
346n/a self.assertEqual(bytes(data), b"hello")
347n/a
348n/a LARGE = 2**31
349n/a
350n/a def large_file_ops(self, f):
351n/a assert f.readable()
352n/a assert f.writable()
353n/a try:
354n/a self.assertEqual(f.seek(self.LARGE), self.LARGE)
355n/a except (OverflowError, ValueError):
356n/a self.skipTest("no largefile support")
357n/a self.assertEqual(f.tell(), self.LARGE)
358n/a self.assertEqual(f.write(b"xxx"), 3)
359n/a self.assertEqual(f.tell(), self.LARGE + 3)
360n/a self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
361n/a self.assertEqual(f.truncate(), self.LARGE + 2)
362n/a self.assertEqual(f.tell(), self.LARGE + 2)
363n/a self.assertEqual(f.seek(0, 2), self.LARGE + 2)
364n/a self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
365n/a self.assertEqual(f.tell(), self.LARGE + 2)
366n/a self.assertEqual(f.seek(0, 2), self.LARGE + 1)
367n/a self.assertEqual(f.seek(-1, 2), self.LARGE)
368n/a self.assertEqual(f.read(2), b"x")
369n/a
370n/a def test_invalid_operations(self):
371n/a # Try writing on a file opened in read mode and vice-versa.
372n/a exc = self.UnsupportedOperation
373n/a for mode in ("w", "wb"):
374n/a with self.open(support.TESTFN, mode) as fp:
375n/a self.assertRaises(exc, fp.read)
376n/a self.assertRaises(exc, fp.readline)
377n/a with self.open(support.TESTFN, "wb", buffering=0) as fp:
378n/a self.assertRaises(exc, fp.read)
379n/a self.assertRaises(exc, fp.readline)
380n/a with self.open(support.TESTFN, "rb", buffering=0) as fp:
381n/a self.assertRaises(exc, fp.write, b"blah")
382n/a self.assertRaises(exc, fp.writelines, [b"blah\n"])
383n/a with self.open(support.TESTFN, "rb") as fp:
384n/a self.assertRaises(exc, fp.write, b"blah")
385n/a self.assertRaises(exc, fp.writelines, [b"blah\n"])
386n/a with self.open(support.TESTFN, "r") as fp:
387n/a self.assertRaises(exc, fp.write, "blah")
388n/a self.assertRaises(exc, fp.writelines, ["blah\n"])
389n/a # Non-zero seeking from current or end pos
390n/a self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
391n/a self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
392n/a
393n/a def test_optional_abilities(self):
394n/a # Test for OSError when optional APIs are not supported
395n/a # The purpose of this test is to try fileno(), reading, writing and
396n/a # seeking operations with various objects that indicate they do not
397n/a # support these operations.
398n/a
399n/a def pipe_reader():
400n/a [r, w] = os.pipe()
401n/a os.close(w) # So that read() is harmless
402n/a return self.FileIO(r, "r")
403n/a
404n/a def pipe_writer():
405n/a [r, w] = os.pipe()
406n/a self.addCleanup(os.close, r)
407n/a # Guarantee that we can write into the pipe without blocking
408n/a thread = threading.Thread(target=os.read, args=(r, 100))
409n/a thread.start()
410n/a self.addCleanup(thread.join)
411n/a return self.FileIO(w, "w")
412n/a
413n/a def buffered_reader():
414n/a return self.BufferedReader(self.MockUnseekableIO())
415n/a
416n/a def buffered_writer():
417n/a return self.BufferedWriter(self.MockUnseekableIO())
418n/a
419n/a def buffered_random():
420n/a return self.BufferedRandom(self.BytesIO())
421n/a
422n/a def buffered_rw_pair():
423n/a return self.BufferedRWPair(self.MockUnseekableIO(),
424n/a self.MockUnseekableIO())
425n/a
426n/a def text_reader():
427n/a class UnseekableReader(self.MockUnseekableIO):
428n/a writable = self.BufferedIOBase.writable
429n/a write = self.BufferedIOBase.write
430n/a return self.TextIOWrapper(UnseekableReader(), "ascii")
431n/a
432n/a def text_writer():
433n/a class UnseekableWriter(self.MockUnseekableIO):
434n/a readable = self.BufferedIOBase.readable
435n/a read = self.BufferedIOBase.read
436n/a return self.TextIOWrapper(UnseekableWriter(), "ascii")
437n/a
438n/a tests = (
439n/a (pipe_reader, "fr"), (pipe_writer, "fw"),
440n/a (buffered_reader, "r"), (buffered_writer, "w"),
441n/a (buffered_random, "rws"), (buffered_rw_pair, "rw"),
442n/a (text_reader, "r"), (text_writer, "w"),
443n/a (self.BytesIO, "rws"), (self.StringIO, "rws"),
444n/a )
445n/a for [test, abilities] in tests:
446n/a if test is pipe_writer and not threading:
447n/a continue # Skip subtest that uses a background thread
448n/a with self.subTest(test), test() as obj:
449n/a readable = "r" in abilities
450n/a self.assertEqual(obj.readable(), readable)
451n/a writable = "w" in abilities
452n/a self.assertEqual(obj.writable(), writable)
453n/a
454n/a if isinstance(obj, self.TextIOBase):
455n/a data = "3"
456n/a elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
457n/a data = b"3"
458n/a else:
459n/a self.fail("Unknown base class")
460n/a
461n/a if "f" in abilities:
462n/a obj.fileno()
463n/a else:
464n/a self.assertRaises(OSError, obj.fileno)
465n/a
466n/a if readable:
467n/a obj.read(1)
468n/a obj.read()
469n/a else:
470n/a self.assertRaises(OSError, obj.read, 1)
471n/a self.assertRaises(OSError, obj.read)
472n/a
473n/a if writable:
474n/a obj.write(data)
475n/a else:
476n/a self.assertRaises(OSError, obj.write, data)
477n/a
478n/a if sys.platform.startswith("win") and test in (
479n/a pipe_reader, pipe_writer):
480n/a # Pipes seem to appear as seekable on Windows
481n/a continue
482n/a seekable = "s" in abilities
483n/a self.assertEqual(obj.seekable(), seekable)
484n/a
485n/a if seekable:
486n/a obj.tell()
487n/a obj.seek(0)
488n/a else:
489n/a self.assertRaises(OSError, obj.tell)
490n/a self.assertRaises(OSError, obj.seek, 0)
491n/a
492n/a if writable and seekable:
493n/a obj.truncate()
494n/a obj.truncate(0)
495n/a else:
496n/a self.assertRaises(OSError, obj.truncate)
497n/a self.assertRaises(OSError, obj.truncate, 0)
498n/a
499n/a def test_open_handles_NUL_chars(self):
500n/a fn_with_NUL = 'foo\0bar'
501n/a self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
502n/a
503n/a bytes_fn = bytes(fn_with_NUL, 'ascii')
504n/a with warnings.catch_warnings():
505n/a warnings.simplefilter("ignore", DeprecationWarning)
506n/a self.assertRaises(ValueError, self.open, bytes_fn, 'w')
507n/a
508n/a def test_raw_file_io(self):
509n/a with self.open(support.TESTFN, "wb", buffering=0) as f:
510n/a self.assertEqual(f.readable(), False)
511n/a self.assertEqual(f.writable(), True)
512n/a self.assertEqual(f.seekable(), True)
513n/a self.write_ops(f)
514n/a with self.open(support.TESTFN, "rb", buffering=0) as f:
515n/a self.assertEqual(f.readable(), True)
516n/a self.assertEqual(f.writable(), False)
517n/a self.assertEqual(f.seekable(), True)
518n/a self.read_ops(f)
519n/a
520n/a def test_buffered_file_io(self):
521n/a with self.open(support.TESTFN, "wb") as f:
522n/a self.assertEqual(f.readable(), False)
523n/a self.assertEqual(f.writable(), True)
524n/a self.assertEqual(f.seekable(), True)
525n/a self.write_ops(f)
526n/a with self.open(support.TESTFN, "rb") as f:
527n/a self.assertEqual(f.readable(), True)
528n/a self.assertEqual(f.writable(), False)
529n/a self.assertEqual(f.seekable(), True)
530n/a self.read_ops(f, True)
531n/a
532n/a def test_readline(self):
533n/a with self.open(support.TESTFN, "wb") as f:
534n/a f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
535n/a with self.open(support.TESTFN, "rb") as f:
536n/a self.assertEqual(f.readline(), b"abc\n")
537n/a self.assertEqual(f.readline(10), b"def\n")
538n/a self.assertEqual(f.readline(2), b"xy")
539n/a self.assertEqual(f.readline(4), b"zzy\n")
540n/a self.assertEqual(f.readline(), b"foo\x00bar\n")
541n/a self.assertEqual(f.readline(None), b"another line")
542n/a self.assertRaises(TypeError, f.readline, 5.3)
543n/a with self.open(support.TESTFN, "r") as f:
544n/a self.assertRaises(TypeError, f.readline, 5.3)
545n/a
546n/a def test_raw_bytes_io(self):
547n/a f = self.BytesIO()
548n/a self.write_ops(f)
549n/a data = f.getvalue()
550n/a self.assertEqual(data, b"hello world\n")
551n/a f = self.BytesIO(data)
552n/a self.read_ops(f, True)
553n/a
554n/a def test_large_file_ops(self):
555n/a # On Windows and Mac OSX this test comsumes large resources; It takes
556n/a # a long time to build the >2GB file and takes >2GB of disk space
557n/a # therefore the resource must be enabled to run this test.
558n/a if sys.platform[:3] == 'win' or sys.platform == 'darwin':
559n/a support.requires(
560n/a 'largefile',
561n/a 'test requires %s bytes and a long time to run' % self.LARGE)
562n/a with self.open(support.TESTFN, "w+b", 0) as f:
563n/a self.large_file_ops(f)
564n/a with self.open(support.TESTFN, "w+b") as f:
565n/a self.large_file_ops(f)
566n/a
567n/a def test_with_open(self):
568n/a for bufsize in (0, 1, 100):
569n/a f = None
570n/a with self.open(support.TESTFN, "wb", bufsize) as f:
571n/a f.write(b"xxx")
572n/a self.assertEqual(f.closed, True)
573n/a f = None
574n/a try:
575n/a with self.open(support.TESTFN, "wb", bufsize) as f:
576n/a 1/0
577n/a except ZeroDivisionError:
578n/a self.assertEqual(f.closed, True)
579n/a else:
580n/a self.fail("1/0 didn't raise an exception")
581n/a
582n/a # issue 5008
583n/a def test_append_mode_tell(self):
584n/a with self.open(support.TESTFN, "wb") as f:
585n/a f.write(b"xxx")
586n/a with self.open(support.TESTFN, "ab", buffering=0) as f:
587n/a self.assertEqual(f.tell(), 3)
588n/a with self.open(support.TESTFN, "ab") as f:
589n/a self.assertEqual(f.tell(), 3)
590n/a with self.open(support.TESTFN, "a") as f:
591n/a self.assertGreater(f.tell(), 0)
592n/a
593n/a def test_destructor(self):
594n/a record = []
595n/a class MyFileIO(self.FileIO):
596n/a def __del__(self):
597n/a record.append(1)
598n/a try:
599n/a f = super().__del__
600n/a except AttributeError:
601n/a pass
602n/a else:
603n/a f()
604n/a def close(self):
605n/a record.append(2)
606n/a super().close()
607n/a def flush(self):
608n/a record.append(3)
609n/a super().flush()
610n/a with support.check_warnings(('', ResourceWarning)):
611n/a f = MyFileIO(support.TESTFN, "wb")
612n/a f.write(b"xxx")
613n/a del f
614n/a support.gc_collect()
615n/a self.assertEqual(record, [1, 2, 3])
616n/a with self.open(support.TESTFN, "rb") as f:
617n/a self.assertEqual(f.read(), b"xxx")
618n/a
619n/a def _check_base_destructor(self, base):
620n/a record = []
621n/a class MyIO(base):
622n/a def __init__(self):
623n/a # This exercises the availability of attributes on object
624n/a # destruction.
625n/a # (in the C version, close() is called by the tp_dealloc
626n/a # function, not by __del__)
627n/a self.on_del = 1
628n/a self.on_close = 2
629n/a self.on_flush = 3
630n/a def __del__(self):
631n/a record.append(self.on_del)
632n/a try:
633n/a f = super().__del__
634n/a except AttributeError:
635n/a pass
636n/a else:
637n/a f()
638n/a def close(self):
639n/a record.append(self.on_close)
640n/a super().close()
641n/a def flush(self):
642n/a record.append(self.on_flush)
643n/a super().flush()
644n/a f = MyIO()
645n/a del f
646n/a support.gc_collect()
647n/a self.assertEqual(record, [1, 2, 3])
648n/a
649n/a def test_IOBase_destructor(self):
650n/a self._check_base_destructor(self.IOBase)
651n/a
652n/a def test_RawIOBase_destructor(self):
653n/a self._check_base_destructor(self.RawIOBase)
654n/a
655n/a def test_BufferedIOBase_destructor(self):
656n/a self._check_base_destructor(self.BufferedIOBase)
657n/a
658n/a def test_TextIOBase_destructor(self):
659n/a self._check_base_destructor(self.TextIOBase)
660n/a
661n/a def test_close_flushes(self):
662n/a with self.open(support.TESTFN, "wb") as f:
663n/a f.write(b"xxx")
664n/a with self.open(support.TESTFN, "rb") as f:
665n/a self.assertEqual(f.read(), b"xxx")
666n/a
667n/a def test_array_writes(self):
668n/a a = array.array('i', range(10))
669n/a n = len(a.tobytes())
670n/a def check(f):
671n/a with f:
672n/a self.assertEqual(f.write(a), n)
673n/a f.writelines((a,))
674n/a check(self.BytesIO())
675n/a check(self.FileIO(support.TESTFN, "w"))
676n/a check(self.BufferedWriter(self.MockRawIO()))
677n/a check(self.BufferedRandom(self.MockRawIO()))
678n/a check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
679n/a
680n/a def test_closefd(self):
681n/a self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
682n/a closefd=False)
683n/a
684n/a def test_read_closed(self):
685n/a with self.open(support.TESTFN, "w") as f:
686n/a f.write("egg\n")
687n/a with self.open(support.TESTFN, "r") as f:
688n/a file = self.open(f.fileno(), "r", closefd=False)
689n/a self.assertEqual(file.read(), "egg\n")
690n/a file.seek(0)
691n/a file.close()
692n/a self.assertRaises(ValueError, file.read)
693n/a
694n/a def test_no_closefd_with_filename(self):
695n/a # can't use closefd in combination with a file name
696n/a self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
697n/a
698n/a def test_closefd_attr(self):
699n/a with self.open(support.TESTFN, "wb") as f:
700n/a f.write(b"egg\n")
701n/a with self.open(support.TESTFN, "r") as f:
702n/a self.assertEqual(f.buffer.raw.closefd, True)
703n/a file = self.open(f.fileno(), "r", closefd=False)
704n/a self.assertEqual(file.buffer.raw.closefd, False)
705n/a
706n/a def test_garbage_collection(self):
707n/a # FileIO objects are collected, and collecting them flushes
708n/a # all data to disk.
709n/a with support.check_warnings(('', ResourceWarning)):
710n/a f = self.FileIO(support.TESTFN, "wb")
711n/a f.write(b"abcxxx")
712n/a f.f = f
713n/a wr = weakref.ref(f)
714n/a del f
715n/a support.gc_collect()
716n/a self.assertIsNone(wr(), wr)
717n/a with self.open(support.TESTFN, "rb") as f:
718n/a self.assertEqual(f.read(), b"abcxxx")
719n/a
720n/a def test_unbounded_file(self):
721n/a # Issue #1174606: reading from an unbounded stream such as /dev/zero.
722n/a zero = "/dev/zero"
723n/a if not os.path.exists(zero):
724n/a self.skipTest("{0} does not exist".format(zero))
725n/a if sys.maxsize > 0x7FFFFFFF:
726n/a self.skipTest("test can only run in a 32-bit address space")
727n/a if support.real_max_memuse < support._2G:
728n/a self.skipTest("test requires at least 2GB of memory")
729n/a with self.open(zero, "rb", buffering=0) as f:
730n/a self.assertRaises(OverflowError, f.read)
731n/a with self.open(zero, "rb") as f:
732n/a self.assertRaises(OverflowError, f.read)
733n/a with self.open(zero, "r") as f:
734n/a self.assertRaises(OverflowError, f.read)
735n/a
736n/a def check_flush_error_on_close(self, *args, **kwargs):
737n/a # Test that the file is closed despite failed flush
738n/a # and that flush() is called before file closed.
739n/a f = self.open(*args, **kwargs)
740n/a closed = []
741n/a def bad_flush():
742n/a closed[:] = [f.closed]
743n/a raise OSError()
744n/a f.flush = bad_flush
745n/a self.assertRaises(OSError, f.close) # exception not swallowed
746n/a self.assertTrue(f.closed)
747n/a self.assertTrue(closed) # flush() called
748n/a self.assertFalse(closed[0]) # flush() called before file closed
749n/a f.flush = lambda: None # break reference loop
750n/a
751n/a def test_flush_error_on_close(self):
752n/a # raw file
753n/a # Issue #5700: io.FileIO calls flush() after file closed
754n/a self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
755n/a fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
756n/a self.check_flush_error_on_close(fd, 'wb', buffering=0)
757n/a fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
758n/a self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
759n/a os.close(fd)
760n/a # buffered io
761n/a self.check_flush_error_on_close(support.TESTFN, 'wb')
762n/a fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
763n/a self.check_flush_error_on_close(fd, 'wb')
764n/a fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
765n/a self.check_flush_error_on_close(fd, 'wb', closefd=False)
766n/a os.close(fd)
767n/a # text io
768n/a self.check_flush_error_on_close(support.TESTFN, 'w')
769n/a fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
770n/a self.check_flush_error_on_close(fd, 'w')
771n/a fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
772n/a self.check_flush_error_on_close(fd, 'w', closefd=False)
773n/a os.close(fd)
774n/a
775n/a def test_multi_close(self):
776n/a f = self.open(support.TESTFN, "wb", buffering=0)
777n/a f.close()
778n/a f.close()
779n/a f.close()
780n/a self.assertRaises(ValueError, f.flush)
781n/a
782n/a def test_RawIOBase_read(self):
783n/a # Exercise the default RawIOBase.read() implementation (which calls
784n/a # readinto() internally).
785n/a rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
786n/a self.assertEqual(rawio.read(2), b"ab")
787n/a self.assertEqual(rawio.read(2), b"c")
788n/a self.assertEqual(rawio.read(2), b"d")
789n/a self.assertEqual(rawio.read(2), None)
790n/a self.assertEqual(rawio.read(2), b"ef")
791n/a self.assertEqual(rawio.read(2), b"g")
792n/a self.assertEqual(rawio.read(2), None)
793n/a self.assertEqual(rawio.read(2), b"")
794n/a
795n/a def test_types_have_dict(self):
796n/a test = (
797n/a self.IOBase(),
798n/a self.RawIOBase(),
799n/a self.TextIOBase(),
800n/a self.StringIO(),
801n/a self.BytesIO()
802n/a )
803n/a for obj in test:
804n/a self.assertTrue(hasattr(obj, "__dict__"))
805n/a
806n/a def test_opener(self):
807n/a with self.open(support.TESTFN, "w") as f:
808n/a f.write("egg\n")
809n/a fd = os.open(support.TESTFN, os.O_RDONLY)
810n/a def opener(path, flags):
811n/a return fd
812n/a with self.open("non-existent", "r", opener=opener) as f:
813n/a self.assertEqual(f.read(), "egg\n")
814n/a
815n/a def test_bad_opener_negative_1(self):
816n/a # Issue #27066.
817n/a def badopener(fname, flags):
818n/a return -1
819n/a with self.assertRaises(ValueError) as cm:
820n/a open('non-existent', 'r', opener=badopener)
821n/a self.assertEqual(str(cm.exception), 'opener returned -1')
822n/a
823n/a def test_bad_opener_other_negative(self):
824n/a # Issue #27066.
825n/a def badopener(fname, flags):
826n/a return -2
827n/a with self.assertRaises(ValueError) as cm:
828n/a open('non-existent', 'r', opener=badopener)
829n/a self.assertEqual(str(cm.exception), 'opener returned -2')
830n/a
831n/a def test_fileio_closefd(self):
832n/a # Issue #4841
833n/a with self.open(__file__, 'rb') as f1, \
834n/a self.open(__file__, 'rb') as f2:
835n/a fileio = self.FileIO(f1.fileno(), closefd=False)
836n/a # .__init__() must not close f1
837n/a fileio.__init__(f2.fileno(), closefd=False)
838n/a f1.readline()
839n/a # .close() must not close f2
840n/a fileio.close()
841n/a f2.readline()
842n/a
843n/a def test_nonbuffered_textio(self):
844n/a with support.check_no_resource_warning(self):
845n/a with self.assertRaises(ValueError):
846n/a self.open(support.TESTFN, 'w', buffering=0)
847n/a
848n/a def test_invalid_newline(self):
849n/a with support.check_no_resource_warning(self):
850n/a with self.assertRaises(ValueError):
851n/a self.open(support.TESTFN, 'w', newline='invalid')
852n/a
853n/a def test_buffered_readinto_mixin(self):
854n/a # Test the implementation provided by BufferedIOBase
855n/a class Stream(self.BufferedIOBase):
856n/a def read(self, size):
857n/a return b"12345"
858n/a read1 = read
859n/a stream = Stream()
860n/a for method in ("readinto", "readinto1"):
861n/a with self.subTest(method):
862n/a buffer = byteslike(5)
863n/a self.assertEqual(getattr(stream, method)(buffer), 5)
864n/a self.assertEqual(bytes(buffer), b"12345")
865n/a
866n/a def test_fspath_support(self):
867n/a class PathLike:
868n/a def __init__(self, path):
869n/a self.path = path
870n/a
871n/a def __fspath__(self):
872n/a return self.path
873n/a
874n/a def check_path_succeeds(path):
875n/a with self.open(path, "w") as f:
876n/a f.write("egg\n")
877n/a
878n/a with self.open(path, "r") as f:
879n/a self.assertEqual(f.read(), "egg\n")
880n/a
881n/a check_path_succeeds(PathLike(support.TESTFN))
882n/a check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
883n/a
884n/a bad_path = PathLike(TypeError)
885n/a with self.assertRaises(TypeError):
886n/a self.open(bad_path, 'w')
887n/a
888n/a # ensure that refcounting is correct with some error conditions
889n/a with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
890n/a self.open(PathLike(support.TESTFN), 'rwxa')
891n/a
892n/a
893n/aclass CIOTest(IOTest):
894n/a
895n/a def test_IOBase_finalize(self):
896n/a # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
897n/a # class which inherits IOBase and an object of this class are caught
898n/a # in a reference cycle and close() is already in the method cache.
899n/a class MyIO(self.IOBase):
900n/a def close(self):
901n/a pass
902n/a
903n/a # create an instance to populate the method cache
904n/a MyIO()
905n/a obj = MyIO()
906n/a obj.obj = obj
907n/a wr = weakref.ref(obj)
908n/a del MyIO
909n/a del obj
910n/a support.gc_collect()
911n/a self.assertIsNone(wr(), wr)
912n/a
913n/aclass PyIOTest(IOTest):
914n/a pass
915n/a
916n/a
917n/a@support.cpython_only
918n/aclass APIMismatchTest(unittest.TestCase):
919n/a
920n/a def test_RawIOBase_io_in_pyio_match(self):
921n/a """Test that pyio RawIOBase class has all c RawIOBase methods"""
922n/a mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
923n/a ignore=('__weakref__',))
924n/a self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
925n/a
926n/a def test_RawIOBase_pyio_in_io_match(self):
927n/a """Test that c RawIOBase class has all pyio RawIOBase methods"""
928n/a mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
929n/a self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
930n/a
931n/a
932n/aclass CommonBufferedTests:
933n/a # Tests common to BufferedReader, BufferedWriter and BufferedRandom
934n/a
935n/a def test_detach(self):
936n/a raw = self.MockRawIO()
937n/a buf = self.tp(raw)
938n/a self.assertIs(buf.detach(), raw)
939n/a self.assertRaises(ValueError, buf.detach)
940n/a
941n/a repr(buf) # Should still work
942n/a
943n/a def test_fileno(self):
944n/a rawio = self.MockRawIO()
945n/a bufio = self.tp(rawio)
946n/a
947n/a self.assertEqual(42, bufio.fileno())
948n/a
949n/a def test_invalid_args(self):
950n/a rawio = self.MockRawIO()
951n/a bufio = self.tp(rawio)
952n/a # Invalid whence
953n/a self.assertRaises(ValueError, bufio.seek, 0, -1)
954n/a self.assertRaises(ValueError, bufio.seek, 0, 9)
955n/a
956n/a def test_override_destructor(self):
957n/a tp = self.tp
958n/a record = []
959n/a class MyBufferedIO(tp):
960n/a def __del__(self):
961n/a record.append(1)
962n/a try:
963n/a f = super().__del__
964n/a except AttributeError:
965n/a pass
966n/a else:
967n/a f()
968n/a def close(self):
969n/a record.append(2)
970n/a super().close()
971n/a def flush(self):
972n/a record.append(3)
973n/a super().flush()
974n/a rawio = self.MockRawIO()
975n/a bufio = MyBufferedIO(rawio)
976n/a del bufio
977n/a support.gc_collect()
978n/a self.assertEqual(record, [1, 2, 3])
979n/a
980n/a def test_context_manager(self):
981n/a # Test usability as a context manager
982n/a rawio = self.MockRawIO()
983n/a bufio = self.tp(rawio)
984n/a def _with():
985n/a with bufio:
986n/a pass
987n/a _with()
988n/a # bufio should now be closed, and using it a second time should raise
989n/a # a ValueError.
990n/a self.assertRaises(ValueError, _with)
991n/a
992n/a def test_error_through_destructor(self):
993n/a # Test that the exception state is not modified by a destructor,
994n/a # even if close() fails.
995n/a rawio = self.CloseFailureIO()
996n/a def f():
997n/a self.tp(rawio).xyzzy
998n/a with support.captured_output("stderr") as s:
999n/a self.assertRaises(AttributeError, f)
1000n/a s = s.getvalue().strip()
1001n/a if s:
1002n/a # The destructor *may* have printed an unraisable error, check it
1003n/a self.assertEqual(len(s.splitlines()), 1)
1004n/a self.assertTrue(s.startswith("Exception OSError: "), s)
1005n/a self.assertTrue(s.endswith(" ignored"), s)
1006n/a
1007n/a def test_repr(self):
1008n/a raw = self.MockRawIO()
1009n/a b = self.tp(raw)
1010n/a clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
1011n/a self.assertEqual(repr(b), "<%s>" % clsname)
1012n/a raw.name = "dummy"
1013n/a self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1014n/a raw.name = b"dummy"
1015n/a self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1016n/a
1017n/a def test_flush_error_on_close(self):
1018n/a # Test that buffered file is closed despite failed flush
1019n/a # and that flush() is called before file closed.
1020n/a raw = self.MockRawIO()
1021n/a closed = []
1022n/a def bad_flush():
1023n/a closed[:] = [b.closed, raw.closed]
1024n/a raise OSError()
1025n/a raw.flush = bad_flush
1026n/a b = self.tp(raw)
1027n/a self.assertRaises(OSError, b.close) # exception not swallowed
1028n/a self.assertTrue(b.closed)
1029n/a self.assertTrue(raw.closed)
1030n/a self.assertTrue(closed) # flush() called
1031n/a self.assertFalse(closed[0]) # flush() called before file closed
1032n/a self.assertFalse(closed[1])
1033n/a raw.flush = lambda: None # break reference loop
1034n/a
1035n/a def test_close_error_on_close(self):
1036n/a raw = self.MockRawIO()
1037n/a def bad_flush():
1038n/a raise OSError('flush')
1039n/a def bad_close():
1040n/a raise OSError('close')
1041n/a raw.close = bad_close
1042n/a b = self.tp(raw)
1043n/a b.flush = bad_flush
1044n/a with self.assertRaises(OSError) as err: # exception not swallowed
1045n/a b.close()
1046n/a self.assertEqual(err.exception.args, ('close',))
1047n/a self.assertIsInstance(err.exception.__context__, OSError)
1048n/a self.assertEqual(err.exception.__context__.args, ('flush',))
1049n/a self.assertFalse(b.closed)
1050n/a
1051n/a def test_nonnormalized_close_error_on_close(self):
1052n/a # Issue #21677
1053n/a raw = self.MockRawIO()
1054n/a def bad_flush():
1055n/a raise non_existing_flush
1056n/a def bad_close():
1057n/a raise non_existing_close
1058n/a raw.close = bad_close
1059n/a b = self.tp(raw)
1060n/a b.flush = bad_flush
1061n/a with self.assertRaises(NameError) as err: # exception not swallowed
1062n/a b.close()
1063n/a self.assertIn('non_existing_close', str(err.exception))
1064n/a self.assertIsInstance(err.exception.__context__, NameError)
1065n/a self.assertIn('non_existing_flush', str(err.exception.__context__))
1066n/a self.assertFalse(b.closed)
1067n/a
1068n/a def test_multi_close(self):
1069n/a raw = self.MockRawIO()
1070n/a b = self.tp(raw)
1071n/a b.close()
1072n/a b.close()
1073n/a b.close()
1074n/a self.assertRaises(ValueError, b.flush)
1075n/a
1076n/a def test_unseekable(self):
1077n/a bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1078n/a self.assertRaises(self.UnsupportedOperation, bufio.tell)
1079n/a self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1080n/a
1081n/a def test_readonly_attributes(self):
1082n/a raw = self.MockRawIO()
1083n/a buf = self.tp(raw)
1084n/a x = self.MockRawIO()
1085n/a with self.assertRaises(AttributeError):
1086n/a buf.raw = x
1087n/a
1088n/a
1089n/aclass SizeofTest:
1090n/a
1091n/a @support.cpython_only
1092n/a def test_sizeof(self):
1093n/a bufsize1 = 4096
1094n/a bufsize2 = 8192
1095n/a rawio = self.MockRawIO()
1096n/a bufio = self.tp(rawio, buffer_size=bufsize1)
1097n/a size = sys.getsizeof(bufio) - bufsize1
1098n/a rawio = self.MockRawIO()
1099n/a bufio = self.tp(rawio, buffer_size=bufsize2)
1100n/a self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1101n/a
1102n/a @support.cpython_only
1103n/a def test_buffer_freeing(self) :
1104n/a bufsize = 4096
1105n/a rawio = self.MockRawIO()
1106n/a bufio = self.tp(rawio, buffer_size=bufsize)
1107n/a size = sys.getsizeof(bufio) - bufsize
1108n/a bufio.close()
1109n/a self.assertEqual(sys.getsizeof(bufio), size)
1110n/a
1111n/aclass BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1112n/a read_mode = "rb"
1113n/a
1114n/a def test_constructor(self):
1115n/a rawio = self.MockRawIO([b"abc"])
1116n/a bufio = self.tp(rawio)
1117n/a bufio.__init__(rawio)
1118n/a bufio.__init__(rawio, buffer_size=1024)
1119n/a bufio.__init__(rawio, buffer_size=16)
1120n/a self.assertEqual(b"abc", bufio.read())
1121n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1122n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1123n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1124n/a rawio = self.MockRawIO([b"abc"])
1125n/a bufio.__init__(rawio)
1126n/a self.assertEqual(b"abc", bufio.read())
1127n/a
1128n/a def test_uninitialized(self):
1129n/a bufio = self.tp.__new__(self.tp)
1130n/a del bufio
1131n/a bufio = self.tp.__new__(self.tp)
1132n/a self.assertRaisesRegex((ValueError, AttributeError),
1133n/a 'uninitialized|has no attribute',
1134n/a bufio.read, 0)
1135n/a bufio.__init__(self.MockRawIO())
1136n/a self.assertEqual(bufio.read(0), b'')
1137n/a
1138n/a def test_read(self):
1139n/a for arg in (None, 7):
1140n/a rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1141n/a bufio = self.tp(rawio)
1142n/a self.assertEqual(b"abcdefg", bufio.read(arg))
1143n/a # Invalid args
1144n/a self.assertRaises(ValueError, bufio.read, -2)
1145n/a
1146n/a def test_read1(self):
1147n/a rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1148n/a bufio = self.tp(rawio)
1149n/a self.assertEqual(b"a", bufio.read(1))
1150n/a self.assertEqual(b"b", bufio.read1(1))
1151n/a self.assertEqual(rawio._reads, 1)
1152n/a self.assertEqual(b"", bufio.read1(0))
1153n/a self.assertEqual(b"c", bufio.read1(100))
1154n/a self.assertEqual(rawio._reads, 1)
1155n/a self.assertEqual(b"d", bufio.read1(100))
1156n/a self.assertEqual(rawio._reads, 2)
1157n/a self.assertEqual(b"efg", bufio.read1(100))
1158n/a self.assertEqual(rawio._reads, 3)
1159n/a self.assertEqual(b"", bufio.read1(100))
1160n/a self.assertEqual(rawio._reads, 4)
1161n/a
1162n/a def test_read1_arbitrary(self):
1163n/a rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1164n/a bufio = self.tp(rawio)
1165n/a self.assertEqual(b"a", bufio.read(1))
1166n/a self.assertEqual(b"bc", bufio.read1())
1167n/a self.assertEqual(b"d", bufio.read1())
1168n/a self.assertEqual(b"efg", bufio.read1(-1))
1169n/a self.assertEqual(rawio._reads, 3)
1170n/a self.assertEqual(b"", bufio.read1())
1171n/a self.assertEqual(rawio._reads, 4)
1172n/a
1173n/a def test_readinto(self):
1174n/a rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1175n/a bufio = self.tp(rawio)
1176n/a b = bytearray(2)
1177n/a self.assertEqual(bufio.readinto(b), 2)
1178n/a self.assertEqual(b, b"ab")
1179n/a self.assertEqual(bufio.readinto(b), 2)
1180n/a self.assertEqual(b, b"cd")
1181n/a self.assertEqual(bufio.readinto(b), 2)
1182n/a self.assertEqual(b, b"ef")
1183n/a self.assertEqual(bufio.readinto(b), 1)
1184n/a self.assertEqual(b, b"gf")
1185n/a self.assertEqual(bufio.readinto(b), 0)
1186n/a self.assertEqual(b, b"gf")
1187n/a rawio = self.MockRawIO((b"abc", None))
1188n/a bufio = self.tp(rawio)
1189n/a self.assertEqual(bufio.readinto(b), 2)
1190n/a self.assertEqual(b, b"ab")
1191n/a self.assertEqual(bufio.readinto(b), 1)
1192n/a self.assertEqual(b, b"cb")
1193n/a
1194n/a def test_readinto1(self):
1195n/a buffer_size = 10
1196n/a rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1197n/a bufio = self.tp(rawio, buffer_size=buffer_size)
1198n/a b = bytearray(2)
1199n/a self.assertEqual(bufio.peek(3), b'abc')
1200n/a self.assertEqual(rawio._reads, 1)
1201n/a self.assertEqual(bufio.readinto1(b), 2)
1202n/a self.assertEqual(b, b"ab")
1203n/a self.assertEqual(rawio._reads, 1)
1204n/a self.assertEqual(bufio.readinto1(b), 1)
1205n/a self.assertEqual(b[:1], b"c")
1206n/a self.assertEqual(rawio._reads, 1)
1207n/a self.assertEqual(bufio.readinto1(b), 2)
1208n/a self.assertEqual(b, b"de")
1209n/a self.assertEqual(rawio._reads, 2)
1210n/a b = bytearray(2*buffer_size)
1211n/a self.assertEqual(bufio.peek(3), b'fgh')
1212n/a self.assertEqual(rawio._reads, 3)
1213n/a self.assertEqual(bufio.readinto1(b), 6)
1214n/a self.assertEqual(b[:6], b"fghjkl")
1215n/a self.assertEqual(rawio._reads, 4)
1216n/a
1217n/a def test_readinto_array(self):
1218n/a buffer_size = 60
1219n/a data = b"a" * 26
1220n/a rawio = self.MockRawIO((data,))
1221n/a bufio = self.tp(rawio, buffer_size=buffer_size)
1222n/a
1223n/a # Create an array with element size > 1 byte
1224n/a b = array.array('i', b'x' * 32)
1225n/a assert len(b) != 16
1226n/a
1227n/a # Read into it. We should get as many *bytes* as we can fit into b
1228n/a # (which is more than the number of elements)
1229n/a n = bufio.readinto(b)
1230n/a self.assertGreater(n, len(b))
1231n/a
1232n/a # Check that old contents of b are preserved
1233n/a bm = memoryview(b).cast('B')
1234n/a self.assertLess(n, len(bm))
1235n/a self.assertEqual(bm[:n], data[:n])
1236n/a self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1237n/a
1238n/a def test_readinto1_array(self):
1239n/a buffer_size = 60
1240n/a data = b"a" * 26
1241n/a rawio = self.MockRawIO((data,))
1242n/a bufio = self.tp(rawio, buffer_size=buffer_size)
1243n/a
1244n/a # Create an array with element size > 1 byte
1245n/a b = array.array('i', b'x' * 32)
1246n/a assert len(b) != 16
1247n/a
1248n/a # Read into it. We should get as many *bytes* as we can fit into b
1249n/a # (which is more than the number of elements)
1250n/a n = bufio.readinto1(b)
1251n/a self.assertGreater(n, len(b))
1252n/a
1253n/a # Check that old contents of b are preserved
1254n/a bm = memoryview(b).cast('B')
1255n/a self.assertLess(n, len(bm))
1256n/a self.assertEqual(bm[:n], data[:n])
1257n/a self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1258n/a
1259n/a def test_readlines(self):
1260n/a def bufio():
1261n/a rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1262n/a return self.tp(rawio)
1263n/a self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1264n/a self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1265n/a self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
1266n/a
1267n/a def test_buffering(self):
1268n/a data = b"abcdefghi"
1269n/a dlen = len(data)
1270n/a
1271n/a tests = [
1272n/a [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1273n/a [ 100, [ 3, 3, 3], [ dlen ] ],
1274n/a [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1275n/a ]
1276n/a
1277n/a for bufsize, buf_read_sizes, raw_read_sizes in tests:
1278n/a rawio = self.MockFileIO(data)
1279n/a bufio = self.tp(rawio, buffer_size=bufsize)
1280n/a pos = 0
1281n/a for nbytes in buf_read_sizes:
1282n/a self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
1283n/a pos += nbytes
1284n/a # this is mildly implementation-dependent
1285n/a self.assertEqual(rawio.read_history, raw_read_sizes)
1286n/a
1287n/a def test_read_non_blocking(self):
1288n/a # Inject some None's in there to simulate EWOULDBLOCK
1289n/a rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1290n/a bufio = self.tp(rawio)
1291n/a self.assertEqual(b"abcd", bufio.read(6))
1292n/a self.assertEqual(b"e", bufio.read(1))
1293n/a self.assertEqual(b"fg", bufio.read())
1294n/a self.assertEqual(b"", bufio.peek(1))
1295n/a self.assertIsNone(bufio.read())
1296n/a self.assertEqual(b"", bufio.read())
1297n/a
1298n/a rawio = self.MockRawIO((b"a", None, None))
1299n/a self.assertEqual(b"a", rawio.readall())
1300n/a self.assertIsNone(rawio.readall())
1301n/a
1302n/a def test_read_past_eof(self):
1303n/a rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1304n/a bufio = self.tp(rawio)
1305n/a
1306n/a self.assertEqual(b"abcdefg", bufio.read(9000))
1307n/a
1308n/a def test_read_all(self):
1309n/a rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1310n/a bufio = self.tp(rawio)
1311n/a
1312n/a self.assertEqual(b"abcdefg", bufio.read())
1313n/a
1314n/a @unittest.skipUnless(threading, 'Threading required for this test.')
1315n/a @support.requires_resource('cpu')
1316n/a def test_threads(self):
1317n/a try:
1318n/a # Write out many bytes with exactly the same number of 0's,
1319n/a # 1's... 255's. This will help us check that concurrent reading
1320n/a # doesn't duplicate or forget contents.
1321n/a N = 1000
1322n/a l = list(range(256)) * N
1323n/a random.shuffle(l)
1324n/a s = bytes(bytearray(l))
1325n/a with self.open(support.TESTFN, "wb") as f:
1326n/a f.write(s)
1327n/a with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
1328n/a bufio = self.tp(raw, 8)
1329n/a errors = []
1330n/a results = []
1331n/a def f():
1332n/a try:
1333n/a # Intra-buffer read then buffer-flushing read
1334n/a for n in cycle([1, 19]):
1335n/a s = bufio.read(n)
1336n/a if not s:
1337n/a break
1338n/a # list.append() is atomic
1339n/a results.append(s)
1340n/a except Exception as e:
1341n/a errors.append(e)
1342n/a raise
1343n/a threads = [threading.Thread(target=f) for x in range(20)]
1344n/a with support.start_threads(threads):
1345n/a time.sleep(0.02) # yield
1346n/a self.assertFalse(errors,
1347n/a "the following exceptions were caught: %r" % errors)
1348n/a s = b''.join(results)
1349n/a for i in range(256):
1350n/a c = bytes(bytearray([i]))
1351n/a self.assertEqual(s.count(c), N)
1352n/a finally:
1353n/a support.unlink(support.TESTFN)
1354n/a
1355n/a def test_unseekable(self):
1356n/a bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1357n/a self.assertRaises(self.UnsupportedOperation, bufio.tell)
1358n/a self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1359n/a bufio.read(1)
1360n/a self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1361n/a self.assertRaises(self.UnsupportedOperation, bufio.tell)
1362n/a
1363n/a def test_misbehaved_io(self):
1364n/a rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1365n/a bufio = self.tp(rawio)
1366n/a self.assertRaises(OSError, bufio.seek, 0)
1367n/a self.assertRaises(OSError, bufio.tell)
1368n/a
1369n/a def test_no_extraneous_read(self):
1370n/a # Issue #9550; when the raw IO object has satisfied the read request,
1371n/a # we should not issue any additional reads, otherwise it may block
1372n/a # (e.g. socket).
1373n/a bufsize = 16
1374n/a for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1375n/a rawio = self.MockRawIO([b"x" * n])
1376n/a bufio = self.tp(rawio, bufsize)
1377n/a self.assertEqual(bufio.read(n), b"x" * n)
1378n/a # Simple case: one raw read is enough to satisfy the request.
1379n/a self.assertEqual(rawio._extraneous_reads, 0,
1380n/a "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1381n/a # A more complex case where two raw reads are needed to satisfy
1382n/a # the request.
1383n/a rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1384n/a bufio = self.tp(rawio, bufsize)
1385n/a self.assertEqual(bufio.read(n), b"x" * n)
1386n/a self.assertEqual(rawio._extraneous_reads, 0,
1387n/a "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1388n/a
1389n/a def test_read_on_closed(self):
1390n/a # Issue #23796
1391n/a b = io.BufferedReader(io.BytesIO(b"12"))
1392n/a b.read(1)
1393n/a b.close()
1394n/a self.assertRaises(ValueError, b.peek)
1395n/a self.assertRaises(ValueError, b.read1, 1)
1396n/a
1397n/a
1398n/aclass CBufferedReaderTest(BufferedReaderTest, SizeofTest):
1399n/a tp = io.BufferedReader
1400n/a
1401n/a def test_constructor(self):
1402n/a BufferedReaderTest.test_constructor(self)
1403n/a # The allocation can succeed on 32-bit builds, e.g. with more
1404n/a # than 2GB RAM and a 64-bit kernel.
1405n/a if sys.maxsize > 0x7FFFFFFF:
1406n/a rawio = self.MockRawIO()
1407n/a bufio = self.tp(rawio)
1408n/a self.assertRaises((OverflowError, MemoryError, ValueError),
1409n/a bufio.__init__, rawio, sys.maxsize)
1410n/a
1411n/a def test_initialization(self):
1412n/a rawio = self.MockRawIO([b"abc"])
1413n/a bufio = self.tp(rawio)
1414n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1415n/a self.assertRaises(ValueError, bufio.read)
1416n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1417n/a self.assertRaises(ValueError, bufio.read)
1418n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1419n/a self.assertRaises(ValueError, bufio.read)
1420n/a
1421n/a def test_misbehaved_io_read(self):
1422n/a rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1423n/a bufio = self.tp(rawio)
1424n/a # _pyio.BufferedReader seems to implement reading different, so that
1425n/a # checking this is not so easy.
1426n/a self.assertRaises(OSError, bufio.read, 10)
1427n/a
1428n/a def test_garbage_collection(self):
1429n/a # C BufferedReader objects are collected.
1430n/a # The Python version has __del__, so it ends into gc.garbage instead
1431n/a with support.check_warnings(('', ResourceWarning)):
1432n/a rawio = self.FileIO(support.TESTFN, "w+b")
1433n/a f = self.tp(rawio)
1434n/a f.f = f
1435n/a wr = weakref.ref(f)
1436n/a del f
1437n/a support.gc_collect()
1438n/a self.assertIsNone(wr(), wr)
1439n/a
1440n/a def test_args_error(self):
1441n/a # Issue #17275
1442n/a with self.assertRaisesRegex(TypeError, "BufferedReader"):
1443n/a self.tp(io.BytesIO(), 1024, 1024, 1024)
1444n/a
1445n/a
1446n/aclass PyBufferedReaderTest(BufferedReaderTest):
1447n/a tp = pyio.BufferedReader
1448n/a
1449n/a
1450n/aclass BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1451n/a write_mode = "wb"
1452n/a
1453n/a def test_constructor(self):
1454n/a rawio = self.MockRawIO()
1455n/a bufio = self.tp(rawio)
1456n/a bufio.__init__(rawio)
1457n/a bufio.__init__(rawio, buffer_size=1024)
1458n/a bufio.__init__(rawio, buffer_size=16)
1459n/a self.assertEqual(3, bufio.write(b"abc"))
1460n/a bufio.flush()
1461n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1462n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1463n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1464n/a bufio.__init__(rawio)
1465n/a self.assertEqual(3, bufio.write(b"ghi"))
1466n/a bufio.flush()
1467n/a self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1468n/a
1469n/a def test_uninitialized(self):
1470n/a bufio = self.tp.__new__(self.tp)
1471n/a del bufio
1472n/a bufio = self.tp.__new__(self.tp)
1473n/a self.assertRaisesRegex((ValueError, AttributeError),
1474n/a 'uninitialized|has no attribute',
1475n/a bufio.write, b'')
1476n/a bufio.__init__(self.MockRawIO())
1477n/a self.assertEqual(bufio.write(b''), 0)
1478n/a
1479n/a def test_detach_flush(self):
1480n/a raw = self.MockRawIO()
1481n/a buf = self.tp(raw)
1482n/a buf.write(b"howdy!")
1483n/a self.assertFalse(raw._write_stack)
1484n/a buf.detach()
1485n/a self.assertEqual(raw._write_stack, [b"howdy!"])
1486n/a
1487n/a def test_write(self):
1488n/a # Write to the buffered IO but don't overflow the buffer.
1489n/a writer = self.MockRawIO()
1490n/a bufio = self.tp(writer, 8)
1491n/a bufio.write(b"abc")
1492n/a self.assertFalse(writer._write_stack)
1493n/a buffer = bytearray(b"def")
1494n/a bufio.write(buffer)
1495n/a buffer[:] = b"***" # Overwrite our copy of the data
1496n/a bufio.flush()
1497n/a self.assertEqual(b"".join(writer._write_stack), b"abcdef")
1498n/a
1499n/a def test_write_overflow(self):
1500n/a writer = self.MockRawIO()
1501n/a bufio = self.tp(writer, 8)
1502n/a contents = b"abcdefghijklmnop"
1503n/a for n in range(0, len(contents), 3):
1504n/a bufio.write(contents[n:n+3])
1505n/a flushed = b"".join(writer._write_stack)
1506n/a # At least (total - 8) bytes were implicitly flushed, perhaps more
1507n/a # depending on the implementation.
1508n/a self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1509n/a
1510n/a def check_writes(self, intermediate_func):
1511n/a # Lots of writes, test the flushed output is as expected.
1512n/a contents = bytes(range(256)) * 1000
1513n/a n = 0
1514n/a writer = self.MockRawIO()
1515n/a bufio = self.tp(writer, 13)
1516n/a # Generator of write sizes: repeat each N 15 times then proceed to N+1
1517n/a def gen_sizes():
1518n/a for size in count(1):
1519n/a for i in range(15):
1520n/a yield size
1521n/a sizes = gen_sizes()
1522n/a while n < len(contents):
1523n/a size = min(next(sizes), len(contents) - n)
1524n/a self.assertEqual(bufio.write(contents[n:n+size]), size)
1525n/a intermediate_func(bufio)
1526n/a n += size
1527n/a bufio.flush()
1528n/a self.assertEqual(contents, b"".join(writer._write_stack))
1529n/a
1530n/a def test_writes(self):
1531n/a self.check_writes(lambda bufio: None)
1532n/a
1533n/a def test_writes_and_flushes(self):
1534n/a self.check_writes(lambda bufio: bufio.flush())
1535n/a
1536n/a def test_writes_and_seeks(self):
1537n/a def _seekabs(bufio):
1538n/a pos = bufio.tell()
1539n/a bufio.seek(pos + 1, 0)
1540n/a bufio.seek(pos - 1, 0)
1541n/a bufio.seek(pos, 0)
1542n/a self.check_writes(_seekabs)
1543n/a def _seekrel(bufio):
1544n/a pos = bufio.seek(0, 1)
1545n/a bufio.seek(+1, 1)
1546n/a bufio.seek(-1, 1)
1547n/a bufio.seek(pos, 0)
1548n/a self.check_writes(_seekrel)
1549n/a
1550n/a def test_writes_and_truncates(self):
1551n/a self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1552n/a
1553n/a def test_write_non_blocking(self):
1554n/a raw = self.MockNonBlockWriterIO()
1555n/a bufio = self.tp(raw, 8)
1556n/a
1557n/a self.assertEqual(bufio.write(b"abcd"), 4)
1558n/a self.assertEqual(bufio.write(b"efghi"), 5)
1559n/a # 1 byte will be written, the rest will be buffered
1560n/a raw.block_on(b"k")
1561n/a self.assertEqual(bufio.write(b"jklmn"), 5)
1562n/a
1563n/a # 8 bytes will be written, 8 will be buffered and the rest will be lost
1564n/a raw.block_on(b"0")
1565n/a try:
1566n/a bufio.write(b"opqrwxyz0123456789")
1567n/a except self.BlockingIOError as e:
1568n/a written = e.characters_written
1569n/a else:
1570n/a self.fail("BlockingIOError should have been raised")
1571n/a self.assertEqual(written, 16)
1572n/a self.assertEqual(raw.pop_written(),
1573n/a b"abcdefghijklmnopqrwxyz")
1574n/a
1575n/a self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1576n/a s = raw.pop_written()
1577n/a # Previously buffered bytes were flushed
1578n/a self.assertTrue(s.startswith(b"01234567A"), s)
1579n/a
1580n/a def test_write_and_rewind(self):
1581n/a raw = io.BytesIO()
1582n/a bufio = self.tp(raw, 4)
1583n/a self.assertEqual(bufio.write(b"abcdef"), 6)
1584n/a self.assertEqual(bufio.tell(), 6)
1585n/a bufio.seek(0, 0)
1586n/a self.assertEqual(bufio.write(b"XY"), 2)
1587n/a bufio.seek(6, 0)
1588n/a self.assertEqual(raw.getvalue(), b"XYcdef")
1589n/a self.assertEqual(bufio.write(b"123456"), 6)
1590n/a bufio.flush()
1591n/a self.assertEqual(raw.getvalue(), b"XYcdef123456")
1592n/a
1593n/a def test_flush(self):
1594n/a writer = self.MockRawIO()
1595n/a bufio = self.tp(writer, 8)
1596n/a bufio.write(b"abc")
1597n/a bufio.flush()
1598n/a self.assertEqual(b"abc", writer._write_stack[0])
1599n/a
1600n/a def test_writelines(self):
1601n/a l = [b'ab', b'cd', b'ef']
1602n/a writer = self.MockRawIO()
1603n/a bufio = self.tp(writer, 8)
1604n/a bufio.writelines(l)
1605n/a bufio.flush()
1606n/a self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1607n/a
1608n/a def test_writelines_userlist(self):
1609n/a l = UserList([b'ab', b'cd', b'ef'])
1610n/a writer = self.MockRawIO()
1611n/a bufio = self.tp(writer, 8)
1612n/a bufio.writelines(l)
1613n/a bufio.flush()
1614n/a self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1615n/a
1616n/a def test_writelines_error(self):
1617n/a writer = self.MockRawIO()
1618n/a bufio = self.tp(writer, 8)
1619n/a self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1620n/a self.assertRaises(TypeError, bufio.writelines, None)
1621n/a self.assertRaises(TypeError, bufio.writelines, 'abc')
1622n/a
1623n/a def test_destructor(self):
1624n/a writer = self.MockRawIO()
1625n/a bufio = self.tp(writer, 8)
1626n/a bufio.write(b"abc")
1627n/a del bufio
1628n/a support.gc_collect()
1629n/a self.assertEqual(b"abc", writer._write_stack[0])
1630n/a
1631n/a def test_truncate(self):
1632n/a # Truncate implicitly flushes the buffer.
1633n/a with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1634n/a bufio = self.tp(raw, 8)
1635n/a bufio.write(b"abcdef")
1636n/a self.assertEqual(bufio.truncate(3), 3)
1637n/a self.assertEqual(bufio.tell(), 6)
1638n/a with self.open(support.TESTFN, "rb", buffering=0) as f:
1639n/a self.assertEqual(f.read(), b"abc")
1640n/a
1641n/a @unittest.skipUnless(threading, 'Threading required for this test.')
1642n/a @support.requires_resource('cpu')
1643n/a def test_threads(self):
1644n/a try:
1645n/a # Write out many bytes from many threads and test they were
1646n/a # all flushed.
1647n/a N = 1000
1648n/a contents = bytes(range(256)) * N
1649n/a sizes = cycle([1, 19])
1650n/a n = 0
1651n/a queue = deque()
1652n/a while n < len(contents):
1653n/a size = next(sizes)
1654n/a queue.append(contents[n:n+size])
1655n/a n += size
1656n/a del contents
1657n/a # We use a real file object because it allows us to
1658n/a # exercise situations where the GIL is released before
1659n/a # writing the buffer to the raw streams. This is in addition
1660n/a # to concurrency issues due to switching threads in the middle
1661n/a # of Python code.
1662n/a with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1663n/a bufio = self.tp(raw, 8)
1664n/a errors = []
1665n/a def f():
1666n/a try:
1667n/a while True:
1668n/a try:
1669n/a s = queue.popleft()
1670n/a except IndexError:
1671n/a return
1672n/a bufio.write(s)
1673n/a except Exception as e:
1674n/a errors.append(e)
1675n/a raise
1676n/a threads = [threading.Thread(target=f) for x in range(20)]
1677n/a with support.start_threads(threads):
1678n/a time.sleep(0.02) # yield
1679n/a self.assertFalse(errors,
1680n/a "the following exceptions were caught: %r" % errors)
1681n/a bufio.close()
1682n/a with self.open(support.TESTFN, "rb") as f:
1683n/a s = f.read()
1684n/a for i in range(256):
1685n/a self.assertEqual(s.count(bytes([i])), N)
1686n/a finally:
1687n/a support.unlink(support.TESTFN)
1688n/a
1689n/a def test_misbehaved_io(self):
1690n/a rawio = self.MisbehavedRawIO()
1691n/a bufio = self.tp(rawio, 5)
1692n/a self.assertRaises(OSError, bufio.seek, 0)
1693n/a self.assertRaises(OSError, bufio.tell)
1694n/a self.assertRaises(OSError, bufio.write, b"abcdef")
1695n/a
1696n/a def test_max_buffer_size_removal(self):
1697n/a with self.assertRaises(TypeError):
1698n/a self.tp(self.MockRawIO(), 8, 12)
1699n/a
1700n/a def test_write_error_on_close(self):
1701n/a raw = self.MockRawIO()
1702n/a def bad_write(b):
1703n/a raise OSError()
1704n/a raw.write = bad_write
1705n/a b = self.tp(raw)
1706n/a b.write(b'spam')
1707n/a self.assertRaises(OSError, b.close) # exception not swallowed
1708n/a self.assertTrue(b.closed)
1709n/a
1710n/a
1711n/aclass CBufferedWriterTest(BufferedWriterTest, SizeofTest):
1712n/a tp = io.BufferedWriter
1713n/a
1714n/a def test_constructor(self):
1715n/a BufferedWriterTest.test_constructor(self)
1716n/a # The allocation can succeed on 32-bit builds, e.g. with more
1717n/a # than 2GB RAM and a 64-bit kernel.
1718n/a if sys.maxsize > 0x7FFFFFFF:
1719n/a rawio = self.MockRawIO()
1720n/a bufio = self.tp(rawio)
1721n/a self.assertRaises((OverflowError, MemoryError, ValueError),
1722n/a bufio.__init__, rawio, sys.maxsize)
1723n/a
1724n/a def test_initialization(self):
1725n/a rawio = self.MockRawIO()
1726n/a bufio = self.tp(rawio)
1727n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1728n/a self.assertRaises(ValueError, bufio.write, b"def")
1729n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1730n/a self.assertRaises(ValueError, bufio.write, b"def")
1731n/a self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1732n/a self.assertRaises(ValueError, bufio.write, b"def")
1733n/a
1734n/a def test_garbage_collection(self):
1735n/a # C BufferedWriter objects are collected, and collecting them flushes
1736n/a # all data to disk.
1737n/a # The Python version has __del__, so it ends into gc.garbage instead
1738n/a with support.check_warnings(('', ResourceWarning)):
1739n/a rawio = self.FileIO(support.TESTFN, "w+b")
1740n/a f = self.tp(rawio)
1741n/a f.write(b"123xxx")
1742n/a f.x = f
1743n/a wr = weakref.ref(f)
1744n/a del f
1745n/a support.gc_collect()
1746n/a self.assertIsNone(wr(), wr)
1747n/a with self.open(support.TESTFN, "rb") as f:
1748n/a self.assertEqual(f.read(), b"123xxx")
1749n/a
1750n/a def test_args_error(self):
1751n/a # Issue #17275
1752n/a with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1753n/a self.tp(io.BytesIO(), 1024, 1024, 1024)
1754n/a
1755n/a
1756n/aclass PyBufferedWriterTest(BufferedWriterTest):
1757n/a tp = pyio.BufferedWriter
1758n/a
1759n/aclass BufferedRWPairTest(unittest.TestCase):
1760n/a
1761n/a def test_constructor(self):
1762n/a pair = self.tp(self.MockRawIO(), self.MockRawIO())
1763n/a self.assertFalse(pair.closed)
1764n/a
1765n/a def test_uninitialized(self):
1766n/a pair = self.tp.__new__(self.tp)
1767n/a del pair
1768n/a pair = self.tp.__new__(self.tp)
1769n/a self.assertRaisesRegex((ValueError, AttributeError),
1770n/a 'uninitialized|has no attribute',
1771n/a pair.read, 0)
1772n/a self.assertRaisesRegex((ValueError, AttributeError),
1773n/a 'uninitialized|has no attribute',
1774n/a pair.write, b'')
1775n/a pair.__init__(self.MockRawIO(), self.MockRawIO())
1776n/a self.assertEqual(pair.read(0), b'')
1777n/a self.assertEqual(pair.write(b''), 0)
1778n/a
1779n/a def test_detach(self):
1780n/a pair = self.tp(self.MockRawIO(), self.MockRawIO())
1781n/a self.assertRaises(self.UnsupportedOperation, pair.detach)
1782n/a
1783n/a def test_constructor_max_buffer_size_removal(self):
1784n/a with self.assertRaises(TypeError):
1785n/a self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1786n/a
1787n/a def test_constructor_with_not_readable(self):
1788n/a class NotReadable(MockRawIO):
1789n/a def readable(self):
1790n/a return False
1791n/a
1792n/a self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
1793n/a
1794n/a def test_constructor_with_not_writeable(self):
1795n/a class NotWriteable(MockRawIO):
1796n/a def writable(self):
1797n/a return False
1798n/a
1799n/a self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
1800n/a
1801n/a def test_read(self):
1802n/a pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1803n/a
1804n/a self.assertEqual(pair.read(3), b"abc")
1805n/a self.assertEqual(pair.read(1), b"d")
1806n/a self.assertEqual(pair.read(), b"ef")
1807n/a pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1808n/a self.assertEqual(pair.read(None), b"abc")
1809n/a
1810n/a def test_readlines(self):
1811n/a pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1812n/a self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1813n/a self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1814n/a self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
1815n/a
1816n/a def test_read1(self):
1817n/a # .read1() is delegated to the underlying reader object, so this test
1818n/a # can be shallow.
1819n/a pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1820n/a
1821n/a self.assertEqual(pair.read1(3), b"abc")
1822n/a self.assertEqual(pair.read1(), b"def")
1823n/a
1824n/a def test_readinto(self):
1825n/a for method in ("readinto", "readinto1"):
1826n/a with self.subTest(method):
1827n/a pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1828n/a
1829n/a data = byteslike(b'\0' * 5)
1830n/a self.assertEqual(getattr(pair, method)(data), 5)
1831n/a self.assertEqual(bytes(data), b"abcde")
1832n/a
1833n/a def test_write(self):
1834n/a w = self.MockRawIO()
1835n/a pair = self.tp(self.MockRawIO(), w)
1836n/a
1837n/a pair.write(b"abc")
1838n/a pair.flush()
1839n/a buffer = bytearray(b"def")
1840n/a pair.write(buffer)
1841n/a buffer[:] = b"***" # Overwrite our copy of the data
1842n/a pair.flush()
1843n/a self.assertEqual(w._write_stack, [b"abc", b"def"])
1844n/a
1845n/a def test_peek(self):
1846n/a pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1847n/a
1848n/a self.assertTrue(pair.peek(3).startswith(b"abc"))
1849n/a self.assertEqual(pair.read(3), b"abc")
1850n/a
1851n/a def test_readable(self):
1852n/a pair = self.tp(self.MockRawIO(), self.MockRawIO())
1853n/a self.assertTrue(pair.readable())
1854n/a
1855n/a def test_writeable(self):
1856n/a pair = self.tp(self.MockRawIO(), self.MockRawIO())
1857n/a self.assertTrue(pair.writable())
1858n/a
1859n/a def test_seekable(self):
1860n/a # BufferedRWPairs are never seekable, even if their readers and writers
1861n/a # are.
1862n/a pair = self.tp(self.MockRawIO(), self.MockRawIO())
1863n/a self.assertFalse(pair.seekable())
1864n/a
1865n/a # .flush() is delegated to the underlying writer object and has been
1866n/a # tested in the test_write method.
1867n/a
1868n/a def test_close_and_closed(self):
1869n/a pair = self.tp(self.MockRawIO(), self.MockRawIO())
1870n/a self.assertFalse(pair.closed)
1871n/a pair.close()
1872n/a self.assertTrue(pair.closed)
1873n/a
1874n/a def test_reader_close_error_on_close(self):
1875n/a def reader_close():
1876n/a reader_non_existing
1877n/a reader = self.MockRawIO()
1878n/a reader.close = reader_close
1879n/a writer = self.MockRawIO()
1880n/a pair = self.tp(reader, writer)
1881n/a with self.assertRaises(NameError) as err:
1882n/a pair.close()
1883n/a self.assertIn('reader_non_existing', str(err.exception))
1884n/a self.assertTrue(pair.closed)
1885n/a self.assertFalse(reader.closed)
1886n/a self.assertTrue(writer.closed)
1887n/a
1888n/a def test_writer_close_error_on_close(self):
1889n/a def writer_close():
1890n/a writer_non_existing
1891n/a reader = self.MockRawIO()
1892n/a writer = self.MockRawIO()
1893n/a writer.close = writer_close
1894n/a pair = self.tp(reader, writer)
1895n/a with self.assertRaises(NameError) as err:
1896n/a pair.close()
1897n/a self.assertIn('writer_non_existing', str(err.exception))
1898n/a self.assertFalse(pair.closed)
1899n/a self.assertTrue(reader.closed)
1900n/a self.assertFalse(writer.closed)
1901n/a
1902n/a def test_reader_writer_close_error_on_close(self):
1903n/a def reader_close():
1904n/a reader_non_existing
1905n/a def writer_close():
1906n/a writer_non_existing
1907n/a reader = self.MockRawIO()
1908n/a reader.close = reader_close
1909n/a writer = self.MockRawIO()
1910n/a writer.close = writer_close
1911n/a pair = self.tp(reader, writer)
1912n/a with self.assertRaises(NameError) as err:
1913n/a pair.close()
1914n/a self.assertIn('reader_non_existing', str(err.exception))
1915n/a self.assertIsInstance(err.exception.__context__, NameError)
1916n/a self.assertIn('writer_non_existing', str(err.exception.__context__))
1917n/a self.assertFalse(pair.closed)
1918n/a self.assertFalse(reader.closed)
1919n/a self.assertFalse(writer.closed)
1920n/a
1921n/a def test_isatty(self):
1922n/a class SelectableIsAtty(MockRawIO):
1923n/a def __init__(self, isatty):
1924n/a MockRawIO.__init__(self)
1925n/a self._isatty = isatty
1926n/a
1927n/a def isatty(self):
1928n/a return self._isatty
1929n/a
1930n/a pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1931n/a self.assertFalse(pair.isatty())
1932n/a
1933n/a pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1934n/a self.assertTrue(pair.isatty())
1935n/a
1936n/a pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1937n/a self.assertTrue(pair.isatty())
1938n/a
1939n/a pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1940n/a self.assertTrue(pair.isatty())
1941n/a
1942n/a def test_weakref_clearing(self):
1943n/a brw = self.tp(self.MockRawIO(), self.MockRawIO())
1944n/a ref = weakref.ref(brw)
1945n/a brw = None
1946n/a ref = None # Shouldn't segfault.
1947n/a
1948n/aclass CBufferedRWPairTest(BufferedRWPairTest):
1949n/a tp = io.BufferedRWPair
1950n/a
1951n/aclass PyBufferedRWPairTest(BufferedRWPairTest):
1952n/a tp = pyio.BufferedRWPair
1953n/a
1954n/a
1955n/aclass BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1956n/a read_mode = "rb+"
1957n/a write_mode = "wb+"
1958n/a
1959n/a def test_constructor(self):
1960n/a BufferedReaderTest.test_constructor(self)
1961n/a BufferedWriterTest.test_constructor(self)
1962n/a
1963n/a def test_uninitialized(self):
1964n/a BufferedReaderTest.test_uninitialized(self)
1965n/a BufferedWriterTest.test_uninitialized(self)
1966n/a
1967n/a def test_read_and_write(self):
1968n/a raw = self.MockRawIO((b"asdf", b"ghjk"))
1969n/a rw = self.tp(raw, 8)
1970n/a
1971n/a self.assertEqual(b"as", rw.read(2))
1972n/a rw.write(b"ddd")
1973n/a rw.write(b"eee")
1974n/a self.assertFalse(raw._write_stack) # Buffer writes
1975n/a self.assertEqual(b"ghjk", rw.read())
1976n/a self.assertEqual(b"dddeee", raw._write_stack[0])
1977n/a
1978n/a def test_seek_and_tell(self):
1979n/a raw = self.BytesIO(b"asdfghjkl")
1980n/a rw = self.tp(raw)
1981n/a
1982n/a self.assertEqual(b"as", rw.read(2))
1983n/a self.assertEqual(2, rw.tell())
1984n/a rw.seek(0, 0)
1985n/a self.assertEqual(b"asdf", rw.read(4))
1986n/a
1987n/a rw.write(b"123f")
1988n/a rw.seek(0, 0)
1989n/a self.assertEqual(b"asdf123fl", rw.read())
1990n/a self.assertEqual(9, rw.tell())
1991n/a rw.seek(-4, 2)
1992n/a self.assertEqual(5, rw.tell())
1993n/a rw.seek(2, 1)
1994n/a self.assertEqual(7, rw.tell())
1995n/a self.assertEqual(b"fl", rw.read(11))
1996n/a rw.flush()
1997n/a self.assertEqual(b"asdf123fl", raw.getvalue())
1998n/a
1999n/a self.assertRaises(TypeError, rw.seek, 0.0)
2000n/a
2001n/a def check_flush_and_read(self, read_func):
2002n/a raw = self.BytesIO(b"abcdefghi")
2003n/a bufio = self.tp(raw)
2004n/a
2005n/a self.assertEqual(b"ab", read_func(bufio, 2))
2006n/a bufio.write(b"12")
2007n/a self.assertEqual(b"ef", read_func(bufio, 2))
2008n/a self.assertEqual(6, bufio.tell())
2009n/a bufio.flush()
2010n/a self.assertEqual(6, bufio.tell())
2011n/a self.assertEqual(b"ghi", read_func(bufio))
2012n/a raw.seek(0, 0)
2013n/a raw.write(b"XYZ")
2014n/a # flush() resets the read buffer
2015n/a bufio.flush()
2016n/a bufio.seek(0, 0)
2017n/a self.assertEqual(b"XYZ", read_func(bufio, 3))
2018n/a
2019n/a def test_flush_and_read(self):
2020n/a self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2021n/a
2022n/a def test_flush_and_readinto(self):
2023n/a def _readinto(bufio, n=-1):
2024n/a b = bytearray(n if n >= 0 else 9999)
2025n/a n = bufio.readinto(b)
2026n/a return bytes(b[:n])
2027n/a self.check_flush_and_read(_readinto)
2028n/a
2029n/a def test_flush_and_peek(self):
2030n/a def _peek(bufio, n=-1):
2031n/a # This relies on the fact that the buffer can contain the whole
2032n/a # raw stream, otherwise peek() can return less.
2033n/a b = bufio.peek(n)
2034n/a if n != -1:
2035n/a b = b[:n]
2036n/a bufio.seek(len(b), 1)
2037n/a return b
2038n/a self.check_flush_and_read(_peek)
2039n/a
2040n/a def test_flush_and_write(self):
2041n/a raw = self.BytesIO(b"abcdefghi")
2042n/a bufio = self.tp(raw)
2043n/a
2044n/a bufio.write(b"123")
2045n/a bufio.flush()
2046n/a bufio.write(b"45")
2047n/a bufio.flush()
2048n/a bufio.seek(0, 0)
2049n/a self.assertEqual(b"12345fghi", raw.getvalue())
2050n/a self.assertEqual(b"12345fghi", bufio.read())
2051n/a
2052n/a def test_threads(self):
2053n/a BufferedReaderTest.test_threads(self)
2054n/a BufferedWriterTest.test_threads(self)
2055n/a
2056n/a def test_writes_and_peek(self):
2057n/a def _peek(bufio):
2058n/a bufio.peek(1)
2059n/a self.check_writes(_peek)
2060n/a def _peek(bufio):
2061n/a pos = bufio.tell()
2062n/a bufio.seek(-1, 1)
2063n/a bufio.peek(1)
2064n/a bufio.seek(pos, 0)
2065n/a self.check_writes(_peek)
2066n/a
2067n/a def test_writes_and_reads(self):
2068n/a def _read(bufio):
2069n/a bufio.seek(-1, 1)
2070n/a bufio.read(1)
2071n/a self.check_writes(_read)
2072n/a
2073n/a def test_writes_and_read1s(self):
2074n/a def _read1(bufio):
2075n/a bufio.seek(-1, 1)
2076n/a bufio.read1(1)
2077n/a self.check_writes(_read1)
2078n/a
2079n/a def test_writes_and_readintos(self):
2080n/a def _read(bufio):
2081n/a bufio.seek(-1, 1)
2082n/a bufio.readinto(bytearray(1))
2083n/a self.check_writes(_read)
2084n/a
2085n/a def test_write_after_readahead(self):
2086n/a # Issue #6629: writing after the buffer was filled by readahead should
2087n/a # first rewind the raw stream.
2088n/a for overwrite_size in [1, 5]:
2089n/a raw = self.BytesIO(b"A" * 10)
2090n/a bufio = self.tp(raw, 4)
2091n/a # Trigger readahead
2092n/a self.assertEqual(bufio.read(1), b"A")
2093n/a self.assertEqual(bufio.tell(), 1)
2094n/a # Overwriting should rewind the raw stream if it needs so
2095n/a bufio.write(b"B" * overwrite_size)
2096n/a self.assertEqual(bufio.tell(), overwrite_size + 1)
2097n/a # If the write size was smaller than the buffer size, flush() and
2098n/a # check that rewind happens.
2099n/a bufio.flush()
2100n/a self.assertEqual(bufio.tell(), overwrite_size + 1)
2101n/a s = raw.getvalue()
2102n/a self.assertEqual(s,
2103n/a b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2104n/a
2105n/a def test_write_rewind_write(self):
2106n/a # Various combinations of reading / writing / seeking backwards / writing again
2107n/a def mutate(bufio, pos1, pos2):
2108n/a assert pos2 >= pos1
2109n/a # Fill the buffer
2110n/a bufio.seek(pos1)
2111n/a bufio.read(pos2 - pos1)
2112n/a bufio.write(b'\x02')
2113n/a # This writes earlier than the previous write, but still inside
2114n/a # the buffer.
2115n/a bufio.seek(pos1)
2116n/a bufio.write(b'\x01')
2117n/a
2118n/a b = b"\x80\x81\x82\x83\x84"
2119n/a for i in range(0, len(b)):
2120n/a for j in range(i, len(b)):
2121n/a raw = self.BytesIO(b)
2122n/a bufio = self.tp(raw, 100)
2123n/a mutate(bufio, i, j)
2124n/a bufio.flush()
2125n/a expected = bytearray(b)
2126n/a expected[j] = 2
2127n/a expected[i] = 1
2128n/a self.assertEqual(raw.getvalue(), expected,
2129n/a "failed result for i=%d, j=%d" % (i, j))
2130n/a
2131n/a def test_truncate_after_read_or_write(self):
2132n/a raw = self.BytesIO(b"A" * 10)
2133n/a bufio = self.tp(raw, 100)
2134n/a self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2135n/a self.assertEqual(bufio.truncate(), 2)
2136n/a self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2137n/a self.assertEqual(bufio.truncate(), 4)
2138n/a
2139n/a def test_misbehaved_io(self):
2140n/a BufferedReaderTest.test_misbehaved_io(self)
2141n/a BufferedWriterTest.test_misbehaved_io(self)
2142n/a
2143n/a def test_interleaved_read_write(self):
2144n/a # Test for issue #12213
2145n/a with self.BytesIO(b'abcdefgh') as raw:
2146n/a with self.tp(raw, 100) as f:
2147n/a f.write(b"1")
2148n/a self.assertEqual(f.read(1), b'b')
2149n/a f.write(b'2')
2150n/a self.assertEqual(f.read1(1), b'd')
2151n/a f.write(b'3')
2152n/a buf = bytearray(1)
2153n/a f.readinto(buf)
2154n/a self.assertEqual(buf, b'f')
2155n/a f.write(b'4')
2156n/a self.assertEqual(f.peek(1), b'h')
2157n/a f.flush()
2158n/a self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2159n/a
2160n/a with self.BytesIO(b'abc') as raw:
2161n/a with self.tp(raw, 100) as f:
2162n/a self.assertEqual(f.read(1), b'a')
2163n/a f.write(b"2")
2164n/a self.assertEqual(f.read(1), b'c')
2165n/a f.flush()
2166n/a self.assertEqual(raw.getvalue(), b'a2c')
2167n/a
2168n/a def test_interleaved_readline_write(self):
2169n/a with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2170n/a with self.tp(raw) as f:
2171n/a f.write(b'1')
2172n/a self.assertEqual(f.readline(), b'b\n')
2173n/a f.write(b'2')
2174n/a self.assertEqual(f.readline(), b'def\n')
2175n/a f.write(b'3')
2176n/a self.assertEqual(f.readline(), b'\n')
2177n/a f.flush()
2178n/a self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2179n/a
2180n/a # You can't construct a BufferedRandom over a non-seekable stream.
2181n/a test_unseekable = None
2182n/a
2183n/a
2184n/aclass CBufferedRandomTest(BufferedRandomTest, SizeofTest):
2185n/a tp = io.BufferedRandom
2186n/a
2187n/a def test_constructor(self):
2188n/a BufferedRandomTest.test_constructor(self)
2189n/a # The allocation can succeed on 32-bit builds, e.g. with more
2190n/a # than 2GB RAM and a 64-bit kernel.
2191n/a if sys.maxsize > 0x7FFFFFFF:
2192n/a rawio = self.MockRawIO()
2193n/a bufio = self.tp(rawio)
2194n/a self.assertRaises((OverflowError, MemoryError, ValueError),
2195n/a bufio.__init__, rawio, sys.maxsize)
2196n/a
2197n/a def test_garbage_collection(self):
2198n/a CBufferedReaderTest.test_garbage_collection(self)
2199n/a CBufferedWriterTest.test_garbage_collection(self)
2200n/a
2201n/a def test_args_error(self):
2202n/a # Issue #17275
2203n/a with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2204n/a self.tp(io.BytesIO(), 1024, 1024, 1024)
2205n/a
2206n/a
2207n/aclass PyBufferedRandomTest(BufferedRandomTest):
2208n/a tp = pyio.BufferedRandom
2209n/a
2210n/a
2211n/a# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2212n/a# properties:
2213n/a# - A single output character can correspond to many bytes of input.
2214n/a# - The number of input bytes to complete the character can be
2215n/a# undetermined until the last input byte is received.
2216n/a# - The number of input bytes can vary depending on previous input.
2217n/a# - A single input byte can correspond to many characters of output.
2218n/a# - The number of output characters can be undetermined until the
2219n/a# last input byte is received.
2220n/a# - The number of output characters can vary depending on previous input.
2221n/a
2222n/aclass StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2223n/a """
2224n/a For testing seek/tell behavior with a stateful, buffering decoder.
2225n/a
2226n/a Input is a sequence of words. Words may be fixed-length (length set
2227n/a by input) or variable-length (period-terminated). In variable-length
2228n/a mode, extra periods are ignored. Possible words are:
2229n/a - 'i' followed by a number sets the input length, I (maximum 99).
2230n/a When I is set to 0, words are space-terminated.
2231n/a - 'o' followed by a number sets the output length, O (maximum 99).
2232n/a - Any other word is converted into a word followed by a period on
2233n/a the output. The output word consists of the input word truncated
2234n/a or padded out with hyphens to make its length equal to O. If O
2235n/a is 0, the word is output verbatim without truncating or padding.
2236n/a I and O are initially set to 1. When I changes, any buffered input is
2237n/a re-scanned according to the new I. EOF also terminates the last word.
2238n/a """
2239n/a
2240n/a def __init__(self, errors='strict'):
2241n/a codecs.IncrementalDecoder.__init__(self, errors)
2242n/a self.reset()
2243n/a
2244n/a def __repr__(self):
2245n/a return '<SID %x>' % id(self)
2246n/a
2247n/a def reset(self):
2248n/a self.i = 1
2249n/a self.o = 1
2250n/a self.buffer = bytearray()
2251n/a
2252n/a def getstate(self):
2253n/a i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2254n/a return bytes(self.buffer), i*100 + o
2255n/a
2256n/a def setstate(self, state):
2257n/a buffer, io = state
2258n/a self.buffer = bytearray(buffer)
2259n/a i, o = divmod(io, 100)
2260n/a self.i, self.o = i ^ 1, o ^ 1
2261n/a
2262n/a def decode(self, input, final=False):
2263n/a output = ''
2264n/a for b in input:
2265n/a if self.i == 0: # variable-length, terminated with period
2266n/a if b == ord('.'):
2267n/a if self.buffer:
2268n/a output += self.process_word()
2269n/a else:
2270n/a self.buffer.append(b)
2271n/a else: # fixed-length, terminate after self.i bytes
2272n/a self.buffer.append(b)
2273n/a if len(self.buffer) == self.i:
2274n/a output += self.process_word()
2275n/a if final and self.buffer: # EOF terminates the last word
2276n/a output += self.process_word()
2277n/a return output
2278n/a
2279n/a def process_word(self):
2280n/a output = ''
2281n/a if self.buffer[0] == ord('i'):
2282n/a self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2283n/a elif self.buffer[0] == ord('o'):
2284n/a self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2285n/a else:
2286n/a output = self.buffer.decode('ascii')
2287n/a if len(output) < self.o:
2288n/a output += '-'*self.o # pad out with hyphens
2289n/a if self.o:
2290n/a output = output[:self.o] # truncate to output length
2291n/a output += '.'
2292n/a self.buffer = bytearray()
2293n/a return output
2294n/a
2295n/a codecEnabled = False
2296n/a
2297n/a @classmethod
2298n/a def lookupTestDecoder(cls, name):
2299n/a if cls.codecEnabled and name == 'test_decoder':
2300n/a latin1 = codecs.lookup('latin-1')
2301n/a return codecs.CodecInfo(
2302n/a name='test_decoder', encode=latin1.encode, decode=None,
2303n/a incrementalencoder=None,
2304n/a streamreader=None, streamwriter=None,
2305n/a incrementaldecoder=cls)
2306n/a
2307n/a# Register the previous decoder for testing.
2308n/a# Disabled by default, tests will enable it.
2309n/acodecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2310n/a
2311n/a
2312n/aclass StatefulIncrementalDecoderTest(unittest.TestCase):
2313n/a """
2314n/a Make sure the StatefulIncrementalDecoder actually works.
2315n/a """
2316n/a
2317n/a test_cases = [
2318n/a # I=1, O=1 (fixed-length input == fixed-length output)
2319n/a (b'abcd', False, 'a.b.c.d.'),
2320n/a # I=0, O=0 (variable-length input, variable-length output)
2321n/a (b'oiabcd', True, 'abcd.'),
2322n/a # I=0, O=0 (should ignore extra periods)
2323n/a (b'oi...abcd...', True, 'abcd.'),
2324n/a # I=0, O=6 (variable-length input, fixed-length output)
2325n/a (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2326n/a # I=2, O=6 (fixed-length input < fixed-length output)
2327n/a (b'i.i2.o6xyz', True, 'xy----.z-----.'),
2328n/a # I=6, O=3 (fixed-length input > fixed-length output)
2329n/a (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2330n/a # I=0, then 3; O=29, then 15 (with longer output)
2331n/a (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2332n/a 'a----------------------------.' +
2333n/a 'b----------------------------.' +
2334n/a 'cde--------------------------.' +
2335n/a 'abcdefghijabcde.' +
2336n/a 'a.b------------.' +
2337n/a '.c.------------.' +
2338n/a 'd.e------------.' +
2339n/a 'k--------------.' +
2340n/a 'l--------------.' +
2341n/a 'm--------------.')
2342n/a ]
2343n/a
2344n/a def test_decoder(self):
2345n/a # Try a few one-shot test cases.
2346n/a for input, eof, output in self.test_cases:
2347n/a d = StatefulIncrementalDecoder()
2348n/a self.assertEqual(d.decode(input, eof), output)
2349n/a
2350n/a # Also test an unfinished decode, followed by forcing EOF.
2351n/a d = StatefulIncrementalDecoder()
2352n/a self.assertEqual(d.decode(b'oiabcd'), '')
2353n/a self.assertEqual(d.decode(b'', 1), 'abcd.')
2354n/a
2355n/aclass TextIOWrapperTest(unittest.TestCase):
2356n/a
2357n/a def setUp(self):
2358n/a self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2359n/a self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2360n/a support.unlink(support.TESTFN)
2361n/a
2362n/a def tearDown(self):
2363n/a support.unlink(support.TESTFN)
2364n/a
2365n/a def test_constructor(self):
2366n/a r = self.BytesIO(b"\xc3\xa9\n\n")
2367n/a b = self.BufferedReader(r, 1000)
2368n/a t = self.TextIOWrapper(b)
2369n/a t.__init__(b, encoding="latin-1", newline="\r\n")
2370n/a self.assertEqual(t.encoding, "latin-1")
2371n/a self.assertEqual(t.line_buffering, False)
2372n/a t.__init__(b, encoding="utf-8", line_buffering=True)
2373n/a self.assertEqual(t.encoding, "utf-8")
2374n/a self.assertEqual(t.line_buffering, True)
2375n/a self.assertEqual("\xe9\n", t.readline())
2376n/a self.assertRaises(TypeError, t.__init__, b, newline=42)
2377n/a self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2378n/a
2379n/a def test_uninitialized(self):
2380n/a t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2381n/a del t
2382n/a t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2383n/a self.assertRaises(Exception, repr, t)
2384n/a self.assertRaisesRegex((ValueError, AttributeError),
2385n/a 'uninitialized|has no attribute',
2386n/a t.read, 0)
2387n/a t.__init__(self.MockRawIO())
2388n/a self.assertEqual(t.read(0), '')
2389n/a
2390n/a def test_non_text_encoding_codecs_are_rejected(self):
2391n/a # Ensure the constructor complains if passed a codec that isn't
2392n/a # marked as a text encoding
2393n/a # http://bugs.python.org/issue20404
2394n/a r = self.BytesIO()
2395n/a b = self.BufferedWriter(r)
2396n/a with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2397n/a self.TextIOWrapper(b, encoding="hex")
2398n/a
2399n/a def test_detach(self):
2400n/a r = self.BytesIO()
2401n/a b = self.BufferedWriter(r)
2402n/a t = self.TextIOWrapper(b)
2403n/a self.assertIs(t.detach(), b)
2404n/a
2405n/a t = self.TextIOWrapper(b, encoding="ascii")
2406n/a t.write("howdy")
2407n/a self.assertFalse(r.getvalue())
2408n/a t.detach()
2409n/a self.assertEqual(r.getvalue(), b"howdy")
2410n/a self.assertRaises(ValueError, t.detach)
2411n/a
2412n/a # Operations independent of the detached stream should still work
2413n/a repr(t)
2414n/a self.assertEqual(t.encoding, "ascii")
2415n/a self.assertEqual(t.errors, "strict")
2416n/a self.assertFalse(t.line_buffering)
2417n/a
2418n/a def test_repr(self):
2419n/a raw = self.BytesIO("hello".encode("utf-8"))
2420n/a b = self.BufferedReader(raw)
2421n/a t = self.TextIOWrapper(b, encoding="utf-8")
2422n/a modname = self.TextIOWrapper.__module__
2423n/a self.assertEqual(repr(t),
2424n/a "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2425n/a raw.name = "dummy"
2426n/a self.assertEqual(repr(t),
2427n/a "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2428n/a t.mode = "r"
2429n/a self.assertEqual(repr(t),
2430n/a "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2431n/a raw.name = b"dummy"
2432n/a self.assertEqual(repr(t),
2433n/a "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2434n/a
2435n/a t.buffer.detach()
2436n/a repr(t) # Should not raise an exception
2437n/a
2438n/a def test_line_buffering(self):
2439n/a r = self.BytesIO()
2440n/a b = self.BufferedWriter(r, 1000)
2441n/a t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2442n/a t.write("X")
2443n/a self.assertEqual(r.getvalue(), b"") # No flush happened
2444n/a t.write("Y\nZ")
2445n/a self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
2446n/a t.write("A\rB")
2447n/a self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2448n/a
2449n/a def test_default_encoding(self):
2450n/a old_environ = dict(os.environ)
2451n/a try:
2452n/a # try to get a user preferred encoding different than the current
2453n/a # locale encoding to check that TextIOWrapper() uses the current
2454n/a # locale encoding and not the user preferred encoding
2455n/a for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2456n/a if key in os.environ:
2457n/a del os.environ[key]
2458n/a
2459n/a current_locale_encoding = locale.getpreferredencoding(False)
2460n/a b = self.BytesIO()
2461n/a t = self.TextIOWrapper(b)
2462n/a self.assertEqual(t.encoding, current_locale_encoding)
2463n/a finally:
2464n/a os.environ.clear()
2465n/a os.environ.update(old_environ)
2466n/a
2467n/a @support.cpython_only
2468n/a def test_device_encoding(self):
2469n/a # Issue 15989
2470n/a import _testcapi
2471n/a b = self.BytesIO()
2472n/a b.fileno = lambda: _testcapi.INT_MAX + 1
2473n/a self.assertRaises(OverflowError, self.TextIOWrapper, b)
2474n/a b.fileno = lambda: _testcapi.UINT_MAX + 1
2475n/a self.assertRaises(OverflowError, self.TextIOWrapper, b)
2476n/a
2477n/a def test_encoding(self):
2478n/a # Check the encoding attribute is always set, and valid
2479n/a b = self.BytesIO()
2480n/a t = self.TextIOWrapper(b, encoding="utf-8")
2481n/a self.assertEqual(t.encoding, "utf-8")
2482n/a t = self.TextIOWrapper(b)
2483n/a self.assertIsNotNone(t.encoding)
2484n/a codecs.lookup(t.encoding)
2485n/a
2486n/a def test_encoding_errors_reading(self):
2487n/a # (1) default
2488n/a b = self.BytesIO(b"abc\n\xff\n")
2489n/a t = self.TextIOWrapper(b, encoding="ascii")
2490n/a self.assertRaises(UnicodeError, t.read)
2491n/a # (2) explicit strict
2492n/a b = self.BytesIO(b"abc\n\xff\n")
2493n/a t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2494n/a self.assertRaises(UnicodeError, t.read)
2495n/a # (3) ignore
2496n/a b = self.BytesIO(b"abc\n\xff\n")
2497n/a t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2498n/a self.assertEqual(t.read(), "abc\n\n")
2499n/a # (4) replace
2500n/a b = self.BytesIO(b"abc\n\xff\n")
2501n/a t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2502n/a self.assertEqual(t.read(), "abc\n\ufffd\n")
2503n/a
2504n/a def test_encoding_errors_writing(self):
2505n/a # (1) default
2506n/a b = self.BytesIO()
2507n/a t = self.TextIOWrapper(b, encoding="ascii")
2508n/a self.assertRaises(UnicodeError, t.write, "\xff")
2509n/a # (2) explicit strict
2510n/a b = self.BytesIO()
2511n/a t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2512n/a self.assertRaises(UnicodeError, t.write, "\xff")
2513n/a # (3) ignore
2514n/a b = self.BytesIO()
2515n/a t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2516n/a newline="\n")
2517n/a t.write("abc\xffdef\n")
2518n/a t.flush()
2519n/a self.assertEqual(b.getvalue(), b"abcdef\n")
2520n/a # (4) replace
2521n/a b = self.BytesIO()
2522n/a t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2523n/a newline="\n")
2524n/a t.write("abc\xffdef\n")
2525n/a t.flush()
2526n/a self.assertEqual(b.getvalue(), b"abc?def\n")
2527n/a
2528n/a def test_newlines(self):
2529n/a input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2530n/a
2531n/a tests = [
2532n/a [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2533n/a [ '', input_lines ],
2534n/a [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2535n/a [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2536n/a [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2537n/a ]
2538n/a encodings = (
2539n/a 'utf-8', 'latin-1',
2540n/a 'utf-16', 'utf-16-le', 'utf-16-be',
2541n/a 'utf-32', 'utf-32-le', 'utf-32-be',
2542n/a )
2543n/a
2544n/a # Try a range of buffer sizes to test the case where \r is the last
2545n/a # character in TextIOWrapper._pending_line.
2546n/a for encoding in encodings:
2547n/a # XXX: str.encode() should return bytes
2548n/a data = bytes(''.join(input_lines).encode(encoding))
2549n/a for do_reads in (False, True):
2550n/a for bufsize in range(1, 10):
2551n/a for newline, exp_lines in tests:
2552n/a bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2553n/a textio = self.TextIOWrapper(bufio, newline=newline,
2554n/a encoding=encoding)
2555n/a if do_reads:
2556n/a got_lines = []
2557n/a while True:
2558n/a c2 = textio.read(2)
2559n/a if c2 == '':
2560n/a break
2561n/a self.assertEqual(len(c2), 2)
2562n/a got_lines.append(c2 + textio.readline())
2563n/a else:
2564n/a got_lines = list(textio)
2565n/a
2566n/a for got_line, exp_line in zip(got_lines, exp_lines):
2567n/a self.assertEqual(got_line, exp_line)
2568n/a self.assertEqual(len(got_lines), len(exp_lines))
2569n/a
2570n/a def test_newlines_input(self):
2571n/a testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2572n/a normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2573n/a for newline, expected in [
2574n/a (None, normalized.decode("ascii").splitlines(keepends=True)),
2575n/a ("", testdata.decode("ascii").splitlines(keepends=True)),
2576n/a ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2577n/a ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2578n/a ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2579n/a ]:
2580n/a buf = self.BytesIO(testdata)
2581n/a txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2582n/a self.assertEqual(txt.readlines(), expected)
2583n/a txt.seek(0)
2584n/a self.assertEqual(txt.read(), "".join(expected))
2585n/a
2586n/a def test_newlines_output(self):
2587n/a testdict = {
2588n/a "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2589n/a "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2590n/a "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2591n/a "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2592n/a }
2593n/a tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2594n/a for newline, expected in tests:
2595n/a buf = self.BytesIO()
2596n/a txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2597n/a txt.write("AAA\nB")
2598n/a txt.write("BB\nCCC\n")
2599n/a txt.write("X\rY\r\nZ")
2600n/a txt.flush()
2601n/a self.assertEqual(buf.closed, False)
2602n/a self.assertEqual(buf.getvalue(), expected)
2603n/a
2604n/a def test_destructor(self):
2605n/a l = []
2606n/a base = self.BytesIO
2607n/a class MyBytesIO(base):
2608n/a def close(self):
2609n/a l.append(self.getvalue())
2610n/a base.close(self)
2611n/a b = MyBytesIO()
2612n/a t = self.TextIOWrapper(b, encoding="ascii")
2613n/a t.write("abc")
2614n/a del t
2615n/a support.gc_collect()
2616n/a self.assertEqual([b"abc"], l)
2617n/a
2618n/a def test_override_destructor(self):
2619n/a record = []
2620n/a class MyTextIO(self.TextIOWrapper):
2621n/a def __del__(self):
2622n/a record.append(1)
2623n/a try:
2624n/a f = super().__del__
2625n/a except AttributeError:
2626n/a pass
2627n/a else:
2628n/a f()
2629n/a def close(self):
2630n/a record.append(2)
2631n/a super().close()
2632n/a def flush(self):
2633n/a record.append(3)
2634n/a super().flush()
2635n/a b = self.BytesIO()
2636n/a t = MyTextIO(b, encoding="ascii")
2637n/a del t
2638n/a support.gc_collect()
2639n/a self.assertEqual(record, [1, 2, 3])
2640n/a
2641n/a def test_error_through_destructor(self):
2642n/a # Test that the exception state is not modified by a destructor,
2643n/a # even if close() fails.
2644n/a rawio = self.CloseFailureIO()
2645n/a def f():
2646n/a self.TextIOWrapper(rawio).xyzzy
2647n/a with support.captured_output("stderr") as s:
2648n/a self.assertRaises(AttributeError, f)
2649n/a s = s.getvalue().strip()
2650n/a if s:
2651n/a # The destructor *may* have printed an unraisable error, check it
2652n/a self.assertEqual(len(s.splitlines()), 1)
2653n/a self.assertTrue(s.startswith("Exception OSError: "), s)
2654n/a self.assertTrue(s.endswith(" ignored"), s)
2655n/a
2656n/a # Systematic tests of the text I/O API
2657n/a
2658n/a def test_basic_io(self):
2659n/a for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2660n/a for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2661n/a f = self.open(support.TESTFN, "w+", encoding=enc)
2662n/a f._CHUNK_SIZE = chunksize
2663n/a self.assertEqual(f.write("abc"), 3)
2664n/a f.close()
2665n/a f = self.open(support.TESTFN, "r+", encoding=enc)
2666n/a f._CHUNK_SIZE = chunksize
2667n/a self.assertEqual(f.tell(), 0)
2668n/a self.assertEqual(f.read(), "abc")
2669n/a cookie = f.tell()
2670n/a self.assertEqual(f.seek(0), 0)
2671n/a self.assertEqual(f.read(None), "abc")
2672n/a f.seek(0)
2673n/a self.assertEqual(f.read(2), "ab")
2674n/a self.assertEqual(f.read(1), "c")
2675n/a self.assertEqual(f.read(1), "")
2676n/a self.assertEqual(f.read(), "")
2677n/a self.assertEqual(f.tell(), cookie)
2678n/a self.assertEqual(f.seek(0), 0)
2679n/a self.assertEqual(f.seek(0, 2), cookie)
2680n/a self.assertEqual(f.write("def"), 3)
2681n/a self.assertEqual(f.seek(cookie), cookie)
2682n/a self.assertEqual(f.read(), "def")
2683n/a if enc.startswith("utf"):
2684n/a self.multi_line_test(f, enc)
2685n/a f.close()
2686n/a
2687n/a def multi_line_test(self, f, enc):
2688n/a f.seek(0)
2689n/a f.truncate()
2690n/a sample = "s\xff\u0fff\uffff"
2691n/a wlines = []
2692n/a for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2693n/a chars = []
2694n/a for i in range(size):
2695n/a chars.append(sample[i % len(sample)])
2696n/a line = "".join(chars) + "\n"
2697n/a wlines.append((f.tell(), line))
2698n/a f.write(line)
2699n/a f.seek(0)
2700n/a rlines = []
2701n/a while True:
2702n/a pos = f.tell()
2703n/a line = f.readline()
2704n/a if not line:
2705n/a break
2706n/a rlines.append((pos, line))
2707n/a self.assertEqual(rlines, wlines)
2708n/a
2709n/a def test_telling(self):
2710n/a f = self.open(support.TESTFN, "w+", encoding="utf-8")
2711n/a p0 = f.tell()
2712n/a f.write("\xff\n")
2713n/a p1 = f.tell()
2714n/a f.write("\xff\n")
2715n/a p2 = f.tell()
2716n/a f.seek(0)
2717n/a self.assertEqual(f.tell(), p0)
2718n/a self.assertEqual(f.readline(), "\xff\n")
2719n/a self.assertEqual(f.tell(), p1)
2720n/a self.assertEqual(f.readline(), "\xff\n")
2721n/a self.assertEqual(f.tell(), p2)
2722n/a f.seek(0)
2723n/a for line in f:
2724n/a self.assertEqual(line, "\xff\n")
2725n/a self.assertRaises(OSError, f.tell)
2726n/a self.assertEqual(f.tell(), p2)
2727n/a f.close()
2728n/a
2729n/a def test_seeking(self):
2730n/a chunk_size = _default_chunk_size()
2731n/a prefix_size = chunk_size - 2
2732n/a u_prefix = "a" * prefix_size
2733n/a prefix = bytes(u_prefix.encode("utf-8"))
2734n/a self.assertEqual(len(u_prefix), len(prefix))
2735n/a u_suffix = "\u8888\n"
2736n/a suffix = bytes(u_suffix.encode("utf-8"))
2737n/a line = prefix + suffix
2738n/a with self.open(support.TESTFN, "wb") as f:
2739n/a f.write(line*2)
2740n/a with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2741n/a s = f.read(prefix_size)
2742n/a self.assertEqual(s, str(prefix, "ascii"))
2743n/a self.assertEqual(f.tell(), prefix_size)
2744n/a self.assertEqual(f.readline(), u_suffix)
2745n/a
2746n/a def test_seeking_too(self):
2747n/a # Regression test for a specific bug
2748n/a data = b'\xe0\xbf\xbf\n'
2749n/a with self.open(support.TESTFN, "wb") as f:
2750n/a f.write(data)
2751n/a with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2752n/a f._CHUNK_SIZE # Just test that it exists
2753n/a f._CHUNK_SIZE = 2
2754n/a f.readline()
2755n/a f.tell()
2756n/a
2757n/a def test_seek_and_tell(self):
2758n/a #Test seek/tell using the StatefulIncrementalDecoder.
2759n/a # Make test faster by doing smaller seeks
2760n/a CHUNK_SIZE = 128
2761n/a
2762n/a def test_seek_and_tell_with_data(data, min_pos=0):
2763n/a """Tell/seek to various points within a data stream and ensure
2764n/a that the decoded data returned by read() is consistent."""
2765n/a f = self.open(support.TESTFN, 'wb')
2766n/a f.write(data)
2767n/a f.close()
2768n/a f = self.open(support.TESTFN, encoding='test_decoder')
2769n/a f._CHUNK_SIZE = CHUNK_SIZE
2770n/a decoded = f.read()
2771n/a f.close()
2772n/a
2773n/a for i in range(min_pos, len(decoded) + 1): # seek positions
2774n/a for j in [1, 5, len(decoded) - i]: # read lengths
2775n/a f = self.open(support.TESTFN, encoding='test_decoder')
2776n/a self.assertEqual(f.read(i), decoded[:i])
2777n/a cookie = f.tell()
2778n/a self.assertEqual(f.read(j), decoded[i:i + j])
2779n/a f.seek(cookie)
2780n/a self.assertEqual(f.read(), decoded[i:])
2781n/a f.close()
2782n/a
2783n/a # Enable the test decoder.
2784n/a StatefulIncrementalDecoder.codecEnabled = 1
2785n/a
2786n/a # Run the tests.
2787n/a try:
2788n/a # Try each test case.
2789n/a for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2790n/a test_seek_and_tell_with_data(input)
2791n/a
2792n/a # Position each test case so that it crosses a chunk boundary.
2793n/a for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2794n/a offset = CHUNK_SIZE - len(input)//2
2795n/a prefix = b'.'*offset
2796n/a # Don't bother seeking into the prefix (takes too long).
2797n/a min_pos = offset*2
2798n/a test_seek_and_tell_with_data(prefix + input, min_pos)
2799n/a
2800n/a # Ensure our test decoder won't interfere with subsequent tests.
2801n/a finally:
2802n/a StatefulIncrementalDecoder.codecEnabled = 0
2803n/a
2804n/a def test_encoded_writes(self):
2805n/a data = "1234567890"
2806n/a tests = ("utf-16",
2807n/a "utf-16-le",
2808n/a "utf-16-be",
2809n/a "utf-32",
2810n/a "utf-32-le",
2811n/a "utf-32-be")
2812n/a for encoding in tests:
2813n/a buf = self.BytesIO()
2814n/a f = self.TextIOWrapper(buf, encoding=encoding)
2815n/a # Check if the BOM is written only once (see issue1753).
2816n/a f.write(data)
2817n/a f.write(data)
2818n/a f.seek(0)
2819n/a self.assertEqual(f.read(), data * 2)
2820n/a f.seek(0)
2821n/a self.assertEqual(f.read(), data * 2)
2822n/a self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2823n/a
2824n/a def test_unreadable(self):
2825n/a class UnReadable(self.BytesIO):
2826n/a def readable(self):
2827n/a return False
2828n/a txt = self.TextIOWrapper(UnReadable())
2829n/a self.assertRaises(OSError, txt.read)
2830n/a
2831n/a def test_read_one_by_one(self):
2832n/a txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
2833n/a reads = ""
2834n/a while True:
2835n/a c = txt.read(1)
2836n/a if not c:
2837n/a break
2838n/a reads += c
2839n/a self.assertEqual(reads, "AA\nBB")
2840n/a
2841n/a def test_readlines(self):
2842n/a txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2843n/a self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2844n/a txt.seek(0)
2845n/a self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2846n/a txt.seek(0)
2847n/a self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2848n/a
2849n/a # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2850n/a def test_read_by_chunk(self):
2851n/a # make sure "\r\n" straddles 128 char boundary.
2852n/a txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
2853n/a reads = ""
2854n/a while True:
2855n/a c = txt.read(128)
2856n/a if not c:
2857n/a break
2858n/a reads += c
2859n/a self.assertEqual(reads, "A"*127+"\nB")
2860n/a
2861n/a def test_writelines(self):
2862n/a l = ['ab', 'cd', 'ef']
2863n/a buf = self.BytesIO()
2864n/a txt = self.TextIOWrapper(buf)
2865n/a txt.writelines(l)
2866n/a txt.flush()
2867n/a self.assertEqual(buf.getvalue(), b'abcdef')
2868n/a
2869n/a def test_writelines_userlist(self):
2870n/a l = UserList(['ab', 'cd', 'ef'])
2871n/a buf = self.BytesIO()
2872n/a txt = self.TextIOWrapper(buf)
2873n/a txt.writelines(l)
2874n/a txt.flush()
2875n/a self.assertEqual(buf.getvalue(), b'abcdef')
2876n/a
2877n/a def test_writelines_error(self):
2878n/a txt = self.TextIOWrapper(self.BytesIO())
2879n/a self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2880n/a self.assertRaises(TypeError, txt.writelines, None)
2881n/a self.assertRaises(TypeError, txt.writelines, b'abc')
2882n/a
2883n/a def test_issue1395_1(self):
2884n/a txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2885n/a
2886n/a # read one char at a time
2887n/a reads = ""
2888n/a while True:
2889n/a c = txt.read(1)
2890n/a if not c:
2891n/a break
2892n/a reads += c
2893n/a self.assertEqual(reads, self.normalized)
2894n/a
2895n/a def test_issue1395_2(self):
2896n/a txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2897n/a txt._CHUNK_SIZE = 4
2898n/a
2899n/a reads = ""
2900n/a while True:
2901n/a c = txt.read(4)
2902n/a if not c:
2903n/a break
2904n/a reads += c
2905n/a self.assertEqual(reads, self.normalized)
2906n/a
2907n/a def test_issue1395_3(self):
2908n/a txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2909n/a txt._CHUNK_SIZE = 4
2910n/a
2911n/a reads = txt.read(4)
2912n/a reads += txt.read(4)
2913n/a reads += txt.readline()
2914n/a reads += txt.readline()
2915n/a reads += txt.readline()
2916n/a self.assertEqual(reads, self.normalized)
2917n/a
2918n/a def test_issue1395_4(self):
2919n/a txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2920n/a txt._CHUNK_SIZE = 4
2921n/a
2922n/a reads = txt.read(4)
2923n/a reads += txt.read()
2924n/a self.assertEqual(reads, self.normalized)
2925n/a
2926n/a def test_issue1395_5(self):
2927n/a txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2928n/a txt._CHUNK_SIZE = 4
2929n/a
2930n/a reads = txt.read(4)
2931n/a pos = txt.tell()
2932n/a txt.seek(0)
2933n/a txt.seek(pos)
2934n/a self.assertEqual(txt.read(4), "BBB\n")
2935n/a
2936n/a def test_issue2282(self):
2937n/a buffer = self.BytesIO(self.testdata)
2938n/a txt = self.TextIOWrapper(buffer, encoding="ascii")
2939n/a
2940n/a self.assertEqual(buffer.seekable(), txt.seekable())
2941n/a
2942n/a def test_append_bom(self):
2943n/a # The BOM is not written again when appending to a non-empty file
2944n/a filename = support.TESTFN
2945n/a for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2946n/a with self.open(filename, 'w', encoding=charset) as f:
2947n/a f.write('aaa')
2948n/a pos = f.tell()
2949n/a with self.open(filename, 'rb') as f:
2950n/a self.assertEqual(f.read(), 'aaa'.encode(charset))
2951n/a
2952n/a with self.open(filename, 'a', encoding=charset) as f:
2953n/a f.write('xxx')
2954n/a with self.open(filename, 'rb') as f:
2955n/a self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2956n/a
2957n/a def test_seek_bom(self):
2958n/a # Same test, but when seeking manually
2959n/a filename = support.TESTFN
2960n/a for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2961n/a with self.open(filename, 'w', encoding=charset) as f:
2962n/a f.write('aaa')
2963n/a pos = f.tell()
2964n/a with self.open(filename, 'r+', encoding=charset) as f:
2965n/a f.seek(pos)
2966n/a f.write('zzz')
2967n/a f.seek(0)
2968n/a f.write('bbb')
2969n/a with self.open(filename, 'rb') as f:
2970n/a self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2971n/a
2972n/a def test_seek_append_bom(self):
2973n/a # Same test, but first seek to the start and then to the end
2974n/a filename = support.TESTFN
2975n/a for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2976n/a with self.open(filename, 'w', encoding=charset) as f:
2977n/a f.write('aaa')
2978n/a with self.open(filename, 'a', encoding=charset) as f:
2979n/a f.seek(0)
2980n/a f.seek(0, self.SEEK_END)
2981n/a f.write('xxx')
2982n/a with self.open(filename, 'rb') as f:
2983n/a self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2984n/a
2985n/a def test_errors_property(self):
2986n/a with self.open(support.TESTFN, "w") as f:
2987n/a self.assertEqual(f.errors, "strict")
2988n/a with self.open(support.TESTFN, "w", errors="replace") as f:
2989n/a self.assertEqual(f.errors, "replace")
2990n/a
2991n/a @support.no_tracing
2992n/a @unittest.skipUnless(threading, 'Threading required for this test.')
2993n/a def test_threads_write(self):
2994n/a # Issue6750: concurrent writes could duplicate data
2995n/a event = threading.Event()
2996n/a with self.open(support.TESTFN, "w", buffering=1) as f:
2997n/a def run(n):
2998n/a text = "Thread%03d\n" % n
2999n/a event.wait()
3000n/a f.write(text)
3001n/a threads = [threading.Thread(target=run, args=(x,))
3002n/a for x in range(20)]
3003n/a with support.start_threads(threads, event.set):
3004n/a time.sleep(0.02)
3005n/a with self.open(support.TESTFN) as f:
3006n/a content = f.read()
3007n/a for n in range(20):
3008n/a self.assertEqual(content.count("Thread%03d\n" % n), 1)
3009n/a
3010n/a def test_flush_error_on_close(self):
3011n/a # Test that text file is closed despite failed flush
3012n/a # and that flush() is called before file closed.
3013n/a txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3014n/a closed = []
3015n/a def bad_flush():
3016n/a closed[:] = [txt.closed, txt.buffer.closed]
3017n/a raise OSError()
3018n/a txt.flush = bad_flush
3019n/a self.assertRaises(OSError, txt.close) # exception not swallowed
3020n/a self.assertTrue(txt.closed)
3021n/a self.assertTrue(txt.buffer.closed)
3022n/a self.assertTrue(closed) # flush() called
3023n/a self.assertFalse(closed[0]) # flush() called before file closed
3024n/a self.assertFalse(closed[1])
3025n/a txt.flush = lambda: None # break reference loop
3026n/a
3027n/a def test_close_error_on_close(self):
3028n/a buffer = self.BytesIO(self.testdata)
3029n/a def bad_flush():
3030n/a raise OSError('flush')
3031n/a def bad_close():
3032n/a raise OSError('close')
3033n/a buffer.close = bad_close
3034n/a txt = self.TextIOWrapper(buffer, encoding="ascii")
3035n/a txt.flush = bad_flush
3036n/a with self.assertRaises(OSError) as err: # exception not swallowed
3037n/a txt.close()
3038n/a self.assertEqual(err.exception.args, ('close',))
3039n/a self.assertIsInstance(err.exception.__context__, OSError)
3040n/a self.assertEqual(err.exception.__context__.args, ('flush',))
3041n/a self.assertFalse(txt.closed)
3042n/a
3043n/a def test_nonnormalized_close_error_on_close(self):
3044n/a # Issue #21677
3045n/a buffer = self.BytesIO(self.testdata)
3046n/a def bad_flush():
3047n/a raise non_existing_flush
3048n/a def bad_close():
3049n/a raise non_existing_close
3050n/a buffer.close = bad_close
3051n/a txt = self.TextIOWrapper(buffer, encoding="ascii")
3052n/a txt.flush = bad_flush
3053n/a with self.assertRaises(NameError) as err: # exception not swallowed
3054n/a txt.close()
3055n/a self.assertIn('non_existing_close', str(err.exception))
3056n/a self.assertIsInstance(err.exception.__context__, NameError)
3057n/a self.assertIn('non_existing_flush', str(err.exception.__context__))
3058n/a self.assertFalse(txt.closed)
3059n/a
3060n/a def test_multi_close(self):
3061n/a txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3062n/a txt.close()
3063n/a txt.close()
3064n/a txt.close()
3065n/a self.assertRaises(ValueError, txt.flush)
3066n/a
3067n/a def test_unseekable(self):
3068n/a txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3069n/a self.assertRaises(self.UnsupportedOperation, txt.tell)
3070n/a self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3071n/a
3072n/a def test_readonly_attributes(self):
3073n/a txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3074n/a buf = self.BytesIO(self.testdata)
3075n/a with self.assertRaises(AttributeError):
3076n/a txt.buffer = buf
3077n/a
3078n/a def test_rawio(self):
3079n/a # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3080n/a # that subprocess.Popen() can have the required unbuffered
3081n/a # semantics with universal_newlines=True.
3082n/a raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3083n/a txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3084n/a # Reads
3085n/a self.assertEqual(txt.read(4), 'abcd')
3086n/a self.assertEqual(txt.readline(), 'efghi\n')
3087n/a self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3088n/a
3089n/a def test_rawio_write_through(self):
3090n/a # Issue #12591: with write_through=True, writes don't need a flush
3091n/a raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3092n/a txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3093n/a write_through=True)
3094n/a txt.write('1')
3095n/a txt.write('23\n4')
3096n/a txt.write('5')
3097n/a self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3098n/a
3099n/a def test_bufio_write_through(self):
3100n/a # Issue #21396: write_through=True doesn't force a flush()
3101n/a # on the underlying binary buffered object.
3102n/a flush_called, write_called = [], []
3103n/a class BufferedWriter(self.BufferedWriter):
3104n/a def flush(self, *args, **kwargs):
3105n/a flush_called.append(True)
3106n/a return super().flush(*args, **kwargs)
3107n/a def write(self, *args, **kwargs):
3108n/a write_called.append(True)
3109n/a return super().write(*args, **kwargs)
3110n/a
3111n/a rawio = self.BytesIO()
3112n/a data = b"a"
3113n/a bufio = BufferedWriter(rawio, len(data)*2)
3114n/a textio = self.TextIOWrapper(bufio, encoding='ascii',
3115n/a write_through=True)
3116n/a # write to the buffered io but don't overflow the buffer
3117n/a text = data.decode('ascii')
3118n/a textio.write(text)
3119n/a
3120n/a # buffer.flush is not called with write_through=True
3121n/a self.assertFalse(flush_called)
3122n/a # buffer.write *is* called with write_through=True
3123n/a self.assertTrue(write_called)
3124n/a self.assertEqual(rawio.getvalue(), b"") # no flush
3125n/a
3126n/a write_called = [] # reset
3127n/a textio.write(text * 10) # total content is larger than bufio buffer
3128n/a self.assertTrue(write_called)
3129n/a self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3130n/a
3131n/a def test_read_nonbytes(self):
3132n/a # Issue #17106
3133n/a # Crash when underlying read() returns non-bytes
3134n/a t = self.TextIOWrapper(self.StringIO('a'))
3135n/a self.assertRaises(TypeError, t.read, 1)
3136n/a t = self.TextIOWrapper(self.StringIO('a'))
3137n/a self.assertRaises(TypeError, t.readline)
3138n/a t = self.TextIOWrapper(self.StringIO('a'))
3139n/a self.assertRaises(TypeError, t.read)
3140n/a
3141n/a def test_illegal_decoder(self):
3142n/a # Issue #17106
3143n/a # Bypass the early encoding check added in issue 20404
3144n/a def _make_illegal_wrapper():
3145n/a quopri = codecs.lookup("quopri")
3146n/a quopri._is_text_encoding = True
3147n/a try:
3148n/a t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3149n/a newline='\n', encoding="quopri")
3150n/a finally:
3151n/a quopri._is_text_encoding = False
3152n/a return t
3153n/a # Crash when decoder returns non-string
3154n/a t = _make_illegal_wrapper()
3155n/a self.assertRaises(TypeError, t.read, 1)
3156n/a t = _make_illegal_wrapper()
3157n/a self.assertRaises(TypeError, t.readline)
3158n/a t = _make_illegal_wrapper()
3159n/a self.assertRaises(TypeError, t.read)
3160n/a
3161n/a def _check_create_at_shutdown(self, **kwargs):
3162n/a # Issue #20037: creating a TextIOWrapper at shutdown
3163n/a # shouldn't crash the interpreter.
3164n/a iomod = self.io.__name__
3165n/a code = """if 1:
3166n/a import codecs
3167n/a import {iomod} as io
3168n/a
3169n/a # Avoid looking up codecs at shutdown
3170n/a codecs.lookup('utf-8')
3171n/a
3172n/a class C:
3173n/a def __init__(self):
3174n/a self.buf = io.BytesIO()
3175n/a def __del__(self):
3176n/a io.TextIOWrapper(self.buf, **{kwargs})
3177n/a print("ok")
3178n/a c = C()
3179n/a """.format(iomod=iomod, kwargs=kwargs)
3180n/a return assert_python_ok("-c", code)
3181n/a
3182n/a @support.requires_type_collecting
3183n/a def test_create_at_shutdown_without_encoding(self):
3184n/a rc, out, err = self._check_create_at_shutdown()
3185n/a if err:
3186n/a # Can error out with a RuntimeError if the module state
3187n/a # isn't found.
3188n/a self.assertIn(self.shutdown_error, err.decode())
3189n/a else:
3190n/a self.assertEqual("ok", out.decode().strip())
3191n/a
3192n/a @support.requires_type_collecting
3193n/a def test_create_at_shutdown_with_encoding(self):
3194n/a rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3195n/a errors='strict')
3196n/a self.assertFalse(err)
3197n/a self.assertEqual("ok", out.decode().strip())
3198n/a
3199n/a def test_read_byteslike(self):
3200n/a r = MemviewBytesIO(b'Just some random string\n')
3201n/a t = self.TextIOWrapper(r, 'utf-8')
3202n/a
3203n/a # TextIOwrapper will not read the full string, because
3204n/a # we truncate it to a multiple of the native int size
3205n/a # so that we can construct a more complex memoryview.
3206n/a bytes_val = _to_memoryview(r.getvalue()).tobytes()
3207n/a
3208n/a self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3209n/a
3210n/a def test_issue22849(self):
3211n/a class F(object):
3212n/a def readable(self): return True
3213n/a def writable(self): return True
3214n/a def seekable(self): return True
3215n/a
3216n/a for i in range(10):
3217n/a try:
3218n/a self.TextIOWrapper(F(), encoding='utf-8')
3219n/a except Exception:
3220n/a pass
3221n/a
3222n/a F.tell = lambda x: 0
3223n/a t = self.TextIOWrapper(F(), encoding='utf-8')
3224n/a
3225n/a
3226n/aclass MemviewBytesIO(io.BytesIO):
3227n/a '''A BytesIO object whose read method returns memoryviews
3228n/a rather than bytes'''
3229n/a
3230n/a def read1(self, len_):
3231n/a return _to_memoryview(super().read1(len_))
3232n/a
3233n/a def read(self, len_):
3234n/a return _to_memoryview(super().read(len_))
3235n/a
3236n/adef _to_memoryview(buf):
3237n/a '''Convert bytes-object *buf* to a non-trivial memoryview'''
3238n/a
3239n/a arr = array.array('i')
3240n/a idx = len(buf) - len(buf) % arr.itemsize
3241n/a arr.frombytes(buf[:idx])
3242n/a return memoryview(arr)
3243n/a
3244n/a
3245n/aclass CTextIOWrapperTest(TextIOWrapperTest):
3246n/a io = io
3247n/a shutdown_error = "RuntimeError: could not find io module state"
3248n/a
3249n/a def test_initialization(self):
3250n/a r = self.BytesIO(b"\xc3\xa9\n\n")
3251n/a b = self.BufferedReader(r, 1000)
3252n/a t = self.TextIOWrapper(b)
3253n/a self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3254n/a self.assertRaises(ValueError, t.read)
3255n/a
3256n/a t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3257n/a self.assertRaises(Exception, repr, t)
3258n/a
3259n/a def test_garbage_collection(self):
3260n/a # C TextIOWrapper objects are collected, and collecting them flushes
3261n/a # all data to disk.
3262n/a # The Python version has __del__, so it ends in gc.garbage instead.
3263n/a with support.check_warnings(('', ResourceWarning)):
3264n/a rawio = io.FileIO(support.TESTFN, "wb")
3265n/a b = self.BufferedWriter(rawio)
3266n/a t = self.TextIOWrapper(b, encoding="ascii")
3267n/a t.write("456def")
3268n/a t.x = t
3269n/a wr = weakref.ref(t)
3270n/a del t
3271n/a support.gc_collect()
3272n/a self.assertIsNone(wr(), wr)
3273n/a with self.open(support.TESTFN, "rb") as f:
3274n/a self.assertEqual(f.read(), b"456def")
3275n/a
3276n/a def test_rwpair_cleared_before_textio(self):
3277n/a # Issue 13070: TextIOWrapper's finalization would crash when called
3278n/a # after the reference to the underlying BufferedRWPair's writer got
3279n/a # cleared by the GC.
3280n/a for i in range(1000):
3281n/a b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3282n/a t1 = self.TextIOWrapper(b1, encoding="ascii")
3283n/a b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3284n/a t2 = self.TextIOWrapper(b2, encoding="ascii")
3285n/a # circular references
3286n/a t1.buddy = t2
3287n/a t2.buddy = t1
3288n/a support.gc_collect()
3289n/a
3290n/a
3291n/aclass PyTextIOWrapperTest(TextIOWrapperTest):
3292n/a io = pyio
3293n/a shutdown_error = "LookupError: unknown encoding: ascii"
3294n/a
3295n/a
3296n/aclass IncrementalNewlineDecoderTest(unittest.TestCase):
3297n/a
3298n/a def check_newline_decoding_utf8(self, decoder):
3299n/a # UTF-8 specific tests for a newline decoder
3300n/a def _check_decode(b, s, **kwargs):
3301n/a # We exercise getstate() / setstate() as well as decode()
3302n/a state = decoder.getstate()
3303n/a self.assertEqual(decoder.decode(b, **kwargs), s)
3304n/a decoder.setstate(state)
3305n/a self.assertEqual(decoder.decode(b, **kwargs), s)
3306n/a
3307n/a _check_decode(b'\xe8\xa2\x88', "\u8888")
3308n/a
3309n/a _check_decode(b'\xe8', "")
3310n/a _check_decode(b'\xa2', "")
3311n/a _check_decode(b'\x88', "\u8888")
3312n/a
3313n/a _check_decode(b'\xe8', "")
3314n/a _check_decode(b'\xa2', "")
3315n/a _check_decode(b'\x88', "\u8888")
3316n/a
3317n/a _check_decode(b'\xe8', "")
3318n/a self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3319n/a
3320n/a decoder.reset()
3321n/a _check_decode(b'\n', "\n")
3322n/a _check_decode(b'\r', "")
3323n/a _check_decode(b'', "\n", final=True)
3324n/a _check_decode(b'\r', "\n", final=True)
3325n/a
3326n/a _check_decode(b'\r', "")
3327n/a _check_decode(b'a', "\na")
3328n/a
3329n/a _check_decode(b'\r\r\n', "\n\n")
3330n/a _check_decode(b'\r', "")
3331n/a _check_decode(b'\r', "\n")
3332n/a _check_decode(b'\na', "\na")
3333n/a
3334n/a _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3335n/a _check_decode(b'\xe8\xa2\x88', "\u8888")
3336n/a _check_decode(b'\n', "\n")
3337n/a _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3338n/a _check_decode(b'\n', "\n")
3339n/a
3340n/a def check_newline_decoding(self, decoder, encoding):
3341n/a result = []
3342n/a if encoding is not None:
3343n/a encoder = codecs.getincrementalencoder(encoding)()
3344n/a def _decode_bytewise(s):
3345n/a # Decode one byte at a time
3346n/a for b in encoder.encode(s):
3347n/a result.append(decoder.decode(bytes([b])))
3348n/a else:
3349n/a encoder = None
3350n/a def _decode_bytewise(s):
3351n/a # Decode one char at a time
3352n/a for c in s:
3353n/a result.append(decoder.decode(c))
3354n/a self.assertEqual(decoder.newlines, None)
3355n/a _decode_bytewise("abc\n\r")
3356n/a self.assertEqual(decoder.newlines, '\n')
3357n/a _decode_bytewise("\nabc")
3358n/a self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3359n/a _decode_bytewise("abc\r")
3360n/a self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3361n/a _decode_bytewise("abc")
3362n/a self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
3363n/a _decode_bytewise("abc\r")
3364n/a self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
3365n/a decoder.reset()
3366n/a input = "abc"
3367n/a if encoder is not None:
3368n/a encoder.reset()
3369n/a input = encoder.encode(input)
3370n/a self.assertEqual(decoder.decode(input), "abc")
3371n/a self.assertEqual(decoder.newlines, None)
3372n/a
3373n/a def test_newline_decoder(self):
3374n/a encodings = (
3375n/a # None meaning the IncrementalNewlineDecoder takes unicode input
3376n/a # rather than bytes input
3377n/a None, 'utf-8', 'latin-1',
3378n/a 'utf-16', 'utf-16-le', 'utf-16-be',
3379n/a 'utf-32', 'utf-32-le', 'utf-32-be',
3380n/a )
3381n/a for enc in encodings:
3382n/a decoder = enc and codecs.getincrementaldecoder(enc)()
3383n/a decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3384n/a self.check_newline_decoding(decoder, enc)
3385n/a decoder = codecs.getincrementaldecoder("utf-8")()
3386n/a decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3387n/a self.check_newline_decoding_utf8(decoder)
3388n/a
3389n/a def test_newline_bytes(self):
3390n/a # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3391n/a def _check(dec):
3392n/a self.assertEqual(dec.newlines, None)
3393n/a self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3394n/a self.assertEqual(dec.newlines, None)
3395n/a self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3396n/a self.assertEqual(dec.newlines, None)
3397n/a dec = self.IncrementalNewlineDecoder(None, translate=False)
3398n/a _check(dec)
3399n/a dec = self.IncrementalNewlineDecoder(None, translate=True)
3400n/a _check(dec)
3401n/a
3402n/aclass CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3403n/a pass
3404n/a
3405n/aclass PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3406n/a pass
3407n/a
3408n/a
3409n/a# XXX Tests for open()
3410n/a
3411n/aclass MiscIOTest(unittest.TestCase):
3412n/a
3413n/a def tearDown(self):
3414n/a support.unlink(support.TESTFN)
3415n/a
3416n/a def test___all__(self):
3417n/a for name in self.io.__all__:
3418n/a obj = getattr(self.io, name, None)
3419n/a self.assertIsNotNone(obj, name)
3420n/a if name == "open":
3421n/a continue
3422n/a elif "error" in name.lower() or name == "UnsupportedOperation":
3423n/a self.assertTrue(issubclass(obj, Exception), name)
3424n/a elif not name.startswith("SEEK_"):
3425n/a self.assertTrue(issubclass(obj, self.IOBase))
3426n/a
3427n/a def test_attributes(self):
3428n/a f = self.open(support.TESTFN, "wb", buffering=0)
3429n/a self.assertEqual(f.mode, "wb")
3430n/a f.close()
3431n/a
3432n/a with support.check_warnings(('', DeprecationWarning)):
3433n/a f = self.open(support.TESTFN, "U")
3434n/a self.assertEqual(f.name, support.TESTFN)
3435n/a self.assertEqual(f.buffer.name, support.TESTFN)
3436n/a self.assertEqual(f.buffer.raw.name, support.TESTFN)
3437n/a self.assertEqual(f.mode, "U")
3438n/a self.assertEqual(f.buffer.mode, "rb")
3439n/a self.assertEqual(f.buffer.raw.mode, "rb")
3440n/a f.close()
3441n/a
3442n/a f = self.open(support.TESTFN, "w+")
3443n/a self.assertEqual(f.mode, "w+")
3444n/a self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3445n/a self.assertEqual(f.buffer.raw.mode, "rb+")
3446n/a
3447n/a g = self.open(f.fileno(), "wb", closefd=False)
3448n/a self.assertEqual(g.mode, "wb")
3449n/a self.assertEqual(g.raw.mode, "wb")
3450n/a self.assertEqual(g.name, f.fileno())
3451n/a self.assertEqual(g.raw.name, f.fileno())
3452n/a f.close()
3453n/a g.close()
3454n/a
3455n/a def test_io_after_close(self):
3456n/a for kwargs in [
3457n/a {"mode": "w"},
3458n/a {"mode": "wb"},
3459n/a {"mode": "w", "buffering": 1},
3460n/a {"mode": "w", "buffering": 2},
3461n/a {"mode": "wb", "buffering": 0},
3462n/a {"mode": "r"},
3463n/a {"mode": "rb"},
3464n/a {"mode": "r", "buffering": 1},
3465n/a {"mode": "r", "buffering": 2},
3466n/a {"mode": "rb", "buffering": 0},
3467n/a {"mode": "w+"},
3468n/a {"mode": "w+b"},
3469n/a {"mode": "w+", "buffering": 1},
3470n/a {"mode": "w+", "buffering": 2},
3471n/a {"mode": "w+b", "buffering": 0},
3472n/a ]:
3473n/a f = self.open(support.TESTFN, **kwargs)
3474n/a f.close()
3475n/a self.assertRaises(ValueError, f.flush)
3476n/a self.assertRaises(ValueError, f.fileno)
3477n/a self.assertRaises(ValueError, f.isatty)
3478n/a self.assertRaises(ValueError, f.__iter__)
3479n/a if hasattr(f, "peek"):
3480n/a self.assertRaises(ValueError, f.peek, 1)
3481n/a self.assertRaises(ValueError, f.read)
3482n/a if hasattr(f, "read1"):
3483n/a self.assertRaises(ValueError, f.read1, 1024)
3484n/a self.assertRaises(ValueError, f.read1)
3485n/a if hasattr(f, "readall"):
3486n/a self.assertRaises(ValueError, f.readall)
3487n/a if hasattr(f, "readinto"):
3488n/a self.assertRaises(ValueError, f.readinto, bytearray(1024))
3489n/a if hasattr(f, "readinto1"):
3490n/a self.assertRaises(ValueError, f.readinto1, bytearray(1024))
3491n/a self.assertRaises(ValueError, f.readline)
3492n/a self.assertRaises(ValueError, f.readlines)
3493n/a self.assertRaises(ValueError, f.seek, 0)
3494n/a self.assertRaises(ValueError, f.tell)
3495n/a self.assertRaises(ValueError, f.truncate)
3496n/a self.assertRaises(ValueError, f.write,
3497n/a b"" if "b" in kwargs['mode'] else "")
3498n/a self.assertRaises(ValueError, f.writelines, [])
3499n/a self.assertRaises(ValueError, next, f)
3500n/a
3501n/a def test_blockingioerror(self):
3502n/a # Various BlockingIOError issues
3503n/a class C(str):
3504n/a pass
3505n/a c = C("")
3506n/a b = self.BlockingIOError(1, c)
3507n/a c.b = b
3508n/a b.c = c
3509n/a wr = weakref.ref(c)
3510n/a del c, b
3511n/a support.gc_collect()
3512n/a self.assertIsNone(wr(), wr)
3513n/a
3514n/a def test_abcs(self):
3515n/a # Test the visible base classes are ABCs.
3516n/a self.assertIsInstance(self.IOBase, abc.ABCMeta)
3517n/a self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3518n/a self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3519n/a self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
3520n/a
3521n/a def _check_abc_inheritance(self, abcmodule):
3522n/a with self.open(support.TESTFN, "wb", buffering=0) as f:
3523n/a self.assertIsInstance(f, abcmodule.IOBase)
3524n/a self.assertIsInstance(f, abcmodule.RawIOBase)
3525n/a self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3526n/a self.assertNotIsInstance(f, abcmodule.TextIOBase)
3527n/a with self.open(support.TESTFN, "wb") as f:
3528n/a self.assertIsInstance(f, abcmodule.IOBase)
3529n/a self.assertNotIsInstance(f, abcmodule.RawIOBase)
3530n/a self.assertIsInstance(f, abcmodule.BufferedIOBase)
3531n/a self.assertNotIsInstance(f, abcmodule.TextIOBase)
3532n/a with self.open(support.TESTFN, "w") as f:
3533n/a self.assertIsInstance(f, abcmodule.IOBase)
3534n/a self.assertNotIsInstance(f, abcmodule.RawIOBase)
3535n/a self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3536n/a self.assertIsInstance(f, abcmodule.TextIOBase)
3537n/a
3538n/a def test_abc_inheritance(self):
3539n/a # Test implementations inherit from their respective ABCs
3540n/a self._check_abc_inheritance(self)
3541n/a
3542n/a def test_abc_inheritance_official(self):
3543n/a # Test implementations inherit from the official ABCs of the
3544n/a # baseline "io" module.
3545n/a self._check_abc_inheritance(io)
3546n/a
3547n/a def _check_warn_on_dealloc(self, *args, **kwargs):
3548n/a f = open(*args, **kwargs)
3549n/a r = repr(f)
3550n/a with self.assertWarns(ResourceWarning) as cm:
3551n/a f = None
3552n/a support.gc_collect()
3553n/a self.assertIn(r, str(cm.warning.args[0]))
3554n/a
3555n/a def test_warn_on_dealloc(self):
3556n/a self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3557n/a self._check_warn_on_dealloc(support.TESTFN, "wb")
3558n/a self._check_warn_on_dealloc(support.TESTFN, "w")
3559n/a
3560n/a def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3561n/a fds = []
3562n/a def cleanup_fds():
3563n/a for fd in fds:
3564n/a try:
3565n/a os.close(fd)
3566n/a except OSError as e:
3567n/a if e.errno != errno.EBADF:
3568n/a raise
3569n/a self.addCleanup(cleanup_fds)
3570n/a r, w = os.pipe()
3571n/a fds += r, w
3572n/a self._check_warn_on_dealloc(r, *args, **kwargs)
3573n/a # When using closefd=False, there's no warning
3574n/a r, w = os.pipe()
3575n/a fds += r, w
3576n/a with support.check_no_resource_warning(self):
3577n/a open(r, *args, closefd=False, **kwargs)
3578n/a
3579n/a def test_warn_on_dealloc_fd(self):
3580n/a self._check_warn_on_dealloc_fd("rb", buffering=0)
3581n/a self._check_warn_on_dealloc_fd("rb")
3582n/a self._check_warn_on_dealloc_fd("r")
3583n/a
3584n/a
3585n/a def test_pickling(self):
3586n/a # Pickling file objects is forbidden
3587n/a for kwargs in [
3588n/a {"mode": "w"},
3589n/a {"mode": "wb"},
3590n/a {"mode": "wb", "buffering": 0},
3591n/a {"mode": "r"},
3592n/a {"mode": "rb"},
3593n/a {"mode": "rb", "buffering": 0},
3594n/a {"mode": "w+"},
3595n/a {"mode": "w+b"},
3596n/a {"mode": "w+b", "buffering": 0},
3597n/a ]:
3598n/a for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3599n/a with self.open(support.TESTFN, **kwargs) as f:
3600n/a self.assertRaises(TypeError, pickle.dumps, f, protocol)
3601n/a
3602n/a def test_nonblock_pipe_write_bigbuf(self):
3603n/a self._test_nonblock_pipe_write(16*1024)
3604n/a
3605n/a def test_nonblock_pipe_write_smallbuf(self):
3606n/a self._test_nonblock_pipe_write(1024)
3607n/a
3608n/a @unittest.skipUnless(hasattr(os, 'set_blocking'),
3609n/a 'os.set_blocking() required for this test')
3610n/a def _test_nonblock_pipe_write(self, bufsize):
3611n/a sent = []
3612n/a received = []
3613n/a r, w = os.pipe()
3614n/a os.set_blocking(r, False)
3615n/a os.set_blocking(w, False)
3616n/a
3617n/a # To exercise all code paths in the C implementation we need
3618n/a # to play with buffer sizes. For instance, if we choose a
3619n/a # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3620n/a # then we will never get a partial write of the buffer.
3621n/a rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3622n/a wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3623n/a
3624n/a with rf, wf:
3625n/a for N in 9999, 73, 7574:
3626n/a try:
3627n/a i = 0
3628n/a while True:
3629n/a msg = bytes([i % 26 + 97]) * N
3630n/a sent.append(msg)
3631n/a wf.write(msg)
3632n/a i += 1
3633n/a
3634n/a except self.BlockingIOError as e:
3635n/a self.assertEqual(e.args[0], errno.EAGAIN)
3636n/a self.assertEqual(e.args[2], e.characters_written)
3637n/a sent[-1] = sent[-1][:e.characters_written]
3638n/a received.append(rf.read())
3639n/a msg = b'BLOCKED'
3640n/a wf.write(msg)
3641n/a sent.append(msg)
3642n/a
3643n/a while True:
3644n/a try:
3645n/a wf.flush()
3646n/a break
3647n/a except self.BlockingIOError as e:
3648n/a self.assertEqual(e.args[0], errno.EAGAIN)
3649n/a self.assertEqual(e.args[2], e.characters_written)
3650n/a self.assertEqual(e.characters_written, 0)
3651n/a received.append(rf.read())
3652n/a
3653n/a received += iter(rf.read, None)
3654n/a
3655n/a sent, received = b''.join(sent), b''.join(received)
3656n/a self.assertEqual(sent, received)
3657n/a self.assertTrue(wf.closed)
3658n/a self.assertTrue(rf.closed)
3659n/a
3660n/a def test_create_fail(self):
3661n/a # 'x' mode fails if file is existing
3662n/a with self.open(support.TESTFN, 'w'):
3663n/a pass
3664n/a self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3665n/a
3666n/a def test_create_writes(self):
3667n/a # 'x' mode opens for writing
3668n/a with self.open(support.TESTFN, 'xb') as f:
3669n/a f.write(b"spam")
3670n/a with self.open(support.TESTFN, 'rb') as f:
3671n/a self.assertEqual(b"spam", f.read())
3672n/a
3673n/a def test_open_allargs(self):
3674n/a # there used to be a buffer overflow in the parser for rawmode
3675n/a self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3676n/a
3677n/a
3678n/aclass CMiscIOTest(MiscIOTest):
3679n/a io = io
3680n/a
3681n/a def test_readinto_buffer_overflow(self):
3682n/a # Issue #18025
3683n/a class BadReader(self.io.BufferedIOBase):
3684n/a def read(self, n=-1):
3685n/a return b'x' * 10**6
3686n/a bufio = BadReader()
3687n/a b = bytearray(2)
3688n/a self.assertRaises(ValueError, bufio.readinto, b)
3689n/a
3690n/a @unittest.skipUnless(threading, 'Threading required for this test.')
3691n/a def check_daemon_threads_shutdown_deadlock(self, stream_name):
3692n/a # Issue #23309: deadlocks at shutdown should be avoided when a
3693n/a # daemon thread and the main thread both write to a file.
3694n/a code = """if 1:
3695n/a import sys
3696n/a import time
3697n/a import threading
3698n/a
3699n/a file = sys.{stream_name}
3700n/a
3701n/a def run():
3702n/a while True:
3703n/a file.write('.')
3704n/a file.flush()
3705n/a
3706n/a thread = threading.Thread(target=run)
3707n/a thread.daemon = True
3708n/a thread.start()
3709n/a
3710n/a time.sleep(0.5)
3711n/a file.write('!')
3712n/a file.flush()
3713n/a """.format_map(locals())
3714n/a res, _ = run_python_until_end("-c", code)
3715n/a err = res.err.decode()
3716n/a if res.rc != 0:
3717n/a # Failure: should be a fatal error
3718n/a self.assertIn("Fatal Python error: could not acquire lock "
3719n/a "for <_io.BufferedWriter name='<{stream_name}>'> "
3720n/a "at interpreter shutdown, possibly due to "
3721n/a "daemon threads".format_map(locals()),
3722n/a err)
3723n/a else:
3724n/a self.assertFalse(err.strip('.!'))
3725n/a
3726n/a def test_daemon_threads_shutdown_stdout_deadlock(self):
3727n/a self.check_daemon_threads_shutdown_deadlock('stdout')
3728n/a
3729n/a def test_daemon_threads_shutdown_stderr_deadlock(self):
3730n/a self.check_daemon_threads_shutdown_deadlock('stderr')
3731n/a
3732n/a
3733n/aclass PyMiscIOTest(MiscIOTest):
3734n/a io = pyio
3735n/a
3736n/a
3737n/a@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3738n/aclass SignalsTest(unittest.TestCase):
3739n/a
3740n/a def setUp(self):
3741n/a self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3742n/a
3743n/a def tearDown(self):
3744n/a signal.signal(signal.SIGALRM, self.oldalrm)
3745n/a
3746n/a def alarm_interrupt(self, sig, frame):
3747n/a 1/0
3748n/a
3749n/a @unittest.skipUnless(threading, 'Threading required for this test.')
3750n/a def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3751n/a """Check that a partial write, when it gets interrupted, properly
3752n/a invokes the signal handler, and bubbles up the exception raised
3753n/a in the latter."""
3754n/a read_results = []
3755n/a def _read():
3756n/a if hasattr(signal, 'pthread_sigmask'):
3757n/a signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
3758n/a s = os.read(r, 1)
3759n/a read_results.append(s)
3760n/a t = threading.Thread(target=_read)
3761n/a t.daemon = True
3762n/a r, w = os.pipe()
3763n/a fdopen_kwargs["closefd"] = False
3764n/a large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
3765n/a try:
3766n/a wio = self.io.open(w, **fdopen_kwargs)
3767n/a t.start()
3768n/a # Fill the pipe enough that the write will be blocking.
3769n/a # It will be interrupted by the timer armed above. Since the
3770n/a # other thread has read one byte, the low-level write will
3771n/a # return with a successful (partial) result rather than an EINTR.
3772n/a # The buffered IO layer must check for pending signal
3773n/a # handlers, which in this case will invoke alarm_interrupt().
3774n/a signal.alarm(1)
3775n/a try:
3776n/a self.assertRaises(ZeroDivisionError, wio.write, large_data)
3777n/a finally:
3778n/a signal.alarm(0)
3779n/a t.join()
3780n/a # We got one byte, get another one and check that it isn't a
3781n/a # repeat of the first one.
3782n/a read_results.append(os.read(r, 1))
3783n/a self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3784n/a finally:
3785n/a os.close(w)
3786n/a os.close(r)
3787n/a # This is deliberate. If we didn't close the file descriptor
3788n/a # before closing wio, wio would try to flush its internal
3789n/a # buffer, and block again.
3790n/a try:
3791n/a wio.close()
3792n/a except OSError as e:
3793n/a if e.errno != errno.EBADF:
3794n/a raise
3795n/a
3796n/a def test_interrupted_write_unbuffered(self):
3797n/a self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3798n/a
3799n/a def test_interrupted_write_buffered(self):
3800n/a self.check_interrupted_write(b"xy", b"xy", mode="wb")
3801n/a
3802n/a # Issue #22331: The test hangs on FreeBSD 7.2
3803n/a @support.requires_freebsd_version(8)
3804n/a def test_interrupted_write_text(self):
3805n/a self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3806n/a
3807n/a @support.no_tracing
3808n/a def check_reentrant_write(self, data, **fdopen_kwargs):
3809n/a def on_alarm(*args):
3810n/a # Will be called reentrantly from the same thread
3811n/a wio.write(data)
3812n/a 1/0
3813n/a signal.signal(signal.SIGALRM, on_alarm)
3814n/a r, w = os.pipe()
3815n/a wio = self.io.open(w, **fdopen_kwargs)
3816n/a try:
3817n/a signal.alarm(1)
3818n/a # Either the reentrant call to wio.write() fails with RuntimeError,
3819n/a # or the signal handler raises ZeroDivisionError.
3820n/a with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3821n/a while 1:
3822n/a for i in range(100):
3823n/a wio.write(data)
3824n/a wio.flush()
3825n/a # Make sure the buffer doesn't fill up and block further writes
3826n/a os.read(r, len(data) * 100)
3827n/a exc = cm.exception
3828n/a if isinstance(exc, RuntimeError):
3829n/a self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3830n/a finally:
3831n/a wio.close()
3832n/a os.close(r)
3833n/a
3834n/a def test_reentrant_write_buffered(self):
3835n/a self.check_reentrant_write(b"xy", mode="wb")
3836n/a
3837n/a def test_reentrant_write_text(self):
3838n/a self.check_reentrant_write("xy", mode="w", encoding="ascii")
3839n/a
3840n/a def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3841n/a """Check that a buffered read, when it gets interrupted (either
3842n/a returning a partial result or EINTR), properly invokes the signal
3843n/a handler and retries if the latter returned successfully."""
3844n/a r, w = os.pipe()
3845n/a fdopen_kwargs["closefd"] = False
3846n/a def alarm_handler(sig, frame):
3847n/a os.write(w, b"bar")
3848n/a signal.signal(signal.SIGALRM, alarm_handler)
3849n/a try:
3850n/a rio = self.io.open(r, **fdopen_kwargs)
3851n/a os.write(w, b"foo")
3852n/a signal.alarm(1)
3853n/a # Expected behaviour:
3854n/a # - first raw read() returns partial b"foo"
3855n/a # - second raw read() returns EINTR
3856n/a # - third raw read() returns b"bar"
3857n/a self.assertEqual(decode(rio.read(6)), "foobar")
3858n/a finally:
3859n/a rio.close()
3860n/a os.close(w)
3861n/a os.close(r)
3862n/a
3863n/a def test_interrupted_read_retry_buffered(self):
3864n/a self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3865n/a mode="rb")
3866n/a
3867n/a def test_interrupted_read_retry_text(self):
3868n/a self.check_interrupted_read_retry(lambda x: x,
3869n/a mode="r")
3870n/a
3871n/a @unittest.skipUnless(threading, 'Threading required for this test.')
3872n/a def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3873n/a """Check that a buffered write, when it gets interrupted (either
3874n/a returning a partial result or EINTR), properly invokes the signal
3875n/a handler and retries if the latter returned successfully."""
3876n/a select = support.import_module("select")
3877n/a
3878n/a # A quantity that exceeds the buffer size of an anonymous pipe's
3879n/a # write end.
3880n/a N = support.PIPE_MAX_SIZE
3881n/a r, w = os.pipe()
3882n/a fdopen_kwargs["closefd"] = False
3883n/a
3884n/a # We need a separate thread to read from the pipe and allow the
3885n/a # write() to finish. This thread is started after the SIGALRM is
3886n/a # received (forcing a first EINTR in write()).
3887n/a read_results = []
3888n/a write_finished = False
3889n/a error = None
3890n/a def _read():
3891n/a try:
3892n/a while not write_finished:
3893n/a while r in select.select([r], [], [], 1.0)[0]:
3894n/a s = os.read(r, 1024)
3895n/a read_results.append(s)
3896n/a except BaseException as exc:
3897n/a nonlocal error
3898n/a error = exc
3899n/a t = threading.Thread(target=_read)
3900n/a t.daemon = True
3901n/a def alarm1(sig, frame):
3902n/a signal.signal(signal.SIGALRM, alarm2)
3903n/a signal.alarm(1)
3904n/a def alarm2(sig, frame):
3905n/a t.start()
3906n/a
3907n/a large_data = item * N
3908n/a signal.signal(signal.SIGALRM, alarm1)
3909n/a try:
3910n/a wio = self.io.open(w, **fdopen_kwargs)
3911n/a signal.alarm(1)
3912n/a # Expected behaviour:
3913n/a # - first raw write() is partial (because of the limited pipe buffer
3914n/a # and the first alarm)
3915n/a # - second raw write() returns EINTR (because of the second alarm)
3916n/a # - subsequent write()s are successful (either partial or complete)
3917n/a written = wio.write(large_data)
3918n/a self.assertEqual(N, written)
3919n/a
3920n/a wio.flush()
3921n/a write_finished = True
3922n/a t.join()
3923n/a
3924n/a self.assertIsNone(error)
3925n/a self.assertEqual(N, sum(len(x) for x in read_results))
3926n/a finally:
3927n/a write_finished = True
3928n/a os.close(w)
3929n/a os.close(r)
3930n/a # This is deliberate. If we didn't close the file descriptor
3931n/a # before closing wio, wio would try to flush its internal
3932n/a # buffer, and could block (in case of failure).
3933n/a try:
3934n/a wio.close()
3935n/a except OSError as e:
3936n/a if e.errno != errno.EBADF:
3937n/a raise
3938n/a
3939n/a def test_interrupted_write_retry_buffered(self):
3940n/a self.check_interrupted_write_retry(b"x", mode="wb")
3941n/a
3942n/a def test_interrupted_write_retry_text(self):
3943n/a self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3944n/a
3945n/a
3946n/aclass CSignalsTest(SignalsTest):
3947n/a io = io
3948n/a
3949n/aclass PySignalsTest(SignalsTest):
3950n/a io = pyio
3951n/a
3952n/a # Handling reentrancy issues would slow down _pyio even more, so the
3953n/a # tests are disabled.
3954n/a test_reentrant_write_buffered = None
3955n/a test_reentrant_write_text = None
3956n/a
3957n/a
3958n/adef load_tests(*args):
3959n/a tests = (CIOTest, PyIOTest, APIMismatchTest,
3960n/a CBufferedReaderTest, PyBufferedReaderTest,
3961n/a CBufferedWriterTest, PyBufferedWriterTest,
3962n/a CBufferedRWPairTest, PyBufferedRWPairTest,
3963n/a CBufferedRandomTest, PyBufferedRandomTest,
3964n/a StatefulIncrementalDecoderTest,
3965n/a CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3966n/a CTextIOWrapperTest, PyTextIOWrapperTest,
3967n/a CMiscIOTest, PyMiscIOTest,
3968n/a CSignalsTest, PySignalsTest,
3969n/a )
3970n/a
3971n/a # Put the namespaces of the IO module we are testing and some useful mock
3972n/a # classes in the __dict__ of each test.
3973n/a mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
3974n/a MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
3975n/a all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3976n/a c_io_ns = {name : getattr(io, name) for name in all_members}
3977n/a py_io_ns = {name : getattr(pyio, name) for name in all_members}
3978n/a globs = globals()
3979n/a c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3980n/a py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3981n/a # Avoid turning open into a bound method.
3982n/a py_io_ns["open"] = pyio.OpenWrapper
3983n/a for test in tests:
3984n/a if test.__name__.startswith("C"):
3985n/a for name, obj in c_io_ns.items():
3986n/a setattr(test, name, obj)
3987n/a elif test.__name__.startswith("Py"):
3988n/a for name, obj in py_io_ns.items():
3989n/a setattr(test, name, obj)
3990n/a
3991n/a suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3992n/a return suite
3993n/a
3994n/aif __name__ == "__main__":
3995n/a unittest.main()