» Core Development > Code coverage > Lib/bsddb/test/test_dbshelve.py

Python code coverage for Lib/bsddb/test/test_dbshelve.py

# | count | content
1n/a"""
2n/aTestCases for checking dbShelve objects.
3n/a"""
4n/a
5n/aimport os, string, sys
6n/aimport random
7n/aimport unittest
8n/a
9n/a
10n/afrom test_all import db, dbshelve, test_support, verbose, \
11n/a get_new_environment_path, get_new_database_path
12n/a
13n/a
14n/a
15n/aif sys.version_info < (2, 4) :
16n/a from sets import Set as set
17n/a
18n/a
19n/a
20n/a#----------------------------------------------------------------------
21n/a
22n/a# We want the objects to be comparable so we can test dbshelve.values
23n/a# later on.
class DataClass:
    """Test payload object carrying one random float in ``value``."""
    # Deliberately declared without a base class: on Python 2,
    # DBShelveTestCase.checkrec expects stored instances to have type
    # ``types.InstanceType``.

    def __init__(self):
        self.value = random.random()

    def __repr__(self):  # For Python 3.0 comparison
        return "DataClass %f" % (self.value,)

    def __cmp__(self, other):  # For Python 2.x comparison
        own_value = self.value
        return cmp(own_value, other)
