» Core Development > Code coverage > Lib/asyncio/selector_events.py

Python code coverage for Lib/asyncio/selector_events.py

# | count | content
1n/a"""Event loop using a selector and related classes.
2n/a
3n/aA selector is a "notify-when-ready" multiplexer. For a subclass which
4n/aalso includes support for signal handling, see the unix_events sub-module.
5n/a"""
6n/a
7n/a__all__ = ['BaseSelectorEventLoop']
8n/a
9n/aimport collections
10n/aimport errno
11n/aimport functools
12n/aimport socket
13n/aimport warnings
14n/aimport weakref
15n/atry:
16n/a import ssl
17n/aexcept ImportError: # pragma: no cover
18n/a ssl = None
19n/a
20n/afrom . import base_events
21n/afrom . import compat
22n/afrom . import constants
23n/afrom . import events
24n/afrom . import futures
25n/afrom . import selectors
26n/afrom . import transports
27n/afrom . import sslproto
28n/afrom .coroutines import coroutine
29n/afrom .log import logger
30n/a
31n/a
def _test_selector_event(selector, fd, event):
    """Return True if *selector* is monitoring *fd* for *event* events.

    A file descriptor that is not registered with the selector at all
    is reported as not monitored (False).
    """
    try:
        registered_events = selector.get_key(fd).events
    except KeyError:
        # get_key() raises KeyError for an unregistered fd.
        return False
    return bool(registered_events & event)
41n/a
42n/a
if hasattr(socket, 'TCP_NODELAY'):
    def _set_nodelay(sock):
        """Set TCP_NODELAY on *sock* if it is a TCP stream socket.

        Sockets of any other family, type or protocol are left untouched.
        """
        sock_type = sock.type
        if hasattr(socket, 'SOCK_NONBLOCK'):
            # On Linux, socket.type can be a bitmask that also carries
            # flags such as SOCK_NONBLOCK / SOCK_CLOEXEC (notably on
            # Python versions before 3.7), so a plain equality test
            # against SOCK_STREAM would wrongly reject non-blocking
            # sockets.  The low four bits hold the actual socket type.
            sock_type &= 0xF
        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
                sock_type == socket.SOCK_STREAM and
                sock.proto == socket.IPPROTO_TCP):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
    def _set_nodelay(sock):
        """No-op: the socket module does not expose TCP_NODELAY here."""
        pass
