| 1 | n/a | """Miscellaneous bsddb module test cases |
|---|
| 2 | n/a | """ |
|---|
| 3 | n/a | |
|---|
| 4 | n/a | import os, sys |
|---|
| 5 | n/a | import unittest |
|---|
| 6 | n/a | |
|---|
| 7 | n/a | from test_all import db, dbshelve, hashopen, test_support, get_new_environment_path, get_new_database_path |
|---|
| 8 | n/a | |
|---|
| 9 | n/a | #---------------------------------------------------------------------- |
|---|
| 10 | n/a | |
|---|
class MiscTestCase(unittest.TestCase):
    """Miscellaneous regression tests for the bsddb ``db`` module.

    ``setUp`` obtains a new database path (``self.filename``) and a new
    environment path (``self.homeDir``) for every test; ``tearDown``
    removes both again.
    """

    if sys.version_info < (2, 4):
        # Provide assertTrue/assertFalse on interpreters older than 2.4 by
        # delegating to the failUnless/failIf spellings.
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr, msg=msg)

        def assertFalse(self, expr, msg=None):
            self.failIf(expr, msg=msg)

    def setUp(self):
        # Fresh paths per test so tests cannot see each other's data.
        self.filename = get_new_database_path()
        self.homeDir = get_new_environment_path()

    def tearDown(self):
        # Remove whatever the test left behind at the paths from setUp.
        test_support.unlink(self.filename)
        test_support.rmtree(self.homeDir)

    def test01_badpointer(self):
        # Calling get() on a shelf that was already closed must raise DBError.
        dbs = dbshelve.open(self.filename)
        dbs.close()
        self.assertRaises(db.DBError, dbs.get, "foo")

    def test02_db_home(self):
        env = db.DBEnv()
        try:
            # check for crash fixed when db_home is used before open()
            self.assertTrue(env.db_home is None)
            env.open(self.homeDir, db.DB_CREATE)
            if sys.version_info[0] < 3:
                self.assertEqual(self.homeDir, env.db_home)
            else:
                self.assertEqual(bytes(self.homeDir, "ascii"), env.db_home)
        finally:
            # Release the environment before tearDown removes its directory.
            env.close()

    def test03_repr_closed_db(self):
        # repr() of a closed database is expected to be "{}".
        # The local is not called ``db`` so it does not shadow the imported
        # ``db`` module.
        closed_db = hashopen(self.filename)
        closed_db.close()
        self.assertEqual(repr(closed_db), "{}")

    def test04_repr_db(self):
        # Fill a database and a plain dict with the same 100 items, reopen
        # the database, and compare the two repr() strings.
        expected = {}
        handle = hashopen(self.filename)
        try:
            for i in range(100):
                handle[repr(i)] = repr(100 * i)
                expected[repr(i)] = repr(100 * i)
        finally:
            handle.close()

        handle = hashopen(self.filename)
        try:
            self.assertEqual(repr(handle), repr(expected))
        finally:
            # Close even when the assertion fails so the handle is not leaked.
            handle.close()

    # http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900
    #
    # See the bug report for details.
    #
    # The problem was that make_key_dbt() was not allocating a copy of
    # string keys but FREE_DBT() was always being told to free it when the
    # database was opened with DB_THREAD.
    def test05_double_free_make_key_dbt(self):
        # Create the handle before entering ``try`` so the ``finally`` clause
        # never refers to an unbound name if construction fails.
        db1 = db.DB()
        try:
            db1.open(self.filename, None, db.DB_BTREE,
                     db.DB_CREATE | db.DB_THREAD)

            curs = db1.cursor()
            # double free happened during exit from DBC_get
            curs.get("/foo", db.DB_SET)
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test06_key_with_null_bytes(self):
        # Keys containing NUL bytes must be stored and retrieved as distinct
        # keys.
        db1 = db.DB()
        try:
            db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE)
            db1['a'] = 'eh?'
            db1['a\x00'] = 'eh zed.'
            db1['a\x00a'] = 'eh zed eh?'
            db1['aaa'] = 'eh eh eh!'
            keys = sorted(db1.keys())
            self.assertEqual(['a', 'a\x00', 'a\x00a', 'aaa'], keys)
            self.assertEqual(db1['a'], 'eh?')
            self.assertEqual(db1['a\x00'], 'eh zed.')
            self.assertEqual(db1['a\x00a'], 'eh zed eh?')
            self.assertEqual(db1['aaa'], 'eh eh eh!')
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test07_DB_set_flags_persists(self):
        if db.version() < (4, 2):
            # The get_flags API required for this to work is only available
            # in Berkeley DB >= 4.2
            return
        db1 = db.DB()
        try:
            db1.set_flags(db.DB_DUPSORT)
            db1.open(self.filename, db.DB_HASH, db.DB_CREATE)
            db1['a'] = 'eh'
            db1['a'] = 'A'
            self.assertEqual([('a', 'A')], db1.items())
            db1.put('a', 'Aa')
            self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
            db1.close()
            db1 = db.DB()
            # no set_flags call, we're testing that it reads and obeys
            # the flags on open.
            db1.open(self.filename, db.DB_HASH)
            self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
            # if it read the flags right this will replace all values
            # for key 'a' instead of adding a new one. (as a dict should)
            db1['a'] = 'new A'
            self.assertEqual([('a', 'new A')], db1.items())
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test08_ExceptionTypes(self):
        # Every DB*Error exported by the module must derive from DBError, and
        # only the two exceptions named below may also derive from KeyError.
        self.assertTrue(issubclass(db.DBError, Exception))
        for name, obj in db.__dict__.items():
            if name.startswith("DB") and name.endswith("Error"):
                self.assertTrue(issubclass(obj, db.DBError), msg=name)
                if name not in ("DBKeyEmptyError", "DBNotFoundError"):
                    self.assertFalse(issubclass(obj, KeyError), msg=name)

        # These two exceptions have two bases
        self.assertTrue(issubclass(db.DBKeyEmptyError, KeyError))
        self.assertTrue(issubclass(db.DBNotFoundError, KeyError))
|---|
| 139 | n/a | |
|---|
| 140 | n/a | |
|---|
| 141 | n/a | #---------------------------------------------------------------------- |
|---|
| 142 | n/a | |
|---|
| 143 | n/a | |
|---|
def test_suite():
    """Return a test suite holding every test method of MiscTestCase."""
    # unittest.makeSuite() is deprecated and removed in Python 3.13; the
    # default TestLoader collects the same "test*" methods.
    return unittest.TestLoader().loadTestsFromTestCase(MiscTestCase)
|---|
| 146 | n/a | |
|---|
| 147 | n/a | |
|---|
# Running this file directly executes the suite built by test_suite().
if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")
|---|