» Core Development > Code coverage > Lib/test/lock_tests.py

Python code coverage for Lib/test/lock_tests.py

# | count | content
"""
Various tests for synchronization primitives.
"""

import sys
import time
from _thread import start_new_thread, TIMEOUT_MAX
import threading
import unittest
import weakref

from test import support


def _wait():
    """Pause the calling thread for 10 ms.

    A crude wait/yield function not relying on synchronization primitives.
    """
    time.sleep(0.01)

class Bunch(object):
    """
    A bunch of threads.

    Attributes:
        f: the function every thread runs.
        n: the number of threads requested.
        started: thread identifiers, appended as each thread begins.
        finished: thread identifiers, appended once `f` has returned or
            raised in that thread.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def task():
            tid = threading.get_ident()
            self.started.append(tid)
            try:
                f()
            finally:
                # Record completion even if f() raised, then keep the
                # thread alive until it is allowed to exit.
                self.finished.append(tid)
                while not self._can_exit:
                    _wait()

        try:
            for _ in range(n):
                start_new_thread(task, ())
        except BaseException:
            # Some threads may already be running: let them exit instead
            # of spinning forever, then propagate the failure.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Block (by polling) until all `n` threads have started."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Block (by polling) until `f` has completed in all `n` threads."""
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        """Allow threads created with `wait_before_exit=True` to exit."""
        self._can_exit = True


class BaseTestCase(unittest.TestCase):
    """
    Common base class: brackets every test with the thread bookkeeping
    helpers from test.support and provides a tolerant timeout assertion.
    """

    def setUp(self):
        # Remember the threading state so tearDown() can hand it back to
        # support.threading_cleanup().
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        """
        Check that the measured duration `actual` is plausible for a wait
        that was supposed to last `expected` seconds: at least 60% of it
        and less than ten times it.
        """
        # The waiting and/or time.time() can be imprecise, which
        # is why comparing to the expected value would sometimes fail
        # (especially under Windows).
        self.assertGreaterEqual(actual, expected * 0.6)
        # Test nothing insane happened
        self.assertLess(actual, expected * 10.0)


class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    The lock factory under test is read from `self.locktype`, which this
    class does not define; concrete subclasses are expected to supply it.
    """

    def test_constructor(self):
        # Creating and dropping a lock must not raise.
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        # A lock may be dropped while still held.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        # A non-blocking acquire of a free lock succeeds.
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while the lock
        # is held here.
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        # Blocking acquires in other threads only complete once the lock
        # is released here.
        lock = self.locktype()
        lock.acquire()
        N = 5
        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        def _with(err=None):
            with lock:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        if len(threading.enumerate()) != n:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(n, len(threading.enumerate()))

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, 0, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        # Durations are measured with the monotonic clock so that a
        # wall-clock adjustment during the test cannot skew them.
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []
        def f():
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)

    def test_weakref_exists(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        del lock
        # Force a collection so the check does not depend on reference
        # counting reclaiming the lock immediately.
        support.gc_collect()
        self.assertIsNone(ref())


class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """
    def test_reacquire(self):
        # Lock needs to be released before re-acquiring.
        lock = self.locktype()
        phase = []
        def f():
            lock.acquire()
            phase.append(None)
            # Blocks until the main thread releases the lock.
            lock.acquire()
            phase.append(None)
        start_new_thread(f, ())
        while len(phase) == 0:
            _wait()
        _wait()
        self.assertEqual(len(phase), 1)
        lock.release()
        while len(phase) == 1:
            _wait()
        self.assertEqual(len(phase), 2)

    def test_different_thread(self):
        # Lock can be released from a different thread.
        lock = self.locktype()
        lock.acquire()
        def f():
            lock.release()
        b = Bunch(f, 1)
        b.wait_for_finished()
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: check that lock is in a proper state after a
        # (non-zero) timeout.
        lock = self.locktype()
        lock.acquire()
        self.assertFalse(lock.acquire(timeout=0.01))
        lock.release()
        self.assertFalse(lock.locked())
        self.assertTrue(lock.acquire(blocking=False))


class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        # The owning thread may acquire the lock again while holding it.
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock.release)

    def test_release_save_unacquired(self):
        # Cannot _release_save an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock._release_save)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock._release_save)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        b = Bunch(f, 1, True)
        try:
            # Wait until the worker has run f(), i.e. actually owns the
            # lock; otherwise release() could be called on a still
            # unacquired lock and raise RuntimeError for the wrong reason.
            # The worker stays alive until do_finish() is called.
            b.wait_for_finished()
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()

    def test__is_owned(self):
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            result.append(lock._is_owned())
        Bunch(f, 1).wait_for_finished()
        # Another thread does not own the lock held by this one.
        self.assertFalse(result[0])
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())