33n/a
34n/a
35n/aclass DBShelveTestCase(unittest.TestCase):
36n/a if sys.version_info < (2, 4):
37n/a def assertTrue(self, expr, msg=None):
38n/a return self.failUnless(expr,msg=msg)
39n/a
40n/a if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
41n/a (sys.version_info < (3, 2))) :
42n/a def assertIn(self, a, b, msg=None) :
43n/a return self.assertTrue(a in b, msg=msg)
44n/a
45n/a
46n/a def setUp(self):
47n/a if sys.version_info[0] >= 3 :
48n/a from test_all import do_proxy_db_py3k
49n/a self._flag_proxy_db_py3k = do_proxy_db_py3k(False)
50n/a self.filename = get_new_database_path()
51n/a self.do_open()
52n/a
53n/a def tearDown(self):
54n/a if sys.version_info[0] >= 3 :
55n/a from test_all import do_proxy_db_py3k
56n/a do_proxy_db_py3k(self._flag_proxy_db_py3k)
57n/a self.do_close()
58n/a test_support.unlink(self.filename)
59n/a
60n/a def mk(self, key):
61n/a """Turn key into an appropriate key type for this db"""
62n/a # override in child class for RECNO
63n/a if sys.version_info[0] < 3 :
64n/a return key
65n/a else :
66n/a return bytes(key, "iso8859-1") # 8 bits
67n/a
68n/a def populateDB(self, d):
69n/a for x in string.letters:
70n/a d[self.mk('S' + x)] = 10 * x # add a string
71n/a d[self.mk('I' + x)] = ord(x) # add an integer
72n/a d[self.mk('L' + x)] = [x] * 10 # add a list
73n/a
74n/a inst = DataClass() # add an instance
75n/a inst.S = 10 * x
76n/a inst.I = ord(x)
77n/a inst.L = [x] * 10
78n/a d[self.mk('O' + x)] = inst
79n/a
80n/a
81n/a # overridable in derived classes to affect how the shelf is created/opened
82n/a def do_open(self):
83n/a self.d = dbshelve.open(self.filename)
84n/a
85n/a # and closed...
86n/a def do_close(self):
87n/a self.d.close()
88n/a
89n/a
90n/a
91n/a def test01_basics(self):
92n/a if verbose:
93n/a print '\n', '-=' * 30
94n/a print "Running %s.test01_basics..." % self.__class__.__name__
95n/a
96n/a self.populateDB(self.d)
97n/a self.d.sync()
98n/a self.do_close()
99n/a self.do_open()
100n/a d = self.d
101n/a
102n/a l = len(d)
103n/a k = d.keys()
104n/a s = d.stat()
105n/a f = d.fd()
106n/a
107n/a if verbose:
108n/a print "length:", l
109n/a print "keys:", k
110n/a print "stats:", s
111n/a
112n/a self.assertEqual(0, d.has_key(self.mk('bad key')))
113n/a self.assertEqual(1, d.has_key(self.mk('IA')))
114n/a self.assertEqual(1, d.has_key(self.mk('OA')))
115n/a
116n/a d.delete(self.mk('IA'))
117n/a del d[self.mk('OA')]
118n/a self.assertEqual(0, d.has_key(self.mk('IA')))
119n/a self.assertEqual(0, d.has_key(self.mk('OA')))
120n/a self.assertEqual(len(d), l-2)
121n/a
122n/a values = []
123n/a for key in d.keys():
124n/a value = d[key]
125n/a values.append(value)
126n/a if verbose:
127n/a print "%s: %s" % (key, value)
128n/a self.checkrec(key, value)
129n/a
130n/a dbvalues = d.values()
131n/a self.assertEqual(len(dbvalues), len(d.keys()))
132n/a if sys.version_info < (2, 6) :
133n/a values.sort()
134n/a dbvalues.sort()
135n/a self.assertEqual(values, dbvalues)
136n/a else : # XXX: Convert all to strings. Please, improve
137n/a values.sort(key=lambda x : str(x))
138n/a dbvalues.sort(key=lambda x : str(x))
139n/a self.assertEqual(repr(values), repr(dbvalues))
140n/a
141n/a items = d.items()
142n/a self.assertEqual(len(items), len(values))
143n/a
144n/a for key, value in items:
145n/a self.checkrec(key, value)
146n/a
147n/a self.assertEqual(d.get(self.mk('bad key')), None)
148n/a self.assertEqual(d.get(self.mk('bad key'), None), None)
149n/a self.assertEqual(d.get(self.mk('bad key'), 'a string'), 'a string')
150n/a self.assertEqual(d.get(self.mk('bad key'), [1, 2, 3]), [1, 2, 3])
151n/a
152n/a d.set_get_returns_none(0)
153n/a self.assertRaises(db.DBNotFoundError, d.get, self.mk('bad key'))
154n/a d.set_get_returns_none(1)
155n/a
156n/a d.put(self.mk('new key'), 'new data')
157n/a self.assertEqual(d.get(self.mk('new key')), 'new data')
158n/a self.assertEqual(d[self.mk('new key')], 'new data')
159n/a
160n/a
161n/a
162n/a def test02_cursors(self):
163n/a if verbose:
164n/a print '\n', '-=' * 30
165n/a print "Running %s.test02_cursors..." % self.__class__.__name__
166n/a
167n/a self.populateDB(self.d)
168n/a d = self.d
169n/a
170n/a count = 0
171n/a c = d.cursor()
172n/a rec = c.first()
173n/a while rec is not None:
174n/a count = count + 1
175n/a if verbose:
176n/a print rec
177n/a key, value = rec
178n/a self.checkrec(key, value)
179n/a # Hack to avoid conversion by 2to3 tool
180n/a rec = getattr(c, "next")()
181n/a del c
182n/a
183n/a self.assertEqual(count, len(d))
184n/a
185n/a count = 0
186n/a c = d.cursor()
187n/a rec = c.last()
188n/a while rec is not None:
189n/a count = count + 1
190n/a if verbose:
191n/a print rec
192n/a key, value = rec
193n/a self.checkrec(key, value)
194n/a rec = c.prev()
195n/a
196n/a self.assertEqual(count, len(d))
197n/a
198n/a c.set(self.mk('SS'))
199n/a key, value = c.current()
200n/a self.checkrec(key, value)
201n/a del c
202n/a
203n/a
204n/a def test03_append(self):
205n/a # NOTE: this is overridden in RECNO subclass, don't change its name.
206n/a if verbose:
207n/a print '\n', '-=' * 30
208n/a print "Running %s.test03_append..." % self.__class__.__name__
209n/a
210n/a self.assertRaises(dbshelve.DBShelveError,
211n/a self.d.append, 'unit test was here')
212n/a
213n/a
214n/a def test04_iterable(self) :
215n/a self.populateDB(self.d)
216n/a d = self.d
217n/a keys = d.keys()
218n/a keyset = set(keys)
219n/a self.assertEqual(len(keyset), len(keys))
220n/a
221n/a for key in d :
222n/a self.assertIn(key, keyset)
223n/a keyset.remove(key)
224n/a self.assertEqual(len(keyset), 0)
225n/a
226n/a def checkrec(self, key, value):
227n/a # override this in a subclass if the key type is different
228n/a
229n/a if sys.version_info[0] >= 3 :
230n/a if isinstance(key, bytes) :
231n/a key = key.decode("iso8859-1") # 8 bits
232n/a
233n/a x = key[1]
234n/a if key[0] == 'S':
235n/a self.assertEqual(type(value), str)
236n/a self.assertEqual(value, 10 * x)
237n/a
238n/a elif key[0] == 'I':
239n/a self.assertEqual(type(value), int)
240n/a self.assertEqual(value, ord(x))
241n/a
242n/a elif key[0] == 'L':
243n/a self.assertEqual(type(value), list)
244n/a self.assertEqual(value, [x] * 10)
245n/a
246n/a elif key[0] == 'O':
247n/a if sys.version_info[0] < 3 :
248n/a from types import InstanceType
249n/a self.assertEqual(type(value), InstanceType)
250n/a else :
251n/a self.assertEqual(type(value), DataClass)
252n/a
253n/a self.assertEqual(value.S, 10 * x)
254n/a self.assertEqual(value.I, ord(x))
255n/a self.assertEqual(value.L, [x] * 10)
256n/a
257n/a else:
258n/a self.assert_(0, 'Unknown key type, fix the test')
259n/a
260n/a#----------------------------------------------------------------------
261n/a
class BasicShelveTestCase(DBShelveTestCase):
    """Shelf tests that build a ``dbshelve.DBShelf`` and open it explicitly.

    Concrete subclasses provide the ``dbtype`` and ``dbflags`` attributes.
    """

    def do_open(self):
        # Bind self.d before opening so the attribute exists even when
        # open() raises.
        shelf = self.d = dbshelve.DBShelf()
        shelf.open(self.filename, self.dbtype, self.dbflags)

    def do_close(self):
        shelf = self.d
        shelf.close()
