ยปCore Development>Code coverage>Lib/multiprocessing/managers.py

Python code coverage for Lib/multiprocessing/managers.py

#countcontent
1n/a#
2n/a# Module providing the `SyncManager` class for dealing
3n/a# with shared objects
4n/a#
5n/a# multiprocessing/managers.py
6n/a#
7n/a# Copyright (c) 2006-2008, R Oudkerk
8n/a# Licensed to PSF under a Contributor Agreement.
9n/a#
10n/a
11n/a__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12n/a
13n/a#
14n/a# Imports
15n/a#
16n/a
17n/aimport sys
18n/aimport threading
19n/aimport array
20n/aimport queue
21n/a
22n/afrom time import time as _time
23n/afrom traceback import format_exc
24n/a
25n/afrom . import connection
26n/afrom .context import reduction, get_spawning_popen
27n/afrom . import pool
28n/afrom . import process
29n/afrom . import util
30n/afrom . import get_context
31n/a
32n/a#
33n/a# Register some things for pickling
34n/a#
35n/a
36n/adef reduce_array(a):
37n/a return array.array, (a.typecode, a.tobytes())
38n/areduction.register(array.array, reduce_array)
39n/a
40n/aview_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
41n/aif view_types[0] is not list: # only needed in Py3.0
42n/a def rebuild_as_list(obj):
43n/a return list, (list(obj),)
44n/a for view_type in view_types:
45n/a reduction.register(view_type, rebuild_as_list)
46n/a
47n/a#
48n/a# Type for identifying shared objects
49n/a#
50n/a
51n/aclass Token(object):
52n/a '''
53n/a Type to uniquely indentify a shared object
54n/a '''
55n/a __slots__ = ('typeid', 'address', 'id')
56n/a
57n/a def __init__(self, typeid, address, id):
58n/a (self.typeid, self.address, self.id) = (typeid, address, id)
59n/a
60n/a def __getstate__(self):
61n/a return (self.typeid, self.address, self.id)
62n/a
63n/a def __setstate__(self, state):
64n/a (self.typeid, self.address, self.id) = state
65n/a
66n/a def __repr__(self):
67n/a return '%s(typeid=%r, address=%r, id=%r)' % \
68n/a (self.__class__.__name__, self.typeid, self.address, self.id)
69n/a
70n/a#
71n/a# Function for communication with a manager's server process
72n/a#
73n/a
74n/adef dispatch(c, id, methodname, args=(), kwds={}):
75n/a '''
76n/a Send a message to manager using connection `c` and return response
77n/a '''
78n/a c.send((id, methodname, args, kwds))
79n/a kind, result = c.recv()
80n/a if kind == '#RETURN':
81n/a return result
82n/a raise convert_to_error(kind, result)
83n/a
84n/adef convert_to_error(kind, result):
85n/a if kind == '#ERROR':
86n/a return result
87n/a elif kind == '#TRACEBACK':
88n/a assert type(result) is str
89n/a return RemoteError(result)
90n/a elif kind == '#UNSERIALIZABLE':
91n/a assert type(result) is str
92n/a return RemoteError('Unserializable message: %s\n' % result)
93n/a else:
94n/a return ValueError('Unrecognized message type')
95n/a
96n/aclass RemoteError(Exception):
97n/a def __str__(self):
98n/a return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
99n/a
100n/a#
101n/a# Functions for finding the method names of an object
102n/a#
103n/a
104n/adef all_methods(obj):
105n/a '''
106n/a Return a list of names of methods of `obj`
107n/a '''
108n/a temp = []
109n/a for name in dir(obj):
110n/a func = getattr(obj, name)
111n/a if callable(func):
112n/a temp.append(name)
113n/a return temp
114n/a
115n/adef public_methods(obj):
116n/a '''
117n/a Return a list of names of methods of `obj` which do not start with '_'
118n/a '''
119n/a return [name for name in all_methods(obj) if name[0] != '_']
120n/a
121n/a#
122n/a# Server which is run in a process controlled by a manager
123n/a#
124n/a
125n/aclass Server(object):
126n/a '''
127n/a Server class which runs in a process controlled by a manager object
128n/a '''
129n/a public = ['shutdown', 'create', 'accept_connection', 'get_methods',
130n/a 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
131n/a
132n/a def __init__(self, registry, address, authkey, serializer):
133n/a assert isinstance(authkey, bytes)
134n/a self.registry = registry
135n/a self.authkey = process.AuthenticationString(authkey)
136n/a Listener, Client = listener_client[serializer]
137n/a
138n/a # do authentication later
139n/a self.listener = Listener(address=address, backlog=16)
140n/a self.address = self.listener.address
141n/a
142n/a self.id_to_obj = {'0': (None, ())}
143n/a self.id_to_refcount = {}
144n/a self.id_to_local_proxy_obj = {}
145n/a self.mutex = threading.Lock()
146n/a
147n/a def serve_forever(self):
148n/a '''
149n/a Run the server forever
150n/a '''
151n/a self.stop_event = threading.Event()
152n/a process.current_process()._manager_server = self
153n/a try:
154n/a accepter = threading.Thread(target=self.accepter)
155n/a accepter.daemon = True
156n/a accepter.start()
157n/a try:
158n/a while not self.stop_event.is_set():
159n/a self.stop_event.wait(1)
160n/a except (KeyboardInterrupt, SystemExit):
161n/a pass
162n/a finally:
163n/a if sys.stdout != sys.__stdout__:
164n/a util.debug('resetting stdout, stderr')
165n/a sys.stdout = sys.__stdout__
166n/a sys.stderr = sys.__stderr__
167n/a sys.exit(0)
168n/a
169n/a def accepter(self):
170n/a while True:
171n/a try:
172n/a c = self.listener.accept()
173n/a except OSError:
174n/a continue
175n/a t = threading.Thread(target=self.handle_request, args=(c,))
176n/a t.daemon = True
177n/a t.start()
178n/a
179n/a def handle_request(self, c):
180n/a '''
181n/a Handle a new connection
182n/a '''
183n/a funcname = result = request = None
184n/a try:
185n/a connection.deliver_challenge(c, self.authkey)
186n/a connection.answer_challenge(c, self.authkey)
187n/a request = c.recv()
188n/a ignore, funcname, args, kwds = request
189n/a assert funcname in self.public, '%r unrecognized' % funcname
190n/a func = getattr(self, funcname)
191n/a except Exception:
192n/a msg = ('#TRACEBACK', format_exc())
193n/a else:
194n/a try:
195n/a result = func(c, *args, **kwds)
196n/a except Exception:
197n/a msg = ('#TRACEBACK', format_exc())
198n/a else:
199n/a msg = ('#RETURN', result)
200n/a try:
201n/a c.send(msg)
202n/a except Exception as e:
203n/a try:
204n/a c.send(('#TRACEBACK', format_exc()))
205n/a except Exception:
206n/a pass
207n/a util.info('Failure to send message: %r', msg)
208n/a util.info(' ... request was %r', request)
209n/a util.info(' ... exception was %r', e)
210n/a
211n/a c.close()
212n/a
213n/a def serve_client(self, conn):
214n/a '''
215n/a Handle requests from the proxies in a particular process/thread
216n/a '''
217n/a util.debug('starting server thread to service %r',
218n/a threading.current_thread().name)
219n/a
220n/a recv = conn.recv
221n/a send = conn.send
222n/a id_to_obj = self.id_to_obj
223n/a
224n/a while not self.stop_event.is_set():
225n/a
226n/a try:
227n/a methodname = obj = None
228n/a request = recv()
229n/a ident, methodname, args, kwds = request
230n/a try:
231n/a obj, exposed, gettypeid = id_to_obj[ident]
232n/a except KeyError as ke:
233n/a try:
234n/a obj, exposed, gettypeid = \
235n/a self.id_to_local_proxy_obj[ident]
236n/a except KeyError as second_ke:
237n/a raise ke
238n/a
239n/a if methodname not in exposed:
240n/a raise AttributeError(
241n/a 'method %r of %r object is not in exposed=%r' %
242n/a (methodname, type(obj), exposed)
243n/a )
244n/a
245n/a function = getattr(obj, methodname)
246n/a
247n/a try:
248n/a res = function(*args, **kwds)
249n/a except Exception as e:
250n/a msg = ('#ERROR', e)
251n/a else:
252n/a typeid = gettypeid and gettypeid.get(methodname, None)
253n/a if typeid:
254n/a rident, rexposed = self.create(conn, typeid, res)
255n/a token = Token(typeid, self.address, rident)
256n/a msg = ('#PROXY', (rexposed, token))
257n/a else:
258n/a msg = ('#RETURN', res)
259n/a
260n/a except AttributeError:
261n/a if methodname is None:
262n/a msg = ('#TRACEBACK', format_exc())
263n/a else:
264n/a try:
265n/a fallback_func = self.fallback_mapping[methodname]
266n/a result = fallback_func(
267n/a self, conn, ident, obj, *args, **kwds
268n/a )
269n/a msg = ('#RETURN', result)
270n/a except Exception:
271n/a msg = ('#TRACEBACK', format_exc())
272n/a
273n/a except EOFError:
274n/a util.debug('got EOF -- exiting thread serving %r',
275n/a threading.current_thread().name)
276n/a sys.exit(0)
277n/a
278n/a except Exception:
279n/a msg = ('#TRACEBACK', format_exc())
280n/a
281n/a try:
282n/a try:
283n/a send(msg)
284n/a except Exception as e:
285n/a send(('#UNSERIALIZABLE', format_exc()))
286n/a except Exception as e:
287n/a util.info('exception in thread serving %r',
288n/a threading.current_thread().name)
289n/a util.info(' ... message was %r', msg)
290n/a util.info(' ... exception was %r', e)
291n/a conn.close()
292n/a sys.exit(1)
293n/a
294n/a def fallback_getvalue(self, conn, ident, obj):
295n/a return obj
296n/a
297n/a def fallback_str(self, conn, ident, obj):
298n/a return str(obj)
299n/a
300n/a def fallback_repr(self, conn, ident, obj):
301n/a return repr(obj)
302n/a
303n/a fallback_mapping = {
304n/a '__str__':fallback_str,
305n/a '__repr__':fallback_repr,
306n/a '#GETVALUE':fallback_getvalue
307n/a }
308n/a
309n/a def dummy(self, c):
310n/a pass
311n/a
312n/a def debug_info(self, c):
313n/a '''
314n/a Return some info --- useful to spot problems with refcounting
315n/a '''
316n/a with self.mutex:
317n/a result = []
318n/a keys = list(self.id_to_refcount.keys())
319n/a keys.sort()
320n/a for ident in keys:
321n/a if ident != '0':
322n/a result.append(' %s: refcount=%s\n %s' %
323n/a (ident, self.id_to_refcount[ident],
324n/a str(self.id_to_obj[ident][0])[:75]))
325n/a return '\n'.join(result)
326n/a
327n/a def number_of_objects(self, c):
328n/a '''
329n/a Number of shared objects
330n/a '''
331n/a # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
332n/a return len(self.id_to_refcount)
333n/a
334n/a def shutdown(self, c):
335n/a '''
336n/a Shutdown this process
337n/a '''
338n/a try:
339n/a util.debug('manager received shutdown message')
340n/a c.send(('#RETURN', None))
341n/a except:
342n/a import traceback
343n/a traceback.print_exc()
344n/a finally:
345n/a self.stop_event.set()
346n/a
347n/a def create(self, c, typeid, *args, **kwds):
348n/a '''
349n/a Create a new shared object and return its id
350n/a '''
351n/a with self.mutex:
352n/a callable, exposed, method_to_typeid, proxytype = \
353n/a self.registry[typeid]
354n/a
355n/a if callable is None:
356n/a assert len(args) == 1 and not kwds
357n/a obj = args[0]
358n/a else:
359n/a obj = callable(*args, **kwds)
360n/a
361n/a if exposed is None:
362n/a exposed = public_methods(obj)
363n/a if method_to_typeid is not None:
364n/a assert type(method_to_typeid) is dict
365n/a exposed = list(exposed) + list(method_to_typeid)
366n/a
367n/a ident = '%x' % id(obj) # convert to string because xmlrpclib
368n/a # only has 32 bit signed integers
369n/a util.debug('%r callable returned object with id %r', typeid, ident)
370n/a
371n/a self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
372n/a if ident not in self.id_to_refcount:
373n/a self.id_to_refcount[ident] = 0
374n/a
375n/a self.incref(c, ident)
376n/a return ident, tuple(exposed)
377n/a
378n/a def get_methods(self, c, token):
379n/a '''
380n/a Return the methods of the shared object indicated by token
381n/a '''
382n/a return tuple(self.id_to_obj[token.id][1])
383n/a
384n/a def accept_connection(self, c, name):
385n/a '''
386n/a Spawn a new thread to serve this connection
387n/a '''
388n/a threading.current_thread().name = name
389n/a c.send(('#RETURN', None))
390n/a self.serve_client(c)
391n/a
392n/a def incref(self, c, ident):
393n/a with self.mutex:
394n/a try:
395n/a self.id_to_refcount[ident] += 1
396n/a except KeyError as ke:
397n/a # If no external references exist but an internal (to the
398n/a # manager) still does and a new external reference is created
399n/a # from it, restore the manager's tracking of it from the
400n/a # previously stashed internal ref.
401n/a if ident in self.id_to_local_proxy_obj:
402n/a self.id_to_refcount[ident] = 1
403n/a self.id_to_obj[ident] = \
404n/a self.id_to_local_proxy_obj[ident]
405n/a obj, exposed, gettypeid = self.id_to_obj[ident]
406n/a util.debug('Server re-enabled tracking & INCREF %r', ident)
407n/a else:
408n/a raise ke
409n/a
410n/a def decref(self, c, ident):
411n/a if ident not in self.id_to_refcount and \
412n/a ident in self.id_to_local_proxy_obj:
413n/a util.debug('Server DECREF skipping %r', ident)
414n/a return
415n/a
416n/a with self.mutex:
417n/a assert self.id_to_refcount[ident] >= 1
418n/a self.id_to_refcount[ident] -= 1
419n/a if self.id_to_refcount[ident] == 0:
420n/a del self.id_to_refcount[ident]
421n/a
422n/a if ident not in self.id_to_refcount:
423n/a # Two-step process in case the object turns out to contain other
424n/a # proxy objects (e.g. a managed list of managed lists).
425n/a # Otherwise, deleting self.id_to_obj[ident] would trigger the
426n/a # deleting of the stored value (another managed object) which would
427n/a # in turn attempt to acquire the mutex that is already held here.
428n/a self.id_to_obj[ident] = (None, (), None) # thread-safe
429n/a util.debug('disposing of obj with id %r', ident)
430n/a with self.mutex:
431n/a del self.id_to_obj[ident]
432n/a
433n/a
434n/a#
435n/a# Class to represent state of a manager
436n/a#
437n/a
438n/aclass State(object):
439n/a __slots__ = ['value']
440n/a INITIAL = 0
441n/a STARTED = 1
442n/a SHUTDOWN = 2
443n/a
444n/a#
445n/a# Mapping from serializer name to Listener and Client types
446n/a#
447n/a
448n/alistener_client = {
449n/a 'pickle' : (connection.Listener, connection.Client),
450n/a 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
451n/a }
452n/a
453n/a#
454n/a# Definition of BaseManager
455n/a#
456n/a
457n/aclass BaseManager(object):
458n/a '''
459n/a Base class for managers
460n/a '''
461n/a _registry = {}
462n/a _Server = Server
463n/a
464n/a def __init__(self, address=None, authkey=None, serializer='pickle',
465n/a ctx=None):
466n/a if authkey is None:
467n/a authkey = process.current_process().authkey
468n/a self._address = address # XXX not final address if eg ('', 0)
469n/a self._authkey = process.AuthenticationString(authkey)
470n/a self._state = State()
471n/a self._state.value = State.INITIAL
472n/a self._serializer = serializer
473n/a self._Listener, self._Client = listener_client[serializer]
474n/a self._ctx = ctx or get_context()
475n/a
476n/a def get_server(self):
477n/a '''
478n/a Return server object with serve_forever() method and address attribute
479n/a '''
480n/a assert self._state.value == State.INITIAL
481n/a return Server(self._registry, self._address,
482n/a self._authkey, self._serializer)
483n/a
484n/a def connect(self):
485n/a '''
486n/a Connect manager object to the server process
487n/a '''
488n/a Listener, Client = listener_client[self._serializer]
489n/a conn = Client(self._address, authkey=self._authkey)
490n/a dispatch(conn, None, 'dummy')
491n/a self._state.value = State.STARTED
492n/a
493n/a def start(self, initializer=None, initargs=()):
494n/a '''
495n/a Spawn a server process for this manager object
496n/a '''
497n/a assert self._state.value == State.INITIAL
498n/a
499n/a if initializer is not None and not callable(initializer):
500n/a raise TypeError('initializer must be a callable')
501n/a
502n/a # pipe over which we will retrieve address of server
503n/a reader, writer = connection.Pipe(duplex=False)
504n/a
505n/a # spawn process which runs a server
506n/a self._process = self._ctx.Process(
507n/a target=type(self)._run_server,
508n/a args=(self._registry, self._address, self._authkey,
509n/a self._serializer, writer, initializer, initargs),
510n/a )
511n/a ident = ':'.join(str(i) for i in self._process._identity)
512n/a self._process.name = type(self).__name__ + '-' + ident
513n/a self._process.start()
514n/a
515n/a # get address of server
516n/a writer.close()
517n/a self._address = reader.recv()
518n/a reader.close()
519n/a
520n/a # register a finalizer
521n/a self._state.value = State.STARTED
522n/a self.shutdown = util.Finalize(
523n/a self, type(self)._finalize_manager,
524n/a args=(self._process, self._address, self._authkey,
525n/a self._state, self._Client),
526n/a exitpriority=0
527n/a )
528n/a
529n/a @classmethod
530n/a def _run_server(cls, registry, address, authkey, serializer, writer,
531n/a initializer=None, initargs=()):
532n/a '''
533n/a Create a server, report its address and run it
534n/a '''
535n/a if initializer is not None:
536n/a initializer(*initargs)
537n/a
538n/a # create server
539n/a server = cls._Server(registry, address, authkey, serializer)
540n/a
541n/a # inform parent process of the server's address
542n/a writer.send(server.address)
543n/a writer.close()
544n/a
545n/a # run the manager
546n/a util.info('manager serving at %r', server.address)
547n/a server.serve_forever()
548n/a
549n/a def _create(self, typeid, *args, **kwds):
550n/a '''
551n/a Create a new shared object; return the token and exposed tuple
552n/a '''
553n/a assert self._state.value == State.STARTED, 'server not yet started'
554n/a conn = self._Client(self._address, authkey=self._authkey)
555n/a try:
556n/a id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
557n/a finally:
558n/a conn.close()
559n/a return Token(typeid, self._address, id), exposed
560n/a
561n/a def join(self, timeout=None):
562n/a '''
563n/a Join the manager process (if it has been spawned)
564n/a '''
565n/a if self._process is not None:
566n/a self._process.join(timeout)
567n/a if not self._process.is_alive():
568n/a self._process = None
569n/a
570n/a def _debug_info(self):
571n/a '''
572n/a Return some info about the servers shared objects and connections
573n/a '''
574n/a conn = self._Client(self._address, authkey=self._authkey)
575n/a try:
576n/a return dispatch(conn, None, 'debug_info')
577n/a finally:
578n/a conn.close()
579n/a
580n/a def _number_of_objects(self):
581n/a '''
582n/a Return the number of shared objects
583n/a '''
584n/a conn = self._Client(self._address, authkey=self._authkey)
585n/a try:
586n/a return dispatch(conn, None, 'number_of_objects')
587n/a finally:
588n/a conn.close()
589n/a
590n/a def __enter__(self):
591n/a if self._state.value == State.INITIAL:
592n/a self.start()
593n/a assert self._state.value == State.STARTED
594n/a return self
595n/a
596n/a def __exit__(self, exc_type, exc_val, exc_tb):
597n/a self.shutdown()
598n/a
599n/a @staticmethod
600n/a def _finalize_manager(process, address, authkey, state, _Client):
601n/a '''
602n/a Shutdown the manager process; will be registered as a finalizer
603n/a '''
604n/a if process.is_alive():
605n/a util.info('sending shutdown message to manager')
606n/a try:
607n/a conn = _Client(address, authkey=authkey)
608n/a try:
609n/a dispatch(conn, None, 'shutdown')
610n/a finally:
611n/a conn.close()
612n/a except Exception:
613n/a pass
614n/a
615n/a process.join(timeout=1.0)
616n/a if process.is_alive():
617n/a util.info('manager still alive')
618n/a if hasattr(process, 'terminate'):
619n/a util.info('trying to `terminate()` manager process')
620n/a process.terminate()
621n/a process.join(timeout=0.1)
622n/a if process.is_alive():
623n/a util.info('manager still alive after terminate')
624n/a
625n/a state.value = State.SHUTDOWN
626n/a try:
627n/a del BaseProxy._address_to_local[address]
628n/a except KeyError:
629n/a pass
630n/a
631n/a address = property(lambda self: self._address)
632n/a
633n/a @classmethod
634n/a def register(cls, typeid, callable=None, proxytype=None, exposed=None,
635n/a method_to_typeid=None, create_method=True):
636n/a '''
637n/a Register a typeid with the manager type
638n/a '''
639n/a if '_registry' not in cls.__dict__:
640n/a cls._registry = cls._registry.copy()
641n/a
642n/a if proxytype is None:
643n/a proxytype = AutoProxy
644n/a
645n/a exposed = exposed or getattr(proxytype, '_exposed_', None)
646n/a
647n/a method_to_typeid = method_to_typeid or \
648n/a getattr(proxytype, '_method_to_typeid_', None)
649n/a
650n/a if method_to_typeid:
651n/a for key, value in list(method_to_typeid.items()):
652n/a assert type(key) is str, '%r is not a string' % key
653n/a assert type(value) is str, '%r is not a string' % value
654n/a
655n/a cls._registry[typeid] = (
656n/a callable, exposed, method_to_typeid, proxytype
657n/a )
658n/a
659n/a if create_method:
660n/a def temp(self, *args, **kwds):
661n/a util.debug('requesting creation of a shared %r object', typeid)
662n/a token, exp = self._create(typeid, *args, **kwds)
663n/a proxy = proxytype(
664n/a token, self._serializer, manager=self,
665n/a authkey=self._authkey, exposed=exp
666n/a )
667n/a conn = self._Client(token.address, authkey=self._authkey)
668n/a dispatch(conn, None, 'decref', (token.id,))
669n/a return proxy
670n/a temp.__name__ = typeid
671n/a setattr(cls, typeid, temp)
672n/a
673n/a#
674n/a# Subclass of set which get cleared after a fork
675n/a#
676n/a
677n/aclass ProcessLocalSet(set):
678n/a def __init__(self):
679n/a util.register_after_fork(self, lambda obj: obj.clear())
680n/a def __reduce__(self):
681n/a return type(self), ()
682n/a
683n/a#
684n/a# Definition of BaseProxy
685n/a#
686n/a
687n/aclass BaseProxy(object):
688n/a '''
689n/a A base for proxies of shared objects
690n/a '''
691n/a _address_to_local = {}
692n/a _mutex = util.ForkAwareThreadLock()
693n/a
694n/a def __init__(self, token, serializer, manager=None,
695n/a authkey=None, exposed=None, incref=True, manager_owned=False):
696n/a with BaseProxy._mutex:
697n/a tls_idset = BaseProxy._address_to_local.get(token.address, None)
698n/a if tls_idset is None:
699n/a tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
700n/a BaseProxy._address_to_local[token.address] = tls_idset
701n/a
702n/a # self._tls is used to record the connection used by this
703n/a # thread to communicate with the manager at token.address
704n/a self._tls = tls_idset[0]
705n/a
706n/a # self._idset is used to record the identities of all shared
707n/a # objects for which the current process owns references and
708n/a # which are in the manager at token.address
709n/a self._idset = tls_idset[1]
710n/a
711n/a self._token = token
712n/a self._id = self._token.id
713n/a self._manager = manager
714n/a self._serializer = serializer
715n/a self._Client = listener_client[serializer][1]
716n/a
717n/a # Should be set to True only when a proxy object is being created
718n/a # on the manager server; primary use case: nested proxy objects.
719n/a # RebuildProxy detects when a proxy is being created on the manager
720n/a # and sets this value appropriately.
721n/a self._owned_by_manager = manager_owned
722n/a
723n/a if authkey is not None:
724n/a self._authkey = process.AuthenticationString(authkey)
725n/a elif self._manager is not None:
726n/a self._authkey = self._manager._authkey
727n/a else:
728n/a self._authkey = process.current_process().authkey
729n/a
730n/a if incref:
731n/a self._incref()
732n/a
733n/a util.register_after_fork(self, BaseProxy._after_fork)
734n/a
735n/a def _connect(self):
736n/a util.debug('making connection to manager')
737n/a name = process.current_process().name
738n/a if threading.current_thread().name != 'MainThread':
739n/a name += '|' + threading.current_thread().name
740n/a conn = self._Client(self._token.address, authkey=self._authkey)
741n/a dispatch(conn, None, 'accept_connection', (name,))
742n/a self._tls.connection = conn
743n/a
744n/a def _callmethod(self, methodname, args=(), kwds={}):
745n/a '''
746n/a Try to call a method of the referrent and return a copy of the result
747n/a '''
748n/a try:
749n/a conn = self._tls.connection
750n/a except AttributeError:
751n/a util.debug('thread %r does not own a connection',
752n/a threading.current_thread().name)
753n/a self._connect()
754n/a conn = self._tls.connection
755n/a
756n/a conn.send((self._id, methodname, args, kwds))
757n/a kind, result = conn.recv()
758n/a
759n/a if kind == '#RETURN':
760n/a return result
761n/a elif kind == '#PROXY':
762n/a exposed, token = result
763n/a proxytype = self._manager._registry[token.typeid][-1]
764n/a token.address = self._token.address
765n/a proxy = proxytype(
766n/a token, self._serializer, manager=self._manager,
767n/a authkey=self._authkey, exposed=exposed
768n/a )
769n/a conn = self._Client(token.address, authkey=self._authkey)
770n/a dispatch(conn, None, 'decref', (token.id,))
771n/a return proxy
772n/a raise convert_to_error(kind, result)
773n/a
774n/a def _getvalue(self):
775n/a '''
776n/a Get a copy of the value of the referent
777n/a '''
778n/a return self._callmethod('#GETVALUE')
779n/a
780n/a def _incref(self):
781n/a if self._owned_by_manager:
782n/a util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
783n/a return
784n/a
785n/a conn = self._Client(self._token.address, authkey=self._authkey)
786n/a dispatch(conn, None, 'incref', (self._id,))
787n/a util.debug('INCREF %r', self._token.id)
788n/a
789n/a self._idset.add(self._id)
790n/a
791n/a state = self._manager and self._manager._state
792n/a
793n/a self._close = util.Finalize(
794n/a self, BaseProxy._decref,
795n/a args=(self._token, self._authkey, state,
796n/a self._tls, self._idset, self._Client),
797n/a exitpriority=10
798n/a )
799n/a
800n/a @staticmethod
801n/a def _decref(token, authkey, state, tls, idset, _Client):
802n/a idset.discard(token.id)
803n/a
804n/a # check whether manager is still alive
805n/a if state is None or state.value == State.STARTED:
806n/a # tell manager this process no longer cares about referent
807n/a try:
808n/a util.debug('DECREF %r', token.id)
809n/a conn = _Client(token.address, authkey=authkey)
810n/a dispatch(conn, None, 'decref', (token.id,))
811n/a except Exception as e:
812n/a util.debug('... decref failed %s', e)
813n/a
814n/a else:
815n/a util.debug('DECREF %r -- manager already shutdown', token.id)
816n/a
817n/a # check whether we can close this thread's connection because
818n/a # the process owns no more references to objects for this manager
819n/a if not idset and hasattr(tls, 'connection'):
820n/a util.debug('thread %r has no more proxies so closing conn',
821n/a threading.current_thread().name)
822n/a tls.connection.close()
823n/a del tls.connection
824n/a
825n/a def _after_fork(self):
826n/a self._manager = None
827n/a try:
828n/a self._incref()
829n/a except Exception as e:
830n/a # the proxy may just be for a manager which has shutdown
831n/a util.info('incref failed: %s' % e)
832n/a
833n/a def __reduce__(self):
834n/a kwds = {}
835n/a if get_spawning_popen() is not None:
836n/a kwds['authkey'] = self._authkey
837n/a
838n/a if getattr(self, '_isauto', False):
839n/a kwds['exposed'] = self._exposed_
840n/a return (RebuildProxy,
841n/a (AutoProxy, self._token, self._serializer, kwds))
842n/a else:
843n/a return (RebuildProxy,
844n/a (type(self), self._token, self._serializer, kwds))
845n/a
846n/a def __deepcopy__(self, memo):
847n/a return self._getvalue()
848n/a
849n/a def __repr__(self):
850n/a return '<%s object, typeid %r at %#x>' % \
851n/a (type(self).__name__, self._token.typeid, id(self))
852n/a
853n/a def __str__(self):
854n/a '''
855n/a Return representation of the referent (or a fall-back if that fails)
856n/a '''
857n/a try:
858n/a return self._callmethod('__repr__')
859n/a except Exception:
860n/a return repr(self)[:-1] + "; '__str__()' failed>"
861n/a
862n/a#
863n/a# Function used for unpickling
864n/a#
865n/a
866n/adef RebuildProxy(func, token, serializer, kwds):
867n/a '''
868n/a Function used for unpickling proxy objects.
869n/a '''
870n/a server = getattr(process.current_process(), '_manager_server', None)
871n/a if server and server.address == token.address:
872n/a util.debug('Rebuild a proxy owned by manager, token=%r', token)
873n/a kwds['manager_owned'] = True
874n/a if token.id not in server.id_to_local_proxy_obj:
875n/a server.id_to_local_proxy_obj[token.id] = \
876n/a server.id_to_obj[token.id]
877n/a incref = (
878n/a kwds.pop('incref', True) and
879n/a not getattr(process.current_process(), '_inheriting', False)
880n/a )
881n/a return func(token, serializer, incref=incref, **kwds)
882n/a
883n/a#
884n/a# Functions to create proxies and proxy types
885n/a#
886n/a
887n/adef MakeProxyType(name, exposed, _cache={}):
888n/a '''
889n/a Return a proxy type whose methods are given by `exposed`
890n/a '''
891n/a exposed = tuple(exposed)
892n/a try:
893n/a return _cache[(name, exposed)]
894n/a except KeyError:
895n/a pass
896n/a
897n/a dic = {}
898n/a
899n/a for meth in exposed:
900n/a exec('''def %s(self, *args, **kwds):
901n/a return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
902n/a
903n/a ProxyType = type(name, (BaseProxy,), dic)
904n/a ProxyType._exposed_ = exposed
905n/a _cache[(name, exposed)] = ProxyType
906n/a return ProxyType
907n/a
908n/a
909n/adef AutoProxy(token, serializer, manager=None, authkey=None,
910n/a exposed=None, incref=True):
911n/a '''
912n/a Return an auto-proxy for `token`
913n/a '''
914n/a _Client = listener_client[serializer][1]
915n/a
916n/a if exposed is None:
917n/a conn = _Client(token.address, authkey=authkey)
918n/a try:
919n/a exposed = dispatch(conn, None, 'get_methods', (token,))
920n/a finally:
921n/a conn.close()
922n/a
923n/a if authkey is None and manager is not None:
924n/a authkey = manager._authkey
925n/a if authkey is None:
926n/a authkey = process.current_process().authkey
927n/a
928n/a ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
929n/a proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
930n/a incref=incref)
931n/a proxy._isauto = True
932n/a return proxy
933n/a
934n/a#
935n/a# Types/callables which we will register with SyncManager
936n/a#
937n/a
938n/aclass Namespace(object):
939n/a def __init__(self, **kwds):
940n/a self.__dict__.update(kwds)
941n/a def __repr__(self):
942n/a items = list(self.__dict__.items())
943n/a temp = []
944n/a for name, value in items:
945n/a if not name.startswith('_'):
946n/a temp.append('%s=%r' % (name, value))
947n/a temp.sort()
948n/a return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
949n/a
950n/aclass Value(object):
951n/a def __init__(self, typecode, value, lock=True):
952n/a self._typecode = typecode
953n/a self._value = value
954n/a def get(self):
955n/a return self._value
956n/a def set(self, value):
957n/a self._value = value
958n/a def __repr__(self):
959n/a return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
960n/a value = property(get, set)
961n/a
962n/adef Array(typecode, sequence, lock=True):
963n/a return array.array(typecode, sequence)
964n/a
965n/a#
966n/a# Proxy types used by SyncManager
967n/a#
968n/a
969n/aclass IteratorProxy(BaseProxy):
970n/a _exposed_ = ('__next__', 'send', 'throw', 'close')
971n/a def __iter__(self):
972n/a return self
973n/a def __next__(self, *args):
974n/a return self._callmethod('__next__', args)
975n/a def send(self, *args):
976n/a return self._callmethod('send', args)
977n/a def throw(self, *args):
978n/a return self._callmethod('throw', args)
979n/a def close(self, *args):
980n/a return self._callmethod('close', args)
981n/a
982n/a
983n/aclass AcquirerProxy(BaseProxy):
984n/a _exposed_ = ('acquire', 'release')
985n/a def acquire(self, blocking=True, timeout=None):
986n/a args = (blocking,) if timeout is None else (blocking, timeout)
987n/a return self._callmethod('acquire', args)
988n/a def release(self):
989n/a return self._callmethod('release')
990n/a def __enter__(self):
991n/a return self._callmethod('acquire')
992n/a def __exit__(self, exc_type, exc_val, exc_tb):
993n/a return self._callmethod('release')
994n/a
995n/a
996n/aclass ConditionProxy(AcquirerProxy):
997n/a _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
998n/a def wait(self, timeout=None):
999n/a return self._callmethod('wait', (timeout,))
1000n/a def notify(self):
1001n/a return self._callmethod('notify')
1002n/a def notify_all(self):
1003n/a return self._callmethod('notify_all')
1004n/a def wait_for(self, predicate, timeout=None):
1005n/a result = predicate()
1006n/a if result:
1007n/a return result
1008n/a if timeout is not None:
1009n/a endtime = _time() + timeout
1010n/a else:
1011n/a endtime = None
1012n/a waittime = None
1013n/a while not result:
1014n/a if endtime is not None:
1015n/a waittime = endtime - _time()
1016n/a if waittime <= 0:
1017n/a break
1018n/a self.wait(waittime)
1019n/a result = predicate()
1020n/a return result
1021n/a
1022n/a
1023n/aclass EventProxy(BaseProxy):
1024n/a _exposed_ = ('is_set', 'set', 'clear', 'wait')
1025n/a def is_set(self):
1026n/a return self._callmethod('is_set')
1027n/a def set(self):
1028n/a return self._callmethod('set')
1029n/a def clear(self):
1030n/a return self._callmethod('clear')
1031n/a def wait(self, timeout=None):
1032n/a return self._callmethod('wait', (timeout,))
1033n/a
1034n/a
1035n/aclass BarrierProxy(BaseProxy):
1036n/a _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1037n/a def wait(self, timeout=None):
1038n/a return self._callmethod('wait', (timeout,))
1039n/a def abort(self):
1040n/a return self._callmethod('abort')
1041n/a def reset(self):
1042n/a return self._callmethod('reset')
1043n/a @property
1044n/a def parties(self):
1045n/a return self._callmethod('__getattribute__', ('parties',))
1046n/a @property
1047n/a def n_waiting(self):
1048n/a return self._callmethod('__getattribute__', ('n_waiting',))
1049n/a @property
1050n/a def broken(self):
1051n/a return self._callmethod('__getattribute__', ('broken',))
1052n/a
1053n/a
1054n/aclass NamespaceProxy(BaseProxy):
1055n/a _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1056n/a def __getattr__(self, key):
1057n/a if key[0] == '_':
1058n/a return object.__getattribute__(self, key)
1059n/a callmethod = object.__getattribute__(self, '_callmethod')
1060n/a return callmethod('__getattribute__', (key,))
1061n/a def __setattr__(self, key, value):
1062n/a if key[0] == '_':
1063n/a return object.__setattr__(self, key, value)
1064n/a callmethod = object.__getattribute__(self, '_callmethod')
1065n/a return callmethod('__setattr__', (key, value))
1066n/a def __delattr__(self, key):
1067n/a if key[0] == '_':
1068n/a return object.__delattr__(self, key)
1069n/a callmethod = object.__getattribute__(self, '_callmethod')
1070n/a return callmethod('__delattr__', (key,))
1071n/a
1072n/a
1073n/aclass ValueProxy(BaseProxy):
1074n/a _exposed_ = ('get', 'set')
1075n/a def get(self):
1076n/a return self._callmethod('get')
1077n/a def set(self, value):
1078n/a return self._callmethod('set', (value,))
1079n/a value = property(get, set)
1080n/a
1081n/a
1082n/aBaseListProxy = MakeProxyType('BaseListProxy', (
1083n/a '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1084n/a '__mul__', '__reversed__', '__rmul__', '__setitem__',
1085n/a 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1086n/a 'reverse', 'sort', '__imul__'
1087n/a ))
1088n/aclass ListProxy(BaseListProxy):
1089n/a def __iadd__(self, value):
1090n/a self._callmethod('extend', (value,))
1091n/a return self
1092n/a def __imul__(self, value):
1093n/a self._callmethod('__imul__', (value,))
1094n/a return self
1095n/a
1096n/a
1097n/aDictProxy = MakeProxyType('DictProxy', (
1098n/a '__contains__', '__delitem__', '__getitem__', '__len__',
1099n/a '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1100n/a 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1101n/a ))
1102n/a
1103n/a
1104n/aArrayProxy = MakeProxyType('ArrayProxy', (
1105n/a '__len__', '__getitem__', '__setitem__'
1106n/a ))
1107n/a
1108n/a
1109n/aBasePoolProxy = MakeProxyType('PoolProxy', (
1110n/a 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1111n/a 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
1112n/a ))
1113n/aBasePoolProxy._method_to_typeid_ = {
1114n/a 'apply_async': 'AsyncResult',
1115n/a 'map_async': 'AsyncResult',
1116n/a 'starmap_async': 'AsyncResult',
1117n/a 'imap': 'Iterator',
1118n/a 'imap_unordered': 'Iterator'
1119n/a }
1120n/aclass PoolProxy(BasePoolProxy):
1121n/a def __enter__(self):
1122n/a return self
1123n/a def __exit__(self, exc_type, exc_val, exc_tb):
1124n/a self.terminate()
1125n/a
1126n/a#
1127n/a# Definition of SyncManager
1128n/a#
1129n/a
1130n/aclass SyncManager(BaseManager):
1131n/a '''
1132n/a Subclass of `BaseManager` which supports a number of shared object types.
1133n/a
1134n/a The types registered are those intended for the synchronization
1135n/a of threads, plus `dict`, `list` and `Namespace`.
1136n/a
1137n/a The `multiprocessing.Manager()` function creates started instances of
1138n/a this class.
1139n/a '''
1140n/a
1141n/aSyncManager.register('Queue', queue.Queue)
1142n/aSyncManager.register('JoinableQueue', queue.Queue)
1143n/aSyncManager.register('Event', threading.Event, EventProxy)
1144n/aSyncManager.register('Lock', threading.Lock, AcquirerProxy)
1145n/aSyncManager.register('RLock', threading.RLock, AcquirerProxy)
1146n/aSyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1147n/aSyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1148n/a AcquirerProxy)
1149n/aSyncManager.register('Condition', threading.Condition, ConditionProxy)
1150n/aSyncManager.register('Barrier', threading.Barrier, BarrierProxy)
1151n/aSyncManager.register('Pool', pool.Pool, PoolProxy)
1152n/aSyncManager.register('list', list, ListProxy)
1153n/aSyncManager.register('dict', dict, DictProxy)
1154n/aSyncManager.register('Value', Value, ValueProxy)
1155n/aSyncManager.register('Array', Array, ArrayProxy)
1156n/aSyncManager.register('Namespace', Namespace, NamespaceProxy)
1157n/a
1158n/a# types returned by methods of PoolProxy
1159n/aSyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1160n/aSyncManager.register('AsyncResult', create_method=False)