»Core Development>Code coverage>Lib/asyncio/proactor_events.py

Python code coverage for Lib/asyncio/proactor_events.py

# count content
1n/a"""Event loop using a proactor and related classes.
2n/a
3n/aA proactor is a "notify-on-completion" multiplexer. Currently a
4n/aproactor is only implemented on Windows with IOCP.
5n/a"""
6n/a
7n/a__all__ = ['BaseProactorEventLoop']
8n/a
9n/aimport socket
10n/aimport warnings
11n/a
12n/afrom . import base_events
13n/afrom . import compat
14n/afrom . import constants
15n/afrom . import futures
16n/afrom . import sslproto
17n/afrom . import transports
18n/afrom .log import logger
19n/a
20n/a
class _ProactorBasePipeTransport(transports._FlowControlMixin,
                                 transports.BaseTransport):
    """Shared state and shutdown logic for pipe and socket transports."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Initialise the transport and schedule connection_made().

        If *server* is given, its _attach() is called immediately.  If
        *waiter* is given, it is resolved with None by a callback queued
        after the one that invokes protocol.connection_made().
        """
        super().__init__(extra, loop)
        self._set_extra(sock)
        self._sock = sock
        self._protocol = protocol
        self._server = server
        # Data queued behind an in-flight write: None or a bytearray.
        self._buffer = None
        self._read_fut = None
        self._write_fut = None
        self._pending_write = 0
        self._conn_lost = 0
        # Becomes True in close() and _force_close().
        self._closing = False
        self._eof_written = False
        if server is not None:
            server._attach()
        self._loop.call_soon(protocol.connection_made, self)
        if waiter is not None:
            # Queued after connection_made(), so the waiter is only woken
            # once the protocol has been told about the connection.
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        """Return a debug string listing class name, state and futures."""
        parts = [self.__class__.__name__]
        sock = self._sock
        if sock is None:
            parts.append('closed')
        else:
            if self._closing:
                parts.append('closing')
            parts.append('fd=%s' % sock.fileno())
        if self._read_fut is not None:
            parts.append('read=%s' % self._read_fut)
        if self._write_fut is not None:
            parts.append("write=%r" % self._write_fut)
        if self._buffer:
            parts.append('write_bufsize=%s' % len(self._buffer))
        if self._eof_written:
            parts.append('EOF written')
        return '<%s>' % ' '.join(parts)

    def _set_extra(self, sock):
        """Record *sock* in the transport's extra info under 'pipe'."""
        self._extra['pipe'] = sock

    def set_protocol(self, protocol):
        """Replace the protocol that receives this transport's callbacks."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the protocol currently attached to this transport."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or _force_close() has been called."""
        return self._closing

    def close(self):
        """Begin a graceful close; a no-op if already closing.

        connection_lost(None) is scheduled right away only when nothing is
        buffered and no write is in flight.  Any pending read is cancelled.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        nothing_to_flush = self._write_fut is None and not self._buffer
        if nothing_to_flush:
            self._loop.call_soon(self._call_connection_lost, None)
        pending_read = self._read_fut
        if pending_read is not None:
            pending_read.cancel()
            self._read_fut = None

    # Objects that define a finalizer and sit in a reference cycle are only
    # destroyed on Python 3.4 and newer (PEP 442), so __del__ is defined
    # only there.
    if compat.PY34:
        def __del__(self):
            if self._sock is None:
                return
            warnings.warn("unclosed transport %r" % self, ResourceWarning,
                          source=self)
            self.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and force-close the transport.

        Exceptions matching base_events._FATAL_ERROR_IGNORE are only logged
        (and only in debug mode); anything else goes to the loop's
        exception handler.
        """
        if not isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._force_close(exc)

    def _force_close(self, exc):
        """Close immediately: cancel pending I/O and drop buffered data.

        connection_lost(exc) is scheduled on the loop.  Does nothing if the
        transport is already closing.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        if self._write_fut:
            self._write_fut.cancel()
            self._write_fut = None
        if self._read_fut:
            self._read_fut.cancel()
            self._read_fut = None
        self._pending_write = 0
        self._buffer = None
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the socket and the server."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # XXX If there is a pending overlapped read on the other
            # end then it may fail with ERROR_NETNAME_DELETED if we
            # just close our end.  First calling shutdown() seems to
            # cure it, but maybe using DisconnectEx() would be better.
            sock = self._sock
            if hasattr(sock, 'shutdown'):
                sock.shutdown(socket.SHUT_RDWR)
            sock.close()
            self._sock = None
            owner = self._server
            if owner is not None:
                owner._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the in-flight write size plus the buffered byte count."""
        buffered = 0 if self._buffer is None else len(self._buffer)
        return self._pending_write + buffered
