ยปCore Development>Code coverage>Lib/test/test_fileinput.py

Python code coverage for Lib/test/test_fileinput.py

#countcontent
1n/a'''
2n/aTests for fileinput module.
3n/aNick Mathewson
4n/a'''
5n/aimport os
6n/aimport sys
7n/aimport re
8n/aimport fileinput
9n/aimport collections
10n/aimport builtins
11n/aimport unittest
12n/a
13n/atry:
14n/a import bz2
15n/aexcept ImportError:
16n/a bz2 = None
17n/atry:
18n/a import gzip
19n/aexcept ImportError:
20n/a gzip = None
21n/a
22n/afrom io import BytesIO, StringIO
23n/afrom fileinput import FileInput, hook_encoded
24n/a
25n/afrom test.support import verbose, TESTFN, check_warnings
26n/afrom test.support import unlink as safe_unlink
27n/afrom test import support
28n/afrom unittest import mock
29n/a
30n/a
31n/a# The fileinput module has 2 interfaces: the FileInput class which does
32n/a# all the work, and a few functions (input, etc.) that use a global _state
33n/a# variable.
34n/a
35n/a# Write lines (a list of lines) to temp file number i, and return the
36n/a# temp file's name.
37n/adef writeTmp(i, lines, mode='w'): # opening in text mode is the default
38n/a name = TESTFN + str(i)
39n/a f = open(name, mode)
40n/a for line in lines:
41n/a f.write(line)
42n/a f.close()
43n/a return name
44n/a
45n/adef remove_tempfiles(*names):
46n/a for name in names:
47n/a if name:
48n/a safe_unlink(name)
49n/a
50n/aclass LineReader:
51n/a
52n/a def __init__(self):
53n/a self._linesread = []
54n/a
55n/a @property
56n/a def linesread(self):
57n/a try:
58n/a return self._linesread[:]
59n/a finally:
60n/a self._linesread = []
61n/a
62n/a def openhook(self, filename, mode):
63n/a self.it = iter(filename.splitlines(True))
64n/a return self
65n/a
66n/a def readline(self, size=None):
67n/a line = next(self.it, '')
68n/a self._linesread.append(line)
69n/a return line
70n/a
71n/a def readlines(self, hint=-1):
72n/a lines = []
73n/a size = 0
74n/a while True:
75n/a line = self.readline()
76n/a if not line:
77n/a return lines
78n/a lines.append(line)
79n/a size += len(line)
80n/a if size >= hint:
81n/a return lines
82n/a
83n/a def close(self):
84n/a pass
85n/a
86n/aclass BufferSizesTests(unittest.TestCase):
87n/a def test_buffer_sizes(self):
88n/a # First, run the tests with default and teeny buffer size.
89n/a for round, bs in (0, 0), (1, 30):
90n/a t1 = t2 = t3 = t4 = None
91n/a try:
92n/a t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
93n/a t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
94n/a t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
95n/a t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
96n/a if bs:
97n/a with self.assertWarns(DeprecationWarning):
98n/a self.buffer_size_test(t1, t2, t3, t4, bs, round)
99n/a else:
100n/a self.buffer_size_test(t1, t2, t3, t4, bs, round)
101n/a finally:
102n/a remove_tempfiles(t1, t2, t3, t4)
103n/a
104n/a def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
105n/a pat = re.compile(r'LINE (\d+) OF FILE (\d+)')
106n/a
107n/a start = 1 + round*6
108n/a if verbose:
109n/a print('%s. Simple iteration (bs=%s)' % (start+0, bs))
110n/a fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
111n/a lines = list(fi)
112n/a fi.close()
113n/a self.assertEqual(len(lines), 31)
114n/a self.assertEqual(lines[4], 'Line 5 of file 1\n')
115n/a self.assertEqual(lines[30], 'Line 1 of file 4\n')
116n/a self.assertEqual(fi.lineno(), 31)
117n/a self.assertEqual(fi.filename(), t4)
118n/a
119n/a if verbose:
120n/a print('%s. Status variables (bs=%s)' % (start+1, bs))
121n/a fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
122n/a s = "x"
123n/a while s and s != 'Line 6 of file 2\n':
124n/a s = fi.readline()
125n/a self.assertEqual(fi.filename(), t2)
126n/a self.assertEqual(fi.lineno(), 21)
127n/a self.assertEqual(fi.filelineno(), 6)
128n/a self.assertFalse(fi.isfirstline())
129n/a self.assertFalse(fi.isstdin())
130n/a
131n/a if verbose:
132n/a print('%s. Nextfile (bs=%s)' % (start+2, bs))
133n/a fi.nextfile()
134n/a self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
135n/a self.assertEqual(fi.lineno(), 22)
136n/a fi.close()
137n/a
138n/a if verbose:
139n/a print('%s. Stdin (bs=%s)' % (start+3, bs))
140n/a fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs)
141n/a savestdin = sys.stdin
142n/a try:
143n/a sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
144n/a lines = list(fi)
145n/a self.assertEqual(len(lines), 33)
146n/a self.assertEqual(lines[32], 'Line 2 of stdin\n')
147n/a self.assertEqual(fi.filename(), '<stdin>')
148n/a fi.nextfile()
149n/a finally:
150n/a sys.stdin = savestdin
151n/a
152n/a if verbose:
153n/a print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
154n/a fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
155n/a self.assertEqual(fi.lineno(), 0)
156n/a self.assertEqual(fi.filename(), None)
157n/a fi.nextfile()
158n/a self.assertEqual(fi.lineno(), 0)
159n/a self.assertEqual(fi.filename(), None)
160n/a
161n/a if verbose:
162n/a print('%s. Inplace (bs=%s)' % (start+5, bs))
163n/a savestdout = sys.stdout
164n/a try:
165n/a fi = FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs)
166n/a for line in fi:
167n/a line = line[:-1].upper()
168n/a print(line)
169n/a fi.close()
170n/a finally:
171n/a sys.stdout = savestdout
172n/a
173n/a fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
174n/a for line in fi:
175n/a self.assertEqual(line[-1], '\n')
176n/a m = pat.match(line[:-1])
177n/a self.assertNotEqual(m, None)
178n/a self.assertEqual(int(m.group(1)), fi.filelineno())
179n/a fi.close()
180n/a
181n/aclass UnconditionallyRaise:
182n/a def __init__(self, exception_type):
183n/a self.exception_type = exception_type
184n/a self.invoked = False
185n/a def __call__(self, *args, **kwargs):
186n/a self.invoked = True
187n/a raise self.exception_type()
188n/a
189n/aclass FileInputTests(unittest.TestCase):
190n/a
191n/a def test_zero_byte_files(self):
192n/a t1 = t2 = t3 = t4 = None
193n/a try:
194n/a t1 = writeTmp(1, [""])
195n/a t2 = writeTmp(2, [""])
196n/a t3 = writeTmp(3, ["The only line there is.\n"])
197n/a t4 = writeTmp(4, [""])
198n/a fi = FileInput(files=(t1, t2, t3, t4))
199n/a
200n/a line = fi.readline()
201n/a self.assertEqual(line, 'The only line there is.\n')
202n/a self.assertEqual(fi.lineno(), 1)
203n/a self.assertEqual(fi.filelineno(), 1)
204n/a self.assertEqual(fi.filename(), t3)
205n/a
206n/a line = fi.readline()
207n/a self.assertFalse(line)
208n/a self.assertEqual(fi.lineno(), 1)
209n/a self.assertEqual(fi.filelineno(), 0)
210n/a self.assertEqual(fi.filename(), t4)
211n/a fi.close()
212n/a finally:
213n/a remove_tempfiles(t1, t2, t3, t4)
214n/a
215n/a def test_files_that_dont_end_with_newline(self):
216n/a t1 = t2 = None
217n/a try:
218n/a t1 = writeTmp(1, ["A\nB\nC"])
219n/a t2 = writeTmp(2, ["D\nE\nF"])
220n/a fi = FileInput(files=(t1, t2))
221n/a lines = list(fi)
222n/a self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
223n/a self.assertEqual(fi.filelineno(), 3)
224n/a self.assertEqual(fi.lineno(), 6)
225n/a finally:
226n/a remove_tempfiles(t1, t2)
227n/a
228n/a## def test_unicode_filenames(self):
229n/a## # XXX A unicode string is always returned by writeTmp.
230n/a## # So is this needed?
231n/a## try:
232n/a## t1 = writeTmp(1, ["A\nB"])
233n/a## encoding = sys.getfilesystemencoding()
234n/a## if encoding is None:
235n/a## encoding = 'ascii'
236n/a## fi = FileInput(files=str(t1, encoding))
237n/a## lines = list(fi)
238n/a## self.assertEqual(lines, ["A\n", "B"])
239n/a## finally:
240n/a## remove_tempfiles(t1)
241n/a
242n/a def test_fileno(self):
243n/a t1 = t2 = None
244n/a try:
245n/a t1 = writeTmp(1, ["A\nB"])
246n/a t2 = writeTmp(2, ["C\nD"])
247n/a fi = FileInput(files=(t1, t2))
248n/a self.assertEqual(fi.fileno(), -1)
249n/a line =next( fi)
250n/a self.assertNotEqual(fi.fileno(), -1)
251n/a fi.nextfile()
252n/a self.assertEqual(fi.fileno(), -1)
253n/a line = list(fi)
254n/a self.assertEqual(fi.fileno(), -1)
255n/a finally:
256n/a remove_tempfiles(t1, t2)
257n/a
258n/a def test_opening_mode(self):
259n/a try:
260n/a # invalid mode, should raise ValueError
261n/a fi = FileInput(mode="w")
262n/a self.fail("FileInput should reject invalid mode argument")
263n/a except ValueError:
264n/a pass
265n/a t1 = None
266n/a try:
267n/a # try opening in universal newline mode
268n/a t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
269n/a with check_warnings(('', DeprecationWarning)):
270n/a fi = FileInput(files=t1, mode="U")
271n/a with check_warnings(('', DeprecationWarning)):
272n/a lines = list(fi)
273n/a self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
274n/a finally:
275n/a remove_tempfiles(t1)
276n/a
277n/a def test_stdin_binary_mode(self):
278n/a with mock.patch('sys.stdin') as m_stdin:
279n/a m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
280n/a fi = FileInput(files=['-'], mode='rb')
281n/a lines = list(fi)
282n/a self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
283n/a
284n/a def test_detached_stdin_binary_mode(self):
285n/a orig_stdin = sys.stdin
286n/a try:
287n/a sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
288n/a self.assertFalse(hasattr(sys.stdin, 'buffer'))
289n/a fi = FileInput(files=['-'], mode='rb')
290n/a lines = list(fi)
291n/a self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
292n/a finally:
293n/a sys.stdin = orig_stdin
294n/a
295n/a def test_file_opening_hook(self):
296n/a try:
297n/a # cannot use openhook and inplace mode
298n/a fi = FileInput(inplace=1, openhook=lambda f, m: None)
299n/a self.fail("FileInput should raise if both inplace "
300n/a "and openhook arguments are given")
301n/a except ValueError:
302n/a pass
303n/a try:
304n/a fi = FileInput(openhook=1)
305n/a self.fail("FileInput should check openhook for being callable")
306n/a except ValueError:
307n/a pass
308n/a
309n/a class CustomOpenHook:
310n/a def __init__(self):
311n/a self.invoked = False
312n/a def __call__(self, *args):
313n/a self.invoked = True
314n/a return open(*args)
315n/a
316n/a t = writeTmp(1, ["\n"])
317n/a self.addCleanup(remove_tempfiles, t)
318n/a custom_open_hook = CustomOpenHook()
319n/a with FileInput([t], openhook=custom_open_hook) as fi:
320n/a fi.readline()
321n/a self.assertTrue(custom_open_hook.invoked, "openhook not invoked")
322n/a
323n/a def test_readline(self):
324n/a with open(TESTFN, 'wb') as f:
325n/a f.write(b'A\nB\r\nC\r')
326n/a # Fill TextIOWrapper buffer.
327n/a f.write(b'123456789\n' * 1000)
328n/a # Issue #20501: readline() shouldn't read whole file.
329n/a f.write(b'\x80')
330n/a self.addCleanup(safe_unlink, TESTFN)
331n/a
332n/a with FileInput(files=TESTFN,
333n/a openhook=hook_encoded('ascii')) as fi:
334n/a try:
335n/a self.assertEqual(fi.readline(), 'A\n')
336n/a self.assertEqual(fi.readline(), 'B\n')
337n/a self.assertEqual(fi.readline(), 'C\n')
338n/a except UnicodeDecodeError:
339n/a self.fail('Read to end of file')
340n/a with self.assertRaises(UnicodeDecodeError):
341n/a # Read to the end of file.
342n/a list(fi)
343n/a self.assertEqual(fi.readline(), '')
344n/a self.assertEqual(fi.readline(), '')
345n/a
346n/a def test_readline_binary_mode(self):
347n/a with open(TESTFN, 'wb') as f:
348n/a f.write(b'A\nB\r\nC\rD')
349n/a self.addCleanup(safe_unlink, TESTFN)
350n/a
351n/a with FileInput(files=TESTFN, mode='rb') as fi:
352n/a self.assertEqual(fi.readline(), b'A\n')
353n/a self.assertEqual(fi.readline(), b'B\r\n')
354n/a self.assertEqual(fi.readline(), b'C\rD')
355n/a # Read to the end of file.
356n/a self.assertEqual(fi.readline(), b'')
357n/a self.assertEqual(fi.readline(), b'')
358n/a
359n/a def test_context_manager(self):
360n/a try:
361n/a t1 = writeTmp(1, ["A\nB\nC"])
362n/a t2 = writeTmp(2, ["D\nE\nF"])
363n/a with FileInput(files=(t1, t2)) as fi:
364n/a lines = list(fi)
365n/a self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
366n/a self.assertEqual(fi.filelineno(), 3)
367n/a self.assertEqual(fi.lineno(), 6)
368n/a self.assertEqual(fi._files, ())
369n/a finally:
370n/a remove_tempfiles(t1, t2)
371n/a
372n/a def test_close_on_exception(self):
373n/a try:
374n/a t1 = writeTmp(1, [""])
375n/a with FileInput(files=t1) as fi:
376n/a raise OSError
377n/a except OSError:
378n/a self.assertEqual(fi._files, ())
379n/a finally:
380n/a remove_tempfiles(t1)
381n/a
382n/a def test_empty_files_list_specified_to_constructor(self):
383n/a with FileInput(files=[]) as fi:
384n/a self.assertEqual(fi._files, ('-',))
385n/a
386n/a def test__getitem__(self):
387n/a """Tests invoking FileInput.__getitem__() with the current
388n/a line number"""
389n/a t = writeTmp(1, ["line1\n", "line2\n"])
390n/a self.addCleanup(remove_tempfiles, t)
391n/a with FileInput(files=[t]) as fi:
392n/a retval1 = fi[0]
393n/a self.assertEqual(retval1, "line1\n")
394n/a retval2 = fi[1]
395n/a self.assertEqual(retval2, "line2\n")
396n/a
397n/a def test__getitem__invalid_key(self):
398n/a """Tests invoking FileInput.__getitem__() with an index unequal to
399n/a the line number"""
400n/a t = writeTmp(1, ["line1\n", "line2\n"])
401n/a self.addCleanup(remove_tempfiles, t)
402n/a with FileInput(files=[t]) as fi:
403n/a with self.assertRaises(RuntimeError) as cm:
404n/a fi[1]
405n/a self.assertEqual(cm.exception.args, ("accessing lines out of order",))
406n/a
407n/a def test__getitem__eof(self):
408n/a """Tests invoking FileInput.__getitem__() with the line number but at
409n/a end-of-input"""
410n/a t = writeTmp(1, [])
411n/a self.addCleanup(remove_tempfiles, t)
412n/a with FileInput(files=[t]) as fi:
413n/a with self.assertRaises(IndexError) as cm:
414n/a fi[0]
415n/a self.assertEqual(cm.exception.args, ("end of input reached",))
416n/a
417n/a def test_nextfile_oserror_deleting_backup(self):
418n/a """Tests invoking FileInput.nextfile() when the attempt to delete
419n/a the backup file would raise OSError. This error is expected to be
420n/a silently ignored"""
421n/a
422n/a os_unlink_orig = os.unlink
423n/a os_unlink_replacement = UnconditionallyRaise(OSError)
424n/a try:
425n/a t = writeTmp(1, ["\n"])
426n/a self.addCleanup(remove_tempfiles, t)
427n/a with FileInput(files=[t], inplace=True) as fi:
428n/a next(fi) # make sure the file is opened
429n/a os.unlink = os_unlink_replacement
430n/a fi.nextfile()
431n/a finally:
432n/a os.unlink = os_unlink_orig
433n/a
434n/a # sanity check to make sure that our test scenario was actually hit
435n/a self.assertTrue(os_unlink_replacement.invoked,
436n/a "os.unlink() was not invoked")
437n/a
438n/a def test_readline_os_fstat_raises_OSError(self):
439n/a """Tests invoking FileInput.readline() when os.fstat() raises OSError.
440n/a This exception should be silently discarded."""
441n/a
442n/a os_fstat_orig = os.fstat
443n/a os_fstat_replacement = UnconditionallyRaise(OSError)
444n/a try:
445n/a t = writeTmp(1, ["\n"])
446n/a self.addCleanup(remove_tempfiles, t)
447n/a with FileInput(files=[t], inplace=True) as fi:
448n/a os.fstat = os_fstat_replacement
449n/a fi.readline()
450n/a finally:
451n/a os.fstat = os_fstat_orig
452n/a
453n/a # sanity check to make sure that our test scenario was actually hit
454n/a self.assertTrue(os_fstat_replacement.invoked,
455n/a "os.fstat() was not invoked")
456n/a
457n/a @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
458n/a def test_readline_os_chmod_raises_OSError(self):
459n/a """Tests invoking FileInput.readline() when os.chmod() raises OSError.
460n/a This exception should be silently discarded."""
461n/a
462n/a os_chmod_orig = os.chmod
463n/a os_chmod_replacement = UnconditionallyRaise(OSError)
464n/a try:
465n/a t = writeTmp(1, ["\n"])
466n/a self.addCleanup(remove_tempfiles, t)
467n/a with FileInput(files=[t], inplace=True) as fi:
468n/a os.chmod = os_chmod_replacement
469n/a fi.readline()
470n/a finally:
471n/a os.chmod = os_chmod_orig
472n/a
473n/a # sanity check to make sure that our test scenario was actually hit
474n/a self.assertTrue(os_chmod_replacement.invoked,
475n/a "os.fstat() was not invoked")
476n/a
477n/a def test_fileno_when_ValueError_raised(self):
478n/a class FilenoRaisesValueError(UnconditionallyRaise):
479n/a def __init__(self):
480n/a UnconditionallyRaise.__init__(self, ValueError)
481n/a def fileno(self):
482n/a self.__call__()
483n/a
484n/a unconditionally_raise_ValueError = FilenoRaisesValueError()
485n/a t = writeTmp(1, ["\n"])
486n/a self.addCleanup(remove_tempfiles, t)
487n/a with FileInput(files=[t]) as fi:
488n/a file_backup = fi._file
489n/a try:
490n/a fi._file = unconditionally_raise_ValueError
491n/a result = fi.fileno()
492n/a finally:
493n/a fi._file = file_backup # make sure the file gets cleaned up
494n/a
495n/a # sanity check to make sure that our test scenario was actually hit
496n/a self.assertTrue(unconditionally_raise_ValueError.invoked,
497n/a "_file.fileno() was not invoked")
498n/a
499n/a self.assertEqual(result, -1, "fileno() should return -1")
500n/a
501n/a def test_readline_buffering(self):
502n/a src = LineReader()
503n/a with FileInput(files=['line1\nline2', 'line3\n'],
504n/a openhook=src.openhook) as fi:
505n/a self.assertEqual(src.linesread, [])
506n/a self.assertEqual(fi.readline(), 'line1\n')
507n/a self.assertEqual(src.linesread, ['line1\n'])
508n/a self.assertEqual(fi.readline(), 'line2')
509n/a self.assertEqual(src.linesread, ['line2'])
510n/a self.assertEqual(fi.readline(), 'line3\n')
511n/a self.assertEqual(src.linesread, ['', 'line3\n'])
512n/a self.assertEqual(fi.readline(), '')
513n/a self.assertEqual(src.linesread, [''])
514n/a self.assertEqual(fi.readline(), '')
515n/a self.assertEqual(src.linesread, [])
516n/a
517n/a def test_iteration_buffering(self):
518n/a src = LineReader()
519n/a with FileInput(files=['line1\nline2', 'line3\n'],
520n/a openhook=src.openhook) as fi:
521n/a self.assertEqual(src.linesread, [])
522n/a self.assertEqual(next(fi), 'line1\n')
523n/a self.assertEqual(src.linesread, ['line1\n'])
524n/a self.assertEqual(next(fi), 'line2')
525n/a self.assertEqual(src.linesread, ['line2'])
526n/a self.assertEqual(next(fi), 'line3\n')
527n/a self.assertEqual(src.linesread, ['', 'line3\n'])
528n/a self.assertRaises(StopIteration, next, fi)
529n/a self.assertEqual(src.linesread, [''])
530n/a self.assertRaises(StopIteration, next, fi)
531n/a self.assertEqual(src.linesread, [])
532n/a
533n/aclass MockFileInput:
534n/a """A class that mocks out fileinput.FileInput for use during unit tests"""
535n/a
536n/a def __init__(self, files=None, inplace=False, backup="", bufsize=0,
537n/a mode="r", openhook=None):
538n/a self.files = files
539n/a self.inplace = inplace
540n/a self.backup = backup
541n/a self.bufsize = bufsize
542n/a self.mode = mode
543n/a self.openhook = openhook
544n/a self._file = None
545n/a self.invocation_counts = collections.defaultdict(lambda: 0)
546n/a self.return_values = {}
547n/a
548n/a def close(self):
549n/a self.invocation_counts["close"] += 1
550n/a
551n/a def nextfile(self):
552n/a self.invocation_counts["nextfile"] += 1
553n/a return self.return_values["nextfile"]
554n/a
555n/a def filename(self):
556n/a self.invocation_counts["filename"] += 1
557n/a return self.return_values["filename"]
558n/a
559n/a def lineno(self):
560n/a self.invocation_counts["lineno"] += 1
561n/a return self.return_values["lineno"]
562n/a
563n/a def filelineno(self):
564n/a self.invocation_counts["filelineno"] += 1
565n/a return self.return_values["filelineno"]
566n/a
567n/a def fileno(self):
568n/a self.invocation_counts["fileno"] += 1
569n/a return self.return_values["fileno"]
570n/a
571n/a def isfirstline(self):
572n/a self.invocation_counts["isfirstline"] += 1
573n/a return self.return_values["isfirstline"]
574n/a
575n/a def isstdin(self):
576n/a self.invocation_counts["isstdin"] += 1
577n/a return self.return_values["isstdin"]
578n/a
579n/aclass BaseFileInputGlobalMethodsTest(unittest.TestCase):
580n/a """Base class for unit tests for the global function of
581n/a the fileinput module."""
582n/a
583n/a def setUp(self):
584n/a self._orig_state = fileinput._state
585n/a self._orig_FileInput = fileinput.FileInput
586n/a fileinput.FileInput = MockFileInput
587n/a
588n/a def tearDown(self):
589n/a fileinput.FileInput = self._orig_FileInput
590n/a fileinput._state = self._orig_state
591n/a
592n/a def assertExactlyOneInvocation(self, mock_file_input, method_name):
593n/a # assert that the method with the given name was invoked once
594n/a actual_count = mock_file_input.invocation_counts[method_name]
595n/a self.assertEqual(actual_count, 1, method_name)
596n/a # assert that no other unexpected methods were invoked
597n/a actual_total_count = len(mock_file_input.invocation_counts)
598n/a self.assertEqual(actual_total_count, 1)
599n/a
600n/aclass Test_fileinput_input(BaseFileInputGlobalMethodsTest):
601n/a """Unit tests for fileinput.input()"""
602n/a
603n/a def test_state_is_not_None_and_state_file_is_not_None(self):
604n/a """Tests invoking fileinput.input() when fileinput._state is not None
605n/a and its _file attribute is also not None. Expect RuntimeError to
606n/a be raised with a meaningful error message and for fileinput._state
607n/a to *not* be modified."""
608n/a instance = MockFileInput()
609n/a instance._file = object()
610n/a fileinput._state = instance
611n/a with self.assertRaises(RuntimeError) as cm:
612n/a fileinput.input()
613n/a self.assertEqual(("input() already active",), cm.exception.args)
614n/a self.assertIs(instance, fileinput._state, "fileinput._state")
615n/a
616n/a def test_state_is_not_None_and_state_file_is_None(self):
617n/a """Tests invoking fileinput.input() when fileinput._state is not None
618n/a but its _file attribute *is* None. Expect it to create and return
619n/a a new fileinput.FileInput object with all method parameters passed
620n/a explicitly to the __init__() method; also ensure that
621n/a fileinput._state is set to the returned instance."""
622n/a instance = MockFileInput()
623n/a instance._file = None
624n/a fileinput._state = instance
625n/a self.do_test_call_input()
626n/a
627n/a def test_state_is_None(self):
628n/a """Tests invoking fileinput.input() when fileinput._state is None
629n/a Expect it to create and return a new fileinput.FileInput object
630n/a with all method parameters passed explicitly to the __init__()
631n/a method; also ensure that fileinput._state is set to the returned
632n/a instance."""
633n/a fileinput._state = None
634n/a self.do_test_call_input()
635n/a
636n/a def do_test_call_input(self):
637n/a """Tests that fileinput.input() creates a new fileinput.FileInput
638n/a object, passing the given parameters unmodified to
639n/a fileinput.FileInput.__init__(). Note that this test depends on the
640n/a monkey patching of fileinput.FileInput done by setUp()."""
641n/a files = object()
642n/a inplace = object()
643n/a backup = object()
644n/a bufsize = object()
645n/a mode = object()
646n/a openhook = object()
647n/a
648n/a # call fileinput.input() with different values for each argument
649n/a result = fileinput.input(files=files, inplace=inplace, backup=backup,
650n/a bufsize=bufsize,
651n/a mode=mode, openhook=openhook)
652n/a
653n/a # ensure fileinput._state was set to the returned object
654n/a self.assertIs(result, fileinput._state, "fileinput._state")
655n/a
656n/a # ensure the parameters to fileinput.input() were passed directly
657n/a # to FileInput.__init__()
658n/a self.assertIs(files, result.files, "files")
659n/a self.assertIs(inplace, result.inplace, "inplace")
660n/a self.assertIs(backup, result.backup, "backup")
661n/a self.assertIs(bufsize, result.bufsize, "bufsize")
662n/a self.assertIs(mode, result.mode, "mode")
663n/a self.assertIs(openhook, result.openhook, "openhook")
664n/a
665n/aclass Test_fileinput_close(BaseFileInputGlobalMethodsTest):
666n/a """Unit tests for fileinput.close()"""
667n/a
668n/a def test_state_is_None(self):
669n/a """Tests that fileinput.close() does nothing if fileinput._state
670n/a is None"""
671n/a fileinput._state = None
672n/a fileinput.close()
673n/a self.assertIsNone(fileinput._state)
674n/a
675n/a def test_state_is_not_None(self):
676n/a """Tests that fileinput.close() invokes close() on fileinput._state
677n/a and sets _state=None"""
678n/a instance = MockFileInput()
679n/a fileinput._state = instance
680n/a fileinput.close()
681n/a self.assertExactlyOneInvocation(instance, "close")
682n/a self.assertIsNone(fileinput._state)
683n/a
684n/aclass Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
685n/a """Unit tests for fileinput.nextfile()"""
686n/a
687n/a def test_state_is_None(self):
688n/a """Tests fileinput.nextfile() when fileinput._state is None.
689n/a Ensure that it raises RuntimeError with a meaningful error message
690n/a and does not modify fileinput._state"""
691n/a fileinput._state = None
692n/a with self.assertRaises(RuntimeError) as cm:
693n/a fileinput.nextfile()
694n/a self.assertEqual(("no active input()",), cm.exception.args)
695n/a self.assertIsNone(fileinput._state)
696n/a
697n/a def test_state_is_not_None(self):
698n/a """Tests fileinput.nextfile() when fileinput._state is not None.
699n/a Ensure that it invokes fileinput._state.nextfile() exactly once,
700n/a returns whatever it returns, and does not modify fileinput._state
701n/a to point to a different object."""
702n/a nextfile_retval = object()
703n/a instance = MockFileInput()
704n/a instance.return_values["nextfile"] = nextfile_retval
705n/a fileinput._state = instance
706n/a retval = fileinput.nextfile()
707n/a self.assertExactlyOneInvocation(instance, "nextfile")
708n/a self.assertIs(retval, nextfile_retval)
709n/a self.assertIs(fileinput._state, instance)
710n/a
711n/aclass Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
712n/a """Unit tests for fileinput.filename()"""
713n/a
714n/a def test_state_is_None(self):
715n/a """Tests fileinput.filename() when fileinput._state is None.
716n/a Ensure that it raises RuntimeError with a meaningful error message
717n/a and does not modify fileinput._state"""
718n/a fileinput._state = None
719n/a with self.assertRaises(RuntimeError) as cm:
720n/a fileinput.filename()
721n/a self.assertEqual(("no active input()",), cm.exception.args)
722n/a self.assertIsNone(fileinput._state)
723n/a
724n/a def test_state_is_not_None(self):
725n/a """Tests fileinput.filename() when fileinput._state is not None.
726n/a Ensure that it invokes fileinput._state.filename() exactly once,
727n/a returns whatever it returns, and does not modify fileinput._state
728n/a to point to a different object."""
729n/a filename_retval = object()
730n/a instance = MockFileInput()
731n/a instance.return_values["filename"] = filename_retval
732n/a fileinput._state = instance
733n/a retval = fileinput.filename()
734n/a self.assertExactlyOneInvocation(instance, "filename")
735n/a self.assertIs(retval, filename_retval)
736n/a self.assertIs(fileinput._state, instance)
737n/a
738n/aclass Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
739n/a """Unit tests for fileinput.lineno()"""
740n/a
741n/a def test_state_is_None(self):
742n/a """Tests fileinput.lineno() when fileinput._state is None.
743n/a Ensure that it raises RuntimeError with a meaningful error message
744n/a and does not modify fileinput._state"""
745n/a fileinput._state = None
746n/a with self.assertRaises(RuntimeError) as cm:
747n/a fileinput.lineno()
748n/a self.assertEqual(("no active input()",), cm.exception.args)
749n/a self.assertIsNone(fileinput._state)
750n/a
751n/a def test_state_is_not_None(self):
752n/a """Tests fileinput.lineno() when fileinput._state is not None.
753n/a Ensure that it invokes fileinput._state.lineno() exactly once,
754n/a returns whatever it returns, and does not modify fileinput._state
755n/a to point to a different object."""
756n/a lineno_retval = object()
757n/a instance = MockFileInput()
758n/a instance.return_values["lineno"] = lineno_retval
759n/a fileinput._state = instance
760n/a retval = fileinput.lineno()
761n/a self.assertExactlyOneInvocation(instance, "lineno")
762n/a self.assertIs(retval, lineno_retval)
763n/a self.assertIs(fileinput._state, instance)
764n/a
765n/aclass Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
766n/a """Unit tests for fileinput.filelineno()"""
767n/a
768n/a def test_state_is_None(self):
769n/a """Tests fileinput.filelineno() when fileinput._state is None.
770n/a Ensure that it raises RuntimeError with a meaningful error message
771n/a and does not modify fileinput._state"""
772n/a fileinput._state = None
773n/a with self.assertRaises(RuntimeError) as cm:
774n/a fileinput.filelineno()
775n/a self.assertEqual(("no active input()",), cm.exception.args)
776n/a self.assertIsNone(fileinput._state)
777n/a
778n/a def test_state_is_not_None(self):
779n/a """Tests fileinput.filelineno() when fileinput._state is not None.
780n/a Ensure that it invokes fileinput._state.filelineno() exactly once,
781n/a returns whatever it returns, and does not modify fileinput._state
782n/a to point to a different object."""
783n/a filelineno_retval = object()
784n/a instance = MockFileInput()
785n/a instance.return_values["filelineno"] = filelineno_retval
786n/a fileinput._state = instance
787n/a retval = fileinput.filelineno()
788n/a self.assertExactlyOneInvocation(instance, "filelineno")
789n/a self.assertIs(retval, filelineno_retval)
790n/a self.assertIs(fileinput._state, instance)
791n/a
792n/aclass Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
793n/a """Unit tests for fileinput.fileno()"""
794n/a
795n/a def test_state_is_None(self):
796n/a """Tests fileinput.fileno() when fileinput._state is None.
797n/a Ensure that it raises RuntimeError with a meaningful error message
798n/a and does not modify fileinput._state"""
799n/a fileinput._state = None
800n/a with self.assertRaises(RuntimeError) as cm:
801n/a fileinput.fileno()
802n/a self.assertEqual(("no active input()",), cm.exception.args)
803n/a self.assertIsNone(fileinput._state)
804n/a
805n/a def test_state_is_not_None(self):
806n/a """Tests fileinput.fileno() when fileinput._state is not None.
807n/a Ensure that it invokes fileinput._state.fileno() exactly once,
808n/a returns whatever it returns, and does not modify fileinput._state
809n/a to point to a different object."""
810n/a fileno_retval = object()
811n/a instance = MockFileInput()
812n/a instance.return_values["fileno"] = fileno_retval
813n/a instance.fileno_retval = fileno_retval
814n/a fileinput._state = instance
815n/a retval = fileinput.fileno()
816n/a self.assertExactlyOneInvocation(instance, "fileno")
817n/a self.assertIs(retval, fileno_retval)
818n/a self.assertIs(fileinput._state, instance)
819n/a
820n/aclass Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
821n/a """Unit tests for fileinput.isfirstline()"""
822n/a
823n/a def test_state_is_None(self):
824n/a """Tests fileinput.isfirstline() when fileinput._state is None.
825n/a Ensure that it raises RuntimeError with a meaningful error message
826n/a and does not modify fileinput._state"""
827n/a fileinput._state = None
828n/a with self.assertRaises(RuntimeError) as cm:
829n/a fileinput.isfirstline()
830n/a self.assertEqual(("no active input()",), cm.exception.args)
831n/a self.assertIsNone(fileinput._state)
832n/a
833n/a def test_state_is_not_None(self):
834n/a """Tests fileinput.isfirstline() when fileinput._state is not None.
835n/a Ensure that it invokes fileinput._state.isfirstline() exactly once,
836n/a returns whatever it returns, and does not modify fileinput._state
837n/a to point to a different object."""
838n/a isfirstline_retval = object()
839n/a instance = MockFileInput()
840n/a instance.return_values["isfirstline"] = isfirstline_retval
841n/a fileinput._state = instance
842n/a retval = fileinput.isfirstline()
843n/a self.assertExactlyOneInvocation(instance, "isfirstline")
844n/a self.assertIs(retval, isfirstline_retval)
845n/a self.assertIs(fileinput._state, instance)
846n/a
847n/aclass Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
848n/a """Unit tests for fileinput.isstdin()"""
849n/a
850n/a def test_state_is_None(self):
851n/a """Tests fileinput.isstdin() when fileinput._state is None.
852n/a Ensure that it raises RuntimeError with a meaningful error message
853n/a and does not modify fileinput._state"""
854n/a fileinput._state = None
855n/a with self.assertRaises(RuntimeError) as cm:
856n/a fileinput.isstdin()
857n/a self.assertEqual(("no active input()",), cm.exception.args)
858n/a self.assertIsNone(fileinput._state)
859n/a
860n/a def test_state_is_not_None(self):
861n/a """Tests fileinput.isstdin() when fileinput._state is not None.
862n/a Ensure that it invokes fileinput._state.isstdin() exactly once,
863n/a returns whatever it returns, and does not modify fileinput._state
864n/a to point to a different object."""
865n/a isstdin_retval = object()
866n/a instance = MockFileInput()
867n/a instance.return_values["isstdin"] = isstdin_retval
868n/a fileinput._state = instance
869n/a retval = fileinput.isstdin()
870n/a self.assertExactlyOneInvocation(instance, "isstdin")
871n/a self.assertIs(retval, isstdin_retval)
872n/a self.assertIs(fileinput._state, instance)
873n/a
874n/aclass InvocationRecorder:
875n/a def __init__(self):
876n/a self.invocation_count = 0
877n/a def __call__(self, *args, **kwargs):
878n/a self.invocation_count += 1
879n/a self.last_invocation = (args, kwargs)
880n/a
881n/aclass Test_hook_compressed(unittest.TestCase):
882n/a """Unit tests for fileinput.hook_compressed()"""
883n/a
884n/a def setUp(self):
885n/a self.fake_open = InvocationRecorder()
886n/a
887n/a def test_empty_string(self):
888n/a self.do_test_use_builtin_open("", 1)
889n/a
890n/a def test_no_ext(self):
891n/a self.do_test_use_builtin_open("abcd", 2)
892n/a
893n/a @unittest.skipUnless(gzip, "Requires gzip and zlib")
894n/a def test_gz_ext_fake(self):
895n/a original_open = gzip.open
896n/a gzip.open = self.fake_open
897n/a try:
898n/a result = fileinput.hook_compressed("test.gz", 3)
899n/a finally:
900n/a gzip.open = original_open
901n/a
902n/a self.assertEqual(self.fake_open.invocation_count, 1)
903n/a self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))
904n/a
905n/a @unittest.skipUnless(bz2, "Requires bz2")
906n/a def test_bz2_ext_fake(self):
907n/a original_open = bz2.BZ2File
908n/a bz2.BZ2File = self.fake_open
909n/a try:
910n/a result = fileinput.hook_compressed("test.bz2", 4)
911n/a finally:
912n/a bz2.BZ2File = original_open
913n/a
914n/a self.assertEqual(self.fake_open.invocation_count, 1)
915n/a self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))
916n/a
917n/a def test_blah_ext(self):
918n/a self.do_test_use_builtin_open("abcd.blah", 5)
919n/a
920n/a def test_gz_ext_builtin(self):
921n/a self.do_test_use_builtin_open("abcd.Gz", 6)
922n/a
923n/a def test_bz2_ext_builtin(self):
924n/a self.do_test_use_builtin_open("abcd.Bz2", 7)
925n/a
926n/a def do_test_use_builtin_open(self, filename, mode):
927n/a original_open = self.replace_builtin_open(self.fake_open)
928n/a try:
929n/a result = fileinput.hook_compressed(filename, mode)
930n/a finally:
931n/a self.replace_builtin_open(original_open)
932n/a
933n/a self.assertEqual(self.fake_open.invocation_count, 1)
934n/a self.assertEqual(self.fake_open.last_invocation,
935n/a ((filename, mode), {}))
936n/a
937n/a @staticmethod
938n/a def replace_builtin_open(new_open_func):
939n/a original_open = builtins.open
940n/a builtins.open = new_open_func
941n/a return original_open
942n/a
943n/aclass Test_hook_encoded(unittest.TestCase):
944n/a """Unit tests for fileinput.hook_encoded()"""
945n/a
946n/a def test(self):
947n/a encoding = object()
948n/a errors = object()
949n/a result = fileinput.hook_encoded(encoding, errors=errors)
950n/a
951n/a fake_open = InvocationRecorder()
952n/a original_open = builtins.open
953n/a builtins.open = fake_open
954n/a try:
955n/a filename = object()
956n/a mode = object()
957n/a open_result = result(filename, mode)
958n/a finally:
959n/a builtins.open = original_open
960n/a
961n/a self.assertEqual(fake_open.invocation_count, 1)
962n/a
963n/a args, kwargs = fake_open.last_invocation
964n/a self.assertIs(args[0], filename)
965n/a self.assertIs(args[1], mode)
966n/a self.assertIs(kwargs.pop('encoding'), encoding)
967n/a self.assertIs(kwargs.pop('errors'), errors)
968n/a self.assertFalse(kwargs)
969n/a
970n/a def test_errors(self):
971n/a with open(TESTFN, 'wb') as f:
972n/a f.write(b'\x80abc')
973n/a self.addCleanup(safe_unlink, TESTFN)
974n/a
975n/a def check(errors, expected_lines):
976n/a with FileInput(files=TESTFN, mode='r',
977n/a openhook=hook_encoded('utf-8', errors=errors)) as fi:
978n/a lines = list(fi)
979n/a self.assertEqual(lines, expected_lines)
980n/a
981n/a check('ignore', ['abc'])
982n/a with self.assertRaises(UnicodeDecodeError):
983n/a check('strict', ['abc'])
984n/a check('replace', ['\ufffdabc'])
985n/a check('backslashreplace', ['\\x80abc'])
986n/a
987n/a def test_modes(self):
988n/a with open(TESTFN, 'wb') as f:
989n/a # UTF-7 is a convenient, seldom used encoding
990n/a f.write(b'A\nB\r\nC\rD+IKw-')
991n/a self.addCleanup(safe_unlink, TESTFN)
992n/a
993n/a def check(mode, expected_lines):
994n/a with FileInput(files=TESTFN, mode=mode,
995n/a openhook=hook_encoded('utf-7')) as fi:
996n/a lines = list(fi)
997n/a self.assertEqual(lines, expected_lines)
998n/a
999n/a check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
1000n/a with self.assertWarns(DeprecationWarning):
1001n/a check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
1002n/a with self.assertWarns(DeprecationWarning):
1003n/a check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
1004n/a with self.assertRaises(ValueError):
1005n/a check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
1006n/a
1007n/a
1008n/aclass MiscTest(unittest.TestCase):
1009n/a
1010n/a def test_all(self):
1011n/a support.check__all__(self, fileinput)
1012n/a
1013n/a
1014n/aif __name__ == "__main__":
1015n/a unittest.main()