ยปCore Development>Code coverage>Lib/test/test_asyncio/test_events.py

Python code coverage for Lib/test/test_asyncio/test_events.py

#countcontent
1n/a"""Tests for events.py."""
2n/a
3n/aimport collections.abc
4n/aimport functools
5n/aimport gc
6n/aimport io
7n/aimport os
8n/aimport platform
9n/aimport re
10n/aimport signal
11n/aimport socket
12n/atry:
13n/a import ssl
14n/aexcept ImportError:
15n/a ssl = None
16n/aimport subprocess
17n/aimport sys
18n/aimport threading
19n/aimport time
20n/aimport errno
21n/aimport unittest
22n/afrom unittest import mock
23n/aimport weakref
24n/a
25n/aif sys.platform != 'win32':
26n/a import tty
27n/a
28n/aimport asyncio
29n/afrom asyncio import coroutines
30n/afrom asyncio import proactor_events
31n/afrom asyncio import selector_events
32n/afrom asyncio import sslproto
33n/afrom asyncio import test_utils
34n/atry:
35n/a from test import support
36n/aexcept ImportError:
37n/a from asyncio import test_support as support
38n/a
39n/a
40n/adef data_file(filename):
41n/a if hasattr(support, 'TEST_HOME_DIR'):
42n/a fullname = os.path.join(support.TEST_HOME_DIR, filename)
43n/a if os.path.isfile(fullname):
44n/a return fullname
45n/a fullname = os.path.join(os.path.dirname(__file__), filename)
46n/a if os.path.isfile(fullname):
47n/a return fullname
48n/a raise FileNotFoundError(filename)
49n/a
50n/a
51n/adef osx_tiger():
52n/a """Return True if the platform is Mac OS 10.4 or older."""
53n/a if sys.platform != 'darwin':
54n/a return False
55n/a version = platform.mac_ver()[0]
56n/a version = tuple(map(int, version.split('.')))
57n/a return version < (10, 5)
58n/a
59n/a
60n/aONLYCERT = data_file('ssl_cert.pem')
61n/aONLYKEY = data_file('ssl_key.pem')
62n/aSIGNED_CERTFILE = data_file('keycert3.pem')
63n/aSIGNING_CA = data_file('pycacert.pem')
64n/aPEERCERT = {'serialNumber': 'B09264B1F2DA21D1',
65n/a 'version': 1,
66n/a 'subject': ((('countryName', 'XY'),),
67n/a (('localityName', 'Castle Anthrax'),),
68n/a (('organizationName', 'Python Software Foundation'),),
69n/a (('commonName', 'localhost'),)),
70n/a 'issuer': ((('countryName', 'XY'),),
71n/a (('organizationName', 'Python Software Foundation CA'),),
72n/a (('commonName', 'our-ca-server'),)),
73n/a 'notAfter': 'Nov 13 19:47:07 2022 GMT',
74n/a 'notBefore': 'Jan 4 19:47:07 2013 GMT'}
75n/a
76n/a
77n/aclass MyBaseProto(asyncio.Protocol):
78n/a connected = None
79n/a done = None
80n/a
81n/a def __init__(self, loop=None):
82n/a self.transport = None
83n/a self.state = 'INITIAL'
84n/a self.nbytes = 0
85n/a if loop is not None:
86n/a self.connected = asyncio.Future(loop=loop)
87n/a self.done = asyncio.Future(loop=loop)
88n/a
89n/a def connection_made(self, transport):
90n/a self.transport = transport
91n/a assert self.state == 'INITIAL', self.state
92n/a self.state = 'CONNECTED'
93n/a if self.connected:
94n/a self.connected.set_result(None)
95n/a
96n/a def data_received(self, data):
97n/a assert self.state == 'CONNECTED', self.state
98n/a self.nbytes += len(data)
99n/a
100n/a def eof_received(self):
101n/a assert self.state == 'CONNECTED', self.state
102n/a self.state = 'EOF'
103n/a
104n/a def connection_lost(self, exc):
105n/a assert self.state in ('CONNECTED', 'EOF'), self.state
106n/a self.state = 'CLOSED'
107n/a if self.done:
108n/a self.done.set_result(None)
109n/a
110n/a
111n/aclass MyProto(MyBaseProto):
112n/a def connection_made(self, transport):
113n/a super().connection_made(transport)
114n/a transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
115n/a
116n/a
117n/aclass MyDatagramProto(asyncio.DatagramProtocol):
118n/a done = None
119n/a
120n/a def __init__(self, loop=None):
121n/a self.state = 'INITIAL'
122n/a self.nbytes = 0
123n/a if loop is not None:
124n/a self.done = asyncio.Future(loop=loop)
125n/a
126n/a def connection_made(self, transport):
127n/a self.transport = transport
128n/a assert self.state == 'INITIAL', self.state
129n/a self.state = 'INITIALIZED'
130n/a
131n/a def datagram_received(self, data, addr):
132n/a assert self.state == 'INITIALIZED', self.state
133n/a self.nbytes += len(data)
134n/a
135n/a def error_received(self, exc):
136n/a assert self.state == 'INITIALIZED', self.state
137n/a
138n/a def connection_lost(self, exc):
139n/a assert self.state == 'INITIALIZED', self.state
140n/a self.state = 'CLOSED'
141n/a if self.done:
142n/a self.done.set_result(None)
143n/a
144n/a
145n/aclass MyReadPipeProto(asyncio.Protocol):
146n/a done = None
147n/a
148n/a def __init__(self, loop=None):
149n/a self.state = ['INITIAL']
150n/a self.nbytes = 0
151n/a self.transport = None
152n/a if loop is not None:
153n/a self.done = asyncio.Future(loop=loop)
154n/a
155n/a def connection_made(self, transport):
156n/a self.transport = transport
157n/a assert self.state == ['INITIAL'], self.state
158n/a self.state.append('CONNECTED')
159n/a
160n/a def data_received(self, data):
161n/a assert self.state == ['INITIAL', 'CONNECTED'], self.state
162n/a self.nbytes += len(data)
163n/a
164n/a def eof_received(self):
165n/a assert self.state == ['INITIAL', 'CONNECTED'], self.state
166n/a self.state.append('EOF')
167n/a
168n/a def connection_lost(self, exc):
169n/a if 'EOF' not in self.state:
170n/a self.state.append('EOF') # It is okay if EOF is missed.
171n/a assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
172n/a self.state.append('CLOSED')
173n/a if self.done:
174n/a self.done.set_result(None)
175n/a
176n/a
177n/aclass MyWritePipeProto(asyncio.BaseProtocol):
178n/a done = None
179n/a
180n/a def __init__(self, loop=None):
181n/a self.state = 'INITIAL'
182n/a self.transport = None
183n/a if loop is not None:
184n/a self.done = asyncio.Future(loop=loop)
185n/a
186n/a def connection_made(self, transport):
187n/a self.transport = transport
188n/a assert self.state == 'INITIAL', self.state
189n/a self.state = 'CONNECTED'
190n/a
191n/a def connection_lost(self, exc):
192n/a assert self.state == 'CONNECTED', self.state
193n/a self.state = 'CLOSED'
194n/a if self.done:
195n/a self.done.set_result(None)
196n/a
197n/a
198n/aclass MySubprocessProtocol(asyncio.SubprocessProtocol):
199n/a
200n/a def __init__(self, loop):
201n/a self.state = 'INITIAL'
202n/a self.transport = None
203n/a self.connected = asyncio.Future(loop=loop)
204n/a self.completed = asyncio.Future(loop=loop)
205n/a self.disconnects = {fd: asyncio.Future(loop=loop) for fd in range(3)}
206n/a self.data = {1: b'', 2: b''}
207n/a self.returncode = None
208n/a self.got_data = {1: asyncio.Event(loop=loop),
209n/a 2: asyncio.Event(loop=loop)}
210n/a
211n/a def connection_made(self, transport):
212n/a self.transport = transport
213n/a assert self.state == 'INITIAL', self.state
214n/a self.state = 'CONNECTED'
215n/a self.connected.set_result(None)
216n/a
217n/a def connection_lost(self, exc):
218n/a assert self.state == 'CONNECTED', self.state
219n/a self.state = 'CLOSED'
220n/a self.completed.set_result(None)
221n/a
222n/a def pipe_data_received(self, fd, data):
223n/a assert self.state == 'CONNECTED', self.state
224n/a self.data[fd] += data
225n/a self.got_data[fd].set()
226n/a
227n/a def pipe_connection_lost(self, fd, exc):
228n/a assert self.state == 'CONNECTED', self.state
229n/a if exc:
230n/a self.disconnects[fd].set_exception(exc)
231n/a else:
232n/a self.disconnects[fd].set_result(exc)
233n/a
234n/a def process_exited(self):
235n/a assert self.state == 'CONNECTED', self.state
236n/a self.returncode = self.transport.get_returncode()
237n/a
238n/a
239n/aclass EventLoopTestsMixin:
240n/a
241n/a def setUp(self):
242n/a super().setUp()
243n/a self.loop = self.create_event_loop()
244n/a self.set_event_loop(self.loop)
245n/a
246n/a def tearDown(self):
247n/a # just in case if we have transport close callbacks
248n/a if not self.loop.is_closed():
249n/a test_utils.run_briefly(self.loop)
250n/a
251n/a self.loop.close()
252n/a gc.collect()
253n/a super().tearDown()
254n/a
255n/a def test_run_until_complete_nesting(self):
256n/a @asyncio.coroutine
257n/a def coro1():
258n/a yield
259n/a
260n/a @asyncio.coroutine
261n/a def coro2():
262n/a self.assertTrue(self.loop.is_running())
263n/a self.loop.run_until_complete(coro1())
264n/a
265n/a self.assertRaises(
266n/a RuntimeError, self.loop.run_until_complete, coro2())
267n/a
268n/a # Note: because of the default Windows timing granularity of
269n/a # 15.6 msec, we use fairly long sleep times here (~100 msec).
270n/a
271n/a def test_run_until_complete(self):
272n/a t0 = self.loop.time()
273n/a self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
274n/a t1 = self.loop.time()
275n/a self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
276n/a
277n/a def test_run_until_complete_stopped(self):
278n/a @asyncio.coroutine
279n/a def cb():
280n/a self.loop.stop()
281n/a yield from asyncio.sleep(0.1, loop=self.loop)
282n/a task = cb()
283n/a self.assertRaises(RuntimeError,
284n/a self.loop.run_until_complete, task)
285n/a
286n/a def test_call_later(self):
287n/a results = []
288n/a
289n/a def callback(arg):
290n/a results.append(arg)
291n/a self.loop.stop()
292n/a
293n/a self.loop.call_later(0.1, callback, 'hello world')
294n/a t0 = time.monotonic()
295n/a self.loop.run_forever()
296n/a t1 = time.monotonic()
297n/a self.assertEqual(results, ['hello world'])
298n/a self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
299n/a
300n/a def test_call_soon(self):
301n/a results = []
302n/a
303n/a def callback(arg1, arg2):
304n/a results.append((arg1, arg2))
305n/a self.loop.stop()
306n/a
307n/a self.loop.call_soon(callback, 'hello', 'world')
308n/a self.loop.run_forever()
309n/a self.assertEqual(results, [('hello', 'world')])
310n/a
311n/a def test_call_soon_threadsafe(self):
312n/a results = []
313n/a lock = threading.Lock()
314n/a
315n/a def callback(arg):
316n/a results.append(arg)
317n/a if len(results) >= 2:
318n/a self.loop.stop()
319n/a
320n/a def run_in_thread():
321n/a self.loop.call_soon_threadsafe(callback, 'hello')
322n/a lock.release()
323n/a
324n/a lock.acquire()
325n/a t = threading.Thread(target=run_in_thread)
326n/a t.start()
327n/a
328n/a with lock:
329n/a self.loop.call_soon(callback, 'world')
330n/a self.loop.run_forever()
331n/a t.join()
332n/a self.assertEqual(results, ['hello', 'world'])
333n/a
334n/a def test_call_soon_threadsafe_same_thread(self):
335n/a results = []
336n/a
337n/a def callback(arg):
338n/a results.append(arg)
339n/a if len(results) >= 2:
340n/a self.loop.stop()
341n/a
342n/a self.loop.call_soon_threadsafe(callback, 'hello')
343n/a self.loop.call_soon(callback, 'world')
344n/a self.loop.run_forever()
345n/a self.assertEqual(results, ['hello', 'world'])
346n/a
347n/a def test_run_in_executor(self):
348n/a def run(arg):
349n/a return (arg, threading.get_ident())
350n/a f2 = self.loop.run_in_executor(None, run, 'yo')
351n/a res, thread_id = self.loop.run_until_complete(f2)
352n/a self.assertEqual(res, 'yo')
353n/a self.assertNotEqual(thread_id, threading.get_ident())
354n/a
355n/a def test_reader_callback(self):
356n/a r, w = test_utils.socketpair()
357n/a r.setblocking(False)
358n/a bytes_read = bytearray()
359n/a
360n/a def reader():
361n/a try:
362n/a data = r.recv(1024)
363n/a except BlockingIOError:
364n/a # Spurious readiness notifications are possible
365n/a # at least on Linux -- see man select.
366n/a return
367n/a if data:
368n/a bytes_read.extend(data)
369n/a else:
370n/a self.assertTrue(self.loop.remove_reader(r.fileno()))
371n/a r.close()
372n/a
373n/a self.loop.add_reader(r.fileno(), reader)
374n/a self.loop.call_soon(w.send, b'abc')
375n/a test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
376n/a self.loop.call_soon(w.send, b'def')
377n/a test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
378n/a self.loop.call_soon(w.close)
379n/a self.loop.call_soon(self.loop.stop)
380n/a self.loop.run_forever()
381n/a self.assertEqual(bytes_read, b'abcdef')
382n/a
383n/a def test_writer_callback(self):
384n/a r, w = test_utils.socketpair()
385n/a w.setblocking(False)
386n/a
387n/a def writer(data):
388n/a w.send(data)
389n/a self.loop.stop()
390n/a
391n/a data = b'x' * 1024
392n/a self.loop.add_writer(w.fileno(), writer, data)
393n/a self.loop.run_forever()
394n/a
395n/a self.assertTrue(self.loop.remove_writer(w.fileno()))
396n/a self.assertFalse(self.loop.remove_writer(w.fileno()))
397n/a
398n/a w.close()
399n/a read = r.recv(len(data) * 2)
400n/a r.close()
401n/a self.assertEqual(read, data)
402n/a
403n/a def _basetest_sock_client_ops(self, httpd, sock):
404n/a if not isinstance(self.loop, proactor_events.BaseProactorEventLoop):
405n/a # in debug mode, socket operations must fail
406n/a # if the socket is not in blocking mode
407n/a self.loop.set_debug(True)
408n/a sock.setblocking(True)
409n/a with self.assertRaises(ValueError):
410n/a self.loop.run_until_complete(
411n/a self.loop.sock_connect(sock, httpd.address))
412n/a with self.assertRaises(ValueError):
413n/a self.loop.run_until_complete(
414n/a self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
415n/a with self.assertRaises(ValueError):
416n/a self.loop.run_until_complete(
417n/a self.loop.sock_recv(sock, 1024))
418n/a with self.assertRaises(ValueError):
419n/a self.loop.run_until_complete(
420n/a self.loop.sock_accept(sock))
421n/a
422n/a # test in non-blocking mode
423n/a sock.setblocking(False)
424n/a self.loop.run_until_complete(
425n/a self.loop.sock_connect(sock, httpd.address))
426n/a self.loop.run_until_complete(
427n/a self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
428n/a data = self.loop.run_until_complete(
429n/a self.loop.sock_recv(sock, 1024))
430n/a # consume data
431n/a self.loop.run_until_complete(
432n/a self.loop.sock_recv(sock, 1024))
433n/a sock.close()
434n/a self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
435n/a
436n/a def test_sock_client_ops(self):
437n/a with test_utils.run_test_server() as httpd:
438n/a sock = socket.socket()
439n/a self._basetest_sock_client_ops(httpd, sock)
440n/a
441n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
442n/a def test_unix_sock_client_ops(self):
443n/a with test_utils.run_test_unix_server() as httpd:
444n/a sock = socket.socket(socket.AF_UNIX)
445n/a self._basetest_sock_client_ops(httpd, sock)
446n/a
447n/a def test_sock_client_fail(self):
448n/a # Make sure that we will get an unused port
449n/a address = None
450n/a try:
451n/a s = socket.socket()
452n/a s.bind(('127.0.0.1', 0))
453n/a address = s.getsockname()
454n/a finally:
455n/a s.close()
456n/a
457n/a sock = socket.socket()
458n/a sock.setblocking(False)
459n/a with self.assertRaises(ConnectionRefusedError):
460n/a self.loop.run_until_complete(
461n/a self.loop.sock_connect(sock, address))
462n/a sock.close()
463n/a
464n/a def test_sock_accept(self):
465n/a listener = socket.socket()
466n/a listener.setblocking(False)
467n/a listener.bind(('127.0.0.1', 0))
468n/a listener.listen(1)
469n/a client = socket.socket()
470n/a client.connect(listener.getsockname())
471n/a
472n/a f = self.loop.sock_accept(listener)
473n/a conn, addr = self.loop.run_until_complete(f)
474n/a self.assertEqual(conn.gettimeout(), 0)
475n/a self.assertEqual(addr, client.getsockname())
476n/a self.assertEqual(client.getpeername(), listener.getsockname())
477n/a client.close()
478n/a conn.close()
479n/a listener.close()
480n/a
481n/a @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
482n/a def test_add_signal_handler(self):
483n/a caught = 0
484n/a
485n/a def my_handler():
486n/a nonlocal caught
487n/a caught += 1
488n/a
489n/a # Check error behavior first.
490n/a self.assertRaises(
491n/a TypeError, self.loop.add_signal_handler, 'boom', my_handler)
492n/a self.assertRaises(
493n/a TypeError, self.loop.remove_signal_handler, 'boom')
494n/a self.assertRaises(
495n/a ValueError, self.loop.add_signal_handler, signal.NSIG+1,
496n/a my_handler)
497n/a self.assertRaises(
498n/a ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
499n/a self.assertRaises(
500n/a ValueError, self.loop.add_signal_handler, 0, my_handler)
501n/a self.assertRaises(
502n/a ValueError, self.loop.remove_signal_handler, 0)
503n/a self.assertRaises(
504n/a ValueError, self.loop.add_signal_handler, -1, my_handler)
505n/a self.assertRaises(
506n/a ValueError, self.loop.remove_signal_handler, -1)
507n/a self.assertRaises(
508n/a RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
509n/a my_handler)
510n/a # Removing SIGKILL doesn't raise, since we don't call signal().
511n/a self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
512n/a # Now set a handler and handle it.
513n/a self.loop.add_signal_handler(signal.SIGINT, my_handler)
514n/a
515n/a os.kill(os.getpid(), signal.SIGINT)
516n/a test_utils.run_until(self.loop, lambda: caught)
517n/a
518n/a # Removing it should restore the default handler.
519n/a self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
520n/a self.assertEqual(signal.getsignal(signal.SIGINT),
521n/a signal.default_int_handler)
522n/a # Removing again returns False.
523n/a self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
524n/a
525n/a @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
526n/a def test_signal_handling_while_selecting(self):
527n/a # Test with a signal actually arriving during a select() call.
528n/a caught = 0
529n/a
530n/a def my_handler():
531n/a nonlocal caught
532n/a caught += 1
533n/a self.loop.stop()
534n/a
535n/a self.loop.add_signal_handler(signal.SIGALRM, my_handler)
536n/a
537n/a signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once.
538n/a self.loop.run_forever()
539n/a self.assertEqual(caught, 1)
540n/a
541n/a @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
542n/a def test_signal_handling_args(self):
543n/a some_args = (42,)
544n/a caught = 0
545n/a
546n/a def my_handler(*args):
547n/a nonlocal caught
548n/a caught += 1
549n/a self.assertEqual(args, some_args)
550n/a
551n/a self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
552n/a
553n/a signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once.
554n/a self.loop.call_later(0.5, self.loop.stop)
555n/a self.loop.run_forever()
556n/a self.assertEqual(caught, 1)
557n/a
558n/a def _basetest_create_connection(self, connection_fut, check_sockname=True):
559n/a tr, pr = self.loop.run_until_complete(connection_fut)
560n/a self.assertIsInstance(tr, asyncio.Transport)
561n/a self.assertIsInstance(pr, asyncio.Protocol)
562n/a self.assertIs(pr.transport, tr)
563n/a if check_sockname:
564n/a self.assertIsNotNone(tr.get_extra_info('sockname'))
565n/a self.loop.run_until_complete(pr.done)
566n/a self.assertGreater(pr.nbytes, 0)
567n/a tr.close()
568n/a
569n/a def test_create_connection(self):
570n/a with test_utils.run_test_server() as httpd:
571n/a conn_fut = self.loop.create_connection(
572n/a lambda: MyProto(loop=self.loop), *httpd.address)
573n/a self._basetest_create_connection(conn_fut)
574n/a
575n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
576n/a def test_create_unix_connection(self):
577n/a # Issue #20682: On Mac OS X Tiger, getsockname() returns a
578n/a # zero-length address for UNIX socket.
579n/a check_sockname = not osx_tiger()
580n/a
581n/a with test_utils.run_test_unix_server() as httpd:
582n/a conn_fut = self.loop.create_unix_connection(
583n/a lambda: MyProto(loop=self.loop), httpd.address)
584n/a self._basetest_create_connection(conn_fut, check_sockname)
585n/a
586n/a def test_create_connection_sock(self):
587n/a with test_utils.run_test_server() as httpd:
588n/a sock = None
589n/a infos = self.loop.run_until_complete(
590n/a self.loop.getaddrinfo(
591n/a *httpd.address, type=socket.SOCK_STREAM))
592n/a for family, type, proto, cname, address in infos:
593n/a try:
594n/a sock = socket.socket(family=family, type=type, proto=proto)
595n/a sock.setblocking(False)
596n/a self.loop.run_until_complete(
597n/a self.loop.sock_connect(sock, address))
598n/a except:
599n/a pass
600n/a else:
601n/a break
602n/a else:
603n/a assert False, 'Can not create socket.'
604n/a
605n/a f = self.loop.create_connection(
606n/a lambda: MyProto(loop=self.loop), sock=sock)
607n/a tr, pr = self.loop.run_until_complete(f)
608n/a self.assertIsInstance(tr, asyncio.Transport)
609n/a self.assertIsInstance(pr, asyncio.Protocol)
610n/a self.loop.run_until_complete(pr.done)
611n/a self.assertGreater(pr.nbytes, 0)
612n/a tr.close()
613n/a
614n/a def check_ssl_extra_info(self, client, check_sockname=True,
615n/a peername=None, peercert={}):
616n/a if check_sockname:
617n/a self.assertIsNotNone(client.get_extra_info('sockname'))
618n/a if peername:
619n/a self.assertEqual(peername,
620n/a client.get_extra_info('peername'))
621n/a else:
622n/a self.assertIsNotNone(client.get_extra_info('peername'))
623n/a self.assertEqual(peercert,
624n/a client.get_extra_info('peercert'))
625n/a
626n/a # test SSL cipher
627n/a cipher = client.get_extra_info('cipher')
628n/a self.assertIsInstance(cipher, tuple)
629n/a self.assertEqual(len(cipher), 3, cipher)
630n/a self.assertIsInstance(cipher[0], str)
631n/a self.assertIsInstance(cipher[1], str)
632n/a self.assertIsInstance(cipher[2], int)
633n/a
634n/a # test SSL object
635n/a sslobj = client.get_extra_info('ssl_object')
636n/a self.assertIsNotNone(sslobj)
637n/a self.assertEqual(sslobj.compression(),
638n/a client.get_extra_info('compression'))
639n/a self.assertEqual(sslobj.cipher(),
640n/a client.get_extra_info('cipher'))
641n/a self.assertEqual(sslobj.getpeercert(),
642n/a client.get_extra_info('peercert'))
643n/a self.assertEqual(sslobj.compression(),
644n/a client.get_extra_info('compression'))
645n/a
646n/a def _basetest_create_ssl_connection(self, connection_fut,
647n/a check_sockname=True,
648n/a peername=None):
649n/a tr, pr = self.loop.run_until_complete(connection_fut)
650n/a self.assertIsInstance(tr, asyncio.Transport)
651n/a self.assertIsInstance(pr, asyncio.Protocol)
652n/a self.assertTrue('ssl' in tr.__class__.__name__.lower())
653n/a self.check_ssl_extra_info(tr, check_sockname, peername)
654n/a self.loop.run_until_complete(pr.done)
655n/a self.assertGreater(pr.nbytes, 0)
656n/a tr.close()
657n/a
658n/a def _test_create_ssl_connection(self, httpd, create_connection,
659n/a check_sockname=True, peername=None):
660n/a conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
661n/a self._basetest_create_ssl_connection(conn_fut, check_sockname,
662n/a peername)
663n/a
664n/a # ssl.Purpose was introduced in Python 3.4
665n/a if hasattr(ssl, 'Purpose'):
666n/a def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
667n/a cafile=None, capath=None,
668n/a cadata=None):
669n/a """
670n/a A ssl.create_default_context() replacement that doesn't enable
671n/a cert validation.
672n/a """
673n/a self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
674n/a return test_utils.dummy_ssl_context()
675n/a
676n/a # With ssl=True, ssl.create_default_context() should be called
677n/a with mock.patch('ssl.create_default_context',
678n/a side_effect=_dummy_ssl_create_context) as m:
679n/a conn_fut = create_connection(ssl=True)
680n/a self._basetest_create_ssl_connection(conn_fut, check_sockname,
681n/a peername)
682n/a self.assertEqual(m.call_count, 1)
683n/a
684n/a # With the real ssl.create_default_context(), certificate
685n/a # validation will fail
686n/a with self.assertRaises(ssl.SSLError) as cm:
687n/a conn_fut = create_connection(ssl=True)
688n/a # Ignore the "SSL handshake failed" log in debug mode
689n/a with test_utils.disable_logger():
690n/a self._basetest_create_ssl_connection(conn_fut, check_sockname,
691n/a peername)
692n/a
693n/a self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
694n/a
695n/a @unittest.skipIf(ssl is None, 'No ssl module')
696n/a def test_create_ssl_connection(self):
697n/a with test_utils.run_test_server(use_ssl=True) as httpd:
698n/a create_connection = functools.partial(
699n/a self.loop.create_connection,
700n/a lambda: MyProto(loop=self.loop),
701n/a *httpd.address)
702n/a self._test_create_ssl_connection(httpd, create_connection,
703n/a peername=httpd.address)
704n/a
705n/a def test_legacy_create_ssl_connection(self):
706n/a with test_utils.force_legacy_ssl_support():
707n/a self.test_create_ssl_connection()
708n/a
709n/a @unittest.skipIf(ssl is None, 'No ssl module')
710n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
711n/a def test_create_ssl_unix_connection(self):
712n/a # Issue #20682: On Mac OS X Tiger, getsockname() returns a
713n/a # zero-length address for UNIX socket.
714n/a check_sockname = not osx_tiger()
715n/a
716n/a with test_utils.run_test_unix_server(use_ssl=True) as httpd:
717n/a create_connection = functools.partial(
718n/a self.loop.create_unix_connection,
719n/a lambda: MyProto(loop=self.loop), httpd.address,
720n/a server_hostname='127.0.0.1')
721n/a
722n/a self._test_create_ssl_connection(httpd, create_connection,
723n/a check_sockname,
724n/a peername=httpd.address)
725n/a
726n/a def test_legacy_create_ssl_unix_connection(self):
727n/a with test_utils.force_legacy_ssl_support():
728n/a self.test_create_ssl_unix_connection()
729n/a
730n/a def test_create_connection_local_addr(self):
731n/a with test_utils.run_test_server() as httpd:
732n/a port = support.find_unused_port()
733n/a f = self.loop.create_connection(
734n/a lambda: MyProto(loop=self.loop),
735n/a *httpd.address, local_addr=(httpd.address[0], port))
736n/a tr, pr = self.loop.run_until_complete(f)
737n/a expected = pr.transport.get_extra_info('sockname')[1]
738n/a self.assertEqual(port, expected)
739n/a tr.close()
740n/a
741n/a def test_create_connection_local_addr_in_use(self):
742n/a with test_utils.run_test_server() as httpd:
743n/a f = self.loop.create_connection(
744n/a lambda: MyProto(loop=self.loop),
745n/a *httpd.address, local_addr=httpd.address)
746n/a with self.assertRaises(OSError) as cm:
747n/a self.loop.run_until_complete(f)
748n/a self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
749n/a self.assertIn(str(httpd.address), cm.exception.strerror)
750n/a
751n/a def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
752n/a loop = self.loop
753n/a
754n/a class MyProto(MyBaseProto):
755n/a
756n/a def connection_lost(self, exc):
757n/a super().connection_lost(exc)
758n/a loop.call_soon(loop.stop)
759n/a
760n/a def data_received(self, data):
761n/a super().data_received(data)
762n/a self.transport.write(expected_response)
763n/a
764n/a lsock = socket.socket()
765n/a lsock.bind(('127.0.0.1', 0))
766n/a lsock.listen(1)
767n/a addr = lsock.getsockname()
768n/a
769n/a message = b'test data'
770n/a response = None
771n/a expected_response = b'roger'
772n/a
773n/a def client():
774n/a nonlocal response
775n/a try:
776n/a csock = socket.socket()
777n/a if client_ssl is not None:
778n/a csock = client_ssl.wrap_socket(csock)
779n/a csock.connect(addr)
780n/a csock.sendall(message)
781n/a response = csock.recv(99)
782n/a csock.close()
783n/a except Exception as exc:
784n/a print(
785n/a "Failure in client thread in test_connect_accepted_socket",
786n/a exc)
787n/a
788n/a thread = threading.Thread(target=client, daemon=True)
789n/a thread.start()
790n/a
791n/a conn, _ = lsock.accept()
792n/a proto = MyProto(loop=loop)
793n/a proto.loop = loop
794n/a loop.run_until_complete(
795n/a loop.connect_accepted_socket(
796n/a (lambda: proto), conn, ssl=server_ssl))
797n/a loop.run_forever()
798n/a proto.transport.close()
799n/a lsock.close()
800n/a
801n/a thread.join(1)
802n/a self.assertFalse(thread.is_alive())
803n/a self.assertEqual(proto.state, 'CLOSED')
804n/a self.assertEqual(proto.nbytes, len(message))
805n/a self.assertEqual(response, expected_response)
806n/a
807n/a @unittest.skipIf(ssl is None, 'No ssl module')
808n/a def test_ssl_connect_accepted_socket(self):
809n/a if (sys.platform == 'win32' and
810n/a sys.version_info < (3, 5) and
811n/a isinstance(self.loop, proactor_events.BaseProactorEventLoop)
812n/a ):
813n/a raise unittest.SkipTest(
814n/a 'SSL not supported with proactor event loops before Python 3.5'
815n/a )
816n/a
817n/a server_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
818n/a server_context.load_cert_chain(ONLYCERT, ONLYKEY)
819n/a if hasattr(server_context, 'check_hostname'):
820n/a server_context.check_hostname = False
821n/a server_context.verify_mode = ssl.CERT_NONE
822n/a
823n/a client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
824n/a if hasattr(server_context, 'check_hostname'):
825n/a client_context.check_hostname = False
826n/a client_context.verify_mode = ssl.CERT_NONE
827n/a
828n/a self.test_connect_accepted_socket(server_context, client_context)
829n/a
830n/a @mock.patch('asyncio.base_events.socket')
831n/a def create_server_multiple_hosts(self, family, hosts, mock_sock):
832n/a @asyncio.coroutine
833n/a def getaddrinfo(host, port, *args, **kw):
834n/a if family == socket.AF_INET:
835n/a return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
836n/a else:
837n/a return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]
838n/a
839n/a def getaddrinfo_task(*args, **kwds):
840n/a return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
841n/a
842n/a unique_hosts = set(hosts)
843n/a
844n/a if family == socket.AF_INET:
845n/a mock_sock.socket().getsockbyname.side_effect = [
846n/a (host, 80) for host in unique_hosts]
847n/a else:
848n/a mock_sock.socket().getsockbyname.side_effect = [
849n/a (host, 80, 0, 0) for host in unique_hosts]
850n/a self.loop.getaddrinfo = getaddrinfo_task
851n/a self.loop._start_serving = mock.Mock()
852n/a self.loop._stop_serving = mock.Mock()
853n/a f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
854n/a server = self.loop.run_until_complete(f)
855n/a self.addCleanup(server.close)
856n/a server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
857n/a self.assertEqual(server_hosts, unique_hosts)
858n/a
859n/a def test_create_server_multiple_hosts_ipv4(self):
860n/a self.create_server_multiple_hosts(socket.AF_INET,
861n/a ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
862n/a
863n/a def test_create_server_multiple_hosts_ipv6(self):
864n/a self.create_server_multiple_hosts(socket.AF_INET6,
865n/a ['::1', '::2', '::1'])
866n/a
867n/a def test_create_server(self):
868n/a proto = MyProto(self.loop)
869n/a f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
870n/a server = self.loop.run_until_complete(f)
871n/a self.assertEqual(len(server.sockets), 1)
872n/a sock = server.sockets[0]
873n/a host, port = sock.getsockname()
874n/a self.assertEqual(host, '0.0.0.0')
875n/a client = socket.socket()
876n/a client.connect(('127.0.0.1', port))
877n/a client.sendall(b'xxx')
878n/a
879n/a self.loop.run_until_complete(proto.connected)
880n/a self.assertEqual('CONNECTED', proto.state)
881n/a
882n/a test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
883n/a self.assertEqual(3, proto.nbytes)
884n/a
885n/a # extra info is available
886n/a self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
887n/a self.assertEqual('127.0.0.1',
888n/a proto.transport.get_extra_info('peername')[0])
889n/a
890n/a # close connection
891n/a proto.transport.close()
892n/a self.loop.run_until_complete(proto.done)
893n/a
894n/a self.assertEqual('CLOSED', proto.state)
895n/a
896n/a # the client socket must be closed after to avoid ECONNRESET upon
897n/a # recv()/send() on the serving socket
898n/a client.close()
899n/a
900n/a # close server
901n/a server.close()
902n/a
903n/a @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
904n/a def test_create_server_reuse_port(self):
905n/a proto = MyProto(self.loop)
906n/a f = self.loop.create_server(
907n/a lambda: proto, '0.0.0.0', 0)
908n/a server = self.loop.run_until_complete(f)
909n/a self.assertEqual(len(server.sockets), 1)
910n/a sock = server.sockets[0]
911n/a self.assertFalse(
912n/a sock.getsockopt(
913n/a socket.SOL_SOCKET, socket.SO_REUSEPORT))
914n/a server.close()
915n/a
916n/a test_utils.run_briefly(self.loop)
917n/a
918n/a proto = MyProto(self.loop)
919n/a f = self.loop.create_server(
920n/a lambda: proto, '0.0.0.0', 0, reuse_port=True)
921n/a server = self.loop.run_until_complete(f)
922n/a self.assertEqual(len(server.sockets), 1)
923n/a sock = server.sockets[0]
924n/a self.assertTrue(
925n/a sock.getsockopt(
926n/a socket.SOL_SOCKET, socket.SO_REUSEPORT))
927n/a server.close()
928n/a
929n/a def _make_unix_server(self, factory, **kwargs):
930n/a path = test_utils.gen_unix_socket_path()
931n/a self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
932n/a
933n/a f = self.loop.create_unix_server(factory, path, **kwargs)
934n/a server = self.loop.run_until_complete(f)
935n/a
936n/a return server, path
937n/a
938n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
939n/a def test_create_unix_server(self):
940n/a proto = MyProto(loop=self.loop)
941n/a server, path = self._make_unix_server(lambda: proto)
942n/a self.assertEqual(len(server.sockets), 1)
943n/a
944n/a client = socket.socket(socket.AF_UNIX)
945n/a client.connect(path)
946n/a client.sendall(b'xxx')
947n/a
948n/a self.loop.run_until_complete(proto.connected)
949n/a self.assertEqual('CONNECTED', proto.state)
950n/a test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
951n/a self.assertEqual(3, proto.nbytes)
952n/a
953n/a # close connection
954n/a proto.transport.close()
955n/a self.loop.run_until_complete(proto.done)
956n/a
957n/a self.assertEqual('CLOSED', proto.state)
958n/a
959n/a # the client socket must be closed after to avoid ECONNRESET upon
960n/a # recv()/send() on the serving socket
961n/a client.close()
962n/a
963n/a # close server
964n/a server.close()
965n/a
966n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
967n/a def test_create_unix_server_path_socket_error(self):
968n/a proto = MyProto(loop=self.loop)
969n/a sock = socket.socket()
970n/a with sock:
971n/a f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
972n/a with self.assertRaisesRegex(ValueError,
973n/a 'path and sock can not be specified '
974n/a 'at the same time'):
975n/a self.loop.run_until_complete(f)
976n/a
977n/a def _create_ssl_context(self, certfile, keyfile=None):
978n/a sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
979n/a sslcontext.options |= ssl.OP_NO_SSLv2
980n/a sslcontext.load_cert_chain(certfile, keyfile)
981n/a return sslcontext
982n/a
983n/a def _make_ssl_server(self, factory, certfile, keyfile=None):
984n/a sslcontext = self._create_ssl_context(certfile, keyfile)
985n/a
986n/a f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
987n/a server = self.loop.run_until_complete(f)
988n/a
989n/a sock = server.sockets[0]
990n/a host, port = sock.getsockname()
991n/a self.assertEqual(host, '127.0.0.1')
992n/a return server, host, port
993n/a
994n/a def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
995n/a sslcontext = self._create_ssl_context(certfile, keyfile)
996n/a return self._make_unix_server(factory, ssl=sslcontext)
997n/a
998n/a @unittest.skipIf(ssl is None, 'No ssl module')
999n/a def test_create_server_ssl(self):
1000n/a proto = MyProto(loop=self.loop)
1001n/a server, host, port = self._make_ssl_server(
1002n/a lambda: proto, ONLYCERT, ONLYKEY)
1003n/a
1004n/a f_c = self.loop.create_connection(MyBaseProto, host, port,
1005n/a ssl=test_utils.dummy_ssl_context())
1006n/a client, pr = self.loop.run_until_complete(f_c)
1007n/a
1008n/a client.write(b'xxx')
1009n/a self.loop.run_until_complete(proto.connected)
1010n/a self.assertEqual('CONNECTED', proto.state)
1011n/a
1012n/a test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
1013n/a self.assertEqual(3, proto.nbytes)
1014n/a
1015n/a # extra info is available
1016n/a self.check_ssl_extra_info(client, peername=(host, port))
1017n/a
1018n/a # close connection
1019n/a proto.transport.close()
1020n/a self.loop.run_until_complete(proto.done)
1021n/a self.assertEqual('CLOSED', proto.state)
1022n/a
1023n/a # the client socket must be closed after to avoid ECONNRESET upon
1024n/a # recv()/send() on the serving socket
1025n/a client.close()
1026n/a
1027n/a # stop serving
1028n/a server.close()
1029n/a
1030n/a def test_legacy_create_server_ssl(self):
1031n/a with test_utils.force_legacy_ssl_support():
1032n/a self.test_create_server_ssl()
1033n/a
1034n/a @unittest.skipIf(ssl is None, 'No ssl module')
1035n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1036n/a def test_create_unix_server_ssl(self):
1037n/a proto = MyProto(loop=self.loop)
1038n/a server, path = self._make_ssl_unix_server(
1039n/a lambda: proto, ONLYCERT, ONLYKEY)
1040n/a
1041n/a f_c = self.loop.create_unix_connection(
1042n/a MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
1043n/a server_hostname='')
1044n/a
1045n/a client, pr = self.loop.run_until_complete(f_c)
1046n/a
1047n/a client.write(b'xxx')
1048n/a self.loop.run_until_complete(proto.connected)
1049n/a self.assertEqual('CONNECTED', proto.state)
1050n/a test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
1051n/a self.assertEqual(3, proto.nbytes)
1052n/a
1053n/a # close connection
1054n/a proto.transport.close()
1055n/a self.loop.run_until_complete(proto.done)
1056n/a self.assertEqual('CLOSED', proto.state)
1057n/a
1058n/a # the client socket must be closed after to avoid ECONNRESET upon
1059n/a # recv()/send() on the serving socket
1060n/a client.close()
1061n/a
1062n/a # stop serving
1063n/a server.close()
1064n/a
1065n/a def test_legacy_create_unix_server_ssl(self):
1066n/a with test_utils.force_legacy_ssl_support():
1067n/a self.test_create_unix_server_ssl()
1068n/a
1069n/a @unittest.skipIf(ssl is None, 'No ssl module')
1070n/a def test_create_server_ssl_verify_failed(self):
1071n/a proto = MyProto(loop=self.loop)
1072n/a server, host, port = self._make_ssl_server(
1073n/a lambda: proto, SIGNED_CERTFILE)
1074n/a
1075n/a sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1076n/a sslcontext_client.options |= ssl.OP_NO_SSLv2
1077n/a sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1078n/a if hasattr(sslcontext_client, 'check_hostname'):
1079n/a sslcontext_client.check_hostname = True
1080n/a
1081n/a
1082n/a # no CA loaded
1083n/a f_c = self.loop.create_connection(MyProto, host, port,
1084n/a ssl=sslcontext_client)
1085n/a with mock.patch.object(self.loop, 'call_exception_handler'):
1086n/a with test_utils.disable_logger():
1087n/a with self.assertRaisesRegex(ssl.SSLError,
1088n/a '(?i)certificate.verify.failed'):
1089n/a self.loop.run_until_complete(f_c)
1090n/a
1091n/a # execute the loop to log the connection error
1092n/a test_utils.run_briefly(self.loop)
1093n/a
1094n/a # close connection
1095n/a self.assertIsNone(proto.transport)
1096n/a server.close()
1097n/a
1098n/a def test_legacy_create_server_ssl_verify_failed(self):
1099n/a with test_utils.force_legacy_ssl_support():
1100n/a self.test_create_server_ssl_verify_failed()
1101n/a
1102n/a @unittest.skipIf(ssl is None, 'No ssl module')
1103n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1104n/a def test_create_unix_server_ssl_verify_failed(self):
1105n/a proto = MyProto(loop=self.loop)
1106n/a server, path = self._make_ssl_unix_server(
1107n/a lambda: proto, SIGNED_CERTFILE)
1108n/a
1109n/a sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1110n/a sslcontext_client.options |= ssl.OP_NO_SSLv2
1111n/a sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1112n/a if hasattr(sslcontext_client, 'check_hostname'):
1113n/a sslcontext_client.check_hostname = True
1114n/a
1115n/a # no CA loaded
1116n/a f_c = self.loop.create_unix_connection(MyProto, path,
1117n/a ssl=sslcontext_client,
1118n/a server_hostname='invalid')
1119n/a with mock.patch.object(self.loop, 'call_exception_handler'):
1120n/a with test_utils.disable_logger():
1121n/a with self.assertRaisesRegex(ssl.SSLError,
1122n/a '(?i)certificate.verify.failed'):
1123n/a self.loop.run_until_complete(f_c)
1124n/a
1125n/a # execute the loop to log the connection error
1126n/a test_utils.run_briefly(self.loop)
1127n/a
1128n/a # close connection
1129n/a self.assertIsNone(proto.transport)
1130n/a server.close()
1131n/a
1132n/a
1133n/a def test_legacy_create_unix_server_ssl_verify_failed(self):
1134n/a with test_utils.force_legacy_ssl_support():
1135n/a self.test_create_unix_server_ssl_verify_failed()
1136n/a
1137n/a @unittest.skipIf(ssl is None, 'No ssl module')
1138n/a def test_create_server_ssl_match_failed(self):
1139n/a proto = MyProto(loop=self.loop)
1140n/a server, host, port = self._make_ssl_server(
1141n/a lambda: proto, SIGNED_CERTFILE)
1142n/a
1143n/a sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1144n/a sslcontext_client.options |= ssl.OP_NO_SSLv2
1145n/a sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1146n/a sslcontext_client.load_verify_locations(
1147n/a cafile=SIGNING_CA)
1148n/a if hasattr(sslcontext_client, 'check_hostname'):
1149n/a sslcontext_client.check_hostname = True
1150n/a
1151n/a # incorrect server_hostname
1152n/a f_c = self.loop.create_connection(MyProto, host, port,
1153n/a ssl=sslcontext_client)
1154n/a with mock.patch.object(self.loop, 'call_exception_handler'):
1155n/a with test_utils.disable_logger():
1156n/a with self.assertRaisesRegex(
1157n/a ssl.CertificateError,
1158n/a "hostname '127.0.0.1' doesn't match 'localhost'"):
1159n/a self.loop.run_until_complete(f_c)
1160n/a
1161n/a # close connection
1162n/a proto.transport.close()
1163n/a server.close()
1164n/a
1165n/a def test_legacy_create_server_ssl_match_failed(self):
1166n/a with test_utils.force_legacy_ssl_support():
1167n/a self.test_create_server_ssl_match_failed()
1168n/a
1169n/a @unittest.skipIf(ssl is None, 'No ssl module')
1170n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1171n/a def test_create_unix_server_ssl_verified(self):
1172n/a proto = MyProto(loop=self.loop)
1173n/a server, path = self._make_ssl_unix_server(
1174n/a lambda: proto, SIGNED_CERTFILE)
1175n/a
1176n/a sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1177n/a sslcontext_client.options |= ssl.OP_NO_SSLv2
1178n/a sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1179n/a sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
1180n/a if hasattr(sslcontext_client, 'check_hostname'):
1181n/a sslcontext_client.check_hostname = True
1182n/a
1183n/a # Connection succeeds with correct CA and server hostname.
1184n/a f_c = self.loop.create_unix_connection(MyProto, path,
1185n/a ssl=sslcontext_client,
1186n/a server_hostname='localhost')
1187n/a client, pr = self.loop.run_until_complete(f_c)
1188n/a
1189n/a # close connection
1190n/a proto.transport.close()
1191n/a client.close()
1192n/a server.close()
1193n/a self.loop.run_until_complete(proto.done)
1194n/a
1195n/a def test_legacy_create_unix_server_ssl_verified(self):
1196n/a with test_utils.force_legacy_ssl_support():
1197n/a self.test_create_unix_server_ssl_verified()
1198n/a
1199n/a @unittest.skipIf(ssl is None, 'No ssl module')
1200n/a def test_create_server_ssl_verified(self):
1201n/a proto = MyProto(loop=self.loop)
1202n/a server, host, port = self._make_ssl_server(
1203n/a lambda: proto, SIGNED_CERTFILE)
1204n/a
1205n/a sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1206n/a sslcontext_client.options |= ssl.OP_NO_SSLv2
1207n/a sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1208n/a sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
1209n/a if hasattr(sslcontext_client, 'check_hostname'):
1210n/a sslcontext_client.check_hostname = True
1211n/a
1212n/a # Connection succeeds with correct CA and server hostname.
1213n/a f_c = self.loop.create_connection(MyProto, host, port,
1214n/a ssl=sslcontext_client,
1215n/a server_hostname='localhost')
1216n/a client, pr = self.loop.run_until_complete(f_c)
1217n/a
1218n/a # extra info is available
1219n/a self.check_ssl_extra_info(client,peername=(host, port),
1220n/a peercert=PEERCERT)
1221n/a
1222n/a # close connection
1223n/a proto.transport.close()
1224n/a client.close()
1225n/a server.close()
1226n/a self.loop.run_until_complete(proto.done)
1227n/a
1228n/a def test_legacy_create_server_ssl_verified(self):
1229n/a with test_utils.force_legacy_ssl_support():
1230n/a self.test_create_server_ssl_verified()
1231n/a
1232n/a def test_create_server_sock(self):
1233n/a proto = asyncio.Future(loop=self.loop)
1234n/a
1235n/a class TestMyProto(MyProto):
1236n/a def connection_made(self, transport):
1237n/a super().connection_made(transport)
1238n/a proto.set_result(self)
1239n/a
1240n/a sock_ob = socket.socket(type=socket.SOCK_STREAM)
1241n/a sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1242n/a sock_ob.bind(('0.0.0.0', 0))
1243n/a
1244n/a f = self.loop.create_server(TestMyProto, sock=sock_ob)
1245n/a server = self.loop.run_until_complete(f)
1246n/a sock = server.sockets[0]
1247n/a self.assertIs(sock, sock_ob)
1248n/a
1249n/a host, port = sock.getsockname()
1250n/a self.assertEqual(host, '0.0.0.0')
1251n/a client = socket.socket()
1252n/a client.connect(('127.0.0.1', port))
1253n/a client.send(b'xxx')
1254n/a client.close()
1255n/a server.close()
1256n/a
1257n/a def test_create_server_addr_in_use(self):
1258n/a sock_ob = socket.socket(type=socket.SOCK_STREAM)
1259n/a sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1260n/a sock_ob.bind(('0.0.0.0', 0))
1261n/a
1262n/a f = self.loop.create_server(MyProto, sock=sock_ob)
1263n/a server = self.loop.run_until_complete(f)
1264n/a sock = server.sockets[0]
1265n/a host, port = sock.getsockname()
1266n/a
1267n/a f = self.loop.create_server(MyProto, host=host, port=port)
1268n/a with self.assertRaises(OSError) as cm:
1269n/a self.loop.run_until_complete(f)
1270n/a self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
1271n/a
1272n/a server.close()
1273n/a
1274n/a @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
1275n/a def test_create_server_dual_stack(self):
1276n/a f_proto = asyncio.Future(loop=self.loop)
1277n/a
1278n/a class TestMyProto(MyProto):
1279n/a def connection_made(self, transport):
1280n/a super().connection_made(transport)
1281n/a f_proto.set_result(self)
1282n/a
1283n/a try_count = 0
1284n/a while True:
1285n/a try:
1286n/a port = support.find_unused_port()
1287n/a f = self.loop.create_server(TestMyProto, host=None, port=port)
1288n/a server = self.loop.run_until_complete(f)
1289n/a except OSError as ex:
1290n/a if ex.errno == errno.EADDRINUSE:
1291n/a try_count += 1
1292n/a self.assertGreaterEqual(5, try_count)
1293n/a continue
1294n/a else:
1295n/a raise
1296n/a else:
1297n/a break
1298n/a client = socket.socket()
1299n/a client.connect(('127.0.0.1', port))
1300n/a client.send(b'xxx')
1301n/a proto = self.loop.run_until_complete(f_proto)
1302n/a proto.transport.close()
1303n/a client.close()
1304n/a
1305n/a f_proto = asyncio.Future(loop=self.loop)
1306n/a client = socket.socket(socket.AF_INET6)
1307n/a client.connect(('::1', port))
1308n/a client.send(b'xxx')
1309n/a proto = self.loop.run_until_complete(f_proto)
1310n/a proto.transport.close()
1311n/a client.close()
1312n/a
1313n/a server.close()
1314n/a
1315n/a def test_server_close(self):
1316n/a f = self.loop.create_server(MyProto, '0.0.0.0', 0)
1317n/a server = self.loop.run_until_complete(f)
1318n/a sock = server.sockets[0]
1319n/a host, port = sock.getsockname()
1320n/a
1321n/a client = socket.socket()
1322n/a client.connect(('127.0.0.1', port))
1323n/a client.send(b'xxx')
1324n/a client.close()
1325n/a
1326n/a server.close()
1327n/a
1328n/a client = socket.socket()
1329n/a self.assertRaises(
1330n/a ConnectionRefusedError, client.connect, ('127.0.0.1', port))
1331n/a client.close()
1332n/a
1333n/a def test_create_datagram_endpoint(self):
1334n/a class TestMyDatagramProto(MyDatagramProto):
1335n/a def __init__(inner_self):
1336n/a super().__init__(loop=self.loop)
1337n/a
1338n/a def datagram_received(self, data, addr):
1339n/a super().datagram_received(data, addr)
1340n/a self.transport.sendto(b'resp:'+data, addr)
1341n/a
1342n/a coro = self.loop.create_datagram_endpoint(
1343n/a TestMyDatagramProto, local_addr=('127.0.0.1', 0))
1344n/a s_transport, server = self.loop.run_until_complete(coro)
1345n/a host, port = s_transport.get_extra_info('sockname')
1346n/a
1347n/a self.assertIsInstance(s_transport, asyncio.Transport)
1348n/a self.assertIsInstance(server, TestMyDatagramProto)
1349n/a self.assertEqual('INITIALIZED', server.state)
1350n/a self.assertIs(server.transport, s_transport)
1351n/a
1352n/a coro = self.loop.create_datagram_endpoint(
1353n/a lambda: MyDatagramProto(loop=self.loop),
1354n/a remote_addr=(host, port))
1355n/a transport, client = self.loop.run_until_complete(coro)
1356n/a
1357n/a self.assertIsInstance(transport, asyncio.Transport)
1358n/a self.assertIsInstance(client, MyDatagramProto)
1359n/a self.assertEqual('INITIALIZED', client.state)
1360n/a self.assertIs(client.transport, transport)
1361n/a
1362n/a transport.sendto(b'xxx')
1363n/a test_utils.run_until(self.loop, lambda: server.nbytes)
1364n/a self.assertEqual(3, server.nbytes)
1365n/a test_utils.run_until(self.loop, lambda: client.nbytes)
1366n/a
1367n/a # received
1368n/a self.assertEqual(8, client.nbytes)
1369n/a
1370n/a # extra info is available
1371n/a self.assertIsNotNone(transport.get_extra_info('sockname'))
1372n/a
1373n/a # close connection
1374n/a transport.close()
1375n/a self.loop.run_until_complete(client.done)
1376n/a self.assertEqual('CLOSED', client.state)
1377n/a server.transport.close()
1378n/a
1379n/a def test_create_datagram_endpoint_sock(self):
1380n/a if (sys.platform == 'win32' and
1381n/a isinstance(self.loop, proactor_events.BaseProactorEventLoop)):
1382n/a raise unittest.SkipTest(
1383n/a 'UDP is not supported with proactor event loops')
1384n/a
1385n/a sock = None
1386n/a local_address = ('127.0.0.1', 0)
1387n/a infos = self.loop.run_until_complete(
1388n/a self.loop.getaddrinfo(
1389n/a *local_address, type=socket.SOCK_DGRAM))
1390n/a for family, type, proto, cname, address in infos:
1391n/a try:
1392n/a sock = socket.socket(family=family, type=type, proto=proto)
1393n/a sock.setblocking(False)
1394n/a sock.bind(address)
1395n/a except:
1396n/a pass
1397n/a else:
1398n/a break
1399n/a else:
1400n/a assert False, 'Can not create socket.'
1401n/a
1402n/a f = self.loop.create_datagram_endpoint(
1403n/a lambda: MyDatagramProto(loop=self.loop), sock=sock)
1404n/a tr, pr = self.loop.run_until_complete(f)
1405n/a self.assertIsInstance(tr, asyncio.Transport)
1406n/a self.assertIsInstance(pr, MyDatagramProto)
1407n/a tr.close()
1408n/a self.loop.run_until_complete(pr.done)
1409n/a
1410n/a def test_internal_fds(self):
1411n/a loop = self.create_event_loop()
1412n/a if not isinstance(loop, selector_events.BaseSelectorEventLoop):
1413n/a loop.close()
1414n/a self.skipTest('loop is not a BaseSelectorEventLoop')
1415n/a
1416n/a self.assertEqual(1, loop._internal_fds)
1417n/a loop.close()
1418n/a self.assertEqual(0, loop._internal_fds)
1419n/a self.assertIsNone(loop._csock)
1420n/a self.assertIsNone(loop._ssock)
1421n/a
1422n/a @unittest.skipUnless(sys.platform != 'win32',
1423n/a "Don't support pipes for Windows")
1424n/a def test_read_pipe(self):
1425n/a proto = MyReadPipeProto(loop=self.loop)
1426n/a
1427n/a rpipe, wpipe = os.pipe()
1428n/a pipeobj = io.open(rpipe, 'rb', 1024)
1429n/a
1430n/a @asyncio.coroutine
1431n/a def connect():
1432n/a t, p = yield from self.loop.connect_read_pipe(
1433n/a lambda: proto, pipeobj)
1434n/a self.assertIs(p, proto)
1435n/a self.assertIs(t, proto.transport)
1436n/a self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1437n/a self.assertEqual(0, proto.nbytes)
1438n/a
1439n/a self.loop.run_until_complete(connect())
1440n/a
1441n/a os.write(wpipe, b'1')
1442n/a test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
1443n/a self.assertEqual(1, proto.nbytes)
1444n/a
1445n/a os.write(wpipe, b'2345')
1446n/a test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1447n/a self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1448n/a self.assertEqual(5, proto.nbytes)
1449n/a
1450n/a os.close(wpipe)
1451n/a self.loop.run_until_complete(proto.done)
1452n/a self.assertEqual(
1453n/a ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1454n/a # extra info is available
1455n/a self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1456n/a
1457n/a @unittest.skipUnless(sys.platform != 'win32',
1458n/a "Don't support pipes for Windows")
1459n/a def test_unclosed_pipe_transport(self):
1460n/a # This test reproduces the issue #314 on GitHub
1461n/a loop = self.create_event_loop()
1462n/a read_proto = MyReadPipeProto(loop=loop)
1463n/a write_proto = MyWritePipeProto(loop=loop)
1464n/a
1465n/a rpipe, wpipe = os.pipe()
1466n/a rpipeobj = io.open(rpipe, 'rb', 1024)
1467n/a wpipeobj = io.open(wpipe, 'w', 1024)
1468n/a
1469n/a @asyncio.coroutine
1470n/a def connect():
1471n/a read_transport, _ = yield from loop.connect_read_pipe(
1472n/a lambda: read_proto, rpipeobj)
1473n/a write_transport, _ = yield from loop.connect_write_pipe(
1474n/a lambda: write_proto, wpipeobj)
1475n/a return read_transport, write_transport
1476n/a
1477n/a # Run and close the loop without closing the transports
1478n/a read_transport, write_transport = loop.run_until_complete(connect())
1479n/a loop.close()
1480n/a
1481n/a # These 'repr' calls used to raise an AttributeError
1482n/a # See Issue #314 on GitHub
1483n/a self.assertIn('open', repr(read_transport))
1484n/a self.assertIn('open', repr(write_transport))
1485n/a
1486n/a # Clean up (avoid ResourceWarning)
1487n/a rpipeobj.close()
1488n/a wpipeobj.close()
1489n/a read_transport._pipe = None
1490n/a write_transport._pipe = None
1491n/a
1492n/a @unittest.skipUnless(sys.platform != 'win32',
1493n/a "Don't support pipes for Windows")
1494n/a # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1495n/a # older than 10.6 (Snow Leopard)
1496n/a @support.requires_mac_ver(10, 6)
1497n/a # Issue #20495: The test hangs on FreeBSD 7.2 but pass on FreeBSD 9
1498n/a @support.requires_freebsd_version(8)
1499n/a def test_read_pty_output(self):
1500n/a proto = MyReadPipeProto(loop=self.loop)
1501n/a
1502n/a master, slave = os.openpty()
1503n/a master_read_obj = io.open(master, 'rb', 0)
1504n/a
1505n/a @asyncio.coroutine
1506n/a def connect():
1507n/a t, p = yield from self.loop.connect_read_pipe(lambda: proto,
1508n/a master_read_obj)
1509n/a self.assertIs(p, proto)
1510n/a self.assertIs(t, proto.transport)
1511n/a self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1512n/a self.assertEqual(0, proto.nbytes)
1513n/a
1514n/a self.loop.run_until_complete(connect())
1515n/a
1516n/a os.write(slave, b'1')
1517n/a test_utils.run_until(self.loop, lambda: proto.nbytes)
1518n/a self.assertEqual(1, proto.nbytes)
1519n/a
1520n/a os.write(slave, b'2345')
1521n/a test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1522n/a self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1523n/a self.assertEqual(5, proto.nbytes)
1524n/a
1525n/a os.close(slave)
1526n/a self.loop.run_until_complete(proto.done)
1527n/a self.assertEqual(
1528n/a ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1529n/a # extra info is available
1530n/a self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1531n/a
1532n/a @unittest.skipUnless(sys.platform != 'win32',
1533n/a "Don't support pipes for Windows")
1534n/a def test_write_pipe(self):
1535n/a rpipe, wpipe = os.pipe()
1536n/a pipeobj = io.open(wpipe, 'wb', 1024)
1537n/a
1538n/a proto = MyWritePipeProto(loop=self.loop)
1539n/a connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1540n/a transport, p = self.loop.run_until_complete(connect)
1541n/a self.assertIs(p, proto)
1542n/a self.assertIs(transport, proto.transport)
1543n/a self.assertEqual('CONNECTED', proto.state)
1544n/a
1545n/a transport.write(b'1')
1546n/a
1547n/a data = bytearray()
1548n/a def reader(data):
1549n/a chunk = os.read(rpipe, 1024)
1550n/a data += chunk
1551n/a return len(data)
1552n/a
1553n/a test_utils.run_until(self.loop, lambda: reader(data) >= 1)
1554n/a self.assertEqual(b'1', data)
1555n/a
1556n/a transport.write(b'2345')
1557n/a test_utils.run_until(self.loop, lambda: reader(data) >= 5)
1558n/a self.assertEqual(b'12345', data)
1559n/a self.assertEqual('CONNECTED', proto.state)
1560n/a
1561n/a os.close(rpipe)
1562n/a
1563n/a # extra info is available
1564n/a self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1565n/a
1566n/a # close connection
1567n/a proto.transport.close()
1568n/a self.loop.run_until_complete(proto.done)
1569n/a self.assertEqual('CLOSED', proto.state)
1570n/a
1571n/a @unittest.skipUnless(sys.platform != 'win32',
1572n/a "Don't support pipes for Windows")
1573n/a def test_write_pipe_disconnect_on_close(self):
1574n/a rsock, wsock = test_utils.socketpair()
1575n/a rsock.setblocking(False)
1576n/a pipeobj = io.open(wsock.detach(), 'wb', 1024)
1577n/a
1578n/a proto = MyWritePipeProto(loop=self.loop)
1579n/a connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1580n/a transport, p = self.loop.run_until_complete(connect)
1581n/a self.assertIs(p, proto)
1582n/a self.assertIs(transport, proto.transport)
1583n/a self.assertEqual('CONNECTED', proto.state)
1584n/a
1585n/a transport.write(b'1')
1586n/a data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
1587n/a self.assertEqual(b'1', data)
1588n/a
1589n/a rsock.close()
1590n/a
1591n/a self.loop.run_until_complete(proto.done)
1592n/a self.assertEqual('CLOSED', proto.state)
1593n/a
1594n/a @unittest.skipUnless(sys.platform != 'win32',
1595n/a "Don't support pipes for Windows")
1596n/a # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1597n/a # older than 10.6 (Snow Leopard)
1598n/a @support.requires_mac_ver(10, 6)
1599n/a def test_write_pty(self):
1600n/a master, slave = os.openpty()
1601n/a slave_write_obj = io.open(slave, 'wb', 0)
1602n/a
1603n/a proto = MyWritePipeProto(loop=self.loop)
1604n/a connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
1605n/a transport, p = self.loop.run_until_complete(connect)
1606n/a self.assertIs(p, proto)
1607n/a self.assertIs(transport, proto.transport)
1608n/a self.assertEqual('CONNECTED', proto.state)
1609n/a
1610n/a transport.write(b'1')
1611n/a
1612n/a data = bytearray()
1613n/a def reader(data):
1614n/a chunk = os.read(master, 1024)
1615n/a data += chunk
1616n/a return len(data)
1617n/a
1618n/a test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1619n/a timeout=10)
1620n/a self.assertEqual(b'1', data)
1621n/a
1622n/a transport.write(b'2345')
1623n/a test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1624n/a timeout=10)
1625n/a self.assertEqual(b'12345', data)
1626n/a self.assertEqual('CONNECTED', proto.state)
1627n/a
1628n/a os.close(master)
1629n/a
1630n/a # extra info is available
1631n/a self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1632n/a
1633n/a # close connection
1634n/a proto.transport.close()
1635n/a self.loop.run_until_complete(proto.done)
1636n/a self.assertEqual('CLOSED', proto.state)
1637n/a
1638n/a @unittest.skipUnless(sys.platform != 'win32',
1639n/a "Don't support pipes for Windows")
1640n/a # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1641n/a # older than 10.6 (Snow Leopard)
1642n/a @support.requires_mac_ver(10, 6)
1643n/a def test_bidirectional_pty(self):
1644n/a master, read_slave = os.openpty()
1645n/a write_slave = os.dup(read_slave)
1646n/a tty.setraw(read_slave)
1647n/a
1648n/a slave_read_obj = io.open(read_slave, 'rb', 0)
1649n/a read_proto = MyReadPipeProto(loop=self.loop)
1650n/a read_connect = self.loop.connect_read_pipe(lambda: read_proto,
1651n/a slave_read_obj)
1652n/a read_transport, p = self.loop.run_until_complete(read_connect)
1653n/a self.assertIs(p, read_proto)
1654n/a self.assertIs(read_transport, read_proto.transport)
1655n/a self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1656n/a self.assertEqual(0, read_proto.nbytes)
1657n/a
1658n/a
1659n/a slave_write_obj = io.open(write_slave, 'wb', 0)
1660n/a write_proto = MyWritePipeProto(loop=self.loop)
1661n/a write_connect = self.loop.connect_write_pipe(lambda: write_proto,
1662n/a slave_write_obj)
1663n/a write_transport, p = self.loop.run_until_complete(write_connect)
1664n/a self.assertIs(p, write_proto)
1665n/a self.assertIs(write_transport, write_proto.transport)
1666n/a self.assertEqual('CONNECTED', write_proto.state)
1667n/a
1668n/a data = bytearray()
1669n/a def reader(data):
1670n/a chunk = os.read(master, 1024)
1671n/a data += chunk
1672n/a return len(data)
1673n/a
1674n/a write_transport.write(b'1')
1675n/a test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
1676n/a self.assertEqual(b'1', data)
1677n/a self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1678n/a self.assertEqual('CONNECTED', write_proto.state)
1679n/a
1680n/a os.write(master, b'a')
1681n/a test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
1682n/a timeout=10)
1683n/a self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1684n/a self.assertEqual(1, read_proto.nbytes)
1685n/a self.assertEqual('CONNECTED', write_proto.state)
1686n/a
1687n/a write_transport.write(b'2345')
1688n/a test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
1689n/a self.assertEqual(b'12345', data)
1690n/a self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1691n/a self.assertEqual('CONNECTED', write_proto.state)
1692n/a
1693n/a os.write(master, b'bcde')
1694n/a test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
1695n/a timeout=10)
1696n/a self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1697n/a self.assertEqual(5, read_proto.nbytes)
1698n/a self.assertEqual('CONNECTED', write_proto.state)
1699n/a
1700n/a os.close(master)
1701n/a
1702n/a read_transport.close()
1703n/a self.loop.run_until_complete(read_proto.done)
1704n/a self.assertEqual(
1705n/a ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
1706n/a
1707n/a write_transport.close()
1708n/a self.loop.run_until_complete(write_proto.done)
1709n/a self.assertEqual('CLOSED', write_proto.state)
1710n/a
1711n/a def test_prompt_cancellation(self):
1712n/a r, w = test_utils.socketpair()
1713n/a r.setblocking(False)
1714n/a f = self.loop.sock_recv(r, 1)
1715n/a ov = getattr(f, 'ov', None)
1716n/a if ov is not None:
1717n/a self.assertTrue(ov.pending)
1718n/a
1719n/a @asyncio.coroutine
1720n/a def main():
1721n/a try:
1722n/a self.loop.call_soon(f.cancel)
1723n/a yield from f
1724n/a except asyncio.CancelledError:
1725n/a res = 'cancelled'
1726n/a else:
1727n/a res = None
1728n/a finally:
1729n/a self.loop.stop()
1730n/a return res
1731n/a
1732n/a start = time.monotonic()
1733n/a t = asyncio.Task(main(), loop=self.loop)
1734n/a self.loop.run_forever()
1735n/a elapsed = time.monotonic() - start
1736n/a
1737n/a self.assertLess(elapsed, 0.1)
1738n/a self.assertEqual(t.result(), 'cancelled')
1739n/a self.assertRaises(asyncio.CancelledError, f.result)
1740n/a if ov is not None:
1741n/a self.assertFalse(ov.pending)
1742n/a self.loop._stop_serving(r)
1743n/a
1744n/a r.close()
1745n/a w.close()
1746n/a
1747n/a def test_timeout_rounding(self):
1748n/a def _run_once():
1749n/a self.loop._run_once_counter += 1
1750n/a orig_run_once()
1751n/a
1752n/a orig_run_once = self.loop._run_once
1753n/a self.loop._run_once_counter = 0
1754n/a self.loop._run_once = _run_once
1755n/a
1756n/a @asyncio.coroutine
1757n/a def wait():
1758n/a loop = self.loop
1759n/a yield from asyncio.sleep(1e-2, loop=loop)
1760n/a yield from asyncio.sleep(1e-4, loop=loop)
1761n/a yield from asyncio.sleep(1e-6, loop=loop)
1762n/a yield from asyncio.sleep(1e-8, loop=loop)
1763n/a yield from asyncio.sleep(1e-10, loop=loop)
1764n/a
1765n/a self.loop.run_until_complete(wait())
1766n/a # The ideal number of call is 12, but on some platforms, the selector
1767n/a # may sleep at little bit less than timeout depending on the resolution
1768n/a # of the clock used by the kernel. Tolerate a few useless calls on
1769n/a # these platforms.
1770n/a self.assertLessEqual(self.loop._run_once_counter, 20,
1771n/a {'clock_resolution': self.loop._clock_resolution,
1772n/a 'selector': self.loop._selector.__class__.__name__})
1773n/a
1774n/a def test_remove_fds_after_closing(self):
1775n/a loop = self.create_event_loop()
1776n/a callback = lambda: None
1777n/a r, w = test_utils.socketpair()
1778n/a self.addCleanup(r.close)
1779n/a self.addCleanup(w.close)
1780n/a loop.add_reader(r, callback)
1781n/a loop.add_writer(w, callback)
1782n/a loop.close()
1783n/a self.assertFalse(loop.remove_reader(r))
1784n/a self.assertFalse(loop.remove_writer(w))
1785n/a
1786n/a def test_add_fds_after_closing(self):
1787n/a loop = self.create_event_loop()
1788n/a callback = lambda: None
1789n/a r, w = test_utils.socketpair()
1790n/a self.addCleanup(r.close)
1791n/a self.addCleanup(w.close)
1792n/a loop.close()
1793n/a with self.assertRaises(RuntimeError):
1794n/a loop.add_reader(r, callback)
1795n/a with self.assertRaises(RuntimeError):
1796n/a loop.add_writer(w, callback)
1797n/a
1798n/a def test_close_running_event_loop(self):
1799n/a @asyncio.coroutine
1800n/a def close_loop(loop):
1801n/a self.loop.close()
1802n/a
1803n/a coro = close_loop(self.loop)
1804n/a with self.assertRaises(RuntimeError):
1805n/a self.loop.run_until_complete(coro)
1806n/a
1807n/a def test_close(self):
1808n/a self.loop.close()
1809n/a
1810n/a @asyncio.coroutine
1811n/a def test():
1812n/a pass
1813n/a
1814n/a func = lambda: False
1815n/a coro = test()
1816n/a self.addCleanup(coro.close)
1817n/a
1818n/a # operation blocked when the loop is closed
1819n/a with self.assertRaises(RuntimeError):
1820n/a self.loop.run_forever()
1821n/a with self.assertRaises(RuntimeError):
1822n/a fut = asyncio.Future(loop=self.loop)
1823n/a self.loop.run_until_complete(fut)
1824n/a with self.assertRaises(RuntimeError):
1825n/a self.loop.call_soon(func)
1826n/a with self.assertRaises(RuntimeError):
1827n/a self.loop.call_soon_threadsafe(func)
1828n/a with self.assertRaises(RuntimeError):
1829n/a self.loop.call_later(1.0, func)
1830n/a with self.assertRaises(RuntimeError):
1831n/a self.loop.call_at(self.loop.time() + .0, func)
1832n/a with self.assertRaises(RuntimeError):
1833n/a self.loop.run_in_executor(None, func)
1834n/a with self.assertRaises(RuntimeError):
1835n/a self.loop.create_task(coro)
1836n/a with self.assertRaises(RuntimeError):
1837n/a self.loop.add_signal_handler(signal.SIGTERM, func)
1838n/a
1839n/a
1840n/aclass SubprocessTestsMixin:
1841n/a
1842n/a def check_terminated(self, returncode):
1843n/a if sys.platform == 'win32':
1844n/a self.assertIsInstance(returncode, int)
1845n/a # expect 1 but sometimes get 0
1846n/a else:
1847n/a self.assertEqual(-signal.SIGTERM, returncode)
1848n/a
1849n/a def check_killed(self, returncode):
1850n/a if sys.platform == 'win32':
1851n/a self.assertIsInstance(returncode, int)
1852n/a # expect 1 but sometimes get 0
1853n/a else:
1854n/a self.assertEqual(-signal.SIGKILL, returncode)
1855n/a
1856n/a def test_subprocess_exec(self):
1857n/a prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1858n/a
1859n/a connect = self.loop.subprocess_exec(
1860n/a functools.partial(MySubprocessProtocol, self.loop),
1861n/a sys.executable, prog)
1862n/a transp, proto = self.loop.run_until_complete(connect)
1863n/a self.assertIsInstance(proto, MySubprocessProtocol)
1864n/a self.loop.run_until_complete(proto.connected)
1865n/a self.assertEqual('CONNECTED', proto.state)
1866n/a
1867n/a stdin = transp.get_pipe_transport(0)
1868n/a stdin.write(b'Python The Winner')
1869n/a self.loop.run_until_complete(proto.got_data[1].wait())
1870n/a with test_utils.disable_logger():
1871n/a transp.close()
1872n/a self.loop.run_until_complete(proto.completed)
1873n/a self.check_killed(proto.returncode)
1874n/a self.assertEqual(b'Python The Winner', proto.data[1])
1875n/a
1876n/a def test_subprocess_interactive(self):
1877n/a prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1878n/a
1879n/a connect = self.loop.subprocess_exec(
1880n/a functools.partial(MySubprocessProtocol, self.loop),
1881n/a sys.executable, prog)
1882n/a transp, proto = self.loop.run_until_complete(connect)
1883n/a self.assertIsInstance(proto, MySubprocessProtocol)
1884n/a self.loop.run_until_complete(proto.connected)
1885n/a self.assertEqual('CONNECTED', proto.state)
1886n/a
1887n/a stdin = transp.get_pipe_transport(0)
1888n/a stdin.write(b'Python ')
1889n/a self.loop.run_until_complete(proto.got_data[1].wait())
1890n/a proto.got_data[1].clear()
1891n/a self.assertEqual(b'Python ', proto.data[1])
1892n/a
1893n/a stdin.write(b'The Winner')
1894n/a self.loop.run_until_complete(proto.got_data[1].wait())
1895n/a self.assertEqual(b'Python The Winner', proto.data[1])
1896n/a
1897n/a with test_utils.disable_logger():
1898n/a transp.close()
1899n/a self.loop.run_until_complete(proto.completed)
1900n/a self.check_killed(proto.returncode)
1901n/a
1902n/a def test_subprocess_shell(self):
1903n/a connect = self.loop.subprocess_shell(
1904n/a functools.partial(MySubprocessProtocol, self.loop),
1905n/a 'echo Python')
1906n/a transp, proto = self.loop.run_until_complete(connect)
1907n/a self.assertIsInstance(proto, MySubprocessProtocol)
1908n/a self.loop.run_until_complete(proto.connected)
1909n/a
1910n/a transp.get_pipe_transport(0).close()
1911n/a self.loop.run_until_complete(proto.completed)
1912n/a self.assertEqual(0, proto.returncode)
1913n/a self.assertTrue(all(f.done() for f in proto.disconnects.values()))
1914n/a self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
1915n/a self.assertEqual(proto.data[2], b'')
1916n/a transp.close()
1917n/a
1918n/a def test_subprocess_exitcode(self):
1919n/a connect = self.loop.subprocess_shell(
1920n/a functools.partial(MySubprocessProtocol, self.loop),
1921n/a 'exit 7', stdin=None, stdout=None, stderr=None)
1922n/a transp, proto = self.loop.run_until_complete(connect)
1923n/a self.assertIsInstance(proto, MySubprocessProtocol)
1924n/a self.loop.run_until_complete(proto.completed)
1925n/a self.assertEqual(7, proto.returncode)
1926n/a transp.close()
1927n/a
1928n/a def test_subprocess_close_after_finish(self):
1929n/a connect = self.loop.subprocess_shell(
1930n/a functools.partial(MySubprocessProtocol, self.loop),
1931n/a 'exit 7', stdin=None, stdout=None, stderr=None)
1932n/a transp, proto = self.loop.run_until_complete(connect)
1933n/a self.assertIsInstance(proto, MySubprocessProtocol)
1934n/a self.assertIsNone(transp.get_pipe_transport(0))
1935n/a self.assertIsNone(transp.get_pipe_transport(1))
1936n/a self.assertIsNone(transp.get_pipe_transport(2))
1937n/a self.loop.run_until_complete(proto.completed)
1938n/a self.assertEqual(7, proto.returncode)
1939n/a self.assertIsNone(transp.close())
1940n/a
1941n/a def test_subprocess_kill(self):
1942n/a prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1943n/a
1944n/a connect = self.loop.subprocess_exec(
1945n/a functools.partial(MySubprocessProtocol, self.loop),
1946n/a sys.executable, prog)
1947n/a transp, proto = self.loop.run_until_complete(connect)
1948n/a self.assertIsInstance(proto, MySubprocessProtocol)
1949n/a self.loop.run_until_complete(proto.connected)
1950n/a
1951n/a transp.kill()
1952n/a self.loop.run_until_complete(proto.completed)
1953n/a self.check_killed(proto.returncode)
1954n/a transp.close()
1955n/a
1956n/a def test_subprocess_terminate(self):
1957n/a prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1958n/a
1959n/a connect = self.loop.subprocess_exec(
1960n/a functools.partial(MySubprocessProtocol, self.loop),
1961n/a sys.executable, prog)
1962n/a transp, proto = self.loop.run_until_complete(connect)
1963n/a self.assertIsInstance(proto, MySubprocessProtocol)
1964n/a self.loop.run_until_complete(proto.connected)
1965n/a
1966n/a transp.terminate()
1967n/a self.loop.run_until_complete(proto.completed)
1968n/a self.check_terminated(proto.returncode)
1969n/a transp.close()
1970n/a
1971n/a @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
1972n/a def test_subprocess_send_signal(self):
1973n/a prog = os.path.join(os.path.dirname(__file__), 'echo.py')
1974n/a
1975n/a connect = self.loop.subprocess_exec(
1976n/a functools.partial(MySubprocessProtocol, self.loop),
1977n/a sys.executable, prog)
1978n/a transp, proto = self.loop.run_until_complete(connect)
1979n/a self.assertIsInstance(proto, MySubprocessProtocol)
1980n/a self.loop.run_until_complete(proto.connected)
1981n/a
1982n/a transp.send_signal(signal.SIGHUP)
1983n/a self.loop.run_until_complete(proto.completed)
1984n/a self.assertEqual(-signal.SIGHUP, proto.returncode)
1985n/a transp.close()
1986n/a
1987n/a def test_subprocess_stderr(self):
1988n/a prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
1989n/a
1990n/a connect = self.loop.subprocess_exec(
1991n/a functools.partial(MySubprocessProtocol, self.loop),
1992n/a sys.executable, prog)
1993n/a transp, proto = self.loop.run_until_complete(connect)
1994n/a self.assertIsInstance(proto, MySubprocessProtocol)
1995n/a self.loop.run_until_complete(proto.connected)
1996n/a
1997n/a stdin = transp.get_pipe_transport(0)
1998n/a stdin.write(b'test')
1999n/a
2000n/a self.loop.run_until_complete(proto.completed)
2001n/a
2002n/a transp.close()
2003n/a self.assertEqual(b'OUT:test', proto.data[1])
2004n/a self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
2005n/a self.assertEqual(0, proto.returncode)
2006n/a
2007n/a def test_subprocess_stderr_redirect_to_stdout(self):
2008n/a prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
2009n/a
2010n/a connect = self.loop.subprocess_exec(
2011n/a functools.partial(MySubprocessProtocol, self.loop),
2012n/a sys.executable, prog, stderr=subprocess.STDOUT)
2013n/a transp, proto = self.loop.run_until_complete(connect)
2014n/a self.assertIsInstance(proto, MySubprocessProtocol)
2015n/a self.loop.run_until_complete(proto.connected)
2016n/a
2017n/a stdin = transp.get_pipe_transport(0)
2018n/a self.assertIsNotNone(transp.get_pipe_transport(1))
2019n/a self.assertIsNone(transp.get_pipe_transport(2))
2020n/a
2021n/a stdin.write(b'test')
2022n/a self.loop.run_until_complete(proto.completed)
2023n/a self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
2024n/a proto.data[1])
2025n/a self.assertEqual(b'', proto.data[2])
2026n/a
2027n/a transp.close()
2028n/a self.assertEqual(0, proto.returncode)
2029n/a
2030n/a def test_subprocess_close_client_stream(self):
2031n/a prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
2032n/a
2033n/a connect = self.loop.subprocess_exec(
2034n/a functools.partial(MySubprocessProtocol, self.loop),
2035n/a sys.executable, prog)
2036n/a transp, proto = self.loop.run_until_complete(connect)
2037n/a self.assertIsInstance(proto, MySubprocessProtocol)
2038n/a self.loop.run_until_complete(proto.connected)
2039n/a
2040n/a stdin = transp.get_pipe_transport(0)
2041n/a stdout = transp.get_pipe_transport(1)
2042n/a stdin.write(b'test')
2043n/a self.loop.run_until_complete(proto.got_data[1].wait())
2044n/a self.assertEqual(b'OUT:test', proto.data[1])
2045n/a
2046n/a stdout.close()
2047n/a self.loop.run_until_complete(proto.disconnects[1])
2048n/a stdin.write(b'xxx')
2049n/a self.loop.run_until_complete(proto.got_data[2].wait())
2050n/a if sys.platform != 'win32':
2051n/a self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
2052n/a else:
2053n/a # After closing the read-end of a pipe, writing to the
2054n/a # write-end using os.write() fails with errno==EINVAL and
2055n/a # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
2056n/a # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
2057n/a self.assertEqual(b'ERR:OSError', proto.data[2])
2058n/a with test_utils.disable_logger():
2059n/a transp.close()
2060n/a self.loop.run_until_complete(proto.completed)
2061n/a self.check_killed(proto.returncode)
2062n/a
2063n/a def test_subprocess_wait_no_same_group(self):
2064n/a # start the new process in a new session
2065n/a connect = self.loop.subprocess_shell(
2066n/a functools.partial(MySubprocessProtocol, self.loop),
2067n/a 'exit 7', stdin=None, stdout=None, stderr=None,
2068n/a start_new_session=True)
2069n/a _, proto = yield self.loop.run_until_complete(connect)
2070n/a self.assertIsInstance(proto, MySubprocessProtocol)
2071n/a self.loop.run_until_complete(proto.completed)
2072n/a self.assertEqual(7, proto.returncode)
2073n/a
2074n/a def test_subprocess_exec_invalid_args(self):
2075n/a @asyncio.coroutine
2076n/a def connect(**kwds):
2077n/a yield from self.loop.subprocess_exec(
2078n/a asyncio.SubprocessProtocol,
2079n/a 'pwd', **kwds)
2080n/a
2081n/a with self.assertRaises(ValueError):
2082n/a self.loop.run_until_complete(connect(universal_newlines=True))
2083n/a with self.assertRaises(ValueError):
2084n/a self.loop.run_until_complete(connect(bufsize=4096))
2085n/a with self.assertRaises(ValueError):
2086n/a self.loop.run_until_complete(connect(shell=True))
2087n/a
2088n/a def test_subprocess_shell_invalid_args(self):
2089n/a @asyncio.coroutine
2090n/a def connect(cmd=None, **kwds):
2091n/a if not cmd:
2092n/a cmd = 'pwd'
2093n/a yield from self.loop.subprocess_shell(
2094n/a asyncio.SubprocessProtocol,
2095n/a cmd, **kwds)
2096n/a
2097n/a with self.assertRaises(ValueError):
2098n/a self.loop.run_until_complete(connect(['ls', '-l']))
2099n/a with self.assertRaises(ValueError):
2100n/a self.loop.run_until_complete(connect(universal_newlines=True))
2101n/a with self.assertRaises(ValueError):
2102n/a self.loop.run_until_complete(connect(bufsize=4096))
2103n/a with self.assertRaises(ValueError):
2104n/a self.loop.run_until_complete(connect(shell=False))
2105n/a
2106n/a
2107n/aif sys.platform == 'win32':
2108n/a
2109n/a class SelectEventLoopTests(EventLoopTestsMixin, test_utils.TestCase):
2110n/a
2111n/a def create_event_loop(self):
2112n/a return asyncio.SelectorEventLoop()
2113n/a
2114n/a class ProactorEventLoopTests(EventLoopTestsMixin,
2115n/a SubprocessTestsMixin,
2116n/a test_utils.TestCase):
2117n/a
2118n/a def create_event_loop(self):
2119n/a return asyncio.ProactorEventLoop()
2120n/a
2121n/a if not sslproto._is_sslproto_available():
2122n/a def test_create_ssl_connection(self):
2123n/a raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
2124n/a
2125n/a def test_create_server_ssl(self):
2126n/a raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
2127n/a
2128n/a def test_create_server_ssl_verify_failed(self):
2129n/a raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
2130n/a
2131n/a def test_create_server_ssl_match_failed(self):
2132n/a raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
2133n/a
2134n/a def test_create_server_ssl_verified(self):
2135n/a raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
2136n/a
2137n/a def test_legacy_create_ssl_connection(self):
2138n/a raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
2139n/a
2140n/a def test_legacy_create_server_ssl(self):
2141n/a raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
2142n/a
2143n/a def test_legacy_create_server_ssl_verify_failed(self):
2144n/a raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
2145n/a
2146n/a def test_legacy_create_server_ssl_match_failed(self):
2147n/a raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
2148n/a
2149n/a def test_legacy_create_server_ssl_verified(self):
2150n/a raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
2151n/a
2152n/a def test_reader_callback(self):
2153n/a raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2154n/a
2155n/a def test_reader_callback_cancel(self):
2156n/a raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2157n/a
2158n/a def test_writer_callback(self):
2159n/a raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2160n/a
2161n/a def test_writer_callback_cancel(self):
2162n/a raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2163n/a
2164n/a def test_create_datagram_endpoint(self):
2165n/a raise unittest.SkipTest(
2166n/a "IocpEventLoop does not have create_datagram_endpoint()")
2167n/a
2168n/a def test_remove_fds_after_closing(self):
2169n/a raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2170n/aelse:
2171n/a from asyncio import selectors
2172n/a
2173n/a class UnixEventLoopTestsMixin(EventLoopTestsMixin):
2174n/a def setUp(self):
2175n/a super().setUp()
2176n/a watcher = asyncio.SafeChildWatcher()
2177n/a watcher.attach_loop(self.loop)
2178n/a asyncio.set_child_watcher(watcher)
2179n/a
2180n/a def tearDown(self):
2181n/a asyncio.set_child_watcher(None)
2182n/a super().tearDown()
2183n/a
2184n/a if hasattr(selectors, 'KqueueSelector'):
2185n/a class KqueueEventLoopTests(UnixEventLoopTestsMixin,
2186n/a SubprocessTestsMixin,
2187n/a test_utils.TestCase):
2188n/a
2189n/a def create_event_loop(self):
2190n/a return asyncio.SelectorEventLoop(
2191n/a selectors.KqueueSelector())
2192n/a
2193n/a # kqueue doesn't support character devices (PTY) on Mac OS X older
2194n/a # than 10.9 (Maverick)
2195n/a @support.requires_mac_ver(10, 9)
2196n/a # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
2197n/a # hangs on OpenBSD 5.5
2198n/a @unittest.skipIf(sys.platform.startswith('openbsd'),
2199n/a 'test hangs on OpenBSD')
2200n/a def test_read_pty_output(self):
2201n/a super().test_read_pty_output()
2202n/a
2203n/a # kqueue doesn't support character devices (PTY) on Mac OS X older
2204n/a # than 10.9 (Maverick)
2205n/a @support.requires_mac_ver(10, 9)
2206n/a def test_write_pty(self):
2207n/a super().test_write_pty()
2208n/a
2209n/a if hasattr(selectors, 'EpollSelector'):
2210n/a class EPollEventLoopTests(UnixEventLoopTestsMixin,
2211n/a SubprocessTestsMixin,
2212n/a test_utils.TestCase):
2213n/a
2214n/a def create_event_loop(self):
2215n/a return asyncio.SelectorEventLoop(selectors.EpollSelector())
2216n/a
2217n/a if hasattr(selectors, 'PollSelector'):
2218n/a class PollEventLoopTests(UnixEventLoopTestsMixin,
2219n/a SubprocessTestsMixin,
2220n/a test_utils.TestCase):
2221n/a
2222n/a def create_event_loop(self):
2223n/a return asyncio.SelectorEventLoop(selectors.PollSelector())
2224n/a
2225n/a # Should always exist.
2226n/a class SelectEventLoopTests(UnixEventLoopTestsMixin,
2227n/a SubprocessTestsMixin,
2228n/a test_utils.TestCase):
2229n/a
2230n/a def create_event_loop(self):
2231n/a return asyncio.SelectorEventLoop(selectors.SelectSelector())
2232n/a
2233n/a
2234n/adef noop(*args, **kwargs):
2235n/a pass
2236n/a
2237n/a
2238n/aclass HandleTests(test_utils.TestCase):
2239n/a
2240n/a def setUp(self):
2241n/a super().setUp()
2242n/a self.loop = mock.Mock()
2243n/a self.loop.get_debug.return_value = True
2244n/a
2245n/a def test_handle(self):
2246n/a def callback(*args):
2247n/a return args
2248n/a
2249n/a args = ()
2250n/a h = asyncio.Handle(callback, args, self.loop)
2251n/a self.assertIs(h._callback, callback)
2252n/a self.assertIs(h._args, args)
2253n/a self.assertFalse(h._cancelled)
2254n/a
2255n/a h.cancel()
2256n/a self.assertTrue(h._cancelled)
2257n/a
2258n/a def test_callback_with_exception(self):
2259n/a def callback():
2260n/a raise ValueError()
2261n/a
2262n/a self.loop = mock.Mock()
2263n/a self.loop.call_exception_handler = mock.Mock()
2264n/a
2265n/a h = asyncio.Handle(callback, (), self.loop)
2266n/a h._run()
2267n/a
2268n/a self.loop.call_exception_handler.assert_called_with({
2269n/a 'message': test_utils.MockPattern('Exception in callback.*'),
2270n/a 'exception': mock.ANY,
2271n/a 'handle': h,
2272n/a 'source_traceback': h._source_traceback,
2273n/a })
2274n/a
2275n/a def test_handle_weakref(self):
2276n/a wd = weakref.WeakValueDictionary()
2277n/a h = asyncio.Handle(lambda: None, (), self.loop)
2278n/a wd['h'] = h # Would fail without __weakref__ slot.
2279n/a
2280n/a def test_handle_repr(self):
2281n/a self.loop.get_debug.return_value = False
2282n/a
2283n/a # simple function
2284n/a h = asyncio.Handle(noop, (1, 2), self.loop)
2285n/a filename, lineno = test_utils.get_function_source(noop)
2286n/a self.assertEqual(repr(h),
2287n/a '<Handle noop(1, 2) at %s:%s>'
2288n/a % (filename, lineno))
2289n/a
2290n/a # cancelled handle
2291n/a h.cancel()
2292n/a self.assertEqual(repr(h),
2293n/a '<Handle cancelled>')
2294n/a
2295n/a # decorated function
2296n/a cb = asyncio.coroutine(noop)
2297n/a h = asyncio.Handle(cb, (), self.loop)
2298n/a self.assertEqual(repr(h),
2299n/a '<Handle noop() at %s:%s>'
2300n/a % (filename, lineno))
2301n/a
2302n/a # partial function
2303n/a cb = functools.partial(noop, 1, 2)
2304n/a h = asyncio.Handle(cb, (3,), self.loop)
2305n/a regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
2306n/a % (re.escape(filename), lineno))
2307n/a self.assertRegex(repr(h), regex)
2308n/a
2309n/a # partial function with keyword args
2310n/a cb = functools.partial(noop, x=1)
2311n/a h = asyncio.Handle(cb, (2, 3), self.loop)
2312n/a regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
2313n/a % (re.escape(filename), lineno))
2314n/a self.assertRegex(repr(h), regex)
2315n/a
2316n/a # partial method
2317n/a if sys.version_info >= (3, 4):
2318n/a method = HandleTests.test_handle_repr
2319n/a cb = functools.partialmethod(method)
2320n/a filename, lineno = test_utils.get_function_source(method)
2321n/a h = asyncio.Handle(cb, (), self.loop)
2322n/a
2323n/a cb_regex = r'<function HandleTests.test_handle_repr .*>'
2324n/a cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex)
2325n/a regex = (r'^<Handle %s at %s:%s>$'
2326n/a % (cb_regex, re.escape(filename), lineno))
2327n/a self.assertRegex(repr(h), regex)
2328n/a
2329n/a def test_handle_repr_debug(self):
2330n/a self.loop.get_debug.return_value = True
2331n/a
2332n/a # simple function
2333n/a create_filename = __file__
2334n/a create_lineno = sys._getframe().f_lineno + 1
2335n/a h = asyncio.Handle(noop, (1, 2), self.loop)
2336n/a filename, lineno = test_utils.get_function_source(noop)
2337n/a self.assertEqual(repr(h),
2338n/a '<Handle noop(1, 2) at %s:%s created at %s:%s>'
2339n/a % (filename, lineno, create_filename, create_lineno))
2340n/a
2341n/a # cancelled handle
2342n/a h.cancel()
2343n/a self.assertEqual(
2344n/a repr(h),
2345n/a '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
2346n/a % (filename, lineno, create_filename, create_lineno))
2347n/a
2348n/a # double cancellation won't overwrite _repr
2349n/a h.cancel()
2350n/a self.assertEqual(
2351n/a repr(h),
2352n/a '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
2353n/a % (filename, lineno, create_filename, create_lineno))
2354n/a
2355n/a def test_handle_source_traceback(self):
2356n/a loop = asyncio.get_event_loop_policy().new_event_loop()
2357n/a loop.set_debug(True)
2358n/a self.set_event_loop(loop)
2359n/a
2360n/a def check_source_traceback(h):
2361n/a lineno = sys._getframe(1).f_lineno - 1
2362n/a self.assertIsInstance(h._source_traceback, list)
2363n/a self.assertEqual(h._source_traceback[-1][:3],
2364n/a (__file__,
2365n/a lineno,
2366n/a 'test_handle_source_traceback'))
2367n/a
2368n/a # call_soon
2369n/a h = loop.call_soon(noop)
2370n/a check_source_traceback(h)
2371n/a
2372n/a # call_soon_threadsafe
2373n/a h = loop.call_soon_threadsafe(noop)
2374n/a check_source_traceback(h)
2375n/a
2376n/a # call_later
2377n/a h = loop.call_later(0, noop)
2378n/a check_source_traceback(h)
2379n/a
2380n/a # call_at
2381n/a h = loop.call_later(0, noop)
2382n/a check_source_traceback(h)
2383n/a
2384n/a @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
2385n/a 'No collections.abc.Coroutine')
2386n/a def test_coroutine_like_object_debug_formatting(self):
2387n/a # Test that asyncio can format coroutines that are instances of
2388n/a # collections.abc.Coroutine, but lack cr_core or gi_code attributes
2389n/a # (such as ones compiled with Cython).
2390n/a
2391n/a class Coro:
2392n/a def send(self, v):
2393n/a pass
2394n/a
2395n/a def throw(self, *exc):
2396n/a pass
2397n/a
2398n/a def close(self):
2399n/a pass
2400n/a
2401n/a def __await__(self):
2402n/a pass
2403n/a
2404n/a coro = Coro()
2405n/a coro.__name__ = 'AAA'
2406n/a self.assertTrue(asyncio.iscoroutine(coro))
2407n/a self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2408n/a
2409n/a coro.__qualname__ = 'BBB'
2410n/a self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')
2411n/a
2412n/a coro.cr_running = True
2413n/a self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')
2414n/a
2415n/a coro = Coro()
2416n/a # Some coroutines might not have '__name__', such as
2417n/a # built-in async_gen.asend().
2418n/a self.assertEqual(coroutines._format_coroutine(coro), 'Coro()')
2419n/a
2420n/a
2421n/aclass TimerTests(unittest.TestCase):
2422n/a
2423n/a def setUp(self):
2424n/a super().setUp()
2425n/a self.loop = mock.Mock()
2426n/a
2427n/a def test_hash(self):
2428n/a when = time.monotonic()
2429n/a h = asyncio.TimerHandle(when, lambda: False, (),
2430n/a mock.Mock())
2431n/a self.assertEqual(hash(h), hash(when))
2432n/a
2433n/a def test_timer(self):
2434n/a def callback(*args):
2435n/a return args
2436n/a
2437n/a args = (1, 2, 3)
2438n/a when = time.monotonic()
2439n/a h = asyncio.TimerHandle(when, callback, args, mock.Mock())
2440n/a self.assertIs(h._callback, callback)
2441n/a self.assertIs(h._args, args)
2442n/a self.assertFalse(h._cancelled)
2443n/a
2444n/a # cancel
2445n/a h.cancel()
2446n/a self.assertTrue(h._cancelled)
2447n/a self.assertIsNone(h._callback)
2448n/a self.assertIsNone(h._args)
2449n/a
2450n/a # when cannot be None
2451n/a self.assertRaises(AssertionError,
2452n/a asyncio.TimerHandle, None, callback, args,
2453n/a self.loop)
2454n/a
2455n/a def test_timer_repr(self):
2456n/a self.loop.get_debug.return_value = False
2457n/a
2458n/a # simple function
2459n/a h = asyncio.TimerHandle(123, noop, (), self.loop)
2460n/a src = test_utils.get_function_source(noop)
2461n/a self.assertEqual(repr(h),
2462n/a '<TimerHandle when=123 noop() at %s:%s>' % src)
2463n/a
2464n/a # cancelled handle
2465n/a h.cancel()
2466n/a self.assertEqual(repr(h),
2467n/a '<TimerHandle cancelled when=123>')
2468n/a
2469n/a def test_timer_repr_debug(self):
2470n/a self.loop.get_debug.return_value = True
2471n/a
2472n/a # simple function
2473n/a create_filename = __file__
2474n/a create_lineno = sys._getframe().f_lineno + 1
2475n/a h = asyncio.TimerHandle(123, noop, (), self.loop)
2476n/a filename, lineno = test_utils.get_function_source(noop)
2477n/a self.assertEqual(repr(h),
2478n/a '<TimerHandle when=123 noop() '
2479n/a 'at %s:%s created at %s:%s>'
2480n/a % (filename, lineno, create_filename, create_lineno))
2481n/a
2482n/a # cancelled handle
2483n/a h.cancel()
2484n/a self.assertEqual(repr(h),
2485n/a '<TimerHandle cancelled when=123 noop() '
2486n/a 'at %s:%s created at %s:%s>'
2487n/a % (filename, lineno, create_filename, create_lineno))
2488n/a
2489n/a
2490n/a def test_timer_comparison(self):
2491n/a def callback(*args):
2492n/a return args
2493n/a
2494n/a when = time.monotonic()
2495n/a
2496n/a h1 = asyncio.TimerHandle(when, callback, (), self.loop)
2497n/a h2 = asyncio.TimerHandle(when, callback, (), self.loop)
2498n/a # TODO: Use assertLess etc.
2499n/a self.assertFalse(h1 < h2)
2500n/a self.assertFalse(h2 < h1)
2501n/a self.assertTrue(h1 <= h2)
2502n/a self.assertTrue(h2 <= h1)
2503n/a self.assertFalse(h1 > h2)
2504n/a self.assertFalse(h2 > h1)
2505n/a self.assertTrue(h1 >= h2)
2506n/a self.assertTrue(h2 >= h1)
2507n/a self.assertTrue(h1 == h2)
2508n/a self.assertFalse(h1 != h2)
2509n/a
2510n/a h2.cancel()
2511n/a self.assertFalse(h1 == h2)
2512n/a
2513n/a h1 = asyncio.TimerHandle(when, callback, (), self.loop)
2514n/a h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
2515n/a self.assertTrue(h1 < h2)
2516n/a self.assertFalse(h2 < h1)
2517n/a self.assertTrue(h1 <= h2)
2518n/a self.assertFalse(h2 <= h1)
2519n/a self.assertFalse(h1 > h2)
2520n/a self.assertTrue(h2 > h1)
2521n/a self.assertFalse(h1 >= h2)
2522n/a self.assertTrue(h2 >= h1)
2523n/a self.assertFalse(h1 == h2)
2524n/a self.assertTrue(h1 != h2)
2525n/a
2526n/a h3 = asyncio.Handle(callback, (), self.loop)
2527n/a self.assertIs(NotImplemented, h1.__eq__(h3))
2528n/a self.assertIs(NotImplemented, h1.__ne__(h3))
2529n/a
2530n/a
2531n/aclass AbstractEventLoopTests(unittest.TestCase):
2532n/a
2533n/a def test_not_implemented(self):
2534n/a f = mock.Mock()
2535n/a loop = asyncio.AbstractEventLoop()
2536n/a self.assertRaises(
2537n/a NotImplementedError, loop.run_forever)
2538n/a self.assertRaises(
2539n/a NotImplementedError, loop.run_until_complete, None)
2540n/a self.assertRaises(
2541n/a NotImplementedError, loop.stop)
2542n/a self.assertRaises(
2543n/a NotImplementedError, loop.is_running)
2544n/a self.assertRaises(
2545n/a NotImplementedError, loop.is_closed)
2546n/a self.assertRaises(
2547n/a NotImplementedError, loop.close)
2548n/a self.assertRaises(
2549n/a NotImplementedError, loop.create_task, None)
2550n/a self.assertRaises(
2551n/a NotImplementedError, loop.call_later, None, None)
2552n/a self.assertRaises(
2553n/a NotImplementedError, loop.call_at, f, f)
2554n/a self.assertRaises(
2555n/a NotImplementedError, loop.call_soon, None)
2556n/a self.assertRaises(
2557n/a NotImplementedError, loop.time)
2558n/a self.assertRaises(
2559n/a NotImplementedError, loop.call_soon_threadsafe, None)
2560n/a self.assertRaises(
2561n/a NotImplementedError, loop.run_in_executor, f, f)
2562n/a self.assertRaises(
2563n/a NotImplementedError, loop.set_default_executor, f)
2564n/a self.assertRaises(
2565n/a NotImplementedError, loop.getaddrinfo, 'localhost', 8080)
2566n/a self.assertRaises(
2567n/a NotImplementedError, loop.getnameinfo, ('localhost', 8080))
2568n/a self.assertRaises(
2569n/a NotImplementedError, loop.create_connection, f)
2570n/a self.assertRaises(
2571n/a NotImplementedError, loop.create_server, f)
2572n/a self.assertRaises(
2573n/a NotImplementedError, loop.create_datagram_endpoint, f)
2574n/a self.assertRaises(
2575n/a NotImplementedError, loop.add_reader, 1, f)
2576n/a self.assertRaises(
2577n/a NotImplementedError, loop.remove_reader, 1)
2578n/a self.assertRaises(
2579n/a NotImplementedError, loop.add_writer, 1, f)
2580n/a self.assertRaises(
2581n/a NotImplementedError, loop.remove_writer, 1)
2582n/a self.assertRaises(
2583n/a NotImplementedError, loop.sock_recv, f, 10)
2584n/a self.assertRaises(
2585n/a NotImplementedError, loop.sock_sendall, f, 10)
2586n/a self.assertRaises(
2587n/a NotImplementedError, loop.sock_connect, f, f)
2588n/a self.assertRaises(
2589n/a NotImplementedError, loop.sock_accept, f)
2590n/a self.assertRaises(
2591n/a NotImplementedError, loop.add_signal_handler, 1, f)
2592n/a self.assertRaises(
2593n/a NotImplementedError, loop.remove_signal_handler, 1)
2594n/a self.assertRaises(
2595n/a NotImplementedError, loop.remove_signal_handler, 1)
2596n/a self.assertRaises(
2597n/a NotImplementedError, loop.connect_read_pipe, f,
2598n/a mock.sentinel.pipe)
2599n/a self.assertRaises(
2600n/a NotImplementedError, loop.connect_write_pipe, f,
2601n/a mock.sentinel.pipe)
2602n/a self.assertRaises(
2603n/a NotImplementedError, loop.subprocess_shell, f,
2604n/a mock.sentinel)
2605n/a self.assertRaises(
2606n/a NotImplementedError, loop.subprocess_exec, f)
2607n/a self.assertRaises(
2608n/a NotImplementedError, loop.set_exception_handler, f)
2609n/a self.assertRaises(
2610n/a NotImplementedError, loop.default_exception_handler, f)
2611n/a self.assertRaises(
2612n/a NotImplementedError, loop.call_exception_handler, f)
2613n/a self.assertRaises(
2614n/a NotImplementedError, loop.get_debug)
2615n/a self.assertRaises(
2616n/a NotImplementedError, loop.set_debug, f)
2617n/a
2618n/a
2619n/aclass ProtocolsAbsTests(unittest.TestCase):
2620n/a
2621n/a def test_empty(self):
2622n/a f = mock.Mock()
2623n/a p = asyncio.Protocol()
2624n/a self.assertIsNone(p.connection_made(f))
2625n/a self.assertIsNone(p.connection_lost(f))
2626n/a self.assertIsNone(p.data_received(f))
2627n/a self.assertIsNone(p.eof_received())
2628n/a
2629n/a dp = asyncio.DatagramProtocol()
2630n/a self.assertIsNone(dp.connection_made(f))
2631n/a self.assertIsNone(dp.connection_lost(f))
2632n/a self.assertIsNone(dp.error_received(f))
2633n/a self.assertIsNone(dp.datagram_received(f, f))
2634n/a
2635n/a sp = asyncio.SubprocessProtocol()
2636n/a self.assertIsNone(sp.connection_made(f))
2637n/a self.assertIsNone(sp.connection_lost(f))
2638n/a self.assertIsNone(sp.pipe_data_received(1, f))
2639n/a self.assertIsNone(sp.pipe_connection_lost(1, f))
2640n/a self.assertIsNone(sp.process_exited())
2641n/a
2642n/a
2643n/aclass PolicyTests(unittest.TestCase):
2644n/a
2645n/a def test_event_loop_policy(self):
2646n/a policy = asyncio.AbstractEventLoopPolicy()
2647n/a self.assertRaises(NotImplementedError, policy.get_event_loop)
2648n/a self.assertRaises(NotImplementedError, policy.set_event_loop, object())
2649n/a self.assertRaises(NotImplementedError, policy.new_event_loop)
2650n/a self.assertRaises(NotImplementedError, policy.get_child_watcher)
2651n/a self.assertRaises(NotImplementedError, policy.set_child_watcher,
2652n/a object())
2653n/a
2654n/a def test_get_event_loop(self):
2655n/a policy = asyncio.DefaultEventLoopPolicy()
2656n/a self.assertIsNone(policy._local._loop)
2657n/a
2658n/a loop = policy.get_event_loop()
2659n/a self.assertIsInstance(loop, asyncio.AbstractEventLoop)
2660n/a
2661n/a self.assertIs(policy._local._loop, loop)
2662n/a self.assertIs(loop, policy.get_event_loop())
2663n/a loop.close()
2664n/a
2665n/a def test_get_event_loop_calls_set_event_loop(self):
2666n/a policy = asyncio.DefaultEventLoopPolicy()
2667n/a
2668n/a with mock.patch.object(
2669n/a policy, "set_event_loop",
2670n/a wraps=policy.set_event_loop) as m_set_event_loop:
2671n/a
2672n/a loop = policy.get_event_loop()
2673n/a
2674n/a # policy._local._loop must be set through .set_event_loop()
2675n/a # (the unix DefaultEventLoopPolicy needs this call to attach
2676n/a # the child watcher correctly)
2677n/a m_set_event_loop.assert_called_with(loop)
2678n/a
2679n/a loop.close()
2680n/a
2681n/a def test_get_event_loop_after_set_none(self):
2682n/a policy = asyncio.DefaultEventLoopPolicy()
2683n/a policy.set_event_loop(None)
2684n/a self.assertRaises(RuntimeError, policy.get_event_loop)
2685n/a
2686n/a @mock.patch('asyncio.events.threading.current_thread')
2687n/a def test_get_event_loop_thread(self, m_current_thread):
2688n/a
2689n/a def f():
2690n/a policy = asyncio.DefaultEventLoopPolicy()
2691n/a self.assertRaises(RuntimeError, policy.get_event_loop)
2692n/a
2693n/a th = threading.Thread(target=f)
2694n/a th.start()
2695n/a th.join()
2696n/a
2697n/a def test_new_event_loop(self):
2698n/a policy = asyncio.DefaultEventLoopPolicy()
2699n/a
2700n/a loop = policy.new_event_loop()
2701n/a self.assertIsInstance(loop, asyncio.AbstractEventLoop)
2702n/a loop.close()
2703n/a
2704n/a def test_set_event_loop(self):
2705n/a policy = asyncio.DefaultEventLoopPolicy()
2706n/a old_loop = policy.get_event_loop()
2707n/a
2708n/a self.assertRaises(AssertionError, policy.set_event_loop, object())
2709n/a
2710n/a loop = policy.new_event_loop()
2711n/a policy.set_event_loop(loop)
2712n/a self.assertIs(loop, policy.get_event_loop())
2713n/a self.assertIsNot(old_loop, policy.get_event_loop())
2714n/a loop.close()
2715n/a old_loop.close()
2716n/a
2717n/a def test_get_event_loop_policy(self):
2718n/a policy = asyncio.get_event_loop_policy()
2719n/a self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
2720n/a self.assertIs(policy, asyncio.get_event_loop_policy())
2721n/a
2722n/a def test_set_event_loop_policy(self):
2723n/a self.assertRaises(
2724n/a AssertionError, asyncio.set_event_loop_policy, object())
2725n/a
2726n/a old_policy = asyncio.get_event_loop_policy()
2727n/a
2728n/a policy = asyncio.DefaultEventLoopPolicy()
2729n/a asyncio.set_event_loop_policy(policy)
2730n/a self.assertIs(policy, asyncio.get_event_loop_policy())
2731n/a self.assertIsNot(policy, old_policy)
2732n/a
2733n/a def test_get_event_loop_returns_running_loop(self):
2734n/a class Policy(asyncio.DefaultEventLoopPolicy):
2735n/a def get_event_loop(self):
2736n/a raise NotImplementedError
2737n/a
2738n/a loop = None
2739n/a
2740n/a old_policy = asyncio.get_event_loop_policy()
2741n/a try:
2742n/a asyncio.set_event_loop_policy(Policy())
2743n/a loop = asyncio.new_event_loop()
2744n/a self.assertIs(asyncio._get_running_loop(), None)
2745n/a
2746n/a async def func():
2747n/a self.assertIs(asyncio.get_event_loop(), loop)
2748n/a self.assertIs(asyncio._get_running_loop(), loop)
2749n/a
2750n/a loop.run_until_complete(func())
2751n/a finally:
2752n/a asyncio.set_event_loop_policy(old_policy)
2753n/a if loop is not None:
2754n/a loop.close()
2755n/a
2756n/a self.assertIs(asyncio._get_running_loop(), None)
2757n/a
2758n/a
2759n/aif __name__ == '__main__':
2760n/a unittest.main()