ยปCore Development>Code coverage>Lib/bsddb/test/test_distributed_transactions.py

Python code coverage for Lib/bsddb/test/test_distributed_transactions.py

#countcontent
1n/a"""TestCases for distributed transactions.
2n/a"""
3n/a
4n/aimport os
5n/aimport unittest
6n/a
7n/afrom test_all import db, test_support, get_new_environment_path, \
8n/a get_new_database_path
9n/a
10n/atry :
11n/a a=set()
12n/aexcept : # Python 2.3
13n/a from sets import Set as set
14n/aelse :
15n/a del a
16n/a
17n/afrom test_all import verbose
18n/a
19n/a#----------------------------------------------------------------------
20n/a
21n/aclass DBTxn_distributed(unittest.TestCase):
22n/a num_txns=1234
23n/a nosync=True
24n/a must_open_db=False
25n/a def _create_env(self, must_open_db) :
26n/a self.dbenv = db.DBEnv()
27n/a self.dbenv.set_tx_max(self.num_txns)
28n/a self.dbenv.set_lk_max_lockers(self.num_txns*2)
29n/a self.dbenv.set_lk_max_locks(self.num_txns*2)
30n/a self.dbenv.set_lk_max_objects(self.num_txns*2)
31n/a if self.nosync :
32n/a self.dbenv.set_flags(db.DB_TXN_NOSYNC,True)
33n/a self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_THREAD |
34n/a db.DB_RECOVER |
35n/a db.DB_INIT_TXN | db.DB_INIT_LOG | db.DB_INIT_MPOOL |
36n/a db.DB_INIT_LOCK, 0666)
37n/a self.db = db.DB(self.dbenv)
38n/a self.db.set_re_len(db.DB_GID_SIZE)
39n/a if must_open_db :
40n/a if db.version() >= (4,2) :
41n/a txn=self.dbenv.txn_begin()
42n/a self.db.open(self.filename,
43n/a db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666,
44n/a txn=txn)
45n/a txn.commit()
46n/a else :
47n/a self.db.open(self.filename,
48n/a db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666)
49n/a
50n/a def setUp(self) :
51n/a self.homeDir = get_new_environment_path()
52n/a self.filename = "test"
53n/a return self._create_env(must_open_db=True)
54n/a
55n/a def _destroy_env(self):
56n/a if self.nosync or (db.version()[:2] == (4,6)): # Known bug
57n/a self.dbenv.log_flush()
58n/a self.db.close()
59n/a self.dbenv.close()
60n/a
61n/a def tearDown(self):
62n/a self._destroy_env()
63n/a test_support.rmtree(self.homeDir)
64n/a
65n/a def _recreate_env(self,must_open_db) :
66n/a self._destroy_env()
67n/a self._create_env(must_open_db)
68n/a
69n/a def test01_distributed_transactions(self) :
70n/a txns=set()
71n/a adapt = lambda x : x
72n/a import sys
73n/a if sys.version_info[0] >= 3 :
74n/a adapt = lambda x : bytes(x, "ascii")
75n/a # Create transactions, "prepare" them, and
76n/a # let them be garbage collected.
77n/a for i in xrange(self.num_txns) :
78n/a txn = self.dbenv.txn_begin()
79n/a gid = "%%%dd" %db.DB_GID_SIZE
80n/a gid = adapt(gid %i)
81n/a self.db.put(i, gid, txn=txn, flags=db.DB_APPEND)
82n/a txns.add(gid)
83n/a txn.prepare(gid)
84n/a del txn
85n/a
86n/a self._recreate_env(self.must_open_db)
87n/a
88n/a # Get "to be recovered" transactions but
89n/a # let them be garbage collected.
90n/a recovered_txns=self.dbenv.txn_recover()
91n/a self.assertEquals(self.num_txns,len(recovered_txns))
92n/a for gid,txn in recovered_txns :
93n/a self.assert_(gid in txns)
94n/a del txn
95n/a del recovered_txns
96n/a
97n/a self._recreate_env(self.must_open_db)
98n/a
99n/a # Get "to be recovered" transactions. Commit, abort and
100n/a # discard them.
101n/a recovered_txns=self.dbenv.txn_recover()
102n/a self.assertEquals(self.num_txns,len(recovered_txns))
103n/a discard_txns=set()
104n/a committed_txns=set()
105n/a state=0
106n/a for gid,txn in recovered_txns :
107n/a if state==0 or state==1:
108n/a committed_txns.add(gid)
109n/a txn.commit()
110n/a elif state==2 :
111n/a txn.abort()
112n/a elif state==3 :
113n/a txn.discard()
114n/a discard_txns.add(gid)
115n/a state=-1
116n/a state+=1
117n/a del txn
118n/a del recovered_txns
119n/a
120n/a self._recreate_env(self.must_open_db)
121n/a
122n/a # Verify the discarded transactions are still
123n/a # around, and dispose them.
124n/a recovered_txns=self.dbenv.txn_recover()
125n/a self.assertEquals(len(discard_txns),len(recovered_txns))
126n/a for gid,txn in recovered_txns :
127n/a txn.abort()
128n/a del txn
129n/a del recovered_txns
130n/a
131n/a self._recreate_env(must_open_db=True)
132n/a
133n/a # Be sure there are not pending transactions.
134n/a # Check also database size.
135n/a recovered_txns=self.dbenv.txn_recover()
136n/a self.assert_(len(recovered_txns)==0)
137n/a self.assertEquals(len(committed_txns),self.db.stat()["nkeys"])
138n/a
139n/aclass DBTxn_distributedSYNC(DBTxn_distributed):
140n/a nosync=False
141n/a
142n/aclass DBTxn_distributed_must_open_db(DBTxn_distributed):
143n/a must_open_db=True
144n/a
145n/aclass DBTxn_distributedSYNC_must_open_db(DBTxn_distributed):
146n/a nosync=False
147n/a must_open_db=True
148n/a
149n/a#----------------------------------------------------------------------
150n/a
151n/adef test_suite():
152n/a suite = unittest.TestSuite()
153n/a if db.version() >= (4,5) :
154n/a suite.addTest(unittest.makeSuite(DBTxn_distributed))
155n/a suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC))
156n/a if db.version() >= (4,6) :
157n/a suite.addTest(unittest.makeSuite(DBTxn_distributed_must_open_db))
158n/a suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC_must_open_db))
159n/a return suite
160n/a
161n/a
162n/aif __name__ == '__main__':
163n/a unittest.main(defaultTest='test_suite')