»Core Development>Code coverage>Lib/bsddb/test/test_recno.py

Python code coverage for Lib/bsddb/test/test_recno.py

# count content
1n/a"""TestCases for exercising a Recno DB.
2n/a"""
3n/a
4n/aimport os, sys
5n/aimport errno
6n/afrom pprint import pprint
7n/aimport unittest
8n/a
9n/afrom test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path
10n/a
# Payload alphabet for the tests: the 26 lowercase ASCII letters followed by
# the 26 uppercase ones (adjacent literals are joined at compile time).
letters = ('abcdefghijklmnopqrstuvwxyz'
           'ABCDEFGHIJKLMNOPQRSTUVWXYZ')
12n/a
13n/a
14n/a#----------------------------------------------------------------------
15n/a
16n/aclass SimpleRecnoTestCase(unittest.TestCase):
17n/a if sys.version_info < (2, 4) :
18n/a def assertFalse(self, expr, msg=None) :
19n/a return self.failIf(expr,msg=msg)
20n/a def assertTrue(self, expr, msg=None) :
21n/a return self.assert_(expr, msg=msg)
22n/a
23n/a if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
24n/a (sys.version_info < (3, 2))) :
25n/a def assertIsInstance(self, obj, datatype, msg=None) :
26n/a return self.assertEqual(type(obj), datatype, msg=msg)
27n/a def assertGreaterEqual(self, a, b, msg=None) :
28n/a return self.assertTrue(a>=b, msg=msg)
29n/a
30n/a
31n/a def setUp(self):
32n/a self.filename = get_new_database_path()
33n/a self.homeDir = None
34n/a
35n/a def tearDown(self):
36n/a test_support.unlink(self.filename)
37n/a if self.homeDir:
38n/a test_support.rmtree(self.homeDir)
39n/a
40n/a def test01_basic(self):
41n/a d = db.DB()
42n/a
43n/a get_returns_none = d.set_get_returns_none(2)
44n/a d.set_get_returns_none(get_returns_none)
45n/a
46n/a d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
47n/a
48n/a for x in letters:
49n/a recno = d.append(x * 60)
50n/a self.assertIsInstance(recno, int)
51n/a self.assertGreaterEqual(recno, 1)
52n/a if verbose:
53n/a print recno,
54n/a
55n/a if verbose: print
56n/a
57n/a stat = d.stat()
58n/a if verbose:
59n/a pprint(stat)
60n/a
61n/a for recno in range(1, len(d)+1):
62n/a data = d[recno]
63n/a if verbose:
64n/a print data
65n/a
66n/a self.assertIsInstance(data, str)
67n/a self.assertEqual(data, d.get(recno))
68n/a
69n/a try:
70n/a data = d[0] # This should raise a KeyError!?!?!
71n/a except db.DBInvalidArgError, val:
72n/a if sys.version_info < (2, 6) :
73n/a self.assertEqual(val[0], db.EINVAL)
74n/a else :
75n/a self.assertEqual(val.args[0], db.EINVAL)
76n/a if verbose: print val
77n/a else:
78n/a self.fail("expected exception")
79n/a
80n/a # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2)
81n/a try:
82n/a d.has_key(0)
83n/a except db.DBError, val:
84n/a pass
85n/a else:
86n/a self.fail("has_key did not raise a proper exception")
87n/a
88n/a try:
89n/a data = d[100]
90n/a except KeyError:
91n/a pass
92n/a else:
93n/a self.fail("expected exception")
94n/a
95n/a try:
96n/a data = d.get(100)
97n/a except db.DBNotFoundError, val:
98n/a if get_returns_none:
99n/a self.fail("unexpected exception")
100n/a else:
101n/a self.assertEqual(data, None)
102n/a
103n/a keys = d.keys()
104n/a if verbose:
105n/a print keys
106n/a self.assertIsInstance(keys, list)
107n/a self.assertIsInstance(keys[0], int)
108n/a self.assertEqual(len(keys), len(d))
109n/a
110n/a items = d.items()
111n/a if verbose:
112n/a pprint(items)
113n/a self.assertIsInstance(items, list)
114n/a self.assertIsInstance(items[0], tuple)
115n/a self.assertEqual(len(items[0]), 2)
116n/a self.assertIsInstance(items[0][0], int)
117n/a self.assertIsInstance(items[0][1], str)
118n/a self.assertEqual(len(items), len(d))
119n/a
120n/a self.assertTrue(d.has_key(25))
121n/a
122n/a del d[25]
123n/a self.assertFalse(d.has_key(25))
124n/a
125n/a d.delete(13)
126n/a self.assertFalse(d.has_key(13))
127n/a
128n/a data = d.get_both(26, "z" * 60)
129n/a self.assertEqual(data, "z" * 60, 'was %r' % data)
130n/a if verbose:
131n/a print data
132n/a
133n/a fd = d.fd()
134n/a if verbose:
135n/a print fd
136n/a
137n/a c = d.cursor()
138n/a rec = c.first()
139n/a while rec:
140n/a if verbose:
141n/a print rec
142n/a rec = c.next()
143n/a
144n/a c.set(50)
145n/a rec = c.current()
146n/a if verbose:
147n/a print rec
148n/a
149n/a c.put(-1, "a replacement record", db.DB_CURRENT)
150n/a
151n/a c.set(50)
152n/a rec = c.current()
153n/a self.assertEqual(rec, (50, "a replacement record"))
154n/a if verbose:
155n/a print rec
156n/a
157n/a rec = c.set_range(30)
158n/a if verbose:
159n/a print rec
160n/a
161n/a # test that non-existent key lookups work (and that
162n/a # DBC_set_range doesn't have a memleak under valgrind)
163n/a rec = c.set_range(999999)
164n/a self.assertEqual(rec, None)
165n/a if verbose:
166n/a print rec
167n/a
168n/a c.close()
169n/a d.close()
170n/a
171n/a d = db.DB()
172n/a d.open(self.filename)
173n/a c = d.cursor()
174n/a
175n/a # put a record beyond the consecutive end of the recno's
176n/a d[100] = "way out there"
177n/a self.assertEqual(d[100], "way out there")
178n/a
179n/a try:
180n/a data = d[99]
181n/a except KeyError:
182n/a pass
183n/a else:
184n/a self.fail("expected exception")
185n/a
186n/a try:
187n/a d.get(99)
188n/a except db.DBKeyEmptyError, val:
189n/a if get_returns_none:
190n/a self.fail("unexpected DBKeyEmptyError exception")
191n/a else:
192n/a if sys.version_info < (2, 6) :
193n/a self.assertEqual(val[0], db.DB_KEYEMPTY)
194n/a else :
195n/a self.assertEqual(val.args[0], db.DB_KEYEMPTY)
196n/a if verbose: print val
197n/a else:
198n/a if not get_returns_none:
199n/a self.fail("expected exception")
200n/a
201n/a rec = c.set(40)
202n/a while rec:
203n/a if verbose:
204n/a print rec
205n/a rec = c.next()
206n/a
207n/a c.close()
208n/a d.close()
209n/a
210n/a def test02_WithSource(self):
211n/a """
212n/a A Recno file that is given a "backing source file" is essentially a
213n/a simple ASCII file. Normally each record is delimited by \n and so is
214n/a just a line in the file, but you can set a different record delimiter
215n/a if needed.
216n/a """
217n/a homeDir = get_new_environment_path()
218n/a self.homeDir = homeDir
219n/a source = os.path.join(homeDir, 'test_recno.txt')
220n/a if not os.path.isdir(homeDir):
221n/a os.mkdir(homeDir)
222n/a f = open(source, 'w') # create the file
223n/a f.close()
224n/a
225n/a d = db.DB()
226n/a # This is the default value, just checking if both int
227n/a d.set_re_delim(0x0A)
228n/a d.set_re_delim('\n') # and char can be used...
229n/a d.set_re_source(source)
230n/a d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
231n/a
232n/a data = "The quick brown fox jumped over the lazy dog".split()
233n/a for datum in data:
234n/a d.append(datum)
235n/a d.sync()
236n/a d.close()
237n/a
238n/a # get the text from the backing source
239n/a text = open(source, 'r').read()
240n/a text = text.strip()
241n/a if verbose:
242n/a print text
243n/a print data
244n/a print text.split('\n')
245n/a
246n/a self.assertEqual(text.split('\n'), data)
247n/a
248n/a # open as a DB again
249n/a d = db.DB()
250n/a d.set_re_source(source)
251n/a d.open(self.filename, db.DB_RECNO)
252n/a
253n/a d[3] = 'reddish-brown'
254n/a d[8] = 'comatose'
255n/a
256n/a d.sync()
257n/a d.close()
258n/a
259n/a text = open(source, 'r').read()
260n/a text = text.strip()
261n/a if verbose:
262n/a print text
263n/a print text.split('\n')
264n/a
265n/a self.assertEqual(text.split('\n'),
266n/a "The quick reddish-brown fox jumped over the comatose dog".split())
267n/a
268n/a def test03_FixedLength(self):
269n/a d = db.DB()
270n/a d.set_re_len(40) # fixed length records, 40 bytes long
271n/a d.set_re_pad('-') # sets the pad character...
272n/a d.set_re_pad(45) # ...test both int and char
273n/a d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
274n/a
275n/a for x in letters:
276n/a d.append(x * 35) # These will be padded
277n/a
278n/a d.append('.' * 40) # this one will be exact
279n/a
280n/a try: # this one will fail
281n/a d.append('bad' * 20)
282n/a except db.DBInvalidArgError, val:
283n/a if sys.version_info < (2, 6) :
284n/a self.assertEqual(val[0], db.EINVAL)
285n/a else :
286n/a self.assertEqual(val.args[0], db.EINVAL)
287n/a if verbose: print val
288n/a else:
289n/a self.fail("expected exception")
290n/a
291n/a c = d.cursor()
292n/a rec = c.first()
293n/a while rec:
294n/a if verbose:
295n/a print rec
296n/a rec = c.next()
297n/a
298n/a c.close()
299n/a d.close()
300n/a
301n/a
302n/a#----------------------------------------------------------------------
303n/a
304n/a
def test_suite():
    """Return a suite built from the tests in SimpleRecnoTestCase."""
    suite = unittest.makeSuite(SimpleRecnoTestCase)
    return suite
307n/a
308n/a
# Script entry point: hand control to unittest, naming test_suite() as the
# default test to run.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')