» Core Development > Code coverage > Lib/test/test_queue.py

Python code coverage for Lib/test/test_queue.py

# count content
1n/a# Some simple queue module tests, plus some failure conditions
2n/a# to ensure the Queue locks remain stable.
3n/aimport queue
4n/aimport time
5n/aimport unittest
6n/afrom test import support
7n/athreading = support.import_module('threading')
8n/a
9n/aQUEUE_SIZE = 5
10n/a
def qfull(q):
    """Return True when the bounded queue *q* holds at least ``maxsize`` items.

    A queue with ``maxsize <= 0`` is treated as unbounded and is never full.
    ``>=`` is used rather than ``==`` so that a queue whose ``maxsize`` was
    lowered below its current size still counts as full.
    """
    return 0 < q.maxsize <= q.qsize()
13n/a
14n/a# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
    """Thread that pauses briefly, records that it started, then calls fn(*args)."""

    def __init__(self, fn, args):
        super().__init__()
        self.fn = fn
        self.args = args
        # Set by run() just before the trigger function is invoked.
        self.startedEvent = threading.Event()

    def run(self):
        # The pause is not required for correctness; it only gives the
        # blocking call in the main thread a chance to really block before
        # we unclog it.  Timeout-based callers must therefore wait much
        # longer than this pause (about 10 seconds is used elsewhere in
        # this file), although they should make progress as soon as the
        # trigger function is called.
        time.sleep(0.1)
        self.startedEvent.set()
        callback, arguments = self.fn, self.args
        callback(*arguments)
34n/a
35n/a
36n/a# Execute a function that blocks, and in a separate thread, a function that
37n/a# triggers the release. Returns the result of the blocking function. Caution:
38n/a# block_func must guarantee to block until trigger_func is called, and
39n/a# trigger_func must guarantee to change queue state so that block_func can make
40n/a# enough progress to return. In particular, a block_func that just raises an
41n/a# exception regardless of whether trigger_func is called will lead to
42n/a# timing-dependent sporadic failures, and one of those went rarely seen but
43n/a# undiagnosed for years. Now block_func must be unexceptional. If block_func
44n/a# is supposed to raise an exception, call do_exceptional_blocking_test()
45n/a# instead.
46n/a
class BlockingTestMixin:
    """Helpers that run a blocking call while a _TriggerThread unblocks it.

    The most recent trigger thread is kept in ``self.t`` and the value
    returned by the last successful blocking call in ``self.result``.
    """

    def tearDown(self):
        # Drop the reference to the last trigger thread.
        self.t = None

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Call block_func(*block_args) while a thread calls trigger_func.

        Returns whatever block_func returned.  The test fails if block_func
        returned before the trigger thread had started its call, or if the
        trigger thread is still alive 10 seconds after block_func finished.
        Any exception raised by block_func propagates to the caller.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            self.result = block_func(*block_args)
            # If block_func returned before our thread made the call, we failed!
            if not self.t.startedEvent.is_set():
                self.fail("blocking function '%r' appeared not to block" %
                          block_func)
        finally:
            # Always wait for the trigger thread, even when block_func raised
            # or the check above failed, so it is not left running behind us.
            self.t.join(10)
        if self.t.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      trigger_func)
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
                                     trigger_args, expected_exception_class):
        """Like do_blocking_test, for a block_func that is expected to raise.

        An exception raised by block_func propagates to the caller; the test
        fails if block_func returns normally.  The trigger thread is always
        joined (10 second limit) before this method exits.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            block_func(*block_args)
            # Only reached when block_func did not raise at all.
            self.fail("expected exception of kind %r" %
                      expected_exception_class)
        finally:
            self.t.join(10) # make sure the thread terminates
            if self.t.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                          trigger_func)
            if not self.t.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")
86n/a
87n/a
class BaseQueueTestMixin(BlockingTestMixin):
    """Queue tests shared by all queue classes.

    Concrete test classes set ``type2test`` to the queue class under test.
    """

    def setUp(self):
        # Running total built up by worker(), guarded by cumlock.
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Exercise ordering, full/empty detection and blocking put/get on q.

        q must be empty on entry and have maxsize QUEUE_SIZE; it is empty
        again on return, so the same instance can be passed in repeatedly.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        # Expected retrieval order, keyed by the queue's class name.
        target_order = dict(Queue=[111, 333, 222],
                            LifoQueue=[222, 333, 111],
                            PriorityQueue=[111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertFalse(qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        with self.assertRaises(queue.Full,
                               msg="Didn't appear to block with a full queue"):
            q.put(full, block=False)
        with self.assertRaises(queue.Full,
                               msg="Didn't appear to time-out with a full queue"):
            q.put(full, timeout=0.01)
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        with self.assertRaises(queue.Empty,
                               msg="Didn't appear to block with an empty queue"):
            q.get(block=False)
        with self.assertRaises(queue.Empty,
                               msg="Didn't appear to time-out with an empty queue"):
            q.get(timeout=0.01)
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Add items taken from q to self.cum until a negative item arrives."""
        while True:
            x = q.get()
            if x < 0:
                # Negative items are the shutdown signal.
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() waits until two workers have handled every item."""
        self.cum = 0
        workers = []
        for i in (0,1):
            thread = threading.Thread(target=self.worker, args=(q,))
            thread.start()
            workers.append(thread)
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for i in (0,1):
            q.put(-1)         # instruct the threads to close
        q.join()                # verify that you can join twice
        # Wait for the workers to exit so no thread outlives the test.
        for thread in workers:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        with self.assertRaises(ValueError,
                               msg="Did not detect task count going negative"):
            q.task_done()

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        with self.assertRaises(ValueError,
                               msg="Did not detect task count going negative"):
            q.task_done()

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        q.maxsize = 2                   # shrink the queue
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
234n/a
class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against queue.Queue."""

    type2test = queue.Queue
237n/a
class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against queue.LifoQueue."""

    type2test = queue.LifoQueue
240n/a
class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against queue.PriorityQueue."""

    type2test = queue.PriorityQueue
243n/a
244n/a
245n/a
246n/a# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    """Raised by FailingQueue when a put or get is told to fail."""
249n/a
class FailingQueue(queue.Queue):
    """queue.Queue whose next _put() or _get() can be made to raise.

    Set ``fail_next_put`` or ``fail_next_get`` to True to make the next
    corresponding operation raise FailingQueueException.  Each flag is reset
    to False before the exception is raised, so it affects one call only.
    """

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments (e.g. maxsize=...) are passed
        # through to queue.Queue unchanged.
        self.fail_next_put = False
        self.fail_next_get = False
        super().__init__(*args, **kwargs)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return super()._put(item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return super()._get()
265n/a
class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    """Check that a FailingQueue stays usable after its put/get calls raise."""

    def failing_queue_test(self, q):
        """Make puts and gets on q fail in turn and check q is not damaged.

        q must be empty on entry and have maxsize QUEUE_SIZE; it is empty
        again on return, so the same instance can be passed in repeatedly.
        """
        no_failure = "The queue didn't fail when it should have"
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            q.put("oops", block=False)
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            q.put("oops", timeout=0.1)
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a failing blocking put.  The put is expected to raise, so it
        # goes through do_exceptional_blocking_test, which joins the trigger
        # thread before the exception reaches us.
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            self.do_exceptional_blocking_test(q.put, ("full",), q.get, (),
                                              FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
                                              FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        q.get()
        self.assertFalse(qfull(q), "Queue should not be full")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            q.get()
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            q.get(timeout=0.1)
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
        # put succeeded, but get failed.
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)
355n/a
356n/a
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()