» Core Development > Code coverage > Lib/bsddb/test/test_thread.py

Python code coverage for Lib/bsddb/test/test_thread.py

# | count | content
1n/a"""TestCases for multi-threaded access to a DB.
2n/a"""
3n/a
4n/aimport os
5n/aimport sys
6n/aimport time
7n/aimport errno
8n/afrom random import random
9n/a
# Separator placed between the repeated key copies built by makeData().
DASH = '-'

# Make the name WindowsError resolvable on interpreters that do not define
# it as a builtin: the bare lookup raises NameError there, and a plain
# Exception subclass is defined in its place.
try:
    WindowsError
except NameError:
    class WindowsError(Exception):
        pass
17n/a
18n/aimport unittest
19n/afrom test_all import db, dbutils, test_support, verbose, have_threads, \
20n/a get_new_environment_path, get_new_database_path
21n/a
22n/aif have_threads :
23n/a from threading import Thread
24n/a if sys.version_info[0] < 3 :
25n/a from threading import currentThread
26n/a else :
27n/a from threading import current_thread as currentThread
28n/a
29n/a
30n/a#----------------------------------------------------------------------
31n/a
class BaseThreadedTestCase(unittest.TestCase):
    """Common fixture for the threaded tests: one DBEnv and one DB handle.

    Derived classes configure the fixture through the class attributes
    below and may override setEnvOpts() to adjust the environment after it
    is created but before it is opened.
    """

    dbtype = db.DB_UNKNOWN  # must be set in derived class
    dbopenflags = 0         # OR-ed with DB_CREATE for DB.open()
    dbsetflags = 0          # handed to DB.set_flags() when non-zero
    envflags = 0            # OR-ed with DB_CREATE for DBEnv.open()

    if sys.version_info < (2, 4):
        # On interpreters older than 2.4, route assertTrue to failUnless.
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr, msg=msg)

    def setUp(self):
        """Create a new environment directory, then open the env and DB."""
        if verbose:
            dbutils._deadlock_VerboseFile = sys.stdout

        self.homeDir = get_new_environment_path()
        self.env = db.DBEnv()
        # Hook for subclasses; runs between DBEnv creation and open().
        self.setEnvOpts()
        env_open_flags = self.envflags | db.DB_CREATE
        self.env.open(self.homeDir, env_open_flags)

        # The database file is named after the concrete test class.
        self.filename = '%s.db' % self.__class__.__name__
        self.d = db.DB(self.env)
        if self.dbsetflags:
            self.d.set_flags(self.dbsetflags)
        db_open_flags = self.dbopenflags | db.DB_CREATE
        self.d.open(self.filename, self.dbtype, db_open_flags)

    def tearDown(self):
        """Close the DB, then the environment, then remove the directory."""
        self.d.close()
        self.env.close()
        test_support.rmtree(self.homeDir)

    def setEnvOpts(self):
        """Environment tuning hook; the base implementation does nothing."""
        pass

    def makeData(self, key):
        """Return the value stored under *key*: five copies joined by DASH."""
        copies = [key, key, key, key, key]
        return DASH.join(copies)
