ยปCore Development>Code coverage>Lib/idlelib/rpc.py

Python code coverage for Lib/idlelib/rpc.py

#countcontent
1n/a"""RPC Implementation, originally written for the Python Idle IDE
2n/a
3n/aFor security reasons, GvR requested that Idle's Python execution server process
4n/aconnect to the Idle process, which listens for the connection. Since Idle has
5n/aonly one client per server, this was not a limitation.
6n/a
7n/a +---------------------------------+ +-------------+
8n/a | socketserver.BaseRequestHandler | | SocketIO |
9n/a +---------------------------------+ +-------------+
10n/a ^ | register() |
11n/a | | unregister()|
12n/a | +-------------+
13n/a | ^ ^
14n/a | | |
15n/a | + -------------------+ |
16n/a | | |
17n/a +-------------------------+ +-----------------+
18n/a | RPCHandler | | RPCClient |
19n/a | [attribute of RPCServer]| | |
20n/a +-------------------------+ +-----------------+
21n/a
22n/aThe RPCServer handler class is expected to provide register/unregister methods.
23n/aRPCHandler inherits the mix-in class SocketIO, which provides these methods.
24n/a
25n/aSee the Idle run.main() docstring for further information on how this was
26n/aaccomplished in Idle.
27n/a
28n/a"""
29n/aimport builtins
30n/aimport copyreg
31n/aimport io
32n/aimport marshal
33n/aimport os
34n/aimport pickle
35n/aimport queue
36n/aimport select
37n/aimport socket
38n/aimport socketserver
39n/aimport struct
40n/aimport sys
41n/aimport threading
42n/aimport traceback
43n/aimport types
44n/a
45n/adef unpickle_code(ms):
46n/a co = marshal.loads(ms)
47n/a assert isinstance(co, types.CodeType)
48n/a return co
49n/a
50n/adef pickle_code(co):
51n/a assert isinstance(co, types.CodeType)
52n/a ms = marshal.dumps(co)
53n/a return unpickle_code, (ms,)
54n/a
55n/adef dumps(obj, protocol=None):
56n/a f = io.BytesIO()
57n/a p = CodePickler(f, protocol)
58n/a p.dump(obj)
59n/a return f.getvalue()
60n/a
61n/a
62n/aclass CodePickler(pickle.Pickler):
63n/a dispatch_table = {types.CodeType: pickle_code}
64n/a dispatch_table.update(copyreg.dispatch_table)
65n/a
66n/a
67n/aBUFSIZE = 8*1024
68n/aLOCALHOST = '127.0.0.1'
69n/a
70n/aclass RPCServer(socketserver.TCPServer):
71n/a
72n/a def __init__(self, addr, handlerclass=None):
73n/a if handlerclass is None:
74n/a handlerclass = RPCHandler
75n/a socketserver.TCPServer.__init__(self, addr, handlerclass)
76n/a
77n/a def server_bind(self):
78n/a "Override TCPServer method, no bind() phase for connecting entity"
79n/a pass
80n/a
81n/a def server_activate(self):
82n/a """Override TCPServer method, connect() instead of listen()
83n/a
84n/a Due to the reversed connection, self.server_address is actually the
85n/a address of the Idle Client to which we are connecting.
86n/a
87n/a """
88n/a self.socket.connect(self.server_address)
89n/a
90n/a def get_request(self):
91n/a "Override TCPServer method, return already connected socket"
92n/a return self.socket, self.server_address
93n/a
94n/a def handle_error(self, request, client_address):
95n/a """Override TCPServer method
96n/a
97n/a Error message goes to __stderr__. No error message if exiting
98n/a normally or socket raised EOF. Other exceptions not handled in
99n/a server code will cause os._exit.
100n/a
101n/a """
102n/a try:
103n/a raise
104n/a except SystemExit:
105n/a raise
106n/a except:
107n/a erf = sys.__stderr__
108n/a print('\n' + '-'*40, file=erf)
109n/a print('Unhandled server exception!', file=erf)
110n/a print('Thread: %s' % threading.current_thread().name, file=erf)
111n/a print('Client Address: ', client_address, file=erf)
112n/a print('Request: ', repr(request), file=erf)
113n/a traceback.print_exc(file=erf)
114n/a print('\n*** Unrecoverable, server exiting!', file=erf)
115n/a print('-'*40, file=erf)
116n/a os._exit(0)
117n/a
118n/a#----------------- end class RPCServer --------------------
119n/a
120n/aobjecttable = {}
121n/arequest_queue = queue.Queue(0)
122n/aresponse_queue = queue.Queue(0)
123n/a
124n/a
125n/aclass SocketIO(object):
126n/a
127n/a nextseq = 0
128n/a
129n/a def __init__(self, sock, objtable=None, debugging=None):
130n/a self.sockthread = threading.current_thread()
131n/a if debugging is not None:
132n/a self.debugging = debugging
133n/a self.sock = sock
134n/a if objtable is None:
135n/a objtable = objecttable
136n/a self.objtable = objtable
137n/a self.responses = {}
138n/a self.cvars = {}
139n/a
140n/a def close(self):
141n/a sock = self.sock
142n/a self.sock = None
143n/a if sock is not None:
144n/a sock.close()
145n/a
146n/a def exithook(self):
147n/a "override for specific exit action"
148n/a os._exit(0)
149n/a
150n/a def debug(self, *args):
151n/a if not self.debugging:
152n/a return
153n/a s = self.location + " " + str(threading.current_thread().name)
154n/a for a in args:
155n/a s = s + " " + str(a)
156n/a print(s, file=sys.__stderr__)
157n/a
158n/a def register(self, oid, object):
159n/a self.objtable[oid] = object
160n/a
161n/a def unregister(self, oid):
162n/a try:
163n/a del self.objtable[oid]
164n/a except KeyError:
165n/a pass
166n/a
167n/a def localcall(self, seq, request):
168n/a self.debug("localcall:", request)
169n/a try:
170n/a how, (oid, methodname, args, kwargs) = request
171n/a except TypeError:
172n/a return ("ERROR", "Bad request format")
173n/a if oid not in self.objtable:
174n/a return ("ERROR", "Unknown object id: %r" % (oid,))
175n/a obj = self.objtable[oid]
176n/a if methodname == "__methods__":
177n/a methods = {}
178n/a _getmethods(obj, methods)
179n/a return ("OK", methods)
180n/a if methodname == "__attributes__":
181n/a attributes = {}
182n/a _getattributes(obj, attributes)
183n/a return ("OK", attributes)
184n/a if not hasattr(obj, methodname):
185n/a return ("ERROR", "Unsupported method name: %r" % (methodname,))
186n/a method = getattr(obj, methodname)
187n/a try:
188n/a if how == 'CALL':
189n/a ret = method(*args, **kwargs)
190n/a if isinstance(ret, RemoteObject):
191n/a ret = remoteref(ret)
192n/a return ("OK", ret)
193n/a elif how == 'QUEUE':
194n/a request_queue.put((seq, (method, args, kwargs)))
195n/a return("QUEUED", None)
196n/a else:
197n/a return ("ERROR", "Unsupported message type: %s" % how)
198n/a except SystemExit:
199n/a raise
200n/a except KeyboardInterrupt:
201n/a raise
202n/a except OSError:
203n/a raise
204n/a except Exception as ex:
205n/a return ("CALLEXC", ex)
206n/a except:
207n/a msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
208n/a " Object: %s \n Method: %s \n Args: %s\n"
209n/a print(msg % (oid, method, args), file=sys.__stderr__)
210n/a traceback.print_exc(file=sys.__stderr__)
211n/a return ("EXCEPTION", None)
212n/a
213n/a def remotecall(self, oid, methodname, args, kwargs):
214n/a self.debug("remotecall:asynccall: ", oid, methodname)
215n/a seq = self.asynccall(oid, methodname, args, kwargs)
216n/a return self.asyncreturn(seq)
217n/a
218n/a def remotequeue(self, oid, methodname, args, kwargs):
219n/a self.debug("remotequeue:asyncqueue: ", oid, methodname)
220n/a seq = self.asyncqueue(oid, methodname, args, kwargs)
221n/a return self.asyncreturn(seq)
222n/a
223n/a def asynccall(self, oid, methodname, args, kwargs):
224n/a request = ("CALL", (oid, methodname, args, kwargs))
225n/a seq = self.newseq()
226n/a if threading.current_thread() != self.sockthread:
227n/a cvar = threading.Condition()
228n/a self.cvars[seq] = cvar
229n/a self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
230n/a self.putmessage((seq, request))
231n/a return seq
232n/a
233n/a def asyncqueue(self, oid, methodname, args, kwargs):
234n/a request = ("QUEUE", (oid, methodname, args, kwargs))
235n/a seq = self.newseq()
236n/a if threading.current_thread() != self.sockthread:
237n/a cvar = threading.Condition()
238n/a self.cvars[seq] = cvar
239n/a self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
240n/a self.putmessage((seq, request))
241n/a return seq
242n/a
243n/a def asyncreturn(self, seq):
244n/a self.debug("asyncreturn:%d:call getresponse(): " % seq)
245n/a response = self.getresponse(seq, wait=0.05)
246n/a self.debug(("asyncreturn:%d:response: " % seq), response)
247n/a return self.decoderesponse(response)
248n/a
249n/a def decoderesponse(self, response):
250n/a how, what = response
251n/a if how == "OK":
252n/a return what
253n/a if how == "QUEUED":
254n/a return None
255n/a if how == "EXCEPTION":
256n/a self.debug("decoderesponse: EXCEPTION")
257n/a return None
258n/a if how == "EOF":
259n/a self.debug("decoderesponse: EOF")
260n/a self.decode_interrupthook()
261n/a return None
262n/a if how == "ERROR":
263n/a self.debug("decoderesponse: Internal ERROR:", what)
264n/a raise RuntimeError(what)
265n/a if how == "CALLEXC":
266n/a self.debug("decoderesponse: Call Exception:", what)
267n/a raise what
268n/a raise SystemError(how, what)
269n/a
270n/a def decode_interrupthook(self):
271n/a ""
272n/a raise EOFError
273n/a
274n/a def mainloop(self):
275n/a """Listen on socket until I/O not ready or EOF
276n/a
277n/a pollresponse() will loop looking for seq number None, which
278n/a never comes, and exit on EOFError.
279n/a
280n/a """
281n/a try:
282n/a self.getresponse(myseq=None, wait=0.05)
283n/a except EOFError:
284n/a self.debug("mainloop:return")
285n/a return
286n/a
287n/a def getresponse(self, myseq, wait):
288n/a response = self._getresponse(myseq, wait)
289n/a if response is not None:
290n/a how, what = response
291n/a if how == "OK":
292n/a response = how, self._proxify(what)
293n/a return response
294n/a
295n/a def _proxify(self, obj):
296n/a if isinstance(obj, RemoteProxy):
297n/a return RPCProxy(self, obj.oid)
298n/a if isinstance(obj, list):
299n/a return list(map(self._proxify, obj))
300n/a # XXX Check for other types -- not currently needed
301n/a return obj
302n/a
303n/a def _getresponse(self, myseq, wait):
304n/a self.debug("_getresponse:myseq:", myseq)
305n/a if threading.current_thread() is self.sockthread:
306n/a # this thread does all reading of requests or responses
307n/a while 1:
308n/a response = self.pollresponse(myseq, wait)
309n/a if response is not None:
310n/a return response
311n/a else:
312n/a # wait for notification from socket handling thread
313n/a cvar = self.cvars[myseq]
314n/a cvar.acquire()
315n/a while myseq not in self.responses:
316n/a cvar.wait()
317n/a response = self.responses[myseq]
318n/a self.debug("_getresponse:%s: thread woke up: response: %s" %
319n/a (myseq, response))
320n/a del self.responses[myseq]
321n/a del self.cvars[myseq]
322n/a cvar.release()
323n/a return response
324n/a
325n/a def newseq(self):
326n/a self.nextseq = seq = self.nextseq + 2
327n/a return seq
328n/a
329n/a def putmessage(self, message):
330n/a self.debug("putmessage:%d:" % message[0])
331n/a try:
332n/a s = dumps(message)
333n/a except pickle.PicklingError:
334n/a print("Cannot pickle:", repr(message), file=sys.__stderr__)
335n/a raise
336n/a s = struct.pack("<i", len(s)) + s
337n/a while len(s) > 0:
338n/a try:
339n/a r, w, x = select.select([], [self.sock], [])
340n/a n = self.sock.send(s[:BUFSIZE])
341n/a except (AttributeError, TypeError):
342n/a raise OSError("socket no longer exists")
343n/a s = s[n:]
344n/a
345n/a buff = b''
346n/a bufneed = 4
347n/a bufstate = 0 # meaning: 0 => reading count; 1 => reading data
348n/a
349n/a def pollpacket(self, wait):
350n/a self._stage0()
351n/a if len(self.buff) < self.bufneed:
352n/a r, w, x = select.select([self.sock.fileno()], [], [], wait)
353n/a if len(r) == 0:
354n/a return None
355n/a try:
356n/a s = self.sock.recv(BUFSIZE)
357n/a except OSError:
358n/a raise EOFError
359n/a if len(s) == 0:
360n/a raise EOFError
361n/a self.buff += s
362n/a self._stage0()
363n/a return self._stage1()
364n/a
365n/a def _stage0(self):
366n/a if self.bufstate == 0 and len(self.buff) >= 4:
367n/a s = self.buff[:4]
368n/a self.buff = self.buff[4:]
369n/a self.bufneed = struct.unpack("<i", s)[0]
370n/a self.bufstate = 1
371n/a
372n/a def _stage1(self):
373n/a if self.bufstate == 1 and len(self.buff) >= self.bufneed:
374n/a packet = self.buff[:self.bufneed]
375n/a self.buff = self.buff[self.bufneed:]
376n/a self.bufneed = 4
377n/a self.bufstate = 0
378n/a return packet
379n/a
380n/a def pollmessage(self, wait):
381n/a packet = self.pollpacket(wait)
382n/a if packet is None:
383n/a return None
384n/a try:
385n/a message = pickle.loads(packet)
386n/a except pickle.UnpicklingError:
387n/a print("-----------------------", file=sys.__stderr__)
388n/a print("cannot unpickle packet:", repr(packet), file=sys.__stderr__)
389n/a traceback.print_stack(file=sys.__stderr__)
390n/a print("-----------------------", file=sys.__stderr__)
391n/a raise
392n/a return message
393n/a
394n/a def pollresponse(self, myseq, wait):
395n/a """Handle messages received on the socket.
396n/a
397n/a Some messages received may be asynchronous 'call' or 'queue' requests,
398n/a and some may be responses for other threads.
399n/a
400n/a 'call' requests are passed to self.localcall() with the expectation of
401n/a immediate execution, during which time the socket is not serviced.
402n/a
403n/a 'queue' requests are used for tasks (which may block or hang) to be
404n/a processed in a different thread. These requests are fed into
405n/a request_queue by self.localcall(). Responses to queued requests are
406n/a taken from response_queue and sent across the link with the associated
407n/a sequence numbers. Messages in the queues are (sequence_number,
408n/a request/response) tuples and code using this module removing messages
409n/a from the request_queue is responsible for returning the correct
410n/a sequence number in the response_queue.
411n/a
412n/a pollresponse() will loop until a response message with the myseq
413n/a sequence number is received, and will save other responses in
414n/a self.responses and notify the owning thread.
415n/a
416n/a """
417n/a while 1:
418n/a # send queued response if there is one available
419n/a try:
420n/a qmsg = response_queue.get(0)
421n/a except queue.Empty:
422n/a pass
423n/a else:
424n/a seq, response = qmsg
425n/a message = (seq, ('OK', response))
426n/a self.putmessage(message)
427n/a # poll for message on link
428n/a try:
429n/a message = self.pollmessage(wait)
430n/a if message is None: # socket not ready
431n/a return None
432n/a except EOFError:
433n/a self.handle_EOF()
434n/a return None
435n/a except AttributeError:
436n/a return None
437n/a seq, resq = message
438n/a how = resq[0]
439n/a self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
440n/a # process or queue a request
441n/a if how in ("CALL", "QUEUE"):
442n/a self.debug("pollresponse:%d:localcall:call:" % seq)
443n/a response = self.localcall(seq, resq)
444n/a self.debug("pollresponse:%d:localcall:response:%s"
445n/a % (seq, response))
446n/a if how == "CALL":
447n/a self.putmessage((seq, response))
448n/a elif how == "QUEUE":
449n/a # don't acknowledge the 'queue' request!
450n/a pass
451n/a continue
452n/a # return if completed message transaction
453n/a elif seq == myseq:
454n/a return resq
455n/a # must be a response for a different thread:
456n/a else:
457n/a cv = self.cvars.get(seq, None)
458n/a # response involving unknown sequence number is discarded,
459n/a # probably intended for prior incarnation of server
460n/a if cv is not None:
461n/a cv.acquire()
462n/a self.responses[seq] = resq
463n/a cv.notify()
464n/a cv.release()
465n/a continue
466n/a
467n/a def handle_EOF(self):
468n/a "action taken upon link being closed by peer"
469n/a self.EOFhook()
470n/a self.debug("handle_EOF")
471n/a for key in self.cvars:
472n/a cv = self.cvars[key]
473n/a cv.acquire()
474n/a self.responses[key] = ('EOF', None)
475n/a cv.notify()
476n/a cv.release()
477n/a # call our (possibly overridden) exit function
478n/a self.exithook()
479n/a
480n/a def EOFhook(self):
481n/a "Classes using rpc client/server can override to augment EOF action"
482n/a pass
483n/a
484n/a#----------------- end class SocketIO --------------------
485n/a
486n/aclass RemoteObject(object):
487n/a # Token mix-in class
488n/a pass
489n/a
490n/a
491n/adef remoteref(obj):
492n/a oid = id(obj)
493n/a objecttable[oid] = obj
494n/a return RemoteProxy(oid)
495n/a
496n/a
497n/aclass RemoteProxy(object):
498n/a
499n/a def __init__(self, oid):
500n/a self.oid = oid
501n/a
502n/a
503n/aclass RPCHandler(socketserver.BaseRequestHandler, SocketIO):
504n/a
505n/a debugging = False
506n/a location = "#S" # Server
507n/a
508n/a def __init__(self, sock, addr, svr):
509n/a svr.current_handler = self ## cgt xxx
510n/a SocketIO.__init__(self, sock)
511n/a socketserver.BaseRequestHandler.__init__(self, sock, addr, svr)
512n/a
513n/a def handle(self):
514n/a "handle() method required by socketserver"
515n/a self.mainloop()
516n/a
517n/a def get_remote_proxy(self, oid):
518n/a return RPCProxy(self, oid)
519n/a
520n/a
521n/aclass RPCClient(SocketIO):
522n/a
523n/a debugging = False
524n/a location = "#C" # Client
525n/a
526n/a nextseq = 1 # Requests coming from the client are odd numbered
527n/a
528n/a def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
529n/a self.listening_sock = socket.socket(family, type)
530n/a self.listening_sock.bind(address)
531n/a self.listening_sock.listen(1)
532n/a
533n/a def accept(self):
534n/a working_sock, address = self.listening_sock.accept()
535n/a if self.debugging:
536n/a print("****** Connection request from ", address, file=sys.__stderr__)
537n/a if address[0] == LOCALHOST:
538n/a SocketIO.__init__(self, working_sock)
539n/a else:
540n/a print("** Invalid host: ", address, file=sys.__stderr__)
541n/a raise OSError
542n/a
543n/a def get_remote_proxy(self, oid):
544n/a return RPCProxy(self, oid)
545n/a
546n/a
547n/aclass RPCProxy(object):
548n/a
549n/a __methods = None
550n/a __attributes = None
551n/a
552n/a def __init__(self, sockio, oid):
553n/a self.sockio = sockio
554n/a self.oid = oid
555n/a
556n/a def __getattr__(self, name):
557n/a if self.__methods is None:
558n/a self.__getmethods()
559n/a if self.__methods.get(name):
560n/a return MethodProxy(self.sockio, self.oid, name)
561n/a if self.__attributes is None:
562n/a self.__getattributes()
563n/a if name in self.__attributes:
564n/a value = self.sockio.remotecall(self.oid, '__getattribute__',
565n/a (name,), {})
566n/a return value
567n/a else:
568n/a raise AttributeError(name)
569n/a
570n/a def __getattributes(self):
571n/a self.__attributes = self.sockio.remotecall(self.oid,
572n/a "__attributes__", (), {})
573n/a
574n/a def __getmethods(self):
575n/a self.__methods = self.sockio.remotecall(self.oid,
576n/a "__methods__", (), {})
577n/a
578n/adef _getmethods(obj, methods):
579n/a # Helper to get a list of methods from an object
580n/a # Adds names to dictionary argument 'methods'
581n/a for name in dir(obj):
582n/a attr = getattr(obj, name)
583n/a if callable(attr):
584n/a methods[name] = 1
585n/a if isinstance(obj, type):
586n/a for super in obj.__bases__:
587n/a _getmethods(super, methods)
588n/a
589n/adef _getattributes(obj, attributes):
590n/a for name in dir(obj):
591n/a attr = getattr(obj, name)
592n/a if not callable(attr):
593n/a attributes[name] = 1
594n/a
595n/a
596n/aclass MethodProxy(object):
597n/a
598n/a def __init__(self, sockio, oid, name):
599n/a self.sockio = sockio
600n/a self.oid = oid
601n/a self.name = name
602n/a
603n/a def __call__(self, *args, **kwargs):
604n/a value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
605n/a return value
606n/a
607n/a
608n/a# XXX KBK 09Sep03 We need a proper unit test for this module. Previously
609n/a# existing test code was removed at Rev 1.27 (r34098).
610n/a
611n/adef displayhook(value):
612n/a """Override standard display hook to use non-locale encoding"""
613n/a if value is None:
614n/a return
615n/a # Set '_' to None to avoid recursion
616n/a builtins._ = None
617n/a text = repr(value)
618n/a try:
619n/a sys.stdout.write(text)
620n/a except UnicodeEncodeError:
621n/a # let's use ascii while utf8-bmp codec doesn't present
622n/a encoding = 'ascii'
623n/a bytes = text.encode(encoding, 'backslashreplace')
624n/a text = bytes.decode(encoding, 'strict')
625n/a sys.stdout.write(text)
626n/a sys.stdout.write("\n")
627n/a builtins._ = value