149n/a
150n/a
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
                                 transports.ReadTransport):
    """Transport for read pipes."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the base transport and queue the first read."""
        super().__init__(loop, sock, protocol, waiter, extra, server)
        self._paused = False
        self._loop.call_soon(self._loop_reading)

    def pause_reading(self):
        """Stop delivering data to the protocol until resume_reading().

        Raises RuntimeError if the transport is closing or already paused.
        A read that is already in flight is left untouched; its completion
        callback returns early while the transport is paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume delivering data to the protocol.

        Raises RuntimeError if the transport is not paused.  Does nothing
        further if the transport is closing.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        read_fut = self._read_fut
        if read_fut is None or read_fut.done():
            # Either no read is outstanding, or one completed while we were
            # paused and its callback returned early: restart the read loop
            # by hand so the result (if any) is consumed.
            self._loop.call_soon(self._loop_reading, read_fut)
        # Otherwise the read is still in flight.  Its done-callback is
        # already registered and will drive the loop once it completes;
        # calling _loop_reading() now would invoke result() on a pending
        # future and raise InvalidStateError.
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _loop_reading(self, fut=None):
        """Consume the result of *fut* (if any) and start the next read.

        Used both as the initial kick-off (fut=None) and as the done
        callback of each recv() future.  Received data or EOF is handed to
        the protocol from the ``finally`` clause.
        """
        if self._paused:
            return
        data = None

        try:
            if fut is not None:
                assert self._read_fut is fut or (self._read_fut is None and
                                                 self._closing)
                self._read_fut = None
                data = fut.result()  # deliver data later in "finally" clause

            if self._closing:
                # since close() has been called we ignore any read data
                data = None
                return

            if data == b'':
                # we got end-of-file so no need to reschedule a new read
                return

            # reschedule a new read
            self._read_fut = self._loop._proactor.recv(self._sock, 4096)
        except ConnectionAbortedError as exc:
            if not self._closing:
                self._fatal_error(exc, 'Fatal read error on pipe transport')
            elif self._loop.get_debug():
                logger.debug("Read error on pipe transport while closing",
                             exc_info=True)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        except futures.CancelledError:
            # A cancelled read is expected only while closing.
            if not self._closing:
                raise
        else:
            self._read_fut.add_done_callback(self._loop_reading)
        finally:
            if data:
                self._protocol.data_received(data)
            elif data is not None:
                # data == b'': end-of-file.
                if self._loop.get_debug():
                    logger.debug("%r received EOF", self)
                keep_open = self._protocol.eof_received()
                if not keep_open:
                    self.close()
227n/a
228n/a
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
                                      transports.WriteTransport):
    """Transport for write pipes."""

    def write(self, data):
        """Queue *data* for sending.

        Raises TypeError if *data* is not bytes, bytearray or memoryview,
        and RuntimeError if write_eof() has already been called.  Empty
        data is ignored.  After the connection is lost the data is dropped
        and a warning is logged once the drop counter reaches
        constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # Interpolate the type into the message; passing it as a second
            # exception argument would leave the '%r' placeholder unfilled.
            raise TypeError('data argument must be byte-ish (%r)'
                            % (type(data),))
        if self._eof_written:
            raise RuntimeError('write_eof() already called')

        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        # Observable states:
        # 1. IDLE: _write_fut and _buffer both None
        # 2. WRITING: _write_fut set; _buffer None
        # 3. BACKED UP: _write_fut set; _buffer a bytearray
        # We always copy the data, so the caller can't modify it
        # while we're still waiting for the I/O to happen.
        if self._write_fut is None:  # IDLE -> WRITING
            assert self._buffer is None
            # Pass a copy, except if it's already immutable.
            self._loop_writing(data=bytes(data))
        elif not self._buffer:  # WRITING -> BACKED UP
            # Make a mutable copy which we can extend.
            self._buffer = bytearray(data)
            self._maybe_pause_protocol()
        else:  # BACKED UP
            # Append to buffer (also copies).
            self._buffer.extend(data)
            self._maybe_pause_protocol()

    def _loop_writing(self, f=None, data=None):
        """Finish the write tracked by *f* and start the next one.

        Called directly from write() with *data*, and as the done callback
        of each send() future with *f*.  When called without *data*, the
        buffered bytes (if any) are sent next.
        """
        try:
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()  # may raise; handled by the except clauses below
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
                self._write_fut = self._loop._proactor.send(self._sock, data)
                if not self._write_fut.done():
                    assert self._pending_write == 0
                    self._pending_write = len(data)
                    self._write_fut.add_done_callback(self._loop_writing)
                    self._maybe_pause_protocol()
                else:
                    self._write_fut.add_done_callback(self._loop_writing)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal write error on pipe transport')

    def can_write_eof(self):
        """Return True: write_eof() is supported."""
        return True

    def write_eof(self):
        """Signal end-of-file by closing the transport."""
        self.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)
