» Core Development > Code coverage > Lib/test/test_asyncio/test_unix_events.py

Python code coverage for Lib/test/test_asyncio/test_unix_events.py

# | count | content
1n/a"""Tests for unix_events.py."""
2n/a
3n/aimport collections
4n/aimport errno
5n/aimport io
6n/aimport os
7n/aimport pathlib
8n/aimport signal
9n/aimport socket
10n/aimport stat
11n/aimport sys
12n/aimport tempfile
13n/aimport threading
14n/aimport unittest
15n/afrom unittest import mock
16n/a
17n/aif sys.platform == 'win32':
18n/a raise unittest.SkipTest('UNIX only')
19n/a
20n/a
21n/aimport asyncio
22n/afrom asyncio import log
23n/afrom asyncio import test_utils
24n/afrom asyncio import unix_events
25n/a
26n/a
27n/aMOCK_ANY = mock.ANY
28n/a
29n/a
30n/adef close_pipe_transport(transport):
31n/a # Don't call transport.close() because the event loop and the selector
32n/a # are mocked
33n/a if transport._pipe is None:
34n/a return
35n/a transport._pipe.close()
36n/a transport._pipe = None
37n/a
38n/a
39n/a@unittest.skipUnless(signal, 'Signals are not supported')
40n/aclass SelectorEventLoopSignalTests(test_utils.TestCase):
41n/a
42n/a def setUp(self):
43n/a super().setUp()
44n/a self.loop = asyncio.SelectorEventLoop()
45n/a self.set_event_loop(self.loop)
46n/a
47n/a def test_check_signal(self):
48n/a self.assertRaises(
49n/a TypeError, self.loop._check_signal, '1')
50n/a self.assertRaises(
51n/a ValueError, self.loop._check_signal, signal.NSIG + 1)
52n/a
53n/a def test_handle_signal_no_handler(self):
54n/a self.loop._handle_signal(signal.NSIG + 1)
55n/a
56n/a def test_handle_signal_cancelled_handler(self):
57n/a h = asyncio.Handle(mock.Mock(), (),
58n/a loop=mock.Mock())
59n/a h.cancel()
60n/a self.loop._signal_handlers[signal.NSIG + 1] = h
61n/a self.loop.remove_signal_handler = mock.Mock()
62n/a self.loop._handle_signal(signal.NSIG + 1)
63n/a self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
64n/a
65n/a @mock.patch('asyncio.unix_events.signal')
66n/a def test_add_signal_handler_setup_error(self, m_signal):
67n/a m_signal.NSIG = signal.NSIG
68n/a m_signal.set_wakeup_fd.side_effect = ValueError
69n/a
70n/a self.assertRaises(
71n/a RuntimeError,
72n/a self.loop.add_signal_handler,
73n/a signal.SIGINT, lambda: True)
74n/a
75n/a @mock.patch('asyncio.unix_events.signal')
76n/a def test_add_signal_handler_coroutine_error(self, m_signal):
77n/a m_signal.NSIG = signal.NSIG
78n/a
79n/a @asyncio.coroutine
80n/a def simple_coroutine():
81n/a yield from []
82n/a
83n/a # callback must not be a coroutine function
84n/a coro_func = simple_coroutine
85n/a coro_obj = coro_func()
86n/a self.addCleanup(coro_obj.close)
87n/a for func in (coro_func, coro_obj):
88n/a self.assertRaisesRegex(
89n/a TypeError, 'coroutines cannot be used with add_signal_handler',
90n/a self.loop.add_signal_handler,
91n/a signal.SIGINT, func)
92n/a
93n/a @mock.patch('asyncio.unix_events.signal')
94n/a def test_add_signal_handler(self, m_signal):
95n/a m_signal.NSIG = signal.NSIG
96n/a
97n/a cb = lambda: True
98n/a self.loop.add_signal_handler(signal.SIGHUP, cb)
99n/a h = self.loop._signal_handlers.get(signal.SIGHUP)
100n/a self.assertIsInstance(h, asyncio.Handle)
101n/a self.assertEqual(h._callback, cb)
102n/a
103n/a @mock.patch('asyncio.unix_events.signal')
104n/a def test_add_signal_handler_install_error(self, m_signal):
105n/a m_signal.NSIG = signal.NSIG
106n/a
107n/a def set_wakeup_fd(fd):
108n/a if fd == -1:
109n/a raise ValueError()
110n/a m_signal.set_wakeup_fd = set_wakeup_fd
111n/a
112n/a class Err(OSError):
113n/a errno = errno.EFAULT
114n/a m_signal.signal.side_effect = Err
115n/a
116n/a self.assertRaises(
117n/a Err,
118n/a self.loop.add_signal_handler,
119n/a signal.SIGINT, lambda: True)
120n/a
121n/a @mock.patch('asyncio.unix_events.signal')
122n/a @mock.patch('asyncio.base_events.logger')
123n/a def test_add_signal_handler_install_error2(self, m_logging, m_signal):
124n/a m_signal.NSIG = signal.NSIG
125n/a
126n/a class Err(OSError):
127n/a errno = errno.EINVAL
128n/a m_signal.signal.side_effect = Err
129n/a
130n/a self.loop._signal_handlers[signal.SIGHUP] = lambda: True
131n/a self.assertRaises(
132n/a RuntimeError,
133n/a self.loop.add_signal_handler,
134n/a signal.SIGINT, lambda: True)
135n/a self.assertFalse(m_logging.info.called)
136n/a self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
137n/a
138n/a @mock.patch('asyncio.unix_events.signal')
139n/a @mock.patch('asyncio.base_events.logger')
140n/a def test_add_signal_handler_install_error3(self, m_logging, m_signal):
141n/a class Err(OSError):
142n/a errno = errno.EINVAL
143n/a m_signal.signal.side_effect = Err
144n/a m_signal.NSIG = signal.NSIG
145n/a
146n/a self.assertRaises(
147n/a RuntimeError,
148n/a self.loop.add_signal_handler,
149n/a signal.SIGINT, lambda: True)
150n/a self.assertFalse(m_logging.info.called)
151n/a self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
152n/a
153n/a @mock.patch('asyncio.unix_events.signal')
154n/a def test_remove_signal_handler(self, m_signal):
155n/a m_signal.NSIG = signal.NSIG
156n/a
157n/a self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
158n/a
159n/a self.assertTrue(
160n/a self.loop.remove_signal_handler(signal.SIGHUP))
161n/a self.assertTrue(m_signal.set_wakeup_fd.called)
162n/a self.assertTrue(m_signal.signal.called)
163n/a self.assertEqual(
164n/a (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
165n/a
166n/a @mock.patch('asyncio.unix_events.signal')
167n/a def test_remove_signal_handler_2(self, m_signal):
168n/a m_signal.NSIG = signal.NSIG
169n/a m_signal.SIGINT = signal.SIGINT
170n/a
171n/a self.loop.add_signal_handler(signal.SIGINT, lambda: True)
172n/a self.loop._signal_handlers[signal.SIGHUP] = object()
173n/a m_signal.set_wakeup_fd.reset_mock()
174n/a
175n/a self.assertTrue(
176n/a self.loop.remove_signal_handler(signal.SIGINT))
177n/a self.assertFalse(m_signal.set_wakeup_fd.called)
178n/a self.assertTrue(m_signal.signal.called)
179n/a self.assertEqual(
180n/a (signal.SIGINT, m_signal.default_int_handler),
181n/a m_signal.signal.call_args[0])
182n/a
183n/a @mock.patch('asyncio.unix_events.signal')
184n/a @mock.patch('asyncio.base_events.logger')
185n/a def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
186n/a m_signal.NSIG = signal.NSIG
187n/a self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
188n/a
189n/a m_signal.set_wakeup_fd.side_effect = ValueError
190n/a
191n/a self.loop.remove_signal_handler(signal.SIGHUP)
192n/a self.assertTrue(m_logging.info)
193n/a
194n/a @mock.patch('asyncio.unix_events.signal')
195n/a def test_remove_signal_handler_error(self, m_signal):
196n/a m_signal.NSIG = signal.NSIG
197n/a self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
198n/a
199n/a m_signal.signal.side_effect = OSError
200n/a
201n/a self.assertRaises(
202n/a OSError, self.loop.remove_signal_handler, signal.SIGHUP)
203n/a
204n/a @mock.patch('asyncio.unix_events.signal')
205n/a def test_remove_signal_handler_error2(self, m_signal):
206n/a m_signal.NSIG = signal.NSIG
207n/a self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
208n/a
209n/a class Err(OSError):
210n/a errno = errno.EINVAL
211n/a m_signal.signal.side_effect = Err
212n/a
213n/a self.assertRaises(
214n/a RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
215n/a
216n/a @mock.patch('asyncio.unix_events.signal')
217n/a def test_close(self, m_signal):
218n/a m_signal.NSIG = signal.NSIG
219n/a
220n/a self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
221n/a self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)
222n/a
223n/a self.assertEqual(len(self.loop._signal_handlers), 2)
224n/a
225n/a m_signal.set_wakeup_fd.reset_mock()
226n/a
227n/a self.loop.close()
228n/a
229n/a self.assertEqual(len(self.loop._signal_handlers), 0)
230n/a m_signal.set_wakeup_fd.assert_called_once_with(-1)
231n/a
232n/a
233n/a@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
234n/a 'UNIX Sockets are not supported')
235n/aclass SelectorEventLoopUnixSocketTests(test_utils.TestCase):
236n/a
237n/a def setUp(self):
238n/a super().setUp()
239n/a self.loop = asyncio.SelectorEventLoop()
240n/a self.set_event_loop(self.loop)
241n/a
242n/a def test_create_unix_server_existing_path_sock(self):
243n/a with test_utils.unix_socket_path() as path:
244n/a sock = socket.socket(socket.AF_UNIX)
245n/a sock.bind(path)
246n/a sock.listen(1)
247n/a sock.close()
248n/a
249n/a coro = self.loop.create_unix_server(lambda: None, path)
250n/a srv = self.loop.run_until_complete(coro)
251n/a srv.close()
252n/a self.loop.run_until_complete(srv.wait_closed())
253n/a
254n/a @unittest.skipUnless(hasattr(os, 'fspath'), 'no os.fspath')
255n/a def test_create_unix_server_pathlib(self):
256n/a with test_utils.unix_socket_path() as path:
257n/a path = pathlib.Path(path)
258n/a srv_coro = self.loop.create_unix_server(lambda: None, path)
259n/a srv = self.loop.run_until_complete(srv_coro)
260n/a srv.close()
261n/a self.loop.run_until_complete(srv.wait_closed())
262n/a
263n/a def test_create_unix_server_existing_path_nonsock(self):
264n/a with tempfile.NamedTemporaryFile() as file:
265n/a coro = self.loop.create_unix_server(lambda: None, file.name)
266n/a with self.assertRaisesRegex(OSError,
267n/a 'Address.*is already in use'):
268n/a self.loop.run_until_complete(coro)
269n/a
270n/a def test_create_unix_server_ssl_bool(self):
271n/a coro = self.loop.create_unix_server(lambda: None, path='spam',
272n/a ssl=True)
273n/a with self.assertRaisesRegex(TypeError,
274n/a 'ssl argument must be an SSLContext'):
275n/a self.loop.run_until_complete(coro)
276n/a
277n/a def test_create_unix_server_nopath_nosock(self):
278n/a coro = self.loop.create_unix_server(lambda: None, path=None)
279n/a with self.assertRaisesRegex(ValueError,
280n/a 'path was not specified, and no sock'):
281n/a self.loop.run_until_complete(coro)
282n/a
283n/a def test_create_unix_server_path_inetsock(self):
284n/a sock = socket.socket()
285n/a with sock:
286n/a coro = self.loop.create_unix_server(lambda: None, path=None,
287n/a sock=sock)
288n/a with self.assertRaisesRegex(ValueError,
289n/a 'A UNIX Domain Stream.*was expected'):
290n/a self.loop.run_until_complete(coro)
291n/a
292n/a def test_create_unix_server_path_dgram(self):
293n/a sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
294n/a with sock:
295n/a coro = self.loop.create_unix_server(lambda: None, path=None,
296n/a sock=sock)
297n/a with self.assertRaisesRegex(ValueError,
298n/a 'A UNIX Domain Stream.*was expected'):
299n/a self.loop.run_until_complete(coro)
300n/a
301n/a @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
302n/a 'no socket.SOCK_NONBLOCK (linux only)')
303n/a def test_create_unix_server_path_stream_bittype(self):
304n/a sock = socket.socket(
305n/a socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
306n/a with tempfile.NamedTemporaryFile() as file:
307n/a fn = file.name
308n/a try:
309n/a with sock:
310n/a sock.bind(fn)
311n/a coro = self.loop.create_unix_server(lambda: None, path=None,
312n/a sock=sock)
313n/a srv = self.loop.run_until_complete(coro)
314n/a srv.close()
315n/a self.loop.run_until_complete(srv.wait_closed())
316n/a finally:
317n/a os.unlink(fn)
318n/a
319n/a def test_create_unix_connection_path_inetsock(self):
320n/a sock = socket.socket()
321n/a with sock:
322n/a coro = self.loop.create_unix_connection(lambda: None, path=None,
323n/a sock=sock)
324n/a with self.assertRaisesRegex(ValueError,
325n/a 'A UNIX Domain Stream.*was expected'):
326n/a self.loop.run_until_complete(coro)
327n/a
328n/a @mock.patch('asyncio.unix_events.socket')
329n/a def test_create_unix_server_bind_error(self, m_socket):
330n/a # Ensure that the socket is closed on any bind error
331n/a sock = mock.Mock()
332n/a m_socket.socket.return_value = sock
333n/a
334n/a sock.bind.side_effect = OSError
335n/a coro = self.loop.create_unix_server(lambda: None, path="/test")
336n/a with self.assertRaises(OSError):
337n/a self.loop.run_until_complete(coro)
338n/a self.assertTrue(sock.close.called)
339n/a
340n/a sock.bind.side_effect = MemoryError
341n/a coro = self.loop.create_unix_server(lambda: None, path="/test")
342n/a with self.assertRaises(MemoryError):
343n/a self.loop.run_until_complete(coro)
344n/a self.assertTrue(sock.close.called)
345n/a
346n/a def test_create_unix_connection_path_sock(self):
347n/a coro = self.loop.create_unix_connection(
348n/a lambda: None, os.devnull, sock=object())
349n/a with self.assertRaisesRegex(ValueError, 'path and sock can not be'):
350n/a self.loop.run_until_complete(coro)
351n/a
352n/a def test_create_unix_connection_nopath_nosock(self):
353n/a coro = self.loop.create_unix_connection(
354n/a lambda: None, None)
355n/a with self.assertRaisesRegex(ValueError,
356n/a 'no path and sock were specified'):
357n/a self.loop.run_until_complete(coro)
358n/a
359n/a def test_create_unix_connection_nossl_serverhost(self):
360n/a coro = self.loop.create_unix_connection(
361n/a lambda: None, os.devnull, server_hostname='spam')
362n/a with self.assertRaisesRegex(ValueError,
363n/a 'server_hostname is only meaningful'):
364n/a self.loop.run_until_complete(coro)
365n/a
366n/a def test_create_unix_connection_ssl_noserverhost(self):
367n/a coro = self.loop.create_unix_connection(
368n/a lambda: None, os.devnull, ssl=True)
369n/a
370n/a with self.assertRaisesRegex(
371n/a ValueError, 'you have to pass server_hostname when using ssl'):
372n/a
373n/a self.loop.run_until_complete(coro)
374n/a
375n/a
376n/aclass UnixReadPipeTransportTests(test_utils.TestCase):
377n/a
378n/a def setUp(self):
379n/a super().setUp()
380n/a self.loop = self.new_test_loop()
381n/a self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
382n/a self.pipe = mock.Mock(spec_set=io.RawIOBase)
383n/a self.pipe.fileno.return_value = 5
384n/a
385n/a blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
386n/a blocking_patcher.start()
387n/a self.addCleanup(blocking_patcher.stop)
388n/a
389n/a fstat_patcher = mock.patch('os.fstat')
390n/a m_fstat = fstat_patcher.start()
391n/a st = mock.Mock()
392n/a st.st_mode = stat.S_IFIFO
393n/a m_fstat.return_value = st
394n/a self.addCleanup(fstat_patcher.stop)
395n/a
396n/a def read_pipe_transport(self, waiter=None):
397n/a transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe,
398n/a self.protocol,
399n/a waiter=waiter)
400n/a self.addCleanup(close_pipe_transport, transport)
401n/a return transport
402n/a
403n/a def test_ctor(self):
404n/a waiter = asyncio.Future(loop=self.loop)
405n/a tr = self.read_pipe_transport(waiter=waiter)
406n/a self.loop.run_until_complete(waiter)
407n/a
408n/a self.protocol.connection_made.assert_called_with(tr)
409n/a self.loop.assert_reader(5, tr._read_ready)
410n/a self.assertIsNone(waiter.result())
411n/a
412n/a @mock.patch('os.read')
413n/a def test__read_ready(self, m_read):
414n/a tr = self.read_pipe_transport()
415n/a m_read.return_value = b'data'
416n/a tr._read_ready()
417n/a
418n/a m_read.assert_called_with(5, tr.max_size)
419n/a self.protocol.data_received.assert_called_with(b'data')
420n/a
421n/a @mock.patch('os.read')
422n/a def test__read_ready_eof(self, m_read):
423n/a tr = self.read_pipe_transport()
424n/a m_read.return_value = b''
425n/a tr._read_ready()
426n/a
427n/a m_read.assert_called_with(5, tr.max_size)
428n/a self.assertFalse(self.loop.readers)
429n/a test_utils.run_briefly(self.loop)
430n/a self.protocol.eof_received.assert_called_with()
431n/a self.protocol.connection_lost.assert_called_with(None)
432n/a
433n/a @mock.patch('os.read')
434n/a def test__read_ready_blocked(self, m_read):
435n/a tr = self.read_pipe_transport()
436n/a m_read.side_effect = BlockingIOError
437n/a tr._read_ready()
438n/a
439n/a m_read.assert_called_with(5, tr.max_size)
440n/a test_utils.run_briefly(self.loop)
441n/a self.assertFalse(self.protocol.data_received.called)
442n/a
443n/a @mock.patch('asyncio.log.logger.error')
444n/a @mock.patch('os.read')
445n/a def test__read_ready_error(self, m_read, m_logexc):
446n/a tr = self.read_pipe_transport()
447n/a err = OSError()
448n/a m_read.side_effect = err
449n/a tr._close = mock.Mock()
450n/a tr._read_ready()
451n/a
452n/a m_read.assert_called_with(5, tr.max_size)
453n/a tr._close.assert_called_with(err)
454n/a m_logexc.assert_called_with(
455n/a test_utils.MockPattern(
456n/a 'Fatal read error on pipe transport'
457n/a '\nprotocol:.*\ntransport:.*'),
458n/a exc_info=(OSError, MOCK_ANY, MOCK_ANY))
459n/a
460n/a @mock.patch('os.read')
461n/a def test_pause_reading(self, m_read):
462n/a tr = self.read_pipe_transport()
463n/a m = mock.Mock()
464n/a self.loop.add_reader(5, m)
465n/a tr.pause_reading()
466n/a self.assertFalse(self.loop.readers)
467n/a
468n/a @mock.patch('os.read')
469n/a def test_resume_reading(self, m_read):
470n/a tr = self.read_pipe_transport()
471n/a tr.resume_reading()
472n/a self.loop.assert_reader(5, tr._read_ready)
473n/a
474n/a @mock.patch('os.read')
475n/a def test_close(self, m_read):
476n/a tr = self.read_pipe_transport()
477n/a tr._close = mock.Mock()
478n/a tr.close()
479n/a tr._close.assert_called_with(None)
480n/a
481n/a @mock.patch('os.read')
482n/a def test_close_already_closing(self, m_read):
483n/a tr = self.read_pipe_transport()
484n/a tr._closing = True
485n/a tr._close = mock.Mock()
486n/a tr.close()
487n/a self.assertFalse(tr._close.called)
488n/a
489n/a @mock.patch('os.read')
490n/a def test__close(self, m_read):
491n/a tr = self.read_pipe_transport()
492n/a err = object()
493n/a tr._close(err)
494n/a self.assertTrue(tr.is_closing())
495n/a self.assertFalse(self.loop.readers)
496n/a test_utils.run_briefly(self.loop)
497n/a self.protocol.connection_lost.assert_called_with(err)
498n/a
499n/a def test__call_connection_lost(self):
500n/a tr = self.read_pipe_transport()
501n/a self.assertIsNotNone(tr._protocol)
502n/a self.assertIsNotNone(tr._loop)
503n/a
504n/a err = None
505n/a tr._call_connection_lost(err)
506n/a self.protocol.connection_lost.assert_called_with(err)
507n/a self.pipe.close.assert_called_with()
508n/a
509n/a self.assertIsNone(tr._protocol)
510n/a self.assertIsNone(tr._loop)
511n/a
512n/a def test__call_connection_lost_with_err(self):
513n/a tr = self.read_pipe_transport()
514n/a self.assertIsNotNone(tr._protocol)
515n/a self.assertIsNotNone(tr._loop)
516n/a
517n/a err = OSError()
518n/a tr._call_connection_lost(err)
519n/a self.protocol.connection_lost.assert_called_with(err)
520n/a self.pipe.close.assert_called_with()
521n/a
522n/a self.assertIsNone(tr._protocol)
523n/a self.assertIsNone(tr._loop)
524n/a
525n/a
526n/aclass UnixWritePipeTransportTests(test_utils.TestCase):
527n/a
528n/a def setUp(self):
529n/a super().setUp()
530n/a self.loop = self.new_test_loop()
531n/a self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
532n/a self.pipe = mock.Mock(spec_set=io.RawIOBase)
533n/a self.pipe.fileno.return_value = 5
534n/a
535n/a blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
536n/a blocking_patcher.start()
537n/a self.addCleanup(blocking_patcher.stop)
538n/a
539n/a fstat_patcher = mock.patch('os.fstat')
540n/a m_fstat = fstat_patcher.start()
541n/a st = mock.Mock()
542n/a st.st_mode = stat.S_IFSOCK
543n/a m_fstat.return_value = st
544n/a self.addCleanup(fstat_patcher.stop)
545n/a
546n/a def write_pipe_transport(self, waiter=None):
547n/a transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe,
548n/a self.protocol,
549n/a waiter=waiter)
550n/a self.addCleanup(close_pipe_transport, transport)
551n/a return transport
552n/a
553n/a def test_ctor(self):
554n/a waiter = asyncio.Future(loop=self.loop)
555n/a tr = self.write_pipe_transport(waiter=waiter)
556n/a self.loop.run_until_complete(waiter)
557n/a
558n/a self.protocol.connection_made.assert_called_with(tr)
559n/a self.loop.assert_reader(5, tr._read_ready)
560n/a self.assertEqual(None, waiter.result())
561n/a
562n/a def test_can_write_eof(self):
563n/a tr = self.write_pipe_transport()
564n/a self.assertTrue(tr.can_write_eof())
565n/a
566n/a @mock.patch('os.write')
567n/a def test_write(self, m_write):
568n/a tr = self.write_pipe_transport()
569n/a m_write.return_value = 4
570n/a tr.write(b'data')
571n/a m_write.assert_called_with(5, b'data')
572n/a self.assertFalse(self.loop.writers)
573n/a self.assertEqual(bytearray(), tr._buffer)
574n/a
575n/a @mock.patch('os.write')
576n/a def test_write_no_data(self, m_write):
577n/a tr = self.write_pipe_transport()
578n/a tr.write(b'')
579n/a self.assertFalse(m_write.called)
580n/a self.assertFalse(self.loop.writers)
581n/a self.assertEqual(bytearray(b''), tr._buffer)
582n/a
583n/a @mock.patch('os.write')
584n/a def test_write_partial(self, m_write):
585n/a tr = self.write_pipe_transport()
586n/a m_write.return_value = 2
587n/a tr.write(b'data')
588n/a self.loop.assert_writer(5, tr._write_ready)
589n/a self.assertEqual(bytearray(b'ta'), tr._buffer)
590n/a
591n/a @mock.patch('os.write')
592n/a def test_write_buffer(self, m_write):
593n/a tr = self.write_pipe_transport()
594n/a self.loop.add_writer(5, tr._write_ready)
595n/a tr._buffer = bytearray(b'previous')
596n/a tr.write(b'data')
597n/a self.assertFalse(m_write.called)
598n/a self.loop.assert_writer(5, tr._write_ready)
599n/a self.assertEqual(bytearray(b'previousdata'), tr._buffer)
600n/a
601n/a @mock.patch('os.write')
602n/a def test_write_again(self, m_write):
603n/a tr = self.write_pipe_transport()
604n/a m_write.side_effect = BlockingIOError()
605n/a tr.write(b'data')
606n/a m_write.assert_called_with(5, bytearray(b'data'))
607n/a self.loop.assert_writer(5, tr._write_ready)
608n/a self.assertEqual(bytearray(b'data'), tr._buffer)
609n/a
610n/a @mock.patch('asyncio.unix_events.logger')
611n/a @mock.patch('os.write')
612n/a def test_write_err(self, m_write, m_log):
613n/a tr = self.write_pipe_transport()
614n/a err = OSError()
615n/a m_write.side_effect = err
616n/a tr._fatal_error = mock.Mock()
617n/a tr.write(b'data')
618n/a m_write.assert_called_with(5, b'data')
619n/a self.assertFalse(self.loop.writers)
620n/a self.assertEqual(bytearray(), tr._buffer)
621n/a tr._fatal_error.assert_called_with(
622n/a err,
623n/a 'Fatal write error on pipe transport')
624n/a self.assertEqual(1, tr._conn_lost)
625n/a
626n/a tr.write(b'data')
627n/a self.assertEqual(2, tr._conn_lost)
628n/a tr.write(b'data')
629n/a tr.write(b'data')
630n/a tr.write(b'data')
631n/a tr.write(b'data')
632n/a # This is a bit overspecified. :-(
633n/a m_log.warning.assert_called_with(
634n/a 'pipe closed by peer or os.write(pipe, data) raised exception.')
635n/a tr.close()
636n/a
637n/a @mock.patch('os.write')
638n/a def test_write_close(self, m_write):
639n/a tr = self.write_pipe_transport()
640n/a tr._read_ready() # pipe was closed by peer
641n/a
642n/a tr.write(b'data')
643n/a self.assertEqual(tr._conn_lost, 1)
644n/a tr.write(b'data')
645n/a self.assertEqual(tr._conn_lost, 2)
646n/a
647n/a def test__read_ready(self):
648n/a tr = self.write_pipe_transport()
649n/a tr._read_ready()
650n/a self.assertFalse(self.loop.readers)
651n/a self.assertFalse(self.loop.writers)
652n/a self.assertTrue(tr.is_closing())
653n/a test_utils.run_briefly(self.loop)
654n/a self.protocol.connection_lost.assert_called_with(None)
655n/a
656n/a @mock.patch('os.write')
657n/a def test__write_ready(self, m_write):
658n/a tr = self.write_pipe_transport()
659n/a self.loop.add_writer(5, tr._write_ready)
660n/a tr._buffer = bytearray(b'data')
661n/a m_write.return_value = 4
662n/a tr._write_ready()
663n/a self.assertFalse(self.loop.writers)
664n/a self.assertEqual(bytearray(), tr._buffer)
665n/a
666n/a @mock.patch('os.write')
667n/a def test__write_ready_partial(self, m_write):
668n/a tr = self.write_pipe_transport()
669n/a self.loop.add_writer(5, tr._write_ready)
670n/a tr._buffer = bytearray(b'data')
671n/a m_write.return_value = 3
672n/a tr._write_ready()
673n/a self.loop.assert_writer(5, tr._write_ready)
674n/a self.assertEqual(bytearray(b'a'), tr._buffer)
675n/a
676n/a @mock.patch('os.write')
677n/a def test__write_ready_again(self, m_write):
678n/a tr = self.write_pipe_transport()
679n/a self.loop.add_writer(5, tr._write_ready)
680n/a tr._buffer = bytearray(b'data')
681n/a m_write.side_effect = BlockingIOError()
682n/a tr._write_ready()
683n/a m_write.assert_called_with(5, bytearray(b'data'))
684n/a self.loop.assert_writer(5, tr._write_ready)
685n/a self.assertEqual(bytearray(b'data'), tr._buffer)
686n/a
687n/a @mock.patch('os.write')
688n/a def test__write_ready_empty(self, m_write):
689n/a tr = self.write_pipe_transport()
690n/a self.loop.add_writer(5, tr._write_ready)
691n/a tr._buffer = bytearray(b'data')
692n/a m_write.return_value = 0
693n/a tr._write_ready()
694n/a m_write.assert_called_with(5, bytearray(b'data'))
695n/a self.loop.assert_writer(5, tr._write_ready)
696n/a self.assertEqual(bytearray(b'data'), tr._buffer)
697n/a
698n/a @mock.patch('asyncio.log.logger.error')
699n/a @mock.patch('os.write')
700n/a def test__write_ready_err(self, m_write, m_logexc):
701n/a tr = self.write_pipe_transport()
702n/a self.loop.add_writer(5, tr._write_ready)
703n/a tr._buffer = bytearray(b'data')
704n/a m_write.side_effect = err = OSError()
705n/a tr._write_ready()
706n/a self.assertFalse(self.loop.writers)
707n/a self.assertFalse(self.loop.readers)
708n/a self.assertEqual(bytearray(), tr._buffer)
709n/a self.assertTrue(tr.is_closing())
710n/a m_logexc.assert_called_with(
711n/a test_utils.MockPattern(
712n/a 'Fatal write error on pipe transport'
713n/a '\nprotocol:.*\ntransport:.*'),
714n/a exc_info=(OSError, MOCK_ANY, MOCK_ANY))
715n/a self.assertEqual(1, tr._conn_lost)
716n/a test_utils.run_briefly(self.loop)
717n/a self.protocol.connection_lost.assert_called_with(err)
718n/a
719n/a @mock.patch('os.write')
720n/a def test__write_ready_closing(self, m_write):
721n/a tr = self.write_pipe_transport()
722n/a self.loop.add_writer(5, tr._write_ready)
723n/a tr._closing = True
724n/a tr._buffer = bytearray(b'data')
725n/a m_write.return_value = 4
726n/a tr._write_ready()
727n/a self.assertFalse(self.loop.writers)
728n/a self.assertFalse(self.loop.readers)
729n/a self.assertEqual(bytearray(), tr._buffer)
730n/a self.protocol.connection_lost.assert_called_with(None)
731n/a self.pipe.close.assert_called_with()
732n/a
733n/a @mock.patch('os.write')
734n/a def test_abort(self, m_write):
735n/a tr = self.write_pipe_transport()
736n/a self.loop.add_writer(5, tr._write_ready)
737n/a self.loop.add_reader(5, tr._read_ready)
738n/a tr._buffer = [b'da', b'ta']
739n/a tr.abort()
740n/a self.assertFalse(m_write.called)
741n/a self.assertFalse(self.loop.readers)
742n/a self.assertFalse(self.loop.writers)
743n/a self.assertEqual([], tr._buffer)
744n/a self.assertTrue(tr.is_closing())
745n/a test_utils.run_briefly(self.loop)
746n/a self.protocol.connection_lost.assert_called_with(None)
747n/a
748n/a def test__call_connection_lost(self):
749n/a tr = self.write_pipe_transport()
750n/a self.assertIsNotNone(tr._protocol)
751n/a self.assertIsNotNone(tr._loop)
752n/a
753n/a err = None
754n/a tr._call_connection_lost(err)
755n/a self.protocol.connection_lost.assert_called_with(err)
756n/a self.pipe.close.assert_called_with()
757n/a
758n/a self.assertIsNone(tr._protocol)
759n/a self.assertIsNone(tr._loop)
760n/a
761n/a def test__call_connection_lost_with_err(self):
762n/a tr = self.write_pipe_transport()
763n/a self.assertIsNotNone(tr._protocol)
764n/a self.assertIsNotNone(tr._loop)
765n/a
766n/a err = OSError()
767n/a tr._call_connection_lost(err)
768n/a self.protocol.connection_lost.assert_called_with(err)
769n/a self.pipe.close.assert_called_with()
770n/a
771n/a self.assertIsNone(tr._protocol)
772n/a self.assertIsNone(tr._loop)
773n/a
774n/a def test_close(self):
775n/a tr = self.write_pipe_transport()
776n/a tr.write_eof = mock.Mock()
777n/a tr.close()
778n/a tr.write_eof.assert_called_with()
779n/a
780n/a # closing the transport twice must not fail
781n/a tr.close()
782n/a
783n/a def test_close_closing(self):
784n/a tr = self.write_pipe_transport()
785n/a tr.write_eof = mock.Mock()
786n/a tr._closing = True
787n/a tr.close()
788n/a self.assertFalse(tr.write_eof.called)
789n/a
790n/a def test_write_eof(self):
791n/a tr = self.write_pipe_transport()
792n/a tr.write_eof()
793n/a self.assertTrue(tr.is_closing())
794n/a self.assertFalse(self.loop.readers)
795n/a test_utils.run_briefly(self.loop)
796n/a self.protocol.connection_lost.assert_called_with(None)
797n/a
798n/a def test_write_eof_pending(self):
799n/a tr = self.write_pipe_transport()
800n/a tr._buffer = [b'data']
801n/a tr.write_eof()
802n/a self.assertTrue(tr.is_closing())
803n/a self.assertFalse(self.protocol.connection_lost.called)
804n/a
805n/a
806n/aclass AbstractChildWatcherTests(unittest.TestCase):
807n/a
808n/a def test_not_implemented(self):
809n/a f = mock.Mock()
810n/a watcher = asyncio.AbstractChildWatcher()
811n/a self.assertRaises(
812n/a NotImplementedError, watcher.add_child_handler, f, f)
813n/a self.assertRaises(
814n/a NotImplementedError, watcher.remove_child_handler, f)
815n/a self.assertRaises(
816n/a NotImplementedError, watcher.attach_loop, f)
817n/a self.assertRaises(
818n/a NotImplementedError, watcher.close)
819n/a self.assertRaises(
820n/a NotImplementedError, watcher.__enter__)
821n/a self.assertRaises(
822n/a NotImplementedError, watcher.__exit__, f, f, f)
823n/a
824n/a
825n/aclass BaseChildWatcherTests(unittest.TestCase):
826n/a
827n/a def test_not_implemented(self):
828n/a f = mock.Mock()
829n/a watcher = unix_events.BaseChildWatcher()
830n/a self.assertRaises(
831n/a NotImplementedError, watcher._do_waitpid, f)
832n/a
833n/a
834n/aWaitPidMocks = collections.namedtuple("WaitPidMocks",
835n/a ("waitpid",
836n/a "WIFEXITED",
837n/a "WIFSIGNALED",
838n/a "WEXITSTATUS",
839n/a "WTERMSIG",
840n/a ))
841n/a
842n/a
843n/aclass ChildWatcherTestsMixin:
844n/a
845n/a ignore_warnings = mock.patch.object(log.logger, "warning")
846n/a
847n/a def setUp(self):
848n/a super().setUp()
849n/a self.loop = self.new_test_loop()
850n/a self.running = False
851n/a self.zombies = {}
852n/a
853n/a with mock.patch.object(
854n/a self.loop, "add_signal_handler") as self.m_add_signal_handler:
855n/a self.watcher = self.create_watcher()
856n/a self.watcher.attach_loop(self.loop)
857n/a
858n/a def waitpid(self, pid, flags):
859n/a if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
860n/a self.assertGreater(pid, 0)
861n/a try:
862n/a if pid < 0:
863n/a return self.zombies.popitem()
864n/a else:
865n/a return pid, self.zombies.pop(pid)
866n/a except KeyError:
867n/a pass
868n/a if self.running:
869n/a return 0, 0
870n/a else:
871n/a raise ChildProcessError()
872n/a
873n/a def add_zombie(self, pid, returncode):
874n/a self.zombies[pid] = returncode + 32768
875n/a
876n/a def WIFEXITED(self, status):
877n/a return status >= 32768
878n/a
879n/a def WIFSIGNALED(self, status):
880n/a return 32700 < status < 32768
881n/a
882n/a def WEXITSTATUS(self, status):
883n/a self.assertTrue(self.WIFEXITED(status))
884n/a return status - 32768
885n/a
886n/a def WTERMSIG(self, status):
887n/a self.assertTrue(self.WIFSIGNALED(status))
888n/a return 32768 - status
889n/a
890n/a def test_create_watcher(self):
891n/a self.m_add_signal_handler.assert_called_once_with(
892n/a signal.SIGCHLD, self.watcher._sig_chld)
893n/a
894n/a def waitpid_mocks(func):
895n/a def wrapped_func(self):
896n/a def patch(target, wrapper):
897n/a return mock.patch(target, wraps=wrapper,
898n/a new_callable=mock.Mock)
899n/a
900n/a with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
901n/a patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
902n/a patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \
903n/a patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \
904n/a patch('os.waitpid', self.waitpid) as m_waitpid:
905n/a func(self, WaitPidMocks(m_waitpid,
906n/a m_WIFEXITED, m_WIFSIGNALED,
907n/a m_WEXITSTATUS, m_WTERMSIG,
908n/a ))
909n/a return wrapped_func
910n/a
@waitpid_mocks
def test_sigchld(self, m):
    # Follow a single child (pid 42) from registration to termination,
    # then check that later SIGCHLD notifications leave it alone.
    on_exit = mock.Mock()

    def assert_untouched():
        # Neither the callback nor any status helper has been called.
        self.assertFalse(on_exit.called)
        for helper in (m.WIFEXITED, m.WIFSIGNALED,
                       m.WEXITSTATUS, m.WTERMSIG):
            self.assertFalse(helper.called)

    def reset_exit_helpers():
        for helper in (m.WIFSIGNALED, m.WIFEXITED, m.WEXITSTATUS):
            helper.reset_mock()

    # register a child
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(42, on_exit, 9, 10, 14)
    assert_untouched()

    # child is running
    self.watcher._sig_chld()
    assert_untouched()

    # child terminates (returncode 12)
    self.running = False
    self.add_zombie(42, 12)
    self.watcher._sig_chld()

    self.assertTrue(m.WIFEXITED.called)
    self.assertTrue(m.WEXITSTATUS.called)
    self.assertFalse(m.WTERMSIG.called)
    on_exit.assert_called_once_with(42, 12, 9, 10, 14)

    reset_exit_helpers()
    on_exit.reset_mock()

    # ensure that the child is effectively reaped
    self.add_zombie(42, 13)
    with self.ignore_warnings:
        self.watcher._sig_chld()

    self.assertFalse(on_exit.called)
    self.assertFalse(m.WTERMSIG.called)

    reset_exit_helpers()

    # sigchld called again
    self.zombies.clear()
    self.watcher._sig_chld()
    assert_untouched()
