»Core Development>Code coverage>Lib/asyncore.py

Python code coverage for Lib/asyncore.py

#countcontent
1n/a# -*- Mode: Python -*-
2n/a# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3n/a# Author: Sam Rushing <rushing@nightmare.com>
4n/a
5n/a# ======================================================================
6n/a# Copyright 1996 by Sam Rushing
7n/a#
8n/a# All Rights Reserved
9n/a#
10n/a# Permission to use, copy, modify, and distribute this software and
11n/a# its documentation for any purpose and without fee is hereby
12n/a# granted, provided that the above copyright notice appear in all
13n/a# copies and that both that copyright notice and this permission
14n/a# notice appear in supporting documentation, and that the name of Sam
15n/a# Rushing not be used in advertising or publicity pertaining to
16n/a# distribution of the software without specific, written prior
17n/a# permission.
18n/a#
19n/a# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20n/a# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21n/a# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22n/a# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23n/a# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24n/a# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25n/a# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26n/a# ======================================================================
27n/a
28n/a"""Basic infrastructure for asynchronous socket service clients and servers.
29n/a
30n/aThere are only two ways to have a program on a single processor do "more
31n/athan one thing at a time". Multi-threaded programming is the simplest and
32n/amost popular way to do it, but there is another very different technique,
33n/athat lets you have nearly all the advantages of multi-threading, without
34n/aactually using multiple threads. It's really only practical if your program
35n/ais largely I/O bound. If your program is CPU bound, then pre-emptive
36n/ascheduled threads are probably what you really need. Network servers are
37n/ararely CPU-bound, however.
38n/a
39n/aIf your operating system supports the select() system call in its I/O
40n/alibrary (and nearly all do), then you can use it to juggle multiple
41n/acommunication channels at once; doing other work while your I/O is taking
42n/aplace in the "background." Although this strategy can seem strange and
43n/acomplex, especially at first, it is in many ways easier to understand and
44n/acontrol than multi-threaded programming. The module documented here solves
45n/amany of the difficult problems for you, making the task of building
46n/asophisticated high-performance network servers and clients a snap.
47n/a"""
48n/a
49n/aimport select
50n/aimport socket
51n/aimport sys
52n/aimport time
53n/aimport warnings
54n/a
55n/aimport os
56n/afrom errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
57n/a ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
58n/a errorcode
59n/a
# errno values that mean the peer is gone.  send(), recv() and readwrite()
# react to these by calling handle_close() instead of reporting an error.
_DISCONNECTED = frozenset((
    ECONNRESET,
    ENOTCONN,
    ESHUTDOWN,
    ECONNABORTED,
    EPIPE,
    EBADF,
))

# Default channel map (file descriptor -> dispatcher).  It is only created
# when the name does not exist yet, so an already-present map is kept.
if 'socket_map' not in globals():
    socket_map = {}
67n/a
def _strerror(err):
    """Return a readable description of the errno value *err*.

    os.strerror() is tried first.  When it fails with ValueError,
    OverflowError or NameError, the symbolic name from errno.errorcode
    is returned instead, or "Unknown error <err>" if there is none.
    """
    try:
        return os.strerror(err)
    except (ValueError, OverflowError, NameError):
        try:
            return errorcode[err]
        except KeyError:
            return "Unknown error %s" %err
75n/a
class ExitNow(Exception):
    """Exception that the event helpers re-raise rather than report.

    read(), write(), _exception() and readwrite() let it propagate to the
    caller instead of routing it to the channel's handle_error().
    """


# Exceptions that must never be swallowed by a channel's handle_error().
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
80n/a
def read(obj):
    """Deliver a read event to the channel *obj*.

    Calls obj.handle_read_event().  Exceptions listed in
    _reraised_exceptions propagate to the caller; any other exception is
    handed to obj.handle_error().
    """
    try:
        obj.handle_read_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
88n/a
def write(obj):
    """Deliver a write event to the channel *obj*.

    Calls obj.handle_write_event().  Exceptions listed in
    _reraised_exceptions propagate to the caller; any other exception is
    handed to obj.handle_error().
    """
    try:
        obj.handle_write_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
96n/a
def _exception(obj):
    """Deliver an exceptional-condition event to the channel *obj*.

    Calls obj.handle_expt_event().  Exceptions listed in
    _reraised_exceptions propagate to the caller; any other exception is
    handed to obj.handle_error().
    """
    try:
        obj.handle_expt_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
