»Core Development>Code coverage>Lib/asyncio/base_events.py

Python code coverage for Lib/asyncio/base_events.py

#countcontent
1n/a"""Base implementation of event loop.
2n/a
3n/aThe event loop can be broken up into a multiplexer (the part
4n/aresponsible for notifying us of I/O events) and the event loop proper,
5n/awhich wraps a multiplexer with functionality for scheduling callbacks,
6n/aimmediately or at a given time in the future.
7n/a
8n/aWhenever a public API takes a callback, subsequent positional
9n/aarguments will be passed to the callback if/when it is called. This
10n/aavoids the proliferation of trivial lambdas implementing closures.
11n/aKeyword arguments for the callback are not supported; this is a
12n/aconscious design decision, leaving the door open for keyword arguments
13n/ato modify the meaning of the API call itself.
14n/a"""
15n/a
16n/aimport collections
17n/aimport concurrent.futures
18n/aimport heapq
19n/aimport itertools
20n/aimport logging
21n/aimport os
22n/aimport socket
23n/aimport subprocess
24n/aimport threading
25n/aimport time
26n/aimport traceback
27n/aimport sys
28n/aimport warnings
29n/aimport weakref
30n/a
31n/afrom . import compat
32n/afrom . import coroutines
33n/afrom . import events
34n/afrom . import futures
35n/afrom . import tasks
36n/afrom .coroutines import coroutine
37n/afrom .log import logger
38n/a
39n/a
40n/a__all__ = ['BaseEventLoop']
41n/a
42n/a
# Cleanup of cancelled timer handles is only performed once at least this
# many handles are present in the _scheduled list.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# ... and only once at least this fraction of the _scheduled timer handles
# has been cancelled.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# Exception types for which the fatal error methods (_fatal_error()) must
# not call the exception handler.
_FATAL_ERROR_IGNORE = (
    BrokenPipeError,
    ConnectionResetError,
    ConnectionAbortedError,
)
55n/a
56n/a
def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is bound to a Task (its ``__self__`` is a
    ``tasks.Task``), the repr of that task is returned; otherwise the
    handle itself is rendered with ``str()``.
    """
    owner = getattr(handle._callback, '__self__', None)
    if isinstance(owner, tasks.Task):
        # The callback belongs to a task: describe the task instead.
        return repr(owner)
    return str(handle)
64n/a
65n/a
def _format_pipe(fd):
    """Return a display string for a subprocess stream argument.

    ``subprocess.PIPE`` and ``subprocess.STDOUT`` are rendered as
    ``'<pipe>'`` and ``'<stdout>'``; any other value is rendered with
    ``repr()``.
    """
    known = (
        (subprocess.PIPE, '<pipe>'),
        (subprocess.STDOUT, '<stdout>'),
    )
    for sentinel, label in known:
        if fd == sentinel:
            return label
    return repr(fd)
73n/a
74n/a
def _set_reuseport(sock):
    """Enable SO_REUSEPORT on *sock*.

    Raises ValueError when the socket module does not define
    SO_REUSEPORT, or when setting the option fails with OSError.
    """
    if not hasattr(socket, 'SO_REUSEPORT'):
        raise ValueError('reuse_port not supported by socket module')

    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except OSError:
        # The constant exists but the option could not be set.
        raise ValueError('reuse_port not supported by socket module, '
                         'SO_REUSEPORT defined but not implemented.')
84n/a
85n/a
def _is_stream_socket(sock):
    """Return True if the SOCK_STREAM bits are set in ``sock.type``."""
    # On Linux socket.type is a bitmask that may carry additional flags,
    # so a plain equality test against SOCK_STREAM is not sufficient.
    stream = socket.SOCK_STREAM
    return (sock.type & stream) == stream
91n/a
92n/a
def _is_dgram_socket(sock):
    """Return True if the SOCK_DGRAM bits are set in ``sock.type``."""
    # On Linux socket.type is a bitmask that may carry additional flags,
    # so a plain equality test against SOCK_DGRAM is not sufficient.
    dgram = socket.SOCK_DGRAM
    return (sock.type & dgram) == dgram
98n/a
99n/a
def _ipaddr_info(host, port, family, type, proto):
    """Build a getaddrinfo()-style entry when *host* is already an IP.

    Callers may have resolved names themselves and pass in IP addresses;
    in that case the getaddrinfo() call can be skipped.

    Returns a ``(family, type, proto, '', (host, port))`` tuple when
    *host* parses as an IP address for one of the candidate families,
    or None when the shortcut does not apply and getaddrinfo() should be
    used instead.
    """
    if not hasattr(socket, 'inet_pton'):
        return None

    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
            host is None:
        return None

    # Linux only: getaddrinfo() can raise when socket.type is a bit mask.
    # A type such as SOCK_STREAM | SOCK_NONBLOCK matches neither test
    # below, so None is returned and getaddrinfo() processes the request.
    if type == socket.SOCK_STREAM:
        proto = socket.IPPROTO_TCP
    elif type == socket.SOCK_DGRAM:
        proto = socket.IPPROTO_UDP
    else:
        return None

    if port is None or (isinstance(port, (bytes, str)) and not port):
        # A missing or empty port maps to 0.
        port = 0
    else:
        # A service name such as "http" still needs getaddrinfo().
        try:
            port = int(port)
        except (TypeError, ValueError):
            return None

    if family == socket.AF_UNSPEC:
        candidates = [socket.AF_INET]
        if hasattr(socket, 'AF_INET6'):
            candidates.append(socket.AF_INET6)
    else:
        candidates = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after the
        # host, like '::1%lo0'.
        return None

    for af in candidates:
        try:
            socket.inet_pton(af, host)
        except OSError:
            continue
        # The host has already been resolved.
        return af, type, proto, '', (host, port)

    # "host" is not an IP address.
    return None
159n/a
160n/a
def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0,
                     flags=0, loop):
    """Return an awaitable yielding address infos for *address*.

    Only the first two items of *address* (host and port) are used.  If
    the host is already an IP address according to _ipaddr_info(), a
    future from ``loop.create_future()`` already holding a one-element
    list is returned; otherwise the result of ``loop.getaddrinfo()`` is
    returned.
    """
    host, port = address[:2]
    info = _ipaddr_info(host, port, family, type, proto)
    if info is None:
        return loop.getaddrinfo(host, port, family=family, type=type,
                                proto=proto, flags=flags)

    # "host" is already a resolved IP.
    resolved = loop.create_future()
    resolved.set_result([info])
    return resolved
173n/a
174n/a
def _run_until_complete_cb(fut):
    """Done-callback that stops the loop owning *fut*.

    The loop is left alone when the future's exception is a
    BaseException that is not an Exception.
    """
    exc = fut._exception
    non_regular_exit = (isinstance(exc, BaseException)
                        and not isinstance(exc, Exception))
    if not non_regular_exit:
        fut._loop.stop()
    # Otherwise (issue #22429): run_forever() already finished, no need
    # to stop it.
183n/a
184n/a
class Server(events.AbstractServer):
    """Server object tracking listening sockets and attached users.

    ``sockets`` holds the listening sockets until close() sets it to
    None.  ``_active_count`` is adjusted by _attach()/_detach(), and
    ``_waiters`` collects the futures created by wait_closed().
    """

    def __init__(self, loop, sockets):
        self._loop = loop
        self.sockets = sockets
        self._active_count = 0
        self._waiters = []

    def __repr__(self):
        cls_name = self.__class__.__name__
        return '<%s sockets=%r>' % (cls_name, self.sockets)

    def _attach(self):
        """Increment the active count; the server must not be closed."""
        assert self.sockets is not None
        self._active_count += 1

    def _detach(self):
        """Decrement the active count, waking waiters once closed and idle."""
        assert self._active_count > 0
        self._active_count -= 1
        idle = self._active_count == 0
        if idle and self.sockets is None:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; a second call does nothing."""
        listening = self.sockets
        if listening is None:
            return
        self.sockets = None
        for sock in listening:
            self._loop._stop_serving(sock)
        if self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Resolve all pending wait_closed() futures and drop the list."""
        pending, self._waiters = self._waiters, None
        for waiter in pending:
            if not waiter.done():
                waiter.set_result(waiter)

    @coroutine
    def wait_closed(self):
        """Wait until _wakeup() resolves the waiter created here.

        Returns immediately when ``sockets`` or ``_waiters`` is None.
        """
        if self.sockets is not None and self._waiters is not None:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            yield from waiter
230n/a
231n/a
232n/aclass BaseEventLoop(events.AbstractEventLoop):
233n/a
def __init__(self):
    """Initialise the bookkeeping state of a fresh, not yet running loop."""
    self._timer_cancelled_count = 0
    self._closed = False
    self._stopping = False
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running.
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None

    # Debug mode is taken from PYTHONASYNCIODEBUG unless the interpreter
    # was told to ignore the environment.
    env_debug = (not sys.flags.ignore_environment
                 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
    self.set_debug(env_debug)

    # In debug mode, a callback or task step whose execution exceeds this
    # duration (in seconds) is logged as slow.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_wrapper_set = False

    # Python >= 3.6 exposes asynchronous generator hooks; in that case
    # keep a weak set of all asynchronous generators being iterated by
    # the loop.  Otherwise there is nothing to track.
    self._asyncgens = (weakref.WeakSet()
                       if hasattr(sys, 'get_asyncgen_hooks') else None)

    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False
266n/a
def __repr__(self):
    """Return a summary showing the running, closed and debug state."""
    cls_name = self.__class__.__name__
    running = self.is_running()
    closed = self.is_closed()
    debug = self.get_debug()
    return ('<%s running=%s closed=%s debug=%s>'
            % (cls_name, running, closed, debug))
271n/a
def create_future(self):
    """Return a new Future bound to this event loop."""
    new_future = futures.Future(loop=self)
    return new_future
275n/a
def create_task(self, coro):
    """Schedule a coroutine object and return the resulting task.

    The configured task factory is used when one is set; otherwise a
    ``tasks.Task`` is created.  Raises RuntimeError (through
    _check_closed()) when the loop is closed.
    """
    self._check_closed()
    if self._task_factory is not None:
        return self._task_factory(self, coro)

    task = tasks.Task(coro, loop=self)
    if task._source_traceback:
        # Drop the last recorded frame of the source traceback.
        del task._source_traceback[-1]
    return task
289n/a
def set_task_factory(self, factory):
    """Install the task factory used by loop.create_task().

    Passing None restores the default task factory.

    A callable factory should have a signature matching '(loop, coro)',
    where 'loop' is a reference to the active event loop and 'coro' is
    a coroutine object.  The callable must return a Future.

    Raises TypeError if factory is neither None nor callable.
    """
    if not (factory is None or callable(factory)):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
303n/a
def get_task_factory(self):
    """Return the current task factory (None means the default one)."""
    factory = self._task_factory
    return factory
307n/a
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create socket transport.

    Not implemented here: always raises NotImplementedError.
    """
    raise NotImplementedError
