» Core Development > Code coverage > Lib/test/test_asyncio/test_selector_events.py

Python code coverage for Lib/test/test_asyncio/test_selector_events.py

# | count | content
1n/a"""Tests for selector_events.py"""
2n/a
3n/aimport errno
4n/aimport socket
5n/aimport unittest
6n/afrom unittest import mock
7n/atry:
8n/a import ssl
9n/aexcept ImportError:
10n/a ssl = None
11n/a
12n/aimport asyncio
13n/afrom asyncio import selectors
14n/afrom asyncio import test_utils
15n/afrom asyncio.selector_events import BaseSelectorEventLoop
16n/afrom asyncio.selector_events import _SelectorTransport
17n/afrom asyncio.selector_events import _SelectorSslTransport
18n/afrom asyncio.selector_events import _SelectorSocketTransport
19n/afrom asyncio.selector_events import _SelectorDatagramTransport
20n/a
21n/a
# Short alias for mock.ANY; used below as a wildcard inside expected call
# arguments (e.g. the exc_info tuple checked in test_fatal_error).
MOCK_ANY = mock.ANY
23n/a
24n/a
class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
    """BaseSelectorEventLoop variant for tests that run on a mocked selector."""

    def _make_self_pipe(self):
        """Install mock objects as the self-pipe sockets.

        The internal fd counter is still bumped by one, exactly as the
        original implementation of this override does.
        """
        self._ssock, self._csock = mock.Mock(), mock.Mock()
        self._internal_fds += 1

    def close(self):
        """Mark the loop as closed without touching the parent class.

        The parent close() is deliberately skipped because the selector
        handed to this loop is a mock.
        """
        self._closed = True
36n/a
37n/a
def list_to_buffer(l=()):
    """Concatenate an iterable of bytes-like chunks into a new bytearray.

    With no argument (or an empty iterable) an empty bytearray is returned.
    """
    buffer = bytearray()
    for chunk in l:
        buffer += chunk
    return buffer
40n/a
41n/a
def close_transport(transport):
    """Close the transport's socket directly and clear its reference.

    transport.close() is intentionally not used: the event loop and the
    selector are mocked in these tests.  Calling this on a transport whose
    ``_sock`` is already None is a no-op.
    """
    sock = transport._sock
    if sock is not None:
        sock.close()
        transport._sock = None