269n/a
270n/a
class BTreeShelveTestCase(BasicShelveTestCase):
    """Shelf tests using ``db.DB_BTREE`` opened with ``db.DB_CREATE``."""
    dbflags = db.DB_CREATE
    dbtype = db.DB_BTREE
274n/a
275n/a
class HashShelveTestCase(BasicShelveTestCase):
    """Shelf tests using ``db.DB_HASH`` opened with ``db.DB_CREATE``."""
    dbflags = db.DB_CREATE
    dbtype = db.DB_HASH
279n/a
280n/a
class ThreadBTreeShelveTestCase(BasicShelveTestCase):
    """Shelf tests using ``db.DB_BTREE`` with ``db.DB_THREAD`` added."""
    dbflags = db.DB_CREATE | db.DB_THREAD
    dbtype = db.DB_BTREE
284n/a
285n/a
class ThreadHashShelveTestCase(BasicShelveTestCase):
    """Shelf tests using ``db.DB_HASH`` with ``db.DB_THREAD`` added."""
    dbflags = db.DB_CREATE | db.DB_THREAD
    dbtype = db.DB_HASH
289n/a
290n/a
291n/a#----------------------------------------------------------------------
292n/a
class BasicEnvShelveTestCase(DBShelveTestCase):
    """Shelf tests that open the shelf inside a ``db.DBEnv``.

    Concrete subclasses provide the ``envflags``, ``dbtype`` and
    ``dbflags`` attributes.
    """

    def do_open(self):
        """Open the environment in ``homeDir``, then the shelf within it."""
        self.env = db.DBEnv()
        self.env.open(self.homeDir,
                      self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)

        # Only the file name component is kept; presumably it is then
        # resolved relative to the environment home -- TODO confirm.
        self.filename = os.path.split(self.filename)[1]
        self.d = dbshelve.DBShelf(self.env)
        self.d.open(self.filename, self.dbtype, self.dbflags)

    def do_close(self):
        """Close the shelf and then its environment."""
        try:
            self.d.close()
        finally:
            # Close the environment even when closing the shelf fails.
            self.env.close()

    def setUp(self):
        """Choose the environment directory before the common set-up."""
        self.homeDir = get_new_environment_path()
        DBShelveTestCase.setUp(self)

    def tearDown(self):
        """Close shelf and environment, then remove the environment dir."""
        if sys.version_info[0] >= 3:
            from test_all import do_proxy_db_py3k
            do_proxy_db_py3k(self._flag_proxy_db_py3k)
        try:
            self.do_close()
        finally:
            # Remove the directory even when closing fails, so a broken
            # test does not leave its environment behind.
            test_support.rmtree(self.homeDir)
