ยปCore Development>Code coverage>Lib/multiprocessing/semaphore_tracker.py

Python code coverage for Lib/multiprocessing/semaphore_tracker.py

#countcontent
1n/a#
2n/a# On Unix we run a server process which keeps track of unlinked
3n/a# semaphores. The server ignores SIGINT and SIGTERM and reads from a
4n/a# pipe. Every other process of the program has a copy of the writable
5n/a# end of the pipe, so we get EOF when all other processes have exited.
6n/a# Then the server process unlinks any remaining semaphore names.
7n/a#
8n/a# This is important because the system only supports a limited number
9n/a# of named semaphores, and they will not be automatically removed till
10n/a# the next reboot. Without this semaphore tracker process, "killall
11n/a# python" would probably leave unlinked semaphores.
12n/a#
13n/a
14n/aimport os
15n/aimport signal
16n/aimport sys
17n/aimport threading
18n/aimport warnings
19n/aimport _multiprocessing
20n/a
21n/afrom . import spawn
22n/afrom . import util
23n/a
24n/a__all__ = ['ensure_running', 'register', 'unregister']
25n/a
26n/a
27n/aclass SemaphoreTracker(object):
28n/a
29n/a def __init__(self):
30n/a self._lock = threading.Lock()
31n/a self._fd = None
32n/a
33n/a def getfd(self):
34n/a self.ensure_running()
35n/a return self._fd
36n/a
37n/a def ensure_running(self):
38n/a '''Make sure that semaphore tracker process is running.
39n/a
40n/a This can be run from any process. Usually a child process will use
41n/a the semaphore created by its parent.'''
42n/a with self._lock:
43n/a if self._fd is not None:
44n/a return
45n/a fds_to_pass = []
46n/a try:
47n/a fds_to_pass.append(sys.stderr.fileno())
48n/a except Exception:
49n/a pass
50n/a cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
51n/a r, w = os.pipe()
52n/a try:
53n/a fds_to_pass.append(r)
54n/a # process will out live us, so no need to wait on pid
55n/a exe = spawn.get_executable()
56n/a args = [exe] + util._args_from_interpreter_flags()
57n/a args += ['-c', cmd % r]
58n/a util.spawnv_passfds(exe, args, fds_to_pass)
59n/a except:
60n/a os.close(w)
61n/a raise
62n/a else:
63n/a self._fd = w
64n/a finally:
65n/a os.close(r)
66n/a
67n/a def register(self, name):
68n/a '''Register name of semaphore with semaphore tracker.'''
69n/a self._send('REGISTER', name)
70n/a
71n/a def unregister(self, name):
72n/a '''Unregister name of semaphore with semaphore tracker.'''
73n/a self._send('UNREGISTER', name)
74n/a
75n/a def _send(self, cmd, name):
76n/a self.ensure_running()
77n/a msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
78n/a if len(name) > 512:
79n/a # posix guarantees that writes to a pipe of less than PIPE_BUF
80n/a # bytes are atomic, and that PIPE_BUF >= 512
81n/a raise ValueError('name too long')
82n/a nbytes = os.write(self._fd, msg)
83n/a assert nbytes == len(msg)
84n/a
85n/a
86n/a_semaphore_tracker = SemaphoreTracker()
87n/aensure_running = _semaphore_tracker.ensure_running
88n/aregister = _semaphore_tracker.register
89n/aunregister = _semaphore_tracker.unregister
90n/agetfd = _semaphore_tracker.getfd
91n/a
92n/a
93n/adef main(fd):
94n/a '''Run semaphore tracker.'''
95n/a # protect the process from ^C and "killall python" etc
96n/a signal.signal(signal.SIGINT, signal.SIG_IGN)
97n/a signal.signal(signal.SIGTERM, signal.SIG_IGN)
98n/a
99n/a for f in (sys.stdin, sys.stdout):
100n/a try:
101n/a f.close()
102n/a except Exception:
103n/a pass
104n/a
105n/a cache = set()
106n/a try:
107n/a # keep track of registered/unregistered semaphores
108n/a with open(fd, 'rb') as f:
109n/a for line in f:
110n/a try:
111n/a cmd, name = line.strip().split(b':')
112n/a if cmd == b'REGISTER':
113n/a cache.add(name)
114n/a elif cmd == b'UNREGISTER':
115n/a cache.remove(name)
116n/a else:
117n/a raise RuntimeError('unrecognized command %r' % cmd)
118n/a except Exception:
119n/a try:
120n/a sys.excepthook(*sys.exc_info())
121n/a except:
122n/a pass
123n/a finally:
124n/a # all processes have terminated; cleanup any remaining semaphores
125n/a if cache:
126n/a try:
127n/a warnings.warn('semaphore_tracker: There appear to be %d '
128n/a 'leaked semaphores to clean up at shutdown' %
129n/a len(cache))
130n/a except Exception:
131n/a pass
132n/a for name in cache:
133n/a # For some reason the process which created and registered this
134n/a # semaphore has failed to unregister it. Presumably it has died.
135n/a # We therefore unlink it.
136n/a try:
137n/a name = name.decode('ascii')
138n/a try:
139n/a _multiprocessing.sem_unlink(name)
140n/a except Exception as e:
141n/a warnings.warn('semaphore_tracker: %r: %s' % (name, e))
142n/a finally:
143n/a pass