312n/a
def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                        *, server_side=False, server_hostname=None,
                        extra=None, server=None):
    """Create SSL transport.

    Not implemented here: always raises NotImplementedError.
    """
    raise NotImplementedError
318n/a
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create datagram transport.

    Not implemented here: always raises NotImplementedError.
    """
    raise NotImplementedError
323n/a
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create read pipe transport.

    Not implemented here: always raises NotImplementedError.
    """
    raise NotImplementedError
328n/a
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create write pipe transport.

    Not implemented here: always raises NotImplementedError.
    """
    raise NotImplementedError
333n/a
@coroutine
def _make_subprocess_transport(self, protocol, args, shell,
                               stdin, stdout, stderr, bufsize,
                               extra=None, **kwargs):
    """Create subprocess transport.

    Not implemented here: the body only raises NotImplementedError.
    """
    raise NotImplementedError
340n/a
def _write_to_self(self):
    """Wake up the event loop by writing a byte to the self-pipe.

    This may be called from a different thread.

    Implementing the self-pipe is the responsibility of the subclass;
    this version always raises NotImplementedError.
    """
    raise NotImplementedError
349n/a
def _process_events(self, event_list):
    """Process selector events.

    Not implemented here: always raises NotImplementedError.
    """
    raise NotImplementedError
353n/a
def _check_closed(self):
    """Raise RuntimeError if the loop's ``_closed`` flag is set."""
    if not self._closed:
        return
    raise RuntimeError('Event loop is closed')