49n/a
50n/a
class BaseSelectorEventLoopTests(test_utils.TestCase):
    """Tests for BaseSelectorEventLoop running on top of a mocked selector."""

    def setUp(self):
        super().setUp()
        # The selector is a mock that never reports ready events.
        self.selector = mock.Mock()
        self.selector.select.return_value = []
        self.loop = TestBaseSelectorEventLoop(self.selector)
        self.set_event_loop(self.loop)

    def test_make_socket_transport(self):
        """_make_socket_transport() returns a _SelectorSocketTransport."""
        m = mock.Mock()
        self.loop.add_reader = mock.Mock()
        self.loop.add_reader._is_coroutine = False
        transport = self.loop._make_socket_transport(m, asyncio.Protocol())
        self.assertIsInstance(transport, _SelectorSocketTransport)

        # Calling repr() must not fail when the event loop is closed
        self.loop.close()
        repr(transport)

        close_transport(transport)

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_make_ssl_transport(self):
        """_make_ssl_transport() returns an object that looks like an SSL transport."""
        m = mock.Mock()
        self.loop._add_reader = mock.Mock()
        self.loop._add_reader._is_coroutine = False
        self.loop._add_writer = mock.Mock()
        self.loop._remove_reader = mock.Mock()
        self.loop._remove_writer = mock.Mock()
        waiter = asyncio.Future(loop=self.loop)
        with test_utils.disable_logger():
            transport = self.loop._make_ssl_transport(
                m, asyncio.Protocol(), m, waiter)
            # execute the handshake while the logger is disabled
            # to ignore SSL handshake failure
            test_utils.run_briefly(self.loop)

        # Sanity check
        class_name = transport.__class__.__name__
        self.assertIn("ssl", class_name.lower())
        self.assertIn("transport", class_name.lower())

        transport.close()
        # execute pending callbacks to close the socket transport
        test_utils.run_briefly(self.loop)

    @mock.patch('asyncio.selector_events.ssl', None)
    @mock.patch('asyncio.sslproto.ssl', None)
    def test_make_ssl_transport_without_ssl_error(self):
        """With the ssl module patched to None, RuntimeError is raised."""
        m = mock.Mock()
        self.loop.add_reader = mock.Mock()
        self.loop.add_writer = mock.Mock()
        self.loop.remove_reader = mock.Mock()
        self.loop.remove_writer = mock.Mock()
        with self.assertRaises(RuntimeError):
            self.loop._make_ssl_transport(m, m, m, m)

    def test_close(self):
        """close() releases the selector and self-pipe and is repeatable."""
        # A local subclass is needed here: unlike TestBaseSelectorEventLoop
        # it does not override close(), so the real close() is exercised.
        class EventLoop(BaseSelectorEventLoop):
            def _make_self_pipe(self):
                self._ssock = mock.Mock()
                self._csock = mock.Mock()
                self._internal_fds += 1

        self.loop = EventLoop(self.selector)
        self.set_event_loop(self.loop)

        ssock = self.loop._ssock
        ssock.fileno.return_value = 7
        csock = self.loop._csock
        csock.fileno.return_value = 1
        remove_reader = self.loop._remove_reader = mock.Mock()

        self.loop._selector.close()
        self.loop._selector = selector = mock.Mock()
        self.assertFalse(self.loop.is_closed())

        self.loop.close()
        self.assertTrue(self.loop.is_closed())
        self.assertIsNone(self.loop._selector)
        self.assertIsNone(self.loop._csock)
        self.assertIsNone(self.loop._ssock)
        selector.close.assert_called_with()
        ssock.close.assert_called_with()
        csock.close.assert_called_with()
        remove_reader.assert_called_with(7)

        # it should be possible to call close() more than once
        self.loop.close()
        self.loop.close()

        # operation blocked when the loop is closed
        f = asyncio.Future(loop=self.loop)
        self.assertRaises(RuntimeError, self.loop.run_forever)
        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
        fd = 0

        def callback():
            pass

        self.assertRaises(RuntimeError, self.loop.add_reader, fd, callback)
        self.assertRaises(RuntimeError, self.loop.add_writer, fd, callback)

    def test_close_no_selector(self):
        """close() works when the selector has already been set to None."""
        self.loop.remove_reader = mock.Mock()
        self.loop._selector.close()
        self.loop._selector = None
        self.loop.close()
        self.assertIsNone(self.loop._selector)

    def test_socketpair(self):
        """The base class leaves _socketpair() unimplemented."""
        self.assertRaises(NotImplementedError, self.loop._socketpair)

    def test_read_from_self_tryagain(self):
        """BlockingIOError from recv() is absorbed by _read_from_self()."""
        self.loop._ssock.recv.side_effect = BlockingIOError
        self.assertIsNone(self.loop._read_from_self())

    def test_read_from_self_exception(self):
        """A plain OSError from recv() propagates out of _read_from_self()."""
        self.loop._ssock.recv.side_effect = OSError
        self.assertRaises(OSError, self.loop._read_from_self)

    def test_write_to_self_tryagain(self):
        """BlockingIOError from send() is absorbed by _write_to_self()."""
        self.loop._csock.send.side_effect = BlockingIOError
        with test_utils.disable_logger():
            self.assertIsNone(self.loop._write_to_self())

    def test_write_to_self_exception(self):
        """Errors other than OSError propagate out of _write_to_self()."""
        # _write_to_self() swallows OSError, so a different exception type
        # is injected to check that other errors are not hidden.
        self.loop._csock.send.side_effect = RuntimeError()
        self.assertRaises(RuntimeError, self.loop._write_to_self)

    def test_sock_recv(self):
        """sock_recv() returns a Future and delegates to _sock_recv()."""
        sock = test_utils.mock_nonblocking_socket()
        self.loop._sock_recv = mock.Mock()

        f = self.loop.sock_recv(sock, 1024)
        self.assertIsInstance(f, asyncio.Future)
        self.loop._sock_recv.assert_called_with(f, False, sock, 1024)

    def test__sock_recv_canceled_fut(self):
        """A cancelled future means recv() is never attempted."""
        sock = mock.Mock()

        f = asyncio.Future(loop=self.loop)
        f.cancel()

        self.loop._sock_recv(f, False, sock, 1024)
        self.assertFalse(sock.recv.called)

    def test__sock_recv_unregister(self):
        """When called as a registered callback, the reader is removed."""
        sock = mock.Mock()
        sock.fileno.return_value = 10

        f = asyncio.Future(loop=self.loop)
        f.cancel()

        self.loop.remove_reader = mock.Mock()
        self.loop._sock_recv(f, True, sock, 1024)
        self.assertEqual((10,), self.loop.remove_reader.call_args[0])

    def test__sock_recv_tryagain(self):
        """BlockingIOError registers _sock_recv as a reader for a retry."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.recv.side_effect = BlockingIOError

        self.loop.add_reader = mock.Mock()
        self.loop._sock_recv(f, False, sock, 1024)
        self.assertEqual((10, self.loop._sock_recv, f, True, sock, 1024),
                         self.loop.add_reader.call_args[0])

    def test__sock_recv_exception(self):
        """An OSError from recv() is stored on the future."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        err = sock.recv.side_effect = OSError()

        self.loop._sock_recv(f, False, sock, 1024)
        self.assertIs(err, f.exception())

    def test_sock_sendall(self):
        """sock_sendall() returns a Future and delegates to _sock_sendall()."""
        sock = test_utils.mock_nonblocking_socket()
        self.loop._sock_sendall = mock.Mock()

        f = self.loop.sock_sendall(sock, b'data')
        self.assertIsInstance(f, asyncio.Future)
        self.assertEqual(
            (f, False, sock, b'data'),
            self.loop._sock_sendall.call_args[0])

    def test_sock_sendall_nodata(self):
        """Empty data completes immediately without calling _sock_sendall()."""
        sock = test_utils.mock_nonblocking_socket()
        self.loop._sock_sendall = mock.Mock()

        f = self.loop.sock_sendall(sock, b'')
        self.assertIsInstance(f, asyncio.Future)
        self.assertTrue(f.done())
        self.assertIsNone(f.result())
        self.assertFalse(self.loop._sock_sendall.called)

    def test__sock_sendall_canceled_fut(self):
        """A cancelled future means send() is never attempted."""
        sock = mock.Mock()

        f = asyncio.Future(loop=self.loop)
        f.cancel()

        self.loop._sock_sendall(f, False, sock, b'data')
        self.assertFalse(sock.send.called)

    def test__sock_sendall_unregister(self):
        """When called as a registered callback, the writer is removed."""
        sock = mock.Mock()
        sock.fileno.return_value = 10

        f = asyncio.Future(loop=self.loop)
        f.cancel()

        self.loop.remove_writer = mock.Mock()
        self.loop._sock_sendall(f, True, sock, b'data')
        self.assertEqual((10,), self.loop.remove_writer.call_args[0])

    def test__sock_sendall_tryagain(self):
        """BlockingIOError registers a writer retrying with the same data."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.send.side_effect = BlockingIOError

        self.loop.add_writer = mock.Mock()
        self.loop._sock_sendall(f, False, sock, b'data')
        self.assertEqual(
            (10, self.loop._sock_sendall, f, True, sock, b'data'),
            self.loop.add_writer.call_args[0])

    def test__sock_sendall_interrupted(self):
        """InterruptedError registers a writer retrying with the same data."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.send.side_effect = InterruptedError

        self.loop.add_writer = mock.Mock()
        self.loop._sock_sendall(f, False, sock, b'data')
        self.assertEqual(
            (10, self.loop._sock_sendall, f, True, sock, b'data'),
            self.loop.add_writer.call_args[0])

    def test__sock_sendall_exception(self):
        """An OSError from send() is stored on the future."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        err = sock.send.side_effect = OSError()

        self.loop._sock_sendall(f, False, sock, b'data')
        self.assertIs(f.exception(), err)

    def test__sock_sendall(self):
        """Sending all bytes in one call resolves the future with None."""
        sock = mock.Mock()

        f = asyncio.Future(loop=self.loop)
        sock.fileno.return_value = 10
        sock.send.return_value = 4

        self.loop._sock_sendall(f, False, sock, b'data')
        self.assertTrue(f.done())
        self.assertIsNone(f.result())

    def test__sock_sendall_partial(self):
        """A partial send registers a writer for the remaining bytes."""
        sock = mock.Mock()

        f = asyncio.Future(loop=self.loop)
        sock.fileno.return_value = 10
        sock.send.return_value = 2

        self.loop.add_writer = mock.Mock()
        self.loop._sock_sendall(f, False, sock, b'data')
        self.assertFalse(f.done())
        self.assertEqual(
            (10, self.loop._sock_sendall, f, True, sock, b'ta'),
            self.loop.add_writer.call_args[0])

    def test__sock_sendall_none(self):
        """A send of zero bytes registers a writer for the full payload."""
        sock = mock.Mock()

        f = asyncio.Future(loop=self.loop)
        sock.fileno.return_value = 10
        sock.send.return_value = 0

        self.loop.add_writer = mock.Mock()
        self.loop._sock_sendall(f, False, sock, b'data')
        self.assertFalse(f.done())
        self.assertEqual(
            (10, self.loop._sock_sendall, f, True, sock, b'data'),
            self.loop.add_writer.call_args[0])

    def test_sock_connect_timeout(self):
        """Cancelling a pending sock_connect() unregisters the socket."""
        # asyncio issue #205: sock_connect() must unregister the socket on
        # timeout error

        # prepare mocks
        self.loop.add_writer = mock.Mock()
        self.loop.remove_writer = mock.Mock()
        sock = test_utils.mock_nonblocking_socket()
        sock.connect.side_effect = BlockingIOError

        # first call to sock_connect() registers the socket
        fut = self.loop.create_task(
            self.loop.sock_connect(sock, ('127.0.0.1', 80)))
        self.loop._run_once()
        self.assertTrue(sock.connect.called)
        self.assertTrue(self.loop.add_writer.called)

        # on timeout, the socket must be unregistered
        sock.connect.reset_mock()
        fut.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(fut)
        self.assertTrue(self.loop.remove_writer.called)

    @mock.patch('socket.getaddrinfo')
    def test_sock_connect_resolve_using_socket_params(self, m_gai):
        """Address resolution uses the socket's family, type and proto."""
        addr = ('need-resolution.com', 8080)
        sock = test_utils.mock_nonblocking_socket()
        m_gai.side_effect = (None, None, None, None, ('127.0.0.1', 0))
        m_gai._is_coroutine = False
        con = self.loop.create_task(self.loop.sock_connect(sock, addr))
        # Step the loop until the patched getaddrinfo has been invoked.
        while not m_gai.called:
            self.loop._run_once()
        m_gai.assert_called_with(
            addr[0], addr[1], sock.family, sock.type, sock.proto, 0)

        con.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(con)

    def test__sock_connect(self):
        """A connect() that succeeds at once resolves the future with None."""
        f = asyncio.Future(loop=self.loop)

        sock = mock.Mock()
        sock.fileno.return_value = 10

        resolved = self.loop.create_future()
        resolved.set_result([(socket.AF_INET, socket.SOCK_STREAM,
                              socket.IPPROTO_TCP, '', ('127.0.0.1', 8080))])
        self.loop._sock_connect(f, sock, resolved)
        self.assertTrue(f.done())
        self.assertIsNone(f.result())
        self.assertTrue(sock.connect.called)

    def test__sock_connect_cb_cancelled_fut(self):
        """A cancelled future means the socket error is never queried."""
        sock = mock.Mock()
        self.loop.remove_writer = mock.Mock()

        f = asyncio.Future(loop=self.loop)
        f.cancel()

        self.loop._sock_connect_cb(f, sock, ('127.0.0.1', 8080))
        self.assertFalse(sock.getsockopt.called)

    def test__sock_connect_writer(self):
        """A blocking connect() registers the fd, then unregisters it."""
        # check that the fd is registered and then unregistered
        self.loop._process_events = mock.Mock()
        self.loop.add_writer = mock.Mock()
        self.loop.remove_writer = mock.Mock()

        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.connect.side_effect = BlockingIOError
        sock.getsockopt.return_value = 0
        address = ('127.0.0.1', 8080)
        resolved = self.loop.create_future()
        resolved.set_result([(socket.AF_INET, socket.SOCK_STREAM,
                              socket.IPPROTO_TCP, '', address)])

        f = asyncio.Future(loop=self.loop)
        self.loop._sock_connect(f, sock, resolved)
        self.loop._run_once()
        self.assertTrue(self.loop.add_writer.called)
        self.assertEqual(10, self.loop.add_writer.call_args[0][0])

        self.loop._sock_connect_cb(f, sock, address)
        # need to run the event loop to execute _sock_connect_done() callback
        self.loop.run_until_complete(f)
        self.assertEqual((10,), self.loop.remove_writer.call_args[0])

    def test__sock_connect_cb_tryagain(self):
        """An EAGAIN socket error is handled and leaves the future pending."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.getsockopt.return_value = errno.EAGAIN

        # check that the exception is handled
        self.loop._sock_connect_cb(f, sock, ('127.0.0.1', 8080))
        # "try again" is not a final outcome: the future must stay
        # unresolved so the callback can be retried later.
        self.assertFalse(f.done())

    def test__sock_connect_cb_exception(self):
        """A real socket error (ENOTCONN) is stored on the future as OSError."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.getsockopt.return_value = errno.ENOTCONN

        self.loop.remove_writer = mock.Mock()
        self.loop._sock_connect_cb(f, sock, ('127.0.0.1', 8080))
        self.assertIsInstance(f.exception(), OSError)

    def test_sock_accept(self):
        """sock_accept() returns a Future and delegates to _sock_accept()."""
        sock = test_utils.mock_nonblocking_socket()
        self.loop._sock_accept = mock.Mock()

        f = self.loop.sock_accept(sock)
        self.assertIsInstance(f, asyncio.Future)
        self.assertEqual(
            (f, False, sock), self.loop._sock_accept.call_args[0])

    def test__sock_accept(self):
        """A successful accept() yields (conn, address), conn non-blocking."""
        f = asyncio.Future(loop=self.loop)

        conn = mock.Mock()

        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.accept.return_value = conn, ('127.0.0.1', 1000)

        self.loop._sock_accept(f, False, sock)
        self.assertTrue(f.done())
        self.assertEqual((conn, ('127.0.0.1', 1000)), f.result())
        self.assertEqual((False,), conn.setblocking.call_args[0])

    def test__sock_accept_canceled_fut(self):
        """A cancelled future means accept() is never attempted."""
        sock = mock.Mock()

        f = asyncio.Future(loop=self.loop)
        f.cancel()

        self.loop._sock_accept(f, False, sock)
        self.assertFalse(sock.accept.called)

    def test__sock_accept_unregister(self):
        """When called as a registered callback, the reader is removed."""
        sock = mock.Mock()
        sock.fileno.return_value = 10

        f = asyncio.Future(loop=self.loop)
        f.cancel()

        self.loop.remove_reader = mock.Mock()
        self.loop._sock_accept(f, True, sock)
        self.assertEqual((10,), self.loop.remove_reader.call_args[0])

    def test__sock_accept_tryagain(self):
        """BlockingIOError registers _sock_accept as a reader for a retry."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        sock.accept.side_effect = BlockingIOError

        self.loop.add_reader = mock.Mock()
        self.loop._sock_accept(f, False, sock)
        self.assertEqual(
            (10, self.loop._sock_accept, f, True, sock),
            self.loop.add_reader.call_args[0])

    def test__sock_accept_exception(self):
        """An OSError from accept() is stored on the future."""
        f = asyncio.Future(loop=self.loop)
        sock = mock.Mock()
        sock.fileno.return_value = 10
        err = sock.accept.side_effect = OSError()

        self.loop._sock_accept(f, False, sock)
        self.assertIs(err, f.exception())

    def test_add_reader(self):
        """An unknown fd is registered for EVENT_READ with only a reader."""
        self.loop._selector.get_key.side_effect = KeyError
        cb = lambda: True
        self.loop.add_reader(1, cb)

        self.assertTrue(self.loop._selector.register.called)
        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertIsNone(w)

    def test_add_reader_existing(self):
        """Replacing a reader cancels the old one and modifies the key."""
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (reader, writer))
        cb = lambda: True
        self.loop.add_reader(1, cb)

        self.assertTrue(reader.cancel.called)
        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertEqual(writer, w)

    def test_add_reader_existing_writer(self):
        """Adding a reader to a write-only key keeps the existing writer."""
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (None, writer))
        cb = lambda: True
        self.loop.add_reader(1, cb)

        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(cb, r._callback)
        self.assertEqual(writer, w)

    def test_remove_reader(self):
        """Removing from a read-only key unregisters the fd."""
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ, (None, None))
        self.assertFalse(self.loop.remove_reader(1))

        self.assertTrue(self.loop._selector.unregister.called)

    def test_remove_reader_read_write(self):
        """Removing the reader from a read/write key leaves the writer."""
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
            (reader, writer))
        self.assertTrue(
            self.loop.remove_reader(1))

        self.assertFalse(self.loop._selector.unregister.called)
        self.assertEqual(
            (1, selectors.EVENT_WRITE, (None, writer)),
            self.loop._selector.modify.call_args[0])

    def test_remove_reader_unknown(self):
        """Removing a reader for an unregistered fd returns a false value."""
        self.loop._selector.get_key.side_effect = KeyError
        self.assertFalse(
            self.loop.remove_reader(1))

    def test_add_writer(self):
        """An unknown fd is registered for EVENT_WRITE with only a writer."""
        self.loop._selector.get_key.side_effect = KeyError
        cb = lambda: True
        self.loop.add_writer(1, cb)

        self.assertTrue(self.loop._selector.register.called)
        fd, mask, (r, w) = self.loop._selector.register.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE, mask)
        self.assertIsNone(r)
        self.assertEqual(cb, w._callback)

    def test_add_writer_existing(self):
        """Replacing a writer cancels the old one and modifies the key."""
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ, (reader, writer))
        cb = lambda: True
        self.loop.add_writer(1, cb)

        self.assertTrue(writer.cancel.called)
        self.assertFalse(self.loop._selector.register.called)
        self.assertTrue(self.loop._selector.modify.called)
        fd, mask, (r, w) = self.loop._selector.modify.call_args[0]
        self.assertEqual(1, fd)
        self.assertEqual(selectors.EVENT_WRITE | selectors.EVENT_READ, mask)
        self.assertEqual(reader, r)
        self.assertEqual(cb, w._callback)

    def test_remove_writer(self):
        """Removing from a write-only key unregisters the fd."""
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_WRITE, (None, None))
        self.assertFalse(self.loop.remove_writer(1))

        self.assertTrue(self.loop._selector.unregister.called)

    def test_remove_writer_read_write(self):
        """Removing the writer from a read/write key leaves the reader."""
        reader = mock.Mock()
        writer = mock.Mock()
        self.loop._selector.get_key.return_value = selectors.SelectorKey(
            1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
            (reader, writer))
        self.assertTrue(
            self.loop.remove_writer(1))

        self.assertFalse(self.loop._selector.unregister.called)
        self.assertEqual(
            (1, selectors.EVENT_READ, (reader, None)),
            self.loop._selector.modify.call_args[0])

    def test_remove_writer_unknown(self):
        """Removing a writer for an unregistered fd returns a false value."""
        self.loop._selector.get_key.side_effect = KeyError
        self.assertFalse(
            self.loop.remove_writer(1))

    def test_process_events_read(self):
        """A live reader handle is queued through _add_callback()."""
        reader = mock.Mock()
        reader._cancelled = False

        self.loop._add_callback = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(
                1, 1, selectors.EVENT_READ, (reader, None)),
              selectors.EVENT_READ)])
        self.assertTrue(self.loop._add_callback.called)
        self.loop._add_callback.assert_called_with(reader)

    def test_process_events_read_cancelled(self):
        """A cancelled reader handle is removed instead of being run."""
        reader = mock.Mock()
        # Set the same private flag the non-cancelled test sets.  Assigning
        # the public name ``cancelled`` would leave ``_cancelled`` as an
        # auto-created (truthy) Mock attribute, so the test would not be
        # controlling the flag at all.
        # NOTE(review): presumably the loop reads ``_cancelled`` - confirm.
        reader._cancelled = True

        self.loop._remove_reader = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(
                1, 1, selectors.EVENT_READ, (reader, None)),
              selectors.EVENT_READ)])
        self.loop._remove_reader.assert_called_with(1)

    def test_process_events_write(self):
        """A live writer handle is queued through _add_callback()."""
        writer = mock.Mock()
        writer._cancelled = False

        self.loop._add_callback = mock.Mock()
        self.loop._process_events(
            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
                                    (None, writer)),
              selectors.EVENT_WRITE)])
        self.loop._add_callback.assert_called_with(writer)

    def test_process_events_write_cancelled(self):
        """A cancelled writer handle is removed instead of being run."""
        writer = mock.Mock()
        # See test_process_events_write: the flag under test is the private
        # ``_cancelled`` attribute, not a public ``cancelled`` one.
        # NOTE(review): presumably the loop reads ``_cancelled`` - confirm.
        writer._cancelled = True
        self.loop._remove_writer = mock.Mock()

        self.loop._process_events(
            [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
                                    (None, writer)),
              selectors.EVENT_WRITE)])
        self.loop._remove_writer.assert_called_with(1)

    def test_accept_connection_multiple(self):
        """_accept_connection() calls accept() once per backlog slot."""
        sock = mock.Mock()
        sock.accept.return_value = (mock.Mock(), mock.Mock())
        backlog = 100
        # Mock the coroutine generation for a connection to prevent
        # warnings related to un-awaited coroutines.
        mock_obj = mock.patch.object
        with mock_obj(self.loop, '_accept_connection2') as accept2_mock, \
                mock_obj(self.loop, 'create_task') as task_mock:
            accept2_mock.return_value = None
            task_mock.return_value = None
            self.loop._accept_connection(mock.Mock(), sock, backlog=backlog)
        self.assertEqual(sock.accept.call_count, backlog)
697n/a
698n/a
class SelectorTransportTests(test_utils.TestCase):
    """Tests for the _SelectorTransport base class on a test loop."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        self.sock.fileno.return_value = 7

    def create_transport(self):
        """Build a _SelectorTransport and schedule its socket cleanup."""
        transport = _SelectorTransport(self.loop, self.sock, self.protocol,
                                       None)
        self.addCleanup(close_transport, transport)
        return transport

    def test_ctor(self):
        """The constructor stores the loop, the socket and the socket's fd."""
        tr = self.create_transport()
        self.assertIs(tr._loop, self.loop)
        self.assertIs(tr._sock, self.sock)
        # Compare the fd by value: identity of int objects is an
        # implementation detail and must not be relied upon.
        self.assertEqual(tr._sock_fd, 7)

    def test_abort(self):
        """abort() forwards to _force_close(None)."""
        tr = self.create_transport()
        tr._force_close = mock.Mock()

        tr.abort()
        tr._force_close.assert_called_with(None)

    def test_close(self):
        """close() removes the reader once; a second close() is a no-op."""
        tr = self.create_transport()
        tr.close()

        self.assertTrue(tr.is_closing())
        self.assertEqual(1, self.loop.remove_reader_count[7])
        # NOTE(review): this *calls* the protocol's connection_lost rather
        # than asserting on it; kept unchanged - confirm the intent.
        self.protocol.connection_lost(None)
        self.assertEqual(tr._conn_lost, 1)

        tr.close()
        self.assertEqual(tr._conn_lost, 1)
        self.assertEqual(1, self.loop.remove_reader_count[7])

    def test_close_write_buffer(self):
        """With data still buffered, close() does not report connection_lost."""
        tr = self.create_transport()
        tr._buffer.extend(b'data')
        tr.close()

        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_force_close(self):
        """_force_close() drops the buffer and both fd registrations."""
        tr = self.create_transport()
        tr._buffer.extend(b'1')
        self.loop._add_reader(7, mock.sentinel)
        self.loop._add_writer(7, mock.sentinel)
        tr._force_close(None)

        self.assertTrue(tr.is_closing())
        self.assertEqual(tr._buffer, list_to_buffer())
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)

        # second close should not remove reader
        tr._force_close(None)
        self.assertFalse(self.loop.readers)
        self.assertEqual(1, self.loop.remove_reader_count[7])

    @mock.patch('asyncio.log.logger.error')
    def test_fatal_error(self, m_exc):
        """_fatal_error() logs the error and force-closes with it."""
        exc = OSError()
        tr = self.create_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(exc)

        m_exc.assert_called_with(
            test_utils.MockPattern(
                'Fatal error on transport\nprotocol:.*\ntransport:.*'),
            exc_info=(OSError, MOCK_ANY, MOCK_ANY))

        tr._force_close.assert_called_with(exc)

    def test_connection_lost(self):
        """_call_connection_lost() notifies the protocol and drops references."""
        exc = OSError()
        tr = self.create_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)
        tr._call_connection_lost(exc)

        self.protocol.connection_lost.assert_called_with(exc)
        self.sock.close.assert_called_with()
        self.assertIsNone(tr._sock)

        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)
793n/a
794n/a
795n/aclass SelectorSocketTransportTests(test_utils.TestCase):
796n/a
797n/a def setUp(self):
798n/a super().setUp()
799n/a self.loop = self.new_test_loop()
800n/a self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
801n/a self.sock = mock.Mock(socket.socket)
802n/a self.sock_fd = self.sock.fileno.return_value = 7
803n/a
804n/a def socket_transport(self, waiter=None):
805n/a transport = _SelectorSocketTransport(self.loop, self.sock,
806n/a self.protocol, waiter=waiter)
807n/a self.addCleanup(close_transport, transport)
808n/a return transport
809n/a
810n/a def test_ctor(self):
811n/a waiter = asyncio.Future(loop=self.loop)
812n/a tr = self.socket_transport(waiter=waiter)
813n/a self.loop.run_until_complete(waiter)
814n/a
815n/a self.loop.assert_reader(7, tr._read_ready)
816n/a test_utils.run_briefly(self.loop)
817n/a self.protocol.connection_made.assert_called_with(tr)
818n/a
819n/a def test_ctor_with_waiter(self):
820n/a waiter = asyncio.Future(loop=self.loop)
821n/a self.socket_transport(waiter=waiter)
822n/a self.loop.run_until_complete(waiter)
823n/a
824n/a self.assertIsNone(waiter.result())
825n/a
826n/a def test_pause_resume_reading(self):
827n/a tr = self.socket_transport()
828n/a test_utils.run_briefly(self.loop)
829n/a self.assertFalse(tr._paused)
830n/a self.loop.assert_reader(7, tr._read_ready)
831n/a tr.pause_reading()
832n/a self.assertTrue(tr._paused)
833n/a self.assertFalse(7 in self.loop.readers)
834n/a tr.resume_reading()
835n/a self.assertFalse(tr._paused)
836n/a self.loop.assert_reader(7, tr._read_ready)
837n/a with self.assertRaises(RuntimeError):
838n/a tr.resume_reading()
839n/a
840n/a def test_read_ready(self):
841n/a transport = self.socket_transport()
842n/a
843n/a self.sock.recv.return_value = b'data'
844n/a transport._read_ready()
845n/a
846n/a self.protocol.data_received.assert_called_with(b'data')
847n/a
848n/a def test_read_ready_eof(self):
849n/a transport = self.socket_transport()
850n/a transport.close = mock.Mock()
851n/a
852n/a self.sock.recv.return_value = b''
853n/a transport._read_ready()
854n/a
855n/a self.protocol.eof_received.assert_called_with()
856n/a transport.close.assert_called_with()
857n/a
858n/a def test_read_ready_eof_keep_open(self):
859n/a transport = self.socket_transport()
860n/a transport.close = mock.Mock()
861n/a
862n/a self.sock.recv.return_value = b''
863n/a self.protocol.eof_received.return_value = True
864n/a transport._read_ready()
865n/a
866n/a self.protocol.eof_received.assert_called_with()
867n/a self.assertFalse(transport.close.called)
868n/a
869n/a @mock.patch('logging.exception')
870n/a def test_read_ready_tryagain(self, m_exc):
871n/a self.sock.recv.side_effect = BlockingIOError
872n/a
873n/a transport = self.socket_transport()
874n/a transport._fatal_error = mock.Mock()
875n/a transport._read_ready()
876n/a
877n/a self.assertFalse(transport._fatal_error.called)
878n/a
879n/a @mock.patch('logging.exception')
880n/a def test_read_ready_tryagain_interrupted(self, m_exc):
881n/a self.sock.recv.side_effect = InterruptedError
882n/a
883n/a transport = self.socket_transport()
884n/a transport._fatal_error = mock.Mock()
885n/a transport._read_ready()
886n/a
887n/a self.assertFalse(transport._fatal_error.called)
888n/a
889n/a @mock.patch('logging.exception')
890n/a def test_read_ready_conn_reset(self, m_exc):
891n/a err = self.sock.recv.side_effect = ConnectionResetError()
892n/a
893n/a transport = self.socket_transport()
894n/a transport._force_close = mock.Mock()
895n/a with test_utils.disable_logger():
896n/a transport._read_ready()
897n/a transport._force_close.assert_called_with(err)
898n/a
@mock.patch('logging.exception')
def test_read_ready_err(self, m_exc):
    # A plain OSError from recv() is reported through _fatal_error()
    # together with the socket-transport read message.
    os_exc = OSError()
    self.sock.recv.side_effect = os_exc

    tr = self.socket_transport()
    fatal_stub = tr._fatal_error = mock.Mock()
    tr._read_ready()

    fatal_stub.assert_called_with(
        os_exc, 'Fatal read error on socket transport')
