» Core Development > Code coverage > Lib/multiprocessing/resource_sharer.py

Python code coverage for Lib/multiprocessing/resource_sharer.py

#countcontent
1n/a#
2n/a# We use a background thread for sharing fds on Unix, and for sharing sockets on
3n/a# Windows.
4n/a#
5n/a# A client which wants to pickle a resource registers it with the resource
6n/a# sharer and gets an identifier in return. The unpickling process connects
7n/a# to the resource sharer, sends the identifier and its pid, and then receives
8n/a# the resource.
9n/a#
10n/a
11n/aimport os
12n/aimport signal
13n/aimport socket
14n/aimport sys
15n/aimport threading
16n/a
17n/afrom . import process
18n/afrom .context import reduction
19n/afrom . import util
20n/a
__all__ = ['stop']


if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''

        def __init__(self, sock):
            '''Duplicate *sock* and register the duplicate for sharing.

            The duplicate is handed to the resource sharer together with a
            callback that closes it.  If registration fails the duplicate is
            closed here so that it does not leak.
            '''
            new_sock = sock.dup()

            def send(conn, pid):
                # Export the duplicate for process *pid* and ship the
                # resulting bytes over the connection.
                share = new_sock.share(pid)
                conn.send_bytes(share)

            try:
                self._id = _resource_sharer.register(send, new_sock.close)
            except BaseException:
                # Nobody else knows about the duplicate yet; release it
                # before propagating the error.
                new_sock.close()
                raise

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)

else:
    __all__ += ['DupFd']

    class DupFd(object):
        '''Wrapper for fd which can be used at any time.'''

        def __init__(self, fd):
            '''Duplicate *fd* and register the duplicate for sharing.

            The duplicate is handed to the resource sharer together with a
            callback that closes it.  If registration fails the duplicate is
            closed here so that the descriptor does not leak.
            '''
            new_fd = os.dup(fd)

            def send(conn, pid):
                reduction.send_handle(conn, new_fd, pid)

            def close():
                os.close(new_fd)

            try:
                self._id = _resource_sharer.register(send, close)
            except BaseException:
                # Nobody else knows about the duplicate yet; release it
                # before propagating the error.
                close()
                raise

        def detach(self):
            '''Get the fd.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)
59n/a
60n/a
61n/aclass _ResourceSharer(object):
62n/a '''Manager for resouces using background thread.'''
63n/a def __init__(self):
64n/a self._key = 0
65n/a self._cache = {}
66n/a self._old_locks = []
67n/a self._lock = threading.Lock()
68n/a self._listener = None
69n/a self._address = None
70n/a self._thread = None
71n/a util.register_after_fork(self, _ResourceSharer._afterfork)
72n/a
73n/a def register(self, send, close):
74n/a '''Register resource, returning an identifier.'''
75n/a with self._lock:
76n/a if self._address is None:
77n/a self._start()
78n/a self._key += 1
79n/a self._cache[self._key] = (send, close)
80n/a return (self._address, self._key)
81n/a
82n/a @staticmethod
83n/a def get_connection(ident):
84n/a '''Return connection from which to receive identified resource.'''
85n/a from .connection import Client
86n/a address, key = ident
87n/a c = Client(address, authkey=process.current_process().authkey)
88n/a c.send((key, os.getpid()))
89n/a return c
90n/a
91n/a def stop(self, timeout=None):
92n/a '''Stop the background thread and clear registered resources.'''
93n/a from .connection import Client
94n/a with self._lock:
95n/a if self._address is not None:
96n/a c = Client(self._address,
97n/a authkey=process.current_process().authkey)
98n/a c.send(None)
99n/a c.close()
100n/a self._thread.join(timeout)
101n/a if self._thread.is_alive():
102n/a util.sub_warning('_ResourceSharer thread did '
103n/a 'not stop when asked')
104n/a self._listener.close()
105n/a self._thread = None
106n/a self._address = None
107n/a self._listener = None
108n/a for key, (send, close) in self._cache.items():
109n/a close()
110n/a self._cache.clear()
111n/a
112n/a def _afterfork(self):
113n/a for key, (send, close) in self._cache.items():
114n/a close()
115n/a self._cache.clear()
116n/a # If self._lock was locked at the time of the fork, it may be broken
117n/a # -- see issue 6721. Replace it without letting it be gc'ed.
118n/a self._old_locks.append(self._lock)
119n/a self._lock = threading.Lock()
120n/a if self._listener is not None:
121n/a self._listener.close()
122n/a self._listener = None
123n/a self._address = None
124n/a self._thread = None
125n/a
126n/a def _start(self):
127n/a from .connection import Listener
128n/a assert self._listener is None
129n/a util.debug('starting listener and thread for sending handles')
130n/a self._listener = Listener(authkey=process.current_process().authkey)
131n/a self._address = self._listener.address
132n/a t = threading.Thread(target=self._serve)
133n/a t.daemon = True
134n/a t.start()
135n/a self._thread = t
136n/a
137n/a def _serve(self):
138n/a if hasattr(signal, 'pthread_sigmask'):
139n/a signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
140n/a while 1:
141n/a try:
142n/a with self._listener.accept() as conn:
143n/a msg = conn.recv()
144n/a if msg is None:
145n/a break
146n/a key, destination_pid = msg
147n/a send, close = self._cache.pop(key)
148n/a try:
149n/a send(conn, destination_pid)
150n/a finally:
151n/a close()
152n/a except:
153n/a if not util.is_exiting():
154n/a sys.excepthook(*sys.exc_info())
155n/a
156n/a
# Module-level sharer instance used by the picklable wrappers defined above.
_resource_sharer = _ResourceSharer()

# Public entry point listed in __all__: the bound stop() of that instance.
stop = _resource_sharer.stop