357n/a
def _asyncgen_finalizer_hook(self, agen):
    """Finalizer hook: stop tracking *agen* and schedule its aclose().

    The generator is removed from the tracked set.  Unless the loop is
    closed, a task running ``agen.aclose()`` is then scheduled.

    The finalizer may be called from a different thread than the one
    running the loop, so the task is not created directly here:
    create_task() is handed to call_soon_threadsafe(), which queues the
    call and wakes up the loop.
    """
    self._asyncgens.discard(agen)
    if not self.is_closed():
        self.call_soon_threadsafe(self.create_task, agen.aclose())
365n/a
def _asyncgen_firstiter_hook(self, agen):
    """First-iteration hook: start tracking *agen*.

    A ResourceWarning is emitted when shutdown_asyncgens() has already
    been called; the generator is added to the tracked set either way.
    """
    if self._asyncgens_shutdown_called:
        message = ("asynchronous generator {!r} was scheduled after "
                   "loop.shutdown_asyncgens() call").format(agen)
        warnings.warn(message, ResourceWarning, source=self)

    self._asyncgens.add(agen)
374n/a
@coroutine
def shutdown_asyncgens(self):
    """Shutdown all active asynchronous generators.

    Every tracked generator is closed through ``aclose()``; an Exception
    produced while closing one is reported via call_exception_handler()
    instead of being raised.
    """
    self._asyncgens_shutdown_called = True

    tracked = self._asyncgens
    if tracked is None or len(tracked) == 0:
        # Either Python version is <3.6 or there are no asynchronous
        # generators alive.
        return

    closing_agens = list(tracked)
    tracked.clear()

    close_calls = [agen.aclose() for agen in closing_agens]
    results = yield from tasks.gather(
        *close_calls,
        return_exceptions=True,
        loop=self)

    for outcome, agen in zip(results, closing_agens):
        if not isinstance(outcome, Exception):
            continue
        self.call_exception_handler({
            'message': 'an error occurred during closing of '
                       'asynchronous generator {!r}'.format(agen),
            'exception': outcome,
            'asyncgen': agen
        })
402n/a
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed, if this loop is already
    running, or if another loop is reported as running by
    ``events._get_running_loop()``.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('This event loop is already running')
    if events._get_running_loop() is not None:
        raise RuntimeError(
            'Cannot run the event loop while another loop is running')

    self._set_coroutine_wrapper(self._debug)
    self._thread_id = threading.get_ident()
    if self._asyncgens is not None:
        # Remember the current hooks so they can be put back on exit.
        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        keep_going = True
        while keep_going:
            self._run_once()
            keep_going = not self._stopping
    finally:
        # Reset the running state whether the loop stopped or raised.
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_wrapper(False)
        if self._asyncgens is not None:
            sys.set_asyncgen_hooks(*old_agen_hooks)
430n/a
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  Raises
    RuntimeError if the loop is closed, or if the loop was stopped
    before the future completed.
    """
    self._check_closed()

    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        # Always detach the stop callback, including when run_forever()
        # raised.  Leaving it attached would make a later completion of
        # this future stop a subsequent, unrelated run of the loop.
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
466n/a
def stop(self):
    """Request that the running event loop stops.

    Callbacks that are already scheduled will still run; this only sets
    the flag that makes run_forever() leave its loop after a complete
    iteration.
    """
    self._stopping = True
474n/a
def close(self):
    """Close the event loop.

    The ready and scheduled queues are cleared and the default executor,
    if any, is shut down without waiting for it to finish.  Closing an
    already closed loop does nothing.

    The event loop must not be running; RuntimeError is raised if it is.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)

    self._closed = True
    self._ready.clear()
    self._scheduled.clear()

    executor = self._default_executor
    if executor is None:
        return
    self._default_executor = None
    executor.shutdown(wait=False)
496n/a
def is_closed(self):
    """Return the loop's closed flag (True once close() has run)."""
    closed = self._closed
    return closed
