ยปCore Development>Code coverage>Lib/test/test_asyncio/test_streams.py

Python code coverage for Lib/test/test_asyncio/test_streams.py

#countcontent
1n/a"""Tests for streams.py."""
2n/a
3n/aimport gc
4n/aimport os
5n/aimport queue
6n/aimport socket
7n/aimport sys
8n/aimport threading
9n/aimport unittest
10n/afrom unittest import mock
11n/atry:
12n/a import ssl
13n/aexcept ImportError:
14n/a ssl = None
15n/a
16n/aimport asyncio
17n/afrom asyncio import test_utils
18n/a
19n/a
20n/aclass StreamReaderTests(test_utils.TestCase):
21n/a
22n/a DATA = b'line1\nline2\nline3\n'
23n/a
24n/a def setUp(self):
25n/a super().setUp()
26n/a self.loop = asyncio.new_event_loop()
27n/a self.set_event_loop(self.loop)
28n/a
29n/a def tearDown(self):
30n/a # just in case if we have transport close callbacks
31n/a test_utils.run_briefly(self.loop)
32n/a
33n/a self.loop.close()
34n/a gc.collect()
35n/a super().tearDown()
36n/a
37n/a @mock.patch('asyncio.streams.events')
38n/a def test_ctor_global_loop(self, m_events):
39n/a stream = asyncio.StreamReader()
40n/a self.assertIs(stream._loop, m_events.get_event_loop.return_value)
41n/a
42n/a def _basetest_open_connection(self, open_connection_fut):
43n/a reader, writer = self.loop.run_until_complete(open_connection_fut)
44n/a writer.write(b'GET / HTTP/1.0\r\n\r\n')
45n/a f = reader.readline()
46n/a data = self.loop.run_until_complete(f)
47n/a self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
48n/a f = reader.read()
49n/a data = self.loop.run_until_complete(f)
50n/a self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
51n/a writer.close()
52n/a
53n/a def test_open_connection(self):
54n/a with test_utils.run_test_server() as httpd:
55n/a conn_fut = asyncio.open_connection(*httpd.address,
56n/a loop=self.loop)
57n/a self._basetest_open_connection(conn_fut)
58n/a
59n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
60n/a def test_open_unix_connection(self):
61n/a with test_utils.run_test_unix_server() as httpd:
62n/a conn_fut = asyncio.open_unix_connection(httpd.address,
63n/a loop=self.loop)
64n/a self._basetest_open_connection(conn_fut)
65n/a
66n/a def _basetest_open_connection_no_loop_ssl(self, open_connection_fut):
67n/a try:
68n/a reader, writer = self.loop.run_until_complete(open_connection_fut)
69n/a finally:
70n/a asyncio.set_event_loop(None)
71n/a writer.write(b'GET / HTTP/1.0\r\n\r\n')
72n/a f = reader.read()
73n/a data = self.loop.run_until_complete(f)
74n/a self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
75n/a
76n/a writer.close()
77n/a
78n/a @unittest.skipIf(ssl is None, 'No ssl module')
79n/a def test_open_connection_no_loop_ssl(self):
80n/a with test_utils.run_test_server(use_ssl=True) as httpd:
81n/a conn_fut = asyncio.open_connection(
82n/a *httpd.address,
83n/a ssl=test_utils.dummy_ssl_context(),
84n/a loop=self.loop)
85n/a
86n/a self._basetest_open_connection_no_loop_ssl(conn_fut)
87n/a
88n/a @unittest.skipIf(ssl is None, 'No ssl module')
89n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
90n/a def test_open_unix_connection_no_loop_ssl(self):
91n/a with test_utils.run_test_unix_server(use_ssl=True) as httpd:
92n/a conn_fut = asyncio.open_unix_connection(
93n/a httpd.address,
94n/a ssl=test_utils.dummy_ssl_context(),
95n/a server_hostname='',
96n/a loop=self.loop)
97n/a
98n/a self._basetest_open_connection_no_loop_ssl(conn_fut)
99n/a
100n/a def _basetest_open_connection_error(self, open_connection_fut):
101n/a reader, writer = self.loop.run_until_complete(open_connection_fut)
102n/a writer._protocol.connection_lost(ZeroDivisionError())
103n/a f = reader.read()
104n/a with self.assertRaises(ZeroDivisionError):
105n/a self.loop.run_until_complete(f)
106n/a writer.close()
107n/a test_utils.run_briefly(self.loop)
108n/a
109n/a def test_open_connection_error(self):
110n/a with test_utils.run_test_server() as httpd:
111n/a conn_fut = asyncio.open_connection(*httpd.address,
112n/a loop=self.loop)
113n/a self._basetest_open_connection_error(conn_fut)
114n/a
115n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
116n/a def test_open_unix_connection_error(self):
117n/a with test_utils.run_test_unix_server() as httpd:
118n/a conn_fut = asyncio.open_unix_connection(httpd.address,
119n/a loop=self.loop)
120n/a self._basetest_open_connection_error(conn_fut)
121n/a
122n/a def test_feed_empty_data(self):
123n/a stream = asyncio.StreamReader(loop=self.loop)
124n/a
125n/a stream.feed_data(b'')
126n/a self.assertEqual(b'', stream._buffer)
127n/a
128n/a def test_feed_nonempty_data(self):
129n/a stream = asyncio.StreamReader(loop=self.loop)
130n/a
131n/a stream.feed_data(self.DATA)
132n/a self.assertEqual(self.DATA, stream._buffer)
133n/a
134n/a def test_read_zero(self):
135n/a # Read zero bytes.
136n/a stream = asyncio.StreamReader(loop=self.loop)
137n/a stream.feed_data(self.DATA)
138n/a
139n/a data = self.loop.run_until_complete(stream.read(0))
140n/a self.assertEqual(b'', data)
141n/a self.assertEqual(self.DATA, stream._buffer)
142n/a
143n/a def test_read(self):
144n/a # Read bytes.
145n/a stream = asyncio.StreamReader(loop=self.loop)
146n/a read_task = asyncio.Task(stream.read(30), loop=self.loop)
147n/a
148n/a def cb():
149n/a stream.feed_data(self.DATA)
150n/a self.loop.call_soon(cb)
151n/a
152n/a data = self.loop.run_until_complete(read_task)
153n/a self.assertEqual(self.DATA, data)
154n/a self.assertEqual(b'', stream._buffer)
155n/a
156n/a def test_read_line_breaks(self):
157n/a # Read bytes without line breaks.
158n/a stream = asyncio.StreamReader(loop=self.loop)
159n/a stream.feed_data(b'line1')
160n/a stream.feed_data(b'line2')
161n/a
162n/a data = self.loop.run_until_complete(stream.read(5))
163n/a
164n/a self.assertEqual(b'line1', data)
165n/a self.assertEqual(b'line2', stream._buffer)
166n/a
167n/a def test_read_eof(self):
168n/a # Read bytes, stop at eof.
169n/a stream = asyncio.StreamReader(loop=self.loop)
170n/a read_task = asyncio.Task(stream.read(1024), loop=self.loop)
171n/a
172n/a def cb():
173n/a stream.feed_eof()
174n/a self.loop.call_soon(cb)
175n/a
176n/a data = self.loop.run_until_complete(read_task)
177n/a self.assertEqual(b'', data)
178n/a self.assertEqual(b'', stream._buffer)
179n/a
180n/a def test_read_until_eof(self):
181n/a # Read all bytes until eof.
182n/a stream = asyncio.StreamReader(loop=self.loop)
183n/a read_task = asyncio.Task(stream.read(-1), loop=self.loop)
184n/a
185n/a def cb():
186n/a stream.feed_data(b'chunk1\n')
187n/a stream.feed_data(b'chunk2')
188n/a stream.feed_eof()
189n/a self.loop.call_soon(cb)
190n/a
191n/a data = self.loop.run_until_complete(read_task)
192n/a
193n/a self.assertEqual(b'chunk1\nchunk2', data)
194n/a self.assertEqual(b'', stream._buffer)
195n/a
196n/a def test_read_exception(self):
197n/a stream = asyncio.StreamReader(loop=self.loop)
198n/a stream.feed_data(b'line\n')
199n/a
200n/a data = self.loop.run_until_complete(stream.read(2))
201n/a self.assertEqual(b'li', data)
202n/a
203n/a stream.set_exception(ValueError())
204n/a self.assertRaises(
205n/a ValueError, self.loop.run_until_complete, stream.read(2))
206n/a
207n/a def test_invalid_limit(self):
208n/a with self.assertRaisesRegex(ValueError, 'imit'):
209n/a asyncio.StreamReader(limit=0, loop=self.loop)
210n/a
211n/a with self.assertRaisesRegex(ValueError, 'imit'):
212n/a asyncio.StreamReader(limit=-1, loop=self.loop)
213n/a
214n/a def test_read_limit(self):
215n/a stream = asyncio.StreamReader(limit=3, loop=self.loop)
216n/a stream.feed_data(b'chunk')
217n/a data = self.loop.run_until_complete(stream.read(5))
218n/a self.assertEqual(b'chunk', data)
219n/a self.assertEqual(b'', stream._buffer)
220n/a
221n/a def test_readline(self):
222n/a # Read one line. 'readline' will need to wait for the data
223n/a # to come from 'cb'
224n/a stream = asyncio.StreamReader(loop=self.loop)
225n/a stream.feed_data(b'chunk1 ')
226n/a read_task = asyncio.Task(stream.readline(), loop=self.loop)
227n/a
228n/a def cb():
229n/a stream.feed_data(b'chunk2 ')
230n/a stream.feed_data(b'chunk3 ')
231n/a stream.feed_data(b'\n chunk4')
232n/a self.loop.call_soon(cb)
233n/a
234n/a line = self.loop.run_until_complete(read_task)
235n/a self.assertEqual(b'chunk1 chunk2 chunk3 \n', line)
236n/a self.assertEqual(b' chunk4', stream._buffer)
237n/a
238n/a def test_readline_limit_with_existing_data(self):
239n/a # Read one line. The data is in StreamReader's buffer
240n/a # before the event loop is run.
241n/a
242n/a stream = asyncio.StreamReader(limit=3, loop=self.loop)
243n/a stream.feed_data(b'li')
244n/a stream.feed_data(b'ne1\nline2\n')
245n/a
246n/a self.assertRaises(
247n/a ValueError, self.loop.run_until_complete, stream.readline())
248n/a # The buffer should contain the remaining data after exception
249n/a self.assertEqual(b'line2\n', stream._buffer)
250n/a
251n/a stream = asyncio.StreamReader(limit=3, loop=self.loop)
252n/a stream.feed_data(b'li')
253n/a stream.feed_data(b'ne1')
254n/a stream.feed_data(b'li')
255n/a
256n/a self.assertRaises(
257n/a ValueError, self.loop.run_until_complete, stream.readline())
258n/a # No b'\n' at the end. The 'limit' is set to 3. So before
259n/a # waiting for the new data in buffer, 'readline' will consume
260n/a # the entire buffer, and since the length of the consumed data
261n/a # is more than 3, it will raise a ValueError. The buffer is
262n/a # expected to be empty now.
263n/a self.assertEqual(b'', stream._buffer)
264n/a
265n/a def test_at_eof(self):
266n/a stream = asyncio.StreamReader(loop=self.loop)
267n/a self.assertFalse(stream.at_eof())
268n/a
269n/a stream.feed_data(b'some data\n')
270n/a self.assertFalse(stream.at_eof())
271n/a
272n/a self.loop.run_until_complete(stream.readline())
273n/a self.assertFalse(stream.at_eof())
274n/a
275n/a stream.feed_data(b'some data\n')
276n/a stream.feed_eof()
277n/a self.loop.run_until_complete(stream.readline())
278n/a self.assertTrue(stream.at_eof())
279n/a
280n/a def test_readline_limit(self):
281n/a # Read one line. StreamReaders are fed with data after
282n/a # their 'readline' methods are called.
283n/a
284n/a stream = asyncio.StreamReader(limit=7, loop=self.loop)
285n/a def cb():
286n/a stream.feed_data(b'chunk1')
287n/a stream.feed_data(b'chunk2')
288n/a stream.feed_data(b'chunk3\n')
289n/a stream.feed_eof()
290n/a self.loop.call_soon(cb)
291n/a
292n/a self.assertRaises(
293n/a ValueError, self.loop.run_until_complete, stream.readline())
294n/a # The buffer had just one line of data, and after raising
295n/a # a ValueError it should be empty.
296n/a self.assertEqual(b'', stream._buffer)
297n/a
298n/a stream = asyncio.StreamReader(limit=7, loop=self.loop)
299n/a def cb():
300n/a stream.feed_data(b'chunk1')
301n/a stream.feed_data(b'chunk2\n')
302n/a stream.feed_data(b'chunk3\n')
303n/a stream.feed_eof()
304n/a self.loop.call_soon(cb)
305n/a
306n/a self.assertRaises(
307n/a ValueError, self.loop.run_until_complete, stream.readline())
308n/a self.assertEqual(b'chunk3\n', stream._buffer)
309n/a
310n/a # check strictness of the limit
311n/a stream = asyncio.StreamReader(limit=7, loop=self.loop)
312n/a stream.feed_data(b'1234567\n')
313n/a line = self.loop.run_until_complete(stream.readline())
314n/a self.assertEqual(b'1234567\n', line)
315n/a self.assertEqual(b'', stream._buffer)
316n/a
317n/a stream.feed_data(b'12345678\n')
318n/a with self.assertRaises(ValueError) as cm:
319n/a self.loop.run_until_complete(stream.readline())
320n/a self.assertEqual(b'', stream._buffer)
321n/a
322n/a stream.feed_data(b'12345678')
323n/a with self.assertRaises(ValueError) as cm:
324n/a self.loop.run_until_complete(stream.readline())
325n/a self.assertEqual(b'', stream._buffer)
326n/a
327n/a def test_readline_nolimit_nowait(self):
328n/a # All needed data for the first 'readline' call will be
329n/a # in the buffer.
330n/a stream = asyncio.StreamReader(loop=self.loop)
331n/a stream.feed_data(self.DATA[:6])
332n/a stream.feed_data(self.DATA[6:])
333n/a
334n/a line = self.loop.run_until_complete(stream.readline())
335n/a
336n/a self.assertEqual(b'line1\n', line)
337n/a self.assertEqual(b'line2\nline3\n', stream._buffer)
338n/a
339n/a def test_readline_eof(self):
340n/a stream = asyncio.StreamReader(loop=self.loop)
341n/a stream.feed_data(b'some data')
342n/a stream.feed_eof()
343n/a
344n/a line = self.loop.run_until_complete(stream.readline())
345n/a self.assertEqual(b'some data', line)
346n/a
347n/a def test_readline_empty_eof(self):
348n/a stream = asyncio.StreamReader(loop=self.loop)
349n/a stream.feed_eof()
350n/a
351n/a line = self.loop.run_until_complete(stream.readline())
352n/a self.assertEqual(b'', line)
353n/a
354n/a def test_readline_read_byte_count(self):
355n/a stream = asyncio.StreamReader(loop=self.loop)
356n/a stream.feed_data(self.DATA)
357n/a
358n/a self.loop.run_until_complete(stream.readline())
359n/a
360n/a data = self.loop.run_until_complete(stream.read(7))
361n/a
362n/a self.assertEqual(b'line2\nl', data)
363n/a self.assertEqual(b'ine3\n', stream._buffer)
364n/a
365n/a def test_readline_exception(self):
366n/a stream = asyncio.StreamReader(loop=self.loop)
367n/a stream.feed_data(b'line\n')
368n/a
369n/a data = self.loop.run_until_complete(stream.readline())
370n/a self.assertEqual(b'line\n', data)
371n/a
372n/a stream.set_exception(ValueError())
373n/a self.assertRaises(
374n/a ValueError, self.loop.run_until_complete, stream.readline())
375n/a self.assertEqual(b'', stream._buffer)
376n/a
377n/a def test_readuntil_separator(self):
378n/a stream = asyncio.StreamReader(loop=self.loop)
379n/a with self.assertRaisesRegex(ValueError, 'Separator should be'):
380n/a self.loop.run_until_complete(stream.readuntil(separator=b''))
381n/a
382n/a def test_readuntil_multi_chunks(self):
383n/a stream = asyncio.StreamReader(loop=self.loop)
384n/a
385n/a stream.feed_data(b'lineAAA')
386n/a data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA'))
387n/a self.assertEqual(b'lineAAA', data)
388n/a self.assertEqual(b'', stream._buffer)
389n/a
390n/a stream.feed_data(b'lineAAA')
391n/a data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
392n/a self.assertEqual(b'lineAAA', data)
393n/a self.assertEqual(b'', stream._buffer)
394n/a
395n/a stream.feed_data(b'lineAAAxxx')
396n/a data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
397n/a self.assertEqual(b'lineAAA', data)
398n/a self.assertEqual(b'xxx', stream._buffer)
399n/a
400n/a def test_readuntil_multi_chunks_1(self):
401n/a stream = asyncio.StreamReader(loop=self.loop)
402n/a
403n/a stream.feed_data(b'QWEaa')
404n/a stream.feed_data(b'XYaa')
405n/a stream.feed_data(b'a')
406n/a data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
407n/a self.assertEqual(b'QWEaaXYaaa', data)
408n/a self.assertEqual(b'', stream._buffer)
409n/a
410n/a stream.feed_data(b'QWEaa')
411n/a stream.feed_data(b'XYa')
412n/a stream.feed_data(b'aa')
413n/a data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
414n/a self.assertEqual(b'QWEaaXYaaa', data)
415n/a self.assertEqual(b'', stream._buffer)
416n/a
417n/a stream.feed_data(b'aaa')
418n/a data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
419n/a self.assertEqual(b'aaa', data)
420n/a self.assertEqual(b'', stream._buffer)
421n/a
422n/a stream.feed_data(b'Xaaa')
423n/a data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
424n/a self.assertEqual(b'Xaaa', data)
425n/a self.assertEqual(b'', stream._buffer)
426n/a
427n/a stream.feed_data(b'XXX')
428n/a stream.feed_data(b'a')
429n/a stream.feed_data(b'a')
430n/a stream.feed_data(b'a')
431n/a data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
432n/a self.assertEqual(b'XXXaaa', data)
433n/a self.assertEqual(b'', stream._buffer)
434n/a
435n/a def test_readuntil_eof(self):
436n/a stream = asyncio.StreamReader(loop=self.loop)
437n/a stream.feed_data(b'some dataAA')
438n/a stream.feed_eof()
439n/a
440n/a with self.assertRaises(asyncio.IncompleteReadError) as cm:
441n/a self.loop.run_until_complete(stream.readuntil(b'AAA'))
442n/a self.assertEqual(cm.exception.partial, b'some dataAA')
443n/a self.assertIsNone(cm.exception.expected)
444n/a self.assertEqual(b'', stream._buffer)
445n/a
446n/a def test_readuntil_limit_found_sep(self):
447n/a stream = asyncio.StreamReader(loop=self.loop, limit=3)
448n/a stream.feed_data(b'some dataAA')
449n/a
450n/a with self.assertRaisesRegex(asyncio.LimitOverrunError,
451n/a 'not found') as cm:
452n/a self.loop.run_until_complete(stream.readuntil(b'AAA'))
453n/a
454n/a self.assertEqual(b'some dataAA', stream._buffer)
455n/a
456n/a stream.feed_data(b'A')
457n/a with self.assertRaisesRegex(asyncio.LimitOverrunError,
458n/a 'is found') as cm:
459n/a self.loop.run_until_complete(stream.readuntil(b'AAA'))
460n/a
461n/a self.assertEqual(b'some dataAAA', stream._buffer)
462n/a
463n/a def test_readexactly_zero_or_less(self):
464n/a # Read exact number of bytes (zero or less).
465n/a stream = asyncio.StreamReader(loop=self.loop)
466n/a stream.feed_data(self.DATA)
467n/a
468n/a data = self.loop.run_until_complete(stream.readexactly(0))
469n/a self.assertEqual(b'', data)
470n/a self.assertEqual(self.DATA, stream._buffer)
471n/a
472n/a with self.assertRaisesRegex(ValueError, 'less than zero'):
473n/a self.loop.run_until_complete(stream.readexactly(-1))
474n/a self.assertEqual(self.DATA, stream._buffer)
475n/a
476n/a def test_readexactly(self):
477n/a # Read exact number of bytes.
478n/a stream = asyncio.StreamReader(loop=self.loop)
479n/a
480n/a n = 2 * len(self.DATA)
481n/a read_task = asyncio.Task(stream.readexactly(n), loop=self.loop)
482n/a
483n/a def cb():
484n/a stream.feed_data(self.DATA)
485n/a stream.feed_data(self.DATA)
486n/a stream.feed_data(self.DATA)
487n/a self.loop.call_soon(cb)
488n/a
489n/a data = self.loop.run_until_complete(read_task)
490n/a self.assertEqual(self.DATA + self.DATA, data)
491n/a self.assertEqual(self.DATA, stream._buffer)
492n/a
493n/a def test_readexactly_limit(self):
494n/a stream = asyncio.StreamReader(limit=3, loop=self.loop)
495n/a stream.feed_data(b'chunk')
496n/a data = self.loop.run_until_complete(stream.readexactly(5))
497n/a self.assertEqual(b'chunk', data)
498n/a self.assertEqual(b'', stream._buffer)
499n/a
500n/a def test_readexactly_eof(self):
501n/a # Read exact number of bytes (eof).
502n/a stream = asyncio.StreamReader(loop=self.loop)
503n/a n = 2 * len(self.DATA)
504n/a read_task = asyncio.Task(stream.readexactly(n), loop=self.loop)
505n/a
506n/a def cb():
507n/a stream.feed_data(self.DATA)
508n/a stream.feed_eof()
509n/a self.loop.call_soon(cb)
510n/a
511n/a with self.assertRaises(asyncio.IncompleteReadError) as cm:
512n/a self.loop.run_until_complete(read_task)
513n/a self.assertEqual(cm.exception.partial, self.DATA)
514n/a self.assertEqual(cm.exception.expected, n)
515n/a self.assertEqual(str(cm.exception),
516n/a '18 bytes read on a total of 36 expected bytes')
517n/a self.assertEqual(b'', stream._buffer)
518n/a
519n/a def test_readexactly_exception(self):
520n/a stream = asyncio.StreamReader(loop=self.loop)
521n/a stream.feed_data(b'line\n')
522n/a
523n/a data = self.loop.run_until_complete(stream.readexactly(2))
524n/a self.assertEqual(b'li', data)
525n/a
526n/a stream.set_exception(ValueError())
527n/a self.assertRaises(
528n/a ValueError, self.loop.run_until_complete, stream.readexactly(2))
529n/a
530n/a def test_exception(self):
531n/a stream = asyncio.StreamReader(loop=self.loop)
532n/a self.assertIsNone(stream.exception())
533n/a
534n/a exc = ValueError()
535n/a stream.set_exception(exc)
536n/a self.assertIs(stream.exception(), exc)
537n/a
538n/a def test_exception_waiter(self):
539n/a stream = asyncio.StreamReader(loop=self.loop)
540n/a
541n/a @asyncio.coroutine
542n/a def set_err():
543n/a stream.set_exception(ValueError())
544n/a
545n/a t1 = asyncio.Task(stream.readline(), loop=self.loop)
546n/a t2 = asyncio.Task(set_err(), loop=self.loop)
547n/a
548n/a self.loop.run_until_complete(asyncio.wait([t1, t2], loop=self.loop))
549n/a
550n/a self.assertRaises(ValueError, t1.result)
551n/a
552n/a def test_exception_cancel(self):
553n/a stream = asyncio.StreamReader(loop=self.loop)
554n/a
555n/a t = asyncio.Task(stream.readline(), loop=self.loop)
556n/a test_utils.run_briefly(self.loop)
557n/a t.cancel()
558n/a test_utils.run_briefly(self.loop)
559n/a # The following line fails if set_exception() isn't careful.
560n/a stream.set_exception(RuntimeError('message'))
561n/a test_utils.run_briefly(self.loop)
562n/a self.assertIs(stream._waiter, None)
563n/a
564n/a def test_start_server(self):
565n/a
566n/a class MyServer:
567n/a
568n/a def __init__(self, loop):
569n/a self.server = None
570n/a self.loop = loop
571n/a
572n/a @asyncio.coroutine
573n/a def handle_client(self, client_reader, client_writer):
574n/a data = yield from client_reader.readline()
575n/a client_writer.write(data)
576n/a yield from client_writer.drain()
577n/a client_writer.close()
578n/a
579n/a def start(self):
580n/a sock = socket.socket()
581n/a sock.bind(('127.0.0.1', 0))
582n/a self.server = self.loop.run_until_complete(
583n/a asyncio.start_server(self.handle_client,
584n/a sock=sock,
585n/a loop=self.loop))
586n/a return sock.getsockname()
587n/a
588n/a def handle_client_callback(self, client_reader, client_writer):
589n/a self.loop.create_task(self.handle_client(client_reader,
590n/a client_writer))
591n/a
592n/a def start_callback(self):
593n/a sock = socket.socket()
594n/a sock.bind(('127.0.0.1', 0))
595n/a addr = sock.getsockname()
596n/a sock.close()
597n/a self.server = self.loop.run_until_complete(
598n/a asyncio.start_server(self.handle_client_callback,
599n/a host=addr[0], port=addr[1],
600n/a loop=self.loop))
601n/a return addr
602n/a
603n/a def stop(self):
604n/a if self.server is not None:
605n/a self.server.close()
606n/a self.loop.run_until_complete(self.server.wait_closed())
607n/a self.server = None
608n/a
609n/a @asyncio.coroutine
610n/a def client(addr):
611n/a reader, writer = yield from asyncio.open_connection(
612n/a *addr, loop=self.loop)
613n/a # send a line
614n/a writer.write(b"hello world!\n")
615n/a # read it back
616n/a msgback = yield from reader.readline()
617n/a writer.close()
618n/a return msgback
619n/a
620n/a # test the server variant with a coroutine as client handler
621n/a server = MyServer(self.loop)
622n/a addr = server.start()
623n/a msg = self.loop.run_until_complete(asyncio.Task(client(addr),
624n/a loop=self.loop))
625n/a server.stop()
626n/a self.assertEqual(msg, b"hello world!\n")
627n/a
628n/a # test the server variant with a callback as client handler
629n/a server = MyServer(self.loop)
630n/a addr = server.start_callback()
631n/a msg = self.loop.run_until_complete(asyncio.Task(client(addr),
632n/a loop=self.loop))
633n/a server.stop()
634n/a self.assertEqual(msg, b"hello world!\n")
635n/a
636n/a @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
637n/a def test_start_unix_server(self):
638n/a
639n/a class MyServer:
640n/a
641n/a def __init__(self, loop, path):
642n/a self.server = None
643n/a self.loop = loop
644n/a self.path = path
645n/a
646n/a @asyncio.coroutine
647n/a def handle_client(self, client_reader, client_writer):
648n/a data = yield from client_reader.readline()
649n/a client_writer.write(data)
650n/a yield from client_writer.drain()
651n/a client_writer.close()
652n/a
653n/a def start(self):
654n/a self.server = self.loop.run_until_complete(
655n/a asyncio.start_unix_server(self.handle_client,
656n/a path=self.path,
657n/a loop=self.loop))
658n/a
659n/a def handle_client_callback(self, client_reader, client_writer):
660n/a self.loop.create_task(self.handle_client(client_reader,
661n/a client_writer))
662n/a
663n/a def start_callback(self):
664n/a start = asyncio.start_unix_server(self.handle_client_callback,
665n/a path=self.path,
666n/a loop=self.loop)
667n/a self.server = self.loop.run_until_complete(start)
668n/a
669n/a def stop(self):
670n/a if self.server is not None:
671n/a self.server.close()
672n/a self.loop.run_until_complete(self.server.wait_closed())
673n/a self.server = None
674n/a
675n/a @asyncio.coroutine
676n/a def client(path):
677n/a reader, writer = yield from asyncio.open_unix_connection(
678n/a path, loop=self.loop)
679n/a # send a line
680n/a writer.write(b"hello world!\n")
681n/a # read it back
682n/a msgback = yield from reader.readline()
683n/a writer.close()
684n/a return msgback
685n/a
686n/a # test the server variant with a coroutine as client handler
687n/a with test_utils.unix_socket_path() as path:
688n/a server = MyServer(self.loop, path)
689n/a server.start()
690n/a msg = self.loop.run_until_complete(asyncio.Task(client(path),
691n/a loop=self.loop))
692n/a server.stop()
693n/a self.assertEqual(msg, b"hello world!\n")
694n/a
695n/a # test the server variant with a callback as client handler
696n/a with test_utils.unix_socket_path() as path:
697n/a server = MyServer(self.loop, path)
698n/a server.start_callback()
699n/a msg = self.loop.run_until_complete(asyncio.Task(client(path),
700n/a loop=self.loop))
701n/a server.stop()
702n/a self.assertEqual(msg, b"hello world!\n")
703n/a
704n/a @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
705n/a def test_read_all_from_pipe_reader(self):
706n/a # See asyncio issue 168. This test is derived from the example
707n/a # subprocess_attach_read_pipe.py, but we configure the
708n/a # StreamReader's limit so that twice it is less than the size
709n/a # of the data writter. Also we must explicitly attach a child
710n/a # watcher to the event loop.
711n/a
712n/a code = """\
713n/aimport os, sys
714n/afd = int(sys.argv[1])
715n/aos.write(fd, b'data')
716n/aos.close(fd)
717n/a"""
718n/a rfd, wfd = os.pipe()
719n/a args = [sys.executable, '-c', code, str(wfd)]
720n/a
721n/a pipe = open(rfd, 'rb', 0)
722n/a reader = asyncio.StreamReader(loop=self.loop, limit=1)
723n/a protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
724n/a transport, _ = self.loop.run_until_complete(
725n/a self.loop.connect_read_pipe(lambda: protocol, pipe))
726n/a
727n/a watcher = asyncio.SafeChildWatcher()
728n/a watcher.attach_loop(self.loop)
729n/a try:
730n/a asyncio.set_child_watcher(watcher)
731n/a create = asyncio.create_subprocess_exec(*args,
732n/a pass_fds={wfd},
733n/a loop=self.loop)
734n/a proc = self.loop.run_until_complete(create)
735n/a self.loop.run_until_complete(proc.wait())
736n/a finally:
737n/a asyncio.set_child_watcher(None)
738n/a
739n/a os.close(wfd)
740n/a data = self.loop.run_until_complete(reader.read(-1))
741n/a self.assertEqual(data, b'data')
742n/a
743n/a def test_streamreader_constructor(self):
744n/a self.addCleanup(asyncio.set_event_loop, None)
745n/a asyncio.set_event_loop(self.loop)
746n/a
747n/a # asyncio issue #184: Ensure that StreamReaderProtocol constructor
748n/a # retrieves the current loop if the loop parameter is not set
749n/a reader = asyncio.StreamReader()
750n/a self.assertIs(reader._loop, self.loop)
751n/a
752n/a def test_streamreaderprotocol_constructor(self):
753n/a self.addCleanup(asyncio.set_event_loop, None)
754n/a asyncio.set_event_loop(self.loop)
755n/a
756n/a # asyncio issue #184: Ensure that StreamReaderProtocol constructor
757n/a # retrieves the current loop if the loop parameter is not set
758n/a reader = mock.Mock()
759n/a protocol = asyncio.StreamReaderProtocol(reader)
760n/a self.assertIs(protocol._loop, self.loop)
761n/a
762n/a def test_drain_raises(self):
763n/a # See http://bugs.python.org/issue25441
764n/a
765n/a # This test should not use asyncio for the mock server; the
766n/a # whole point of the test is to test for a bug in drain()
767n/a # where it never gives up the event loop but the socket is
768n/a # closed on the server side.
769n/a
770n/a q = queue.Queue()
771n/a
772n/a def server():
773n/a # Runs in a separate thread.
774n/a sock = socket.socket()
775n/a with sock:
776n/a sock.bind(('localhost', 0))
777n/a sock.listen(1)
778n/a addr = sock.getsockname()
779n/a q.put(addr)
780n/a clt, _ = sock.accept()
781n/a clt.close()
782n/a
783n/a @asyncio.coroutine
784n/a def client(host, port):
785n/a reader, writer = yield from asyncio.open_connection(
786n/a host, port, loop=self.loop)
787n/a
788n/a while True:
789n/a writer.write(b"foo\n")
790n/a yield from writer.drain()
791n/a
792n/a # Start the server thread and wait for it to be listening.
793n/a thread = threading.Thread(target=server)
794n/a thread.setDaemon(True)
795n/a thread.start()
796n/a addr = q.get()
797n/a
798n/a # Should not be stuck in an infinite loop.
799n/a with self.assertRaises((ConnectionResetError, BrokenPipeError)):
800n/a self.loop.run_until_complete(client(*addr))
801n/a
802n/a # Clean up the thread. (Only on success; on failure, it may
803n/a # be stuck in accept().)
804n/a thread.join()
805n/a
806n/a def test___repr__(self):
807n/a stream = asyncio.StreamReader(loop=self.loop)
808n/a self.assertEqual("<StreamReader>", repr(stream))
809n/a
810n/a def test___repr__nondefault_limit(self):
811n/a stream = asyncio.StreamReader(loop=self.loop, limit=123)
812n/a self.assertEqual("<StreamReader l=123>", repr(stream))
813n/a
814n/a def test___repr__eof(self):
815n/a stream = asyncio.StreamReader(loop=self.loop)
816n/a stream.feed_eof()
817n/a self.assertEqual("<StreamReader eof>", repr(stream))
818n/a
819n/a def test___repr__data(self):
820n/a stream = asyncio.StreamReader(loop=self.loop)
821n/a stream.feed_data(b'data')
822n/a self.assertEqual("<StreamReader 4 bytes>", repr(stream))
823n/a
824n/a def test___repr__exception(self):
825n/a stream = asyncio.StreamReader(loop=self.loop)
826n/a exc = RuntimeError()
827n/a stream.set_exception(exc)
828n/a self.assertEqual("<StreamReader e=RuntimeError()>", repr(stream))
829n/a
830n/a def test___repr__waiter(self):
831n/a stream = asyncio.StreamReader(loop=self.loop)
832n/a stream._waiter = asyncio.Future(loop=self.loop)
833n/a self.assertRegex(
834n/a repr(stream),
835n/a r"<StreamReader w=<Future pending[\S ]*>>")
836n/a stream._waiter.set_result(None)
837n/a self.loop.run_until_complete(stream._waiter)
838n/a stream._waiter = None
839n/a self.assertEqual("<StreamReader>", repr(stream))
840n/a
841n/a def test___repr__transport(self):
842n/a stream = asyncio.StreamReader(loop=self.loop)
843n/a stream._transport = mock.Mock()
844n/a stream._transport.__repr__ = mock.Mock()
845n/a stream._transport.__repr__.return_value = "<Transport>"
846n/a self.assertEqual("<StreamReader t=<Transport>>", repr(stream))
847n/a
848n/a
849n/aif __name__ == '__main__':
850n/a unittest.main()