311n/a
312n/a
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
    """Write-pipe transport that also watches for the peer closing."""

    def __init__(self, *args, **kw):
        """Create the transport and start a small read to detect closure."""
        super().__init__(*args, **kw)
        watcher = self._loop._proactor.recv(self._sock, 16)
        self._read_fut = watcher
        watcher.add_done_callback(self._pipe_closed)

    def _pipe_closed(self, fut):
        """Done callback of the watcher read: react to the pipe closing."""
        if fut.cancelled():
            # the transport has been closed
            return
        assert fut.result() == b''
        if self._closing:
            assert self._read_fut is None
            return
        assert fut is self._read_fut, (fut, self._read_fut)
        self._read_fut = None
        if self._write_fut is None:
            # Nothing in flight: a regular close is enough.
            self.close()
        else:
            # A write is still pending and can no longer complete.
            self._force_close(BrokenPipeError())
333n/a
334n/a
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
                                   _ProactorBaseWritePipeTransport,
                                   transports.Transport):
    """Transport for duplex pipes."""

    def can_write_eof(self):
        """Return False: write_eof() is not supported on duplex pipes."""
        return False

    def write_eof(self):
        """Always raise NotImplementedError."""
        raise NotImplementedError
345n/a
346n/a
class _ProactorSocketTransport(_ProactorReadPipeTransport,
                               _ProactorBaseWritePipeTransport,
                               transports.Transport):
    """Transport for connected sockets."""

    def _set_extra(self, sock):
        """Fill the extra info with the socket and its addresses.

        'sockname' is always looked up; 'peername' only when it was not
        already supplied.  Lookup failures are logged in debug mode and
        otherwise ignored.
        """
        extra = self._extra
        extra['socket'] = sock
        try:
            extra['sockname'] = sock.getsockname()
        except (socket.error, AttributeError):
            if self._loop.get_debug():
                logger.warning("getsockname() failed on %r",
                               sock, exc_info=True)
        if 'peername' in extra:
            return
        try:
            extra['peername'] = sock.getpeername()
        except (socket.error, AttributeError):
            if self._loop.get_debug():
                logger.warning("getpeername() failed on %r",
                               sock, exc_info=True)

    def can_write_eof(self):
        """Return True: sockets support half-closing the write side."""
        return True

    def write_eof(self):
        """Mark EOF as written and shut down the write side when idle.

        Does nothing if the transport is closing or EOF was already
        written.  With a write in flight, only the flag is set here.
        """
        already_done = self._closing or self._eof_written
        if already_done:
            return
        self._eof_written = True
        if self._write_fut is None:
            self._sock.shutdown(socket.SHUT_WR)