500n/a
# On Python 3.3 and older, objects with a destructor that are part of a
# reference cycle are never destroyed. This is no longer the case on
# Python 3.4 thanks to PEP 442, so the destructor is only defined there.
if compat.PY34:
    def __del__(self):
        """Warn about a loop that was never closed and try to close it."""
        if self.is_closed():
            return
        warnings.warn("unclosed event loop %r" % self, ResourceWarning,
                      source=self)
        if not self.is_running():
            self.close()
511n/a
def is_running(self):
    """Return True if a running thread id is recorded for the loop."""
    running = self._thread_id is not None
    return running
515n/a
def time(self):
    """Return the current time on the event loop's clock.

    The value is a float in seconds since an epoch; the epoch,
    precision, accuracy and drift are unspecified and may differ per
    event loop.  This implementation reads ``time.monotonic()``.
    """
    now = time.monotonic()
    return now
524n/a
def call_later(self, delay, callback, *args):
    """Schedule *callback* to run *delay* seconds from now.

    Returns the handle produced by call_at(): an opaque object whose
    cancel() method can be used to cancel the call.

    The delay can be an int or float, expressed in seconds, and is
    always relative to the current loop time.

    Each callback will be called exactly once.  If two callbacks are
    scheduled for exactly the same time, it is undefined which will be
    called first.

    Any positional arguments after the callback will be passed to the
    callback when it is called.
    """
    when = self.time() + delay
    timer = self.call_at(when, callback, *args)
    source_tb = timer._source_traceback
    if source_tb:
        # Drop the last recorded frame of the source traceback.
        del source_tb[-1]
    return timer
545n/a
def call_at(self, when, callback, *args):
    """Like call_later(), but *when* is an absolute time.

    Absolute time corresponds to the event loop's time() method.  In
    debug mode the calling thread and the callback are validated first.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')

    timer = events.TimerHandle(when, callback, args, self)
    source_tb = timer._source_traceback
    if source_tb:
        # Drop the last recorded frame of the source traceback.
        del source_tb[-1]

    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
561n/a
def call_soon(self, callback, *args):
    """Schedule *callback* to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the order in
    which they are registered, and each one is called exactly once.

    Any positional arguments after the callback will be passed to the
    callback when it is called.  In debug mode the calling thread and
    the callback are validated first.
    """
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')

    handle = self._call_soon(callback, args)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the last recorded frame of the source traceback.
        del source_tb[-1]
    return handle
580n/a
def _check_callback(self, callback, method):
    """Validate *callback* on behalf of the loop method named *method*.

    Raises TypeError if *callback* is a coroutine or coroutine function,
    or if it is not callable.
    """
    is_coro = (coroutines.iscoroutine(callback) or
               coroutines.iscoroutinefunction(callback))
    if is_coro:
        raise TypeError(
            "coroutines cannot be used with {}()".format(method))
    if not callable(callback):
        raise TypeError(
            'a callable object was expected by {}(), got {!r}'.format(
                method, callback))
590n/a
591n/a
def _call_soon(self, callback, args):
    """Wrap *callback* in a Handle, queue it as ready and return it."""
    handle = events.Handle(callback, args, self)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the last recorded frame of the source traceback.
        del source_tb[-1]
    self._ready.append(handle)
    return handle
598n/a
def _check_thread(self):
    """Verify that the caller runs in the thread running the event loop.

    Non-thread-safe methods of this class rely on that assumption and
    will likely misbehave when it is violated.  Nothing is checked when
    no thread id is recorded for the loop.

    Should only be called when (self._debug == True).  The caller is
    responsible for checking this condition for performance reasons.

    Raises RuntimeError when called from another thread.
    """
    loop_thread = self._thread_id
    if loop_thread is not None and threading.get_ident() != loop_thread:
        raise RuntimeError(
            "Non-thread-safe operation invoked on an event loop other "
            "than the current one")
615n/a
def call_soon_threadsafe(self, callback, *args):
    """Like call_soon(), but thread-safe.

    After the handle has been queued, _write_to_self() is called to
    wake up the event loop.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon_threadsafe')

    handle = self._call_soon(callback, args)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the last recorded frame of the source traceback.
        del source_tb[-1]
    self._write_to_self()
    return handle
626n/a
def run_in_executor(self, executor, func, *args):
    """Submit ``func(*args)`` to an executor and wrap the result.

    When *executor* is None the loop's default executor is used; a
    ``concurrent.futures.ThreadPoolExecutor`` is created and stored as
    the default on first need.  The submitted concurrent future is
    returned wrapped by ``futures.wrap_future()`` for this loop.
    """
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')

    if executor is None:
        executor = self._default_executor
        if executor is None:
            # Lazily create the default executor.
            executor = concurrent.futures.ThreadPoolExecutor()
            self._default_executor = executor

    submitted = executor.submit(func, *args)
    return futures.wrap_future(submitted, loop=self)
637n/a
def set_default_executor(self, executor):
    """Store *executor* as the loop's default executor."""
    self._default_executor = executor
