» Core Development > Code coverage > Lib/test/test_asyncio/test_queues.py

Python code coverage for Lib/test/test_asyncio/test_queues.py

# count content
1n/a"""Tests for queues.py"""
2n/a
3n/aimport unittest
4n/afrom unittest import mock
5n/a
6n/aimport asyncio
7n/afrom asyncio import test_utils
8n/a
9n/a
class _QueueTestBase(test_utils.TestCase):
    """Shared fixture for the queue tests: gives each test a ``self.loop``."""

    def setUp(self):
        # Run the inherited set-up first, then create the loop for this test.
        super().setUp()
        self.loop = self.new_test_loop()
15n/a
16n/a
class QueueBasicTests(_QueueTestBase):
    """Tests for Queue construction, repr/str, empty/full state and ordering."""

    def _test_repr_or_str(self, fn, expect_id):
        """Test Queue's repr or str.

        fn is repr or str. expect_id is True if we expect the Queue's id to
        appear in fn(Queue()).
        """
        def gen():
            # Time generator handed to new_test_loop(): it checks each value
            # it is sent and yields 0.1 twice.
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0.1
            self.assertAlmostEqual(0.2, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(loop=loop)
        text = fn(q)
        self.assertTrue(text.startswith('<Queue'), text)
        id_is_present = hex(id(q)) in text
        self.assertEqual(expect_id, id_is_present)

        @asyncio.coroutine
        def add_getter():
            q = asyncio.Queue(loop=loop)
            # Start a task that waits to get.
            asyncio.Task(q.get(), loop=loop)
            # Let it start waiting.
            yield from asyncio.sleep(0.1, loop=loop)
            # assertIn reports both operands on failure, unlike
            # assertTrue(x in y).
            self.assertIn('_getters[1]', fn(q))
            # resume q.get coroutine to finish generator
            q.put_nowait(0)

        loop.run_until_complete(add_getter())

        @asyncio.coroutine
        def add_putter():
            q = asyncio.Queue(maxsize=1, loop=loop)
            q.put_nowait(1)
            # Start a task that waits to put.
            asyncio.Task(q.put(2), loop=loop)
            # Let it start waiting.
            yield from asyncio.sleep(0.1, loop=loop)
            self.assertIn('_putters[1]', fn(q))
            # resume q.put coroutine to finish generator
            q.get_nowait()

        loop.run_until_complete(add_putter())

        q = asyncio.Queue(loop=loop)
        q.put_nowait(1)
        self.assertIn('_queue=[1]', fn(q))

    def test_ctor_loop(self):
        # A loop passed explicitly is stored on the queue as ``_loop``.
        loop = mock.Mock()
        q = asyncio.Queue(loop=loop)
        self.assertIs(q._loop, loop)

        q = asyncio.Queue(loop=self.loop)
        self.assertIs(q._loop, self.loop)

    def test_ctor_noloop(self):
        # Without a loop argument the queue uses the loop installed via
        # asyncio.set_event_loop().
        asyncio.set_event_loop(self.loop)
        q = asyncio.Queue()
        self.assertIs(q._loop, self.loop)

    def test_repr(self):
        # repr() is expected to contain the queue's id.
        self._test_repr_or_str(repr, True)

    def test_str(self):
        # str() is expected NOT to contain the queue's id.
        self._test_repr_or_str(str, False)

    def test_empty(self):
        # empty() follows the queue contents through a put/get cycle.
        q = asyncio.Queue(loop=self.loop)
        self.assertTrue(q.empty())
        q.put_nowait(1)
        self.assertFalse(q.empty())
        self.assertEqual(1, q.get_nowait())
        self.assertTrue(q.empty())

    def test_full(self):
        # A queue created without maxsize is not full; a maxsize=1 queue is
        # full after one put.
        q = asyncio.Queue(loop=self.loop)
        self.assertFalse(q.full())

        q = asyncio.Queue(maxsize=1, loop=self.loop)
        q.put_nowait(1)
        self.assertTrue(q.full())

    def test_order(self):
        # Items come out in the order they were put in.
        q = asyncio.Queue(loop=self.loop)
        for i in [1, 3, 2]:
            q.put_nowait(i)

        items = [q.get_nowait() for _ in range(3)]
        self.assertEqual([1, 3, 2], items)

    def test_maxsize(self):
        # With maxsize=2 a putter of three items blocks on the third until a
        # slot is freed by get_nowait().

        def gen():
            # Time generator handed to new_test_loop(): it checks each value
            # it is sent and yields 0.01 twice.
            when = yield
            self.assertAlmostEqual(0.01, when)
            when = yield 0.01
            self.assertAlmostEqual(0.02, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(maxsize=2, loop=loop)
        self.assertEqual(2, q.maxsize)
        have_been_put = []

        @asyncio.coroutine
        def putter():
            for i in range(3):
                yield from q.put(i)
                have_been_put.append(i)
            return True

        @asyncio.coroutine
        def test():
            t = asyncio.Task(putter(), loop=loop)
            yield from asyncio.sleep(0.01, loop=loop)

            # The putter is blocked after putting two items.
            self.assertEqual([0, 1], have_been_put)
            self.assertEqual(0, q.get_nowait())

            # Let the putter resume and put last item.
            yield from asyncio.sleep(0.01, loop=loop)
            self.assertEqual([0, 1, 2], have_been_put)
            self.assertEqual(1, q.get_nowait())
            self.assertEqual(2, q.get_nowait())

            self.assertTrue(t.done())
            self.assertTrue(t.result())

        loop.run_until_complete(test())
        self.assertAlmostEqual(0.02, loop.time())
155n/a
156n/a
class QueueGetTests(_QueueTestBase):
    """Tests for Queue.get() and Queue.get_nowait()."""

    def test_blocking_get(self):
        # get() on a queue that already holds an item returns that item.
        queue = asyncio.Queue(loop=self.loop)
        queue.put_nowait(1)

        @asyncio.coroutine
        def fetch():
            item = yield from queue.get()
            return item

        self.assertEqual(1, self.loop.run_until_complete(fetch()))

    def test_get_with_putters(self):
        # A future parked in ``_putters`` is completed (result None) once
        # get() removes an item from the queue.
        queue = asyncio.Queue(maxsize=1, loop=self.loop)
        queue.put_nowait(1)

        pending_put = asyncio.Future(loop=self.loop)
        queue._putters.append(pending_put)

        got = self.loop.run_until_complete(queue.get())
        self.assertEqual(1, got)
        self.assertTrue(pending_put.done())
        self.assertIsNone(pending_put.result())

    def test_blocking_get_wait(self):
        # get() on an empty queue does not finish until an item is put by a
        # callback scheduled 0.01 later.

        def time_gen():
            # Time generator handed to new_test_loop().
            when = yield
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(time_gen)

        queue = asyncio.Queue(loop=loop)
        started = asyncio.Event(loop=loop)
        got_item = False

        @asyncio.coroutine
        def consumer():
            nonlocal got_item
            started.set()
            item = yield from queue.get()
            got_item = True
            return item

        @asyncio.coroutine
        def driver():
            loop.call_later(0.01, queue.put_nowait, 1)
            consumer_task = asyncio.Task(consumer(), loop=loop)
            yield from started.wait()
            # The consumer has started but must still be waiting in get().
            self.assertFalse(got_item)
            item = yield from consumer_task
            self.assertTrue(got_item)
            return item

        outcome = loop.run_until_complete(driver())
        self.assertEqual(1, outcome)
        self.assertAlmostEqual(0.01, loop.time())

    def test_nonblocking_get(self):
        # get_nowait() returns an item that is already queued.
        queue = asyncio.Queue(loop=self.loop)
        queue.put_nowait(1)
        self.assertEqual(1, queue.get_nowait())

    def test_nonblocking_get_exception(self):
        # get_nowait() on an empty queue raises QueueEmpty.
        queue = asyncio.Queue(loop=self.loop)
        with self.assertRaises(asyncio.QueueEmpty):
            queue.get_nowait()

    def test_get_cancelled(self):
        # get() is wrapped in wait_for() with a 0.051 timeout; the item is
        # put after a 0.01 sleep, so the wrapped get returns 1.

        def time_gen():
            # Time generator handed to new_test_loop().
            when = yield
            self.assertAlmostEqual(0.01, when)
            when = yield 0.01
            self.assertAlmostEqual(0.061, when)
            yield 0.05

        loop = self.new_test_loop(time_gen)

        queue = asyncio.Queue(loop=loop)

        @asyncio.coroutine
        def timed_get():
            item = yield from asyncio.wait_for(queue.get(), 0.051, loop=loop)
            return item

        @asyncio.coroutine
        def scenario():
            getter = asyncio.Task(timed_get(), loop=loop)
            yield from asyncio.sleep(0.01, loop=loop)  # let the task start
            queue.put_nowait(1)
            item = yield from getter
            return item

        self.assertEqual(1, loop.run_until_complete(scenario()))
        self.assertAlmostEqual(0.06, loop.time())

    def test_get_cancelled_race(self):
        # With two getters waiting, cancelling the first must leave the
        # second able to receive the next item.
        queue = asyncio.Queue(loop=self.loop)

        first = asyncio.Task(queue.get(), loop=self.loop)
        second = asyncio.Task(queue.get(), loop=self.loop)

        test_utils.run_briefly(self.loop)
        first.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(first.done())
        queue.put_nowait('a')
        test_utils.run_briefly(self.loop)
        self.assertEqual(second.result(), 'a')

    def test_get_with_waiting_putters(self):
        # Two put() tasks on a maxsize=1 queue: successive get() calls
        # return their items in order.
        queue = asyncio.Queue(loop=self.loop, maxsize=1)
        for item in ('a', 'b'):
            asyncio.Task(queue.put(item), loop=self.loop)
        test_utils.run_briefly(self.loop)
        for expected in ('a', 'b'):
            self.assertEqual(self.loop.run_until_complete(queue.get()),
                             expected)

    def test_why_are_getters_waiting(self):
        # From issue #268.

        @asyncio.coroutine
        def consumer(queue, num_expected):
            for _ in range(num_expected):
                yield from queue.get()

        @asyncio.coroutine
        def producer(queue, num_items):
            for value in range(num_items):
                yield from queue.put(value)

        capacity = 1
        item_count = 5
        shared = asyncio.Queue(capacity, loop=self.loop)

        both = asyncio.gather(producer(shared, item_count),
                              consumer(shared, item_count),
                              loop=self.loop)
        self.loop.run_until_complete(both)
297n/a
298n/a
class QueuePutTests(_QueueTestBase):
    """Tests for Queue.put() / put_nowait(), including cancellation cases."""

    def test_blocking_put(self):
        # put() on a queue created without maxsize completes immediately.
        q = asyncio.Queue(loop=self.loop)

        @asyncio.coroutine
        def queue_put():
            # No maxsize, won't block.
            yield from q.put(1)

        self.loop.run_until_complete(queue_put())

    def test_blocking_put_wait(self):
        # The second put() on a maxsize=1 queue does not finish until a
        # get_nowait() callback scheduled 0.01 later frees a slot.

        def gen():
            # Time generator handed to new_test_loop().
            when = yield
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(maxsize=1, loop=loop)
        started = asyncio.Event(loop=loop)
        finished = False

        @asyncio.coroutine
        def queue_put():
            nonlocal finished
            started.set()
            yield from q.put(1)
            yield from q.put(2)
            finished = True

        @asyncio.coroutine
        def queue_get():
            loop.call_later(0.01, q.get_nowait)
            queue_put_task = asyncio.Task(queue_put(), loop=loop)
            yield from started.wait()
            # The putter has started but is not yet through both puts.
            self.assertFalse(finished)
            yield from queue_put_task
            self.assertTrue(finished)

        loop.run_until_complete(queue_get())
        self.assertAlmostEqual(0.01, loop.time())

    def test_nonblocking_put(self):
        # An item stored with put_nowait() is returned by get_nowait().
        q = asyncio.Queue(loop=self.loop)
        q.put_nowait(1)
        self.assertEqual(1, q.get_nowait())

    def test_get_cancel_drop_one_pending_reader(self):
        # Cancelling a waiting reader after items were put must not lose the
        # first item: a fresh get() still has to return 1.
        def gen():
            # Time generator handed to new_test_loop().
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(loop=loop)

        reader = loop.create_task(q.get())

        loop.run_until_complete(asyncio.sleep(0.01, loop=loop))

        q.put_nowait(1)
        q.put_nowait(2)
        reader.cancel()

        try:
            loop.run_until_complete(reader)
        except asyncio.CancelledError:
            # try again
            reader = loop.create_task(q.get())
            loop.run_until_complete(reader)

        result = reader.result()
        # if we get 2, it means 1 got dropped!
        self.assertEqual(1, result)

    def test_get_cancel_drop_many_pending_readers(self):
        # Three readers wait, two items are put and the first reader is
        # cancelled: the other two readers must receive both items.
        def gen():
            # Time generator handed to new_test_loop().
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)
        loop.set_debug(True)

        q = asyncio.Queue(loop=loop)

        reader1 = loop.create_task(q.get())
        reader2 = loop.create_task(q.get())
        reader3 = loop.create_task(q.get())

        loop.run_until_complete(asyncio.sleep(0.01, loop=loop))

        q.put_nowait(1)
        q.put_nowait(2)
        reader1.cancel()

        try:
            loop.run_until_complete(reader1)
        except asyncio.CancelledError:
            # Deliberately ignored; only the surviving readers are checked.
            pass

        loop.run_until_complete(reader3)

        # It is undefined in which order concurrent readers receive results.
        self.assertEqual({reader2.result(), reader3.result()}, {1, 2})

    def test_put_cancel_drop(self):
        # Cancelling a blocked writer must not corrupt the queue: after the
        # retry exactly one item (2) is queued.

        def gen():
            # Time generator handed to new_test_loop().
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)
        q = asyncio.Queue(1, loop=loop)

        q.put_nowait(1)

        # putting a second item in the queue has to block (qsize=1)
        writer = loop.create_task(q.put(2))
        loop.run_until_complete(asyncio.sleep(0.01, loop=loop))

        value1 = q.get_nowait()
        self.assertEqual(value1, 1)

        writer.cancel()
        try:
            loop.run_until_complete(writer)
        except asyncio.CancelledError:
            # try again
            writer = loop.create_task(q.put(2))
            loop.run_until_complete(writer)

        value2 = q.get_nowait()
        self.assertEqual(value2, 2)
        self.assertEqual(q.qsize(), 0)

    def test_nonblocking_put_exception(self):
        # put_nowait() on a full queue raises QueueFull.
        q = asyncio.Queue(maxsize=1, loop=self.loop)
        q.put_nowait(1)
        self.assertRaises(asyncio.QueueFull, q.put_nowait, 2)

    def test_float_maxsize(self):
        # With maxsize=1.3 the queue accepts two items and is then full,
        # for both put_nowait() and put().
        q = asyncio.Queue(maxsize=1.3, loop=self.loop)
        q.put_nowait(1)
        q.put_nowait(2)
        self.assertTrue(q.full())
        self.assertRaises(asyncio.QueueFull, q.put_nowait, 3)

        q = asyncio.Queue(maxsize=1.3, loop=self.loop)

        @asyncio.coroutine
        def queue_put():
            yield from q.put(1)
            yield from q.put(2)
            self.assertTrue(q.full())

        self.loop.run_until_complete(queue_put())

    def test_put_cancelled(self):
        # A put() task on a queue without maxsize completes, and its item is
        # received by a concurrent get().
        q = asyncio.Queue(loop=self.loop)

        @asyncio.coroutine
        def queue_put():
            yield from q.put(1)
            return True

        @asyncio.coroutine
        def test():
            return (yield from q.get())

        t = asyncio.Task(queue_put(), loop=self.loop)
        self.assertEqual(1, self.loop.run_until_complete(test()))
        self.assertTrue(t.done())
        self.assertTrue(t.result())

    def test_put_cancelled_race(self):
        # Three putters on a maxsize=1 queue; cancelling the third must not
        # prevent the second from delivering its item.
        q = asyncio.Queue(loop=self.loop, maxsize=1)

        put_a = asyncio.Task(q.put('a'), loop=self.loop)
        put_b = asyncio.Task(q.put('b'), loop=self.loop)
        put_c = asyncio.Task(q.put('X'), loop=self.loop)

        test_utils.run_briefly(self.loop)
        self.assertTrue(put_a.done())
        self.assertFalse(put_b.done())

        put_c.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(put_c.done())
        self.assertEqual(q.get_nowait(), 'a')
        test_utils.run_briefly(self.loop)
        self.assertEqual(q.get_nowait(), 'b')

        self.loop.run_until_complete(put_b)

    def test_put_with_waiting_getters(self):
        # A getter that is already waiting receives the item from put().
        q = asyncio.Queue(loop=self.loop)
        t = asyncio.Task(q.get(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        self.loop.run_until_complete(q.put('a'))
        self.assertEqual(self.loop.run_until_complete(t), 'a')

    def test_why_are_putters_waiting(self):
        # From issue #265.

        queue = asyncio.Queue(2, loop=self.loop)

        @asyncio.coroutine
        def putter(item):
            yield from queue.put(item)

        @asyncio.coroutine
        def getter():
            yield
            # Drain whatever is queued right now; the items are discarded.
            for _ in range(queue.qsize()):
                queue.get_nowait()

        t0 = putter(0)
        t1 = putter(1)
        t2 = putter(2)
        t3 = putter(3)
        self.loop.run_until_complete(
            asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop))
523n/a
524n/a
class LifoQueueTests(_QueueTestBase):
    """Ordering test for asyncio.LifoQueue."""

    def test_order(self):
        # Items put as 1, 3, 2 come back as 2, 3, 1.
        lifo = asyncio.LifoQueue(loop=self.loop)
        for value in [1, 3, 2]:
            lifo.put_nowait(value)

        drained = []
        for _ in range(3):
            drained.append(lifo.get_nowait())
        self.assertEqual([2, 3, 1], drained)
534n/a
535n/a
class PriorityQueueTests(_QueueTestBase):
    """Ordering test for asyncio.PriorityQueue."""

    def test_order(self):
        # Items put as 1, 3, 2 come back as 1, 2, 3.
        prio = asyncio.PriorityQueue(loop=self.loop)
        for value in [1, 3, 2]:
            prio.put_nowait(value)

        drained = []
        for _ in range(3):
            drained.append(prio.get_nowait())
        self.assertEqual([1, 2, 3], drained)
545n/a
546n/a
class _QueueJoinTestMixin:
    """task_done()/join() tests shared by the queue classes.

    Concrete subclasses set ``q_class`` to the queue type under test and
    must also provide ``self.loop`` and the unittest assertion methods.
    """

    # Queue class to instantiate; overridden by each concrete subclass.
    q_class = None

    def test_task_done_underflow(self):
        # task_done() on a queue with no unfinished tasks raises ValueError.
        q = self.q_class(loop=self.loop)
        self.assertRaises(ValueError, q.task_done)

    def test_task_done(self):
        q = self.q_class(loop=self.loop)
        for i in range(100):
            q.put_nowait(i)

        accumulator = 0

        # Two workers get items from the queue and call task_done after each.
        # Join the queue and assert all items have been processed.
        running = True

        @asyncio.coroutine
        def worker():
            nonlocal accumulator

            while running:
                item = yield from q.get()
                accumulator += item
                q.task_done()

        @asyncio.coroutine
        def test():
            tasks = [asyncio.Task(worker(), loop=self.loop)
                     for _ in range(2)]

            yield from q.join()
            return tasks

        tasks = self.loop.run_until_complete(test())
        self.assertEqual(sum(range(100)), accumulator)

        # close running generators
        running = False
        # One extra item per worker wakes it from get() so it can see
        # ``running`` is False and return.
        for _ in tasks:
            q.put_nowait(0)
        self.loop.run_until_complete(asyncio.wait(tasks, loop=self.loop))

    def test_join_empty_queue(self):
        q = self.q_class(loop=self.loop)

        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).

        @asyncio.coroutine
        def join():
            yield from q.join()
            yield from q.join()

        self.loop.run_until_complete(join())

    def test_format(self):
        # _format() reports maxsize, and the task count once
        # _unfinished_tasks is non-zero.
        q = self.q_class(loop=self.loop)
        self.assertEqual(q._format(), 'maxsize=0')

        q._unfinished_tasks = 2
        self.assertEqual(q._format(), 'maxsize=0 tasks=2')
611n/a
612n/a
class QueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    """Run the shared join/task_done tests against asyncio.Queue."""

    q_class = asyncio.Queue
615n/a
616n/a
class LifoQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    """Run the shared join/task_done tests against asyncio.LifoQueue."""

    q_class = asyncio.LifoQueue
619n/a
620n/a
class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    """Run the shared join/task_done tests against asyncio.PriorityQueue."""

    q_class = asyncio.PriorityQueue
623n/a
624n/a
# Run the tests through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()