ยปCore Development>Code coverage>Lib/asyncio/windows_events.py

Python code coverage for Lib/asyncio/windows_events.py

#countcontent
1n/a"""Selector and proactor event loops for Windows."""
2n/a
3n/aimport _winapi
4n/aimport errno
5n/aimport math
6n/aimport socket
7n/aimport struct
8n/aimport weakref
9n/a
10n/afrom . import events
11n/afrom . import base_subprocess
12n/afrom . import futures
13n/afrom . import proactor_events
14n/afrom . import selector_events
15n/afrom . import tasks
16n/afrom . import windows_utils
17n/afrom . import _overlapped
18n/afrom .coroutines import coroutine
19n/afrom .log import logger
20n/a
21n/a
22n/a__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
23n/a 'DefaultEventLoopPolicy',
24n/a ]
25n/a
26n/a
27n/aNULL = 0
28n/aINFINITE = 0xffffffff
29n/aERROR_CONNECTION_REFUSED = 1225
30n/aERROR_CONNECTION_ABORTED = 1236
31n/a
32n/a# Initial delay in seconds for connect_pipe() before retrying to connect
33n/aCONNECT_PIPE_INIT_DELAY = 0.001
34n/a
35n/a# Maximum delay in seconds for connect_pipe() before retrying to connect
36n/aCONNECT_PIPE_MAX_DELAY = 0.100
37n/a
38n/a
39n/aclass _OverlappedFuture(futures.Future):
40n/a """Subclass of Future which represents an overlapped operation.
41n/a
42n/a Cancelling it will immediately cancel the overlapped operation.
43n/a """
44n/a
45n/a def __init__(self, ov, *, loop=None):
46n/a super().__init__(loop=loop)
47n/a if self._source_traceback:
48n/a del self._source_traceback[-1]
49n/a self._ov = ov
50n/a
51n/a def _repr_info(self):
52n/a info = super()._repr_info()
53n/a if self._ov is not None:
54n/a state = 'pending' if self._ov.pending else 'completed'
55n/a info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
56n/a return info
57n/a
58n/a def _cancel_overlapped(self):
59n/a if self._ov is None:
60n/a return
61n/a try:
62n/a self._ov.cancel()
63n/a except OSError as exc:
64n/a context = {
65n/a 'message': 'Cancelling an overlapped future failed',
66n/a 'exception': exc,
67n/a 'future': self,
68n/a }
69n/a if self._source_traceback:
70n/a context['source_traceback'] = self._source_traceback
71n/a self._loop.call_exception_handler(context)
72n/a self._ov = None
73n/a
74n/a def cancel(self):
75n/a self._cancel_overlapped()
76n/a return super().cancel()
77n/a
78n/a def set_exception(self, exception):
79n/a super().set_exception(exception)
80n/a self._cancel_overlapped()
81n/a
82n/a def set_result(self, result):
83n/a super().set_result(result)
84n/a self._ov = None
85n/a
86n/a
87n/aclass _BaseWaitHandleFuture(futures.Future):
88n/a """Subclass of Future which represents a wait handle."""
89n/a
90n/a def __init__(self, ov, handle, wait_handle, *, loop=None):
91n/a super().__init__(loop=loop)
92n/a if self._source_traceback:
93n/a del self._source_traceback[-1]
94n/a # Keep a reference to the Overlapped object to keep it alive until the
95n/a # wait is unregistered
96n/a self._ov = ov
97n/a self._handle = handle
98n/a self._wait_handle = wait_handle
99n/a
100n/a # Should we call UnregisterWaitEx() if the wait completes
101n/a # or is cancelled?
102n/a self._registered = True
103n/a
104n/a def _poll(self):
105n/a # non-blocking wait: use a timeout of 0 millisecond
106n/a return (_winapi.WaitForSingleObject(self._handle, 0) ==
107n/a _winapi.WAIT_OBJECT_0)
108n/a
109n/a def _repr_info(self):
110n/a info = super()._repr_info()
111n/a info.append('handle=%#x' % self._handle)
112n/a if self._handle is not None:
113n/a state = 'signaled' if self._poll() else 'waiting'
114n/a info.append(state)
115n/a if self._wait_handle is not None:
116n/a info.append('wait_handle=%#x' % self._wait_handle)
117n/a return info
118n/a
119n/a def _unregister_wait_cb(self, fut):
120n/a # The wait was unregistered: it's not safe to destroy the Overlapped
121n/a # object
122n/a self._ov = None
123n/a
124n/a def _unregister_wait(self):
125n/a if not self._registered:
126n/a return
127n/a self._registered = False
128n/a
129n/a wait_handle = self._wait_handle
130n/a self._wait_handle = None
131n/a try:
132n/a _overlapped.UnregisterWait(wait_handle)
133n/a except OSError as exc:
134n/a if exc.winerror != _overlapped.ERROR_IO_PENDING:
135n/a context = {
136n/a 'message': 'Failed to unregister the wait handle',
137n/a 'exception': exc,
138n/a 'future': self,
139n/a }
140n/a if self._source_traceback:
141n/a context['source_traceback'] = self._source_traceback
142n/a self._loop.call_exception_handler(context)
143n/a return
144n/a # ERROR_IO_PENDING means that the unregister is pending
145n/a
146n/a self._unregister_wait_cb(None)
147n/a
148n/a def cancel(self):
149n/a self._unregister_wait()
150n/a return super().cancel()
151n/a
152n/a def set_exception(self, exception):
153n/a self._unregister_wait()
154n/a super().set_exception(exception)
155n/a
156n/a def set_result(self, result):
157n/a self._unregister_wait()
158n/a super().set_result(result)
159n/a
160n/a
161n/aclass _WaitCancelFuture(_BaseWaitHandleFuture):
162n/a """Subclass of Future which represents a wait for the cancellation of a
163n/a _WaitHandleFuture using an event.
164n/a """
165n/a
166n/a def __init__(self, ov, event, wait_handle, *, loop=None):
167n/a super().__init__(ov, event, wait_handle, loop=loop)
168n/a
169n/a self._done_callback = None
170n/a
171n/a def cancel(self):
172n/a raise RuntimeError("_WaitCancelFuture must not be cancelled")
173n/a
174n/a def set_result(self, result):
175n/a super().set_result(result)
176n/a if self._done_callback is not None:
177n/a self._done_callback(self)
178n/a
179n/a def set_exception(self, exception):
180n/a super().set_exception(exception)
181n/a if self._done_callback is not None:
182n/a self._done_callback(self)
183n/a
184n/a
185n/aclass _WaitHandleFuture(_BaseWaitHandleFuture):
186n/a def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
187n/a super().__init__(ov, handle, wait_handle, loop=loop)
188n/a self._proactor = proactor
189n/a self._unregister_proactor = True
190n/a self._event = _overlapped.CreateEvent(None, True, False, None)
191n/a self._event_fut = None
192n/a
193n/a def _unregister_wait_cb(self, fut):
194n/a if self._event is not None:
195n/a _winapi.CloseHandle(self._event)
196n/a self._event = None
197n/a self._event_fut = None
198n/a
199n/a # If the wait was cancelled, the wait may never be signalled, so
200n/a # it's required to unregister it. Otherwise, IocpProactor.close() will
201n/a # wait forever for an event which will never come.
202n/a #
203n/a # If the IocpProactor already received the event, it's safe to call
204n/a # _unregister() because we kept a reference to the Overlapped object
205n/a # which is used as a unique key.
206n/a self._proactor._unregister(self._ov)
207n/a self._proactor = None
208n/a
209n/a super()._unregister_wait_cb(fut)
210n/a
211n/a def _unregister_wait(self):
212n/a if not self._registered:
213n/a return
214n/a self._registered = False
215n/a
216n/a wait_handle = self._wait_handle
217n/a self._wait_handle = None
218n/a try:
219n/a _overlapped.UnregisterWaitEx(wait_handle, self._event)
220n/a except OSError as exc:
221n/a if exc.winerror != _overlapped.ERROR_IO_PENDING:
222n/a context = {
223n/a 'message': 'Failed to unregister the wait handle',
224n/a 'exception': exc,
225n/a 'future': self,
226n/a }
227n/a if self._source_traceback:
228n/a context['source_traceback'] = self._source_traceback
229n/a self._loop.call_exception_handler(context)
230n/a return
231n/a # ERROR_IO_PENDING is not an error, the wait was unregistered
232n/a
233n/a self._event_fut = self._proactor._wait_cancel(self._event,
234n/a self._unregister_wait_cb)
235n/a
236n/a
237n/aclass PipeServer(object):
238n/a """Class representing a pipe server.
239n/a
240n/a This is much like a bound, listening socket.
241n/a """
242n/a def __init__(self, address):
243n/a self._address = address
244n/a self._free_instances = weakref.WeakSet()
245n/a # initialize the pipe attribute before calling _server_pipe_handle()
246n/a # because this function can raise an exception and the destructor calls
247n/a # the close() method
248n/a self._pipe = None
249n/a self._accept_pipe_future = None
250n/a self._pipe = self._server_pipe_handle(True)
251n/a
252n/a def _get_unconnected_pipe(self):
253n/a # Create new instance and return previous one. This ensures
254n/a # that (until the server is closed) there is always at least
255n/a # one pipe handle for address. Therefore if a client attempt
256n/a # to connect it will not fail with FileNotFoundError.
257n/a tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
258n/a return tmp
259n/a
260n/a def _server_pipe_handle(self, first):
261n/a # Return a wrapper for a new pipe handle.
262n/a if self.closed():
263n/a return None
264n/a flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
265n/a if first:
266n/a flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
267n/a h = _winapi.CreateNamedPipe(
268n/a self._address, flags,
269n/a _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
270n/a _winapi.PIPE_WAIT,
271n/a _winapi.PIPE_UNLIMITED_INSTANCES,
272n/a windows_utils.BUFSIZE, windows_utils.BUFSIZE,
273n/a _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
274n/a pipe = windows_utils.PipeHandle(h)
275n/a self._free_instances.add(pipe)
276n/a return pipe
277n/a
278n/a def closed(self):
279n/a return (self._address is None)
280n/a
281n/a def close(self):
282n/a if self._accept_pipe_future is not None:
283n/a self._accept_pipe_future.cancel()
284n/a self._accept_pipe_future = None
285n/a # Close all instances which have not been connected to by a client.
286n/a if self._address is not None:
287n/a for pipe in self._free_instances:
288n/a pipe.close()
289n/a self._pipe = None
290n/a self._address = None
291n/a self._free_instances.clear()
292n/a
293n/a __del__ = close
294n/a
295n/a
296n/aclass _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
297n/a """Windows version of selector event loop."""
298n/a
299n/a def _socketpair(self):
300n/a return windows_utils.socketpair()
301n/a
302n/a
303n/aclass ProactorEventLoop(proactor_events.BaseProactorEventLoop):
304n/a """Windows version of proactor event loop using IOCP."""
305n/a
306n/a def __init__(self, proactor=None):
307n/a if proactor is None:
308n/a proactor = IocpProactor()
309n/a super().__init__(proactor)
310n/a
311n/a def _socketpair(self):
312n/a return windows_utils.socketpair()
313n/a
314n/a @coroutine
315n/a def create_pipe_connection(self, protocol_factory, address):
316n/a f = self._proactor.connect_pipe(address)
317n/a pipe = yield from f
318n/a protocol = protocol_factory()
319n/a trans = self._make_duplex_pipe_transport(pipe, protocol,
320n/a extra={'addr': address})
321n/a return trans, protocol
322n/a
323n/a @coroutine
324n/a def start_serving_pipe(self, protocol_factory, address):
325n/a server = PipeServer(address)
326n/a
327n/a def loop_accept_pipe(f=None):
328n/a pipe = None
329n/a try:
330n/a if f:
331n/a pipe = f.result()
332n/a server._free_instances.discard(pipe)
333n/a
334n/a if server.closed():
335n/a # A client connected before the server was closed:
336n/a # drop the client (close the pipe) and exit
337n/a pipe.close()
338n/a return
339n/a
340n/a protocol = protocol_factory()
341n/a self._make_duplex_pipe_transport(
342n/a pipe, protocol, extra={'addr': address})
343n/a
344n/a pipe = server._get_unconnected_pipe()
345n/a if pipe is None:
346n/a return
347n/a
348n/a f = self._proactor.accept_pipe(pipe)
349n/a except OSError as exc:
350n/a if pipe and pipe.fileno() != -1:
351n/a self.call_exception_handler({
352n/a 'message': 'Pipe accept failed',
353n/a 'exception': exc,
354n/a 'pipe': pipe,
355n/a })
356n/a pipe.close()
357n/a elif self._debug:
358n/a logger.warning("Accept pipe failed on pipe %r",
359n/a pipe, exc_info=True)
360n/a except futures.CancelledError:
361n/a if pipe:
362n/a pipe.close()
363n/a else:
364n/a server._accept_pipe_future = f
365n/a f.add_done_callback(loop_accept_pipe)
366n/a
367n/a self.call_soon(loop_accept_pipe)
368n/a return [server]
369n/a
370n/a @coroutine
371n/a def _make_subprocess_transport(self, protocol, args, shell,
372n/a stdin, stdout, stderr, bufsize,
373n/a extra=None, **kwargs):
374n/a waiter = self.create_future()
375n/a transp = _WindowsSubprocessTransport(self, protocol, args, shell,
376n/a stdin, stdout, stderr, bufsize,
377n/a waiter=waiter, extra=extra,
378n/a **kwargs)
379n/a try:
380n/a yield from waiter
381n/a except Exception as exc:
382n/a # Workaround CPython bug #23353: using yield/yield-from in an
383n/a # except block of a generator doesn't clear properly sys.exc_info()
384n/a err = exc
385n/a else:
386n/a err = None
387n/a
388n/a if err is not None:
389n/a transp.close()
390n/a yield from transp._wait()
391n/a raise err
392n/a
393n/a return transp
394n/a
395n/a
396n/aclass IocpProactor:
397n/a """Proactor implementation using IOCP."""
398n/a
399n/a def __init__(self, concurrency=0xffffffff):
400n/a self._loop = None
401n/a self._results = []
402n/a self._iocp = _overlapped.CreateIoCompletionPort(
403n/a _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
404n/a self._cache = {}
405n/a self._registered = weakref.WeakSet()
406n/a self._unregistered = []
407n/a self._stopped_serving = weakref.WeakSet()
408n/a
409n/a def __repr__(self):
410n/a return ('<%s overlapped#=%s result#=%s>'
411n/a % (self.__class__.__name__, len(self._cache),
412n/a len(self._results)))
413n/a
414n/a def set_loop(self, loop):
415n/a self._loop = loop
416n/a
417n/a def select(self, timeout=None):
418n/a if not self._results:
419n/a self._poll(timeout)
420n/a tmp = self._results
421n/a self._results = []
422n/a return tmp
423n/a
424n/a def _result(self, value):
425n/a fut = self._loop.create_future()
426n/a fut.set_result(value)
427n/a return fut
428n/a
429n/a def recv(self, conn, nbytes, flags=0):
430n/a self._register_with_iocp(conn)
431n/a ov = _overlapped.Overlapped(NULL)
432n/a try:
433n/a if isinstance(conn, socket.socket):
434n/a ov.WSARecv(conn.fileno(), nbytes, flags)
435n/a else:
436n/a ov.ReadFile(conn.fileno(), nbytes)
437n/a except BrokenPipeError:
438n/a return self._result(b'')
439n/a
440n/a def finish_recv(trans, key, ov):
441n/a try:
442n/a return ov.getresult()
443n/a except OSError as exc:
444n/a if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
445n/a raise ConnectionResetError(*exc.args)
446n/a else:
447n/a raise
448n/a
449n/a return self._register(ov, conn, finish_recv)
450n/a
451n/a def send(self, conn, buf, flags=0):
452n/a self._register_with_iocp(conn)
453n/a ov = _overlapped.Overlapped(NULL)
454n/a if isinstance(conn, socket.socket):
455n/a ov.WSASend(conn.fileno(), buf, flags)
456n/a else:
457n/a ov.WriteFile(conn.fileno(), buf)
458n/a
459n/a def finish_send(trans, key, ov):
460n/a try:
461n/a return ov.getresult()
462n/a except OSError as exc:
463n/a if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
464n/a raise ConnectionResetError(*exc.args)
465n/a else:
466n/a raise
467n/a
468n/a return self._register(ov, conn, finish_send)
469n/a
470n/a def accept(self, listener):
471n/a self._register_with_iocp(listener)
472n/a conn = self._get_accept_socket(listener.family)
473n/a ov = _overlapped.Overlapped(NULL)
474n/a ov.AcceptEx(listener.fileno(), conn.fileno())
475n/a
476n/a def finish_accept(trans, key, ov):
477n/a ov.getresult()
478n/a # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
479n/a buf = struct.pack('@P', listener.fileno())
480n/a conn.setsockopt(socket.SOL_SOCKET,
481n/a _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
482n/a conn.settimeout(listener.gettimeout())
483n/a return conn, conn.getpeername()
484n/a
485n/a @coroutine
486n/a def accept_coro(future, conn):
487n/a # Coroutine closing the accept socket if the future is cancelled
488n/a try:
489n/a yield from future
490n/a except futures.CancelledError:
491n/a conn.close()
492n/a raise
493n/a
494n/a future = self._register(ov, listener, finish_accept)
495n/a coro = accept_coro(future, conn)
496n/a tasks.ensure_future(coro, loop=self._loop)
497n/a return future
498n/a
499n/a def connect(self, conn, address):
500n/a self._register_with_iocp(conn)
501n/a # The socket needs to be locally bound before we call ConnectEx().
502n/a try:
503n/a _overlapped.BindLocal(conn.fileno(), conn.family)
504n/a except OSError as e:
505n/a if e.winerror != errno.WSAEINVAL:
506n/a raise
507n/a # Probably already locally bound; check using getsockname().
508n/a if conn.getsockname()[1] == 0:
509n/a raise
510n/a ov = _overlapped.Overlapped(NULL)
511n/a ov.ConnectEx(conn.fileno(), address)
512n/a
513n/a def finish_connect(trans, key, ov):
514n/a ov.getresult()
515n/a # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
516n/a conn.setsockopt(socket.SOL_SOCKET,
517n/a _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
518n/a return conn
519n/a
520n/a return self._register(ov, conn, finish_connect)
521n/a
522n/a def accept_pipe(self, pipe):
523n/a self._register_with_iocp(pipe)
524n/a ov = _overlapped.Overlapped(NULL)
525n/a connected = ov.ConnectNamedPipe(pipe.fileno())
526n/a
527n/a if connected:
528n/a # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
529n/a # that the pipe is connected. There is no need to wait for the
530n/a # completion of the connection.
531n/a return self._result(pipe)
532n/a
533n/a def finish_accept_pipe(trans, key, ov):
534n/a ov.getresult()
535n/a return pipe
536n/a
537n/a return self._register(ov, pipe, finish_accept_pipe)
538n/a
539n/a @coroutine
540n/a def connect_pipe(self, address):
541n/a delay = CONNECT_PIPE_INIT_DELAY
542n/a while True:
543n/a # Unfortunately there is no way to do an overlapped connect to a pipe.
544n/a # Call CreateFile() in a loop until it doesn't fail with
545n/a # ERROR_PIPE_BUSY
546n/a try:
547n/a handle = _overlapped.ConnectPipe(address)
548n/a break
549n/a except OSError as exc:
550n/a if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
551n/a raise
552n/a
553n/a # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
554n/a delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
555n/a yield from tasks.sleep(delay, loop=self._loop)
556n/a
557n/a return windows_utils.PipeHandle(handle)
558n/a
559n/a def wait_for_handle(self, handle, timeout=None):
560n/a """Wait for a handle.
561n/a
562n/a Return a Future object. The result of the future is True if the wait
563n/a completed, or False if the wait did not complete (on timeout).
564n/a """
565n/a return self._wait_for_handle(handle, timeout, False)
566n/a
567n/a def _wait_cancel(self, event, done_callback):
568n/a fut = self._wait_for_handle(event, None, True)
569n/a # add_done_callback() cannot be used because the wait may only complete
570n/a # in IocpProactor.close(), while the event loop is not running.
571n/a fut._done_callback = done_callback
572n/a return fut
573n/a
574n/a def _wait_for_handle(self, handle, timeout, _is_cancel):
575n/a if timeout is None:
576n/a ms = _winapi.INFINITE
577n/a else:
578n/a # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
579n/a # round away from zero to wait *at least* timeout seconds.
580n/a ms = math.ceil(timeout * 1e3)
581n/a
582n/a # We only create ov so we can use ov.address as a key for the cache.
583n/a ov = _overlapped.Overlapped(NULL)
584n/a wait_handle = _overlapped.RegisterWaitWithQueue(
585n/a handle, self._iocp, ov.address, ms)
586n/a if _is_cancel:
587n/a f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
588n/a else:
589n/a f = _WaitHandleFuture(ov, handle, wait_handle, self,
590n/a loop=self._loop)
591n/a if f._source_traceback:
592n/a del f._source_traceback[-1]
593n/a
594n/a def finish_wait_for_handle(trans, key, ov):
595n/a # Note that this second wait means that we should only use
596n/a # this with handles types where a successful wait has no
597n/a # effect. So events or processes are all right, but locks
598n/a # or semaphores are not. Also note if the handle is
599n/a # signalled and then quickly reset, then we may return
600n/a # False even though we have not timed out.
601n/a return f._poll()
602n/a
603n/a self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
604n/a return f
605n/a
606n/a def _register_with_iocp(self, obj):
607n/a # To get notifications of finished ops on this objects sent to the
608n/a # completion port, were must register the handle.
609n/a if obj not in self._registered:
610n/a self._registered.add(obj)
611n/a _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
612n/a # XXX We could also use SetFileCompletionNotificationModes()
613n/a # to avoid sending notifications to completion port of ops
614n/a # that succeed immediately.
615n/a
616n/a def _register(self, ov, obj, callback):
617n/a # Return a future which will be set with the result of the
618n/a # operation when it completes. The future's value is actually
619n/a # the value returned by callback().
620n/a f = _OverlappedFuture(ov, loop=self._loop)
621n/a if f._source_traceback:
622n/a del f._source_traceback[-1]
623n/a if not ov.pending:
624n/a # The operation has completed, so no need to postpone the
625n/a # work. We cannot take this short cut if we need the
626n/a # NumberOfBytes, CompletionKey values returned by
627n/a # PostQueuedCompletionStatus().
628n/a try:
629n/a value = callback(None, None, ov)
630n/a except OSError as e:
631n/a f.set_exception(e)
632n/a else:
633n/a f.set_result(value)
634n/a # Even if GetOverlappedResult() was called, we have to wait for the
635n/a # notification of the completion in GetQueuedCompletionStatus().
636n/a # Register the overlapped operation to keep a reference to the
637n/a # OVERLAPPED object, otherwise the memory is freed and Windows may
638n/a # read uninitialized memory.
639n/a
640n/a # Register the overlapped operation for later. Note that
641n/a # we only store obj to prevent it from being garbage
642n/a # collected too early.
643n/a self._cache[ov.address] = (f, ov, obj, callback)
644n/a return f
645n/a
646n/a def _unregister(self, ov):
647n/a """Unregister an overlapped object.
648n/a
649n/a Call this method when its future has been cancelled. The event can
650n/a already be signalled (pending in the proactor event queue). It is also
651n/a safe if the event is never signalled (because it was cancelled).
652n/a """
653n/a self._unregistered.append(ov)
654n/a
655n/a def _get_accept_socket(self, family):
656n/a s = socket.socket(family)
657n/a s.settimeout(0)
658n/a return s
659n/a
660n/a def _poll(self, timeout=None):
661n/a if timeout is None:
662n/a ms = INFINITE
663n/a elif timeout < 0:
664n/a raise ValueError("negative timeout")
665n/a else:
666n/a # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
667n/a # round away from zero to wait *at least* timeout seconds.
668n/a ms = math.ceil(timeout * 1e3)
669n/a if ms >= INFINITE:
670n/a raise ValueError("timeout too big")
671n/a
672n/a while True:
673n/a status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
674n/a if status is None:
675n/a break
676n/a ms = 0
677n/a
678n/a err, transferred, key, address = status
679n/a try:
680n/a f, ov, obj, callback = self._cache.pop(address)
681n/a except KeyError:
682n/a if self._loop.get_debug():
683n/a self._loop.call_exception_handler({
684n/a 'message': ('GetQueuedCompletionStatus() returned an '
685n/a 'unexpected event'),
686n/a 'status': ('err=%s transferred=%s key=%#x address=%#x'
687n/a % (err, transferred, key, address)),
688n/a })
689n/a
690n/a # key is either zero, or it is used to return a pipe
691n/a # handle which should be closed to avoid a leak.
692n/a if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
693n/a _winapi.CloseHandle(key)
694n/a continue
695n/a
696n/a if obj in self._stopped_serving:
697n/a f.cancel()
698n/a # Don't call the callback if _register() already read the result or
699n/a # if the overlapped has been cancelled
700n/a elif not f.done():
701n/a try:
702n/a value = callback(transferred, key, ov)
703n/a except OSError as e:
704n/a f.set_exception(e)
705n/a self._results.append(f)
706n/a else:
707n/a f.set_result(value)
708n/a self._results.append(f)
709n/a
710n/a # Remove unregisted futures
711n/a for ov in self._unregistered:
712n/a self._cache.pop(ov.address, None)
713n/a self._unregistered.clear()
714n/a
715n/a def _stop_serving(self, obj):
716n/a # obj is a socket or pipe handle. It will be closed in
717n/a # BaseProactorEventLoop._stop_serving() which will make any
718n/a # pending operations fail quickly.
719n/a self._stopped_serving.add(obj)
720n/a
721n/a def close(self):
722n/a # Cancel remaining registered operations.
723n/a for address, (fut, ov, obj, callback) in list(self._cache.items()):
724n/a if fut.cancelled():
725n/a # Nothing to do with cancelled futures
726n/a pass
727n/a elif isinstance(fut, _WaitCancelFuture):
728n/a # _WaitCancelFuture must not be cancelled
729n/a pass
730n/a else:
731n/a try:
732n/a fut.cancel()
733n/a except OSError as exc:
734n/a if self._loop is not None:
735n/a context = {
736n/a 'message': 'Cancelling a future failed',
737n/a 'exception': exc,
738n/a 'future': fut,
739n/a }
740n/a if fut._source_traceback:
741n/a context['source_traceback'] = fut._source_traceback
742n/a self._loop.call_exception_handler(context)
743n/a
744n/a while self._cache:
745n/a if not self._poll(1):
746n/a logger.debug('taking long time to close proactor')
747n/a
748n/a self._results = []
749n/a if self._iocp is not None:
750n/a _winapi.CloseHandle(self._iocp)
751n/a self._iocp = None
752n/a
753n/a def __del__(self):
754n/a self.close()
755n/a
756n/a
757n/aclass _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
758n/a
759n/a def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
760n/a self._proc = windows_utils.Popen(
761n/a args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
762n/a bufsize=bufsize, **kwargs)
763n/a
764n/a def callback(f):
765n/a returncode = self._proc.poll()
766n/a self._process_exited(returncode)
767n/a
768n/a f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
769n/a f.add_done_callback(callback)
770n/a
771n/a
772n/aSelectorEventLoop = _WindowsSelectorEventLoop
773n/a
774n/a
775n/aclass _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
776n/a _loop_factory = SelectorEventLoop
777n/a
778n/a
779n/aDefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy