» Core Development > Code coverage > Lib/test/test_asyncio/test_base_events.py

Python code coverage for Lib/test/test_asyncio/test_base_events.py

# | count | content
1n/a"""Tests for base_events.py"""
2n/a
3n/aimport errno
4n/aimport logging
5n/aimport math
6n/aimport os
7n/aimport socket
8n/aimport sys
9n/aimport threading
10n/aimport time
11n/aimport unittest
12n/afrom unittest import mock
13n/a
14n/aimport asyncio
15n/afrom asyncio import base_events
16n/afrom asyncio import constants
17n/afrom asyncio import test_utils
18n/atry:
19n/a from test import support
20n/aexcept ImportError:
21n/a from asyncio import test_support as support
22n/atry:
23n/a from test.support.script_helper import assert_python_ok
24n/aexcept ImportError:
25n/a try:
26n/a from test.script_helper import assert_python_ok
27n/a except ImportError:
28n/a from asyncio.test_support import assert_python_ok
29n/a
30n/a
# Shorthand for mock.ANY, used below as a wildcard in expected call arguments.
MOCK_ANY = mock.ANY
# True when the running interpreter is Python 3.4 or newer.
PY34 = sys.version_info >= (3, 4)
33n/a
34n/a
def mock_socket_module():
    """Return a MagicMock standing in for the ``socket`` module.

    The mock is spec'ed on the real module.  A fixed set of constants (and
    ``inet_pton``) is copied over from the real module when available and
    removed from the mock otherwise.  ``socket()`` on the mock returns a
    mocked non-blocking socket.
    """
    passthrough_names = (
        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
    )
    fake_module = mock.MagicMock(spec=socket)

    for attr in passthrough_names:
        if not hasattr(socket, attr):
            # Mirror the platform: the mock must not expose it either.
            delattr(fake_module, attr)
            continue
        setattr(fake_module, attr, getattr(socket, attr))

    socket_factory = mock.MagicMock()
    fake_module.socket = socket_factory
    socket_factory.return_value = test_utils.mock_nonblocking_socket()
    fake_module.getaddrinfo._is_coroutine = False

    return fake_module
51n/a
52n/a
def patch_socket(f):
    """Decorate *f* so that ``asyncio.base_events.socket`` is patched.

    The replacement object is built by mock_socket_module().
    """
    patcher = mock.patch('asyncio.base_events.socket',
                         new_callable=mock_socket_module)
    return patcher(f)
56n/a
57n/a
class BaseEventTests(test_utils.TestCase):
    """Tests for the module-level helper base_events._ipaddr_info()."""

    def test_ipaddr_info(self):
        # Literal IP addresses are resolved to an addrinfo-style tuple;
        # combinations the helper cannot handle yield None.
        UNSPEC = socket.AF_UNSPEC
        INET = socket.AF_INET
        INET6 = socket.AF_INET6
        STREAM = socket.SOCK_STREAM
        DGRAM = socket.SOCK_DGRAM
        TCP = socket.IPPROTO_TCP
        UDP = socket.IPPROTO_UDP

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP))

        # The host may also be given as bytes.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP))

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP))

        self.assertEqual(
            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP))

        # Socket type STREAM implies TCP protocol.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0))

        # Socket type DGRAM implies UDP protocol.
        self.assertEqual(
            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0))

        # No socket type.
        self.assertIsNone(
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))

        # IPv4 address with family IPv6.
        self.assertIsNone(
            base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))

        self.assertEqual(
            (INET6, STREAM, TCP, '', ('::3', 1)),
            base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP))

        self.assertEqual(
            (INET6, STREAM, TCP, '', ('::3', 1)),
            base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP))

        # IPv6 address with family IPv4.
        self.assertIsNone(
            base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))

        # IPv6 address with zone index.
        self.assertIsNone(
            base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))

        if hasattr(socket, 'SOCK_NONBLOCK'):
            # Socket type combined with an extra flag.  Use assertIsNone like
            # the other "no result" checks in this test.
            self.assertIsNone(
                base_events._ipaddr_info(
                    '1.2.3.4', 1, INET, STREAM | socket.SOCK_NONBLOCK, TCP))

    def test_port_parameter_types(self):
        # Test obscure kinds of arguments for "port".
        INET = socket.AF_INET
        STREAM = socket.SOCK_STREAM
        TCP = socket.IPPROTO_TCP

        # None and empty str/bytes are reported as port 0.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
            base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP))

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
            base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP))

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
            base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP))

        # Numeric str/bytes are converted to an int port.
        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP))

        self.assertEqual(
            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
            base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP))

    @patch_socket
    def test_ipaddr_info_no_inet_pton(self, m_socket):
        # With inet_pton removed from the patched socket module the helper
        # must return None.
        del m_socket.inet_pton
        self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1,
                                                   socket.AF_INET,
                                                   socket.SOCK_STREAM,
                                                   socket.IPPROTO_TCP))
159n/a
160n/a
161n/aclass BaseEventLoopTests(test_utils.TestCase):
162n/a
163n/a def setUp(self):
164n/a super().setUp()
165n/a self.loop = base_events.BaseEventLoop()
166n/a self.loop._selector = mock.Mock()
167n/a self.loop._selector.select.return_value = ()
168n/a self.set_event_loop(self.loop)
169n/a
170n/a def test_not_implemented(self):
171n/a m = mock.Mock()
172n/a self.assertRaises(
173n/a NotImplementedError,
174n/a self.loop._make_socket_transport, m, m)
175n/a self.assertRaises(
176n/a NotImplementedError,
177n/a self.loop._make_ssl_transport, m, m, m, m)
178n/a self.assertRaises(
179n/a NotImplementedError,
180n/a self.loop._make_datagram_transport, m, m)
181n/a self.assertRaises(
182n/a NotImplementedError, self.loop._process_events, [])
183n/a self.assertRaises(
184n/a NotImplementedError, self.loop._write_to_self)
185n/a self.assertRaises(
186n/a NotImplementedError,
187n/a self.loop._make_read_pipe_transport, m, m)
188n/a self.assertRaises(
189n/a NotImplementedError,
190n/a self.loop._make_write_pipe_transport, m, m)
191n/a gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
192n/a with self.assertRaises(NotImplementedError):
193n/a gen.send(None)
194n/a
195n/a def test_close(self):
196n/a self.assertFalse(self.loop.is_closed())
197n/a self.loop.close()
198n/a self.assertTrue(self.loop.is_closed())
199n/a
200n/a # it should be possible to call close() more than once
201n/a self.loop.close()
202n/a self.loop.close()
203n/a
204n/a # operation blocked when the loop is closed
205n/a f = asyncio.Future(loop=self.loop)
206n/a self.assertRaises(RuntimeError, self.loop.run_forever)
207n/a self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
208n/a
209n/a def test__add_callback_handle(self):
210n/a h = asyncio.Handle(lambda: False, (), self.loop)
211n/a
212n/a self.loop._add_callback(h)
213n/a self.assertFalse(self.loop._scheduled)
214n/a self.assertIn(h, self.loop._ready)
215n/a
216n/a def test__add_callback_cancelled_handle(self):
217n/a h = asyncio.Handle(lambda: False, (), self.loop)
218n/a h.cancel()
219n/a
220n/a self.loop._add_callback(h)
221n/a self.assertFalse(self.loop._scheduled)
222n/a self.assertFalse(self.loop._ready)
223n/a
224n/a def test_set_default_executor(self):
225n/a executor = mock.Mock()
226n/a self.loop.set_default_executor(executor)
227n/a self.assertIs(executor, self.loop._default_executor)
228n/a
229n/a def test_getnameinfo(self):
230n/a sockaddr = mock.Mock()
231n/a self.loop.run_in_executor = mock.Mock()
232n/a self.loop.getnameinfo(sockaddr)
233n/a self.assertEqual(
234n/a (None, socket.getnameinfo, sockaddr, 0),
235n/a self.loop.run_in_executor.call_args[0])
236n/a
237n/a def test_call_soon(self):
238n/a def cb():
239n/a pass
240n/a
241n/a h = self.loop.call_soon(cb)
242n/a self.assertEqual(h._callback, cb)
243n/a self.assertIsInstance(h, asyncio.Handle)
244n/a self.assertIn(h, self.loop._ready)
245n/a
246n/a def test_call_soon_non_callable(self):
247n/a self.loop.set_debug(True)
248n/a with self.assertRaisesRegex(TypeError, 'a callable object'):
249n/a self.loop.call_soon(1)
250n/a
251n/a def test_call_later(self):
252n/a def cb():
253n/a pass
254n/a
255n/a h = self.loop.call_later(10.0, cb)
256n/a self.assertIsInstance(h, asyncio.TimerHandle)
257n/a self.assertIn(h, self.loop._scheduled)
258n/a self.assertNotIn(h, self.loop._ready)
259n/a
260n/a def test_call_later_negative_delays(self):
261n/a calls = []
262n/a
263n/a def cb(arg):
264n/a calls.append(arg)
265n/a
266n/a self.loop._process_events = mock.Mock()
267n/a self.loop.call_later(-1, cb, 'a')
268n/a self.loop.call_later(-2, cb, 'b')
269n/a test_utils.run_briefly(self.loop)
270n/a self.assertEqual(calls, ['b', 'a'])
271n/a
    def test_time_and_call_at(self):
        # Schedule a callback with call_at() 0.1 s from now and check that
        # run_forever() returns after roughly that long, measured with the
        # loop's own clock.
        def cb():
            self.loop.stop()

        self.loop._process_events = mock.Mock()
        delay = 0.1

        when = self.loop.time() + delay
        self.loop.call_at(when, cb)
        t0 = self.loop.time()
        self.loop.run_forever()
        dt = self.loop.time() - t0

        # 50 ms: maximum granularity of the event loop
        self.assertGreaterEqual(dt, delay - 0.050, dt)
        # tolerate a difference of +800 ms because some Python buildbots
        # are really slow
        self.assertLessEqual(dt, 0.9, dt)