67n/a
68n/a
69n/a#----------------------------------------------------------------------
70n/a
71n/a
72n/aclass ConcurrentDataStoreBase(BaseThreadedTestCase):
73n/a dbopenflags = db.DB_THREAD
74n/a envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
75n/a readers = 0 # derived class should set
76n/a writers = 0
77n/a records = 1000
78n/a
79n/a def test01_1WriterMultiReaders(self):
80n/a if verbose:
81n/a print '\n', '-=' * 30
82n/a print "Running %s.test01_1WriterMultiReaders..." % \
83n/a self.__class__.__name__
84n/a
85n/a keys=range(self.records)
86n/a import random
87n/a random.shuffle(keys)
88n/a records_per_writer=self.records//self.writers
89n/a readers_per_writer=self.readers//self.writers
90n/a self.assertEqual(self.records,self.writers*records_per_writer)
91n/a self.assertEqual(self.readers,self.writers*readers_per_writer)
92n/a self.assertTrue((records_per_writer%readers_per_writer)==0)
93n/a readers = []
94n/a
95n/a for x in xrange(self.readers):
96n/a rt = Thread(target = self.readerThread,
97n/a args = (self.d, x),
98n/a name = 'reader %d' % x,
99n/a )#verbose = verbose)
100n/a if sys.version_info[0] < 3 :
101n/a rt.setDaemon(True)
102n/a else :
103n/a rt.daemon = True
104n/a readers.append(rt)
105n/a
106n/a writers=[]
107n/a for x in xrange(self.writers):
108n/a a=keys[records_per_writer*x:records_per_writer*(x+1)]
109n/a a.sort() # Generate conflicts
110n/a b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
111n/a wt = Thread(target = self.writerThread,
112n/a args = (self.d, a, b),
113n/a name = 'writer %d' % x,
114n/a )#verbose = verbose)
115n/a writers.append(wt)
116n/a
117n/a for t in writers:
118n/a if sys.version_info[0] < 3 :
119n/a t.setDaemon(True)
120n/a else :
121n/a t.daemon = True
122n/a t.start()
123n/a
124n/a for t in writers:
125n/a t.join()
126n/a for t in readers:
127n/a t.join()
128n/a
129n/a def writerThread(self, d, keys, readers):
130n/a if sys.version_info[0] < 3 :
131n/a name = currentThread().getName()
132n/a else :
133n/a name = currentThread().name
134n/a
135n/a if verbose:
136n/a print "%s: creating records %d - %d" % (name, start, stop)
137n/a
138n/a count=len(keys)//len(readers)
139n/a count2=count
140n/a for x in keys :
141n/a key = '%04d' % x
142n/a dbutils.DeadlockWrap(d.put, key, self.makeData(key),
143n/a max_retries=12)
144n/a if verbose and x % 100 == 0:
145n/a print "%s: records %d - %d finished" % (name, start, x)
146n/a
147n/a count2-=1
148n/a if not count2 :
149n/a readers.pop().start()
150n/a count2=count
151n/a
152n/a if verbose:
153n/a print "%s: finished creating records" % name
154n/a
155n/a if verbose:
156n/a print "%s: thread finished" % name
157n/a
158n/a def readerThread(self, d, readerNum):
159n/a if sys.version_info[0] < 3 :
160n/a name = currentThread().getName()
161n/a else :
162n/a name = currentThread().name
163n/a
164n/a for i in xrange(5) :
165n/a c = d.cursor()
166n/a count = 0
167n/a rec = c.first()
168n/a while rec:
169n/a count += 1
170n/a key, data = rec
171n/a self.assertEqual(self.makeData(key), data)
172n/a rec = c.next()
173n/a if verbose:
174n/a print "%s: found %d records" % (name, count)
175n/a c.close()
176n/a
177n/a if verbose:
178n/a print "%s: thread finished" % name
179n/a
180n/a
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
    """ConcurrentDataStoreBase scenario run with dbtype DB_BTREE."""
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_BTREE
186n/a
187n/a
class HashConcurrentDataStore(ConcurrentDataStoreBase):
    """ConcurrentDataStoreBase scenario run with dbtype DB_HASH."""
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_HASH
193n/a
194n/a
195n/a#----------------------------------------------------------------------
196n/a
197n/aclass SimpleThreadedBase(BaseThreadedTestCase):
198n/a dbopenflags = db.DB_THREAD
199n/a envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
200n/a readers = 10
201n/a writers = 2
202n/a records = 1000
203n/a
204n/a def setEnvOpts(self):
205n/a self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
206n/a
207n/a def test02_SimpleLocks(self):
208n/a if verbose:
209n/a print '\n', '-=' * 30
210n/a print "Running %s.test02_SimpleLocks..." % self.__class__.__name__
211n/a
212n/a
213n/a keys=range(self.records)
214n/a import random
215n/a random.shuffle(keys)
216n/a records_per_writer=self.records//self.writers
217n/a readers_per_writer=self.readers//self.writers
218n/a self.assertEqual(self.records,self.writers*records_per_writer)
219n/a self.assertEqual(self.readers,self.writers*readers_per_writer)
220n/a self.assertTrue((records_per_writer%readers_per_writer)==0)
221n/a
222n/a readers = []
223n/a for x in xrange(self.readers):
224n/a rt = Thread(target = self.readerThread,
225n/a args = (self.d, x),
226n/a name = 'reader %d' % x,
227n/a )#verbose = verbose)
228n/a if sys.version_info[0] < 3 :
229n/a rt.setDaemon(True)
230n/a else :
231n/a rt.daemon = True
232n/a readers.append(rt)
233n/a
234n/a writers = []
235n/a for x in xrange(self.writers):
236n/a a=keys[records_per_writer*x:records_per_writer*(x+1)]
237n/a a.sort() # Generate conflicts
238n/a b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
239n/a wt = Thread(target = self.writerThread,
240n/a args = (self.d, a, b),
241n/a name = 'writer %d' % x,
242n/a )#verbose = verbose)
243n/a writers.append(wt)
244n/a
245n/a for t in writers:
246n/a if sys.version_info[0] < 3 :
247n/a t.setDaemon(True)
248n/a else :
249n/a t.daemon = True
250n/a t.start()
251n/a
252n/a for t in writers:
253n/a t.join()
254n/a for t in readers:
255n/a t.join()
256n/a
257n/a def writerThread(self, d, keys, readers):
258n/a if sys.version_info[0] < 3 :
259n/a name = currentThread().getName()
260n/a else :
261n/a name = currentThread().name
262n/a if verbose:
263n/a print "%s: creating records %d - %d" % (name, start, stop)
264n/a
265n/a count=len(keys)//len(readers)
266n/a count2=count
267n/a for x in keys :
268n/a key = '%04d' % x
269n/a dbutils.DeadlockWrap(d.put, key, self.makeData(key),
270n/a max_retries=12)
271n/a
272n/a if verbose and x % 100 == 0:
273n/a print "%s: records %d - %d finished" % (name, start, x)
274n/a
275n/a count2-=1
276n/a if not count2 :
277n/a readers.pop().start()
278n/a count2=count
279n/a
280n/a if verbose:
281n/a print "%s: thread finished" % name
282n/a
283n/a def readerThread(self, d, readerNum):
284n/a if sys.version_info[0] < 3 :
285n/a name = currentThread().getName()
286n/a else :
287n/a name = currentThread().name
288n/a
289n/a c = d.cursor()
290n/a count = 0
291n/a rec = dbutils.DeadlockWrap(c.first, max_retries=10)
292n/a while rec:
293n/a count += 1
294n/a key, data = rec
295n/a self.assertEqual(self.makeData(key), data)
296n/a rec = dbutils.DeadlockWrap(c.next, max_retries=10)
297n/a if verbose:
298n/a print "%s: found %d records" % (name, count)
299n/a c.close()
300n/a
301n/a if verbose:
302n/a print "%s: thread finished" % name
303n/a
304n/a
class BTreeSimpleThreaded(SimpleThreadedBase):
    """SimpleThreadedBase scenario run with dbtype DB_BTREE."""
    dbtype = db.DB_BTREE