910n/a
def test_write(self):
    # send() reports that the whole payload was accepted.
    payload = b'data'
    self.sock.send.return_value = len(payload)

    tr = self.socket_transport()
    tr.write(payload)
    self.sock.send.assert_called_with(payload)
918n/a
def test_write_bytearray(self):
    # Same as the bytes case, but the caller hands in a bytearray.
    payload = bytearray(b'data')
    self.sock.send.return_value = len(payload)

    tr = self.socket_transport()
    tr.write(payload)
    self.sock.send.assert_called_with(payload)
    # The caller's bytearray hasn't been mutated.
    self.assertEqual(payload, bytearray(b'data'))
927n/a
def test_write_memoryview(self):
    # Same as the bytes case, but the caller hands in a memoryview.
    payload = memoryview(b'data')
    self.sock.send.return_value = len(payload)

    tr = self.socket_transport()
    tr.write(payload)
    self.sock.send.assert_called_with(payload)
935n/a
def test_write_no_data(self):
    # Writing b'' must neither call send() nor touch the pending buffer.
    tr = self.socket_transport()
    tr._buffer.extend(b'data')
    tr.write(b'')
    self.assertFalse(self.sock.send.called)
    pending = list_to_buffer([b'data'])
    self.assertEqual(pending, tr._buffer)
942n/a
def test_write_buffer(self):
    # With data already pending, write() appends to the buffer and does
    # not call send().
    tr = self.socket_transport()
    tr._buffer.extend(b'data1')
    tr.write(b'data2')
    self.assertFalse(self.sock.send.called)
    pending = list_to_buffer([b'data1', b'data2'])
    self.assertEqual(pending, tr._buffer)
950n/a
def test_write_partial(self):
    # send() accepts only 2 of the 4 bytes: a writer must be registered
    # on fd 7 and the unsent tail kept in the buffer.
    payload = b'data'
    self.sock.send.return_value = 2

    tr = self.socket_transport()
    tr.write(payload)

    self.loop.assert_writer(7, tr._write_ready)
    leftover = list_to_buffer([b'ta'])
    self.assertEqual(leftover, tr._buffer)