290n/a
291n/a def check_thread(self, loop, debug):
292n/a def cb():
293n/a pass
294n/a
295n/a loop.set_debug(debug)
296n/a if debug:
297n/a msg = ("Non-thread-safe operation invoked on an event loop other "
298n/a "than the current one")
299n/a with self.assertRaisesRegex(RuntimeError, msg):
300n/a loop.call_soon(cb)
301n/a with self.assertRaisesRegex(RuntimeError, msg):
302n/a loop.call_later(60, cb)
303n/a with self.assertRaisesRegex(RuntimeError, msg):
304n/a loop.call_at(loop.time() + 60, cb)
305n/a else:
306n/a loop.call_soon(cb)
307n/a loop.call_later(60, cb)
308n/a loop.call_at(loop.time() + 60, cb)
309n/a
    def test_check_thread(self):
        # Run check_thread() from a second thread while self.loop is running
        # in this one, with and without debug mode and with and without an
        # event loop installed in the second thread.
        def check_in_thread(loop, event, debug, create_loop, fut):
            # wait until the event loop is running
            event.wait()

            try:
                if create_loop:
                    # Give this thread its own loop so the "other loop"
                    # case is exercised.
                    loop2 = base_events.BaseEventLoop()
                    try:
                        asyncio.set_event_loop(loop2)
                        self.check_thread(loop, debug)
                    finally:
                        asyncio.set_event_loop(None)
                        loop2.close()
                else:
                    self.check_thread(loop, debug)
            except Exception as exc:
                # Report the outcome back to the loop's thread via the
                # thread-safe scheduling call.
                loop.call_soon_threadsafe(fut.set_exception, exc)
            else:
                loop.call_soon_threadsafe(fut.set_result, None)

        def test_thread(loop, debug, create_loop=False):
            event = threading.Event()
            fut = asyncio.Future(loop=loop)
            # The event is set from inside the loop, i.e. once it is running.
            loop.call_soon(event.set)
            args = (loop, event, debug, create_loop, fut)
            thread = threading.Thread(target=check_in_thread, args=args)
            thread.start()
            loop.run_until_complete(fut)
            thread.join()

        self.loop._process_events = mock.Mock()
        self.loop._write_to_self = mock.Mock()

        # raise RuntimeError if the thread has no event loop
        test_thread(self.loop, True)

        # check disabled if debug mode is disabled
        test_thread(self.loop, False)

        # raise RuntimeError if the event loop of the thread is not the called
        # event loop
        test_thread(self.loop, True, create_loop=True)

        # check disabled if debug mode is disabled
        test_thread(self.loop, False, create_loop=True)
356n/a
357n/a def test_run_once_in_executor_plain(self):
358n/a def cb():
359n/a pass
360n/a f = asyncio.Future(loop=self.loop)
361n/a executor = mock.Mock()
362n/a executor.submit.return_value = f
363n/a
364n/a self.loop.set_default_executor(executor)
365n/a
366n/a res = self.loop.run_in_executor(None, cb)
367n/a self.assertIs(f, res)
368n/a
369n/a executor = mock.Mock()
370n/a executor.submit.return_value = f
371n/a res = self.loop.run_in_executor(executor, cb)
372n/a self.assertIs(f, res)
373n/a self.assertTrue(executor.submit.called)
374n/a
375n/a f.cancel() # Don't complain about abandoned Future.
376n/a
377n/a def test__run_once(self):
378n/a h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
379n/a self.loop)
380n/a h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
381n/a self.loop)
382n/a
383n/a h1.cancel()
384n/a
385n/a self.loop._process_events = mock.Mock()
386n/a self.loop._scheduled.append(h1)
387n/a self.loop._scheduled.append(h2)
388n/a self.loop._run_once()
389n/a
390n/a t = self.loop._selector.select.call_args[0][0]
391n/a self.assertTrue(9.5 < t < 10.5, t)
392n/a self.assertEqual([h2], self.loop._scheduled)
393n/a self.assertTrue(self.loop._process_events.called)
394n/a
395n/a def test_set_debug(self):
396n/a self.loop.set_debug(True)
397n/a self.assertTrue(self.loop.get_debug())
398n/a self.loop.set_debug(False)
399n/a self.assertFalse(self.loop.get_debug())
400n/a
    @mock.patch('asyncio.base_events.logger')
    def test__run_once_logging(self, m_logger):
        # In debug mode _run_once() logs how long the selector poll took;
        # the level checked here depends on the poll duration.
        def slow_select(timeout):
            # Sleep a bit longer than a second to avoid timer resolution
            # issues.
            time.sleep(1.1)
            return []

        # logging needs debug flag
        self.loop.set_debug(True)

        # Log to INFO level if timeout > 1.0 sec.
        self.loop._selector.select = slow_select
        self.loop._process_events = mock.Mock()
        self.loop._run_once()
        self.assertEqual(logging.INFO, m_logger.log.call_args[0][0])

        def fast_select(timeout):
            time.sleep(0.001)
            return []

        # A quick poll is expected to be logged at DEBUG level instead.
        self.loop._selector.select = fast_select
        self.loop._run_once()
        self.assertEqual(logging.DEBUG, m_logger.log.call_args[0][0])
