ยปCore Development>Code coverage>Lib/bsddb/test/test_early_close.py

Python code coverage for Lib/bsddb/test/test_early_close.py

#countcontent
1n/a"""TestCases for checking that it does not segfault when a DBEnv object
2n/ais closed before its DB objects.
3n/a"""
4n/a
5n/aimport os, sys
6n/aimport unittest
7n/a
8n/afrom test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path
9n/a
10n/a# We're going to get warnings in this module about trying to close the db when
11n/a# its env is already closed. Let's just ignore those.
12n/atry:
13n/a import warnings
14n/aexcept ImportError:
15n/a pass
16n/aelse:
17n/a warnings.filterwarnings('ignore',
18n/a message='DB could not be closed in',
19n/a category=RuntimeWarning)
20n/a
21n/a
22n/a#----------------------------------------------------------------------
23n/a
24n/aclass DBEnvClosedEarlyCrash(unittest.TestCase):
25n/a def setUp(self):
26n/a self.homeDir = get_new_environment_path()
27n/a self.filename = "test"
28n/a
29n/a def tearDown(self):
30n/a test_support.rmtree(self.homeDir)
31n/a
32n/a def test01_close_dbenv_before_db(self):
33n/a dbenv = db.DBEnv()
34n/a dbenv.open(self.homeDir,
35n/a db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL,
36n/a 0666)
37n/a
38n/a d = db.DB(dbenv)
39n/a d2 = db.DB(dbenv)
40n/a d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
41n/a
42n/a self.assertRaises(db.DBNoSuchFileError, d2.open,
43n/a self.filename+"2", db.DB_BTREE, db.DB_THREAD, 0666)
44n/a
45n/a d.put("test","this is a test")
46n/a self.assertEqual(d.get("test"), "this is a test", "put!=get")
47n/a dbenv.close() # This "close" should close the child db handle also
48n/a self.assertRaises(db.DBError, d.get, "test")
49n/a
50n/a def test02_close_dbenv_before_dbcursor(self):
51n/a dbenv = db.DBEnv()
52n/a dbenv.open(self.homeDir,
53n/a db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL,
54n/a 0666)
55n/a
56n/a d = db.DB(dbenv)
57n/a d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
58n/a
59n/a d.put("test","this is a test")
60n/a d.put("test2","another test")
61n/a d.put("test3","another one")
62n/a self.assertEqual(d.get("test"), "this is a test", "put!=get")
63n/a c=d.cursor()
64n/a c.first()
65n/a c.next()
66n/a d.close() # This "close" should close the child db handle also
67n/a # db.close should close the child cursor
68n/a self.assertRaises(db.DBError,c.next)
69n/a
70n/a d = db.DB(dbenv)
71n/a d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
72n/a c=d.cursor()
73n/a c.first()
74n/a c.next()
75n/a dbenv.close()
76n/a # The "close" should close the child db handle also, with cursors
77n/a self.assertRaises(db.DBError, c.next)
78n/a
79n/a def test03_close_db_before_dbcursor_without_env(self):
80n/a import os.path
81n/a path=os.path.join(self.homeDir,self.filename)
82n/a d = db.DB()
83n/a d.open(path, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
84n/a
85n/a d.put("test","this is a test")
86n/a d.put("test2","another test")
87n/a d.put("test3","another one")
88n/a self.assertEqual(d.get("test"), "this is a test", "put!=get")
89n/a c=d.cursor()
90n/a c.first()
91n/a c.next()
92n/a d.close()
93n/a # The "close" should close the child db handle also
94n/a self.assertRaises(db.DBError, c.next)
95n/a
96n/a def test04_close_massive(self):
97n/a dbenv = db.DBEnv()
98n/a dbenv.open(self.homeDir,
99n/a db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL,
100n/a 0666)
101n/a
102n/a dbs=[db.DB(dbenv) for i in xrange(16)]
103n/a cursors=[]
104n/a for i in dbs :
105n/a i.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
106n/a
107n/a dbs[10].put("test","this is a test")
108n/a dbs[10].put("test2","another test")
109n/a dbs[10].put("test3","another one")
110n/a self.assertEqual(dbs[4].get("test"), "this is a test", "put!=get")
111n/a
112n/a for i in dbs :
113n/a cursors.extend([i.cursor() for j in xrange(32)])
114n/a
115n/a for i in dbs[::3] :
116n/a i.close()
117n/a for i in cursors[::3] :
118n/a i.close()
119n/a
120n/a # Check for missing exception in DB! (after DB close)
121n/a self.assertRaises(db.DBError, dbs[9].get, "test")
122n/a
123n/a # Check for missing exception in DBCursor! (after DB close)
124n/a self.assertRaises(db.DBError, cursors[101].first)
125n/a
126n/a cursors[80].first()
127n/a cursors[80].next()
128n/a dbenv.close() # This "close" should close the child db handle also
129n/a # Check for missing exception! (after DBEnv close)
130n/a self.assertRaises(db.DBError, cursors[80].next)
131n/a
132n/a def test05_close_dbenv_delete_db_success(self):
133n/a dbenv = db.DBEnv()
134n/a dbenv.open(self.homeDir,
135n/a db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL,
136n/a 0666)
137n/a
138n/a d = db.DB(dbenv)
139n/a d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
140n/a
141n/a dbenv.close() # This "close" should close the child db handle also
142n/a
143n/a del d
144n/a try:
145n/a import gc
146n/a except ImportError:
147n/a gc = None
148n/a if gc:
149n/a # force d.__del__ [DB_dealloc] to be called
150n/a gc.collect()
151n/a
152n/a def test06_close_txn_before_dup_cursor(self) :
153n/a dbenv = db.DBEnv()
154n/a dbenv.open(self.homeDir,db.DB_INIT_TXN | db.DB_INIT_MPOOL |
155n/a db.DB_INIT_LOG | db.DB_CREATE)
156n/a d = db.DB(dbenv)
157n/a txn = dbenv.txn_begin()
158n/a d.open(self.filename, dbtype = db.DB_HASH, flags = db.DB_CREATE,
159n/a txn=txn)
160n/a d.put("XXX", "yyy", txn=txn)
161n/a txn.commit()
162n/a txn = dbenv.txn_begin()
163n/a c1 = d.cursor(txn)
164n/a c2 = c1.dup()
165n/a self.assertEquals(("XXX", "yyy"), c1.first())
166n/a
167n/a # Not interested in warnings about implicit close.
168n/a import warnings
169n/a if sys.version_info < (2, 6) :
170n/a # Completely resetting the warning state is
171n/a # problematic with python >=2.6 with -3 (py3k warning),
172n/a # because some stdlib modules selectively ignores warnings.
173n/a warnings.simplefilter("ignore")
174n/a txn.commit()
175n/a warnings.resetwarnings()
176n/a else :
177n/a # When we drop support for python 2.3 and 2.4
178n/a # we could use: (in 2.5 we need a __future__ statement)
179n/a #
180n/a # with warnings.catch_warnings():
181n/a # warnings.simplefilter("ignore")
182n/a # txn.commit()
183n/a #
184n/a # We can not use "with" as is, because it would be invalid syntax
185n/a # in python 2.3, 2.4 and (with no __future__) 2.5.
186n/a # Here we simulate "with" following PEP 343 :
187n/a w = warnings.catch_warnings()
188n/a w.__enter__()
189n/a try :
190n/a warnings.simplefilter("ignore")
191n/a txn.commit()
192n/a finally :
193n/a w.__exit__()
194n/a
195n/a self.assertRaises(db.DBCursorClosedError, c2.first)
196n/a
197n/a if db.version() > (4,3,0) :
198n/a def test07_close_db_before_sequence(self):
199n/a import os.path
200n/a path=os.path.join(self.homeDir,self.filename)
201n/a d = db.DB()
202n/a d.open(path, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
203n/a dbs=db.DBSequence(d)
204n/a d.close() # This "close" should close the child DBSequence also
205n/a dbs.close() # If not closed, core dump (in Berkeley DB 4.6.*)
206n/a
207n/a#----------------------------------------------------------------------
208n/a
209n/adef test_suite():
210n/a suite = unittest.TestSuite()
211n/a suite.addTest(unittest.makeSuite(DBEnvClosedEarlyCrash))
212n/a return suite
213n/a
214n/a
215n/aif __name__ == '__main__':
216n/a unittest.main(defaultTest='test_suite')