960n/a
def test_write_partial_bytearray(self):
    # Partial send of a bytearray: the tail is buffered and the caller's
    # object is left as it was.
    payload = bytearray(b'data')
    self.sock.send.return_value = 2

    tr = self.socket_transport()
    tr.write(payload)

    self.loop.assert_writer(7, tr._write_ready)
    leftover = list_to_buffer([b'ta'])
    self.assertEqual(leftover, tr._buffer)
    # The caller's bytearray hasn't been mutated.
    self.assertEqual(payload, bytearray(b'data'))
971n/a
def test_write_partial_memoryview(self):
    # Partial send of a memoryview: the unsent tail ends up in the buffer.
    payload = memoryview(b'data')
    self.sock.send.return_value = 2

    tr = self.socket_transport()
    tr.write(payload)

    self.loop.assert_writer(7, tr._write_ready)
    leftover = list_to_buffer([b'ta'])
    self.assertEqual(leftover, tr._buffer)
981n/a
def test_write_partial_none(self):
    # send() accepts nothing (returns 0): the full payload stays buffered
    # and a writer is registered on fd 7.
    payload = b'data'
    self.sock.send.return_value = 0
    self.sock.fileno.return_value = 7

    tr = self.socket_transport()
    tr.write(payload)

    self.loop.assert_writer(7, tr._write_ready)
    pending = list_to_buffer([b'data'])
    self.assertEqual(pending, tr._buffer)
992n/a
def test_write_tryagain(self):
    # send() raising BlockingIOError: the payload is buffered and a
    # writer is registered on fd 7.
    self.sock.send.side_effect = BlockingIOError

    payload = b'data'
    tr = self.socket_transport()
    tr.write(payload)

    self.loop.assert_writer(7, tr._write_ready)
    pending = list_to_buffer([b'data'])
    self.assertEqual(pending, tr._buffer)
1002n/a
@mock.patch('asyncio.selector_events.logger')
def test_write_exception(self, m_log):
    # First part: an OSError from send() goes to _fatal_error().
    send_exc = OSError()
    self.sock.send.side_effect = send_exc

    payload = b'data'
    tr = self.socket_transport()
    fatal_stub = tr._fatal_error = mock.Mock()
    tr.write(payload)
    fatal_stub.assert_called_with(
        send_exc, 'Fatal write error on socket transport')
    tr._conn_lost = 1

    # Second part: with _conn_lost set, write() no longer calls send()
    # and bumps the counter instead.
    self.sock.reset_mock()
    tr.write(payload)
    self.assertFalse(self.sock.send.called)
    self.assertEqual(tr._conn_lost, 2)
    # Four further writes, after which the warning must have been logged.
    for _ in range(4):
        tr.write(payload)
    m_log.warning.assert_called_with('socket.send() raised exception.')
1025n/a
def test_write_str(self):
    # Passing a str to write() must raise TypeError.
    tr = self.socket_transport()
    with self.assertRaises(TypeError):
        tr.write('str')
1029n/a
def test_write_closing(self):
    # close() sets _conn_lost to 1; a write afterwards raises it to 2.
    tr = self.socket_transport()
    tr.close()
    lost_after_close = tr._conn_lost
    self.assertEqual(lost_after_close, 1)
    tr.write(b'data')
    lost_after_write = tr._conn_lost
    self.assertEqual(lost_after_write, 2)
1036n/a
def test_write_ready(self):
    # The buffered payload is fully accepted by send(), so the loop must
    # end up with no writers.
    payload = b'data'
    self.sock.send.return_value = len(payload)

    tr = self.socket_transport()
    tr._buffer.extend(payload)
    self.loop._add_writer(7, tr._write_ready)
    tr._write_ready()
    self.assertTrue(self.sock.send.called)
    self.assertFalse(self.loop.writers)
1047n/a
def test_write_ready_closing(self):
    # Flushing the buffer while _closing is set: the socket must be
    # closed and connection_lost(None) delivered to the protocol.
    payload = b'data'
    self.sock.send.return_value = len(payload)

    tr = self.socket_transport()
    tr._closing = True
    tr._buffer.extend(payload)
    self.loop._add_writer(7, tr._write_ready)
    tr._write_ready()

    self.assertTrue(self.sock.send.called)
    self.assertFalse(self.loop.writers)
    self.sock.close.assert_called_with()
    self.protocol.connection_lost.assert_called_with(None)
1061n/a
def test_write_ready_no_data(self):
    tr = self.socket_transport()
    # This is an internal error: _write_ready() with an empty buffer.
    with self.assertRaises(AssertionError):
        tr._write_ready()
1066n/a
def test_write_ready_partial(self):
    # send() takes 2 of 4 buffered bytes: the writer stays registered and
    # the tail remains in the buffer.
    payload = b'data'
    self.sock.send.return_value = 2

    tr = self.socket_transport()
    tr._buffer.extend(payload)
    self.loop._add_writer(7, tr._write_ready)
    tr._write_ready()

    self.loop.assert_writer(7, tr._write_ready)
    leftover = list_to_buffer([b'ta'])
    self.assertEqual(leftover, tr._buffer)
1077n/a
def test_write_ready_partial_none(self):
    # send() takes nothing: the writer stays registered and the buffer
    # is unchanged.
    payload = b'data'
    self.sock.send.return_value = 0

    tr = self.socket_transport()
    tr._buffer.extend(payload)
    self.loop._add_writer(7, tr._write_ready)
    tr._write_ready()

    self.loop.assert_writer(7, tr._write_ready)
    pending = list_to_buffer([b'data'])
    self.assertEqual(pending, tr._buffer)
1088n/a
def test_write_ready_tryagain(self):
    # BlockingIOError from send(): the writer stays registered and the
    # buffered bytes are retained.
    self.sock.send.side_effect = BlockingIOError

    tr = self.socket_transport()
    tr._buffer = list_to_buffer([b'data1', b'data2'])
    self.loop._add_writer(7, tr._write_ready)
    tr._write_ready()

    self.loop.assert_writer(7, tr._write_ready)
    pending = list_to_buffer([b'data1data2'])
    self.assertEqual(pending, tr._buffer)
1099n/a
def test_write_ready_exception(self):
    # An OSError from send() inside _write_ready() is reported through
    # _fatal_error() with the socket-transport write message.
    send_exc = OSError()
    self.sock.send.side_effect = send_exc

    tr = self.socket_transport()
    fatal_stub = tr._fatal_error = mock.Mock()
    tr._buffer.extend(b'data')
    tr._write_ready()
    fatal_stub.assert_called_with(
        send_exc, 'Fatal write error on socket transport')
1110n/a
def test_write_eof(self):
    # write_eof() shuts down the write side once; a second call must not
    # trigger another shutdown().
    transport = self.socket_transport()
    self.assertTrue(transport.can_write_eof())

    transport.write_eof()
    self.sock.shutdown.assert_called_with(socket.SHUT_WR)

    transport.write_eof()
    self.assertEqual(self.sock.shutdown.call_count, 1)
    transport.close()
1119n/a
def test_write_eof_buffer(self):
    transport = self.socket_transport()

    # Phase 1: send() blocks, so the data is buffered; write_eof() then
    # only records the EOF request without calling shutdown().
    self.sock.send.side_effect = BlockingIOError
    transport.write(b'data')
    transport.write_eof()
    self.assertEqual(transport._buffer, list_to_buffer([b'data']))
    self.assertTrue(transport._eof)
    self.assertFalse(self.sock.shutdown.called)

    # Phase 2: send() now accepts 4 bytes; once _write_ready() has run,
    # shutdown(SHUT_WR) must have been called.
    self.sock.send.side_effect = lambda _: 4
    transport._write_ready()
    self.assertTrue(self.sock.send.called)
    self.sock.shutdown.assert_called_with(socket.SHUT_WR)
    transport.close()
1133n/a
@mock.patch('asyncio.base_events.logger')
def test_transport_close_remove_writer(self, m_log):
    # close() is expected to call the loop's _remove_writer() with the
    # transport's socket fd.
    writer_remover = mock.Mock()
    self.loop._remove_writer = writer_remover

    tr = self.socket_transport()
    tr.close()
    writer_remover.assert_called_with(self.sock_fd)
