ยปCore Development>Code coverage>Lib/asyncio/events.py

Python code coverage for Lib/asyncio/events.py

#countcontent
1n/a"""Event loop and event loop policy."""
2n/a
3n/a__all__ = ['AbstractEventLoopPolicy',
4n/a 'AbstractEventLoop', 'AbstractServer',
5n/a 'Handle', 'TimerHandle',
6n/a 'get_event_loop_policy', 'set_event_loop_policy',
7n/a 'get_event_loop', 'set_event_loop', 'new_event_loop',
8n/a 'get_child_watcher', 'set_child_watcher',
9n/a '_set_running_loop', '_get_running_loop',
10n/a ]
11n/a
12n/aimport functools
13n/aimport inspect
14n/aimport reprlib
15n/aimport socket
16n/aimport subprocess
17n/aimport sys
18n/aimport threading
19n/aimport traceback
20n/a
21n/afrom asyncio import compat
22n/a
23n/a
24n/adef _get_function_source(func):
25n/a if compat.PY34:
26n/a func = inspect.unwrap(func)
27n/a elif hasattr(func, '__wrapped__'):
28n/a func = func.__wrapped__
29n/a if inspect.isfunction(func):
30n/a code = func.__code__
31n/a return (code.co_filename, code.co_firstlineno)
32n/a if isinstance(func, functools.partial):
33n/a return _get_function_source(func.func)
34n/a if compat.PY34 and isinstance(func, functools.partialmethod):
35n/a return _get_function_source(func.func)
36n/a return None
37n/a
38n/a
39n/adef _format_args_and_kwargs(args, kwargs):
40n/a """Format function arguments and keyword arguments.
41n/a
42n/a Special case for a single parameter: ('hello',) is formatted as ('hello').
43n/a """
44n/a # use reprlib to limit the length of the output
45n/a items = []
46n/a if args:
47n/a items.extend(reprlib.repr(arg) for arg in args)
48n/a if kwargs:
49n/a items.extend('{}={}'.format(k, reprlib.repr(v))
50n/a for k, v in kwargs.items())
51n/a return '(' + ', '.join(items) + ')'
52n/a
53n/a
54n/adef _format_callback(func, args, kwargs, suffix=''):
55n/a if isinstance(func, functools.partial):
56n/a suffix = _format_args_and_kwargs(args, kwargs) + suffix
57n/a return _format_callback(func.func, func.args, func.keywords, suffix)
58n/a
59n/a if hasattr(func, '__qualname__'):
60n/a func_repr = getattr(func, '__qualname__')
61n/a elif hasattr(func, '__name__'):
62n/a func_repr = getattr(func, '__name__')
63n/a else:
64n/a func_repr = repr(func)
65n/a
66n/a func_repr += _format_args_and_kwargs(args, kwargs)
67n/a if suffix:
68n/a func_repr += suffix
69n/a return func_repr
70n/a
71n/adef _format_callback_source(func, args):
72n/a func_repr = _format_callback(func, args, None)
73n/a source = _get_function_source(func)
74n/a if source:
75n/a func_repr += ' at %s:%s' % source
76n/a return func_repr
77n/a
78n/a
79n/aclass Handle:
80n/a """Object returned by callback registration methods."""
81n/a
82n/a __slots__ = ('_callback', '_args', '_cancelled', '_loop',
83n/a '_source_traceback', '_repr', '__weakref__')
84n/a
85n/a def __init__(self, callback, args, loop):
86n/a self._loop = loop
87n/a self._callback = callback
88n/a self._args = args
89n/a self._cancelled = False
90n/a self._repr = None
91n/a if self._loop.get_debug():
92n/a self._source_traceback = traceback.extract_stack(sys._getframe(1))
93n/a else:
94n/a self._source_traceback = None
95n/a
96n/a def _repr_info(self):
97n/a info = [self.__class__.__name__]
98n/a if self._cancelled:
99n/a info.append('cancelled')
100n/a if self._callback is not None:
101n/a info.append(_format_callback_source(self._callback, self._args))
102n/a if self._source_traceback:
103n/a frame = self._source_traceback[-1]
104n/a info.append('created at %s:%s' % (frame[0], frame[1]))
105n/a return info
106n/a
107n/a def __repr__(self):
108n/a if self._repr is not None:
109n/a return self._repr
110n/a info = self._repr_info()
111n/a return '<%s>' % ' '.join(info)
112n/a
113n/a def cancel(self):
114n/a if not self._cancelled:
115n/a self._cancelled = True
116n/a if self._loop.get_debug():
117n/a # Keep a representation in debug mode to keep callback and
118n/a # parameters. For example, to log the warning
119n/a # "Executing <Handle...> took 2.5 second"
120n/a self._repr = repr(self)
121n/a self._callback = None
122n/a self._args = None
123n/a
124n/a def _run(self):
125n/a try:
126n/a self._callback(*self._args)
127n/a except Exception as exc:
128n/a cb = _format_callback_source(self._callback, self._args)
129n/a msg = 'Exception in callback {}'.format(cb)
130n/a context = {
131n/a 'message': msg,
132n/a 'exception': exc,
133n/a 'handle': self,
134n/a }
135n/a if self._source_traceback:
136n/a context['source_traceback'] = self._source_traceback
137n/a self._loop.call_exception_handler(context)
138n/a self = None # Needed to break cycles when an exception occurs.
139n/a
140n/a
141n/aclass TimerHandle(Handle):
142n/a """Object returned by timed callback registration methods."""
143n/a
144n/a __slots__ = ['_scheduled', '_when']
145n/a
146n/a def __init__(self, when, callback, args, loop):
147n/a assert when is not None
148n/a super().__init__(callback, args, loop)
149n/a if self._source_traceback:
150n/a del self._source_traceback[-1]
151n/a self._when = when
152n/a self._scheduled = False
153n/a
154n/a def _repr_info(self):
155n/a info = super()._repr_info()
156n/a pos = 2 if self._cancelled else 1
157n/a info.insert(pos, 'when=%s' % self._when)
158n/a return info
159n/a
160n/a def __hash__(self):
161n/a return hash(self._when)
162n/a
163n/a def __lt__(self, other):
164n/a return self._when < other._when
165n/a
166n/a def __le__(self, other):
167n/a if self._when < other._when:
168n/a return True
169n/a return self.__eq__(other)
170n/a
171n/a def __gt__(self, other):
172n/a return self._when > other._when
173n/a
174n/a def __ge__(self, other):
175n/a if self._when > other._when:
176n/a return True
177n/a return self.__eq__(other)
178n/a
179n/a def __eq__(self, other):
180n/a if isinstance(other, TimerHandle):
181n/a return (self._when == other._when and
182n/a self._callback == other._callback and
183n/a self._args == other._args and
184n/a self._cancelled == other._cancelled)
185n/a return NotImplemented
186n/a
187n/a def __ne__(self, other):
188n/a equal = self.__eq__(other)
189n/a return NotImplemented if equal is NotImplemented else not equal
190n/a
191n/a def cancel(self):
192n/a if not self._cancelled:
193n/a self._loop._timer_handle_cancelled(self)
194n/a super().cancel()
195n/a
196n/a
197n/aclass AbstractServer:
198n/a """Abstract server returned by create_server()."""
199n/a
200n/a def close(self):
201n/a """Stop serving. This leaves existing connections open."""
202n/a return NotImplemented
203n/a
204n/a def wait_closed(self):
205n/a """Coroutine to wait until service is closed."""
206n/a return NotImplemented
207n/a
208n/a
209n/aclass AbstractEventLoop:
210n/a """Abstract event loop."""
211n/a
212n/a # Running and stopping the event loop.
213n/a
214n/a def run_forever(self):
215n/a """Run the event loop until stop() is called."""
216n/a raise NotImplementedError
217n/a
218n/a def run_until_complete(self, future):
219n/a """Run the event loop until a Future is done.
220n/a
221n/a Return the Future's result, or raise its exception.
222n/a """
223n/a raise NotImplementedError
224n/a
225n/a def stop(self):
226n/a """Stop the event loop as soon as reasonable.
227n/a
228n/a Exactly how soon that is may depend on the implementation, but
229n/a no more I/O callbacks should be scheduled.
230n/a """
231n/a raise NotImplementedError
232n/a
233n/a def is_running(self):
234n/a """Return whether the event loop is currently running."""
235n/a raise NotImplementedError
236n/a
237n/a def is_closed(self):
238n/a """Returns True if the event loop was closed."""
239n/a raise NotImplementedError
240n/a
241n/a def close(self):
242n/a """Close the loop.
243n/a
244n/a The loop should not be running.
245n/a
246n/a This is idempotent and irreversible.
247n/a
248n/a No other methods should be called after this one.
249n/a """
250n/a raise NotImplementedError
251n/a
252n/a def shutdown_asyncgens(self):
253n/a """Shutdown all active asynchronous generators."""
254n/a raise NotImplementedError
255n/a
256n/a # Methods scheduling callbacks. All these return Handles.
257n/a
258n/a def _timer_handle_cancelled(self, handle):
259n/a """Notification that a TimerHandle has been cancelled."""
260n/a raise NotImplementedError
261n/a
262n/a def call_soon(self, callback, *args):
263n/a return self.call_later(0, callback, *args)
264n/a
265n/a def call_later(self, delay, callback, *args):
266n/a raise NotImplementedError
267n/a
268n/a def call_at(self, when, callback, *args):
269n/a raise NotImplementedError
270n/a
271n/a def time(self):
272n/a raise NotImplementedError
273n/a
274n/a def create_future(self):
275n/a raise NotImplementedError
276n/a
277n/a # Method scheduling a coroutine object: create a task.
278n/a
279n/a def create_task(self, coro):
280n/a raise NotImplementedError
281n/a
282n/a # Methods for interacting with threads.
283n/a
284n/a def call_soon_threadsafe(self, callback, *args):
285n/a raise NotImplementedError
286n/a
287n/a def run_in_executor(self, executor, func, *args):
288n/a raise NotImplementedError
289n/a
290n/a def set_default_executor(self, executor):
291n/a raise NotImplementedError
292n/a
293n/a # Network I/O methods returning Futures.
294n/a
295n/a def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
296n/a raise NotImplementedError
297n/a
298n/a def getnameinfo(self, sockaddr, flags=0):
299n/a raise NotImplementedError
300n/a
301n/a def create_connection(self, protocol_factory, host=None, port=None, *,
302n/a ssl=None, family=0, proto=0, flags=0, sock=None,
303n/a local_addr=None, server_hostname=None):
304n/a raise NotImplementedError
305n/a
306n/a def create_server(self, protocol_factory, host=None, port=None, *,
307n/a family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
308n/a sock=None, backlog=100, ssl=None, reuse_address=None,
309n/a reuse_port=None):
310n/a """A coroutine which creates a TCP server bound to host and port.
311n/a
312n/a The return value is a Server object which can be used to stop
313n/a the service.
314n/a
315n/a If host is an empty string or None all interfaces are assumed
316n/a and a list of multiple sockets will be returned (most likely
317n/a one for IPv4 and another one for IPv6). The host parameter can also be a
318n/a sequence (e.g. list) of hosts to bind to.
319n/a
320n/a family can be set to either AF_INET or AF_INET6 to force the
321n/a socket to use IPv4 or IPv6. If not set it will be determined
322n/a from host (defaults to AF_UNSPEC).
323n/a
324n/a flags is a bitmask for getaddrinfo().
325n/a
326n/a sock can optionally be specified in order to use a preexisting
327n/a socket object.
328n/a
329n/a backlog is the maximum number of queued connections passed to
330n/a listen() (defaults to 100).
331n/a
332n/a ssl can be set to an SSLContext to enable SSL over the
333n/a accepted connections.
334n/a
335n/a reuse_address tells the kernel to reuse a local socket in
336n/a TIME_WAIT state, without waiting for its natural timeout to
337n/a expire. If not specified will automatically be set to True on
338n/a UNIX.
339n/a
340n/a reuse_port tells the kernel to allow this endpoint to be bound to
341n/a the same port as other existing endpoints are bound to, so long as
342n/a they all set this flag when being created. This option is not
343n/a supported on Windows.
344n/a """
345n/a raise NotImplementedError
346n/a
347n/a def create_unix_connection(self, protocol_factory, path, *,
348n/a ssl=None, sock=None,
349n/a server_hostname=None):
350n/a raise NotImplementedError
351n/a
352n/a def create_unix_server(self, protocol_factory, path, *,
353n/a sock=None, backlog=100, ssl=None):
354n/a """A coroutine which creates a UNIX Domain Socket server.
355n/a
356n/a The return value is a Server object, which can be used to stop
357n/a the service.
358n/a
359n/a path is a str, representing a file systsem path to bind the
360n/a server socket to.
361n/a
362n/a sock can optionally be specified in order to use a preexisting
363n/a socket object.
364n/a
365n/a backlog is the maximum number of queued connections passed to
366n/a listen() (defaults to 100).
367n/a
368n/a ssl can be set to an SSLContext to enable SSL over the
369n/a accepted connections.
370n/a """
371n/a raise NotImplementedError
372n/a
373n/a def create_datagram_endpoint(self, protocol_factory,
374n/a local_addr=None, remote_addr=None, *,
375n/a family=0, proto=0, flags=0,
376n/a reuse_address=None, reuse_port=None,
377n/a allow_broadcast=None, sock=None):
378n/a """A coroutine which creates a datagram endpoint.
379n/a
380n/a This method will try to establish the endpoint in the background.
381n/a When successful, the coroutine returns a (transport, protocol) pair.
382n/a
383n/a protocol_factory must be a callable returning a protocol instance.
384n/a
385n/a socket family AF_INET or socket.AF_INET6 depending on host (or
386n/a family if specified), socket type SOCK_DGRAM.
387n/a
388n/a reuse_address tells the kernel to reuse a local socket in
389n/a TIME_WAIT state, without waiting for its natural timeout to
390n/a expire. If not specified it will automatically be set to True on
391n/a UNIX.
392n/a
393n/a reuse_port tells the kernel to allow this endpoint to be bound to
394n/a the same port as other existing endpoints are bound to, so long as
395n/a they all set this flag when being created. This option is not
396n/a supported on Windows and some UNIX's. If the
397n/a :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
398n/a capability is unsupported.
399n/a
400n/a allow_broadcast tells the kernel to allow this endpoint to send
401n/a messages to the broadcast address.
402n/a
403n/a sock can optionally be specified in order to use a preexisting
404n/a socket object.
405n/a """
406n/a raise NotImplementedError
407n/a
408n/a # Pipes and subprocesses.
409n/a
410n/a def connect_read_pipe(self, protocol_factory, pipe):
411n/a """Register read pipe in event loop. Set the pipe to non-blocking mode.
412n/a
413n/a protocol_factory should instantiate object with Protocol interface.
414n/a pipe is a file-like object.
415n/a Return pair (transport, protocol), where transport supports the
416n/a ReadTransport interface."""
417n/a # The reason to accept file-like object instead of just file descriptor
418n/a # is: we need to own pipe and close it at transport finishing
419n/a # Can got complicated errors if pass f.fileno(),
420n/a # close fd in pipe transport then close f and vise versa.
421n/a raise NotImplementedError
422n/a
423n/a def connect_write_pipe(self, protocol_factory, pipe):
424n/a """Register write pipe in event loop.
425n/a
426n/a protocol_factory should instantiate object with BaseProtocol interface.
427n/a Pipe is file-like object already switched to nonblocking.
428n/a Return pair (transport, protocol), where transport support
429n/a WriteTransport interface."""
430n/a # The reason to accept file-like object instead of just file descriptor
431n/a # is: we need to own pipe and close it at transport finishing
432n/a # Can got complicated errors if pass f.fileno(),
433n/a # close fd in pipe transport then close f and vise versa.
434n/a raise NotImplementedError
435n/a
436n/a def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
437n/a stdout=subprocess.PIPE, stderr=subprocess.PIPE,
438n/a **kwargs):
439n/a raise NotImplementedError
440n/a
441n/a def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
442n/a stdout=subprocess.PIPE, stderr=subprocess.PIPE,
443n/a **kwargs):
444n/a raise NotImplementedError
445n/a
446n/a # Ready-based callback registration methods.
447n/a # The add_*() methods return None.
448n/a # The remove_*() methods return True if something was removed,
449n/a # False if there was nothing to delete.
450n/a
451n/a def add_reader(self, fd, callback, *args):
452n/a raise NotImplementedError
453n/a
454n/a def remove_reader(self, fd):
455n/a raise NotImplementedError
456n/a
457n/a def add_writer(self, fd, callback, *args):
458n/a raise NotImplementedError
459n/a
460n/a def remove_writer(self, fd):
461n/a raise NotImplementedError
462n/a
463n/a # Completion based I/O methods returning Futures.
464n/a
465n/a def sock_recv(self, sock, nbytes):
466n/a raise NotImplementedError
467n/a
468n/a def sock_sendall(self, sock, data):
469n/a raise NotImplementedError
470n/a
471n/a def sock_connect(self, sock, address):
472n/a raise NotImplementedError
473n/a
474n/a def sock_accept(self, sock):
475n/a raise NotImplementedError
476n/a
477n/a # Signal handling.
478n/a
479n/a def add_signal_handler(self, sig, callback, *args):
480n/a raise NotImplementedError
481n/a
482n/a def remove_signal_handler(self, sig):
483n/a raise NotImplementedError
484n/a
485n/a # Task factory.
486n/a
487n/a def set_task_factory(self, factory):
488n/a raise NotImplementedError
489n/a
490n/a def get_task_factory(self):
491n/a raise NotImplementedError
492n/a
493n/a # Error handlers.
494n/a
495n/a def get_exception_handler(self):
496n/a raise NotImplementedError
497n/a
498n/a def set_exception_handler(self, handler):
499n/a raise NotImplementedError
500n/a
501n/a def default_exception_handler(self, context):
502n/a raise NotImplementedError
503n/a
504n/a def call_exception_handler(self, context):
505n/a raise NotImplementedError
506n/a
507n/a # Debug flag management.
508n/a
509n/a def get_debug(self):
510n/a raise NotImplementedError
511n/a
512n/a def set_debug(self, enabled):
513n/a raise NotImplementedError
514n/a
515n/a
516n/aclass AbstractEventLoopPolicy:
517n/a """Abstract policy for accessing the event loop."""
518n/a
519n/a def get_event_loop(self):
520n/a """Get the event loop for the current context.
521n/a
522n/a Returns an event loop object implementing the BaseEventLoop interface,
523n/a or raises an exception in case no event loop has been set for the
524n/a current context and the current policy does not specify to create one.
525n/a
526n/a It should never return None."""
527n/a raise NotImplementedError
528n/a
529n/a def set_event_loop(self, loop):
530n/a """Set the event loop for the current context to loop."""
531n/a raise NotImplementedError
532n/a
533n/a def new_event_loop(self):
534n/a """Create and return a new event loop object according to this
535n/a policy's rules. If there's need to set this loop as the event loop for
536n/a the current context, set_event_loop must be called explicitly."""
537n/a raise NotImplementedError
538n/a
539n/a # Child processes handling (Unix only).
540n/a
541n/a def get_child_watcher(self):
542n/a "Get the watcher for child processes."
543n/a raise NotImplementedError
544n/a
545n/a def set_child_watcher(self, watcher):
546n/a """Set the watcher for child processes."""
547n/a raise NotImplementedError
548n/a
549n/a
550n/aclass BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
551n/a """Default policy implementation for accessing the event loop.
552n/a
553n/a In this policy, each thread has its own event loop. However, we
554n/a only automatically create an event loop by default for the main
555n/a thread; other threads by default have no event loop.
556n/a
557n/a Other policies may have different rules (e.g. a single global
558n/a event loop, or automatically creating an event loop per thread, or
559n/a using some other notion of context to which an event loop is
560n/a associated).
561n/a """
562n/a
563n/a _loop_factory = None
564n/a
565n/a class _Local(threading.local):
566n/a _loop = None
567n/a _set_called = False
568n/a
569n/a def __init__(self):
570n/a self._local = self._Local()
571n/a
572n/a def get_event_loop(self):
573n/a """Get the event loop.
574n/a
575n/a This may be None or an instance of EventLoop.
576n/a """
577n/a if (self._local._loop is None and
578n/a not self._local._set_called and
579n/a isinstance(threading.current_thread(), threading._MainThread)):
580n/a self.set_event_loop(self.new_event_loop())
581n/a if self._local._loop is None:
582n/a raise RuntimeError('There is no current event loop in thread %r.'
583n/a % threading.current_thread().name)
584n/a return self._local._loop
585n/a
586n/a def set_event_loop(self, loop):
587n/a """Set the event loop."""
588n/a self._local._set_called = True
589n/a assert loop is None or isinstance(loop, AbstractEventLoop)
590n/a self._local._loop = loop
591n/a
592n/a def new_event_loop(self):
593n/a """Create a new event loop.
594n/a
595n/a You must call set_event_loop() to make this the current event
596n/a loop.
597n/a """
598n/a return self._loop_factory()
599n/a
600n/a
601n/a# Event loop policy. The policy itself is always global, even if the
602n/a# policy's rules say that there is an event loop per thread (or other
603n/a# notion of context). The default policy is installed by the first
604n/a# call to get_event_loop_policy().
605n/a_event_loop_policy = None
606n/a
607n/a# Lock for protecting the on-the-fly creation of the event loop policy.
608n/a_lock = threading.Lock()
609n/a
610n/a
611n/a# A TLS for the running event loop, used by _get_running_loop.
612n/aclass _RunningLoop(threading.local):
613n/a _loop = None
614n/a_running_loop = _RunningLoop()
615n/a
616n/a
617n/adef _get_running_loop():
618n/a """Return the running event loop or None.
619n/a
620n/a This is a low-level function intended to be used by event loops.
621n/a This function is thread-specific.
622n/a """
623n/a return _running_loop._loop
624n/a
625n/a
626n/adef _set_running_loop(loop):
627n/a """Set the running event loop.
628n/a
629n/a This is a low-level function intended to be used by event loops.
630n/a This function is thread-specific.
631n/a """
632n/a _running_loop._loop = loop
633n/a
634n/a
635n/adef _init_event_loop_policy():
636n/a global _event_loop_policy
637n/a with _lock:
638n/a if _event_loop_policy is None: # pragma: no branch
639n/a from . import DefaultEventLoopPolicy
640n/a _event_loop_policy = DefaultEventLoopPolicy()
641n/a
642n/a
643n/adef get_event_loop_policy():
644n/a """Get the current event loop policy."""
645n/a if _event_loop_policy is None:
646n/a _init_event_loop_policy()
647n/a return _event_loop_policy
648n/a
649n/a
650n/adef set_event_loop_policy(policy):
651n/a """Set the current event loop policy.
652n/a
653n/a If policy is None, the default policy is restored."""
654n/a global _event_loop_policy
655n/a assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
656n/a _event_loop_policy = policy
657n/a
658n/a
659n/adef get_event_loop():
660n/a """Return an asyncio event loop.
661n/a
662n/a When called from a coroutine or a callback (e.g. scheduled with call_soon
663n/a or similar API), this function will always return the running event loop.
664n/a
665n/a If there is no running event loop set, the function will return
666n/a the result of `get_event_loop_policy().get_event_loop()` call.
667n/a """
668n/a current_loop = _get_running_loop()
669n/a if current_loop is not None:
670n/a return current_loop
671n/a return get_event_loop_policy().get_event_loop()
672n/a
673n/a
674n/adef set_event_loop(loop):
675n/a """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
676n/a get_event_loop_policy().set_event_loop(loop)
677n/a
678n/a
679n/adef new_event_loop():
680n/a """Equivalent to calling get_event_loop_policy().new_event_loop()."""
681n/a return get_event_loop_policy().new_event_loop()
682n/a
683n/a
684n/adef get_child_watcher():
685n/a """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
686n/a return get_event_loop_policy().get_child_watcher()
687n/a
688n/a
689n/adef set_child_watcher(watcher):
690n/a """Equivalent to calling
691n/a get_event_loop_policy().set_child_watcher(watcher)."""
692n/a return get_event_loop_policy().set_child_watcher(watcher)