| 1 | n/a | import unittest |
|---|
| 2 | n/a | import os |
|---|
| 3 | n/a | |
|---|
| 4 | n/a | from test_all import db, test_support, get_new_environment_path, get_new_database_path |
|---|
| 5 | n/a | |
|---|
| 6 | n/a | |
|---|
| 7 | n/a | class DBSequenceTest(unittest.TestCase): |
|---|
| 8 | n/a | import sys |
|---|
| 9 | n/a | if sys.version_info < (2, 4) : |
|---|
| 10 | n/a | def assertTrue(self, expr, msg=None): |
|---|
| 11 | n/a | self.failUnless(expr,msg=msg) |
|---|
| 12 | n/a | |
|---|
| 13 | n/a | def setUp(self): |
|---|
| 14 | n/a | self.int_32_max = 0x100000000 |
|---|
| 15 | n/a | self.homeDir = get_new_environment_path() |
|---|
| 16 | n/a | self.filename = "test" |
|---|
| 17 | n/a | |
|---|
| 18 | n/a | self.dbenv = db.DBEnv() |
|---|
| 19 | n/a | self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666) |
|---|
| 20 | n/a | self.d = db.DB(self.dbenv) |
|---|
| 21 | n/a | self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666) |
|---|
| 22 | n/a | |
|---|
| 23 | n/a | def tearDown(self): |
|---|
| 24 | n/a | if hasattr(self, 'seq'): |
|---|
| 25 | n/a | self.seq.close() |
|---|
| 26 | n/a | del self.seq |
|---|
| 27 | n/a | if hasattr(self, 'd'): |
|---|
| 28 | n/a | self.d.close() |
|---|
| 29 | n/a | del self.d |
|---|
| 30 | n/a | if hasattr(self, 'dbenv'): |
|---|
| 31 | n/a | self.dbenv.close() |
|---|
| 32 | n/a | del self.dbenv |
|---|
| 33 | n/a | |
|---|
| 34 | n/a | test_support.rmtree(self.homeDir) |
|---|
| 35 | n/a | |
|---|
| 36 | n/a | def test_get(self): |
|---|
| 37 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 38 | n/a | start_value = 10 * self.int_32_max |
|---|
| 39 | n/a | self.assertEqual(0xA00000000, start_value) |
|---|
| 40 | n/a | self.assertEquals(None, self.seq.initial_value(start_value)) |
|---|
| 41 | n/a | self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE)) |
|---|
| 42 | n/a | self.assertEquals(start_value, self.seq.get(5)) |
|---|
| 43 | n/a | self.assertEquals(start_value + 5, self.seq.get()) |
|---|
| 44 | n/a | |
|---|
| 45 | n/a | def test_remove(self): |
|---|
| 46 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 47 | n/a | self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|---|
| 48 | n/a | self.assertEquals(None, self.seq.remove(txn=None, flags=0)) |
|---|
| 49 | n/a | del self.seq |
|---|
| 50 | n/a | |
|---|
| 51 | n/a | def test_get_key(self): |
|---|
| 52 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 53 | n/a | key = 'foo' |
|---|
| 54 | n/a | self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE)) |
|---|
| 55 | n/a | self.assertEquals(key, self.seq.get_key()) |
|---|
| 56 | n/a | |
|---|
| 57 | n/a | def test_get_dbp(self): |
|---|
| 58 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 59 | n/a | self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|---|
| 60 | n/a | self.assertEquals(self.d, self.seq.get_dbp()) |
|---|
| 61 | n/a | |
|---|
| 62 | n/a | def test_cachesize(self): |
|---|
| 63 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 64 | n/a | cashe_size = 10 |
|---|
| 65 | n/a | self.assertEquals(None, self.seq.set_cachesize(cashe_size)) |
|---|
| 66 | n/a | self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|---|
| 67 | n/a | self.assertEquals(cashe_size, self.seq.get_cachesize()) |
|---|
| 68 | n/a | |
|---|
| 69 | n/a | def test_flags(self): |
|---|
| 70 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 71 | n/a | flag = db.DB_SEQ_WRAP; |
|---|
| 72 | n/a | self.assertEquals(None, self.seq.set_flags(flag)) |
|---|
| 73 | n/a | self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|---|
| 74 | n/a | self.assertEquals(flag, self.seq.get_flags() & flag) |
|---|
| 75 | n/a | |
|---|
| 76 | n/a | def test_range(self): |
|---|
| 77 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 78 | n/a | seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1) |
|---|
| 79 | n/a | self.assertEquals(None, self.seq.set_range(seq_range)) |
|---|
| 80 | n/a | self.seq.initial_value(seq_range[0]) |
|---|
| 81 | n/a | self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|---|
| 82 | n/a | self.assertEquals(seq_range, self.seq.get_range()) |
|---|
| 83 | n/a | |
|---|
| 84 | n/a | def test_stat(self): |
|---|
| 85 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 86 | n/a | self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
|---|
| 87 | n/a | stat = self.seq.stat() |
|---|
| 88 | n/a | for param in ('nowait', 'min', 'max', 'value', 'current', |
|---|
| 89 | n/a | 'flags', 'cache_size', 'last_value', 'wait'): |
|---|
| 90 | n/a | self.assertTrue(param in stat, "parameter %s isn't in stat info" % param) |
|---|
| 91 | n/a | |
|---|
| 92 | n/a | if db.version() >= (4,7) : |
|---|
| 93 | n/a | # This code checks a crash solved in Berkeley DB 4.7 |
|---|
| 94 | n/a | def test_stat_crash(self) : |
|---|
| 95 | n/a | d=db.DB() |
|---|
| 96 | n/a | d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE) # In RAM |
|---|
| 97 | n/a | seq = db.DBSequence(d, flags=0) |
|---|
| 98 | n/a | |
|---|
| 99 | n/a | self.assertRaises(db.DBNotFoundError, seq.open, |
|---|
| 100 | n/a | key='id', txn=None, flags=0) |
|---|
| 101 | n/a | |
|---|
| 102 | n/a | self.assertRaises(db.DBInvalidArgError, seq.stat) |
|---|
| 103 | n/a | |
|---|
| 104 | n/a | d.close() |
|---|
| 105 | n/a | |
|---|
| 106 | n/a | def test_64bits(self) : |
|---|
| 107 | n/a | # We don't use both extremes because they are problematic |
|---|
| 108 | n/a | value_plus=(1L<<63)-2 |
|---|
| 109 | n/a | self.assertEquals(9223372036854775806L,value_plus) |
|---|
| 110 | n/a | value_minus=(-1L<<63)+1 # Two complement |
|---|
| 111 | n/a | self.assertEquals(-9223372036854775807L,value_minus) |
|---|
| 112 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 113 | n/a | self.assertEquals(None, self.seq.initial_value(value_plus-1)) |
|---|
| 114 | n/a | self.assertEquals(None, self.seq.open(key='id', txn=None, |
|---|
| 115 | n/a | flags=db.DB_CREATE)) |
|---|
| 116 | n/a | self.assertEquals(value_plus-1, self.seq.get(1)) |
|---|
| 117 | n/a | self.assertEquals(value_plus, self.seq.get(1)) |
|---|
| 118 | n/a | |
|---|
| 119 | n/a | self.seq.remove(txn=None, flags=0) |
|---|
| 120 | n/a | |
|---|
| 121 | n/a | self.seq = db.DBSequence(self.d, flags=0) |
|---|
| 122 | n/a | self.assertEquals(None, self.seq.initial_value(value_minus)) |
|---|
| 123 | n/a | self.assertEquals(None, self.seq.open(key='id', txn=None, |
|---|
| 124 | n/a | flags=db.DB_CREATE)) |
|---|
| 125 | n/a | self.assertEquals(value_minus, self.seq.get(1)) |
|---|
| 126 | n/a | self.assertEquals(value_minus+1, self.seq.get(1)) |
|---|
| 127 | n/a | |
|---|
| 128 | n/a | def test_multiple_close(self): |
|---|
| 129 | n/a | self.seq = db.DBSequence(self.d) |
|---|
| 130 | n/a | self.seq.close() # You can close a Sequence multiple times |
|---|
| 131 | n/a | self.seq.close() |
|---|
| 132 | n/a | self.seq.close() |
|---|
| 133 | n/a | |
|---|
| 134 | n/a | def test_suite(): |
|---|
| 135 | n/a | suite = unittest.TestSuite() |
|---|
| 136 | n/a | if db.version() >= (4,3): |
|---|
| 137 | n/a | suite.addTest(unittest.makeSuite(DBSequenceTest)) |
|---|
| 138 | n/a | return suite |
|---|
| 139 | n/a | |
|---|
| 140 | n/a | |
|---|
| 141 | n/a | if __name__ == '__main__': |
|---|
| 142 | n/a | unittest.main(defaultTest='test_suite') |
|---|