52n/a
53n/a
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Create the loop, using *selector* or selectors.DefaultSelector()."""
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Maps a file descriptor to the transport using it; weak values so
        # that the mapping does not keep transports alive.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Return a new _SelectorSocketTransport for *sock*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            *, server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Return an SSL transport layered over *rawsock*.

        Falls back to the legacy SSL transport when
        sslproto._is_sslproto_available() is false.
        """
        if not sslproto._is_sslproto_available():
            return self._make_legacy_ssl_transport(
                rawsock, protocol, sslcontext, waiter,
                server_side=server_side, server_hostname=server_hostname,
                extra=extra, server=server)

        ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
                                            server_side, server_hostname)
        # The socket transport is created only for its side effects; the
        # application talks to the transport exposed by the SSL protocol.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                                   waiter, *,
                                   server_side=False, server_hostname=None,
                                   extra=None, server=None):
        """Return a _SelectorSslTransport for *rawsock*."""
        # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
        # on Python 3.4 and older, when ssl.MemoryBIO is not available.
        return _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Return a new _SelectorDatagramTransport for *sock*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running.  Closing an already
        closed loop is a no-op.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _socketpair(self):
        """Return a pair of connected sockets; subclasses must override."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and start reading from it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe (no-op)."""
        pass

    def _read_from_self(self):
        """Drain the self-pipe until it would block or reports EOF."""
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write a null byte to the self-pipe."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is not None:
            try:
                csock.send(b'\0')
            except OSError:
                if self._debug:
                    logger.debug("Fail to write a null byte into the "
                                 "self-pipe socket",
                                 exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100):
        """Register the listening *sock* so new connections get accepted."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog)

    def _accept_connection(self, protocol_factory, sock,
                           sslcontext=None, server=None, backlog=100):
        """Accept up to *backlog* pending connections on *sock*.

        A task running _accept_connection2() is created for every accepted
        connection.  When accept() fails because the system is out of
        resources, the reader is removed, serving is rescheduled after
        constants.ACCEPT_RETRY_DELAY and this call returns.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': sock,
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog)
                    # Stop here: looping on would repeat the failing
                    # accept(), report the same error again and schedule
                    # one more retry timer on every remaining iteration.
                    return None
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(protocol_factory, conn, extra,
                                                  sslcontext, server)
                self.create_task(accept)

    @coroutine
    def _accept_connection2(self, protocol_factory, conn, extra,
                            sslcontext=None, server=None):
        """Create the protocol and transport for the accepted *conn*.

        Waits until the transport signals that it is set up.  Exceptions
        are passed to the loop's exception handler, but only in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                yield from waiter
            except:
                transport.close()
                raise

            # It's now up to the protocol to handle the connection.
        except Exception as exc:
            if self._debug:
                context = {
                    'message': ('Error on transport creation '
                                'for incoming connection'),
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Raise RuntimeError if *fd* belongs to a transport not yet closing."""
        try:
            transport = self._transports[fd]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    'File descriptor {!r} is used by transport {!r}'.format(
                        fd, transport))

    def _add_reader(self, fd, callback, *args):
        """Register *callback* for read events on *fd*.

        A reader handle previously registered for *fd* is cancelled.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()

    def _remove_reader(self, fd):
        """Stop watching *fd* for read events.

        Returns True if a reader handle was registered (it is cancelled),
        False otherwise, including when the loop is already closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register *callback* for write events on *fd*.

        A writer handle previously registered for *fd* is cancelled.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()

    def _remove_writer(self, fd):
        """Remove a writer callback.

        Returns True if a writer handle was registered (it is cancelled),
        False otherwise, including when the loop is already closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Drop the write interest; any read interest is kept.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_recv(fut, False, sock, n)
        return fut

    def _sock_recv(self, fut, registered, sock, n):
        """Try sock.recv(n) and settle *fut*, or wait until readable."""
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        fd = sock.fileno()
        if registered:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        if data:
            self._sock_sendall(fut, False, sock, data)
        else:
            fut.set_result(None)
        return fut

    def _sock_sendall(self, fut, registered, sock, data):
        """Send as much of *data* as possible; reschedule for the rest."""
        fd = sock.fileno()

        if registered:
            self.remove_writer(fd)
        if fut.cancelled():
            return

        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0
        except Exception as exc:
            fut.set_exception(exc)
            return

        if n == len(data):
            fut.set_result(None)
        else:
            if n:
                data = data[n:]
            self.add_writer(fd, self._sock_sendall, fut, True, sock, data)

    @coroutine
    def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # The address is resolved first, except for AF_UNIX sockets.
        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = base_events._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            if not resolved.done():
                yield from resolved
            _, _, _, _, address = resolved.result()[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return (yield from fut)

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock*; settle *fut* now or once writable."""
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            fut.add_done_callback(
                functools.partial(self._sock_connect_done, fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_connect_done(self, fd, fut):
        """Done callback of the connect future: stop watching *fd*."""
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback: read SO_ERROR and settle *fut* accordingly."""
        if fut.cancelled():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, 'Connect call failed %s' % (address,))
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, False, sock)
        return fut

    def _sock_accept(self, fut, registered, sock):
        """Try sock.accept() and settle *fut*, or wait until readable."""
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    def _process_events(self, event_list):
        """Schedule the handles of ready events; drop cancelled ones."""
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on the listening *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()
540n/a
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Common base of the selector-based transports in this module.

    Holds the socket, the protocol, the write buffer and the closing
    state, and implements closing and fatal-error handling.
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Bind the transport to *sock* and register it in loop._transports."""
        super().__init__(extra, loop)
        self._extra['socket'] = sock
        # Like getpeername() below, getsockname() can fail with OSError;
        # record None rather than failing transport creation.
        try:
            self._extra['sockname'] = sock.getsockname()
        except OSError:
            self._extra['sockname'] = None
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except OSError:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()
        self._protocol = protocol
        self._protocol_connected = True
        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._sock_fd)
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append('write=<%s, bufsize=%s>' % (state, bufsize))
        return '<%s>' % ' '.join(info)

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Replace the protocol driven by this transport."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the protocol driven by this transport."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has started."""
        return self._closing

    def close(self):
        """Stop reading and close once the write buffer has been flushed.

        With an empty buffer, connection_lost(None) is scheduled right
        away.  Calling close() again is a no-op.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._sock is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning,
                              source=self)
                self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport."""
        # Should be called from exception handler only.
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data and schedule connection_lost(exc)."""
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the socket and references."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            # Cleanup runs even if connection_lost() raises.
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)
676n/a
677n/a
class _SelectorSocketTransport(_SelectorTransport):
    """Stream transport over a plain (non-SSL) socket."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the transport and schedule connection_made()."""
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        _set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def pause_reading(self):
        """Stop watching the socket for incoming data.

        Raises RuntimeError if the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume watching the socket for incoming data.

        Raises RuntimeError if reading is not paused.  Nothing is
        re-registered when the transport is closing.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        self._loop._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: pass received data, or EOF, to the protocol."""
        if self._conn_lost:
            return
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.debug("%r received EOF", self)
                keep_open = self._protocol.eof_received()
                if keep_open:
                    # We're keeping the connection open so the
                    # protocol can write more, but we still can't
                    # receive more, so remove the reader callback.
                    self._loop._remove_reader(self._sock_fd)
                else:
                    self.close()

    def write(self, data):
        """Send *data* now if possible, buffering whatever is left.

        Raises TypeError if *data* is not bytes-like and RuntimeError if
        write_eof() was already called.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError('data argument must be a bytes-like object, '
                            'not %r' % type(data).__name__)
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except Exception as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush the buffer, then finish close or EOF."""
        assert self._buffer, 'Data should not be empty'

        if self._conn_lost:
            return
        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop._remove_writer(self._sock_fd)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been sent.

        Does nothing if EOF was already written or the transport is
        closing.
        """
        # Once closing, the socket is about to be (or already was) closed
        # and self._sock may be None, so shutdown() must not be attempted.
        if self._closing or self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True
812n/a
813n/a
814n/aclass _SelectorSslTransport(_SelectorTransport):
815n/a
816n/a _buffer_factory = bytearray
817n/a
    def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
                 server_side=False, server_hostname=None,
                 extra=None, server=None):
        """Wrap *rawsock* in an SSL socket and start the handshake.

        Raises RuntimeError if the ssl module could not be imported.
        A falsy *sslcontext* is replaced by one built with
        sslproto._create_transport_context().
        """
        if ssl is None:
            raise RuntimeError('stdlib ssl module not available')

        if not sslcontext:
            sslcontext = sslproto._create_transport_context(server_side, server_hostname)

        # The handshake is driven explicitly by _on_handshake(), so it must
        # not run inside wrap_socket().
        wrap_kwargs = {
            'server_side': server_side,
            'do_handshake_on_connect': False,
        }
        if server_hostname and not server_side:
            wrap_kwargs['server_hostname'] = server_hostname
        sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

        super().__init__(loop, sslsock, protocol, extra, server)
        # the protocol connection is only made after the SSL handshake
        self._protocol_connected = False

        self._server_hostname = server_hostname
        self._waiter = waiter
        self._sslcontext = sslcontext
        self._paused = False

        # SSL-specific extra info.  (peercert is set later)
        self._extra.update(sslcontext=sslcontext)

        # The start time is only recorded in debug mode; it is used to log
        # how long the handshake took.
        if self._loop.get_debug():
            logger.debug("%r starts SSL handshake", self)
            start_time = self._loop.time()
        else:
            start_time = None
        self._on_handshake(start_time)
853n/a
854n/a def _wakeup_waiter(self, exc=None):
855n/a if self._waiter is None:
856n/a return
857n/a if not self._waiter.cancelled():
858n/a if exc is not None:
859n/a self._waiter.set_exception(exc)
860n/a else:
861n/a self._waiter.set_result(None)
862n/a self._waiter = None
863n/a
864n/a def _on_handshake(self, start_time):
865n/a try:
866n/a self._sock.do_handshake()
867n/a except ssl.SSLWantReadError:
868n/a self._loop._add_reader(self._sock_fd,
869n/a self._on_handshake, start_time)
870n/a return
871n/a except ssl.SSLWantWriteError:
872n/a self._loop._add_writer(self._sock_fd,
873n/a self._on_handshake, start_time)
874n/a return
875n/a except BaseException as exc:
876n/a if self._loop.get_debug():
877n/a logger.warning("%r: SSL handshake failed",
878n/a self, exc_info=True)
879n/a self._loop._remove_reader(self._sock_fd)
880n/a self._loop._remove_writer(self._sock_fd)
881n/a self._sock.close()
882n/a self._wakeup_waiter(exc)
883n/a if isinstance(exc, Exception):
884n/a return
885n/a else:
886n/a raise
887n/a
888n/a self._loop._remove_reader(self._sock_fd)
889n/a self._loop._remove_writer(self._sock_fd)
890n/a
891n/a peercert = self._sock.getpeercert()
892n/a if not hasattr(self._sslcontext, 'check_hostname'):
893n/a # Verify hostname if requested, Python 3.4+ uses check_hostname
894n/a # and checks the hostname in do_handshake()
895n/a if (self._server_hostname and
896n/a self._sslcontext.verify_mode != ssl.CERT_NONE):
897n/a try:
898n/a ssl.match_hostname(peercert, self._server_hostname)
899n/a except Exception as exc:
900n/a if self._loop.get_debug():
901n/a logger.warning("%r: SSL handshake failed "
902n/a "on matching the hostname",
903n/a self, exc_info=True)
904n/a self._sock.close()
905n/a self._wakeup_waiter(exc)
906n/a return
907n/a
908n/a # Add extra info that becomes available after handshake.
909n/a self._extra.update(peercert=peercert,
910n/a cipher=self._sock.cipher(),
911n/a compression=self._sock.compression(),
912n/a ssl_object=self._sock,
913n/a )
914n/a
915n/a self._read_wants_write = False
916n/a self._write_wants_read = False
917n/a self._loop._add_reader(self._sock_fd, self._read_ready)
918n/a self._protocol_connected = True
919n/a self._loop.call_soon(self._protocol.connection_made, self)
920n/a # only wake up the waiter when connection_made() has been called
921n/a self._loop.call_soon(self._wakeup_waiter)
922n/a
923n/a if self._loop.get_debug():
924n/a dt = self._loop.time() - start_time
925n/a logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
926n/a
927n/a def pause_reading(self):
928n/a # XXX This is a bit icky, given the comment at the top of
929n/a # _read_ready(). Is it possible to evoke a deadlock? I don't
930n/a # know, although it doesn't look like it; write() will still
931n/a # accept more data for the buffer and eventually the app will
932n/a # call resume_reading() again, and things will flow again.
933n/a
934n/a if self._closing:
935n/a raise RuntimeError('Cannot pause_reading() when closing')
936n/a if self._paused:
937n/a raise RuntimeError('Already paused')
938n/a self._paused = True
939n/a self._loop._remove_reader(self._sock_fd)
940n/a if self._loop.get_debug():
941n/a logger.debug("%r pauses reading", self)
942n/a
943n/a def resume_reading(self):
944n/a if not self._paused:
945n/a raise RuntimeError('Not paused')
946n/a self._paused = False
947n/a if self._closing:
948n/a return
949n/a self._loop._add_reader(self._sock_fd, self._read_ready)
950n/a if self._loop.get_debug():
951n/a logger.debug("%r resumes reading", self)
952n/a
953n/a def _read_ready(self):
954n/a if self._conn_lost:
955n/a return
956n/a if self._write_wants_read:
957n/a self._write_wants_read = False
958n/a self._write_ready()
959n/a
960n/a if self._buffer:
961n/a self._loop._add_writer(self._sock_fd, self._write_ready)
962n/a
963n/a try:
964n/a data = self._sock.recv(self.max_size)
965n/a except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
966n/a pass
967n/a except ssl.SSLWantWriteError:
968n/a self._read_wants_write = True
969n/a self._loop._remove_reader(self._sock_fd)
970n/a self._loop._add_writer(self._sock_fd, self._write_ready)
971n/a except Exception as exc:
972n/a self._fatal_error(exc, 'Fatal read error on SSL transport')
973n/a else:
974n/a if data:
975n/a self._protocol.data_received(data)
976n/a else:
977n/a try:
978n/a if self._loop.get_debug():
979n/a logger.debug("%r received EOF", self)
980n/a keep_open = self._protocol.eof_received()
981n/a if keep_open:
982n/a logger.warning('returning true from eof_received() '
983n/a 'has no effect when using ssl')
984n/a finally:
985n/a self.close()
986n/a
987n/a def _write_ready(self):
988n/a if self._conn_lost:
989n/a return
990n/a if self._read_wants_write:
991n/a self._read_wants_write = False
992n/a self._read_ready()
993n/a
994n/a if not (self._paused or self._closing):
995n/a self._loop._add_reader(self._sock_fd, self._read_ready)
996n/a
997n/a if self._buffer:
998n/a try:
999n/a n = self._sock.send(self._buffer)
1000n/a except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
1001n/a n = 0
1002n/a except ssl.SSLWantReadError:
1003n/a n = 0
1004n/a self._loop._remove_writer(self._sock_fd)
1005n/a self._write_wants_read = True
1006n/a except Exception as exc:
1007n/a self._loop._remove_writer(self._sock_fd)
1008n/a self._buffer.clear()
1009n/a self._fatal_error(exc, 'Fatal write error on SSL transport')
1010n/a return
1011n/a
1012n/a if n:
1013n/a del self._buffer[:n]
1014n/a
1015n/a self._maybe_resume_protocol() # May append to buffer.
1016n/a
1017n/a if not self._buffer:
1018n/a self._loop._remove_writer(self._sock_fd)
1019n/a if self._closing:
1020n/a self._call_connection_lost(None)
1021n/a
1022n/a def write(self, data):
1023n/a if not isinstance(data, (bytes, bytearray, memoryview)):
1024n/a raise TypeError('data argument must be a bytes-like object, '
1025n/a 'not %r' % type(data).__name__)
1026n/a if not data:
1027n/a return
1028n/a
1029n/a if self._conn_lost:
1030n/a if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1031n/a logger.warning('socket.send() raised exception.')
1032n/a self._conn_lost += 1
1033n/a return
1034n/a
1035n/a if not self._buffer:
1036n/a self._loop._add_writer(self._sock_fd, self._write_ready)
1037n/a
1038n/a # Add it to the buffer.
1039n/a self._buffer.extend(data)
1040n/a self._maybe_pause_protocol()
1041n/a
1042n/a def can_write_eof(self):
1043n/a return False
1044n/a
1045n/a
1046n/aclass _SelectorDatagramTransport(_SelectorTransport):
1047n/a
1048n/a _buffer_factory = collections.deque
1049n/a
1050n/a def __init__(self, loop, sock, protocol, address=None,
1051n/a waiter=None, extra=None):
1052n/a super().__init__(loop, sock, protocol, extra)
1053n/a self._address = address
1054n/a self._loop.call_soon(self._protocol.connection_made, self)
1055n/a # only start reading when connection_made() has been called
1056n/a self._loop.call_soon(self._loop._add_reader,
1057n/a self._sock_fd, self._read_ready)
1058n/a if waiter is not None:
1059n/a # only wake up the waiter when connection_made() has been called
1060n/a self._loop.call_soon(futures._set_result_unless_cancelled,
1061n/a waiter, None)
1062n/a
1063n/a def get_write_buffer_size(self):
1064n/a return sum(len(data) for data, _ in self._buffer)
1065n/a
1066n/a def _read_ready(self):
1067n/a if self._conn_lost:
1068n/a return
1069n/a try:
1070n/a data, addr = self._sock.recvfrom(self.max_size)
1071n/a except (BlockingIOError, InterruptedError):
1072n/a pass
1073n/a except OSError as exc:
1074n/a self._protocol.error_received(exc)
1075n/a except Exception as exc:
1076n/a self._fatal_error(exc, 'Fatal read error on datagram transport')
1077n/a else:
1078n/a self._protocol.datagram_received(data, addr)
1079n/a
1080n/a def sendto(self, data, addr=None):
1081n/a if not isinstance(data, (bytes, bytearray, memoryview)):
1082n/a raise TypeError('data argument must be a bytes-like object, '
1083n/a 'not %r' % type(data).__name__)
1084n/a if not data:
1085n/a return
1086n/a
1087n/a if self._address and addr not in (None, self._address):
1088n/a raise ValueError('Invalid address: must be None or %s' %
1089n/a (self._address,))
1090n/a
1091n/a if self._conn_lost and self._address:
1092n/a if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1093n/a logger.warning('socket.send() raised exception.')
1094n/a self._conn_lost += 1
1095n/a return
1096n/a
1097n/a if not self._buffer:
1098n/a # Attempt to send it right away first.
1099n/a try:
1100n/a if self._address:
1101n/a self._sock.send(data)
1102n/a else:
1103n/a self._sock.sendto(data, addr)
1104n/a return
1105n/a except (BlockingIOError, InterruptedError):
1106n/a self._loop._add_writer(self._sock_fd, self._sendto_ready)
1107n/a except OSError as exc:
1108n/a self._protocol.error_received(exc)
1109n/a return
1110n/a except Exception as exc:
1111n/a self._fatal_error(exc,
1112n/a 'Fatal write error on datagram transport')
1113n/a return
1114n/a
1115n/a # Ensure that what we buffer is immutable.
1116n/a self._buffer.append((bytes(data), addr))
1117n/a self._maybe_pause_protocol()
1118n/a
1119n/a def _sendto_ready(self):
1120n/a while self._buffer:
1121n/a data, addr = self._buffer.popleft()
1122n/a try:
1123n/a if self._address:
1124n/a self._sock.send(data)
1125n/a else:
1126n/a self._sock.sendto(data, addr)
1127n/a except (BlockingIOError, InterruptedError):
1128n/a self._buffer.appendleft((data, addr)) # Try again later.
1129n/a break
1130n/a except OSError as exc:
1131n/a self._protocol.error_received(exc)
1132n/a return
1133n/a except Exception as exc:
1134n/a self._fatal_error(exc,
1135n/a 'Fatal write error on datagram transport')
1136n/a return
1137n/a
1138n/a self._maybe_resume_protocol() # May append to buffer.
1139n/a if not self._buffer:
1140n/a self._loop._remove_writer(self._sock_fd)
1141n/a if self._closing:
1142n/a self._call_connection_lost(None)