971n/a
@waitpid_mocks
def test_sigchld_two_children(self, m):
    # Two children (pids 43 and 44) are registered and terminate one
    # after the other.
    first_cb = mock.Mock()
    second_cb = mock.Mock()

    def assert_quiet():
        # No callback fired and no status helper was consulted.
        for quiet in (first_cb, second_cb, m.WIFEXITED,
                      m.WIFSIGNALED, m.WEXITSTATUS, m.WTERMSIG):
            self.assertFalse(quiet.called)

    def reset(*mocks):
        for item in mocks:
            item.reset_mock()

    # register child 1
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(43, first_cb, 7, 8)
    assert_quiet()

    # register child 2
    with self.watcher:
        self.watcher.add_child_handler(44, second_cb, 147, 18)
    assert_quiet()

    # children are running
    self.watcher._sig_chld()
    assert_quiet()

    # child 1 terminates (signal 3)
    self.add_zombie(43, -3)
    self.watcher._sig_chld()

    first_cb.assert_called_once_with(43, -3, 7, 8)
    self.assertFalse(second_cb.called)
    self.assertTrue(m.WIFSIGNALED.called)
    self.assertFalse(m.WEXITSTATUS.called)
    self.assertTrue(m.WTERMSIG.called)

    reset(m.WIFSIGNALED, m.WIFEXITED, m.WTERMSIG, first_cb)

    # child 2 still running
    self.watcher._sig_chld()
    assert_quiet()

    # child 2 terminates (code 108)
    self.add_zombie(44, 108)
    self.running = False
    self.watcher._sig_chld()

    second_cb.assert_called_once_with(44, 108, 147, 18)
    self.assertFalse(first_cb.called)
    self.assertTrue(m.WIFEXITED.called)
    self.assertTrue(m.WEXITSTATUS.called)
    self.assertFalse(m.WTERMSIG.called)

    reset(m.WIFSIGNALED, m.WIFEXITED, m.WEXITSTATUS, second_cb)

    # ensure that the children are effectively reaped
    self.add_zombie(43, 14)
    self.add_zombie(44, 15)
    with self.ignore_warnings:
        self.watcher._sig_chld()

    self.assertFalse(first_cb.called)
    self.assertFalse(second_cb.called)
    self.assertFalse(m.WTERMSIG.called)

    reset(m.WIFSIGNALED, m.WIFEXITED, m.WEXITSTATUS)

    # sigchld called again
    self.zombies.clear()
    self.watcher._sig_chld()
    assert_quiet()