319n/a
320n/a
class EnvBTreeShelveTestCase(BasicEnvShelveTestCase):
    """Environment-backed shelf tests using ``db.DB_BTREE``, no env flags."""
    dbflags = db.DB_CREATE
    dbtype = db.DB_BTREE
    envflags = 0
325n/a
326n/a
class EnvHashShelveTestCase(BasicEnvShelveTestCase):
    """Environment-backed shelf tests using ``db.DB_HASH``, no env flags."""
    dbflags = db.DB_CREATE
    dbtype = db.DB_HASH
    envflags = 0
331n/a
332n/a
class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase):
    """Environment-backed ``db.DB_BTREE`` tests with ``db.DB_THREAD`` set
    on both the environment and the database."""
    dbflags = db.DB_CREATE | db.DB_THREAD
    dbtype = db.DB_BTREE
    envflags = db.DB_THREAD
337n/a
338n/a
class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase):
    """Environment-backed ``db.DB_HASH`` tests with ``db.DB_THREAD`` set
    on both the environment and the database."""
    dbflags = db.DB_CREATE | db.DB_THREAD
    dbtype = db.DB_HASH
    envflags = db.DB_THREAD
343n/a
344n/a
345n/a#----------------------------------------------------------------------
346n/a# test cases for a DBShelf in a RECNO DB.
347n/a
348n/aclass RecNoShelveTestCase(BasicShelveTestCase):
349n/a dbtype = db.DB_RECNO
350n/a dbflags = db.DB_CREATE
351n/a
352n/a def setUp(self):
353n/a BasicShelveTestCase.setUp(self)
354n/a
355n/a # pool to assign integer key values out of
356n/a self.key_pool = list(range(1, 5000))
357n/a self.key_map = {} # map string keys to the number we gave them
358n/a self.intkey_map = {} # reverse map of above
359n/a
360n/a def mk(self, key):
361n/a if key not in self.key_map:
362n/a self.key_map[key] = self.key_pool.pop(0)
363n/a self.intkey_map[self.key_map[key]] = key
364n/a return self.key_map[key]
365n/a
366n/a def checkrec(self, intkey, value):
367n/a key = self.intkey_map[intkey]
368n/a BasicShelveTestCase.checkrec(self, key, value)
369n/a
370n/a def test03_append(self):
371n/a if verbose:
372n/a print '\n', '-=' * 30
373n/a print "Running %s.test03_append..." % self.__class__.__name__
374n/a
375n/a self.d[1] = 'spam'
376n/a self.d[5] = 'eggs'
377n/a self.assertEqual(6, self.d.append('spam'))
378n/a self.assertEqual(7, self.d.append('baked beans'))
379n/a self.assertEqual('spam', self.d.get(6))
380n/a self.assertEqual('spam', self.d.get(1))
381n/a self.assertEqual('baked beans', self.d.get(7))
382n/a self.assertEqual('eggs', self.d.get(5))
383n/a
384n/a
385n/a#----------------------------------------------------------------------
386n/a
def test_suite():
    """Return a suite holding the tests of every shelve test case class."""
    case_classes = (
        DBShelveTestCase,
        BTreeShelveTestCase,
        HashShelveTestCase,
        ThreadBTreeShelveTestCase,
        ThreadHashShelveTestCase,
        EnvBTreeShelveTestCase,
        EnvHashShelveTestCase,
        EnvThreadBTreeShelveTestCase,
        EnvThreadHashShelveTestCase,
        RecNoShelveTestCase,
    )

    suite = unittest.TestSuite()
    for case_class in case_classes:
        suite.addTest(unittest.makeSuite(case_class))
    return suite
402n/a
403n/a
# When executed as a script, run the suite built by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')