307n/a
308n/a
class HashSimpleThreaded(SimpleThreadedBase):
    """SimpleThreadedBase scenario run with dbtype DB_HASH."""
    dbtype = db.DB_HASH
311n/a
312n/a
313n/a#----------------------------------------------------------------------
314n/a
315n/a
316n/aclass ThreadedTransactionsBase(BaseThreadedTestCase):
317n/a dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
318n/a envflags = (db.DB_THREAD |
319n/a db.DB_INIT_MPOOL |
320n/a db.DB_INIT_LOCK |
321n/a db.DB_INIT_LOG |
322n/a db.DB_INIT_TXN
323n/a )
324n/a readers = 0
325n/a writers = 0
326n/a records = 2000
327n/a txnFlag = 0
328n/a
329n/a def setEnvOpts(self):
330n/a #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
331n/a pass
332n/a
333n/a def test03_ThreadedTransactions(self):
334n/a if verbose:
335n/a print '\n', '-=' * 30
336n/a print "Running %s.test03_ThreadedTransactions..." % \
337n/a self.__class__.__name__
338n/a
339n/a keys=range(self.records)
340n/a import random
341n/a random.shuffle(keys)
342n/a records_per_writer=self.records//self.writers
343n/a readers_per_writer=self.readers//self.writers
344n/a self.assertEqual(self.records,self.writers*records_per_writer)
345n/a self.assertEqual(self.readers,self.writers*readers_per_writer)
346n/a self.assertTrue((records_per_writer%readers_per_writer)==0)
347n/a
348n/a readers=[]
349n/a for x in xrange(self.readers):
350n/a rt = Thread(target = self.readerThread,
351n/a args = (self.d, x),
352n/a name = 'reader %d' % x,
353n/a )#verbose = verbose)
354n/a if sys.version_info[0] < 3 :
355n/a rt.setDaemon(True)
356n/a else :
357n/a rt.daemon = True
358n/a readers.append(rt)
359n/a
360n/a writers = []
361n/a for x in xrange(self.writers):
362n/a a=keys[records_per_writer*x:records_per_writer*(x+1)]
363n/a b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
364n/a wt = Thread(target = self.writerThread,
365n/a args = (self.d, a, b),
366n/a name = 'writer %d' % x,
367n/a )#verbose = verbose)
368n/a writers.append(wt)
369n/a
370n/a dt = Thread(target = self.deadlockThread)
371n/a if sys.version_info[0] < 3 :
372n/a dt.setDaemon(True)
373n/a else :
374n/a dt.daemon = True
375n/a dt.start()
376n/a
377n/a for t in writers:
378n/a if sys.version_info[0] < 3 :
379n/a t.setDaemon(True)
380n/a else :
381n/a t.daemon = True
382n/a t.start()
383n/a
384n/a for t in writers:
385n/a t.join()
386n/a for t in readers:
387n/a t.join()
388n/a
389n/a self.doLockDetect = False
390n/a dt.join()
391n/a
392n/a def writerThread(self, d, keys, readers):
393n/a if sys.version_info[0] < 3 :
394n/a name = currentThread().getName()
395n/a else :
396n/a name = currentThread().name
397n/a
398n/a count=len(keys)//len(readers)
399n/a while len(keys):
400n/a try:
401n/a txn = self.env.txn_begin(None, self.txnFlag)
402n/a keys2=keys[:count]
403n/a for x in keys2 :
404n/a key = '%04d' % x
405n/a d.put(key, self.makeData(key), txn)
406n/a if verbose and x % 100 == 0:
407n/a print "%s: records %d - %d finished" % (name, start, x)
408n/a txn.commit()
409n/a keys=keys[count:]
410n/a readers.pop().start()
411n/a except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
412n/a if verbose:
413n/a if sys.version_info < (2, 6) :
414n/a print "%s: Aborting transaction (%s)" % (name, val[1])
415n/a else :
416n/a print "%s: Aborting transaction (%s)" % (name,
417n/a val.args[1])
418n/a txn.abort()
419n/a
420n/a if verbose:
421n/a print "%s: thread finished" % name
422n/a
423n/a def readerThread(self, d, readerNum):
424n/a if sys.version_info[0] < 3 :
425n/a name = currentThread().getName()
426n/a else :
427n/a name = currentThread().name
428n/a
429n/a finished = False
430n/a while not finished:
431n/a try:
432n/a txn = self.env.txn_begin(None, self.txnFlag)
433n/a c = d.cursor(txn)
434n/a count = 0
435n/a rec = c.first()
436n/a while rec:
437n/a count += 1
438n/a key, data = rec
439n/a self.assertEqual(self.makeData(key), data)
440n/a rec = c.next()
441n/a if verbose: print "%s: found %d records" % (name, count)
442n/a c.close()
443n/a txn.commit()
444n/a finished = True
445n/a except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
446n/a if verbose:
447n/a if sys.version_info < (2, 6) :
448n/a print "%s: Aborting transaction (%s)" % (name, val[1])
449n/a else :
450n/a print "%s: Aborting transaction (%s)" % (name,
451n/a val.args[1])
452n/a c.close()
453n/a txn.abort()
454n/a
455n/a if verbose:
456n/a print "%s: thread finished" % name
457n/a
458n/a def deadlockThread(self):
459n/a self.doLockDetect = True
460n/a while self.doLockDetect:
461n/a time.sleep(0.05)
462n/a try:
463n/a aborted = self.env.lock_detect(
464n/a db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
465n/a if verbose and aborted:
466n/a print "deadlock: Aborted %d deadlocked transaction(s)" \
467n/a % aborted
468n/a except db.DBError:
469n/a pass
470n/a
471n/a
class BTreeThreadedTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase scenario run with dbtype DB_BTREE."""
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_BTREE
477n/a
class HashThreadedTransactions(ThreadedTransactionsBase):
    """ThreadedTransactionsBase scenario run with dbtype DB_HASH."""
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_HASH
483n/a
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """DB_BTREE scenario whose transactions begin with DB_TXN_NOWAIT."""
    txnFlag = db.DB_TXN_NOWAIT
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_BTREE
490n/a
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """DB_HASH scenario whose transactions begin with DB_TXN_NOWAIT."""
    txnFlag = db.DB_TXN_NOWAIT
    records = 1000
    readers = 10
    writers = 2
    dbtype = db.DB_HASH
497n/a
498n/a
499n/a#----------------------------------------------------------------------
500n/a
501n/adef test_suite():
502n/a suite = unittest.TestSuite()
503n/a
504n/a if have_threads:
505n/a suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
506n/a suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
507n/a suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
508n/a suite.addTest(unittest.makeSuite(HashSimpleThreaded))
509n/a suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
510n/a suite.addTest(unittest.makeSuite(HashThreadedTransactions))
511n/a suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
512n/a suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
513n/a
514n/a else:
515n/a print "Threads not available, skipping thread tests."
516n/a
517n/a return suite
518n/a
519n/a
# When executed as a script, run the suite built by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')