1075n/a
@waitpid_mocks
def test_sigchld_two_children_terminating_together(self, m):
    # Two children (pids 45 and 46) are both reaped by a single
    # _sig_chld() call.
    first_cb = mock.Mock()
    second_cb = mock.Mock()

    def assert_quiet():
        # No callback fired and no status helper was consulted.
        for quiet in (first_cb, second_cb, m.WIFEXITED,
                      m.WIFSIGNALED, m.WEXITSTATUS, m.WTERMSIG):
            self.assertFalse(quiet.called)

    # register child 1
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(45, first_cb, 17, 8)
    assert_quiet()

    # register child 2
    with self.watcher:
        self.watcher.add_child_handler(46, second_cb, 1147, 18)
    assert_quiet()

    # children are running
    self.watcher._sig_chld()
    assert_quiet()

    # child 1 terminates (code 78)
    # child 2 terminates (signal 5)
    self.add_zombie(45, 78)
    self.add_zombie(46, -5)
    self.running = False
    self.watcher._sig_chld()

    first_cb.assert_called_once_with(45, 78, 17, 8)
    second_cb.assert_called_once_with(46, -5, 1147, 18)
    for helper in (m.WIFSIGNALED, m.WIFEXITED,
                   m.WEXITSTATUS, m.WTERMSIG):
        self.assertTrue(helper.called)

    for used in (m.WIFSIGNALED, m.WIFEXITED, m.WTERMSIG,
                 m.WEXITSTATUS, first_cb, second_cb):
        used.reset_mock()

    # ensure that the children are effectively reaped
    self.add_zombie(45, 14)
    self.add_zombie(46, 15)
    with self.ignore_warnings:
        self.watcher._sig_chld()

    self.assertFalse(first_cb.called)
    self.assertFalse(second_cb.called)
    self.assertFalse(m.WTERMSIG.called)
