ยปCore Development>Code coverage>Lib/SocketServer.py

Python code coverage for Lib/SocketServer.py

#countcontent
1n/a"""Generic socket server classes.
2n/a
3n/aThis module tries to capture the various aspects of defining a server:
4n/a
5n/aFor socket-based servers:
6n/a
7n/a- address family:
8n/a - AF_INET{,6}: IP (Internet Protocol) sockets (default)
9n/a - AF_UNIX: Unix domain sockets
10n/a - others, e.g. AF_DECNET are conceivable (see <socket.h>
11n/a- socket type:
12n/a - SOCK_STREAM (reliable stream, e.g. TCP)
13n/a - SOCK_DGRAM (datagrams, e.g. UDP)
14n/a
15n/aFor request-based servers (including socket-based):
16n/a
17n/a- client address verification before further looking at the request
18n/a (This is actually a hook for any processing that needs to look
19n/a at the request before anything else, e.g. logging)
20n/a- how to handle multiple requests:
21n/a - synchronous (one request is handled at a time)
22n/a - forking (each request is handled by a new process)
23n/a - threading (each request is handled by a new thread)
24n/a
25n/aThe classes in this module favor the server type that is simplest to
26n/awrite: a synchronous TCP/IP server. This is bad class design, but
27n/asave some typing. (There's also the issue that a deep class hierarchy
28n/aslows down method lookups.)
29n/a
30n/aThere are five classes in an inheritance diagram, four of which represent
31n/asynchronous servers of four types:
32n/a
33n/a +------------+
34n/a | BaseServer |
35n/a +------------+
36n/a |
37n/a v
38n/a +-----------+ +------------------+
39n/a | TCPServer |------->| UnixStreamServer |
40n/a +-----------+ +------------------+
41n/a |
42n/a v
43n/a +-----------+ +--------------------+
44n/a | UDPServer |------->| UnixDatagramServer |
45n/a +-----------+ +--------------------+
46n/a
47n/aNote that UnixDatagramServer derives from UDPServer, not from
48n/aUnixStreamServer -- the only difference between an IP and a Unix
49n/astream server is the address family, which is simply repeated in both
50n/aunix server classes.
51n/a
52n/aForking and threading versions of each type of server can be created
53n/ausing the ForkingMixIn and ThreadingMixIn mix-in classes. For
54n/ainstance, a threading UDP server class is created as follows:
55n/a
56n/a class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
57n/a
58n/aThe Mix-in class must come first, since it overrides a method defined
59n/ain UDPServer! Setting the various member variables also changes
60n/athe behavior of the underlying server mechanism.
61n/a
62n/aTo implement a service, you must derive a class from
63n/aBaseRequestHandler and redefine its handle() method. You can then run
64n/avarious versions of the service by combining one of the server classes
65n/awith your request handler class.
66n/a
67n/aThe request handler class must be different for datagram or stream
68n/aservices. This can be hidden by using the request handler
69n/asubclasses StreamRequestHandler or DatagramRequestHandler.
70n/a
71n/aOf course, you still have to use your head!
72n/a
73n/aFor instance, it makes no sense to use a forking server if the service
74n/acontains state in memory that can be modified by requests (since the
75n/amodifications in the child process would never reach the initial state
76n/akept in the parent process and passed to each child). In this case,
77n/ayou can use a threading server, but you will probably have to use
78n/alocks to avoid two requests that come in nearly simultaneous to apply
79n/aconflicting changes to the server state.
80n/a
81n/aOn the other hand, if you are building e.g. an HTTP server, where all
82n/adata is stored externally (e.g. in the file system), a synchronous
83n/aclass will essentially render the service "deaf" while one request is
84n/abeing handled -- which may be for a very long time if a client is slow
85n/ato reqd all the data it has requested. Here a threading or forking
86n/aserver is appropriate.
87n/a
88n/aIn some cases, it may be appropriate to process part of a request
89n/asynchronously, but to finish processing in a forked child depending on
90n/athe request data. This can be implemented by using a synchronous
91n/aserver and doing an explicit fork in the request handler class
92n/ahandle() method.
93n/a
94n/aAnother approach to handling multiple simultaneous requests in an
95n/aenvironment that supports neither threads nor fork (or where these are
96n/atoo expensive or inappropriate for the service) is to maintain an
97n/aexplicit table of partially finished requests and to use select() to
98n/adecide which request to work on next (or whether to handle a new
99n/aincoming request). This is particularly important for stream services
100n/awhere each client can potentially be connected for a long time (if
101n/athreads or subprocesses cannot be used).
102n/a
103n/aFuture work:
104n/a- Standard classes for Sun RPC (which uses either UDP or TCP)
105n/a- Standard mix-in classes to implement various authentication
106n/a and encryption schemes
107n/a- Standard framework for select-based multiplexing
108n/a
109n/aXXX Open problems:
110n/a- What to do with out-of-band data?
111n/a
112n/aBaseServer:
113n/a- split generic "request" functionality out into BaseServer class.
114n/a Copyright (C) 2000 Luke Kenneth Casson Leighton <lkcl@samba.org>
115n/a
116n/a example: read entries from a SQL database (requires overriding
117n/a get_request() to return a table entry from the database).
118n/a entry is processed by a RequestHandlerClass.
119n/a
1201"""
121n/a
122n/a# Author of the BaseServer patch: Luke Kenneth Casson Leighton
123n/a
124n/a# XXX Warning!
125n/a# There is a test suite for this module, but it cannot be run by the
126n/a# standard regression test.
127n/a# To run it manually, run Lib/test/test_socketserver.py.
128n/a
1291__version__ = "0.4"
130n/a
131n/a
1321import socket
1331import select
1341import sys
1351import os
1361try:
1371 import threading
1380except ImportError:
1390 import dummy_threading as threading
140n/a
1411__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
1421 "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
1431 "StreamRequestHandler","DatagramRequestHandler",
1441 "ThreadingMixIn", "ForkingMixIn"]
1451if hasattr(socket, "AF_UNIX"):
1461 __all__.extend(["UnixStreamServer","UnixDatagramServer",
1471 "ThreadingUnixStreamServer",
1481 "ThreadingUnixDatagramServer"])
149n/a
1502class BaseServer:
151n/a
152n/a """Base class for server classes.
153n/a
154n/a Methods for the caller:
155n/a
156n/a - __init__(server_address, RequestHandlerClass)
157n/a - serve_forever(poll_interval=0.5)
158n/a - shutdown()
159n/a - handle_request() # if you do not use serve_forever()
160n/a - fileno() -> int # for select()
161n/a
162n/a Methods that may be overridden:
163n/a
164n/a - server_bind()
165n/a - server_activate()
166n/a - get_request() -> request, client_address
167n/a - handle_timeout()
168n/a - verify_request(request, client_address)
169n/a - server_close()
170n/a - process_request(request, client_address)
171n/a - shutdown_request(request)
172n/a - close_request(request)
173n/a - handle_error()
174n/a
175n/a Methods for derived classes:
176n/a
177n/a - finish_request(request, client_address)
178n/a
179n/a Class variables that may be overridden by derived classes or
180n/a instances:
181n/a
182n/a - timeout
183n/a - address_family
184n/a - socket_type
185n/a - allow_reuse_address
186n/a
187n/a Instance variables:
188n/a
189n/a - RequestHandlerClass
190n/a - socket
191n/a
1921 """
193n/a
1941 timeout = None
195n/a
1961 def __init__(self, server_address, RequestHandlerClass):
197n/a """Constructor. May be extended, do not override."""
198101 self.server_address = server_address
199101 self.RequestHandlerClass = RequestHandlerClass
200101 self.__is_shut_down = threading.Event()
201101 self.__shutdown_request = False
202n/a
2031 def server_activate(self):
204n/a """Called by constructor to activate the server.
205n/a
206n/a May be overridden.
207n/a
208n/a """
2090 pass
210n/a
2111 def serve_forever(self, poll_interval=0.5):
212n/a """Handle one request at a time until shutdown.
213n/a
214n/a Polls for shutdown every poll_interval seconds. Ignores
215n/a self.timeout. If you need to do periodic tasks, do them in
216n/a another thread.
217n/a """
21857 self.__is_shut_down.clear()
21957 try:
220173 while not self.__shutdown_request:
221n/a # XXX: Consider using another file descriptor or
222n/a # connecting to the socket to wake this up instead of
223n/a # polling. Polling reduces our responsiveness to a
224n/a # shutdown request and wastes cpu at all other times.
225116 r, w, e = select.select([self], [], [], poll_interval)
226116 if self in r:
22762 self._handle_request_noblock()
228n/a finally:
22957 self.__shutdown_request = False
23057 self.__is_shut_down.set()
231n/a
2321 def shutdown(self):
233n/a """Stops the serve_forever loop.
234n/a
235n/a Blocks until the loop has finished. This must be called while
236n/a serve_forever() is running in another thread, or it will
237n/a deadlock.
238n/a """
23957 self.__shutdown_request = True
24057 self.__is_shut_down.wait()
241n/a
242n/a # The distinction between handling, getting, processing and
243n/a # finishing a request is fairly arbitrary. Remember:
244n/a #
245n/a # - handle_request() is the top-level call. It calls
246n/a # select, get_request(), verify_request() and process_request()
247n/a # - get_request() is different for stream or datagram sockets
248n/a # - process_request() is the place that may fork a new process
249n/a # or create a new thread to finish the request
250n/a # - finish_request() instantiates the request handler class;
251n/a # this constructor will handle the request all by itself
252n/a
2531 def handle_request(self):
254n/a """Handle one request, possibly blocking.
255n/a
256n/a Respects self.timeout.
257n/a """
258n/a # Support people who used socket.settimeout() to escape
259n/a # handle_request before self.timeout was available.
26057 timeout = self.socket.gettimeout()
26157 if timeout is None:
2629 timeout = self.timeout
26348 elif self.timeout is not None:
2640 timeout = min(timeout, self.timeout)
26557 fd_sets = select.select([self], [], [], timeout)
26657 if not fd_sets[0]:
2674 self.handle_timeout()
2684 return
26953 self._handle_request_noblock()
270n/a
2711 def _handle_request_noblock(self):
272n/a """Handle one request, without blocking.
273n/a
274n/a I assume that select.select has returned that the socket is
275n/a readable before this function was called, so there should be
276n/a no risk of blocking in get_request().
277n/a """
278115 try:
279115 request, client_address = self.get_request()
2800 except socket.error:
2810 return
282115 if self.verify_request(request, client_address):
283115 try:
284115 self.process_request(request, client_address)
2850 except:
2860 self.handle_error(request, client_address)
2870 self.shutdown_request(request)
288n/a
2891 def handle_timeout(self):
290n/a """Called if no new request arrives within self.timeout.
291n/a
292n/a Overridden by ForkingMixIn.
293n/a """
2944 pass
295n/a
2961 def verify_request(self, request, client_address):
297n/a """Verify the request. May be overridden.
298n/a
299n/a Return True if we should proceed with this request.
300n/a
301n/a """
302115 return True
303n/a
3041 def process_request(self, request, client_address):
305n/a """Call finish_request.
306n/a
307n/a Overridden by ForkingMixIn and ThreadingMixIn.
308n/a
309n/a """
31094 self.finish_request(request, client_address)
31194 self.shutdown_request(request)
312n/a
3131 def server_close(self):
314n/a """Called to clean-up the server.
315n/a
316n/a May be overridden.
317n/a
318n/a """
3190 pass
320n/a
3211 def finish_request(self, request, client_address):
322n/a """Finish one request by instantiating RequestHandlerClass."""
323109 self.RequestHandlerClass(request, client_address, self)
324n/a
3251 def shutdown_request(self, request):
326n/a """Called to shutdown and close an individual request."""
3270 self.close_request(request)
328n/a
3291 def close_request(self, request):
330n/a """Called to clean up an individual request."""
3310 pass
332n/a
3331 def handle_error(self, request, client_address):
334n/a """Handle an error gracefully. May be overridden.
335n/a
336n/a The default is to print a traceback and continue.
337n/a
338n/a """
3390 print '-'*40
3400 print 'Exception happened during processing of request from',
3410 print client_address
3420 import traceback
3430 traceback.print_exc() # XXX But this goes to stderr!
3440 print '-'*40
345n/a
346n/a
3472class TCPServer(BaseServer):
348n/a
349n/a """Base class for various socket-based server classes.
350n/a
351n/a Defaults to synchronous IP stream (i.e., TCP).
352n/a
353n/a Methods for the caller:
354n/a
355n/a - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
356n/a - serve_forever(poll_interval=0.5)
357n/a - shutdown()
358n/a - handle_request() # if you don't use serve_forever()
359n/a - fileno() -> int # for select()
360n/a
361n/a Methods that may be overridden:
362n/a
363n/a - server_bind()
364n/a - server_activate()
365n/a - get_request() -> request, client_address
366n/a - handle_timeout()
367n/a - verify_request(request, client_address)
368n/a - process_request(request, client_address)
369n/a - shutdown_request(request)
370n/a - close_request(request)
371n/a - handle_error()
372n/a
373n/a Methods for derived classes:
374n/a
375n/a - finish_request(request, client_address)
376n/a
377n/a Class variables that may be overridden by derived classes or
378n/a instances:
379n/a
380n/a - timeout
381n/a - address_family
382n/a - socket_type
383n/a - request_queue_size (only for stream sockets)
384n/a - allow_reuse_address
385n/a
386n/a Instance variables:
387n/a
388n/a - server_address
389n/a - RequestHandlerClass
390n/a - socket
391n/a
3921 """
393n/a
3941 address_family = socket.AF_INET
395n/a
3961 socket_type = socket.SOCK_STREAM
397n/a
3981 request_queue_size = 5
399n/a
4001 allow_reuse_address = False
401n/a
4021 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
403n/a """Constructor. May be extended, do not override."""
40498 BaseServer.__init__(self, server_address, RequestHandlerClass)
40598 self.socket = socket.socket(self.address_family,
40698 self.socket_type)
40798 if bind_and_activate:
40878 self.server_bind()
40978 self.server_activate()
410n/a
4111 def server_bind(self):
412n/a """Called by constructor to bind the socket.
413n/a
414n/a May be overridden.
415n/a
416n/a """
41798 if self.allow_reuse_address:
41865 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
41998 self.socket.bind(self.server_address)
42098 self.server_address = self.socket.getsockname()
421n/a
4221 def server_activate(self):
423n/a """Called by constructor to activate the server.
424n/a
425n/a May be overridden.
426n/a
427n/a """
42895 self.socket.listen(self.request_queue_size)
429n/a
4301 def server_close(self):
431n/a """Called to clean-up the server.
432n/a
433n/a May be overridden.
434n/a
435n/a """
43630 self.socket.close()
437n/a
4381 def fileno(self):
439n/a """Return socket file number.
440n/a
441n/a Interface required by select().
442n/a
443n/a """
444225 return self.socket.fileno()
445n/a
4461 def get_request(self):
447n/a """Get the request and client address from the socket.
448n/a
449n/a May be overridden.
450n/a
451n/a """
45259 return self.socket.accept()
453n/a
4541 def shutdown_request(self, request):
455n/a """Called to shutdown and close an individual request."""
456100 try:
457n/a #explicitly shutdown. socket.close() merely releases
458n/a #the socket and waits for GC to perform the actual close.
459100 request.shutdown(socket.SHUT_WR)
4605 except socket.error:
4615 pass #some platforms may raise ENOTCONN here
462100 self.close_request(request)
463n/a
4641 def close_request(self, request):
465n/a """Called to clean up an individual request."""
466106 request.close()
467n/a
468n/a
4692class UDPServer(TCPServer):
470n/a
4711 """UDP server class."""
472n/a
4731 allow_reuse_address = False
474n/a
4751 socket_type = socket.SOCK_DGRAM
476n/a
4771 max_packet_size = 8192
478n/a
4791 def get_request(self):
4809 data, client_addr = self.socket.recvfrom(self.max_packet_size)
4819 return (data, self.socket), client_addr
482n/a
4831 def server_activate(self):
484n/a # No need to call listen() for UDP.
4853 pass
486n/a
4871 def shutdown_request(self, request):
488n/a # No need to shutdown anything.
4896 self.close_request(request)
490n/a
4911 def close_request(self, request):
492n/a # No need to close anything.
4939 pass
494n/a
4952class ForkingMixIn:
496n/a
4971 """Mix-in class to handle each request in a new process."""
498n/a
4991 timeout = 300
5001 active_children = None
5011 max_children = 40
502n/a
5031 def collect_children(self):
504n/a """Internal routine to wait for children that have exited."""
5059 if self.active_children is None: return
5066 while len(self.active_children) >= self.max_children:
507n/a # XXX: This will wait for any child process, not just ones
508n/a # spawned by this library. This could confuse other
509n/a # libraries that expect to be able to wait for their own
510n/a # children.
5110 try:
5120 pid, status = os.waitpid(0, 0)
5130 except os.error:
5140 pid = None
5150 if pid not in self.active_children: continue
5160 self.active_children.remove(pid)
517n/a
518n/a # XXX: This loop runs more system calls than it ought
519n/a # to. There should be a way to put the active_children into a
520n/a # process group and then use os.waitpid(-pgid) to wait for any
521n/a # of that set, but I couldn't find a way to allocate pgids
522n/a # that couldn't collide.
52312 for child in self.active_children:
5246 try:
5256 pid, status = os.waitpid(child, os.WNOHANG)
5260 except os.error:
5270 pid = None
5286 if not pid: continue
5293 try:
5303 self.active_children.remove(pid)
5310 except ValueError, e:
5320 raise ValueError('%s. x=%d and list=%r' % (e.message, pid,
5330 self.active_children))
534n/a
5351 def handle_timeout(self):
536n/a """Wait for zombies after self.timeout seconds of inactivity.
537n/a
538n/a May be extended, do not override.
539n/a """
5400 self.collect_children()
541n/a
5421 def process_request(self, request, client_address):
543n/a """Fork a new subprocess to process the request."""
5449 self.collect_children()
5459 pid = os.fork()
5469 if pid:
547n/a # Parent process
5489 if self.active_children is None:
5493 self.active_children = []
5509 self.active_children.append(pid)
5519 self.close_request(request) #close handle in parent process
5529 return
553n/a else:
554n/a # Child process.
555n/a # This must never return, hence os._exit()!
5560 try:
5570 self.finish_request(request, client_address)
5580 self.shutdown_request(request)
5590 os._exit(0)
5600 except:
5610 try:
5620 self.handle_error(request, client_address)
5630 self.shutdown_request(request)
564n/a finally:
5650 os._exit(1)
566n/a
567n/a
5682class ThreadingMixIn:
5691 """Mix-in class to handle each request in a new thread."""
570n/a
571n/a # Decides how threads will act upon termination of the
572n/a # main process
5731 daemon_threads = False
574n/a
5751 def process_request_thread(self, request, client_address):
576n/a """Same as in BaseServer but as a thread.
577n/a
578n/a In addition, exception handling is done here.
579n/a
580n/a """
58112 try:
58212 self.finish_request(request, client_address)
58312 self.shutdown_request(request)
5840 except:
5850 self.handle_error(request, client_address)
5860 self.shutdown_request(request)
587n/a
5881 def process_request(self, request, client_address):
589n/a """Start a new thread to process the request."""
59012 t = threading.Thread(target = self.process_request_thread,
59112 args = (request, client_address))
59212 if self.daemon_threads:
5930 t.setDaemon (1)
59412 t.start()
595n/a
596n/a
5972class ForkingUDPServer(ForkingMixIn, UDPServer): pass
5982class ForkingTCPServer(ForkingMixIn, TCPServer): pass
599n/a
6002class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
6012class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
602n/a
6031if hasattr(socket, 'AF_UNIX'):
604n/a
6052 class UnixStreamServer(TCPServer):
6061 address_family = socket.AF_UNIX
607n/a
6082 class UnixDatagramServer(UDPServer):
6091 address_family = socket.AF_UNIX
610n/a
6112 class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
612n/a
6132 class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
614n/a
6152class BaseRequestHandler:
616n/a
617n/a """Base class for request handler classes.
618n/a
619n/a This class is instantiated for each request to be handled. The
620n/a constructor sets the instance variables request, client_address
621n/a and server, and then calls the handle() method. To implement a
622n/a specific service, all you need to do is to derive a class which
623n/a defines a handle() method.
624n/a
625n/a The handle() method can find the request as self.request, the
626n/a client address as self.client_address, and the server (in case it
627n/a needs access to per-server information) as self.server. Since a
628n/a separate instance is created for each request, the handle() method
629n/a can define arbitrary other instance variariables.
630n/a
6311 """
632n/a
6331 def __init__(self, request, client_address, server):
634109 self.request = request
635109 self.client_address = client_address
636109 self.server = server
637109 self.setup()
638109 try:
639109 self.handle()
640n/a finally:
641109 self.finish()
642n/a
6431 def setup(self):
6440 pass
645n/a
6461 def handle(self):
6470 pass
648n/a
6491 def finish(self):
6500 pass
651n/a
652n/a
653n/a# The following two classes make it possible to use the same service
654n/a# class for stream or datagram servers.
655n/a# Each class sets up these instance variables:
656n/a# - rfile: a file object from which receives the request is read
657n/a# - wfile: a file object to which the reply is written
658n/a# When the handle() method returns, wfile is flushed properly
659n/a
660n/a
6612class StreamRequestHandler(BaseRequestHandler):
662n/a
6631 """Define self.rfile and self.wfile for stream sockets."""
664n/a
665n/a # Default buffer sizes for rfile, wfile.
666n/a # We default rfile to buffered because otherwise it could be
667n/a # really slow for large data (a getc() call per byte); we make
668n/a # wfile unbuffered because (a) often after a write() we want to
669n/a # read and we need to flush the line; (b) big writes to unbuffered
670n/a # files are typically optimized by stdio even when big reads
671n/a # aren't.
6721 rbufsize = -1
6731 wbufsize = 0
674n/a
675n/a # A timeout to apply to the request socket, if not None.
6761 timeout = None
677n/a
678n/a # Disable nagle algoritm for this socket, if True.
679n/a # Use only when wbufsize != 0, to avoid small packets.
6801 disable_nagle_algorithm = False
681n/a
6821 def setup(self):
683100 self.connection = self.request
684100 if self.timeout is not None:
6852 self.connection.settimeout(self.timeout)
686100 if self.disable_nagle_algorithm:
68730 self.connection.setsockopt(socket.IPPROTO_TCP,
68830 socket.TCP_NODELAY, True)
689100 self.rfile = self.connection.makefile('rb', self.rbufsize)
690100 self.wfile = self.connection.makefile('wb', self.wbufsize)
691n/a
6921 def finish(self):
693100 if not self.wfile.closed:
694100 self.wfile.flush()
695100 self.wfile.close()
696100 self.rfile.close()
697n/a
698n/a
6992class DatagramRequestHandler(BaseRequestHandler):
700n/a
701n/a # XXX Regrettably, I cannot get this working on Linux;
702n/a # s.recvfrom() doesn't return a meaningful client address.
703n/a
7041 """Define self.rfile and self.wfile for datagram sockets."""
705n/a
7061 def setup(self):
7076 try:
7086 from cStringIO import StringIO
7090 except ImportError:
7100 from StringIO import StringIO
7116 self.packet, self.socket = self.request
7126 self.rfile = StringIO(self.packet)
7136 self.wfile = StringIO()
714n/a
7151 def finish(self):
7166 self.socket.sendto(self.wfile.getvalue(), self.client_address)