1 | n/a | """Test largefile support on systems where this makes sense. |
---|
2 | n/a | """ |
---|
3 | n/a | |
---|
4 | n/a | import os |
---|
5 | n/a | import stat |
---|
6 | n/a | import sys |
---|
7 | n/a | import unittest |
---|
8 | n/a | from test.support import TESTFN, requires, unlink |
---|
9 | n/a | import io # C implementation of io |
---|
10 | n/a | import _pyio as pyio # Python implementation of io |
---|
11 | n/a | |
---|
# Size of the test file to create.  It has to exceed 2GB
# (2GB == 2**31 == 2147483648 bytes) to exercise large-file code paths.
size = 2500000000
14 | n/a | |
---|
class LargeFileTest:
    """Mixin checking that file operations behave on files larger than 2GB.

    Concrete test classes are expected to provide an ``open`` attribute
    (an ``open()``-compatible callable) and to also derive from
    ``unittest.TestCase``, since the assertion helpers are used here.
    """

    def setUp(self):
        """Make sure TESTFN exists and is exactly ``size + 1`` bytes long.

        The resulting file starts with b'z' and has b'a' at offset ``size``.
        """
        mode = 'r+b' if os.path.exists(TESTFN) else 'w+b'
        expected = size + 1

        with self.open(TESTFN, mode) as f:
            fd = f.fileno()
            existing = os.fstat(fd).st_size
            if existing == expected:
                # The file already has the expected size; reuse it as is.
                return

            if not existing:
                # Brand new (or emptied) file: put the marker byte first.
                f.write(b'z')

            f.seek(0)
            f.seek(size)
            f.write(b'a')
            f.flush()
            self.assertEqual(os.fstat(fd).st_size, expected)

    @classmethod
    def tearDownClass(cls):
        """Empty TESTFN by reopening it in 'wb' mode and verify the result."""
        with cls.open(TESTFN, 'wb'):
            pass
        remaining = os.stat(TESTFN).st_size
        if remaining != 0:
            raise cls.failureException('File was not truncated by opening '
                                       'with mode "wb"')

    def test_osstat(self):
        """os.stat() must report the complete size of the large file."""
        self.assertEqual(os.stat(TESTFN).st_size, size + 1)

    def test_seek_read(self):
        """seek()/tell()/read() agree for every whence value."""
        end = size + 1
        with self.open(TESTFN, 'rb') as f:
            def check_pos(expected):
                self.assertEqual(f.tell(), expected)

            check_pos(0)
            self.assertEqual(f.read(1), b'z')
            check_pos(1)

            # Absolute seeks, both with and without an explicit whence.
            f.seek(0)
            check_pos(0)
            f.seek(0, os.SEEK_SET)
            check_pos(0)
            f.seek(42)
            check_pos(42)
            f.seek(42, os.SEEK_SET)
            check_pos(42)

            # Seeks relative to the current position.
            f.seek(42, os.SEEK_CUR)
            check_pos(84)
            f.seek(0, os.SEEK_CUR)
            check_pos(84)

            # Seeks relative to the end of the file.
            f.seek(0, os.SEEK_END)
            check_pos(end)
            f.seek(-10, os.SEEK_END)
            check_pos(end - 10)
            f.seek(-end, os.SEEK_END)
            check_pos(0)

            f.seek(size)
            check_pos(size)
            # The last byte is the b'a' that setUp() wrote at offset `size`.
            self.assertEqual(f.read(1), b'a')
            f.seek(-end, os.SEEK_CUR)
            self.assertEqual(f.read(1), b'z')
            check_pos(1)

    def test_lseek(self):
        """os.lseek() on the underlying descriptor returns the new offset."""
        end = size + 1
        # (offset, whence, expected resulting position), applied in order.
        steps = (
            (0, os.SEEK_SET, 0),
            (42, os.SEEK_SET, 42),
            (42, os.SEEK_CUR, 84),
            (0, os.SEEK_CUR, 84),
            (0, os.SEEK_END, end),
            (-10, os.SEEK_END, end - 10),
            (-end, os.SEEK_END, 0),
            (size, os.SEEK_SET, size),
        )
        with self.open(TESTFN, 'rb') as f:
            fd = f.fileno()
            for offset, whence, expected in steps:
                self.assertEqual(os.lseek(fd, offset, whence), expected)
            # The last byte is the b'a' that setUp() wrote at offset `size`.
            self.assertEqual(f.read(1), b'a')

    def test_truncate(self):
        """truncate() shrinks the file without moving the file pointer."""
        with self.open(TESTFN, 'r+b') as f:
            if not hasattr(f, 'truncate'):
                raise unittest.SkipTest("open().truncate() not available "
                                        "on this system")

            def end_position():
                # Jump to the end of the file and report where that is.
                f.seek(0, os.SEEK_END)
                return f.tell()

            # Otherwise we have lost track of the true size.
            self.assertEqual(end_position(), size + 1)

            # Cut it back via seek + truncate with no argument.
            shrunk = size - 10
            f.seek(shrunk)
            f.truncate()
            self.assertEqual(f.tell(), shrunk)        # else pointer moved
            self.assertEqual(end_position(), shrunk)  # else wasn't truncated

            # truncate(smaller than true size) must shrink the file.
            shrunk -= 1
            f.seek(42)
            f.truncate(shrunk)
            self.assertEqual(f.tell(), 42)
            self.assertEqual(end_position(), shrunk)

            # XXX truncate(larger than true size) is ill-defined
            # across platforms; cut it waaaaay back instead.
            f.seek(0)
            f.truncate(1)
            self.assertEqual(f.tell(), 0)       # else pointer moved
            f.seek(0)
            self.assertEqual(len(f.read()), 1)  # else wasn't truncated

    def test_seekable(self):
        """seekable() stays true for positions around 2**31."""
        # Issue #5016: seekable() can return False when the current position
        # is negative when truncated to an int.
        boundary = 2**31
        for offset in (boundary - 1, boundary, boundary + 1):
            with self.open(TESTFN, 'rb') as f:
                f.seek(offset)
                self.assertTrue(f.seekable())