1144n/a
@waitpid_mocks
def test_sigchld_race_condition(self, m):
    # The child (pid 50) is already a zombie, and _sig_chld() has already
    # run, by the time its handler is registered.
    on_exit = mock.Mock()

    with self.watcher:
        # child terminates before being registered
        self.add_zombie(50, 4)
        self.watcher._sig_chld()

        self.watcher.add_child_handler(50, on_exit, 1, 12)

    on_exit.assert_called_once_with(50, 4, 1, 12)
    on_exit.reset_mock()

    # ensure that the child is effectively reaped
    self.add_zombie(50, -1)
    with self.ignore_warnings:
        self.watcher._sig_chld()

    self.assertFalse(on_exit.called)
1166n/a
@waitpid_mocks
def test_sigchld_replace_handler(self, m):
    # Registering pid 51 a second time replaces the first callback: only
    # the second one is called when the child terminates.
    replaced_cb = mock.Mock()
    active_cb = mock.Mock()

    def assert_quiet():
        # No callback fired and no status helper was consulted.
        for quiet in (replaced_cb, active_cb, m.WIFEXITED,
                      m.WIFSIGNALED, m.WEXITSTATUS, m.WTERMSIG):
            self.assertFalse(quiet.called)

    # register a child
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(51, replaced_cb, 19)
    assert_quiet()

    # register the same child again
    with self.watcher:
        self.watcher.add_child_handler(51, active_cb, 21)
    assert_quiet()

    # child terminates (signal 8)
    self.running = False
    self.add_zombie(51, -8)
    self.watcher._sig_chld()

    active_cb.assert_called_once_with(51, -8, 21)
    self.assertFalse(replaced_cb.called)
    self.assertTrue(m.WIFSIGNALED.called)
    self.assertFalse(m.WEXITSTATUS.called)
    self.assertTrue(m.WTERMSIG.called)

    for used in (m.WIFSIGNALED, m.WIFEXITED, m.WTERMSIG, active_cb):
        used.reset_mock()

    # ensure that the child is effectively reaped
    self.add_zombie(51, 13)
    with self.ignore_warnings:
        self.watcher._sig_chld()

    self.assertFalse(replaced_cb.called)
    self.assertFalse(active_cb.called)
    self.assertFalse(m.WTERMSIG.called)
