ยปCore Development>Code coverage>Lib/multiprocessing/reduction.py

Python code coverage for Lib/multiprocessing/reduction.py

#countcontent
1n/a#
2n/a# Module which deals with pickling of objects.
3n/a#
4n/a# multiprocessing/reduction.py
5n/a#
6n/a# Copyright (c) 2006-2008, R Oudkerk
7n/a# Licensed to PSF under a Contributor Agreement.
8n/a#
9n/a
10n/afrom abc import ABCMeta, abstractmethod
11n/aimport copyreg
12n/aimport functools
13n/aimport io
14n/aimport os
15n/aimport pickle
16n/aimport socket
17n/aimport sys
18n/a
19n/afrom . import context
20n/a
21n/a__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
22n/a
23n/a
24n/aHAVE_SEND_HANDLE = (sys.platform == 'win32' or
25n/a (hasattr(socket, 'CMSG_LEN') and
26n/a hasattr(socket, 'SCM_RIGHTS') and
27n/a hasattr(socket.socket, 'sendmsg')))
28n/a
29n/a#
30n/a# Pickler subclass
31n/a#
32n/a
33n/aclass ForkingPickler(pickle.Pickler):
34n/a '''Pickler subclass used by multiprocessing.'''
35n/a _extra_reducers = {}
36n/a _copyreg_dispatch_table = copyreg.dispatch_table
37n/a
38n/a def __init__(self, *args):
39n/a super().__init__(*args)
40n/a self.dispatch_table = self._copyreg_dispatch_table.copy()
41n/a self.dispatch_table.update(self._extra_reducers)
42n/a
43n/a @classmethod
44n/a def register(cls, type, reduce):
45n/a '''Register a reduce function for a type.'''
46n/a cls._extra_reducers[type] = reduce
47n/a
48n/a @classmethod
49n/a def dumps(cls, obj, protocol=None):
50n/a buf = io.BytesIO()
51n/a cls(buf, protocol).dump(obj)
52n/a return buf.getbuffer()
53n/a
54n/a loads = pickle.loads
55n/a
56n/aregister = ForkingPickler.register
57n/a
58n/adef dump(obj, file, protocol=None):
59n/a '''Replacement for pickle.dump() using ForkingPickler.'''
60n/a ForkingPickler(file, protocol).dump(obj)
61n/a
62n/a#
63n/a# Platform specific definitions
64n/a#
65n/a
66n/aif sys.platform == 'win32':
67n/a # Windows
68n/a __all__ += ['DupHandle', 'duplicate', 'steal_handle']
69n/a import _winapi
70n/a
71n/a def duplicate(handle, target_process=None, inheritable=False):
72n/a '''Duplicate a handle. (target_process is a handle not a pid!)'''
73n/a if target_process is None:
74n/a target_process = _winapi.GetCurrentProcess()
75n/a return _winapi.DuplicateHandle(
76n/a _winapi.GetCurrentProcess(), handle, target_process,
77n/a 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
78n/a
79n/a def steal_handle(source_pid, handle):
80n/a '''Steal a handle from process identified by source_pid.'''
81n/a source_process_handle = _winapi.OpenProcess(
82n/a _winapi.PROCESS_DUP_HANDLE, False, source_pid)
83n/a try:
84n/a return _winapi.DuplicateHandle(
85n/a source_process_handle, handle,
86n/a _winapi.GetCurrentProcess(), 0, False,
87n/a _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
88n/a finally:
89n/a _winapi.CloseHandle(source_process_handle)
90n/a
91n/a def send_handle(conn, handle, destination_pid):
92n/a '''Send a handle over a local connection.'''
93n/a dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
94n/a conn.send(dh)
95n/a
96n/a def recv_handle(conn):
97n/a '''Receive a handle over a local connection.'''
98n/a return conn.recv().detach()
99n/a
100n/a class DupHandle(object):
101n/a '''Picklable wrapper for a handle.'''
102n/a def __init__(self, handle, access, pid=None):
103n/a if pid is None:
104n/a # We just duplicate the handle in the current process and
105n/a # let the receiving process steal the handle.
106n/a pid = os.getpid()
107n/a proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
108n/a try:
109n/a self._handle = _winapi.DuplicateHandle(
110n/a _winapi.GetCurrentProcess(),
111n/a handle, proc, access, False, 0)
112n/a finally:
113n/a _winapi.CloseHandle(proc)
114n/a self._access = access
115n/a self._pid = pid
116n/a
117n/a def detach(self):
118n/a '''Get the handle. This should only be called once.'''
119n/a # retrieve handle from process which currently owns it
120n/a if self._pid == os.getpid():
121n/a # The handle has already been duplicated for this process.
122n/a return self._handle
123n/a # We must steal the handle from the process whose pid is self._pid.
124n/a proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
125n/a self._pid)
126n/a try:
127n/a return _winapi.DuplicateHandle(
128n/a proc, self._handle, _winapi.GetCurrentProcess(),
129n/a self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
130n/a finally:
131n/a _winapi.CloseHandle(proc)
132n/a
133n/aelse:
134n/a # Unix
135n/a __all__ += ['DupFd', 'sendfds', 'recvfds']
136n/a import array
137n/a
138n/a # On MacOSX we should acknowledge receipt of fds -- see Issue14669
139n/a ACKNOWLEDGE = sys.platform == 'darwin'
140n/a
141n/a def sendfds(sock, fds):
142n/a '''Send an array of fds over an AF_UNIX socket.'''
143n/a fds = array.array('i', fds)
144n/a msg = bytes([len(fds) % 256])
145n/a sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
146n/a if ACKNOWLEDGE and sock.recv(1) != b'A':
147n/a raise RuntimeError('did not receive acknowledgement of fd')
148n/a
149n/a def recvfds(sock, size):
150n/a '''Receive an array of fds over an AF_UNIX socket.'''
151n/a a = array.array('i')
152n/a bytes_size = a.itemsize * size
153n/a msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
154n/a if not msg and not ancdata:
155n/a raise EOFError
156n/a try:
157n/a if ACKNOWLEDGE:
158n/a sock.send(b'A')
159n/a if len(ancdata) != 1:
160n/a raise RuntimeError('received %d items of ancdata' %
161n/a len(ancdata))
162n/a cmsg_level, cmsg_type, cmsg_data = ancdata[0]
163n/a if (cmsg_level == socket.SOL_SOCKET and
164n/a cmsg_type == socket.SCM_RIGHTS):
165n/a if len(cmsg_data) % a.itemsize != 0:
166n/a raise ValueError
167n/a a.frombytes(cmsg_data)
168n/a assert len(a) % 256 == msg[0]
169n/a return list(a)
170n/a except (ValueError, IndexError):
171n/a pass
172n/a raise RuntimeError('Invalid data received')
173n/a
174n/a def send_handle(conn, handle, destination_pid):
175n/a '''Send a handle over a local connection.'''
176n/a with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
177n/a sendfds(s, [handle])
178n/a
179n/a def recv_handle(conn):
180n/a '''Receive a handle over a local connection.'''
181n/a with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
182n/a return recvfds(s, 1)[0]
183n/a
184n/a def DupFd(fd):
185n/a '''Return a wrapper for an fd.'''
186n/a popen_obj = context.get_spawning_popen()
187n/a if popen_obj is not None:
188n/a return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
189n/a elif HAVE_SEND_HANDLE:
190n/a from . import resource_sharer
191n/a return resource_sharer.DupFd(fd)
192n/a else:
193n/a raise ValueError('SCM_RIGHTS appears not to be available')
194n/a
195n/a#
196n/a# Try making some callable types picklable
197n/a#
198n/a
199n/adef _reduce_method(m):
200n/a if m.__self__ is None:
201n/a return getattr, (m.__class__, m.__func__.__name__)
202n/a else:
203n/a return getattr, (m.__self__, m.__func__.__name__)
204n/aclass _C:
205n/a def f(self):
206n/a pass
207n/aregister(type(_C().f), _reduce_method)
208n/a
209n/a
210n/adef _reduce_method_descriptor(m):
211n/a return getattr, (m.__objclass__, m.__name__)
212n/aregister(type(list.append), _reduce_method_descriptor)
213n/aregister(type(int.__add__), _reduce_method_descriptor)
214n/a
215n/a
216n/adef _reduce_partial(p):
217n/a return _rebuild_partial, (p.func, p.args, p.keywords or {})
218n/adef _rebuild_partial(func, args, keywords):
219n/a return functools.partial(func, *args, **keywords)
220n/aregister(functools.partial, _reduce_partial)
221n/a
222n/a#
223n/a# Make sockets picklable
224n/a#
225n/a
226n/aif sys.platform == 'win32':
227n/a def _reduce_socket(s):
228n/a from .resource_sharer import DupSocket
229n/a return _rebuild_socket, (DupSocket(s),)
230n/a def _rebuild_socket(ds):
231n/a return ds.detach()
232n/a register(socket.socket, _reduce_socket)
233n/a
234n/aelse:
235n/a def _reduce_socket(s):
236n/a df = DupFd(s.fileno())
237n/a return _rebuild_socket, (df, s.family, s.type, s.proto)
238n/a def _rebuild_socket(df, family, type, proto):
239n/a fd = df.detach()
240n/a return socket.socket(family, type, proto, fileno=fd)
241n/a register(socket.socket, _reduce_socket)
242n/a
243n/a
244n/aclass AbstractReducer(metaclass=ABCMeta):
245n/a '''Abstract base class for use in implementing a Reduction class
246n/a suitable for use in replacing the standard reduction mechanism
247n/a used in multiprocessing.'''
248n/a ForkingPickler = ForkingPickler
249n/a register = register
250n/a dump = dump
251n/a send_handle = send_handle
252n/a recv_handle = recv_handle
253n/a
254n/a if sys.platform == 'win32':
255n/a steal_handle = steal_handle
256n/a duplicate = duplicate
257n/a DupHandle = DupHandle
258n/a else:
259n/a sendfds = sendfds
260n/a recvfds = recvfds
261n/a DupFd = DupFd
262n/a
263n/a _reduce_method = _reduce_method
264n/a _reduce_method_descriptor = _reduce_method_descriptor
265n/a _rebuild_partial = _rebuild_partial
266n/a _reduce_socket = _reduce_socket
267n/a _rebuild_socket = _rebuild_socket
268n/a
269n/a def __init__(self, *args):
270n/a register(type(_C().f), _reduce_method)
271n/a register(type(list.append), _reduce_method_descriptor)
272n/a register(type(int.__add__), _reduce_method_descriptor)
273n/a register(functools.partial, _reduce_partial)
274n/a register(socket.socket, _reduce_socket)