| 1 | n/a | """Test program for the fcntl C module. |
|---|
| 2 | n/a | """ |
|---|
| 3 | n/a | import platform |
|---|
| 4 | n/a | import os |
|---|
| 5 | n/a | import struct |
|---|
| 6 | n/a | import sys |
|---|
| 7 | n/a | import unittest |
|---|
| 8 | n/a | from test.support import (verbose, TESTFN, unlink, run_unittest, import_module, |
|---|
| 9 | n/a | cpython_only) |
|---|
| 10 | n/a | |
|---|
| 11 | n/a | # Skip test if no fcntl module. |
|---|
| 12 | n/a | fcntl = import_module('fcntl') |
|---|
| 13 | n/a | |
|---|
| 14 | n/a | |
|---|
| 15 | n/a | # TODO - Write tests for lockf(). |
|---|
| 16 | n/a | |
|---|
def get_lockdata():
    """Return a packed, platform-specific lock record for fcntl tests.

    The ``struct`` format is chosen from ``sys.platform`` (and, on the
    generic path, from whether ``os.O_LARGEFILE`` exists).  In every
    layout exactly one field carries ``fcntl.F_WRLCK`` and all other
    fields are zero.  The packed bytes are echoed to stdout when the test
    run is verbose.

    Returns:
        bytes: the result of ``struct.pack`` for the selected layout.
    """
    plat = sys.platform
    bsd_family = (plat.startswith(('netbsd', 'freebsd', 'openbsd', 'bsdos'))
                  or plat == 'darwin')

    if bsd_family:
        # Two offset-sized fields, then a pid-sized field, then two shorts;
        # the lock type is the fourth value in this layout.
        if struct.calcsize('l') == 8:
            off_t, pid_t = 'l', 'i'
        else:
            # A native long is not 8 bytes here: follow each offset field
            # with four pad bytes.
            off_t, pid_t = 'lxxxx', 'l'
        layout = off_t * 2 + pid_t + 'hh'
        fields = (0, 0, 0, fcntl.F_WRLCK, 0)
    elif plat.startswith('gnukfreebsd'):
        layout = 'qqihhi'
        fields = (0, 0, 0, fcntl.F_WRLCK, 0, 0)
    elif plat in ('aix3', 'aix4', 'hp-uxB', 'unixware7'):
        layout = 'hhlllii'
        fields = (fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
    else:
        # Generic layout: the two middle fields are 'q' when the platform
        # exposes O_LARGEFILE and 'l' otherwise.
        start_len = 'qq' if hasattr(os, 'O_LARGEFILE') else 'll'
        layout = 'hh' + start_len + 'hh'
        fields = (fcntl.F_WRLCK, 0, 0, 0, 0, 0)

    packed = struct.pack(layout, *fields)
    if packed and verbose:
        print('struct.pack: ', repr(packed))
    return packed
|---|
| 45 | n/a | |
|---|
# Built once at import time; passed as the F_SETLKW argument by the
# fcntl() tests below.
lockdata = get_lockdata()
|---|
| 47 | n/a | |
|---|
class BadFile:
    """File-like stand-in whose ``fileno()`` returns a caller-chosen value.

    The tests in this file use it to hand ``fcntl.fcntl()`` an object
    whose descriptor is invalid (``-1``, a string, or an out-of-range
    integer).
    """

    def __init__(self, fn):
        # Stored verbatim; no validation is performed on purpose.
        self.fn = fn

    def fileno(self):
        """Return the value given to the constructor, unchanged."""
        return self.fn
|---|
| 53 | n/a | |
|---|
class TestFcntl(unittest.TestCase):
    """Tests for ``fcntl.fcntl()`` and ``fcntl.flock()``.

    Tests that need a real file open ``TESTFN`` and keep the handle in
    ``self.f``; ``tearDown`` closes it if still open and removes the file.
    """

    def setUp(self):
        # No file is open until an individual test creates one.
        self.f = None

    def tearDown(self):
        handle = self.f
        if handle and not handle.closed:
            handle.close()
        unlink(TESTFN)

    def _setfl_then_setlkw(self, target):
        """Run F_SETFL (O_NONBLOCK) then F_SETLKW (lockdata) on *target*.

        *target* is whatever ``fcntl.fcntl()`` accepts as its first
        argument.  Both results are printed when the run is verbose.
        """
        status = fcntl.fcntl(target, fcntl.F_SETFL, os.O_NONBLOCK)
        if verbose:
            print('Status from fcntl with O_NONBLOCK: ', status)
        reply = fcntl.fcntl(target, fcntl.F_SETLKW, lockdata)
        if verbose:
            print('String from fcntl with F_SETLKW: ', repr(reply))

    def test_fcntl_fileno(self):
        # the example from the library docs
        self.f = open(TESTFN, 'wb')
        self._setfl_then_setlkw(self.f.fileno())
        self.f.close()

    def test_fcntl_file_descriptor(self):
        # again, but pass the file rather than numeric descriptor
        self.f = open(TESTFN, 'wb')
        self._setfl_then_setlkw(self.f)
        self.f.close()

    def test_fcntl_bad_file(self):
        # Each invalid descriptor is paired with the exception it must raise.
        cases = (
            (ValueError, -1),
            (ValueError, BadFile(-1)),
            (TypeError, 'spam'),
            (TypeError, BadFile('spam')),
        )
        for expected, bad_fd in cases:
            with self.assertRaises(expected):
                fcntl.fcntl(bad_fd, fcntl.F_SETFL, os.O_NONBLOCK)

    @cpython_only
    def test_fcntl_bad_file_overflow(self):
        from _testcapi import INT_MAX, INT_MIN
        # Issue 15989
        above = INT_MAX + 1
        below = INT_MIN - 1
        for bad_fd in (above, BadFile(above), below, BadFile(below)):
            with self.assertRaises(OverflowError):
                fcntl.fcntl(bad_fd, fcntl.F_SETFL, os.O_NONBLOCK)

    @unittest.skipIf(
        platform.machine().startswith('arm') and platform.system() == 'Linux',
        "ARM Linux returns EINVAL for F_NOTIFY DN_MULTISHOT")
    def test_fcntl_64_bit(self):
        # Issue #1309352: fcntl shouldn't fail when the third arg fits in a
        # C 'long' but not in a C 'int'.
        if not (hasattr(fcntl, 'F_NOTIFY') and hasattr(fcntl, 'DN_MULTISHOT')):
            self.skipTest("F_NOTIFY or DN_MULTISHOT unavailable")
        cmd = fcntl.F_NOTIFY
        # This flag is larger than 2**31 in 64-bit builds
        flags = fcntl.DN_MULTISHOT
        # The command is issued against the directory that holds TESTFN.
        directory = os.path.dirname(os.path.abspath(TESTFN))
        fd = os.open(directory, os.O_RDONLY)
        try:
            fcntl.fcntl(fd, cmd, flags)
        finally:
            os.close(fd)

    def test_flock(self):
        # Solaris needs readable file for shared lock
        self.f = open(TESTFN, 'wb+')
        fileno = self.f.fileno()
        # Take and release each lock, addressing the file both by its
        # descriptor number and by the file object itself.
        attempts = (
            (fileno, fcntl.LOCK_SH),
            (self.f, fcntl.LOCK_SH | fcntl.LOCK_NB),
            (fileno, fcntl.LOCK_EX),
        )
        for target, operation in attempts:
            fcntl.flock(target, operation)
            fcntl.flock(target, fcntl.LOCK_UN)

        with self.assertRaises(ValueError):
            fcntl.flock(-1, fcntl.LOCK_SH)
        with self.assertRaises(TypeError):
            fcntl.flock('spam', fcntl.LOCK_SH)

    @cpython_only
    def test_flock_overflow(self):
        import _testcapi
        with self.assertRaises(OverflowError):
            fcntl.flock(_testcapi.INT_MAX + 1, fcntl.LOCK_SH)
|---|
| 146 | n/a | |
|---|
| 147 | n/a | |
|---|
def test_main():
    """Run the TestFcntl test case via ``run_unittest``."""
    run_unittest(TestFcntl)
|---|
| 150 | n/a | |
|---|
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
|---|