ยปCore Development>Code coverage>Lib/multiprocessing/forkserver.py

Python code coverage for Lib/multiprocessing/forkserver.py

#countcontent
1n/aimport errno
2n/aimport os
3n/aimport selectors
4n/aimport signal
5n/aimport socket
6n/aimport struct
7n/aimport sys
8n/aimport threading
9n/a
10n/afrom . import connection
11n/afrom . import process
12n/afrom .context import reduction
13n/afrom . import semaphore_tracker
14n/afrom . import spawn
15n/afrom . import util
16n/a
17n/a__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
18n/a 'set_forkserver_preload']
19n/a
20n/a#
21n/a#
22n/a#
23n/a
24n/aMAXFDS_TO_SEND = 256
25n/aUNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t
26n/a
27n/a#
28n/a# Forkserver class
29n/a#
30n/a
31n/aclass ForkServer(object):
32n/a
33n/a def __init__(self):
34n/a self._forkserver_address = None
35n/a self._forkserver_alive_fd = None
36n/a self._inherited_fds = None
37n/a self._lock = threading.Lock()
38n/a self._preload_modules = ['__main__']
39n/a
40n/a def set_forkserver_preload(self, modules_names):
41n/a '''Set list of module names to try to load in forkserver process.'''
42n/a if not all(type(mod) is str for mod in self._preload_modules):
43n/a raise TypeError('module_names must be a list of strings')
44n/a self._preload_modules = modules_names
45n/a
46n/a def get_inherited_fds(self):
47n/a '''Return list of fds inherited from parent process.
48n/a
49n/a This returns None if the current process was not started by fork
50n/a server.
51n/a '''
52n/a return self._inherited_fds
53n/a
54n/a def connect_to_new_process(self, fds):
55n/a '''Request forkserver to create a child process.
56n/a
57n/a Returns a pair of fds (status_r, data_w). The calling process can read
58n/a the child process's pid and (eventually) its returncode from status_r.
59n/a The calling process should write to data_w the pickled preparation and
60n/a process data.
61n/a '''
62n/a self.ensure_running()
63n/a if len(fds) + 4 >= MAXFDS_TO_SEND:
64n/a raise ValueError('too many fds')
65n/a with socket.socket(socket.AF_UNIX) as client:
66n/a client.connect(self._forkserver_address)
67n/a parent_r, child_w = os.pipe()
68n/a child_r, parent_w = os.pipe()
69n/a allfds = [child_r, child_w, self._forkserver_alive_fd,
70n/a semaphore_tracker.getfd()]
71n/a allfds += fds
72n/a try:
73n/a reduction.sendfds(client, allfds)
74n/a return parent_r, parent_w
75n/a except:
76n/a os.close(parent_r)
77n/a os.close(parent_w)
78n/a raise
79n/a finally:
80n/a os.close(child_r)
81n/a os.close(child_w)
82n/a
83n/a def ensure_running(self):
84n/a '''Make sure that a fork server is running.
85n/a
86n/a This can be called from any process. Note that usually a child
87n/a process will just reuse the forkserver started by its parent, so
88n/a ensure_running() will do nothing.
89n/a '''
90n/a with self._lock:
91n/a semaphore_tracker.ensure_running()
92n/a if self._forkserver_alive_fd is not None:
93n/a return
94n/a
95n/a cmd = ('from multiprocessing.forkserver import main; ' +
96n/a 'main(%d, %d, %r, **%r)')
97n/a
98n/a if self._preload_modules:
99n/a desired_keys = {'main_path', 'sys_path'}
100n/a data = spawn.get_preparation_data('ignore')
101n/a data = dict((x,y) for (x,y) in data.items()
102n/a if x in desired_keys)
103n/a else:
104n/a data = {}
105n/a
106n/a with socket.socket(socket.AF_UNIX) as listener:
107n/a address = connection.arbitrary_address('AF_UNIX')
108n/a listener.bind(address)
109n/a os.chmod(address, 0o600)
110n/a listener.listen()
111n/a
112n/a # all client processes own the write end of the "alive" pipe;
113n/a # when they all terminate the read end becomes ready.
114n/a alive_r, alive_w = os.pipe()
115n/a try:
116n/a fds_to_pass = [listener.fileno(), alive_r]
117n/a cmd %= (listener.fileno(), alive_r, self._preload_modules,
118n/a data)
119n/a exe = spawn.get_executable()
120n/a args = [exe] + util._args_from_interpreter_flags()
121n/a args += ['-c', cmd]
122n/a pid = util.spawnv_passfds(exe, args, fds_to_pass)
123n/a except:
124n/a os.close(alive_w)
125n/a raise
126n/a finally:
127n/a os.close(alive_r)
128n/a self._forkserver_address = address
129n/a self._forkserver_alive_fd = alive_w
130n/a
131n/a#
132n/a#
133n/a#
134n/a
135n/adef main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
136n/a '''Run forkserver.'''
137n/a if preload:
138n/a if '__main__' in preload and main_path is not None:
139n/a process.current_process()._inheriting = True
140n/a try:
141n/a spawn.import_main_path(main_path)
142n/a finally:
143n/a del process.current_process()._inheriting
144n/a for modname in preload:
145n/a try:
146n/a __import__(modname)
147n/a except ImportError:
148n/a pass
149n/a
150n/a util._close_stdin()
151n/a
152n/a # ignoring SIGCHLD means no need to reap zombie processes
153n/a handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
154n/a with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
155n/a selectors.DefaultSelector() as selector:
156n/a _forkserver._forkserver_address = listener.getsockname()
157n/a
158n/a selector.register(listener, selectors.EVENT_READ)
159n/a selector.register(alive_r, selectors.EVENT_READ)
160n/a
161n/a while True:
162n/a try:
163n/a while True:
164n/a rfds = [key.fileobj for (key, events) in selector.select()]
165n/a if rfds:
166n/a break
167n/a
168n/a if alive_r in rfds:
169n/a # EOF because no more client processes left
170n/a assert os.read(alive_r, 1) == b''
171n/a raise SystemExit
172n/a
173n/a assert listener in rfds
174n/a with listener.accept()[0] as s:
175n/a code = 1
176n/a if os.fork() == 0:
177n/a try:
178n/a _serve_one(s, listener, alive_r, handler)
179n/a except Exception:
180n/a sys.excepthook(*sys.exc_info())
181n/a sys.stderr.flush()
182n/a finally:
183n/a os._exit(code)
184n/a
185n/a except OSError as e:
186n/a if e.errno != errno.ECONNABORTED:
187n/a raise
188n/a
189n/adef _serve_one(s, listener, alive_r, handler):
190n/a # close unnecessary stuff and reset SIGCHLD handler
191n/a listener.close()
192n/a os.close(alive_r)
193n/a signal.signal(signal.SIGCHLD, handler)
194n/a
195n/a # receive fds from parent process
196n/a fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
197n/a s.close()
198n/a assert len(fds) <= MAXFDS_TO_SEND
199n/a (child_r, child_w, _forkserver._forkserver_alive_fd,
200n/a stfd, *_forkserver._inherited_fds) = fds
201n/a semaphore_tracker._semaphore_tracker._fd = stfd
202n/a
203n/a # send pid to client processes
204n/a write_unsigned(child_w, os.getpid())
205n/a
206n/a # reseed random number generator
207n/a if 'random' in sys.modules:
208n/a import random
209n/a random.seed()
210n/a
211n/a # run process object received over pipe
212n/a code = spawn._main(child_r)
213n/a
214n/a # write the exit code to the pipe
215n/a write_unsigned(child_w, code)
216n/a
217n/a#
218n/a# Read and write unsigned numbers
219n/a#
220n/a
221n/adef read_unsigned(fd):
222n/a data = b''
223n/a length = UNSIGNED_STRUCT.size
224n/a while len(data) < length:
225n/a s = os.read(fd, length - len(data))
226n/a if not s:
227n/a raise EOFError('unexpected EOF')
228n/a data += s
229n/a return UNSIGNED_STRUCT.unpack(data)[0]
230n/a
231n/adef write_unsigned(fd, n):
232n/a msg = UNSIGNED_STRUCT.pack(n)
233n/a while msg:
234n/a nbytes = os.write(fd, msg)
235n/a if nbytes == 0:
236n/a raise RuntimeError('should not get here')
237n/a msg = msg[nbytes:]
238n/a
239n/a#
240n/a#
241n/a#
242n/a
243n/a_forkserver = ForkServer()
244n/aensure_running = _forkserver.ensure_running
245n/aget_inherited_fds = _forkserver.get_inherited_fds
246n/aconnect_to_new_process = _forkserver.connect_to_new_process
247n/aset_forkserver_preload = _forkserver.set_forkserver_preload