1219n/a
@waitpid_mocks
def test_sigchld_remove_handler(self, m):
    # A handler removed before the child (pid 52) terminates is never
    # called.
    on_exit = mock.Mock()

    def assert_untouched():
        # Neither the callback nor any status helper has been called.
        self.assertFalse(on_exit.called)
        for helper in (m.WIFEXITED, m.WIFSIGNALED,
                       m.WEXITSTATUS, m.WTERMSIG):
            self.assertFalse(helper.called)

    # register a child
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(52, on_exit, 1984)
    assert_untouched()

    # unregister the child
    self.watcher.remove_child_handler(52)
    assert_untouched()

    # child terminates (code 99)
    self.running = False
    self.add_zombie(52, 99)
    with self.ignore_warnings:
        self.watcher._sig_chld()

    self.assertFalse(on_exit.called)
1251n/a
@waitpid_mocks
def test_sigchld_unknown_status(self, m):
    # A status that neither WIFEXITED nor WIFSIGNALED accepts (1178) is
    # handed to the callback unchanged.
    on_exit = mock.Mock()

    # register a child
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(53, on_exit, -19)

    self.assertFalse(on_exit.called)
    for helper in (m.WIFEXITED, m.WIFSIGNALED,
                   m.WEXITSTATUS, m.WTERMSIG):
        self.assertFalse(helper.called)

    # terminate with unknown status
    self.zombies[53] = 1178
    self.running = False
    self.watcher._sig_chld()

    on_exit.assert_called_once_with(53, 1178, -19)
    self.assertTrue(m.WIFEXITED.called)
    self.assertTrue(m.WIFSIGNALED.called)
    self.assertFalse(m.WEXITSTATUS.called)
    self.assertFalse(m.WTERMSIG.called)

    for used in (on_exit, m.WIFEXITED, m.WIFSIGNALED):
        used.reset_mock()

    # ensure that the child is effectively reaped
    self.add_zombie(53, 101)
    with self.ignore_warnings:
        self.watcher._sig_chld()

    self.assertFalse(on_exit.called)