377n/a
378n/a
class BaseProactorEventLoop(base_events.BaseEventLoop):
    """Event loop that delegates I/O to a proactor object."""

    def __init__(self, proactor):
        """Bind *proactor* to this loop and create the self-pipe."""
        super().__init__()
        logger.debug('Using proactor: %s', proactor.__class__.__name__)
        self._proactor = proactor
        self._selector = proactor   # convenient alias
        self._self_reading_future = None
        self._accept_futures = {}   # socket file descriptor => Future
        proactor.set_loop(self)
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        """Return a _ProactorSocketTransport wrapping *sock*."""
        return _ProactorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            *, server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Wrap *rawsock* in an SSLProtocol and return its app transport.

        Raises NotImplementedError when sslproto reports that SSL support
        is unavailable.
        """
        if not sslproto._is_sslproto_available():
            raise NotImplementedError("Proactor event loop requires Python 3.5"
                                      " or newer (ssl.MemoryBIO) to support "
                                      "SSL")

        ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
                                            server_side, server_hostname)
        # The raw transport is created for its side effects only; callers
        # get the transport exposed by the SSL protocol instead.
        _ProactorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
                                    extra=None):
        """Return a _ProactorDuplexPipeTransport wrapping *sock*."""
        return _ProactorDuplexPipeTransport(self,
                                            sock, protocol, waiter, extra)

    def _make_read_pipe_transport(self, sock, protocol, waiter=None,
                                  extra=None):
        """Return a _ProactorReadPipeTransport wrapping *sock*."""
        return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)

    def _make_write_pipe_transport(self, sock, protocol, waiter=None,
                                   extra=None):
        """Return a _ProactorWritePipeTransport wrapping *sock*."""
        # We want connection_lost() to be called when other end closes
        return _ProactorWritePipeTransport(self,
                                           sock, protocol, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its proactor.

        Raises RuntimeError if the loop is running; does nothing if it is
        already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return

        # Call these methods before closing the event loop (before calling
        # BaseEventLoop.close), because they can schedule callbacks with
        # call_soon(), which is forbidden when the event loop is closed.
        self._stop_accept_futures()
        self._close_self_pipe()
        self._proactor.close()
        self._proactor = None
        self._selector = None

        # Close the event loop
        super().close()

    def sock_recv(self, sock, n):
        """Return the proactor's future for receiving up to *n* bytes."""
        return self._proactor.recv(sock, n)

    def sock_sendall(self, sock, data):
        """Return the proactor's future for sending *data* on *sock*."""
        return self._proactor.send(sock, data)

    def sock_connect(self, sock, address):
        """Return the proactor's future for connecting *sock*."""
        return self._proactor.connect(sock, address)

    def sock_accept(self, sock):
        """Return the proactor's future for accepting on *sock*."""
        return self._proactor.accept(sock)

    def _socketpair(self):
        """Return a connected socket pair; subclasses must implement it."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Cancel the self-pipe read and close both of its sockets."""
        if self._self_reading_future is not None:
            self._self_reading_future.cancel()
            self._self_reading_future = None
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and start reading from it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self.call_soon(self._loop_self_reading)

    def _loop_self_reading(self, f=None):
        """Keep a read pending on the self-pipe.

        Used as the initial kick-off (f=None) and as the done callback of
        each read.  Stops silently on cancellation; other errors are sent
        to the loop's exception handler.
        """
        try:
            if f is not None:
                f.result()  # may raise
            f = self._proactor.recv(self._ssock, 4096)
        except futures.CancelledError:
            # _close_self_pipe() has been called, stop waiting for data
            return
        except Exception as exc:
            self.call_exception_handler({
                'message': 'Error on reading from the event loop self pipe',
                'exception': exc,
                'loop': self,
                })
        else:
            self._self_reading_future = f
            f.add_done_callback(self._loop_self_reading)

    def _write_to_self(self):
        """Wake the loop by sending one null byte down the self-pipe.

        Does nothing once _close_self_pipe() has cleared the socket.  An
        OSError from send() is logged in debug mode and otherwise ignored.
        """
        csock = self._csock
        if csock is None:
            # _close_self_pipe() already ran; there is nothing to wake up.
            return
        try:
            csock.send(b'\0')
        except OSError:
            # The socket is non-blocking, so a full buffer raises here.
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100):
        """Start accepting connections on *sock*.

        Each accepted connection gets a protocol from *protocol_factory*
        and a socket or SSL transport, depending on *sslcontext*.
        """

        def loop(f=None):
            # Runs once to start accepting, then as the done callback of
            # every accept() future.
            try:
                if f is not None:
                    conn, addr = f.result()
                    if self._debug:
                        logger.debug("%r got a new connection from %r: %r",
                                     server, addr, conn)
                    protocol = protocol_factory()
                    if sslcontext is not None:
                        self._make_ssl_transport(
                            conn, protocol, sslcontext, server_side=True,
                            extra={'peername': addr}, server=server)
                    else:
                        self._make_socket_transport(
                            conn, protocol,
                            extra={'peername': addr}, server=server)
                if self.is_closed():
                    return
                f = self._proactor.accept(sock)
            except OSError as exc:
                if sock.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Accept failed on a socket',
                        'exception': exc,
                        'socket': sock,
                    })
                    sock.close()
                elif self._debug:
                    logger.debug("Accept failed on socket %r",
                                 sock, exc_info=True)
            except futures.CancelledError:
                sock.close()
            else:
                self._accept_futures[sock.fileno()] = f
                f.add_done_callback(loop)

        self.call_soon(loop)

    def _process_events(self, event_list):
        """Do nothing; see the comment below."""
        # Events are processed in the IocpProactor._poll() method
        pass

    def _stop_accept_futures(self):
        """Cancel every pending accept future and forget them all."""
        for future in self._accept_futures.values():
            future.cancel()
        self._accept_futures.clear()

    def _stop_serving(self, sock):
        """Stop accepting connections on *sock* and close it.

        Only the accept future registered for *sock* is cancelled.  A
        cancelled accept makes its loop close the listening socket, so
        cancelling all futures here would shut down unrelated servers.
        """
        future = self._accept_futures.pop(sock.fileno(), None)
        if future is not None:
            future.cancel()
        self._proactor._stop_serving(sock)
        sock.close()