»Core Development>Code coverage>Lib/bsddb/test/test_queue.py

Python code coverage for Lib/bsddb/test/test_queue.py

# count content
1n/a"""
2n/aTestCases for exercising a Queue DB.
3n/a"""
4n/a
5n/aimport os, string
6n/afrom pprint import pprint
7n/aimport unittest
8n/a
9n/afrom test_all import db, verbose, get_new_database_path
10n/a
11n/a#----------------------------------------------------------------------
12n/a
class SimpleQueueTestCase(unittest.TestCase):
    """Tests that fill, reopen and drain a Queue database stored in a file.

    The test methods carry comments rather than docstrings so that the
    unittest runner keeps reporting them by method name.
    """

    def setUp(self):
        """Obtain a fresh database path for the test about to run."""
        self.filename = get_new_database_path()

    def tearDown(self):
        """Delete the database file; a failure to remove it is ignored."""
        try:
            os.remove(self.filename)
        except os.error:
            pass

    def _show_stats(self, handle, label):
        """In verbose mode, print a labelled dump of ``handle.stat()``."""
        if verbose:
            print(label + '-' * 30)
            pprint(handle.stat())

    def _create_and_fill(self):
        """Create the queue file, fill it with records and close it."""
        queue = db.DB()
        queue.set_re_len(40)  # Queues must be fixed length
        queue.open(self.filename, db.DB_QUEUE, db.DB_CREATE)

        self._show_stats(queue, "before appends")

        # One 40-character record per letter.
        for letter in string.letters:
            queue.append(letter * 40)

        letter_count = len(string.letters)
        self.assertEqual(len(queue), letter_count)

        # Four explicit puts, yet the count is expected to grow by three:
        # the put to record 1 is labelled as a replacement.
        explicit_records = (
            (100, "some more data"),
            (101, "and some more "),
            (75, "out of order"),
            (1, "replacement data"),
        )
        for recno, payload in explicit_records:
            queue.put(recno, payload)

        self.assertEqual(len(queue), letter_count + 3)

        self._show_stats(queue, "before close")
        queue.close()

    def _reopen(self):
        """Open the existing queue file with a new handle and return it."""
        queue = db.DB()
        queue.open(self.filename)
        self._show_stats(queue, "after open")
        return queue

    def _drain(self, consume):
        """Call ``consume()`` until it returns a false value.

        Each record obtained is printed in verbose mode.
        """
        while True:
            rec = consume()
            if not rec:
                break
            if verbose:
                print(rec)

    def test01_basic(self):
        # Basic Queue tests using the deprecated DBCursor.consume method.

        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test01_basic..." % self.__class__.__name__)

        self._create_and_fill()
        queue = self._reopen()

        # Test "txn" as a positional parameter
        queue.append("one more", None)
        # Test "txn" as a keyword parameter
        queue.append("another one", txn=None)

        cursor = queue.cursor()

        self._show_stats(queue, "after append")

        self._drain(cursor.consume)
        cursor.close()

        self._show_stats(queue, "after consume loop")

        self.assertEqual(
            len(queue), 0,
            "if you see this message then you need to rebuild "
            "Berkeley DB 3.1.17 with the patch in patches/qam_stat.diff")

        queue.close()

    def test02_basicPost32(self):
        # Basic Queue tests using the new DB.consume method in DB 3.2+
        # (No cursor needed)

        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test02_basicPost32..." % self.__class__.__name__)

        if db.version() < (3, 2, 0):
            if verbose:
                print("Test not run, DB not new enough...")
            return

        self._create_and_fill()
        queue = self._reopen()

        queue.append("one more")

        self._show_stats(queue, "after append")

        self._drain(queue.consume)

        self._show_stats(queue, "after consume loop")

        queue.close()
158n/a
159n/a
160n/a
161n/a#----------------------------------------------------------------------
162n/a
def test_suite():
    """Return a suite holding the test methods of SimpleQueueTestCase."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(SimpleQueueTestCase)
165n/a
166n/a
if __name__ == "__main__":
    # Executed as a script: have unittest run what test_suite() returns.
    unittest.main(defaultTest="test_suite")