1288n/a
@waitpid_mocks
def test_remove_child_handler(self, m):
    # remove_child_handler() returns True only the first time a handler
    # is removed; removed handlers are not called on termination.
    removed_cb = mock.Mock()
    removed_twice_cb = mock.Mock()
    kept_cb = mock.Mock()
    remove = self.watcher.remove_child_handler

    # register children
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(54, removed_cb, 1)
        self.watcher.add_child_handler(55, removed_twice_cb, 2)
        self.watcher.add_child_handler(56, kept_cb, 3)

    # remove child handler 1
    self.assertTrue(remove(54))

    # remove child handler 2 multiple times
    self.assertTrue(remove(55))
    self.assertFalse(remove(55))
    self.assertFalse(remove(55))

    # all children terminate
    for pid, returncode in ((54, 0), (55, 1), (56, 2)):
        self.add_zombie(pid, returncode)
    self.running = False
    with self.ignore_warnings:
        self.watcher._sig_chld()

    self.assertFalse(removed_cb.called)
    self.assertFalse(removed_twice_cb.called)
    kept_cb.assert_called_once_with(56, 2, 3)
1321n/a
@waitpid_mocks
def test_sigchld_unhandled_exception(self, m):
    # With waitpid raising ValueError, _sig_chld() still returns None and
    # log.logger.error is called.
    on_exit = mock.Mock()

    # register a child
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(57, on_exit)

    # raise an exception
    m.waitpid.side_effect = ValueError

    with mock.patch.object(log.logger, 'error') as m_error:
        outcome = self.watcher._sig_chld()
        self.assertEqual(outcome, None)
        self.assertTrue(m_error.called)