104n/a
def readwrite(obj, flags):
    """Dispatch the poll() event mask *flags* to the handlers of *obj*.

    POLLIN, POLLOUT and POLLPRI trigger the read, write and expt event
    handlers in that order; POLLHUP, POLLERR or POLLNVAL trigger
    handle_close().  An OSError whose first argument is in _DISCONNECTED
    closes the channel; any other error goes to obj.handle_error(), except
    for the exceptions in _reraised_exceptions, which propagate.
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        hangup_mask = select.POLLHUP | select.POLLERR | select.POLLNVAL
        if flags & hangup_mask:
            obj.handle_close()
    except OSError as err:
        if err.args[0] in _DISCONNECTED:
            obj.handle_close()
        else:
            obj.handle_error()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
124n/a
def poll(timeout=0.0, map=None):
    """Run one select()-based polling pass over the channel map.

    *map* defaults to the module-level socket_map.  Each channel is asked
    whether it is readable and writable; the resulting descriptors are
    passed to select.select() with *timeout*, and every descriptor reported
    ready is dispatched through read(), write() or _exception().  If no
    channel wants any event, the function just sleeps for *timeout*.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    readers, writers, watchers = [], [], []
    for fd, obj in list(map.items()):
        wants_read = obj.readable()
        wants_write = obj.writable()
        if wants_read:
            readers.append(fd)
        # accepting sockets should not be writable
        if wants_write and not obj.accepting:
            writers.append(fd)
        if wants_read or wants_write:
            watchers.append(fd)

    if not (readers or writers or watchers):
        # Nothing to wait on: only honour the timeout.
        time.sleep(timeout)
        return

    readers, writers, watchers = select.select(readers, writers, watchers,
                                               timeout)

    # A handler may have removed a channel from the map in the meantime,
    # so every descriptor is looked up again before dispatching.
    for fd in readers:
        obj = map.get(fd)
        if obj is not None:
            read(obj)

    for fd in writers:
        obj = map.get(fd)
        if obj is not None:
            write(obj)

    for fd in watchers:
        obj = map.get(fd)
        if obj is not None:
            _exception(obj)
163n/a
def poll2(timeout=0.0, map=None):
    """Run one polling pass over the channel map using select.poll().

    *map* defaults to the module-level socket_map.  *timeout* is given in
    seconds and converted to the milliseconds select.poll() expects; None
    is passed through unchanged.  Every reported (fd, flags) pair is
    dispatched through readwrite().
    """
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if not map:
        return

    for fd, obj in list(map.items()):
        mask = 0
        if obj.readable():
            mask |= select.POLLIN | select.POLLPRI
        # accepting sockets should not be writable
        if obj.writable() and not obj.accepting:
            mask |= select.POLLOUT
        if mask:
            pollster.register(fd, mask)

    for fd, flags in pollster.poll(timeout):
        obj = map.get(fd)
        if obj is not None:
            readwrite(obj, flags)