640n/a
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result is logged at INFO level when the lookup took at least
    ``slow_callback_duration`` seconds, and at DEBUG level otherwise.
    Returns the list produced by socket.getaddrinfo().
    """
    parts = ["%s:%r" % (host, port)]
    # Only truthy options are mentioned in the log message.
    optional = (
        ('family=%r', family),
        ('type=%r', type),
        ('proto=%r', proto),
        ('flags=%r', flags),
    )
    for template, value in optional:
        if value:
            parts.append(template % value)
    request = ', '.join(parts)
    logger.debug('Get address info %s', request)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = ('Getting address info %s took %.3f ms: %r'
              % (request, elapsed * 1e3, addrinfo))
    log = logger.info if elapsed >= self.slow_callback_duration \
        else logger.debug
    log(report)
    return addrinfo
665n/a
def getaddrinfo(self, host, port, *,
                family=0, type=0, proto=0, flags=0):
    """Run an address lookup in the default executor.

    In debug mode the logging wrapper _getaddrinfo_debug() is used;
    otherwise socket.getaddrinfo() is submitted directly.  Returns
    whatever run_in_executor() returns.
    """
    resolver = self._getaddrinfo_debug if self._debug \
        else socket.getaddrinfo
    return self.run_in_executor(None, resolver,
                                host, port, family, type, proto, flags)
674n/a
def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    lookup = socket.getnameinfo
    return self.run_in_executor(None, lookup, sockaddr, flags)
677n/a
@coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
                      ssl=None, family=0, proto=0, flags=0, sock=None,
                      local_addr=None, server_hostname=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    Either host/port or an existing stream socket (sock) must be given,
    never both.  This method is a coroutine which will try to establish
    the connection in the background. When successful, the coroutine
    returns a (transport, protocol) pair.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # host is the default for server_hostname.  It is an error if
        # host is empty or not set, e.g. when an already-connected
        # socket was passed or when only a port is given.  Passing
        # server_hostname='' avoids this error and bypasses the hostname
        # check.  (This also means that for a numeric IP/IPv6 host we
        # will attempt to verify that exact address; this will probably
        # fail, but a certificate can be created for a specific IP
        # address, so it is not judged here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if host is None and port is None:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if not _is_stream_socket(sock):
            # AF_INET, AF_INET6 and AF_UNIX are all allowed as long as
            # they are SOCK_STREAM.  AF_UNIX sockets are accepted even
            # though create_unix_connection is the dedicated API:
            # disallowing them here breaks backwards compatibility.
            raise ValueError(
                'A Stream Socket was expected, got {!r}'.format(sock))
    else:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        remote_fut = _ensure_resolved((host, port), family=family,
                                      type=socket.SOCK_STREAM, proto=proto,
                                      flags=flags, loop=self)
        pending = [remote_fut]
        local_fut = None
        if local_addr is not None:
            local_fut = _ensure_resolved(local_addr, family=family,
                                         type=socket.SOCK_STREAM,
                                         proto=proto, flags=flags,
                                         loop=self)
            pending.append(local_fut)

        yield from tasks.wait(pending, loop=self)

        infos = remote_fut.result()
        if not infos:
            raise OSError('getaddrinfo() returned empty list')
        if local_fut is not None:
            laddr_infos = local_fut.result()
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Try every resolved remote address until one connects.
        exceptions = []
        for af, socktype, ipproto, _cname, address in infos:
            try:
                sock = socket.socket(family=af, type=socktype,
                                     proto=ipproto)
                sock.setblocking(False)
                if local_fut is not None:
                    # Bind to the first local address that works.
                    for _af, _kind, _ipp, _cn, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                        except OSError as bind_exc:
                            exceptions.append(OSError(
                                bind_exc.errno,
                                'error while attempting to bind on '
                                'address {!r}: {}'.format(
                                    laddr, bind_exc.strerror.lower())))
                        else:
                            break
                    else:
                        # No local address could be bound.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                yield from self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            first = exceptions[0]
            if len(exceptions) == 1:
                raise first
            # If they all have the same str(), raise one.
            model = str(first)
            if all(str(exc) == model for exc in exceptions):
                raise first
            # Raise a combined exception so the user can see all
            # the various error messages.
            raise OSError('Multiple exceptions: {}'.format(
                ', '.join(str(exc) for exc in exceptions)))

    transport, protocol = yield from self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
809n/a
@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
                                 server_hostname, server_side=False):
    """Build a transport for *sock* and wait until its waiter completes.

    An SSL transport is created when *ssl* is truthy (a bool value means
    no explicit SSL context is passed on), a plain socket transport
    otherwise.  If waiting fails, the transport is closed and the
    exception re-raised.  Returns a (transport, protocol) pair.
    """
    sock.setblocking(False)

    protocol = protocol_factory()
    waiter = self.create_future()
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter)
    else:
        if isinstance(ssl, bool):
            sslcontext = None
        else:
            sslcontext = ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=server_side, server_hostname=server_hostname)

    try:
        yield from waiter
    except:
        transport.close()
        raise

    return transport, protocol
833n/a
@coroutine
def create_datagram_endpoint(self, protocol_factory,
                             local_addr=None, remote_addr=None, *,
                             family=0, proto=0, flags=0,
                             reuse_address=None, reuse_port=None,
                             allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either an existing datagram socket is passed as *sock* (no other
    socket-modifying argument may then be given), or a socket is created
    from the resolved local_addr/remote_addr.  Returns a
    (transport, protocol) pair.
    """
    if sock is not None:
        if not _is_dgram_socket(sock):
            raise ValueError(
                'A UDP Socket was expected, got {!r}'.format(sock))
        opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                    family=family, proto=proto, flags=flags,
                    reuse_address=reuse_address, reuse_port=reuse_port,
                    allow_broadcast=allow_broadcast)
        if any(opts.values()):
            # show the problematic kwargs in exception msg
            problems = ', '.join(
                '{}={}'.format(k, v) for k, v in opts.items() if v)
            raise ValueError(
                'socket modifier keyword arguments can not be used '
                'when sock is specified. ({})'.format(problems))
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        else:
            # join address by (family, protocol)
            addr_infos = collections.OrderedDict()
            for idx, addr in enumerate((local_addr, remote_addr)):
                if addr is None:
                    continue
                assert isinstance(addr, tuple) and len(addr) == 2, (
                    '2-tuple is expected')

                infos = yield from _ensure_resolved(
                    addr, family=family, type=socket.SOCK_DGRAM,
                    proto=proto, flags=flags, loop=self)
                if not infos:
                    raise OSError('getaddrinfo() returned empty list')

                for fam, _, pro, _, address in infos:
                    pair = addr_infos.setdefault((fam, pro), [None, None])
                    pair[idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = []
            for key, addr_pair in addr_infos.items():
                if local_addr and addr_pair[0] is None:
                    continue
                if remote_addr and addr_pair[1] is None:
                    continue
                addr_pairs_info.append((key, addr_pair))

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

        # Try each (family, proto) candidate until a socket is set up.
        for ((af, ipproto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=af, type=socket.SOCK_DGRAM, proto=ipproto)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    yield from self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        yield from waiter
    except:
        transport.close()
        raise

    return transport, protocol
955n/a
@coroutine
def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve (host, port) to stream-socket address infos.

    The lookup is delegated to _ensure_resolved() with
    type=socket.SOCK_STREAM and this loop.  The resolved list is
    returned unchanged; an empty result is reported as OSError.
    """
    address = (host, port)
    resolved = yield from _ensure_resolved(
        address, family=family, type=socket.SOCK_STREAM,
        flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
964n/a
@coroutine
def create_server(self, protocol_factory, host=None, port=None,
                  *,
                  family=socket.AF_UNSPEC,
                  flags=socket.AI_PASSIVE,
                  sock=None,
                  backlog=100,
                  ssl=None,
                  reuse_address=None,
                  reuse_port=None):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Arguments:
      protocol_factory: forwarded to _start_serving() for every
        listening socket.
      host, port: address(es) to resolve and bind.  Mutually exclusive
        with sock.  An empty-string host is resolved as None.
      family, flags: forwarded to the address resolution step.
      sock: an already created stream socket to serve on instead of
        host/port.
      backlog: passed to listen() on every socket.
      ssl: an SSLContext or None (a bool is rejected).
      reuse_address: whether to set SO_REUSEADDR on created sockets;
        None means "enabled on POSIX except Cygwin".
      reuse_port: when true, _set_reuseport() is applied to created
        sockets.

    Raises:
      TypeError: ssl is a bool.
      ValueError: both host/port and sock were given, neither was
        given, or sock is not a stream socket.
      OSError: address resolution or bind() failed.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    # The ABCs live in collections.abc; the bare collections.Iterable
    # alias is deprecated and absent from current Python versions.
    from collections.abc import Iterable

    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')
    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        AF_INET6 = getattr(socket, 'AF_INET6', 0)
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, Iterable)):
            hosts = [host]
        else:
            hosts = host

        fs = [self._create_server_getaddrinfo(host, port, family=family,
                                              flags=flags)
              for host in hosts]
        infos = yield from tasks.gather(*fs, loop=self)
        # A set drops duplicate address infos so each address is bound once.
        infos = set(itertools.chain.from_iterable(infos))

        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                # Track the socket first so the finally clause closes it
                # if any later step raises.
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                if reuse_port:
                    _set_reuseport(sock)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    # strerror may be None (OSError built from a single
                    # message); fall back to str(err) so the original
                    # failure is not masked by an AttributeError.
                    if err.strerror:
                        reason = err.strerror.lower()
                    else:
                        reason = str(err)
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, reason))
            completed = True
        finally:
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if not _is_stream_socket(sock):
            raise ValueError(
                'A Stream Socket was expected, got {!r}'.format(sock))
        sockets = [sock]

    server = Server(self, sockets)
    for sock in sockets:
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server, backlog)
    if self._debug:
        logger.info("%r is serving", server)
    return server
1068n/a
@coroutine
def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
    """Wrap an already accepted socket in a transport/protocol pair.

    Intended for servers that accept connections outside of asyncio
    but want asyncio to handle the resulting connection.

    sock must be a stream socket, otherwise ValueError is raised.

    This method is a coroutine.  When completed, the coroutine
    returns a (transport, protocol) pair.
    """
    stream_ok = _is_stream_socket(sock)
    if not stream_ok:
        raise ValueError(
            'A Stream Socket was expected, got {!r}'.format(sock))

    pair = yield from self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True)
    transport, protocol = pair
    if self._debug:
        # Ask the transport for its socket: an SSL transport closes the
        # socket it was given and creates a new SSL socket.
        logger.debug("%r handled: (%r, %r)",
                     transport.get_extra_info('socket'),
                     transport, protocol)
    return transport, protocol
1091n/a
@coroutine
def connect_read_pipe(self, protocol_factory, pipe):
    """Attach the read end of a pipe to a newly created protocol.

    A protocol is built with protocol_factory(), a read-pipe transport
    is created for pipe, and the coroutine waits on the transport's
    waiter future.  If waiting fails for any reason the transport is
    closed and the exception is re-raised.

    Returns a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, ready)

    try:
        yield from ready
    except BaseException:
        # Covers cancellation too: never leave the transport open.
        transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
1108n/a
@coroutine
def connect_write_pipe(self, protocol_factory, pipe):
    """Attach the write end of a pipe to a newly created protocol.

    A protocol is built with protocol_factory(), a write-pipe transport
    is created for pipe, and the coroutine waits on the transport's
    waiter future.  If waiting fails for any reason the transport is
    closed and the exception is re-raised.

    Returns a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, ready)

    try:
        yield from ready
    except BaseException:
        # Covers cancellation too: never leave the transport open.
        transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
1125n/a
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Emit one debug line describing a subprocess and its std streams.

    The line starts with msg and is followed by a space-separated
    description of every stream that is not None.  When stderr is
    subprocess.STDOUT and stdout is set, both are reported together as
    'stdout=stderr=...'.
    """
    parts = [msg]
    if stdin is not None:
        parts.append('stdin=%s' % _format_pipe(stdin))

    merged_output = stdout is not None and stderr == subprocess.STDOUT
    if merged_output:
        parts.append('stdout=stderr=%s' % _format_pipe(stdout))
    else:
        if stdout is not None:
            parts.append('stdout=%s' % _format_pipe(stdout))
        if stderr is not None:
            parts.append('stderr=%s' % _format_pipe(stderr))

    logger.debug(' '.join(parts))
1138n/a
@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     universal_newlines=False, shell=True, bufsize=0,
                     **kwargs):
    """Run cmd through the shell and return (transport, protocol).

    Arguments:
      protocol_factory: called without arguments to build the protocol.
      cmd: the command line, as str or bytes.
      stdin, stdout, stderr: forwarded to the subprocess transport.
      universal_newlines, shell, bufsize: accepted for signature
        compatibility only; they must be False, True and 0 respectively.
      **kwargs: forwarded to _make_subprocess_transport().

    Raises:
      ValueError: cmd is not str/bytes, or one of the fixed-value
        arguments has an unsupported value.

    This method is a coroutine.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    protocol = protocol_factory()
    # Always bind debug_log: debug mode may be switched on while this
    # coroutine is suspended below, and the second check must not hit
    # an unbound local.
    debug_log = None
    if self._debug:
        # NOTE(review): the whole shell command is included in the log
        # line; it may contain sensitive information.
        debug_log = 'run shell command %r' % cmd
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1163n/a
@coroutine
def subprocess_exec(self, protocol_factory, program, *args,
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE, universal_newlines=False,
                    shell=False, bufsize=0, **kwargs):
    """Execute program with args and return (transport, protocol).

    Arguments:
      protocol_factory: called without arguments to build the protocol.
      program, *args: the executable and its arguments; each must be
        str or bytes.
      stdin, stdout, stderr: forwarded to the subprocess transport.
      universal_newlines, shell, bufsize: accepted for signature
        compatibility only; they must be False, False and 0
        respectively.
      **kwargs: forwarded to _make_subprocess_transport().

    Raises:
      ValueError: one of the fixed-value arguments has an unsupported
        value.
      TypeError: program or one of args is neither str nor bytes.

    This method is a coroutine.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    popen_args = (program,) + args
    for arg in popen_args:
        if not isinstance(arg, (str, bytes)):
            raise TypeError("program arguments must be "
                            "a bytes or text string, not %s"
                            % type(arg).__name__)
    protocol = protocol_factory()
    # Always bind debug_log: debug mode may be switched on while this
    # coroutine is suspended below, and the second check must not hit
    # an unbound local.
    debug_log = None
    if self._debug:
        # Only the program is logged, not its arguments: they may
        # contain sensitive information (password) and may be too long.
        debug_log = 'execute program %r' % program
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
1193n/a
def get_exception_handler(self):
    """Return the custom exception handler installed on this loop.

    None is returned when no custom handler is set, in which case the
    default handler is the one in use.
    """
    handler = self._exception_handler
    return handler