1141n/a
1142n/a
@unittest.skipIf(ssl is None, 'No ssl module')
class SelectorSslTransportTests(test_utils.TestCase):
    # Tests for _SelectorSslTransport built on mocks only: the plain socket
    # mock reports fd 7, while the socket returned by the mocked SSL
    # context's wrap_socket() reports fd 1.  Reader/writer assertions in
    # this class therefore use fd 1.

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        self.sock.fileno.return_value = 7
        self.sslsock = mock.Mock()
        self.sslsock.fileno.return_value = 1
        self.sslcontext = mock.Mock()
        self.sslcontext.wrap_socket.return_value = self.sslsock

    def ssl_transport(self, waiter=None, server_hostname=None):
        # Build an SSL transport over the mocks and register
        # close_transport() as cleanup for it.
        transport = _SelectorSslTransport(self.loop, self.sock, self.protocol,
                                          self.sslcontext, waiter=waiter,
                                          server_hostname=server_hostname)
        self.addCleanup(close_transport, transport)
        return transport

    def _make_one(self, create_waiter=None):
        # Build a transport, then reset the mocks and the loop counters so
        # each test only observes calls made after construction.
        # NOTE: create_waiter is accepted but not used by this helper.
        transport = self.ssl_transport()
        self.sock.reset_mock()
        self.sslsock.reset_mock()
        self.sslcontext.reset_mock()
        self.loop.reset_counters()
        return transport

    def test_on_handshake(self):
        waiter = asyncio.Future(loop=self.loop)
        tr = self.ssl_transport(waiter=waiter)
        self.assertTrue(self.sslsock.do_handshake.called)
        self.loop.assert_reader(1, tr._read_ready)
        test_utils.run_briefly(self.loop)
        self.assertIsNone(waiter.result())

    def test_on_handshake_reader_retry(self):
        # SSLWantReadError during the handshake: _on_handshake is
        # registered as a reader on fd 1.
        self.loop.set_debug(False)
        self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
        transport = self.ssl_transport()
        self.loop.assert_reader(1, transport._on_handshake, None)

    def test_on_handshake_writer_retry(self):
        # SSLWantWriteError during the handshake: _on_handshake is
        # registered as a writer on fd 1.
        self.loop.set_debug(False)
        self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError
        transport = self.ssl_transport()
        self.loop.assert_writer(1, transport._on_handshake, None)

    def test_on_handshake_exc(self):
        # A handshake failure is stored on the waiter and the SSL socket
        # is closed.
        exc = ValueError()
        self.sslsock.do_handshake.side_effect = exc
        with test_utils.disable_logger():
            waiter = asyncio.Future(loop=self.loop)
            self.ssl_transport(waiter=waiter)
        self.assertTrue(waiter.done())
        self.assertIs(exc, waiter.exception())
        self.assertTrue(self.sslsock.close.called)

    def test_on_handshake_base_exc(self):
        # A BaseException propagates out of _on_handshake(), and is also
        # stored on the waiter.
        waiter = asyncio.Future(loop=self.loop)
        transport = self.ssl_transport(waiter=waiter)
        exc = BaseException()
        self.sslsock.do_handshake.side_effect = exc
        with test_utils.disable_logger():
            self.assertRaises(BaseException, transport._on_handshake, 0)
        self.assertTrue(self.sslsock.close.called)
        self.assertTrue(waiter.done())
        self.assertIs(exc, waiter.exception())

    def test_cancel_handshake(self):
        # Python issue #23197: cancelling a handshake must not raise an
        # exception or log an error, even if the handshake failed
        waiter = asyncio.Future(loop=self.loop)
        transport = self.ssl_transport(waiter=waiter)
        waiter.cancel()
        exc = ValueError()
        self.sslsock.do_handshake.side_effect = exc
        with test_utils.disable_logger():
            transport._on_handshake(0)
        transport.close()
        test_utils.run_briefly(self.loop)

    def test_pause_resume_reading(self):
        tr = self._make_one()
        self.assertFalse(tr._paused)
        self.loop.assert_reader(1, tr._read_ready)
        tr.pause_reading()
        self.assertTrue(tr._paused)
        # assertNotIn reports the container contents on failure.
        self.assertNotIn(1, self.loop.readers)
        tr.resume_reading()
        self.assertFalse(tr._paused)
        self.loop.assert_reader(1, tr._read_ready)
        # Resuming a transport that is not paused is an error.
        with self.assertRaises(RuntimeError):
            tr.resume_reading()

    def test_write(self):
        transport = self._make_one()
        transport.write(b'data')
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)

    def test_write_bytearray(self):
        transport = self._make_one()
        data = bytearray(b'data')
        transport.write(data)
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)
        self.assertEqual(data, bytearray(b'data'))  # Hasn't been mutated.
        self.assertIsNot(data, transport._buffer)  # Hasn't been incorporated.

    def test_write_memoryview(self):
        transport = self._make_one()
        data = memoryview(b'data')
        transport.write(data)
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)

    def test_write_no_data(self):
        transport = self._make_one()
        transport._buffer.extend(b'data')
        transport.write(b'')
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)

    def test_write_str(self):
        transport = self._make_one()
        self.assertRaises(TypeError, transport.write, 'str')

    def test_write_closing(self):
        transport = self._make_one()
        transport.close()
        self.assertEqual(transport._conn_lost, 1)
        transport.write(b'data')
        self.assertEqual(transport._conn_lost, 2)

    @mock.patch('asyncio.selector_events.logger')
    def test_write_exception(self, m_log):
        # With _conn_lost already set, writes are not buffered; after
        # repeated writes the warning must have been logged.
        transport = self._make_one()
        transport._conn_lost = 1
        transport.write(b'data')
        self.assertEqual(transport._buffer, list_to_buffer())
        transport.write(b'data')
        transport.write(b'data')
        transport.write(b'data')
        transport.write(b'data')
        m_log.warning.assert_called_with('socket.send() raised exception.')

    def test_read_ready_recv(self):
        self.sslsock.recv.return_value = b'data'
        transport = self._make_one()
        transport._read_ready()
        self.assertTrue(self.sslsock.recv.called)
        self.assertEqual((b'data',), self.protocol.data_received.call_args[0])

    def test_read_ready_write_wants_read(self):
        # With _write_wants_read set, _read_ready() clears the flag, calls
        # _write_ready() and re-registers it as a writer.
        self.loop._add_writer = mock.Mock()
        self.sslsock.recv.side_effect = BlockingIOError
        transport = self._make_one()
        transport._write_wants_read = True
        transport._write_ready = mock.Mock()
        transport._buffer.extend(b'data')
        transport._read_ready()

        self.assertFalse(transport._write_wants_read)
        transport._write_ready.assert_called_with()
        self.loop._add_writer.assert_called_with(
            transport._sock_fd, transport._write_ready)

    def test_read_ready_recv_eof(self):
        self.sslsock.recv.return_value = b''
        transport = self._make_one()
        transport.close = mock.Mock()
        transport._read_ready()
        transport.close.assert_called_with()
        self.protocol.eof_received.assert_called_with()

    def test_read_ready_recv_conn_reset(self):
        err = self.sslsock.recv.side_effect = ConnectionResetError()
        transport = self._make_one()
        transport._force_close = mock.Mock()
        with test_utils.disable_logger():
            transport._read_ready()
        transport._force_close.assert_called_with(err)

    def test_read_ready_recv_retry(self):
        # None of these three exceptions may result in data_received().
        self.sslsock.recv.side_effect = ssl.SSLWantReadError
        transport = self._make_one()
        transport._read_ready()
        self.assertTrue(self.sslsock.recv.called)
        self.assertFalse(self.protocol.data_received.called)

        self.sslsock.recv.side_effect = BlockingIOError
        transport._read_ready()
        self.assertFalse(self.protocol.data_received.called)

        self.sslsock.recv.side_effect = InterruptedError
        transport._read_ready()
        self.assertFalse(self.protocol.data_received.called)

    def test_read_ready_recv_write(self):
        # SSLWantWriteError from recv(): the reader is removed, the flag
        # _read_wants_write is set and _write_ready becomes a writer.
        self.loop._remove_reader = mock.Mock()
        self.loop._add_writer = mock.Mock()
        self.sslsock.recv.side_effect = ssl.SSLWantWriteError
        transport = self._make_one()
        transport._read_ready()
        self.assertFalse(self.protocol.data_received.called)
        self.assertTrue(transport._read_wants_write)

        self.loop._remove_reader.assert_called_with(transport._sock_fd)
        self.loop._add_writer.assert_called_with(
            transport._sock_fd, transport._write_ready)

    def test_read_ready_recv_exc(self):
        err = self.sslsock.recv.side_effect = OSError()
        transport = self._make_one()
        transport._fatal_error = mock.Mock()
        transport._read_ready()
        transport._fatal_error.assert_called_with(
            err,
            'Fatal read error on SSL transport')

    def test_write_ready_send(self):
        self.sslsock.send.return_value = 4
        transport = self._make_one()
        transport._buffer = list_to_buffer([b'data'])
        transport._write_ready()
        self.assertEqual(list_to_buffer(), transport._buffer)
        self.assertTrue(self.sslsock.send.called)

    def test_write_ready_send_none(self):
        self.sslsock.send.return_value = 0
        transport = self._make_one()
        transport._buffer = list_to_buffer([b'data1', b'data2'])
        transport._write_ready()
        self.assertTrue(self.sslsock.send.called)
        self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer)

    def test_write_ready_send_partial(self):
        self.sslsock.send.return_value = 2
        transport = self._make_one()
        transport._buffer = list_to_buffer([b'data1', b'data2'])
        transport._write_ready()
        self.assertTrue(self.sslsock.send.called)
        self.assertEqual(list_to_buffer([b'ta1data2']), transport._buffer)

    def test_write_ready_send_closing_partial(self):
        self.sslsock.send.return_value = 2
        transport = self._make_one()
        transport._buffer = list_to_buffer([b'data1', b'data2'])
        transport._write_ready()
        self.assertTrue(self.sslsock.send.called)
        self.assertFalse(self.sslsock.close.called)

    def test_write_ready_send_closing(self):
        self.sslsock.send.return_value = 4
        transport = self._make_one()
        transport._buffer = list_to_buffer([b'data'])
        transport.close()
        transport._write_ready()
        self.protocol.connection_lost.assert_called_with(None)

    def test_write_ready_send_closing_empty_buffer(self):
        self.sslsock.send.return_value = 4
        call_soon = self.loop.call_soon = mock.Mock()
        transport = self._make_one()
        transport._buffer = list_to_buffer()
        transport.close()
        transport._write_ready()
        call_soon.assert_called_with(transport._call_connection_lost, None)

    def test_write_ready_send_retry(self):
        # Neither exception may consume any of the buffered data.
        transport = self._make_one()
        transport._buffer = list_to_buffer([b'data'])

        self.sslsock.send.side_effect = ssl.SSLWantWriteError
        transport._write_ready()
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)

        self.sslsock.send.side_effect = BlockingIOError()
        transport._write_ready()
        self.assertEqual(list_to_buffer([b'data']), transport._buffer)

    def test_write_ready_send_read(self):
        # SSLWantReadError from send(): the writer is removed and the flag
        # _write_wants_read is set.
        transport = self._make_one()
        transport._buffer = list_to_buffer([b'data'])

        self.loop._remove_writer = mock.Mock()
        self.sslsock.send.side_effect = ssl.SSLWantReadError
        transport._write_ready()
        self.assertFalse(self.protocol.data_received.called)
        self.assertTrue(transport._write_wants_read)
        self.loop._remove_writer.assert_called_with(transport._sock_fd)

    def test_write_ready_send_exc(self):
        err = self.sslsock.send.side_effect = OSError()

        transport = self._make_one()
        transport._buffer = list_to_buffer([b'data'])
        transport._fatal_error = mock.Mock()
        transport._write_ready()
        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on SSL transport')
        self.assertEqual(list_to_buffer(), transport._buffer)

    def test_write_ready_read_wants_write(self):
        # With _read_wants_write set, _write_ready() clears the flag, calls
        # _read_ready() and re-registers it as a reader.
        self.loop._add_reader = mock.Mock()
        self.sslsock.send.side_effect = BlockingIOError
        transport = self._make_one()
        transport._read_wants_write = True
        transport._read_ready = mock.Mock()
        transport._write_ready()

        self.assertFalse(transport._read_wants_write)
        transport._read_ready.assert_called_with()
        self.loop._add_reader.assert_called_with(
            transport._sock_fd, transport._read_ready)

    def test_write_eof(self):
        tr = self._make_one()
        self.assertFalse(tr.can_write_eof())
        self.assertRaises(NotImplementedError, tr.write_eof)

    def check_close(self):
        # Shared body for the close tests: a second close() must leave
        # _conn_lost and the fd-1 reader-removal count unchanged.
        tr = self._make_one()
        tr.close()

        self.assertTrue(tr.is_closing())
        self.assertEqual(1, self.loop.remove_reader_count[1])
        self.assertEqual(tr._conn_lost, 1)

        tr.close()
        self.assertEqual(tr._conn_lost, 1)
        self.assertEqual(1, self.loop.remove_reader_count[1])

        test_utils.run_briefly(self.loop)

    def test_close(self):
        self.check_close()
        self.assertTrue(self.protocol.connection_made.called)
        self.assertTrue(self.protocol.connection_lost.called)

    def test_close_not_connected(self):
        # The handshake never completes here, so neither connection_made()
        # nor connection_lost() may be called.
        self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
        self.check_close()
        self.assertFalse(self.protocol.connection_made.called)
        self.assertFalse(self.protocol.connection_lost.called)

    # No per-method skipIf(ssl is None) is needed: the class-level
    # decorator already skips every test here when ssl is unavailable.
    def test_server_hostname(self):
        self.ssl_transport(server_hostname='localhost')
        self.sslcontext.wrap_socket.assert_called_with(
            self.sock, do_handshake_on_connect=False, server_side=False,
            server_hostname='localhost')