poll3 = poll2                           # Alias for backward compatibility
191n/a
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll the channel map until it is empty or *count* passes are done.

    *map* defaults to the module-level socket_map.  poll2() is used when
    *use_poll* is true and the select module provides poll(); otherwise
    poll() is used.  With *count* left as None the loop runs for as long as
    the map is non-empty; otherwise it stops after at most *count* passes.
    """
    if map is None:
        map = socket_map

    poll_fun = poll2 if use_poll and hasattr(select, 'poll') else poll

    if count is None:
        while map:
            poll_fun(timeout, map)
        return

    remaining = count
    while map and remaining > 0:
        poll_fun(timeout, map)
        remaining = remaining - 1
209n/a
class dispatcher:
    """Thin wrapper around a non-blocking socket.

    An instance registers itself in a channel map (file descriptor ->
    dispatcher) and translates low-level readiness events into calls to the
    overridable handle_*() methods.
    """

    debug = False
    connected = False
    accepting = False
    connecting = False
    closing = False
    addr = None
    # log_info() drops every message whose type is listed here.
    ignore_log_types = frozenset({'warning'})

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally around an existing socket *sock*.

        *map* is the channel map to register in; None selects the
        module-level socket_map.  OSError from sock.getpeername() is
        re-raised unless it is ENOTCONN or EINVAL.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(0)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except OSError as err:
                if err.args[0] in (ENOTCONN, EINVAL):
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.qualname [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__qualname__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair; show it verbatim.
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    __str__ = __repr__

    def add_channel(self, map=None):
        """Register this channel in *map* (default: its own map)."""
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* and forget its file descriptor."""
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            del map[fd]
        self._fileno = None

    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a new non-blocking socket and adopt it as this channel's."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(0)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as the channel's socket and register it in *map*."""
        self.socket = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR; an OSError while doing so is ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except OSError:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return True if the channel wants read events (default: always)."""
        return True

    def writable(self):
        """Return True if the channel wants write events (default: always)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On 'nt' the backlog is capped at 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        Returns without further action when the connect is still in
        progress, calls handle_connect_event() when it completed at once,
        and raises OSError(err, name) for any other result of connect_ex().
        """
        self.connected = False
        self.connecting = True
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
        or err == EINVAL and os.name == 'nt':
            self.addr = address
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # errno.errorcode does not list every possible value; fall back
            # to a generic text so the caller sees the OSError and not a
            # KeyError raised while building it.
            raise OSError(err, errorcode.get(err, "Unknown error %s" % err))

    def accept(self):
        """Accept a connection; return (conn, addr) or None.

        None is returned when accept() raised TypeError or an OSError with
        EWOULDBLOCK, ECONNABORTED or EAGAIN; other OSErrors propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
        except TypeError:
            return None
        except OSError as why:
            if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
                return None
            else:
                raise
        else:
            return conn, addr

    def send(self, data):
        """Send *data* and return the number of bytes accepted.

        Returns 0 on EWOULDBLOCK.  On a disconnect error (_DISCONNECTED)
        handle_close() is called and 0 is returned; other OSErrors
        propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except OSError as why:
            if why.args[0] == EWOULDBLOCK:
                return 0
            elif why.args[0] in _DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        Empty data or a disconnect error (_DISCONNECTED) triggers
        handle_close() and yields b''; other OSErrors propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b''
            else:
                return data
        except OSError as why:
            # winsock sometimes raises ENOTCONN
            if why.args[0] in _DISCONNECTED:
                self.handle_close()
                return b''
            else:
                raise

    def close(self):
        """Reset the state flags, unregister the channel, close the socket.

        ENOTCONN and EBADF raised by socket.close() are ignored.
        """
        self.connected = False
        self.accepting = False
        self.connecting = False
        self.del_channel()
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError as why:
                if why.args[0] not in (ENOTCONN, EBADF):
                    raise

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write *message* to sys.stderr with a 'log: ' prefix."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>' unless *type* is in ignore_log_types."""
        if type not in self.ignore_log_types:
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Route a read event to handle_accept() or handle_read().

        A channel that is still connecting first completes the connect.
        """
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            if self.connecting:
                self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Finish a pending connect.

        Raises OSError if SO_ERROR reports a failure; otherwise calls
        handle_connect() and marks the channel as connected.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, _strerror(err))
        self.handle_connect()
        self.connected = True
        self.connecting = False

    def handle_write_event(self):
        """Route a write event to handle_write().

        A channel that is still connecting first completes the connect.
        """
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            if self.connecting:
                self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Route an exceptional condition to handle_close() or handle_expt()."""
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    def handle_expt(self):
        """Default handler: log the event as a warning."""
        self.log_info('unhandled incoming priority event', 'warning')

    def handle_read(self):
        """Default handler: log the event as a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Default handler: log the event as a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Default handler: log the event as a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Accept a pending connection and pass it to handle_accepted()."""
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        """Default handler: close the new socket and log a warning."""
        sock.close()
        self.log_info('unhandled accepted event', 'warning')

    def handle_close(self):
        """Default handler: log a warning and close the channel."""
        self.log_info('unhandled close event', 'warning')
        self.close()
503n/a
504n/a# ---------------------------------------------------------------------------
505n/a# adds simple buffered output capability, useful for simple clients.
506n/a# [for more sophisticated usage use asynchat.async_chat]
507n/a# ---------------------------------------------------------------------------
508n/a
class dispatcher_with_send(dispatcher):
    """dispatcher that queues outgoing data in the out_buffer attribute."""

    def __init__(self, sock=None, map=None):
        """Initialise the base dispatcher and start with an empty buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = b''

    def initiate_send(self):
        """Send up to 65536 buffered bytes and drop what was sent."""
        chunk = self.out_buffer[:65536]
        num_sent = dispatcher.send(self, chunk)
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write(self):
        """On a write event, push more of the buffer out."""
        self.initiate_send()

    def writable(self):
        """Return a true value while unconnected or while data is queued."""
        if not self.connected:
            return True
        return len(self.out_buffer)

    def send(self, data):
        """Append *data* to the output buffer and try to send right away."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