1198n/a
def set_exception_handler(self, handler):
    """Install handler as this event loop's exception handler.

    Passing None restores the default exception handler.

    Otherwise handler must be callable, with a signature matching
    '(loop, context)': 'loop' is a reference to the active event loop
    and 'context' is a dict object (see `call_exception_handler()`
    documentation for details about context).

    TypeError is raised for a value that is neither None nor callable.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError('A callable object or None is expected, '
                    'got {!r}'.format(handler))
1215n/a
def default_exception_handler(self, context):
    """Log an error report built from context.

    Used when an exception occurs and no custom exception handler is
    set; a custom handler may also call it to defer to the default
    behavior.

    The context parameter has the same meaning as in
    `call_exception_handler()`.  Note that a 'handle_traceback' entry
    may be added to context when the currently running handle carries
    a source traceback and context has no 'source_traceback'.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = (
        (type(exception), exception, exception.__traceback__)
        if exception is not None else False)

    if 'source_traceback' not in context:
        current = self._current_handle
        if current is not None and current._source_traceback:
            context['handle_traceback'] = current._source_traceback

    # Traceback entries are rendered as text under a heading; every
    # other entry is shown through repr().
    headings = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }
    report = [message]
    for key in sorted(context):
        if key in ('message', 'exception'):
            continue
        raw = context[key]
        heading = headings.get(key)
        if heading is None:
            rendered = repr(raw)
        else:
            frames = ''.join(traceback.format_list(raw))
            rendered = heading + frames.rstrip()
        report.append('{}: {}'.format(key, rendered))

    logger.error('\n'.join(report), exc_info=exc_info)