133 | n/a | |
---|
def setUpModule():
    """Module-level fixture: decide whether the large-file tests can run.

    Raises unittest.SkipTest when writing past the 2GB mark in TESTFN fails
    with OSError or OverflowError.
    """
    try:
        import signal
        # The default handler for SIGXFSZ is to abort the process.
        # By ignoring it, system calls exceeding the file size resource
        # limit will raise OSError instead of crashing the interpreter.
        signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
    except (ImportError, AttributeError):
        pass

    # On Windows and Mac OSX this test consumes large resources: it takes
    # a long time to build the >2GB file and takes >2GB of disk space, so
    # the 'largefile' resource must be enabled to run it.  If it is not,
    # nothing after the requires() call is expected to be executed.
    if sys.platform.startswith('win') or sys.platform == 'darwin':
        requires('largefile',
                 'test requires %s bytes and a long time to run' % str(size))
        return

    # Everywhere else, only run if the current filesystem supports large
    # files.  (This probe is skipped on Windows, since large files are
    # always supported there.)
    probe = open(TESTFN, 'wb', buffering=0)
    try:
        # One byte past 2**31 (2**31 == 2147483648).
        probe.seek(2**31 + 1)
        # Seeking is not enough of a test: you must write and flush, too!
        probe.write(b'x')
        probe.flush()
    except (OSError, OverflowError):
        raise unittest.SkipTest("filesystem does not have "
                                "largefile support")
    finally:
        probe.close()
        unlink(TESTFN)
168 | n/a | |
---|
169 | n/a | |
---|
class CLargeFileTest(LargeFileTest, unittest.TestCase):
    """Run the large-file tests through io.open (the C implementation)."""

    open = staticmethod(io.open)
172 | n/a | |
---|
class PyLargeFileTest(LargeFileTest, unittest.TestCase):
    """Run the large-file tests through _pyio.open (the Python implementation)."""

    open = staticmethod(pyio.open)
175 | n/a | |
---|
def tearDownModule():
    """Module-level cleanup: remove the test file."""
    unlink(TESTFN)
178 | n/a | |
---|
# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()