531n/a
532n/a# ---------------------------------------------------------------------------
533n/a# used for debugging.
534n/a# ---------------------------------------------------------------------------
535n/a
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), type, value, info), where the first
    tuple describes the innermost traceback entry and *info* is a single
    string of '[file|function|line]' items, one per traceback entry.
    Raises AssertionError when there is no current traceback.
    """
    exc_type, exc_value, trace = sys.exc_info()
    if not trace: # Must have a traceback
        raise AssertionError("traceback does not exist")

    entries = []
    while trace:
        code = trace.tb_frame.f_code
        entries.append((code.co_filename, code.co_name, str(trace.tb_lineno)))
        trace = trace.tb_next

    # just to be safe
    del trace

    file, function, line = entries[-1]
    info = ' '.join('[%s|%s|%s]' % entry for entry in entries)
    return (file, function, line), exc_type, exc_value, info
555n/a
def close_all(map=None, ignore_all=False):
    """Close every channel in *map* and then empty the map.

    *map* defaults to the module-level socket_map.  An OSError whose first
    argument is EBADF is always ignored.  Other errors are ignored only
    when *ignore_all* is true, and the exceptions in _reraised_exceptions
    always propagate.  The map is not cleared if an error propagates.
    """
    if map is None:
        map = socket_map
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            if err.args[0] != EBADF and not ignore_all:
                raise
        except _reraised_exceptions:
            raise
        except BaseException:
            if not ignore_all:
                raise
    map.clear()
573n/a
574n/a# Asynchronous File I/O:
575n/a#
576n/a# After a little research (reading man pages on various unixen, and
577n/a# digging through the linux kernel), I've determined that select()
578n/a# isn't meant for doing asynchronous file i/o.
579n/a# Heartening, though - reading linux/mm/filemap.c shows that linux
580n/a# supports asynchronous read-ahead. So _MOST_ of the time, the data
581n/a# will be sitting in memory for us already when we go to read it.
582n/a#
583n/a# What other OS's (besides NT) support async file i/o? [VMS?]
584n/a#
585n/a# Regardless, this is useful for pipes, and stdin/stdout...
586n/a
if os.name == 'posix':
    class file_wrapper:
        """Make a file descriptor look enough like a socket for asyncore.

        Only the methods asyncore itself uses are provided.  The descriptor
        passed in is duplicated with os.dup(); the wrapper owns the
        duplicate and the caller keeps the original.
        """

        def __init__(self, fd):
            """Wrap a duplicate of the file descriptor *fd*."""
            self.fd = os.dup(fd)

        def __del__(self):
            """Warn about a wrapper that was never closed, then close it."""
            if self.fd >= 0:
                warnings.warn("unclosed file %r" % self, ResourceWarning,
                              source=self)
            self.close()

        def recv(self, *args):
            """Read from the descriptor; arguments go to os.read()."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; arguments go to os.write()."""
            return os.write(self.fd, *args)

        def getsockopt(self, level, optname, buflen=None):
            """Answer the SO_ERROR query asyncore makes with 0 (no error).

            Any other option raises NotImplementedError.
            """
            if (level == socket.SOL_SOCKET and
                optname == socket.SO_ERROR and
                not buflen):
                return 0
            raise NotImplementedError("Only asyncore specific behaviour "
                                      "implemented.")

        read = recv
        write = send

        def close(self):
            """Close the duplicated descriptor; later calls do nothing."""
            if self.fd < 0:
                return
            # Invalidate self.fd before calling os.close(): if the close
            # fails, a later close() or __del__ must not close the same
            # descriptor number again, since it may have been reused.
            fd = self.fd
            self.fd = -1
            os.close(fd)

        def fileno(self):
            """Return the wrapped descriptor, or -1 once closed."""
            return self.fd

    class file_dispatcher(dispatcher):
        """dispatcher whose 'socket' is a file_wrapper around a descriptor."""

        def __init__(self, fd, map=None):
            """Create a channel for *fd*.

            *fd* is a file descriptor or an object with a fileno() method.
            The descriptor is switched to non-blocking mode.
            """
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                # No fileno() attribute: treat fd as the descriptor itself.
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            os.set_blocking(fd, False)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and register the channel."""
            self.socket = file_wrapper(fd)
            self._fileno = self.socket.fileno()
            self.add_channel()