1339n/a
@waitpid_mocks
def test_sigchld_child_reaped_elsewhere(self, m):
    # The child (pid 58) is reaped by a direct os.waitpid() call before
    # the watcher handles SIGCHLD.
    on_exit = mock.Mock()

    # register a child
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(58, on_exit)

    self.assertFalse(on_exit.called)
    for helper in (m.WIFEXITED, m.WIFSIGNALED,
                   m.WEXITSTATUS, m.WTERMSIG):
        self.assertFalse(helper.called)

    # child terminates
    self.running = False
    self.add_zombie(58, 4)

    # waitpid is called elsewhere
    os.waitpid(58, os.WNOHANG)

    m.waitpid.reset_mock()

    # sigchld
    with self.ignore_warnings:
        self.watcher._sig_chld()

    if not isinstance(self.watcher, asyncio.FastChildWatcher):
        on_exit.assert_called_once_with(58, 255)
    else:
        # here the FastChildWatcher enters a deadlock
        # (there is no way to prevent it)
        self.assertFalse(on_exit.called)
1374n/a
@waitpid_mocks
def test_sigchld_unknown_pid_during_registration(self, m):
    # While handlers are being registered, one known child (591) and one
    # unknown child (593) terminate; only pid 591's callback fires.
    known_cb = mock.Mock()
    running_cb = mock.Mock()

    with self.ignore_warnings, self.watcher:
        self.running = True
        # child 1 terminates
        self.add_zombie(591, 7)
        # an unknown child terminates
        self.add_zombie(593, 17)

        self.watcher._sig_chld()

        # register two children
        self.watcher.add_child_handler(591, known_cb)
        self.watcher.add_child_handler(592, running_cb)

    known_cb.assert_called_once_with(591, 7)
    self.assertFalse(running_cb.called)
