1 | n/a | from test.support import (TESTFN, run_unittest, import_module, unlink, |
---|
2 | n/a | requires, _2G, _4G, gc_collect, cpython_only) |
---|
3 | n/a | import unittest |
---|
4 | n/a | import os |
---|
5 | n/a | import re |
---|
6 | n/a | import itertools |
---|
7 | n/a | import socket |
---|
8 | n/a | import sys |
---|
9 | n/a | import weakref |
---|
10 | n/a | |
---|
11 | n/a | # Skip test if we can't import mmap. |
---|
12 | n/a | mmap = import_module('mmap') |
---|
13 | n/a | |
---|
14 | n/a | PAGESIZE = mmap.PAGESIZE |
---|
15 | n/a | |
---|
16 | n/a | class MmapTests(unittest.TestCase): |
---|
17 | n/a | |
---|
18 | n/a | def setUp(self): |
---|
19 | n/a | if os.path.exists(TESTFN): |
---|
20 | n/a | os.unlink(TESTFN) |
---|
21 | n/a | |
---|
22 | n/a | def tearDown(self): |
---|
23 | n/a | try: |
---|
24 | n/a | os.unlink(TESTFN) |
---|
25 | n/a | except OSError: |
---|
26 | n/a | pass |
---|
27 | n/a | |
---|
28 | n/a | def test_basic(self): |
---|
29 | n/a | # Test mmap module on Unix systems and Windows |
---|
30 | n/a | |
---|
31 | n/a | # Create a file to be mmap'ed. |
---|
32 | n/a | f = open(TESTFN, 'bw+') |
---|
33 | n/a | try: |
---|
34 | n/a | # Write 2 pages worth of data to the file |
---|
35 | n/a | f.write(b'\0'* PAGESIZE) |
---|
36 | n/a | f.write(b'foo') |
---|
37 | n/a | f.write(b'\0'* (PAGESIZE-3) ) |
---|
38 | n/a | f.flush() |
---|
39 | n/a | m = mmap.mmap(f.fileno(), 2 * PAGESIZE) |
---|
40 | n/a | finally: |
---|
41 | n/a | f.close() |
---|
42 | n/a | |
---|
43 | n/a | # Simple sanity checks |
---|
44 | n/a | |
---|
45 | n/a | tp = str(type(m)) # SF bug 128713: segfaulted on Linux |
---|
46 | n/a | self.assertEqual(m.find(b'foo'), PAGESIZE) |
---|
47 | n/a | |
---|
48 | n/a | self.assertEqual(len(m), 2*PAGESIZE) |
---|
49 | n/a | |
---|
50 | n/a | self.assertEqual(m[0], 0) |
---|
51 | n/a | self.assertEqual(m[0:3], b'\0\0\0') |
---|
52 | n/a | |
---|
53 | n/a | # Shouldn't crash on boundary (Issue #5292) |
---|
54 | n/a | self.assertRaises(IndexError, m.__getitem__, len(m)) |
---|
55 | n/a | self.assertRaises(IndexError, m.__setitem__, len(m), b'\0') |
---|
56 | n/a | |
---|
57 | n/a | # Modify the file's content |
---|
58 | n/a | m[0] = b'3'[0] |
---|
59 | n/a | m[PAGESIZE +3: PAGESIZE +3+3] = b'bar' |
---|
60 | n/a | |
---|
61 | n/a | # Check that the modification worked |
---|
62 | n/a | self.assertEqual(m[0], b'3'[0]) |
---|
63 | n/a | self.assertEqual(m[0:3], b'3\0\0') |
---|
64 | n/a | self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0') |
---|
65 | n/a | |
---|
66 | n/a | m.flush() |
---|
67 | n/a | |
---|
68 | n/a | # Test doing a regular expression match in an mmap'ed file |
---|
69 | n/a | match = re.search(b'[A-Za-z]+', m) |
---|
70 | n/a | if match is None: |
---|
71 | n/a | self.fail('regex match on mmap failed!') |
---|
72 | n/a | else: |
---|
73 | n/a | start, end = match.span(0) |
---|
74 | n/a | length = end - start |
---|
75 | n/a | |
---|
76 | n/a | self.assertEqual(start, PAGESIZE) |
---|
77 | n/a | self.assertEqual(end, PAGESIZE + 6) |
---|
78 | n/a | |
---|
79 | n/a | # test seeking around (try to overflow the seek implementation) |
---|
80 | n/a | m.seek(0,0) |
---|
81 | n/a | self.assertEqual(m.tell(), 0) |
---|
82 | n/a | m.seek(42,1) |
---|
83 | n/a | self.assertEqual(m.tell(), 42) |
---|
84 | n/a | m.seek(0,2) |
---|
85 | n/a | self.assertEqual(m.tell(), len(m)) |
---|
86 | n/a | |
---|
87 | n/a | # Try to seek to negative position... |
---|
88 | n/a | self.assertRaises(ValueError, m.seek, -1) |
---|
89 | n/a | |
---|
90 | n/a | # Try to seek beyond end of mmap... |
---|
91 | n/a | self.assertRaises(ValueError, m.seek, 1, 2) |
---|
92 | n/a | |
---|
93 | n/a | # Try to seek to negative position... |
---|
94 | n/a | self.assertRaises(ValueError, m.seek, -len(m)-1, 2) |
---|
95 | n/a | |
---|
96 | n/a | # Try resizing map |
---|
97 | n/a | try: |
---|
98 | n/a | m.resize(512) |
---|
99 | n/a | except SystemError: |
---|
100 | n/a | # resize() not supported |
---|
101 | n/a | # No messages are printed, since the output of this test suite |
---|
102 | n/a | # would then be different across platforms. |
---|
103 | n/a | pass |
---|
104 | n/a | else: |
---|
105 | n/a | # resize() is supported |
---|
106 | n/a | self.assertEqual(len(m), 512) |
---|
107 | n/a | # Check that we can no longer seek beyond the new size. |
---|
108 | n/a | self.assertRaises(ValueError, m.seek, 513, 0) |
---|
109 | n/a | |
---|
110 | n/a | # Check that the underlying file is truncated too |
---|
111 | n/a | # (bug #728515) |
---|
112 | n/a | f = open(TESTFN, 'rb') |
---|
113 | n/a | try: |
---|
114 | n/a | f.seek(0, 2) |
---|
115 | n/a | self.assertEqual(f.tell(), 512) |
---|
116 | n/a | finally: |
---|
117 | n/a | f.close() |
---|
118 | n/a | self.assertEqual(m.size(), 512) |
---|
119 | n/a | |
---|
120 | n/a | m.close() |
---|
121 | n/a | |
---|
122 | n/a | def test_access_parameter(self): |
---|
123 | n/a | # Test for "access" keyword parameter |
---|
124 | n/a | mapsize = 10 |
---|
125 | n/a | with open(TESTFN, "wb") as fp: |
---|
126 | n/a | fp.write(b"a"*mapsize) |
---|
127 | n/a | with open(TESTFN, "rb") as f: |
---|
128 | n/a | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) |
---|
129 | n/a | self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.") |
---|
130 | n/a | |
---|
131 | n/a | # Ensuring that readonly mmap can't be slice assigned |
---|
132 | n/a | try: |
---|
133 | n/a | m[:] = b'b'*mapsize |
---|
134 | n/a | except TypeError: |
---|
135 | n/a | pass |
---|
136 | n/a | else: |
---|
137 | n/a | self.fail("Able to write to readonly memory map") |
---|
138 | n/a | |
---|
139 | n/a | # Ensuring that readonly mmap can't be item assigned |
---|
140 | n/a | try: |
---|
141 | n/a | m[0] = b'b' |
---|
142 | n/a | except TypeError: |
---|
143 | n/a | pass |
---|
144 | n/a | else: |
---|
145 | n/a | self.fail("Able to write to readonly memory map") |
---|
146 | n/a | |
---|
147 | n/a | # Ensuring that readonly mmap can't be write() to |
---|
148 | n/a | try: |
---|
149 | n/a | m.seek(0,0) |
---|
150 | n/a | m.write(b'abc') |
---|
151 | n/a | except TypeError: |
---|
152 | n/a | pass |
---|
153 | n/a | else: |
---|
154 | n/a | self.fail("Able to write to readonly memory map") |
---|
155 | n/a | |
---|
156 | n/a | # Ensuring that readonly mmap can't be write_byte() to |
---|
157 | n/a | try: |
---|
158 | n/a | m.seek(0,0) |
---|
159 | n/a | m.write_byte(b'd') |
---|
160 | n/a | except TypeError: |
---|
161 | n/a | pass |
---|
162 | n/a | else: |
---|
163 | n/a | self.fail("Able to write to readonly memory map") |
---|
164 | n/a | |
---|
165 | n/a | # Ensuring that readonly mmap can't be resized |
---|
166 | n/a | try: |
---|
167 | n/a | m.resize(2*mapsize) |
---|
168 | n/a | except SystemError: # resize is not universally supported |
---|
169 | n/a | pass |
---|
170 | n/a | except TypeError: |
---|
171 | n/a | pass |
---|
172 | n/a | else: |
---|
173 | n/a | self.fail("Able to resize readonly memory map") |
---|
174 | n/a | with open(TESTFN, "rb") as fp: |
---|
175 | n/a | self.assertEqual(fp.read(), b'a'*mapsize, |
---|
176 | n/a | "Readonly memory map data file was modified") |
---|
177 | n/a | |
---|
178 | n/a | # Opening mmap with size too big |
---|
179 | n/a | with open(TESTFN, "r+b") as f: |
---|
180 | n/a | try: |
---|
181 | n/a | m = mmap.mmap(f.fileno(), mapsize+1) |
---|
182 | n/a | except ValueError: |
---|
183 | n/a | # we do not expect a ValueError on Windows |
---|
184 | n/a | # CAUTION: This also changes the size of the file on disk, and |
---|
185 | n/a | # later tests assume that the length hasn't changed. We need to |
---|
186 | n/a | # repair that. |
---|
187 | n/a | if sys.platform.startswith('win'): |
---|
188 | n/a | self.fail("Opening mmap with size+1 should work on Windows.") |
---|
189 | n/a | else: |
---|
190 | n/a | # we expect a ValueError on Unix, but not on Windows |
---|
191 | n/a | if not sys.platform.startswith('win'): |
---|
192 | n/a | self.fail("Opening mmap with size+1 should raise ValueError.") |
---|
193 | n/a | m.close() |
---|
194 | n/a | if sys.platform.startswith('win'): |
---|
195 | n/a | # Repair damage from the resizing test. |
---|
196 | n/a | with open(TESTFN, 'r+b') as f: |
---|
197 | n/a | f.truncate(mapsize) |
---|
198 | n/a | |
---|
199 | n/a | # Opening mmap with access=ACCESS_WRITE |
---|
200 | n/a | with open(TESTFN, "r+b") as f: |
---|
201 | n/a | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) |
---|
202 | n/a | # Modifying write-through memory map |
---|
203 | n/a | m[:] = b'c'*mapsize |
---|
204 | n/a | self.assertEqual(m[:], b'c'*mapsize, |
---|
205 | n/a | "Write-through memory map memory not updated properly.") |
---|
206 | n/a | m.flush() |
---|
207 | n/a | m.close() |
---|
208 | n/a | with open(TESTFN, 'rb') as f: |
---|
209 | n/a | stuff = f.read() |
---|
210 | n/a | self.assertEqual(stuff, b'c'*mapsize, |
---|
211 | n/a | "Write-through memory map data file not updated properly.") |
---|
212 | n/a | |
---|
213 | n/a | # Opening mmap with access=ACCESS_COPY |
---|
214 | n/a | with open(TESTFN, "r+b") as f: |
---|
215 | n/a | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) |
---|
216 | n/a | # Modifying copy-on-write memory map |
---|
217 | n/a | m[:] = b'd'*mapsize |
---|
218 | n/a | self.assertEqual(m[:], b'd' * mapsize, |
---|
219 | n/a | "Copy-on-write memory map data not written correctly.") |
---|
220 | n/a | m.flush() |
---|
221 | n/a | with open(TESTFN, "rb") as fp: |
---|
222 | n/a | self.assertEqual(fp.read(), b'c'*mapsize, |
---|
223 | n/a | "Copy-on-write test data file should not be modified.") |
---|
224 | n/a | # Ensuring copy-on-write maps cannot be resized |
---|
225 | n/a | self.assertRaises(TypeError, m.resize, 2*mapsize) |
---|
226 | n/a | m.close() |
---|
227 | n/a | |
---|
228 | n/a | # Ensuring invalid access parameter raises exception |
---|
229 | n/a | with open(TESTFN, "r+b") as f: |
---|
230 | n/a | self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4) |
---|
231 | n/a | |
---|
232 | n/a | if os.name == "posix": |
---|
233 | n/a | # Try incompatible flags, prot and access parameters. |
---|
234 | n/a | with open(TESTFN, "r+b") as f: |
---|
235 | n/a | self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, |
---|
236 | n/a | flags=mmap.MAP_PRIVATE, |
---|
237 | n/a | prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE) |
---|
238 | n/a | |
---|
239 | n/a | # Try writing with PROT_EXEC and without PROT_WRITE |
---|
240 | n/a | prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0) |
---|
241 | n/a | with open(TESTFN, "r+b") as f: |
---|
242 | n/a | m = mmap.mmap(f.fileno(), mapsize, prot=prot) |
---|
243 | n/a | self.assertRaises(TypeError, m.write, b"abcdef") |
---|
244 | n/a | self.assertRaises(TypeError, m.write_byte, 0) |
---|
245 | n/a | m.close() |
---|
246 | n/a | |
---|
247 | n/a | def test_bad_file_desc(self): |
---|
248 | n/a | # Try opening a bad file descriptor... |
---|
249 | n/a | self.assertRaises(OSError, mmap.mmap, -2, 4096) |
---|
250 | n/a | |
---|
251 | n/a | def test_tougher_find(self): |
---|
252 | n/a | # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2, |
---|
253 | n/a | # searching for data with embedded \0 bytes didn't work. |
---|
254 | n/a | with open(TESTFN, 'wb+') as f: |
---|
255 | n/a | |
---|
256 | n/a | data = b'aabaac\x00deef\x00\x00aa\x00' |
---|
257 | n/a | n = len(data) |
---|
258 | n/a | f.write(data) |
---|
259 | n/a | f.flush() |
---|
260 | n/a | m = mmap.mmap(f.fileno(), n) |
---|
261 | n/a | |
---|
262 | n/a | for start in range(n+1): |
---|
263 | n/a | for finish in range(start, n+1): |
---|
264 | n/a | slice = data[start : finish] |
---|
265 | n/a | self.assertEqual(m.find(slice), data.find(slice)) |
---|
266 | n/a | self.assertEqual(m.find(slice + b'x'), -1) |
---|
267 | n/a | m.close() |
---|
268 | n/a | |
---|
269 | n/a | def test_find_end(self): |
---|
270 | n/a | # test the new 'end' parameter works as expected |
---|
271 | n/a | f = open(TESTFN, 'wb+') |
---|
272 | n/a | data = b'one two ones' |
---|
273 | n/a | n = len(data) |
---|
274 | n/a | f.write(data) |
---|
275 | n/a | f.flush() |
---|
276 | n/a | m = mmap.mmap(f.fileno(), n) |
---|
277 | n/a | f.close() |
---|
278 | n/a | |
---|
279 | n/a | self.assertEqual(m.find(b'one'), 0) |
---|
280 | n/a | self.assertEqual(m.find(b'ones'), 8) |
---|
281 | n/a | self.assertEqual(m.find(b'one', 0, -1), 0) |
---|
282 | n/a | self.assertEqual(m.find(b'one', 1), 8) |
---|
283 | n/a | self.assertEqual(m.find(b'one', 1, -1), 8) |
---|
284 | n/a | self.assertEqual(m.find(b'one', 1, -2), -1) |
---|
285 | n/a | self.assertEqual(m.find(bytearray(b'one')), 0) |
---|
286 | n/a | |
---|
287 | n/a | |
---|
288 | n/a | def test_rfind(self): |
---|
289 | n/a | # test the new 'end' parameter works as expected |
---|
290 | n/a | f = open(TESTFN, 'wb+') |
---|
291 | n/a | data = b'one two ones' |
---|
292 | n/a | n = len(data) |
---|
293 | n/a | f.write(data) |
---|
294 | n/a | f.flush() |
---|
295 | n/a | m = mmap.mmap(f.fileno(), n) |
---|
296 | n/a | f.close() |
---|
297 | n/a | |
---|
298 | n/a | self.assertEqual(m.rfind(b'one'), 8) |
---|
299 | n/a | self.assertEqual(m.rfind(b'one '), 0) |
---|
300 | n/a | self.assertEqual(m.rfind(b'one', 0, -1), 8) |
---|
301 | n/a | self.assertEqual(m.rfind(b'one', 0, -2), 0) |
---|
302 | n/a | self.assertEqual(m.rfind(b'one', 1, -1), 8) |
---|
303 | n/a | self.assertEqual(m.rfind(b'one', 1, -2), -1) |
---|
304 | n/a | self.assertEqual(m.rfind(bytearray(b'one')), 8) |
---|
305 | n/a | |
---|
306 | n/a | |
---|
307 | n/a | def test_double_close(self): |
---|
308 | n/a | # make sure a double close doesn't crash on Solaris (Bug# 665913) |
---|
309 | n/a | f = open(TESTFN, 'wb+') |
---|
310 | n/a | |
---|
311 | n/a | f.write(2**16 * b'a') # Arbitrary character |
---|
312 | n/a | f.close() |
---|
313 | n/a | |
---|
314 | n/a | f = open(TESTFN, 'rb') |
---|
315 | n/a | mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ) |
---|
316 | n/a | mf.close() |
---|
317 | n/a | mf.close() |
---|
318 | n/a | f.close() |
---|
319 | n/a | |
---|
320 | n/a | @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") |
---|
321 | n/a | def test_entire_file(self): |
---|
322 | n/a | # test mapping of entire file by passing 0 for map length |
---|
323 | n/a | f = open(TESTFN, "wb+") |
---|
324 | n/a | |
---|
325 | n/a | f.write(2**16 * b'm') # Arbitrary character |
---|
326 | n/a | f.close() |
---|
327 | n/a | |
---|
328 | n/a | f = open(TESTFN, "rb+") |
---|
329 | n/a | mf = mmap.mmap(f.fileno(), 0) |
---|
330 | n/a | self.assertEqual(len(mf), 2**16, "Map size should equal file size.") |
---|
331 | n/a | self.assertEqual(mf.read(2**16), 2**16 * b"m") |
---|
332 | n/a | mf.close() |
---|
333 | n/a | f.close() |
---|
334 | n/a | |
---|
335 | n/a | @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") |
---|
336 | n/a | def test_length_0_offset(self): |
---|
337 | n/a | # Issue #10916: test mapping of remainder of file by passing 0 for |
---|
338 | n/a | # map length with an offset doesn't cause a segfault. |
---|
339 | n/a | # NOTE: allocation granularity is currently 65536 under Win64, |
---|
340 | n/a | # and therefore the minimum offset alignment. |
---|
341 | n/a | with open(TESTFN, "wb") as f: |
---|
342 | n/a | f.write((65536 * 2) * b'm') # Arbitrary character |
---|
343 | n/a | |
---|
344 | n/a | with open(TESTFN, "rb") as f: |
---|
345 | n/a | with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf: |
---|
346 | n/a | self.assertRaises(IndexError, mf.__getitem__, 80000) |
---|
347 | n/a | |
---|
348 | n/a | @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") |
---|
349 | n/a | def test_length_0_large_offset(self): |
---|
350 | n/a | # Issue #10959: test mapping of a file by passing 0 for |
---|
351 | n/a | # map length with a large offset doesn't cause a segfault. |
---|
352 | n/a | with open(TESTFN, "wb") as f: |
---|
353 | n/a | f.write(115699 * b'm') # Arbitrary character |
---|
354 | n/a | |
---|
355 | n/a | with open(TESTFN, "w+b") as f: |
---|
356 | n/a | self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0, |
---|
357 | n/a | offset=2147418112) |
---|
358 | n/a | |
---|
359 | n/a | def test_move(self): |
---|
360 | n/a | # make move works everywhere (64-bit format problem earlier) |
---|
361 | n/a | f = open(TESTFN, 'wb+') |
---|
362 | n/a | |
---|
363 | n/a | f.write(b"ABCDEabcde") # Arbitrary character |
---|
364 | n/a | f.flush() |
---|
365 | n/a | |
---|
366 | n/a | mf = mmap.mmap(f.fileno(), 10) |
---|
367 | n/a | mf.move(5, 0, 5) |
---|
368 | n/a | self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5") |
---|
369 | n/a | mf.close() |
---|
370 | n/a | f.close() |
---|
371 | n/a | |
---|
372 | n/a | # more excessive test |
---|
373 | n/a | data = b"0123456789" |
---|
374 | n/a | for dest in range(len(data)): |
---|
375 | n/a | for src in range(len(data)): |
---|
376 | n/a | for count in range(len(data) - max(dest, src)): |
---|
377 | n/a | expected = data[:dest] + data[src:src+count] + data[dest+count:] |
---|
378 | n/a | m = mmap.mmap(-1, len(data)) |
---|
379 | n/a | m[:] = data |
---|
380 | n/a | m.move(dest, src, count) |
---|
381 | n/a | self.assertEqual(m[:], expected) |
---|
382 | n/a | m.close() |
---|
383 | n/a | |
---|
384 | n/a | # segfault test (Issue 5387) |
---|
385 | n/a | m = mmap.mmap(-1, 100) |
---|
386 | n/a | offsets = [-100, -1, 0, 1, 100] |
---|
387 | n/a | for source, dest, size in itertools.product(offsets, offsets, offsets): |
---|
388 | n/a | try: |
---|
389 | n/a | m.move(source, dest, size) |
---|
390 | n/a | except ValueError: |
---|
391 | n/a | pass |
---|
392 | n/a | |
---|
393 | n/a | offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1), |
---|
394 | n/a | (-1, 0, 0), (0, -1, 0), (0, 0, -1)] |
---|
395 | n/a | for source, dest, size in offsets: |
---|
396 | n/a | self.assertRaises(ValueError, m.move, source, dest, size) |
---|
397 | n/a | |
---|
398 | n/a | m.close() |
---|
399 | n/a | |
---|
400 | n/a | m = mmap.mmap(-1, 1) # single byte |
---|
401 | n/a | self.assertRaises(ValueError, m.move, 0, 0, 2) |
---|
402 | n/a | self.assertRaises(ValueError, m.move, 1, 0, 1) |
---|
403 | n/a | self.assertRaises(ValueError, m.move, 0, 1, 1) |
---|
404 | n/a | m.move(0, 0, 1) |
---|
405 | n/a | m.move(0, 0, 0) |
---|
406 | n/a | |
---|
407 | n/a | |
---|
408 | n/a | def test_anonymous(self): |
---|
409 | n/a | # anonymous mmap.mmap(-1, PAGE) |
---|
410 | n/a | m = mmap.mmap(-1, PAGESIZE) |
---|
411 | n/a | for x in range(PAGESIZE): |
---|
412 | n/a | self.assertEqual(m[x], 0, |
---|
413 | n/a | "anonymously mmap'ed contents should be zero") |
---|
414 | n/a | |
---|
415 | n/a | for x in range(PAGESIZE): |
---|
416 | n/a | b = x & 0xff |
---|
417 | n/a | m[x] = b |
---|
418 | n/a | self.assertEqual(m[x], b) |
---|
419 | n/a | |
---|
420 | n/a | def test_read_all(self): |
---|
421 | n/a | m = mmap.mmap(-1, 16) |
---|
422 | n/a | self.addCleanup(m.close) |
---|
423 | n/a | |
---|
424 | n/a | # With no parameters, or None or a negative argument, reads all |
---|
425 | n/a | m.write(bytes(range(16))) |
---|
426 | n/a | m.seek(0) |
---|
427 | n/a | self.assertEqual(m.read(), bytes(range(16))) |
---|
428 | n/a | m.seek(8) |
---|
429 | n/a | self.assertEqual(m.read(), bytes(range(8, 16))) |
---|
430 | n/a | m.seek(16) |
---|
431 | n/a | self.assertEqual(m.read(), b'') |
---|
432 | n/a | m.seek(3) |
---|
433 | n/a | self.assertEqual(m.read(None), bytes(range(3, 16))) |
---|
434 | n/a | m.seek(4) |
---|
435 | n/a | self.assertEqual(m.read(-1), bytes(range(4, 16))) |
---|
436 | n/a | m.seek(5) |
---|
437 | n/a | self.assertEqual(m.read(-2), bytes(range(5, 16))) |
---|
438 | n/a | m.seek(9) |
---|
439 | n/a | self.assertEqual(m.read(-42), bytes(range(9, 16))) |
---|
440 | n/a | |
---|
441 | n/a | def test_read_invalid_arg(self): |
---|
442 | n/a | m = mmap.mmap(-1, 16) |
---|
443 | n/a | self.addCleanup(m.close) |
---|
444 | n/a | |
---|
445 | n/a | self.assertRaises(TypeError, m.read, 'foo') |
---|
446 | n/a | self.assertRaises(TypeError, m.read, 5.5) |
---|
447 | n/a | self.assertRaises(TypeError, m.read, [1, 2, 3]) |
---|
448 | n/a | |
---|
449 | n/a | def test_extended_getslice(self): |
---|
450 | n/a | # Test extended slicing by comparing with list slicing. |
---|
451 | n/a | s = bytes(reversed(range(256))) |
---|
452 | n/a | m = mmap.mmap(-1, len(s)) |
---|
453 | n/a | m[:] = s |
---|
454 | n/a | self.assertEqual(m[:], s) |
---|
455 | n/a | indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) |
---|
456 | n/a | for start in indices: |
---|
457 | n/a | for stop in indices: |
---|
458 | n/a | # Skip step 0 (invalid) |
---|
459 | n/a | for step in indices[1:]: |
---|
460 | n/a | self.assertEqual(m[start:stop:step], |
---|
461 | n/a | s[start:stop:step]) |
---|
462 | n/a | |
---|
463 | n/a | def test_extended_set_del_slice(self): |
---|
464 | n/a | # Test extended slicing by comparing with list slicing. |
---|
465 | n/a | s = bytes(reversed(range(256))) |
---|
466 | n/a | m = mmap.mmap(-1, len(s)) |
---|
467 | n/a | indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) |
---|
468 | n/a | for start in indices: |
---|
469 | n/a | for stop in indices: |
---|
470 | n/a | # Skip invalid step 0 |
---|
471 | n/a | for step in indices[1:]: |
---|
472 | n/a | m[:] = s |
---|
473 | n/a | self.assertEqual(m[:], s) |
---|
474 | n/a | L = list(s) |
---|
475 | n/a | # Make sure we have a slice of exactly the right length, |
---|
476 | n/a | # but with different data. |
---|
477 | n/a | data = L[start:stop:step] |
---|
478 | n/a | data = bytes(reversed(data)) |
---|
479 | n/a | L[start:stop:step] = data |
---|
480 | n/a | m[start:stop:step] = data |
---|
481 | n/a | self.assertEqual(m[:], bytes(L)) |
---|
482 | n/a | |
---|
483 | n/a | def make_mmap_file (self, f, halfsize): |
---|
484 | n/a | # Write 2 pages worth of data to the file |
---|
485 | n/a | f.write (b'\0' * halfsize) |
---|
486 | n/a | f.write (b'foo') |
---|
487 | n/a | f.write (b'\0' * (halfsize - 3)) |
---|
488 | n/a | f.flush () |
---|
489 | n/a | return mmap.mmap (f.fileno(), 0) |
---|
490 | n/a | |
---|
491 | n/a | def test_empty_file (self): |
---|
492 | n/a | f = open (TESTFN, 'w+b') |
---|
493 | n/a | f.close() |
---|
494 | n/a | with open(TESTFN, "rb") as f : |
---|
495 | n/a | self.assertRaisesRegex(ValueError, |
---|
496 | n/a | "cannot mmap an empty file", |
---|
497 | n/a | mmap.mmap, f.fileno(), 0, |
---|
498 | n/a | access=mmap.ACCESS_READ) |
---|
499 | n/a | |
---|
500 | n/a | def test_offset (self): |
---|
501 | n/a | f = open (TESTFN, 'w+b') |
---|
502 | n/a | |
---|
503 | n/a | try: # unlink TESTFN no matter what |
---|
504 | n/a | halfsize = mmap.ALLOCATIONGRANULARITY |
---|
505 | n/a | m = self.make_mmap_file (f, halfsize) |
---|
506 | n/a | m.close () |
---|
507 | n/a | f.close () |
---|
508 | n/a | |
---|
509 | n/a | mapsize = halfsize * 2 |
---|
510 | n/a | # Try invalid offset |
---|
511 | n/a | f = open(TESTFN, "r+b") |
---|
512 | n/a | for offset in [-2, -1, None]: |
---|
513 | n/a | try: |
---|
514 | n/a | m = mmap.mmap(f.fileno(), mapsize, offset=offset) |
---|
515 | n/a | self.assertEqual(0, 1) |
---|
516 | n/a | except (ValueError, TypeError, OverflowError): |
---|
517 | n/a | pass |
---|
518 | n/a | else: |
---|
519 | n/a | self.assertEqual(0, 0) |
---|
520 | n/a | f.close() |
---|
521 | n/a | |
---|
522 | n/a | # Try valid offset, hopefully 8192 works on all OSes |
---|
523 | n/a | f = open(TESTFN, "r+b") |
---|
524 | n/a | m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize) |
---|
525 | n/a | self.assertEqual(m[0:3], b'foo') |
---|
526 | n/a | f.close() |
---|
527 | n/a | |
---|
528 | n/a | # Try resizing map |
---|
529 | n/a | try: |
---|
530 | n/a | m.resize(512) |
---|
531 | n/a | except SystemError: |
---|
532 | n/a | pass |
---|
533 | n/a | else: |
---|
534 | n/a | # resize() is supported |
---|
535 | n/a | self.assertEqual(len(m), 512) |
---|
536 | n/a | # Check that we can no longer seek beyond the new size. |
---|
537 | n/a | self.assertRaises(ValueError, m.seek, 513, 0) |
---|
538 | n/a | # Check that the content is not changed |
---|
539 | n/a | self.assertEqual(m[0:3], b'foo') |
---|
540 | n/a | |
---|
541 | n/a | # Check that the underlying file is truncated too |
---|
542 | n/a | f = open(TESTFN, 'rb') |
---|
543 | n/a | f.seek(0, 2) |
---|
544 | n/a | self.assertEqual(f.tell(), halfsize + 512) |
---|
545 | n/a | f.close() |
---|
546 | n/a | self.assertEqual(m.size(), halfsize + 512) |
---|
547 | n/a | |
---|
548 | n/a | m.close() |
---|
549 | n/a | |
---|
550 | n/a | finally: |
---|
551 | n/a | f.close() |
---|
552 | n/a | try: |
---|
553 | n/a | os.unlink(TESTFN) |
---|
554 | n/a | except OSError: |
---|
555 | n/a | pass |
---|
556 | n/a | |
---|
557 | n/a | def test_subclass(self): |
---|
558 | n/a | class anon_mmap(mmap.mmap): |
---|
559 | n/a | def __new__(klass, *args, **kwargs): |
---|
560 | n/a | return mmap.mmap.__new__(klass, -1, *args, **kwargs) |
---|
561 | n/a | anon_mmap(PAGESIZE) |
---|
562 | n/a | |
---|
563 | n/a | @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ") |
---|
564 | n/a | def test_prot_readonly(self): |
---|
565 | n/a | mapsize = 10 |
---|
566 | n/a | with open(TESTFN, "wb") as fp: |
---|
567 | n/a | fp.write(b"a"*mapsize) |
---|
568 | n/a | f = open(TESTFN, "rb") |
---|
569 | n/a | m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ) |
---|
570 | n/a | self.assertRaises(TypeError, m.write, "foo") |
---|
571 | n/a | f.close() |
---|
572 | n/a | |
---|
573 | n/a | def test_error(self): |
---|
574 | n/a | self.assertIs(mmap.error, OSError) |
---|
575 | n/a | |
---|
576 | n/a | def test_io_methods(self): |
---|
577 | n/a | data = b"0123456789" |
---|
578 | n/a | with open(TESTFN, "wb") as fp: |
---|
579 | n/a | fp.write(b"x"*len(data)) |
---|
580 | n/a | f = open(TESTFN, "r+b") |
---|
581 | n/a | m = mmap.mmap(f.fileno(), len(data)) |
---|
582 | n/a | f.close() |
---|
583 | n/a | # Test write_byte() |
---|
584 | n/a | for i in range(len(data)): |
---|
585 | n/a | self.assertEqual(m.tell(), i) |
---|
586 | n/a | m.write_byte(data[i]) |
---|
587 | n/a | self.assertEqual(m.tell(), i+1) |
---|
588 | n/a | self.assertRaises(ValueError, m.write_byte, b"x"[0]) |
---|
589 | n/a | self.assertEqual(m[:], data) |
---|
590 | n/a | # Test read_byte() |
---|
591 | n/a | m.seek(0) |
---|
592 | n/a | for i in range(len(data)): |
---|
593 | n/a | self.assertEqual(m.tell(), i) |
---|
594 | n/a | self.assertEqual(m.read_byte(), data[i]) |
---|
595 | n/a | self.assertEqual(m.tell(), i+1) |
---|
596 | n/a | self.assertRaises(ValueError, m.read_byte) |
---|
597 | n/a | # Test read() |
---|
598 | n/a | m.seek(3) |
---|
599 | n/a | self.assertEqual(m.read(3), b"345") |
---|
600 | n/a | self.assertEqual(m.tell(), 6) |
---|
601 | n/a | # Test write() |
---|
602 | n/a | m.seek(3) |
---|
603 | n/a | m.write(b"bar") |
---|
604 | n/a | self.assertEqual(m.tell(), 6) |
---|
605 | n/a | self.assertEqual(m[:], b"012bar6789") |
---|
606 | n/a | m.write(bytearray(b"baz")) |
---|
607 | n/a | self.assertEqual(m.tell(), 9) |
---|
608 | n/a | self.assertEqual(m[:], b"012barbaz9") |
---|
609 | n/a | self.assertRaises(ValueError, m.write, b"ba") |
---|
610 | n/a | |
---|
611 | n/a | def test_non_ascii_byte(self): |
---|
612 | n/a | for b in (129, 200, 255): # > 128 |
---|
613 | n/a | m = mmap.mmap(-1, 1) |
---|
614 | n/a | m.write_byte(b) |
---|
615 | n/a | self.assertEqual(m[0], b) |
---|
616 | n/a | m.seek(0) |
---|
617 | n/a | self.assertEqual(m.read_byte(), b) |
---|
618 | n/a | m.close() |
---|
619 | n/a | |
---|
620 | n/a | @unittest.skipUnless(os.name == 'nt', 'requires Windows') |
---|
621 | n/a | def test_tagname(self): |
---|
622 | n/a | data1 = b"0123456789" |
---|
623 | n/a | data2 = b"abcdefghij" |
---|
624 | n/a | assert len(data1) == len(data2) |
---|
625 | n/a | |
---|
626 | n/a | # Test same tag |
---|
627 | n/a | m1 = mmap.mmap(-1, len(data1), tagname="foo") |
---|
628 | n/a | m1[:] = data1 |
---|
629 | n/a | m2 = mmap.mmap(-1, len(data2), tagname="foo") |
---|
630 | n/a | m2[:] = data2 |
---|
631 | n/a | self.assertEqual(m1[:], data2) |
---|
632 | n/a | self.assertEqual(m2[:], data2) |
---|
633 | n/a | m2.close() |
---|
634 | n/a | m1.close() |
---|
635 | n/a | |
---|
636 | n/a | # Test different tag |
---|
637 | n/a | m1 = mmap.mmap(-1, len(data1), tagname="foo") |
---|
638 | n/a | m1[:] = data1 |
---|
639 | n/a | m2 = mmap.mmap(-1, len(data2), tagname="boo") |
---|
640 | n/a | m2[:] = data2 |
---|
641 | n/a | self.assertEqual(m1[:], data1) |
---|
642 | n/a | self.assertEqual(m2[:], data2) |
---|
643 | n/a | m2.close() |
---|
644 | n/a | m1.close() |
---|
645 | n/a | |
---|
646 | n/a | @cpython_only |
---|
647 | n/a | @unittest.skipUnless(os.name == 'nt', 'requires Windows') |
---|
648 | n/a | def test_sizeof(self): |
---|
649 | n/a | m1 = mmap.mmap(-1, 100) |
---|
650 | n/a | tagname = "foo" |
---|
651 | n/a | m2 = mmap.mmap(-1, 100, tagname=tagname) |
---|
652 | n/a | self.assertEqual(sys.getsizeof(m2), |
---|
653 | n/a | sys.getsizeof(m1) + len(tagname) + 1) |
---|
654 | n/a | |
---|
655 | n/a | @unittest.skipUnless(os.name == 'nt', 'requires Windows') |
---|
656 | n/a | def test_crasher_on_windows(self): |
---|
657 | n/a | # Should not crash (Issue 1733986) |
---|
658 | n/a | m = mmap.mmap(-1, 1000, tagname="foo") |
---|
659 | n/a | try: |
---|
660 | n/a | mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size |
---|
661 | n/a | except: |
---|
662 | n/a | pass |
---|
663 | n/a | m.close() |
---|
664 | n/a | |
---|
665 | n/a | # Should not crash (Issue 5385) |
---|
666 | n/a | with open(TESTFN, "wb") as fp: |
---|
667 | n/a | fp.write(b"x"*10) |
---|
668 | n/a | f = open(TESTFN, "r+b") |
---|
669 | n/a | m = mmap.mmap(f.fileno(), 0) |
---|
670 | n/a | f.close() |
---|
671 | n/a | try: |
---|
672 | n/a | m.resize(0) # will raise OSError |
---|
673 | n/a | except: |
---|
674 | n/a | pass |
---|
675 | n/a | try: |
---|
676 | n/a | m[:] |
---|
677 | n/a | except: |
---|
678 | n/a | pass |
---|
679 | n/a | m.close() |
---|
680 | n/a | |
---|
681 | n/a | @unittest.skipUnless(os.name == 'nt', 'requires Windows') |
---|
682 | n/a | def test_invalid_descriptor(self): |
---|
683 | n/a | # socket file descriptors are valid, but out of range |
---|
684 | n/a | # for _get_osfhandle, causing a crash when validating the |
---|
685 | n/a | # parameters to _get_osfhandle. |
---|
686 | n/a | s = socket.socket() |
---|
687 | n/a | try: |
---|
688 | n/a | with self.assertRaises(OSError): |
---|
689 | n/a | m = mmap.mmap(s.fileno(), 10) |
---|
690 | n/a | finally: |
---|
691 | n/a | s.close() |
---|
692 | n/a | |
---|
693 | n/a | def test_context_manager(self): |
---|
694 | n/a | with mmap.mmap(-1, 10) as m: |
---|
695 | n/a | self.assertFalse(m.closed) |
---|
696 | n/a | self.assertTrue(m.closed) |
---|
697 | n/a | |
---|
698 | n/a | def test_context_manager_exception(self): |
---|
699 | n/a | # Test that the OSError gets passed through |
---|
700 | n/a | with self.assertRaises(Exception) as exc: |
---|
701 | n/a | with mmap.mmap(-1, 10) as m: |
---|
702 | n/a | raise OSError |
---|
703 | n/a | self.assertIsInstance(exc.exception, OSError, |
---|
704 | n/a | "wrong exception raised in context manager") |
---|
705 | n/a | self.assertTrue(m.closed, "context manager failed") |
---|
706 | n/a | |
---|
707 | n/a | def test_weakref(self): |
---|
708 | n/a | # Check mmap objects are weakrefable |
---|
709 | n/a | mm = mmap.mmap(-1, 16) |
---|
710 | n/a | wr = weakref.ref(mm) |
---|
711 | n/a | self.assertIs(wr(), mm) |
---|
712 | n/a | del mm |
---|
713 | n/a | gc_collect() |
---|
714 | n/a | self.assertIs(wr(), None) |
---|
715 | n/a | |
---|
716 | n/a | def test_write_returning_the_number_of_bytes_written(self): |
---|
717 | n/a | mm = mmap.mmap(-1, 16) |
---|
718 | n/a | self.assertEqual(mm.write(b""), 0) |
---|
719 | n/a | self.assertEqual(mm.write(b"x"), 1) |
---|
720 | n/a | self.assertEqual(mm.write(b"yz"), 2) |
---|
721 | n/a | self.assertEqual(mm.write(b"python"), 6) |
---|
722 | n/a | |
---|
723 | n/a | @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows') |
---|
724 | n/a | def test_resize_past_pos(self): |
---|
725 | n/a | m = mmap.mmap(-1, 8192) |
---|
726 | n/a | self.addCleanup(m.close) |
---|
727 | n/a | m.read(5000) |
---|
728 | n/a | try: |
---|
729 | n/a | m.resize(4096) |
---|
730 | n/a | except SystemError: |
---|
731 | n/a | self.skipTest("resizing not supported") |
---|
732 | n/a | self.assertEqual(m.read(14), b'') |
---|
733 | n/a | self.assertRaises(ValueError, m.read_byte) |
---|
734 | n/a | self.assertRaises(ValueError, m.write_byte, 42) |
---|
735 | n/a | self.assertRaises(ValueError, m.write, b'abc') |
---|
736 | n/a | |
---|
737 | n/a | |
---|
738 | n/a | class LargeMmapTests(unittest.TestCase): |
---|
739 | n/a | |
---|
740 | n/a | def setUp(self): |
---|
741 | n/a | unlink(TESTFN) |
---|
742 | n/a | |
---|
743 | n/a | def tearDown(self): |
---|
744 | n/a | unlink(TESTFN) |
---|
745 | n/a | |
---|
746 | n/a | def _make_test_file(self, num_zeroes, tail): |
---|
747 | n/a | if sys.platform[:3] == 'win' or sys.platform == 'darwin': |
---|
748 | n/a | requires('largefile', |
---|
749 | n/a | 'test requires %s bytes and a long time to run' % str(0x180000000)) |
---|
750 | n/a | f = open(TESTFN, 'w+b') |
---|
751 | n/a | try: |
---|
752 | n/a | f.seek(num_zeroes) |
---|
753 | n/a | f.write(tail) |
---|
754 | n/a | f.flush() |
---|
755 | n/a | except (OSError, OverflowError, ValueError): |
---|
756 | n/a | try: |
---|
757 | n/a | f.close() |
---|
758 | n/a | except (OSError, OverflowError): |
---|
759 | n/a | pass |
---|
760 | n/a | raise unittest.SkipTest("filesystem does not have largefile support") |
---|
761 | n/a | return f |
---|
762 | n/a | |
---|
763 | n/a | def test_large_offset(self): |
---|
764 | n/a | with self._make_test_file(0x14FFFFFFF, b" ") as f: |
---|
765 | n/a | with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m: |
---|
766 | n/a | self.assertEqual(m[0xFFFFFFF], 32) |
---|
767 | n/a | |
---|
768 | n/a | def test_large_filesize(self): |
---|
769 | n/a | with self._make_test_file(0x17FFFFFFF, b" ") as f: |
---|
770 | n/a | if sys.maxsize < 0x180000000: |
---|
771 | n/a | # On 32 bit platforms the file is larger than sys.maxsize so |
---|
772 | n/a | # mapping the whole file should fail -- Issue #16743 |
---|
773 | n/a | with self.assertRaises(OverflowError): |
---|
774 | n/a | mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ) |
---|
775 | n/a | with self.assertRaises(ValueError): |
---|
776 | n/a | mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) |
---|
777 | n/a | with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m: |
---|
778 | n/a | self.assertEqual(m.size(), 0x180000000) |
---|
779 | n/a | |
---|
780 | n/a | # Issue 11277: mmap() with large (~4GB) sparse files crashes on OS X. |
---|
781 | n/a | |
---|
782 | n/a | def _test_around_boundary(self, boundary): |
---|
783 | n/a | tail = b' DEARdear ' |
---|
784 | n/a | start = boundary - len(tail) // 2 |
---|
785 | n/a | end = start + len(tail) |
---|
786 | n/a | with self._make_test_file(start, tail) as f: |
---|
787 | n/a | with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m: |
---|
788 | n/a | self.assertEqual(m[start:end], tail) |
---|
789 | n/a | |
---|
790 | n/a | @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems") |
---|
791 | n/a | def test_around_2GB(self): |
---|
792 | n/a | self._test_around_boundary(_2G) |
---|
793 | n/a | |
---|
794 | n/a | @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems") |
---|
795 | n/a | def test_around_4GB(self): |
---|
796 | n/a | self._test_around_boundary(_4G) |
---|
797 | n/a | |
---|
798 | n/a | |
---|
799 | n/a | def test_main(): |
---|
800 | n/a | run_unittest(MmapTests, LargeMmapTests) |
---|
801 | n/a | |
---|
802 | n/a | if __name__ == '__main__': |
---|
803 | n/a | test_main() |
---|