1259n/a
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    custom = self._exception_handler
    if custom is not None:
        try:
            custom(self, context)
        except Exception as exc:
            # The user supplied handler itself failed: report that
            # through the default handler.
            fallback_context = {
                'message': 'Unhandled error in exception handler',
                'exception': exc,
                'context': context,
            }
            try:
                self.default_exception_handler(fallback_context)
            except Exception:
                # 'default_exception_handler' may be overloaded and
                # fail as well.
                logger.error('Exception in default exception handler '
                             'while handling an unexpected error '
                             'in custom exception handler',
                             exc_info=True)
        return

    try:
        self.default_exception_handler(context)
    except Exception:
        # Second protection layer for unexpected errors in the default
        # implementation, as well as for subclassed event loops with
        # overloaded "default_exception_handler".
        logger.error('Exception in default exception handler',
                     exc_info=True)
1309n/a
def _add_callback(self, handle):
    """Queue a (non-timer) Handle on _ready unless it is cancelled.

    A cancelled handle is silently dropped.  TimerHandle instances are
    not accepted here (checked with an assertion).
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
1317n/a
def _add_callback_signalsafe(self, handle):
    """Variant of _add_callback() meant to be called from a signal handler.

    Queues the handle and then calls _write_to_self() (presumably to
    wake up a loop blocked in its selector -- confirm against the
    concrete loop implementation).
    """
    enqueue = self._add_callback
    enqueue(handle)
    notify = self._write_to_self
    notify()
1322n/a
def _timer_handle_cancelled(self, handle):
    """Record that a TimerHandle has been cancelled.

    Only handles still flagged as scheduled are counted in
    _timer_cancelled_count.
    """
    if not handle._scheduled:
        return
    self._timer_cancelled_count += 1
1327n/a
def _run_once(self):
    """Run a single iteration of the event loop.

    Steps, in order: purge cancelled timers, compute the poll timeout,
    poll the selector and process the returned events, move due timers
    to the ready queue, then run every callback that was ready at that
    point.
    """
    # --- 1. Drop cancelled timers -------------------------------------
    pending = len(self._scheduled)
    purge_all = (pending > _MIN_SCHEDULED_TIMER_HANDLES and
                 self._timer_cancelled_count / pending >
                 _MIN_CANCELLED_TIMER_HANDLES_FRACTION)
    if purge_all:
        # Too many cancelled timers: rebuild the heap without them.
        survivors = []
        for entry in self._scheduled:
            if entry._cancelled:
                entry._scheduled = False
            else:
                survivors.append(entry)

        heapq.heapify(survivors)
        self._scheduled = survivors
        self._timer_cancelled_count = 0
    else:
        # Cheap path: only strip cancelled timers from the heap's head.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            entry = heapq.heappop(self._scheduled)
            entry._scheduled = False

    # --- 2. Choose the poll timeout -----------------------------------
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Sleep at most until the earliest timer is due.
        timeout = max(0, self._scheduled[0]._when - self.time())
    else:
        timeout = None

    # --- 3. Poll for I/O ----------------------------------------------
    if self._debug and timeout != 0:
        started = self.time()
        event_list = self._selector.select(timeout)
        elapsed = self.time() - started
        level = logging.INFO if elapsed >= 1.0 else logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       elapsed * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, elapsed * 1e3, nevent)
        elif elapsed >= 1.0:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, elapsed * 1e3)
    else:
        event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # --- 4. Promote timers that are due -------------------------------
    deadline = self.time() + self._clock_resolution
    while self._scheduled and not (self._scheduled[0]._when >= deadline):
        entry = heapq.heappop(self._scheduled)
        entry._scheduled = False
        self._ready.append(entry)

    # --- 5. Run the ready callbacks -----------------------------------
    # This is the only place where callbacks are actually *called*;
    # everything else merely appends to the ready queue.  The queue
    # length is sampled once so that callbacks scheduled by callbacks
    # run this time around are deferred to the next iteration (after
    # another I/O poll).  The idiom is thread-safe without locks.
    for _ in range(len(self._ready)):
        entry = self._ready.popleft()
        if entry._cancelled:
            continue
        if not self._debug:
            entry._run()
            continue
        try:
            self._current_handle = entry
            started = self.time()
            entry._run()
            elapsed = self.time() - started
            if elapsed >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(entry), elapsed)
        finally:
            self._current_handle = None
    entry = None  # Needed to break cycles when an exception occurs.
1426n/a
def _set_coroutine_wrapper(self, enabled):
    """Install or remove the debug coroutine wrapper.

    Does nothing when the interpreter lacks
    sys.set_coroutine_wrapper/sys.get_coroutine_wrapper, or when the
    requested state equals _coroutine_wrapper_set.  If a foreign
    wrapper is currently installed it is left untouched and a
    RuntimeWarning is issued instead.
    """
    try:
        install = sys.set_coroutine_wrapper
        query = sys.get_coroutine_wrapper
    except AttributeError:
        return

    enabled = bool(enabled)
    if self._coroutine_wrapper_set == enabled:
        return

    debug_wrapper = coroutines.debug_wrapper
    active = query()

    if active in (None, debug_wrapper):
        # Nothing foreign is installed: safe to switch state.
        install(debug_wrapper if enabled else None)
        self._coroutine_wrapper_set = enabled
    elif enabled:
        warnings.warn(
            "loop.set_debug(True): cannot set debug coroutine "
            "wrapper; another wrapper is already set %r" %
            active, RuntimeWarning)
    else:
        warnings.warn(
            "loop.set_debug(False): cannot unset debug coroutine "
            "wrapper; another wrapper was set %r" %
            active, RuntimeWarning)
1459n/a
def get_debug(self):
    """Return the loop's current debug flag (the _debug attribute)."""
    debug_flag = self._debug
    return debug_flag
1462n/a
def set_debug(self, enabled):
    """Store the debug flag and, on a running loop, sync the wrapper.

    enabled is saved as-is in _debug.  The coroutine wrapper is only
    updated here when is_running() reports true.
    """
    self._debug = enabled
    if not self.is_running():
        return
    self._set_coroutine_wrapper(enabled)