| 1 | n/a | # Some simple queue module tests, plus some failure conditions |
|---|
| 2 | n/a | # to ensure the Queue locks remain stable. |
|---|
| 3 | n/a | import queue |
|---|
| 4 | n/a | import time |
|---|
| 5 | n/a | import unittest |
|---|
| 6 | n/a | from test import support |
|---|
| 7 | n/a | threading = support.import_module('threading') |
|---|
| 8 | n/a | |
|---|
| 9 | n/a | QUEUE_SIZE = 5 |
|---|
| 10 | n/a | |
|---|
| 11 | n/a | def qfull(q): |
|---|
| 12 | n/a | return q.maxsize > 0 and q.qsize() == q.maxsize |
|---|
| 13 | n/a | |
|---|
| 14 | n/a | # A thread to run a function that unclogs a blocked Queue. |
|---|
| 15 | n/a | class _TriggerThread(threading.Thread): |
|---|
| 16 | n/a | def __init__(self, fn, args): |
|---|
| 17 | n/a | self.fn = fn |
|---|
| 18 | n/a | self.args = args |
|---|
| 19 | n/a | self.startedEvent = threading.Event() |
|---|
| 20 | n/a | threading.Thread.__init__(self) |
|---|
| 21 | n/a | |
|---|
| 22 | n/a | def run(self): |
|---|
| 23 | n/a | # The sleep isn't necessary, but is intended to give the blocking |
|---|
| 24 | n/a | # function in the main thread a chance at actually blocking before |
|---|
| 25 | n/a | # we unclog it. But if the sleep is longer than the timeout-based |
|---|
| 26 | n/a | # tests wait in their blocking functions, those tests will fail. |
|---|
| 27 | n/a | # So we give them much longer timeout values compared to the |
|---|
| 28 | n/a | # sleep here (I aimed at 10 seconds for blocking functions -- |
|---|
| 29 | n/a | # they should never actually wait that long - they should make |
|---|
| 30 | n/a | # progress as soon as we call self.fn()). |
|---|
| 31 | n/a | time.sleep(0.1) |
|---|
| 32 | n/a | self.startedEvent.set() |
|---|
| 33 | n/a | self.fn(*self.args) |
|---|
| 34 | n/a | |
|---|
| 35 | n/a | |
|---|
| 36 | n/a | # Execute a function that blocks, and in a separate thread, a function that |
|---|
| 37 | n/a | # triggers the release. Returns the result of the blocking function. Caution: |
|---|
| 38 | n/a | # block_func must guarantee to block until trigger_func is called, and |
|---|
| 39 | n/a | # trigger_func must guarantee to change queue state so that block_func can make |
|---|
| 40 | n/a | # enough progress to return. In particular, a block_func that just raises an |
|---|
| 41 | n/a | # exception regardless of whether trigger_func is called will lead to |
|---|
| 42 | n/a | # timing-dependent sporadic failures, and one of those went rarely seen but |
|---|
| 43 | n/a | # undiagnosed for years. Now block_func must be unexceptional. If block_func |
|---|
| 44 | n/a | # is supposed to raise an exception, call do_exceptional_blocking_test() |
|---|
| 45 | n/a | # instead. |
|---|
| 46 | n/a | |
|---|
| 47 | n/a | class BlockingTestMixin: |
|---|
| 48 | n/a | |
|---|
| 49 | n/a | def tearDown(self): |
|---|
| 50 | n/a | self.t = None |
|---|
| 51 | n/a | |
|---|
| 52 | n/a | def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): |
|---|
| 53 | n/a | self.t = _TriggerThread(trigger_func, trigger_args) |
|---|
| 54 | n/a | self.t.start() |
|---|
| 55 | n/a | self.result = block_func(*block_args) |
|---|
| 56 | n/a | # If block_func returned before our thread made the call, we failed! |
|---|
| 57 | n/a | if not self.t.startedEvent.is_set(): |
|---|
| 58 | n/a | self.fail("blocking function '%r' appeared not to block" % |
|---|
| 59 | n/a | block_func) |
|---|
| 60 | n/a | self.t.join(10) # make sure the thread terminates |
|---|
| 61 | n/a | if self.t.is_alive(): |
|---|
| 62 | n/a | self.fail("trigger function '%r' appeared to not return" % |
|---|
| 63 | n/a | trigger_func) |
|---|
| 64 | n/a | return self.result |
|---|
| 65 | n/a | |
|---|
| 66 | n/a | # Call this instead if block_func is supposed to raise an exception. |
|---|
| 67 | n/a | def do_exceptional_blocking_test(self,block_func, block_args, trigger_func, |
|---|
| 68 | n/a | trigger_args, expected_exception_class): |
|---|
| 69 | n/a | self.t = _TriggerThread(trigger_func, trigger_args) |
|---|
| 70 | n/a | self.t.start() |
|---|
| 71 | n/a | try: |
|---|
| 72 | n/a | try: |
|---|
| 73 | n/a | block_func(*block_args) |
|---|
| 74 | n/a | except expected_exception_class: |
|---|
| 75 | n/a | raise |
|---|
| 76 | n/a | else: |
|---|
| 77 | n/a | self.fail("expected exception of kind %r" % |
|---|
| 78 | n/a | expected_exception_class) |
|---|
| 79 | n/a | finally: |
|---|
| 80 | n/a | self.t.join(10) # make sure the thread terminates |
|---|
| 81 | n/a | if self.t.is_alive(): |
|---|
| 82 | n/a | self.fail("trigger function '%r' appeared to not return" % |
|---|
| 83 | n/a | trigger_func) |
|---|
| 84 | n/a | if not self.t.startedEvent.is_set(): |
|---|
| 85 | n/a | self.fail("trigger thread ended but event never set") |
|---|
| 86 | n/a | |
|---|
| 87 | n/a | |
|---|
| 88 | n/a | class BaseQueueTestMixin(BlockingTestMixin): |
|---|
| 89 | n/a | def setUp(self): |
|---|
| 90 | n/a | self.cum = 0 |
|---|
| 91 | n/a | self.cumlock = threading.Lock() |
|---|
| 92 | n/a | |
|---|
| 93 | n/a | def simple_queue_test(self, q): |
|---|
| 94 | n/a | if q.qsize(): |
|---|
| 95 | n/a | raise RuntimeError("Call this function with an empty queue") |
|---|
| 96 | n/a | self.assertTrue(q.empty()) |
|---|
| 97 | n/a | self.assertFalse(q.full()) |
|---|
| 98 | n/a | # I guess we better check things actually queue correctly a little :) |
|---|
| 99 | n/a | q.put(111) |
|---|
| 100 | n/a | q.put(333) |
|---|
| 101 | n/a | q.put(222) |
|---|
| 102 | n/a | target_order = dict(Queue = [111, 333, 222], |
|---|
| 103 | n/a | LifoQueue = [222, 333, 111], |
|---|
| 104 | n/a | PriorityQueue = [111, 222, 333]) |
|---|
| 105 | n/a | actual_order = [q.get(), q.get(), q.get()] |
|---|
| 106 | n/a | self.assertEqual(actual_order, target_order[q.__class__.__name__], |
|---|
| 107 | n/a | "Didn't seem to queue the correct data!") |
|---|
| 108 | n/a | for i in range(QUEUE_SIZE-1): |
|---|
| 109 | n/a | q.put(i) |
|---|
| 110 | n/a | self.assertTrue(q.qsize(), "Queue should not be empty") |
|---|
| 111 | n/a | self.assertTrue(not qfull(q), "Queue should not be full") |
|---|
| 112 | n/a | last = 2 * QUEUE_SIZE |
|---|
| 113 | n/a | full = 3 * 2 * QUEUE_SIZE |
|---|
| 114 | n/a | q.put(last) |
|---|
| 115 | n/a | self.assertTrue(qfull(q), "Queue should be full") |
|---|
| 116 | n/a | self.assertFalse(q.empty()) |
|---|
| 117 | n/a | self.assertTrue(q.full()) |
|---|
| 118 | n/a | try: |
|---|
| 119 | n/a | q.put(full, block=0) |
|---|
| 120 | n/a | self.fail("Didn't appear to block with a full queue") |
|---|
| 121 | n/a | except queue.Full: |
|---|
| 122 | n/a | pass |
|---|
| 123 | n/a | try: |
|---|
| 124 | n/a | q.put(full, timeout=0.01) |
|---|
| 125 | n/a | self.fail("Didn't appear to time-out with a full queue") |
|---|
| 126 | n/a | except queue.Full: |
|---|
| 127 | n/a | pass |
|---|
| 128 | n/a | # Test a blocking put |
|---|
| 129 | n/a | self.do_blocking_test(q.put, (full,), q.get, ()) |
|---|
| 130 | n/a | self.do_blocking_test(q.put, (full, True, 10), q.get, ()) |
|---|
| 131 | n/a | # Empty it |
|---|
| 132 | n/a | for i in range(QUEUE_SIZE): |
|---|
| 133 | n/a | q.get() |
|---|
| 134 | n/a | self.assertTrue(not q.qsize(), "Queue should be empty") |
|---|
| 135 | n/a | try: |
|---|
| 136 | n/a | q.get(block=0) |
|---|
| 137 | n/a | self.fail("Didn't appear to block with an empty queue") |
|---|
| 138 | n/a | except queue.Empty: |
|---|
| 139 | n/a | pass |
|---|
| 140 | n/a | try: |
|---|
| 141 | n/a | q.get(timeout=0.01) |
|---|
| 142 | n/a | self.fail("Didn't appear to time-out with an empty queue") |
|---|
| 143 | n/a | except queue.Empty: |
|---|
| 144 | n/a | pass |
|---|
| 145 | n/a | # Test a blocking get |
|---|
| 146 | n/a | self.do_blocking_test(q.get, (), q.put, ('empty',)) |
|---|
| 147 | n/a | self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) |
|---|
| 148 | n/a | |
|---|
| 149 | n/a | |
|---|
| 150 | n/a | def worker(self, q): |
|---|
| 151 | n/a | while True: |
|---|
| 152 | n/a | x = q.get() |
|---|
| 153 | n/a | if x < 0: |
|---|
| 154 | n/a | q.task_done() |
|---|
| 155 | n/a | return |
|---|
| 156 | n/a | with self.cumlock: |
|---|
| 157 | n/a | self.cum += x |
|---|
| 158 | n/a | q.task_done() |
|---|
| 159 | n/a | |
|---|
| 160 | n/a | def queue_join_test(self, q): |
|---|
| 161 | n/a | self.cum = 0 |
|---|
| 162 | n/a | for i in (0,1): |
|---|
| 163 | n/a | threading.Thread(target=self.worker, args=(q,)).start() |
|---|
| 164 | n/a | for i in range(100): |
|---|
| 165 | n/a | q.put(i) |
|---|
| 166 | n/a | q.join() |
|---|
| 167 | n/a | self.assertEqual(self.cum, sum(range(100)), |
|---|
| 168 | n/a | "q.join() did not block until all tasks were done") |
|---|
| 169 | n/a | for i in (0,1): |
|---|
| 170 | n/a | q.put(-1) # instruct the threads to close |
|---|
| 171 | n/a | q.join() # verify that you can join twice |
|---|
| 172 | n/a | |
|---|
| 173 | n/a | def test_queue_task_done(self): |
|---|
| 174 | n/a | # Test to make sure a queue task completed successfully. |
|---|
| 175 | n/a | q = self.type2test() |
|---|
| 176 | n/a | try: |
|---|
| 177 | n/a | q.task_done() |
|---|
| 178 | n/a | except ValueError: |
|---|
| 179 | n/a | pass |
|---|
| 180 | n/a | else: |
|---|
| 181 | n/a | self.fail("Did not detect task count going negative") |
|---|
| 182 | n/a | |
|---|
| 183 | n/a | def test_queue_join(self): |
|---|
| 184 | n/a | # Test that a queue join()s successfully, and before anything else |
|---|
| 185 | n/a | # (done twice for insurance). |
|---|
| 186 | n/a | q = self.type2test() |
|---|
| 187 | n/a | self.queue_join_test(q) |
|---|
| 188 | n/a | self.queue_join_test(q) |
|---|
| 189 | n/a | try: |
|---|
| 190 | n/a | q.task_done() |
|---|
| 191 | n/a | except ValueError: |
|---|
| 192 | n/a | pass |
|---|
| 193 | n/a | else: |
|---|
| 194 | n/a | self.fail("Did not detect task count going negative") |
|---|
| 195 | n/a | |
|---|
| 196 | n/a | def test_simple_queue(self): |
|---|
| 197 | n/a | # Do it a couple of times on the same queue. |
|---|
| 198 | n/a | # Done twice to make sure works with same instance reused. |
|---|
| 199 | n/a | q = self.type2test(QUEUE_SIZE) |
|---|
| 200 | n/a | self.simple_queue_test(q) |
|---|
| 201 | n/a | self.simple_queue_test(q) |
|---|
| 202 | n/a | |
|---|
| 203 | n/a | def test_negative_timeout_raises_exception(self): |
|---|
| 204 | n/a | q = self.type2test(QUEUE_SIZE) |
|---|
| 205 | n/a | with self.assertRaises(ValueError): |
|---|
| 206 | n/a | q.put(1, timeout=-1) |
|---|
| 207 | n/a | with self.assertRaises(ValueError): |
|---|
| 208 | n/a | q.get(1, timeout=-1) |
|---|
| 209 | n/a | |
|---|
| 210 | n/a | def test_nowait(self): |
|---|
| 211 | n/a | q = self.type2test(QUEUE_SIZE) |
|---|
| 212 | n/a | for i in range(QUEUE_SIZE): |
|---|
| 213 | n/a | q.put_nowait(1) |
|---|
| 214 | n/a | with self.assertRaises(queue.Full): |
|---|
| 215 | n/a | q.put_nowait(1) |
|---|
| 216 | n/a | |
|---|
| 217 | n/a | for i in range(QUEUE_SIZE): |
|---|
| 218 | n/a | q.get_nowait() |
|---|
| 219 | n/a | with self.assertRaises(queue.Empty): |
|---|
| 220 | n/a | q.get_nowait() |
|---|
| 221 | n/a | |
|---|
| 222 | n/a | def test_shrinking_queue(self): |
|---|
| 223 | n/a | # issue 10110 |
|---|
| 224 | n/a | q = self.type2test(3) |
|---|
| 225 | n/a | q.put(1) |
|---|
| 226 | n/a | q.put(2) |
|---|
| 227 | n/a | q.put(3) |
|---|
| 228 | n/a | with self.assertRaises(queue.Full): |
|---|
| 229 | n/a | q.put_nowait(4) |
|---|
| 230 | n/a | self.assertEqual(q.qsize(), 3) |
|---|
| 231 | n/a | q.maxsize = 2 # shrink the queue |
|---|
| 232 | n/a | with self.assertRaises(queue.Full): |
|---|
| 233 | n/a | q.put_nowait(4) |
|---|
| 234 | n/a | |
|---|
| 235 | n/a | class QueueTest(BaseQueueTestMixin, unittest.TestCase): |
|---|
| 236 | n/a | type2test = queue.Queue |
|---|
| 237 | n/a | |
|---|
| 238 | n/a | class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase): |
|---|
| 239 | n/a | type2test = queue.LifoQueue |
|---|
| 240 | n/a | |
|---|
| 241 | n/a | class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase): |
|---|
| 242 | n/a | type2test = queue.PriorityQueue |
|---|
| 243 | n/a | |
|---|
| 244 | n/a | |
|---|
| 245 | n/a | |
|---|
| 246 | n/a | # A Queue subclass that can provoke failure at a moment's notice :) |
|---|
| 247 | n/a | class FailingQueueException(Exception): |
|---|
| 248 | n/a | pass |
|---|
| 249 | n/a | |
|---|
| 250 | n/a | class FailingQueue(queue.Queue): |
|---|
| 251 | n/a | def __init__(self, *args): |
|---|
| 252 | n/a | self.fail_next_put = False |
|---|
| 253 | n/a | self.fail_next_get = False |
|---|
| 254 | n/a | queue.Queue.__init__(self, *args) |
|---|
| 255 | n/a | def _put(self, item): |
|---|
| 256 | n/a | if self.fail_next_put: |
|---|
| 257 | n/a | self.fail_next_put = False |
|---|
| 258 | n/a | raise FailingQueueException("You Lose") |
|---|
| 259 | n/a | return queue.Queue._put(self, item) |
|---|
| 260 | n/a | def _get(self): |
|---|
| 261 | n/a | if self.fail_next_get: |
|---|
| 262 | n/a | self.fail_next_get = False |
|---|
| 263 | n/a | raise FailingQueueException("You Lose") |
|---|
| 264 | n/a | return queue.Queue._get(self) |
|---|
| 265 | n/a | |
|---|
| 266 | n/a | class FailingQueueTest(BlockingTestMixin, unittest.TestCase): |
|---|
| 267 | n/a | |
|---|
| 268 | n/a | def failing_queue_test(self, q): |
|---|
| 269 | n/a | if q.qsize(): |
|---|
| 270 | n/a | raise RuntimeError("Call this function with an empty queue") |
|---|
| 271 | n/a | for i in range(QUEUE_SIZE-1): |
|---|
| 272 | n/a | q.put(i) |
|---|
| 273 | n/a | # Test a failing non-blocking put. |
|---|
| 274 | n/a | q.fail_next_put = True |
|---|
| 275 | n/a | try: |
|---|
| 276 | n/a | q.put("oops", block=0) |
|---|
| 277 | n/a | self.fail("The queue didn't fail when it should have") |
|---|
| 278 | n/a | except FailingQueueException: |
|---|
| 279 | n/a | pass |
|---|
| 280 | n/a | q.fail_next_put = True |
|---|
| 281 | n/a | try: |
|---|
| 282 | n/a | q.put("oops", timeout=0.1) |
|---|
| 283 | n/a | self.fail("The queue didn't fail when it should have") |
|---|
| 284 | n/a | except FailingQueueException: |
|---|
| 285 | n/a | pass |
|---|
| 286 | n/a | q.put("last") |
|---|
| 287 | n/a | self.assertTrue(qfull(q), "Queue should be full") |
|---|
| 288 | n/a | # Test a failing blocking put |
|---|
| 289 | n/a | q.fail_next_put = True |
|---|
| 290 | n/a | try: |
|---|
| 291 | n/a | self.do_blocking_test(q.put, ("full",), q.get, ()) |
|---|
| 292 | n/a | self.fail("The queue didn't fail when it should have") |
|---|
| 293 | n/a | except FailingQueueException: |
|---|
| 294 | n/a | pass |
|---|
| 295 | n/a | # Check the Queue isn't damaged. |
|---|
| 296 | n/a | # put failed, but get succeeded - re-add |
|---|
| 297 | n/a | q.put("last") |
|---|
| 298 | n/a | # Test a failing timeout put |
|---|
| 299 | n/a | q.fail_next_put = True |
|---|
| 300 | n/a | try: |
|---|
| 301 | n/a | self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (), |
|---|
| 302 | n/a | FailingQueueException) |
|---|
| 303 | n/a | self.fail("The queue didn't fail when it should have") |
|---|
| 304 | n/a | except FailingQueueException: |
|---|
| 305 | n/a | pass |
|---|
| 306 | n/a | # Check the Queue isn't damaged. |
|---|
| 307 | n/a | # put failed, but get succeeded - re-add |
|---|
| 308 | n/a | q.put("last") |
|---|
| 309 | n/a | self.assertTrue(qfull(q), "Queue should be full") |
|---|
| 310 | n/a | q.get() |
|---|
| 311 | n/a | self.assertTrue(not qfull(q), "Queue should not be full") |
|---|
| 312 | n/a | q.put("last") |
|---|
| 313 | n/a | self.assertTrue(qfull(q), "Queue should be full") |
|---|
| 314 | n/a | # Test a blocking put |
|---|
| 315 | n/a | self.do_blocking_test(q.put, ("full",), q.get, ()) |
|---|
| 316 | n/a | # Empty it |
|---|
| 317 | n/a | for i in range(QUEUE_SIZE): |
|---|
| 318 | n/a | q.get() |
|---|
| 319 | n/a | self.assertTrue(not q.qsize(), "Queue should be empty") |
|---|
| 320 | n/a | q.put("first") |
|---|
| 321 | n/a | q.fail_next_get = True |
|---|
| 322 | n/a | try: |
|---|
| 323 | n/a | q.get() |
|---|
| 324 | n/a | self.fail("The queue didn't fail when it should have") |
|---|
| 325 | n/a | except FailingQueueException: |
|---|
| 326 | n/a | pass |
|---|
| 327 | n/a | self.assertTrue(q.qsize(), "Queue should not be empty") |
|---|
| 328 | n/a | q.fail_next_get = True |
|---|
| 329 | n/a | try: |
|---|
| 330 | n/a | q.get(timeout=0.1) |
|---|
| 331 | n/a | self.fail("The queue didn't fail when it should have") |
|---|
| 332 | n/a | except FailingQueueException: |
|---|
| 333 | n/a | pass |
|---|
| 334 | n/a | self.assertTrue(q.qsize(), "Queue should not be empty") |
|---|
| 335 | n/a | q.get() |
|---|
| 336 | n/a | self.assertTrue(not q.qsize(), "Queue should be empty") |
|---|
| 337 | n/a | q.fail_next_get = True |
|---|
| 338 | n/a | try: |
|---|
| 339 | n/a | self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',), |
|---|
| 340 | n/a | FailingQueueException) |
|---|
| 341 | n/a | self.fail("The queue didn't fail when it should have") |
|---|
| 342 | n/a | except FailingQueueException: |
|---|
| 343 | n/a | pass |
|---|
| 344 | n/a | # put succeeded, but get failed. |
|---|
| 345 | n/a | self.assertTrue(q.qsize(), "Queue should not be empty") |
|---|
| 346 | n/a | q.get() |
|---|
| 347 | n/a | self.assertTrue(not q.qsize(), "Queue should be empty") |
|---|
| 348 | n/a | |
|---|
| 349 | n/a | def test_failing_queue(self): |
|---|
| 350 | n/a | # Test to make sure a queue is functioning correctly. |
|---|
| 351 | n/a | # Done twice to the same instance. |
|---|
| 352 | n/a | q = FailingQueue(QUEUE_SIZE) |
|---|
| 353 | n/a | self.failing_queue_test(q) |
|---|
| 354 | n/a | self.failing_queue_test(q) |
|---|
| 355 | n/a | |
|---|
| 356 | n/a | |
|---|
| 357 | n/a | if __name__ == "__main__": |
|---|
| 358 | n/a | unittest.main() |
|---|