class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    The event factory under test is read from `self.eventtype`, which this
    class does not define; concrete subclasses are expected to supply it.
    """

    def test_is_set(self):
        # set() and clear() are reflected by is_set() and may be repeated.
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            # Measure with the monotonic clock so that a wall-clock
            # adjustment during the test cannot skew the duration.
            t1 = time.monotonic()
            r = evt.wait(0.5)
            t2 = time.monotonic()
            results2.append((r, t2 - t1))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTimeout(dt, 0.5)
        # The event is set
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, dt in results2:
            self.assertTrue(r)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        evt = self.eventtype()
        results = []
        N = 5
        def f():
            results.append(evt.wait(1))
        b = Bunch(f, N)
        b.wait_for_started()
        time.sleep(0.5)
        evt.set()
        evt.clear()
        b.wait_for_finished()
        self.assertEqual(results, [True] * N)

    def test_reset_internal_locks(self):
        # ensure that condition is still using a Lock after reset
        evt = self.eventtype()
        with evt._cond:
            self.assertFalse(evt._cond.acquire(False))
        evt._reset_internal_locks()
        with evt._cond:
            self.assertFalse(evt._cond.acquire(False))


class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    The condition factory under test is read from `self.condtype`, which
    this class does not define; concrete subclasses are expected to supply
    it.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock share
        # the same locked/unlocked state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an
        # implementation detail of Condition Variables in current CPython,
        # but in general, not a guaranteed property of condition variables as
        # a programming construct.  In particular, it is possible that this
        # can no longer be conveniently guaranteed should their implementation
        # ever change.
        N = 5
        results1 = []
        results2 = []
        # Rebound by the main thread below; the workers read its current
        # value through the closure when they record a result.
        phase_num = 0
        def f():
            cond.acquire()
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issue #8799
        _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        _wait()  # make sure all workers settle into cond.wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            # Measure with the monotonic clock so that a wall-clock
            # adjustment during the test cannot skew the duration.
            t1 = time.monotonic()
            result = cond.wait(0.5)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0
        # Filled in by the worker once its own assertions have passed, so
        # the main thread can check the worker really got that far.
        success = []
        def f():
            with cond:
                result = cond.wait_for(lambda: state == 4)
                self.assertTrue(result)
                self.assertEqual(state, 4)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        for _ in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda: state == 4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for _ in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)


class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    The semaphore factory under test is read from `self.semtype`, which
    this class does not define; concrete subclasses are expected to supply
    it.
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # A semaphore may be dropped while still acquired.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        results1 = []
        results2 = []
        # Rebound by the main thread below; the workers read its current
        # value through the closure when they record a result.
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        # 7 initial slots minus the one taken above leaves 6 acquisitions.
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for _ in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for _ in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        # A timeout cannot be combined with a non-blocking acquire.
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # Measure with the monotonic clock so that a wall-clock
        # adjustment during the test cannot skew the duration.
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        # Both slots are free again after leaving the with blocks.
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()

class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Unbounded releases are allowed and increment the semaphore's value
        sem = self.semtype(1)
        sem.release()
        # Two acquires now succeed without blocking: one for the initial
        # value, one for the extra release above.
        sem.acquire()
        sem.acquire()
        sem.release()


class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        # Cannot go past the initial value
        sem = self.semtype()
        self.assertRaises(ValueError, sem.release)
        sem.acquire()
        sem.release()
        self.assertRaises(ValueError, sem.release)


class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    The barrier factory under test is read from `self.barriertype`, which
    this class does not define; concrete subclasses are expected to supply
    it.
    """
    N = 5
    defaultTimeout = 2.0

    def setUp(self):
        # Run the base class set-up too, so the thread bookkeeping done by
        # BaseTestCase is not silently skipped for barrier tests.
        super().setUp()
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        super().tearDown()

    def run_threads(self, f):
        # Run f in N-1 helper threads plus the calling thread, so that N
        # parties take part in total.
        b = Bunch(f, self.N-1)
        f()
        b.wait_for_finished()

    def multipass(self, results, n):
        # Pass the barrier 2*n times, checking between passes that every
        # party has completed the previous step.
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[], []]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)
        def f():
            i = barrier.wait()
            if i == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)
        self.run_threads(f)

    def test_single_thread(self):
        # A barrier with a single party never blocks.
        b = self.barriertype(1)
        b.wait()
        b.wait()