1494n/a
1495n/a
class SelectorSslWithoutSslTransportTests(unittest.TestCase):

    @mock.patch('asyncio.selector_events.ssl', None)
    def test_ssl_transport_requires_ssl_module(self):
        # With selector_events.ssl patched to None, constructing the SSL
        # transport must raise RuntimeError.
        make_stub = mock.Mock
        with self.assertRaises(RuntimeError):
            _SelectorSslTransport(
                make_stub(), make_stub(), make_stub(), make_stub())
1503n/a
1504n/a
1505n/aclass SelectorDatagramTransportTests(test_utils.TestCase):
1506n/a
def setUp(self):
    # Per-test fixtures: a test loop, a test DatagramProtocol and a socket
    # mock (spec_set to socket.socket) whose fileno() returns 7.
    super().setUp()
    self.loop = self.new_test_loop()
    self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
    sock = mock.Mock(spec_set=socket.socket)
    sock.fileno.return_value = 7
    self.sock = sock
1513n/a
def datagram_transport(self, address=None):
    # Create a datagram transport over the fixture loop/socket/protocol and
    # register close_transport() as cleanup for it.
    tr = _SelectorDatagramTransport(
        self.loop, self.sock, self.protocol, address=address)
    self.addCleanup(close_transport, tr)
    return tr
1520n/a
def test_read_ready(self):
    # A datagram returned by recvfrom() is forwarded, with its source
    # address, to datagram_received().
    tr = self.datagram_transport()
    source = ('0.0.0.0', 1234)

    self.sock.recvfrom.return_value = (b'data', source)
    tr._read_ready()

    self.protocol.datagram_received.assert_called_with(b'data', source)
1529n/a
def test_read_ready_tryagain(self):
    # BlockingIOError from recvfrom() must not reach _fatal_error().
    tr = self.datagram_transport()

    self.sock.recvfrom.side_effect = BlockingIOError
    fatal_stub = tr._fatal_error = mock.Mock()
    tr._read_ready()

    self.assertFalse(fatal_stub.called)
1538n/a
def test_read_ready_err(self):
    # A RuntimeError from recvfrom() is reported through _fatal_error()
    # with the datagram-transport read message.
    tr = self.datagram_transport()

    recv_exc = RuntimeError()
    self.sock.recvfrom.side_effect = recv_exc
    fatal_stub = tr._fatal_error = mock.Mock()
    tr._read_ready()

    fatal_stub.assert_called_with(
        recv_exc, 'Fatal read error on datagram transport')
1549n/a
def test_read_ready_oserr(self):
    # An OSError from recvfrom() goes to the protocol's error_received()
    # and not to _fatal_error().
    tr = self.datagram_transport()

    os_exc = OSError()
    self.sock.recvfrom.side_effect = os_exc
    fatal_stub = tr._fatal_error = mock.Mock()
    tr._read_ready()

    self.assertFalse(fatal_stub.called)
    self.protocol.error_received.assert_called_with(os_exc)
1559n/a
def test_sendto(self):
    # sendto() on the transport is passed straight to sock.sendto().
    payload = b'data'
    dest = ('0.0.0.0', 1234)
    tr = self.datagram_transport()
    tr.sendto(payload, dest)
    self.assertTrue(self.sock.sendto.called)
    positional = self.sock.sendto.call_args[0]
    self.assertEqual(positional, (payload, dest))
1567n/a
def test_sendto_bytearray(self):
    # Same as test_sendto, with a bytearray payload.
    payload = bytearray(b'data')
    dest = ('0.0.0.0', 1234)
    tr = self.datagram_transport()
    tr.sendto(payload, dest)
    self.assertTrue(self.sock.sendto.called)
    positional = self.sock.sendto.call_args[0]
    self.assertEqual(positional, (payload, dest))
1575n/a
def test_sendto_memoryview(self):
    # Same as test_sendto, with a memoryview payload.
    payload = memoryview(b'data')
    dest = ('0.0.0.0', 1234)
    tr = self.datagram_transport()
    tr.sendto(payload, dest)
    self.assertTrue(self.sock.sendto.called)
    positional = self.sock.sendto.call_args[0]
    self.assertEqual(positional, (payload, dest))
1583n/a
def test_sendto_no_data(self):
    # Sending b'' must neither call sock.sendto() nor change the queue.
    tr = self.datagram_transport()
    queued = (b'data', ('0.0.0.0', 12345))
    tr._buffer.append(queued)
    tr.sendto(b'', ())
    self.assertFalse(self.sock.sendto.called)
    self.assertEqual([queued], list(tr._buffer))
1591n/a
def test_sendto_buffer(self):
    # With a datagram already queued, sendto() queues the new one behind
    # it and does not call sock.sendto().
    dest = ('0.0.0.0', 12345)
    tr = self.datagram_transport()
    tr._buffer.append((b'data1', dest))
    tr.sendto(b'data2', dest)
    self.assertFalse(self.sock.sendto.called)
    expected_queue = [(b'data1', dest), (b'data2', dest)]
    self.assertEqual(expected_queue, list(tr._buffer))