1395n/a
@waitpid_mocks
def test_set_loop(self, m):
    # Attaching a new loop removes the SIGCHLD handler from the old loop,
    # adds it to the new one, and keeps the registered callback working.
    on_exit = mock.Mock()

    # register a child
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(60, on_exit)

    # attach a new loop
    previous_loop = self.loop
    self.loop = self.new_test_loop()

    with mock.patch.object(
            previous_loop, "remove_signal_handler") as m_old_remove:
        with mock.patch.object(
                self.loop, "add_signal_handler") as m_new_add:

            self.watcher.attach_loop(self.loop)

            m_old_remove.assert_called_once_with(signal.SIGCHLD)
            m_new_add.assert_called_once_with(
                signal.SIGCHLD, self.watcher._sig_chld)

    # child terminates
    self.running = False
    self.add_zombie(60, 9)
    self.watcher._sig_chld()

    on_exit.assert_called_once_with(60, 9)
1426n/a
@waitpid_mocks
def test_set_loop_race_condition(self, m):
    # Children that terminate while no loop is attached are reported as
    # soon as a new loop is attached.
    first_cb = mock.Mock()
    second_cb = mock.Mock()
    third_cb = mock.Mock()

    # register 3 children
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(61, first_cb)
        self.watcher.add_child_handler(62, second_cb)
        self.watcher.add_child_handler(622, third_cb)

    # detach the loop
    detached_loop = self.loop
    self.loop = None

    with mock.patch.object(
            detached_loop, "remove_signal_handler") as m_remove:

        with self.assertWarnsRegex(
                RuntimeWarning, 'A loop is being detached'):
            self.watcher.attach_loop(None)

        m_remove.assert_called_once_with(signal.SIGCHLD)

    # child 1 & 2 terminate
    self.add_zombie(61, 11)
    self.add_zombie(62, -5)

    # SIGCHLD was not caught
    for pending in (first_cb, second_cb, third_cb):
        self.assertFalse(pending.called)

    # attach a new loop
    self.loop = self.new_test_loop()

    with mock.patch.object(self.loop, "add_signal_handler") as m_add:

        self.watcher.attach_loop(self.loop)

        m_add.assert_called_once_with(
            signal.SIGCHLD, self.watcher._sig_chld)
        first_cb.assert_called_once_with(61, 11)  # race condition!
        second_cb.assert_called_once_with(62, -5)  # race condition!
        self.assertFalse(third_cb.called)

    first_cb.reset_mock()
    second_cb.reset_mock()

    # child 3 terminates
    self.running = False
    self.add_zombie(622, 19)
    self.watcher._sig_chld()

    self.assertFalse(first_cb.called)
    self.assertFalse(second_cb.called)
    third_cb.assert_called_once_with(622, 19)
1488n/a
@waitpid_mocks
def test_close(self, m):
    # close() removes the SIGCHLD handler from the loop and leaves the
    # watcher with no callbacks (and, for FastChildWatcher, no zombies).
    on_exit = mock.Mock()
    is_fast_watcher = isinstance(self.watcher, asyncio.FastChildWatcher)

    with self.watcher:
        self.running = True
        # child 1 terminates
        self.add_zombie(63, 9)
        # other child terminates
        self.add_zombie(65, 18)
        self.watcher._sig_chld()

        # register two children
        self.watcher.add_child_handler(63, on_exit)
        self.watcher.add_child_handler(64, on_exit)

        self.assertEqual(len(self.watcher._callbacks), 1)
        if is_fast_watcher:
            self.assertEqual(len(self.watcher._zombies), 1)

        with mock.patch.object(
                self.loop, "remove_signal_handler") as m_remove:

            self.watcher.close()

            m_remove.assert_called_once_with(signal.SIGCHLD)
            self.assertFalse(self.watcher._callbacks)
            if is_fast_watcher:
                self.assertFalse(self.watcher._zombies)
1520n/a
@waitpid_mocks
def test_add_child_handler_with_no_loop_attached(self, m):
    # A freshly created watcher, never attached to a loop, must refuse
    # add_child_handler() with a RuntimeError.
    on_exit = mock.Mock()
    expected_message = 'the child watcher does not have a loop attached'
    with self.create_watcher() as detached_watcher, \
            self.assertRaisesRegex(RuntimeError, expected_message):
        detached_watcher.add_child_handler(100, on_exit)
1529n/a
1530n/a
class SafeChildWatcherTests(ChildWatcherTestsMixin, test_utils.TestCase):
    """Run the ChildWatcherTestsMixin tests with a SafeChildWatcher."""

    def create_watcher(self):
        watcher = asyncio.SafeChildWatcher()
        return watcher
1534n/a
1535n/a
class FastChildWatcherTests(ChildWatcherTestsMixin, test_utils.TestCase):
    """Run the ChildWatcherTestsMixin tests with a FastChildWatcher."""

    def create_watcher(self):
        watcher = asyncio.FastChildWatcher()
        return watcher
1539n/a
1540n/a
1541n/aclass PolicyTests(unittest.TestCase):
1542n/a
1543n/a def create_policy(self):
1544n/a return asyncio.DefaultEventLoopPolicy()
1545n/a
1546n/a def test_get_child_watcher(self):
1547n/a policy = self.create_policy()
1548n/a self.assertIsNone(policy._watcher)
1549n/a
1550n/a watcher = policy.get_child_watcher()
1551n/a self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
1552n/a
1553n/a self.assertIs(policy._watcher, watcher)
1554n/a
1555n/a self.assertIs(watcher, policy.get_child_watcher())
1556n/a self.assertIsNone(watcher._loop)
1557n/a
1558n/a def test_get_child_watcher_after_set(self):
1559n/a policy = self.create_policy()
1560n/a watcher = asyncio.FastChildWatcher()
1561n/a
1562n/a policy.set_child_watcher(watcher)
1563n/a self.assertIs(policy._watcher, watcher)
1564n/a self.assertIs(watcher, policy.get_child_watcher())
1565n/a
1566n/a def test_get_child_watcher_with_mainloop_existing(self):
1567n/a policy = self.create_policy()
1568n/a loop = policy.get_event_loop()
1569n/a
1570n/a self.assertIsNone(policy._watcher)
1571n/a watcher = policy.get_child_watcher()
1572n/a
1573n/a self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
1574n/a self.assertIs(watcher._loop, loop)
1575n/a
1576n/a loop.close()
1577n/a
1578n/a def test_get_child_watcher_thread(self):
1579n/a
1580n/a def f():
1581n/a policy.set_event_loop(policy.new_event_loop())
1582n/a
1583n/a self.assertIsInstance(policy.get_event_loop(),
1584n/a asyncio.AbstractEventLoop)
1585n/a watcher = policy.get_child_watcher()
1586n/a
1587n/a self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
1588n/a self.assertIsNone(watcher._loop)
1589n/a
1590n/a policy.get_event_loop().close()
1591n/a
1592n/a policy = self.create_policy()
1593n/a
1594n/a th = threading.Thread(target=f)
1595n/a th.start()
1596n/a th.join()
1597n/a
1598n/a def test_child_watcher_replace_mainloop_existing(self):
1599n/a policy = self.create_policy()
1600n/a loop = policy.get_event_loop()
1601n/a
1602n/a watcher = policy.get_child_watcher()
1603n/a
1604n/a self.assertIs(watcher._loop, loop)
1605n/a
1606n/a new_loop = policy.new_event_loop()
1607n/a policy.set_event_loop(new_loop)
1608n/a
1609n/a self.assertIs(watcher._loop, new_loop)
1610n/a
1611n/a policy.set_event_loop(None)
1612n/a
1613n/a self.assertIs(watcher._loop, None)
1614n/a
1615n/a loop.close()
1616n/a new_loop.close()
1617n/a
1618n/a
# Run the tests when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()