425n/a
426n/a def test__run_once_schedule_handle(self):
427n/a handle = None
428n/a processed = False
429n/a
430n/a def cb(loop):
431n/a nonlocal processed, handle
432n/a processed = True
433n/a handle = loop.call_soon(lambda: True)
434n/a
435n/a h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
436n/a self.loop)
437n/a
438n/a self.loop._process_events = mock.Mock()
439n/a self.loop._scheduled.append(h)
440n/a self.loop._run_once()
441n/a
442n/a self.assertTrue(processed)
443n/a self.assertEqual([handle], list(self.loop._ready))
444n/a
    def test__run_once_cancelled_event_cleanup(self):
        # Exercise the two ways _run_once() discards cancelled timers:
        # popping them from the head of the schedule, and the bulk sweep
        # governed by _MIN_SCHEDULED_TIMER_HANDLES and
        # _MIN_CANCELLED_TIMER_HANDLES_FRACTION.
        self.loop._process_events = mock.Mock()

        self.assertTrue(
            0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)

        def cb():
            pass

        # Set up one "blocking" event that will not be cancelled to
        # ensure later cancelled events do not make it to the head
        # of the queue and get cleaned.
        not_cancelled_count = 1
        self.loop.call_later(3000, cb)

        # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
        # cancelled handles, ensure they aren't removed

        cancelled_count = 2
        for x in range(2):
            h = self.loop.call_later(3600, cb)
            h.cancel()

        # Add some cancelled events that will be at head and removed
        cancelled_count += 2
        for x in range(2):
            h = self.loop.call_later(100, cb)
            h.cancel()

        # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
        self.assertLessEqual(cancelled_count + not_cancelled_count,
                             base_events._MIN_SCHEDULED_TIMER_HANDLES)

        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)

        self.loop._run_once()

        # The two cancelled timers at the head (delay 100) are gone now.
        cancelled_count -= 2

        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)

        self.assertEqual(len(self.loop._scheduled),
                         cancelled_count + not_cancelled_count)

        # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
        # so that deletion of cancelled events will occur on next _run_once
        add_cancel_count = int(math.ceil(
            base_events._MIN_SCHEDULED_TIMER_HANDLES *
            base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1

        add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
                                   add_cancel_count, 0)

        # Add some events that will not be cancelled
        not_cancelled_count += add_not_cancel_count
        for x in range(add_not_cancel_count):
            self.loop.call_later(3600, cb)

        # Add enough cancelled events
        cancelled_count += add_cancel_count
        for x in range(add_cancel_count):
            h = self.loop.call_later(3600, cb)
            h.cancel()

        # Ensure all handles are still scheduled
        self.assertEqual(len(self.loop._scheduled),
                         cancelled_count + not_cancelled_count)

        self.loop._run_once()

        # Ensure cancelled events were removed
        self.assertEqual(len(self.loop._scheduled), not_cancelled_count)

        # Ensure only uncancelled events remain scheduled
        self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
520n/a
521n/a def test_run_until_complete_type_error(self):
522n/a self.assertRaises(TypeError,
523n/a self.loop.run_until_complete, 'blah')
524n/a
525n/a def test_run_until_complete_loop(self):
526n/a task = asyncio.Future(loop=self.loop)
527n/a other_loop = self.new_test_loop()
528n/a self.addCleanup(other_loop.close)
529n/a self.assertRaises(ValueError,
530n/a other_loop.run_until_complete, task)
531n/a
    def test_subprocess_exec_invalid_args(self):
        # Invalid argument combinations for subprocess_exec() are expected
        # to end in TypeError.
        #
        # NOTE(review): every assertion below passes the *uncalled*
        # self.loop.subprocess_exec method, followed by its intended
        # arguments, to run_until_complete().  run_until_complete() therefore
        # receives those extra arguments itself, so the TypeError may come
        # from its own signature rather than from subprocess_exec()'s
        # validation -- confirm against base_events.
        args = [sys.executable, '-c', 'pass']

        # missing program parameter (empty args)
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_exec,
            asyncio.SubprocessProtocol)

        # expected multiple arguments, not a list
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_exec,
            asyncio.SubprocessProtocol, args)

        # program arguments must be strings, not int
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_exec,
            asyncio.SubprocessProtocol, sys.executable, 123)

        # universal_newlines, shell, bufsize must not be set
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_exec,
            asyncio.SubprocessProtocol, *args, universal_newlines=True)
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_exec,
            asyncio.SubprocessProtocol, *args, shell=True)
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_exec,
            asyncio.SubprocessProtocol, *args, bufsize=4096)
