» Core Development > Code coverage > Lib/bsddb/test/test_lock.py

Python code coverage for Lib/bsddb/test/test_lock.py

# count content
1n/a"""
2n/aTestCases for testing the locking sub-system.
3n/a"""
4n/a
5n/aimport time
6n/a
7n/aimport unittest
8n/afrom test_all import db, test_support, verbose, have_threads, \
9n/a get_new_environment_path, get_new_database_path
10n/a
# Threading helpers are only imported when the test harness reports thread
# support.  ``currentThread`` is bound to whichever spelling the running
# interpreter provides so the rest of the module can use a single name.
if have_threads:
    import sys
    from threading import Thread
    if sys.version_info[0] >= 3:
        from threading import current_thread as currentThread
    else:
        from threading import currentThread
18n/a
19n/a#----------------------------------------------------------------------
20n/a
class LockingTestCase(unittest.TestCase):
    """Tests for the locking sub-system exposed by ``db.DBEnv``.

    Every test runs against a fresh environment created in ``setUp`` and
    removed again in ``tearDown``.
    """

    import sys
    if sys.version_info < (2, 4):
        # Back-fill assertTrue on interpreters whose unittest lacks it.
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr, msg=msg)

    def setUp(self):
        """Create a new environment directory and open a DBEnv in it."""
        self.homeDir = get_new_environment_path()
        self.env = db.DBEnv()
        self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
                                    db.DB_INIT_LOCK | db.DB_CREATE)

    def tearDown(self):
        """Close the environment and delete its directory."""
        self.env.close()
        test_support.rmtree(self.homeDir)

    def _make_daemon(self, thread):
        """Flag *thread* as a daemon using the API of the running Python."""
        import sys
        if sys.version_info[0] < 3:
            thread.setDaemon(True)
        else:
            thread.daemon = True

    def test01_simple(self):
        """Acquire and release one write lock with a single locker ID."""
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test01_simple..." % self.__class__.__name__)

        anID = self.env.lock_id()
        if verbose:
            print("locker ID: %s" % anID)
        lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
        if verbose:
            print("Aquired lock: %s" % lock)
        self.env.lock_put(lock)
        if verbose:
            print("Released lock: %s" % lock)
        self.env.lock_id_free(anID)

    def test02_threaded(self):
        """Run nine reader/writer threads that contend for the same lock."""
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test02_threaded..." % self.__class__.__name__)

        # One worker per entry, started in this order.
        lock_types = (
            db.DB_LOCK_WRITE,
            db.DB_LOCK_READ,
            db.DB_LOCK_READ,
            db.DB_LOCK_WRITE,
            db.DB_LOCK_READ,
            db.DB_LOCK_READ,
            db.DB_LOCK_WRITE,
            db.DB_LOCK_WRITE,
            db.DB_LOCK_WRITE,
        )
        threads = [Thread(target=self.theThread, args=(lock_type,))
                   for lock_type in lock_types]

        for t in threads:
            self._make_daemon(t)
            t.start()
        for t in threads:
            t.join()

    if db.version() >= (4, 2):
        def test03_lock_timeout(self):
            """Check that get_timeout returns what set_timeout stored."""
            self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
            self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)
            self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
            self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)
            self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
            self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT),
                             123456)
            self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
            self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT),
                             7890123)

    def test04_lock_timeout2(self):
        """Check that a conflicting lock request is refused after the timeout.

        A helper thread polls ``lock_detect(DB_LOCK_EXPIRE)`` while the main
        thread holds a write lock and requests a conflicting read lock with
        a second locker ID.  The request must raise DBLockNotGrantedError no
        sooner than the configured lock timeout.
        """
        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)

        def deadlock_detection():
            # State is kept as attributes on the function object so the
            # main thread can both signal it (.end) and read it (.count).
            while not deadlock_detection.end:
                deadlock_detection.count = \
                    self.env.lock_detect(db.DB_LOCK_EXPIRE)
                if deadlock_detection.count:
                    # Stop polling so the non-zero count is not overwritten;
                    # sleep instead of spinning while waiting to be stopped.
                    while not deadlock_detection.end:
                        time.sleep(0.01)
                    break
                time.sleep(0.01)

        deadlock_detection.end = False
        deadlock_detection.count = 0
        t = Thread(target=deadlock_detection)
        self._make_daemon(t)
        t.start()
        try:
            self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
            anID = self.env.lock_id()
            anID2 = self.env.lock_id()
            self.assertNotEqual(anID, anID2)
            lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
            start_time = time.time()
            self.assertRaises(db.DBLockNotGrantedError,
                              self.env.lock_get, anID2, "shared lock",
                              db.DB_LOCK_READ)
            end_time = time.time()
        finally:
            # Always stop and reap the helper, even when an assertion above
            # fails; otherwise it would keep calling lock_detect() on an
            # environment that tearDown() is about to close.
            deadlock_detection.end = True
            t.join()

        # Floating point rounding
        self.assertTrue((end_time - start_time) >= 0.0999)
        self.env.lock_put(lock)

        self.env.lock_id_free(anID)
        self.env.lock_id_free(anID2)

        if db.version() >= (4, 6):
            self.assertTrue(deadlock_detection.count > 0)

    def theThread(self, lockType):
        """Worker for test02_threaded: take and release a lock 1000 times.

        lockType is the lock mode passed to ``lock_get``; it is compared with
        ``db.DB_LOCK_WRITE`` only to pick the label used in verbose output.

        NOTE(review): this runs as a Thread target and test02_threaded only
        joins the threads, so an exception raised here does not appear to
        fail the test -- confirm whether that is intended.
        """
        import sys
        if sys.version_info[0] < 3:
            name = currentThread().getName()
        else:
            name = currentThread().name

        if lockType == db.DB_LOCK_WRITE:
            lt = "write"
        else:
            lt = "read"

        anID = self.env.lock_id()
        if verbose:
            print("%s: locker ID: %s" % (name, anID))

        for i in xrange(1000):
            lock = self.env.lock_get(anID, "some locked thing", lockType)
            if verbose:
                print("%s: Aquired %s lock: %s" % (name, lt, lock))

            self.env.lock_put(lock)
            if verbose:
                print("%s: Released %s lock: %s" % (name, lt, lock))

        self.env.lock_id_free(anID)
175n/a
176n/a
177n/a#----------------------------------------------------------------------
178n/a
def test_suite():
    """Build the suite for this module.

    With thread support every LockingTestCase test is included; without it
    only the tests whose names start with 'test01' are collected.
    """
    if have_threads:
        cases = unittest.makeSuite(LockingTestCase)
    else:
        cases = unittest.makeSuite(LockingTestCase, 'test01')

    suite = unittest.TestSuite()
    suite.addTest(cases)
    return suite
188n/a
189n/a
# When executed as a script, run the suite returned by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')