1601n/a
def test_sendto_buffer_bytearray(self):
    # A queued bytearray payload must be stored in the queue as bytes.
    dest = ('0.0.0.0', 12345)
    second = bytearray(b'data2')
    tr = self.datagram_transport()
    tr._buffer.append((b'data1', dest))
    tr.sendto(second, dest)
    self.assertFalse(self.sock.sendto.called)
    expected_queue = [(b'data1', dest), (b'data2', dest)]
    self.assertEqual(expected_queue, list(tr._buffer))
    stored_payload = tr._buffer[1][0]
    self.assertIsInstance(stored_payload, bytes)
1613n/a
def test_sendto_buffer_memoryview(self):
    # A queued memoryview payload must be stored in the queue as bytes.
    dest = ('0.0.0.0', 12345)
    second = memoryview(b'data2')
    tr = self.datagram_transport()
    tr._buffer.append((b'data1', dest))
    tr.sendto(second, dest)
    self.assertFalse(self.sock.sendto.called)
    expected_queue = [(b'data1', dest), (b'data2', dest)]
    self.assertEqual(expected_queue, list(tr._buffer))
    stored_payload = tr._buffer[1][0]
    self.assertIsInstance(stored_payload, bytes)
1625n/a
def test_sendto_tryagain(self):
    # BlockingIOError from sock.sendto(): the datagram is queued and
    # _sendto_ready is registered as a writer on fd 7.
    payload = b'data'
    dest = ('0.0.0.0', 12345)

    self.sock.sendto.side_effect = BlockingIOError

    tr = self.datagram_transport()
    tr.sendto(payload, dest)

    self.loop.assert_writer(7, tr._sendto_ready)
    self.assertEqual([(b'data', dest)], list(tr._buffer))
1637n/a
@mock.patch('asyncio.selector_events.logger')
def test_sendto_exception(self, m_log):
    # First part: a RuntimeError from sock.sendto() goes to _fatal_error().
    payload = b'data'
    send_exc = RuntimeError()
    self.sock.sendto.side_effect = send_exc

    tr = self.datagram_transport()
    fatal_stub = tr._fatal_error = mock.Mock()
    tr.sendto(payload, ())

    self.assertTrue(fatal_stub.called)
    fatal_stub.assert_called_with(
        send_exc, 'Fatal write error on datagram transport')
    tr._conn_lost = 1

    # Second part: five sends with _conn_lost set, after which the warning
    # must have been logged.
    tr._address = ('123',)
    for _ in range(5):
        tr.sendto(payload)
    m_log.warning.assert_called_with('socket.send() raised exception.')
1660n/a
def test_sendto_error_received(self):
    # ConnectionRefusedError from sock.sendto() is not fatal: _conn_lost
    # stays at 0 and _fatal_error() is not called.
    payload = b'data'

    self.sock.sendto.side_effect = ConnectionRefusedError

    tr = self.datagram_transport()
    fatal_stub = tr._fatal_error = mock.Mock()
    tr.sendto(payload, ())

    self.assertEqual(tr._conn_lost, 0)
    self.assertFalse(fatal_stub.called)
1672n/a
def test_sendto_error_received_connected(self):
    # Transport created with an address: ConnectionRefusedError from
    # sock.send() reaches error_received(), not _fatal_error().
    payload = b'data'

    self.sock.send.side_effect = ConnectionRefusedError

    tr = self.datagram_transport(address=('0.0.0.0', 1))
    fatal_stub = tr._fatal_error = mock.Mock()
    tr.sendto(payload)

    self.assertFalse(fatal_stub.called)
    self.assertTrue(self.protocol.error_received.called)
1684n/a
def test_sendto_str(self):
    # Passing a str payload to sendto() must raise TypeError.
    tr = self.datagram_transport()
    with self.assertRaises(TypeError):
        tr.sendto('str', ())
1688n/a
def test_sendto_connected_addr(self):
    # Transport created with one address; sending to a different address
    # must raise ValueError.
    tr = self.datagram_transport(address=('0.0.0.0', 1))
    with self.assertRaises(ValueError):
        tr.sendto(b'str', ('0.0.0.0', 2))
1693n/a
def test_sendto_closing(self):
    # close() sets _conn_lost to 1; a sendto() afterwards raises it to 2.
    tr = self.datagram_transport(address=(1,))
    tr.close()
    lost_after_close = tr._conn_lost
    self.assertEqual(lost_after_close, 1)
    tr.sendto(b'data', (1,))
    lost_after_send = tr._conn_lost
    self.assertEqual(lost_after_send, 2)
1700n/a
def test_sendto_ready(self):
    # One queued datagram is flushed via sock.sendto(); afterwards the
    # loop must have no writers.
    payload = b'data'
    dest = ('0.0.0.0', 12345)
    self.sock.sendto.return_value = len(payload)

    tr = self.datagram_transport()
    tr._buffer.append((payload, dest))
    self.loop._add_writer(7, tr._sendto_ready)
    tr._sendto_ready()

    self.assertTrue(self.sock.sendto.called)
    positional = self.sock.sendto.call_args[0]
    self.assertEqual(positional, (payload, dest))
    self.assertFalse(self.loop.writers)
1713n/a
def test_sendto_ready_closing(self):
    # Flushing the queue while _closing is set: the datagram is sent via
    # sock.sendto(), the writer is removed, the socket is closed and
    # connection_lost(None) is delivered to the protocol.
    data = b'data'
    # Stub sendto(), which is the call this test asserts on below; the
    # previous stub on send() did not match that assertion.
    self.sock.sendto.return_value = len(data)

    transport = self.datagram_transport()
    transport._closing = True
    transport._buffer.append((data, ()))
    self.loop._add_writer(7, transport._sendto_ready)
    transport._sendto_ready()
    self.sock.sendto.assert_called_with(data, ())
    self.assertFalse(self.loop.writers)
    self.sock.close.assert_called_with()
    self.protocol.connection_lost.assert_called_with(None)
1727n/a
def test_sendto_ready_no_data(self):
    # With an empty queue, _sendto_ready() sends nothing and the loop is
    # left without writers.
    tr = self.datagram_transport()
    self.loop._add_writer(7, tr._sendto_ready)
    tr._sendto_ready()
    self.assertFalse(self.sock.sendto.called)
    self.assertFalse(self.loop.writers)
1734n/a
def test_sendto_ready_tryagain(self):
    # BlockingIOError from sock.sendto(): the writer stays registered and
    # both queued datagrams remain, in order.
    self.sock.sendto.side_effect = BlockingIOError

    tr = self.datagram_transport()
    queued = [(b'data1', ()), (b'data2', ())]
    tr._buffer.extend(queued)
    self.loop._add_writer(7, tr._sendto_ready)
    tr._sendto_ready()

    self.loop.assert_writer(7, tr._sendto_ready)
    self.assertEqual(
        [(b'data1', ()), (b'data2', ())],
        list(tr._buffer))
1747n/a
def test_sendto_ready_exception(self):
    # An unexpected exception from sock.sendto() is expected to be passed
    # to _fatal_error() together with the datagram write-error message.
    failure = RuntimeError()
    self.sock.sendto.side_effect = failure

    transport = self.datagram_transport()
    transport._fatal_error = mock.Mock()
    transport._buffer.append((b'data', ()))
    transport._sendto_ready()

    expected_message = 'Fatal write error on datagram transport'
    transport._fatal_error.assert_called_with(failure, expected_message)
1759n/a
def test_sendto_ready_error_received(self):
    # ConnectionRefusedError raised by sock.sendto() must not be escalated
    # to _fatal_error().
    self.sock.sendto.side_effect = ConnectionRefusedError

    transport = self.datagram_transport()
    fatal_error = mock.Mock()
    transport._fatal_error = fatal_error
    transport._buffer.append((b'data', ()))
    transport._sendto_ready()

    self.assertFalse(fatal_error.called)
1769n/a
def test_sendto_ready_error_received_connection(self):
    # Transport created with a remote address: ConnectionRefusedError from
    # sock.send() is expected to reach protocol.error_received() without
    # going through _fatal_error().
    self.sock.send.side_effect = ConnectionRefusedError

    remote = ('0.0.0.0', 1)
    transport = self.datagram_transport(address=remote)
    fatal_error = mock.Mock()
    transport._fatal_error = fatal_error
    transport._buffer.append((b'data', ()))
    transport._sendto_ready()

    self.assertFalse(fatal_error.called)
    self.assertTrue(self.protocol.error_received.called)
1780n/a
@mock.patch('asyncio.base_events.logger.error')
def test_fatal_error_connected(self, m_exc):
    # Calling _fatal_error() with ConnectionRefusedError on a transport
    # created with a remote address is expected to log through the patched
    # logger.error and not to call protocol.error_received().
    transport = self.datagram_transport(address=('0.0.0.0', 1))
    refused = ConnectionRefusedError()

    transport._fatal_error(refused)

    self.assertFalse(self.protocol.error_received.called)
    expected_message = test_utils.MockPattern(
        'Fatal error on transport\nprotocol:.*\ntransport:.*')
    expected_exc_info = (ConnectionRefusedError, MOCK_ANY, MOCK_ANY)
    m_exc.assert_called_with(expected_message, exc_info=expected_exc_info)
1791n/a
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()