560n/a
    def test_subprocess_shell_invalid_args(self):
        # Invalid argument combinations for subprocess_shell() are expected
        # to end in TypeError.
        #
        # NOTE(review): as written, the uncalled self.loop.subprocess_shell
        # method and its intended arguments are all handed to
        # run_until_complete(), so the TypeError may originate from
        # run_until_complete()'s signature instead of subprocess_shell()'s
        # validation -- confirm against base_events.

        # expected a string, not an int or a list
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_shell,
            asyncio.SubprocessProtocol, 123)
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_shell,
            asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])

        # universal_newlines, shell, bufsize must not be set
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_shell,
            asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_shell,
            asyncio.SubprocessProtocol, 'exit 0', shell=True)
        self.assertRaises(TypeError,
            self.loop.run_until_complete, self.loop.subprocess_shell,
            asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
580n/a
581n/a def test_default_exc_handler_callback(self):
582n/a self.loop._process_events = mock.Mock()
583n/a
584n/a def zero_error(fut):
585n/a fut.set_result(True)
586n/a 1/0
587n/a
588n/a # Test call_soon (events.Handle)
589n/a with mock.patch('asyncio.base_events.logger') as log:
590n/a fut = asyncio.Future(loop=self.loop)
591n/a self.loop.call_soon(zero_error, fut)
592n/a fut.add_done_callback(lambda fut: self.loop.stop())
593n/a self.loop.run_forever()
594n/a log.error.assert_called_with(
595n/a test_utils.MockPattern('Exception in callback.*zero'),
596n/a exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
597n/a
598n/a # Test call_later (events.TimerHandle)
599n/a with mock.patch('asyncio.base_events.logger') as log:
600n/a fut = asyncio.Future(loop=self.loop)
601n/a self.loop.call_later(0.01, zero_error, fut)
602n/a fut.add_done_callback(lambda fut: self.loop.stop())
603n/a self.loop.run_forever()
604n/a log.error.assert_called_with(
605n/a test_utils.MockPattern('Exception in callback.*zero'),
606n/a exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
607n/a
    def test_default_exc_handler_coro(self):
        # A task whose exception is never retrieved must be reported through
        # logger.error once the task object is garbage collected.
        self.loop._process_events = mock.Mock()

        @asyncio.coroutine
        def zero_error_coro():
            yield from asyncio.sleep(0.01, loop=self.loop)
            1/0

        # Test Future.__del__
        with mock.patch('asyncio.base_events.logger') as log:
            fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
            fut.add_done_callback(lambda *args: self.loop.stop())
            self.loop.run_forever()
            fut = None # Trigger Future.__del__ or futures._TracebackLogger
            support.gc_collect()
            if PY34:
                # Future.__del__ in Python 3.4 logs error with
                # an actual exception context
                log.error.assert_called_with(
                    test_utils.MockPattern('.*exception was never retrieved'),
                    exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
            else:
                # futures._TracebackLogger logs only textual traceback
                log.error.assert_called_with(
                    test_utils.MockPattern(
                        '.*exception was never retrieved.*ZeroDiv'),
                    exc_info=False)
635n/a
636n/a def test_set_exc_handler_invalid(self):
637n/a with self.assertRaisesRegex(TypeError, 'A callable object or None'):
638n/a self.loop.set_exception_handler('spam')
639n/a
640n/a def test_set_exc_handler_custom(self):
641n/a def zero_error():
642n/a 1/0
643n/a
644n/a def run_loop():
645n/a handle = self.loop.call_soon(zero_error)
646n/a self.loop._run_once()
647n/a return handle
648n/a
649n/a self.loop.set_debug(True)
650n/a self.loop._process_events = mock.Mock()
651n/a
652n/a self.assertIsNone(self.loop.get_exception_handler())
653n/a mock_handler = mock.Mock()
654n/a self.loop.set_exception_handler(mock_handler)
655n/a self.assertIs(self.loop.get_exception_handler(), mock_handler)
656n/a handle = run_loop()
657n/a mock_handler.assert_called_with(self.loop, {
658n/a 'exception': MOCK_ANY,
659n/a 'message': test_utils.MockPattern(
660n/a 'Exception in callback.*zero_error'),
661n/a 'handle': handle,
662n/a 'source_traceback': handle._source_traceback,
663n/a })
664n/a mock_handler.reset_mock()
665n/a
666n/a self.loop.set_exception_handler(None)
667n/a with mock.patch('asyncio.base_events.logger') as log:
668n/a run_loop()
669n/a log.error.assert_called_with(
670n/a test_utils.MockPattern(
671n/a 'Exception in callback.*zero'),
672n/a exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
673n/a
674n/a assert not mock_handler.called
675n/a
676n/a def test_set_exc_handler_broken(self):
677n/a def run_loop():
678n/a def zero_error():
679n/a 1/0
680n/a self.loop.call_soon(zero_error)
681n/a self.loop._run_once()
682n/a
683n/a def handler(loop, context):
684n/a raise AttributeError('spam')
685n/a
686n/a self.loop._process_events = mock.Mock()
687n/a
688n/a self.loop.set_exception_handler(handler)
689n/a
690n/a with mock.patch('asyncio.base_events.logger') as log:
691n/a run_loop()
692n/a log.error.assert_called_with(
693n/a test_utils.MockPattern(
694n/a 'Unhandled error in exception handler'),
695n/a exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
696n/a
    def test_default_exc_handler_broken(self):
        # Use a loop subclass whose default_exception_handler() always
        # raises, and check what gets logged both without and with a
        # (likewise raising) custom handler installed.
        _context = None

        class Loop(base_events.BaseEventLoop):

            _selector = mock.Mock()
            _process_events = mock.Mock()

            def default_exception_handler(self, context):
                nonlocal _context
                # Remember the context so the test can inspect it later.
                _context = context
                # Simulates custom buggy "default_exception_handler"
                raise ValueError('spam')

        loop = Loop()
        self.addCleanup(loop.close)
        asyncio.set_event_loop(loop)

        def run_loop():
            def zero_error():
                1/0
            loop.call_soon(zero_error)
            loop._run_once()

        with mock.patch('asyncio.base_events.logger') as log:
            run_loop()
            log.error.assert_called_with(
                'Exception in default exception handler',
                exc_info=True)

        def custom_handler(loop, context):
            raise ValueError('ham')

        _context = None
        loop.set_exception_handler(custom_handler)
        with mock.patch('asyncio.base_events.logger') as log:
            run_loop()
            log.error.assert_called_with(
                test_utils.MockPattern('Exception in default exception.*'
                                       'while handling.*in custom'),
                exc_info=True)

            # Check that original context was passed to default
            # exception handler.
            self.assertIn('context', _context)
            self.assertIs(type(_context['context']['exception']),
                          ZeroDivisionError)
744n/a
745n/a def test_set_task_factory_invalid(self):
746n/a with self.assertRaisesRegex(
747n/a TypeError, 'task factory must be a callable or None'):
748n/a
749n/a self.loop.set_task_factory(1)
750n/a
751n/a self.assertIsNone(self.loop.get_task_factory())
752n/a
753n/a def test_set_task_factory(self):
754n/a self.loop._process_events = mock.Mock()
755n/a
756n/a class MyTask(asyncio.Task):
757n/a pass
758n/a
759n/a @asyncio.coroutine
760n/a def coro():
761n/a pass
762n/a
763n/a factory = lambda loop, coro: MyTask(coro, loop=loop)
764n/a
765n/a self.assertIsNone(self.loop.get_task_factory())
766n/a self.loop.set_task_factory(factory)
767n/a self.assertIs(self.loop.get_task_factory(), factory)
768n/a
769n/a task = self.loop.create_task(coro())
770n/a self.assertTrue(isinstance(task, MyTask))
771n/a self.loop.run_until_complete(task)
772n/a
773n/a self.loop.set_task_factory(None)
774n/a self.assertIsNone(self.loop.get_task_factory())
775n/a
776n/a task = self.loop.create_task(coro())
777n/a self.assertTrue(isinstance(task, asyncio.Task))
778n/a self.assertFalse(isinstance(task, MyTask))
779n/a self.loop.run_until_complete(task)
780n/a
781n/a def test_env_var_debug(self):
782n/a code = '\n'.join((
783n/a 'import asyncio',
784n/a 'loop = asyncio.get_event_loop()',
785n/a 'print(loop.get_debug())'))
786n/a
787n/a # Test with -E to not fail if the unit test was run with
788n/a # PYTHONASYNCIODEBUG set to a non-empty string
789n/a sts, stdout, stderr = assert_python_ok('-E', '-c', code)
790n/a self.assertEqual(stdout.rstrip(), b'False')
791n/a
792n/a sts, stdout, stderr = assert_python_ok('-c', code,
793n/a PYTHONASYNCIODEBUG='')
794n/a self.assertEqual(stdout.rstrip(), b'False')
795n/a
796n/a sts, stdout, stderr = assert_python_ok('-c', code,
797n/a PYTHONASYNCIODEBUG='1')
798n/a self.assertEqual(stdout.rstrip(), b'True')
799n/a
800n/a sts, stdout, stderr = assert_python_ok('-E', '-c', code,
801n/a PYTHONASYNCIODEBUG='1')
802n/a self.assertEqual(stdout.rstrip(), b'False')
803n/a
804n/a def test_create_task(self):
805n/a class MyTask(asyncio.Task):
806n/a pass
807n/a
808n/a @asyncio.coroutine
809n/a def test():
810n/a pass
811n/a
812n/a class EventLoop(base_events.BaseEventLoop):
813n/a def create_task(self, coro):
814n/a return MyTask(coro, loop=loop)
815n/a
816n/a loop = EventLoop()
817n/a self.set_event_loop(loop)
818n/a
819n/a coro = test()
820n/a task = asyncio.ensure_future(coro, loop=loop)
821n/a self.assertIsInstance(task, MyTask)
822n/a
823n/a # make warnings quiet
824n/a task._log_destroy_pending = False
825n/a coro.close()
826n/a
827n/a def test_run_forever_keyboard_interrupt(self):
828n/a # Python issue #22601: ensure that the temporary task created by
829n/a # run_forever() consumes the KeyboardInterrupt and so don't log
830n/a # a warning
831n/a @asyncio.coroutine
832n/a def raise_keyboard_interrupt():
833n/a raise KeyboardInterrupt
834n/a
835n/a self.loop._process_events = mock.Mock()
836n/a self.loop.call_exception_handler = mock.Mock()
837n/a
838n/a try:
839n/a self.loop.run_until_complete(raise_keyboard_interrupt())
840n/a except KeyboardInterrupt:
841n/a pass
842n/a self.loop.close()
843n/a support.gc_collect()
844n/a
845n/a self.assertFalse(self.loop.call_exception_handler.called)
846n/a
847n/a def test_run_until_complete_baseexception(self):
848n/a # Python issue #22429: run_until_complete() must not schedule a pending
849n/a # call to stop() if the future raised a BaseException
850n/a @asyncio.coroutine
851n/a def raise_keyboard_interrupt():
852n/a raise KeyboardInterrupt
853n/a
854n/a self.loop._process_events = mock.Mock()
855n/a
856n/a try:
857n/a self.loop.run_until_complete(raise_keyboard_interrupt())
858n/a except KeyboardInterrupt:
859n/a pass
860n/a
861n/a def func():
862n/a self.loop.stop()
863n/a func.called = True
864n/a func.called = False
865n/a try:
866n/a self.loop.call_soon(func)
867n/a self.loop.run_forever()
868n/a except KeyboardInterrupt:
869n/a pass
870n/a self.assertTrue(func.called)
871n/a
    def test_single_selecter_event_callback_after_stopping(self):
        # Python issue #25593: A stopped event loop may cause event callbacks
        # to run more than once.
        event_sentinel = object()
        callcount = 0
        doer = None

        def proc_events(event_list):
            # Stand-in for _process_events: schedule do_event whenever the
            # selector reported the sentinel event.
            nonlocal doer
            if event_sentinel in event_list:
                doer = self.loop.call_soon(do_event)

        def do_event():
            nonlocal callcount
            callcount += 1
            self.loop.call_soon(clear_selector)

        def clear_selector():
            # Cancel the most recently scheduled do_event handle and make
            # the mocked selector report no further events.
            doer.cancel()
            self.loop._selector.select.return_value = ()

        self.loop._process_events = proc_events
        self.loop._selector.select.return_value = (event_sentinel,)

        # Stop and restart the loop twice; do_event must have run exactly
        # once after each pass.
        for i in range(1, 3):
            with self.subTest('Loop %d/2' % i):
                self.loop.call_soon(self.loop.stop)
                self.loop.run_forever()
                self.assertEqual(callcount, 1)
901n/a
902n/a def test_run_once(self):
903n/a # Simple test for test_utils.run_once(). It may seem strange
904n/a # to have a test for this (the function isn't even used!) but
905n/a # it's a de-factor standard API for library tests. This tests
906n/a # the idiom: loop.call_soon(loop.stop); loop.run_forever().
907n/a count = 0
908n/a
909n/a def callback():
910n/a nonlocal count
911n/a count += 1
912n/a
913n/a self.loop._process_events = mock.Mock()
914n/a self.loop.call_soon(callback)
915n/a test_utils.run_once(self.loop)
916n/a self.assertEqual(count, 1)
917n/a
918n/a def test_run_forever_pre_stopped(self):
919n/a # Test that the old idiom for pre-stopping the loop works.
920n/a self.loop._process_events = mock.Mock()
921n/a self.loop.stop()
922n/a self.loop.run_forever()
923n/a self.loop._selector.select.assert_called_once_with(0)
924n/a
925n/a
class MyProto(asyncio.Protocol):
    """Stream protocol used by the tests; it tracks a small state machine.

    ``state`` moves INITIAL -> CONNECTED -> (EOF) -> CLOSED and each
    callback asserts that it is invoked in a valid state.  ``nbytes``
    accumulates the length of all received data.  When ``create_future`` is
    true, ``done`` is a future that is resolved in connection_lost().

    The optional ``loop`` argument (default None, which keeps the previous
    behaviour) is forwarded to the future, mirroring MyDatagramProto.
    """
    done = None

    def __init__(self, create_future=False, loop=None):
        self.state = 'INITIAL'
        self.nbytes = 0
        if create_future:
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        # Send a fixed HTTP request as soon as the connection is up.
        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')

    def data_received(self, data):
        assert self.state == 'CONNECTED', self.state
        self.nbytes += len(data)

    def eof_received(self):
        assert self.state == 'CONNECTED', self.state
        self.state = 'EOF'

    def connection_lost(self, exc):
        assert self.state in ('CONNECTED', 'EOF'), self.state
        self.state = 'CLOSED'
        # Compare against None explicitly instead of relying on the
        # truthiness of a future object.
        if self.done is not None:
            self.done.set_result(None)
954n/a
955n/a
class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol stub that records its life cycle in ``state``.

    ``state`` moves INITIAL -> INITIALIZED -> CLOSED; each callback asserts
    the state it expects.  ``nbytes`` sums the sizes of received datagrams.
    """

    # Future resolved by connection_lost(); None unless create_future is set.
    done = None

    def __init__(self, create_future=False, loop=None):
        self.nbytes = 0
        self.state = 'INITIAL'
        if create_future:
            self.done = asyncio.Future(loop=loop)

    def _check_state(self, expected):
        # Fail with the current state as the assertion message.
        current = self.state
        assert current == expected, current

    def connection_made(self, transport):
        self.transport = transport
        self._check_state('INITIAL')
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        self._check_state('INITIALIZED')
        self.nbytes = self.nbytes + len(data)

    def error_received(self, exc):
        self._check_state('INITIALIZED')

    def connection_lost(self, exc):
        self._check_state('INITIALIZED')
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)
982n/a
983n/a
class BaseEventLoopWithSelectorTests(test_utils.TestCase):
    """Tests that run against a loop made by asyncio.new_event_loop().

    Covers create_connection(), create_server(), create_datagram_endpoint(),
    _accept_connection(), rejection of coroutines by the call_*() methods
    in debug mode, and slow-callback logging.

    Test methods use comments rather than docstrings so that unittest's
    verbose output keeps showing the test names.
    """

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def _install_getaddrinfo(self, coro_func):
        # Replace loop.getaddrinfo() with a stub that wraps every call to
        # *coro_func* in a Task bound to the test loop.
        def getaddrinfo_task(*args, **kwds):
            return asyncio.Task(coro_func(*args, **kwds), loop=self.loop)

        self.loop.getaddrinfo = getaddrinfo_task

    def _stub_loop_method(self, name):
        # Replace loop.<name> with a Mock.  _is_coroutine is forced to False
        # because a bare Mock attribute would otherwise be truthy.
        stub = mock.Mock()
        stub._is_coroutine = False
        setattr(self.loop, name, stub)

    @patch_socket
    def test_create_connection_multiple_errors(self, m_socket):
        # socket() fails once per resolved address; the failures must be
        # merged into a single OSError.

        class MyProto(asyncio.Protocol):
            pass

        @asyncio.coroutine
        def getaddrinfo(*args, **kw):
            yield from []
            return [(2, 1, 6, '', ('107.6.106.82', 80)),
                    (2, 1, 6, '', ('107.6.106.82', 80))]

        idx = -1
        errors = ['err1', 'err2']

        def _socket(*args, **kw):
            nonlocal idx
            idx += 1
            raise OSError(errors[idx])

        m_socket.socket = _socket
        self._install_getaddrinfo(getaddrinfo)

        coro = self.loop.create_connection(MyProto, 'example.com', 80)
        with self.assertRaises(OSError) as cm:
            self.loop.run_until_complete(coro)

        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')

    @patch_socket
    def test_create_connection_timeout(self, m_socket):
        # Ensure that the socket is closed on timeout
        sock = mock.Mock()
        m_socket.socket.return_value = sock

        def getaddrinfo(*args, **kw):
            fut = asyncio.Future(loop=self.loop)
            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
                    ('127.0.0.1', 80))
            fut.set_result([addr])
            return fut
        self.loop.getaddrinfo = getaddrinfo

        with mock.patch.object(self.loop, 'sock_connect',
                               side_effect=asyncio.TimeoutError):
            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
            with self.assertRaises(asyncio.TimeoutError):
                self.loop.run_until_complete(coro)
            self.assertTrue(sock.close.called)

    def test_create_connection_host_port_sock(self):
        # host/port and sock are passed together; ValueError is expected.
        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, sock=object())
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    def test_create_connection_wrong_sock(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        with sock:
            coro = self.loop.create_connection(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A Stream Socket was expected'):
                self.loop.run_until_complete(coro)

    def test_create_server_wrong_sock(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        with sock:
            coro = self.loop.create_server(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A Stream Socket was expected'):
                self.loop.run_until_complete(coro)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'no socket.SOCK_NONBLOCK (linux only)')
    def test_create_server_stream_bittype(self):
        # A stream socket whose type also carries SOCK_NONBLOCK must be
        # accepted by create_server().
        sock = socket.socket(
            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
        with sock:
            coro = self.loop.create_server(lambda: None, sock=sock)
            srv = self.loop.run_until_complete(coro)
            srv.close()
            self.loop.run_until_complete(srv.wait_closed())

    def test_create_datagram_endpoint_wrong_sock(self):
        sock = socket.socket(socket.AF_INET)
        with sock:
            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A UDP Socket was expected'):
                self.loop.run_until_complete(coro)

    def test_create_connection_no_host_port_sock(self):
        coro = self.loop.create_connection(MyProto)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    def test_create_connection_no_getaddrinfo(self):
        # The stub resolver returns nothing (the coroutine has no return).
        @asyncio.coroutine
        def getaddrinfo(*args, **kw):
            yield from []

        self._install_getaddrinfo(getaddrinfo)
        coro = self.loop.create_connection(MyProto, 'example.com', 80)
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    def test_create_connection_connect_err(self):
        @asyncio.coroutine
        def getaddrinfo(*args, **kw):
            yield from []
            return [(2, 1, 6, '', ('107.6.106.82', 80))]

        self._install_getaddrinfo(getaddrinfo)
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError

        coro = self.loop.create_connection(MyProto, 'example.com', 80)
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    def test_create_connection_multiple(self):
        @asyncio.coroutine
        def getaddrinfo(*args, **kw):
            return [(2, 1, 6, '', ('0.0.0.1', 80)),
                    (2, 1, 6, '', ('0.0.0.2', 80))]

        self._install_getaddrinfo(getaddrinfo)
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError

        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET)
        with self.assertRaises(OSError):
            self.loop.run_until_complete(coro)

    @patch_socket
    def test_create_connection_multiple_errors_local_addr(self, m_socket):
        # bind() fails for the first address and connect fails afterwards;
        # both errors must be reported and the socket closed.

        def bind(addr):
            if addr[0] == '0.0.0.1':
                err = OSError('Err')
                err.strerror = 'Err'
                raise err

        m_socket.socket.return_value.bind = bind

        @asyncio.coroutine
        def getaddrinfo(*args, **kw):
            return [(2, 1, 6, '', ('0.0.0.1', 80)),
                    (2, 1, 6, '', ('0.0.0.2', 80))]

        self._install_getaddrinfo(getaddrinfo)
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError('Err2')

        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET,
            local_addr=(None, 8080))
        with self.assertRaises(OSError) as cm:
            self.loop.run_until_complete(coro)

        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
        self.assertTrue(m_socket.socket.return_value.close.called)

    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
        # Test the fallback code, even if this system has inet_pton.
        if not allow_inet_pton:
            del m_socket.inet_pton

        m_socket.getaddrinfo = socket.getaddrinfo
        sock = m_socket.socket.return_value

        self._stub_loop_method('_add_reader')
        self._stub_loop_method('_add_writer')

        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
        t, p = self.loop.run_until_complete(coro)
        try:
            sock.connect.assert_called_with(('1.2.3.4', 80))
            _, kwargs = m_socket.socket.call_args
            self.assertEqual(kwargs['family'], m_socket.AF_INET)
            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
        finally:
            t.close()
            test_utils.run_briefly(self.loop)  # allow transport to close

        sock.family = socket.AF_INET6
        coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
        t, p = self.loop.run_until_complete(coro)
        try:
            # Without inet_pton we use getaddrinfo, which transforms
            # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
            # scope id.
            [address] = sock.connect.call_args[0]
            host, port = address[:2]
            self.assertRegex(host, r'::(0\.)*1')
            self.assertEqual(port, 80)
            _, kwargs = m_socket.socket.call_args
            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
        finally:
            t.close()
            test_utils.run_briefly(self.loop)  # allow transport to close

    @patch_socket
    def test_create_connection_ip_addr(self, m_socket):
        self._test_create_connection_ip_addr(m_socket, True)

    @patch_socket
    def test_create_connection_no_inet_pton(self, m_socket):
        self._test_create_connection_ip_addr(m_socket, False)

    @patch_socket
    def test_create_connection_service_name(self, m_socket):
        m_socket.getaddrinfo = socket.getaddrinfo
        sock = m_socket.socket.return_value

        self._stub_loop_method('_add_reader')
        self._stub_loop_method('_add_writer')

        # A service name, as str or bytes, must be resolved to its port.
        for service, port in ('http', 80), (b'http', 80):
            coro = self.loop.create_connection(asyncio.Protocol,
                                               '127.0.0.1', service)

            t, p = self.loop.run_until_complete(coro)
            try:
                sock.connect.assert_called_with(('127.0.0.1', port))
                _, kwargs = m_socket.socket.call_args
                self.assertEqual(kwargs['family'], m_socket.AF_INET)
                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
            finally:
                t.close()
                test_utils.run_briefly(self.loop)  # allow transport to close

        for service in 'nonsense', b'nonsense':
            coro = self.loop.create_connection(asyncio.Protocol,
                                               '127.0.0.1', service)

            with self.assertRaises(OSError):
                self.loop.run_until_complete(coro)

    def test_create_connection_no_local_addr(self):
        # The stub resolves 'example.com' only; any other host (here the
        # None host of local_addr) yields an empty list.
        @asyncio.coroutine
        def getaddrinfo(host, *args, **kw):
            if host == 'example.com':
                return [(2, 1, 6, '', ('107.6.106.82', 80)),
                        (2, 1, 6, '', ('107.6.106.82', 80))]
            else:
                return []

        self._install_getaddrinfo(getaddrinfo)

        coro = self.loop.create_connection(
            MyProto, 'example.com', 80, family=socket.AF_INET,
            local_addr=(None, 8080))
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    @patch_socket
    def test_create_connection_bluetooth(self, m_socket):
        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
        # we can't recognize an address is resolved, e.g. a Bluetooth address.
        addr = ('00:01:02:03:04:05', 1)

        def getaddrinfo(host, port, *args, **kw):
            assert (host, port) == addr
            return [(999, 1, 999, '', (addr, 1))]

        m_socket.getaddrinfo = getaddrinfo
        sock = m_socket.socket()
        coro = self.loop.sock_connect(sock, addr)
        self.loop.run_until_complete(coro)

    def test_create_connection_ssl_server_hostname_default(self):
        self.loop.getaddrinfo = mock.Mock()

        def mock_getaddrinfo(*args, **kwds):
            f = asyncio.Future(loop=self.loop)
            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
            return f

        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.return_value = ()
        self.loop._make_ssl_transport = mock.Mock()

        class _SelectorTransportMock:
            _sock = None

            def get_extra_info(self, key):
                return mock.Mock()

            def close(self):
                self._sock.close()

        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
                                    **kwds):
            waiter.set_result(None)
            transport = _SelectorTransportMock()
            transport._sock = sock
            return transport

        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
        ANY = mock.ANY
        # First try the default server_hostname.
        self.loop._make_ssl_transport.reset_mock()
        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True)
        transport, _ = self.loop.run_until_complete(coro)
        transport.close()
        self.loop._make_ssl_transport.assert_called_with(
            ANY, ANY, ANY, ANY,
            server_side=False,
            server_hostname='python.org')
        # Next try an explicit server_hostname.
        self.loop._make_ssl_transport.reset_mock()
        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True,
                                           server_hostname='perl.com')
        transport, _ = self.loop.run_until_complete(coro)
        transport.close()
        self.loop._make_ssl_transport.assert_called_with(
            ANY, ANY, ANY, ANY,
            server_side=False,
            server_hostname='perl.com')
        # Finally try an explicit empty server_hostname.
        self.loop._make_ssl_transport.reset_mock()
        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True,
                                           server_hostname='')
        transport, _ = self.loop.run_until_complete(coro)
        transport.close()
        self.loop._make_ssl_transport.assert_called_with(ANY, ANY, ANY, ANY,
                                                         server_side=False,
                                                         server_hostname='')

    def test_create_connection_no_ssl_server_hostname_errors(self):
        # When not using ssl, server_hostname must be None.
        coro = self.loop.create_connection(MyProto, 'python.org', 80,
                                           server_hostname='')
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
        coro = self.loop.create_connection(MyProto, 'python.org', 80,
                                           server_hostname='python.org')
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    def test_create_connection_ssl_server_hostname_errors(self):
        # When using ssl, server_hostname may be None if host is non-empty.
        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
        sock = socket.socket()
        # Register the cleanup before the socket is used so that it is
        # closed even if the call below raises unexpectedly.
        self.addCleanup(sock.close)
        coro = self.loop.create_connection(MyProto, None, None,
                                           ssl=True, sock=sock)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    def test_create_server_empty_host(self):
        # if host is empty string use None instead
        host = object()

        @asyncio.coroutine
        def getaddrinfo(*args, **kw):
            nonlocal host
            host = args[0]
            yield from []

        self._install_getaddrinfo(getaddrinfo)
        fut = self.loop.create_server(MyProto, '', 0)
        self.assertRaises(OSError, self.loop.run_until_complete, fut)
        self.assertIsNone(host)

    def test_create_server_host_port_sock(self):
        fut = self.loop.create_server(
            MyProto, '0.0.0.0', 0, sock=object())
        self.assertRaises(ValueError, self.loop.run_until_complete, fut)

    def test_create_server_no_host_port_sock(self):
        fut = self.loop.create_server(MyProto)
        self.assertRaises(ValueError, self.loop.run_until_complete, fut)

    def test_create_server_no_getaddrinfo(self):
        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
        getaddrinfo.return_value = []

        f = self.loop.create_server(MyProto, 'python.org', 0)
        self.assertRaises(OSError, self.loop.run_until_complete, f)

    @patch_socket
    def test_create_server_nosoreuseport(self, m_socket):
        # reuse_port=True while the socket module has no SO_REUSEPORT.
        m_socket.getaddrinfo = socket.getaddrinfo
        del m_socket.SO_REUSEPORT
        m_socket.socket.return_value = mock.Mock()

        f = self.loop.create_server(
            MyProto, '0.0.0.0', 0, reuse_port=True)

        self.assertRaises(ValueError, self.loop.run_until_complete, f)

    @patch_socket
    def test_create_server_soreuseport_only_defined(self, m_socket):
        m_socket.getaddrinfo = socket.getaddrinfo
        m_socket.socket.return_value = mock.Mock()
        m_socket.SO_REUSEPORT = -1

        f = self.loop.create_server(
            MyProto, '0.0.0.0', 0, reuse_port=True)

        self.assertRaises(ValueError, self.loop.run_until_complete, f)

    @patch_socket
    def test_create_server_cant_bind(self, m_socket):
        # A failing bind() must surface as OSError and close the socket.

        class Err(OSError):
            strerror = 'error'

        m_socket.getaddrinfo.return_value = [
            (2, 1, 6, '', ('127.0.0.1', 10100))]
        m_socket.getaddrinfo._is_coroutine = False
        m_sock = m_socket.socket.return_value = mock.Mock()
        m_sock.bind.side_effect = Err

        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
        self.assertRaises(OSError, self.loop.run_until_complete, fut)
        self.assertTrue(m_sock.close.called)

    @patch_socket
    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
        m_socket.getaddrinfo.return_value = []
        m_socket.getaddrinfo._is_coroutine = False

        coro = self.loop.create_datagram_endpoint(
            MyDatagramProto, local_addr=('localhost', 0))
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    def test_create_datagram_endpoint_addr_error(self):
        # local_addr is a bare string, then a 4-item tuple.
        coro = self.loop.create_datagram_endpoint(
            MyDatagramProto, local_addr='localhost')
        self.assertRaises(
            AssertionError, self.loop.run_until_complete, coro)
        coro = self.loop.create_datagram_endpoint(
            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
        self.assertRaises(
            AssertionError, self.loop.run_until_complete, coro)

    def test_create_datagram_endpoint_connect_err(self):
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError

        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    @patch_socket
    def test_create_datagram_endpoint_socket_err(self, m_socket):
        m_socket.getaddrinfo = socket.getaddrinfo
        m_socket.socket.side_effect = OSError

        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, family=socket.AF_INET)
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)

    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
    def test_create_datagram_endpoint_no_matching_family(self):
        # remote_addr is IPv4 while local_addr is IPv6.
        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol,
            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
        self.assertRaises(
            ValueError, self.loop.run_until_complete, coro)

    @patch_socket
    def test_create_datagram_endpoint_setblk_err(self, m_socket):
        m_socket.socket.return_value.setblocking.side_effect = OSError

        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol, family=socket.AF_INET)
        self.assertRaises(
            OSError, self.loop.run_until_complete, coro)
        self.assertTrue(
            m_socket.socket.return_value.close.called)

    def test_create_datagram_endpoint_noaddr_nofamily(self):
        coro = self.loop.create_datagram_endpoint(
            asyncio.DatagramProtocol)
        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    @patch_socket
    def test_create_datagram_endpoint_cant_bind(self, m_socket):
        class Err(OSError):
            pass

        m_socket.getaddrinfo = socket.getaddrinfo
        m_sock = m_socket.socket.return_value = mock.Mock()
        m_sock.bind.side_effect = Err

        fut = self.loop.create_datagram_endpoint(
            MyDatagramProto,
            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
        self.assertRaises(Err, self.loop.run_until_complete, fut)
        self.assertTrue(m_sock.close.called)

    def test_create_datagram_endpoint_sock(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Make sure the socket is released even when endpoint creation
        # fails; closing an already closed socket is harmless.
        self.addCleanup(sock.close)
        sock.bind(('127.0.0.1', 0))
        fut = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(create_future=True, loop=self.loop),
            sock=sock)
        transport, protocol = self.loop.run_until_complete(fut)
        transport.close()
        self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)

    def test_create_datagram_endpoint_sock_sockopts(self):
        # Each of these options must be rejected with ValueError when an
        # explicit sock is passed as well.
        class FakeSock:
            type = socket.SOCK_DGRAM

        conflicting_options = (
            {'local_addr': ('127.0.0.1', 0)},
            {'remote_addr': ('127.0.0.1', 0)},
            {'family': 1},
            {'proto': 1},
            {'flags': 1},
            {'reuse_address': True},
            {'reuse_port': True},
            {'allow_broadcast': True},
        )
        for options in conflicting_options:
            with self.subTest(options=options):
                fut = self.loop.create_datagram_endpoint(
                    MyDatagramProto, sock=FakeSock(), **options)
                self.assertRaises(
                    ValueError, self.loop.run_until_complete, fut)

    def test_create_datagram_endpoint_sockopts(self):
        # Socket options should not be applied unless asked for.
        # SO_REUSEADDR defaults to on for UNIX.
        # SO_REUSEPORT is not available on all platforms.

        coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(create_future=True, loop=self.loop),
            local_addr=('127.0.0.1', 0))
        transport, protocol = self.loop.run_until_complete(coro)
        sock = transport.get_extra_info('socket')

        reuse_address_default_on = (
            os.name == 'posix' and sys.platform != 'cygwin')
        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')

        if reuse_address_default_on:
            self.assertTrue(
                sock.getsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEADDR))
        else:
            self.assertFalse(
                sock.getsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEADDR))
        if reuseport_supported:
            self.assertFalse(
                sock.getsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
        self.assertFalse(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_BROADCAST))

        transport.close()
        self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)

        # Second endpoint: every option is requested explicitly.
        coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(create_future=True, loop=self.loop),
            local_addr=('127.0.0.1', 0),
            reuse_address=True,
            reuse_port=reuseport_supported,
            allow_broadcast=True)
        transport, protocol = self.loop.run_until_complete(coro)
        sock = transport.get_extra_info('socket')

        self.assertTrue(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR))
        if reuseport_supported:
            self.assertTrue(
                sock.getsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
        self.assertTrue(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_BROADCAST))

        transport.close()
        self.loop.run_until_complete(protocol.done)
        self.assertEqual('CLOSED', protocol.state)

    @patch_socket
    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
        del m_socket.SO_REUSEPORT
        m_socket.socket.return_value = mock.Mock()

        coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(loop=self.loop),
            local_addr=('127.0.0.1', 0),
            reuse_address=False,
            reuse_port=True)

        self.assertRaises(ValueError, self.loop.run_until_complete, coro)

    @patch_socket
    def test_create_datagram_endpoint_ip_addr(self, m_socket):
        # A literal IP address must not trigger a getaddrinfo() call.
        def getaddrinfo(*args, **kw):
            self.fail('should not have called getaddrinfo')

        m_socket.getaddrinfo = getaddrinfo
        m_socket.socket.return_value.bind = bind = mock.Mock()
        self._stub_loop_method('_add_reader')

        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
        coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(loop=self.loop),
            local_addr=('1.2.3.4', 0),
            reuse_address=False,
            reuse_port=reuseport_supported)

        t, p = self.loop.run_until_complete(coro)
        try:
            bind.assert_called_with(('1.2.3.4', 0))
            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
                                               proto=m_socket.IPPROTO_UDP,
                                               type=m_socket.SOCK_DGRAM)
        finally:
            t.close()
            test_utils.run_briefly(self.loop)  # allow transport to close

    def test_accept_connection_retry(self):
        # accept() raising BlockingIOError must not close the socket.
        sock = mock.Mock()
        sock.accept.side_effect = BlockingIOError()

        self.loop._accept_connection(MyProto, sock)
        self.assertFalse(sock.close.called)

    @mock.patch('asyncio.base_events.logger')
    def test_accept_connection_exception(self, m_log):
        # On EMFILE the error is logged, the reader is removed and a retry
        # is scheduled with call_later(); the socket stays open.
        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
        self.loop._remove_reader = mock.Mock()
        self.loop.call_later = mock.Mock()

        self.loop._accept_connection(MyProto, sock)
        self.assertTrue(m_log.error.called)
        self.assertFalse(sock.close.called)
        self.loop._remove_reader.assert_called_with(10)
        self.loop.call_later.assert_called_with(
            constants.ACCEPT_RETRY_DELAY,
            # self.loop._start_serving
            mock.ANY,
            MyProto, sock, None, None, mock.ANY)

    def test_call_coroutine(self):
        # In debug mode, passing a coroutine function or coroutine object
        # as a callback must raise TypeError.
        @asyncio.coroutine
        def simple_coroutine():
            pass

        self.loop.set_debug(True)
        coro_func = simple_coroutine
        coro_obj = coro_func()
        self.addCleanup(coro_obj.close)
        for func in (coro_func, coro_obj):
            with self.assertRaises(TypeError):
                self.loop.call_soon(func)
            with self.assertRaises(TypeError):
                self.loop.call_soon_threadsafe(func)
            with self.assertRaises(TypeError):
                self.loop.call_later(60, func)
            with self.assertRaises(TypeError):
                self.loop.call_at(self.loop.time() + 60, func)
            with self.assertRaises(TypeError):
                self.loop.run_in_executor(None, func)

    @mock.patch('asyncio.base_events.logger')
    def test_log_slow_callbacks(self, m_logger):
        def stop_loop_cb(loop):
            loop.stop()

        @asyncio.coroutine
        def stop_loop_coro(loop):
            yield from ()
            loop.stop()

        asyncio.set_event_loop(self.loop)
        self.loop.set_debug(True)
        # A threshold of zero makes every callback count as slow.
        self.loop.slow_callback_duration = 0.0

        # slow callback
        self.loop.call_soon(stop_loop_cb, self.loop)
        self.loop.run_forever()
        fmt, *args = m_logger.warning.call_args[0]
        self.assertRegex(fmt % tuple(args),
                         "^Executing <Handle.*stop_loop_cb.*> "
                         "took .* seconds$")

        # slow task
        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
        self.loop.run_forever()
        fmt, *args = m_logger.warning.call_args[0]
        self.assertRegex(fmt % tuple(args),
                         "^Executing <Task.*stop_loop_coro.*> "
                         "took .* seconds$")
1740n/a
1741n/a
class RunningLoopTests(unittest.TestCase):
    """Checks that a loop cannot be started while another one is running."""

    def test_running_loop_within_a_loop(self):
        # The coroutine below tries to start ``inner`` from inside ``outer``;
        # run_forever() is expected to refuse with RuntimeError.
        @asyncio.coroutine
        def runner(loop):
            loop.run_forever()

        inner = asyncio.new_event_loop()
        outer = asyncio.new_event_loop()
        expected_message = 'while another loop is running'
        try:
            with self.assertRaisesRegex(RuntimeError, expected_message):
                outer.run_until_complete(runner(inner))
        finally:
            # Close both loops whatever the outcome, inner loop first.
            inner.close()
            outer.close()
1758n/a
1759n/a
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()