» Core Development > Code coverage > Lib/bsddb/test/test_replication.py

Python code coverage for Lib/bsddb/test/test_replication.py

# | count | content
1n/a"""TestCases for database replication.
2n/a"""
3n/a
4n/aimport os
5n/aimport time
6n/aimport unittest
7n/a
8n/afrom test_all import db, test_support, have_threads, verbose, \
9n/a get_new_environment_path, get_new_database_path
10n/a
11n/a
12n/a#----------------------------------------------------------------------
13n/a
class DBReplication(unittest.TestCase):
    """Shared fixture for the replication tests.

    ``setUp`` opens two transactional, replication-enabled environments
    (``dbenvMaster`` and ``dbenvClient``) in freshly allocated home
    directories and installs event callbacks that flip
    ``self.confirmed_master`` / ``self.client_startupdone`` to True.
    ``tearDown`` closes every handle and removes both directories.
    """

    import sys
    if sys.version_info < (2, 4):
        # Fall back to failUnless() where unittest has no assertTrue().
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr, msg=msg)

    def setUp(self):
        """Create and open the master and client environments."""
        self.homeDirMaster = get_new_environment_path()
        self.homeDirClient = get_new_environment_path()

        self.dbenvMaster = db.DBEnv()
        self.dbenvClient = db.DBEnv()

        # Must use "DB_THREAD" because the Replication Manager will
        # be executed in other threads but will use the same environment.
        # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
        env_flags = (db.DB_CREATE | db.DB_INIT_TXN | db.DB_INIT_LOG |
                     db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_INIT_REP |
                     db.DB_RECOVER | db.DB_THREAD)
        self.dbenvMaster.open(self.homeDirMaster, env_flags, 0o666)
        self.dbenvClient.open(self.homeDirClient, env_flags, 0o666)

        # Progress flags, set asynchronously by the callbacks below and
        # polled by the tests.
        self.confirmed_master = self.client_startupdone = False

        def confirmed_master(dbenv, event, info):
            if event == db.DB_EVENT_REP_MASTER:
                self.confirmed_master = True

        def client_startupdone(dbenv, event, info):
            if event == db.DB_EVENT_REP_STARTUPDONE:
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(confirmed_master)
        self.dbenvClient.set_event_notify(client_startupdone)

        # For low-level diagnostics, enable for either environment:
        #   dbenv.set_verbose(db.DB_VERB_REPLICATION, True)
        #   dbenv.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        # Database handles are opened by the individual tests.
        self.dbMaster = self.dbClient = None

    def tearDown(self):
        """Close databases and environments, then delete their files."""
        # Compare against None explicitly.  DB handles behave as mappings,
        # so a bare truth test would query the record count instead of
        # telling us whether a handle was opened, and would skip closing
        # a database that happens to be empty.
        if self.dbClient is not None:
            self.dbClient.close()
        if self.dbMaster is not None:
            self.dbMaster.close()

        # Here we assign dummy event handlers to allow GC of the test object.
        # Since the dummy handler doesn't use any outer scope variable, it
        # doesn't keep any reference to the test object.
        def dummy(*args):
            pass
        self.dbenvMaster.set_event_notify(dummy)
        self.dbenvClient.set_event_notify(dummy)

        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)
75n/a
class DBReplicationManager(DBReplication):
    """Replication test in which the Replication Manager moves the data."""

    # NOTE: the test method is described with comments rather than a
    # docstring so that unittest's verbose output keeps showing its name.
    def test01_basic_replication(self):
        # Configure both environments as Replication Manager sites on the
        # loopback interface, start them as master and client, then check
        # that a put and a delete made on the master show up on the client.
        import sys

        master_port = test_support.find_unused_port()
        self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
        client_port = test_support.find_unused_port()
        self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
        self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
        self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
        self.dbenvMaster.rep_set_nsites(2)
        self.dbenvClient.rep_set_nsites(2)
        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_priority(0)

        # Each timeout must read back exactly as it was written.
        for which, master_value, client_value in (
                (db.DB_REP_CONNECTION_RETRY, 100123, 100321),
                (db.DB_REP_ELECTION_TIMEOUT, 100234, 100432),
                (db.DB_REP_ELECTION_RETRY, 100345, 100543)):
            self.dbenvMaster.rep_set_timeout(which, master_value)
            self.dbenvClient.rep_set_timeout(which, client_value)
            self.assertEqual(self.dbenvMaster.rep_get_timeout(which),
                             master_value)
            self.assertEqual(self.dbenvClient.rep_get_timeout(which),
                             client_value)

        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)

        self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER)
        self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT)

        self.assertEqual(self.dbenvMaster.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvClient.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)
        self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
                         db.DB_REPMGR_ACKS_ALL)
        self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
                         db.DB_REPMGR_ACKS_ALL)

        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
        # is not generated if the master has no new transactions.
        # This is solved in BDB 4.6 (#15542).
        timeout = time.time() + 60
        while (time.time() < timeout) and \
                not (self.confirmed_master and self.client_startupdone):
            time.sleep(0.02)
        # self.client_startupdone does not always get set to True within
        # the timeout.  On windows this may be a deep issue, on other
        # platforms it is likely just a timing issue, especially on slow
        # virthost buildbots (see issue 3892 for more).  Even though
        # the timeout triggers, the rest of this test method usually passes
        # (but not all of it always, see below).  So we just note the
        # timeout on stderr and keep soldiering on.
        #
        # The flag is always bound here because it is read again further
        # down, whether or not the startup wait timed out.
        startup_timeout = time.time() > timeout
        if startup_timeout:
            sys.stderr.write("XXX: timeout happened before "
                             "startup was confirmed - see issue 3892\n")

        # Each environment must list exactly its single peer (looked up
        # under key/index 0), either connected or not yet connected.
        for env, peer_port in ((self.dbenvMaster, client_port),
                               (self.dbenvClient, master_port)):
            sites = env.repmgr_site_list()
            self.assertEqual(len(sites), 1)
            self.assertEqual(sites[0][0], "127.0.0.1")
            self.assertEqual(sites[0][1], peer_port)
            self.assertTrue((sites[0][2] == db.DB_REPMGR_CONNECTED) or
                            (sites[0][2] == db.DB_REPMGR_DISCONNECTED))

        if db.version() >= (4, 6):
            stats = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR)
            self.assertTrue("msgs_queued" in stats)

        # Create the database on the master ...
        self.dbMaster = db.DB(self.dbenvMaster)
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0o666, txn=txn)
        txn.commit()

        # ... and give the client up to 10 seconds to receive the file.
        client_file = os.path.join(self.homeDirClient, "test")
        timeout = time.time() + 10
        while (time.time() < timeout) and not os.path.exists(client_file):
            time.sleep(0.01)

        # Open the replica read-only.  A handle invalidated by replication
        # is discarded and the open retried with a fresh one (this retry
        # has no deadline of its own).
        self.dbClient = db.DB(self.dbenvClient)
        while True:
            txn = self.dbenvClient.txn_begin()
            try:
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
                                   mode=0o666, txn=txn)
            except db.DBRepHandleDeadError:
                txn.abort()
                self.dbClient.close()
                self.dbClient = db.DB(self.dbenvClient)
                continue

            txn.commit()
            break

        # A record written on the master must appear on the client.
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.put("ABC", "123", txn=txn)
        txn.commit()
        timeout = time.time() + 10
        v = None
        while (time.time() < timeout) and (v is None):
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is None:
                time.sleep(0.02)
        # If startup did not happen before the timeout above, then this test
        # sometimes fails.  This happens randomly, which causes buildbot
        # instability, but all the other bsddb tests pass.  Since bsddb3 in the
        # stdlib is currently not getting active maintenance, and is gone in
        # py3k, we just skip the end of the test in that case.
        if time.time() >= timeout and startup_timeout:
            self.skipTest("replication test skipped due to random failure, "
                "see issue 3892")
        self.assertTrue(time.time() < timeout)
        self.assertEqual("123", v)

        # Deleting the record on the master must remove it on the client.
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.delete("ABC", txn=txn)
        txn.commit()
        timeout = time.time() + 10
        while (time.time() < timeout) and (v is not None):
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            # Back off only while the record is still visible.
            if v is not None:
                time.sleep(0.02)
        self.assertTrue(time.time() < timeout)
        self.assertEqual(None, v)
224n/a
class DBBaseReplication(DBReplication):
    """Replication tests built on the base replication API.

    Replication messages travel through two in-process queues, ``m2c``
    (master to client) and ``c2m`` (client to master), each drained by a
    worker thread that feeds the messages to the receiving environment.
    Every test installs the message handler it needs as ``self.thread_do``
    before the workers are started.
    """

    # Environment ids registered with rep_set_transport(); a message is
    # handed to rep_process_message() together with the id of its sender.
    _MASTER_ENVID = 13
    _CLIENT_ENVID = 3

    def _daemon_thread(self, target):
        """Return an unstarted daemon thread that will run *target*."""
        import sys
        from threading import Thread
        thread = Thread(target=target)
        if sys.version_info[0] < 3:
            thread.setDaemon(True)
        else:
            thread.daemon = True
        return thread

    def _start_workers(self):
        """Start both workers, remembering which ones tearDown must join."""
        for worker in (self.t_m, self.t_c):
            worker.start()
            self._started_workers.append(worker)

    def setUp(self):
        """Extend the base fixture with queue transports and workers."""
        DBReplication.setUp(self)

        # Here, winning an election also counts as becoming master.
        def confirmed_master(dbenv, event, info):
            if (event == db.DB_EVENT_REP_MASTER) or \
                    (event == db.DB_EVENT_REP_ELECTED):
                self.confirmed_master = True

        def client_startupdone(dbenv, event, info):
            if event == db.DB_EVENT_REP_STARTUPDONE:
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(confirmed_master)
        self.dbenvClient.set_event_notify(client_startupdone)

        # The module is named "Queue" on Python 2 and "queue" on Python 3.
        try:
            import Queue as queue
        except ImportError:
            import queue
        self.m2c = queue.Queue()
        self.c2m = queue.Queue()

        # There are only two nodes, so we don't need to
        # do any routing decision
        def m2c(dbenv, control, rec, lsnp, envid, flags):
            self.m2c.put((control, rec))

        def c2m(dbenv, control, rec, lsnp, envid, flags):
            self.c2m.put((control, rec))

        self.dbenvMaster.rep_set_transport(self._MASTER_ENVID, m2c)
        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_transport(self._CLIENT_ENVID, c2m)
        self.dbenvClient.rep_set_priority(0)

        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)

        # self.thread_do is looked up when a worker runs, so each test can
        # install its own handler before starting the threads.
        def thread_master():
            return self.thread_do(self.dbenvMaster, self.c2m,
                                  self._CLIENT_ENVID,
                                  self.master_doing_election, True)

        def thread_client():
            return self.thread_do(self.dbenvClient, self.m2c,
                                  self._MASTER_ENVID,
                                  self.client_doing_election, False)

        self.t_m = self._daemon_thread(thread_master)
        self.t_c = self._daemon_thread(thread_client)
        self._started_workers = []

        self.dbMaster = self.dbClient = None

        # One-element lists so the handlers can update the flag in place.
        self.master_doing_election = [False]
        self.client_doing_election = [False]

    def tearDown(self):
        """Stop the workers, close every handle and delete the files."""
        # Compare against None explicitly.  DB handles behave as mappings,
        # so a bare truth test would query the record count and skip
        # closing a database that happens to be empty.
        if self.dbClient is not None:
            self.dbClient.close()
        if self.dbMaster is not None:
            self.dbMaster.close()

        # None is the sentinel that makes a worker leave its loop.
        self.m2c.put(None)
        self.c2m.put(None)
        # Join only workers that were really started: joining an unstarted
        # thread raises and would hide the failure that kept the test from
        # starting it.
        for worker in self._started_workers:
            worker.join()

        # Here we assign dummy event handlers to allow GC of the test object.
        # Since the dummy handler doesn't use any outer scope variable, it
        # doesn't keep any reference to the test object.
        def dummy(*args):
            pass
        self.dbenvMaster.set_event_notify(dummy)
        self.dbenvClient.set_event_notify(dummy)
        self.dbenvMaster.rep_set_transport(self._MASTER_ENVID, dummy)
        self.dbenvClient.rep_set_transport(self._CLIENT_ENVID, dummy)

        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)

    def basic_rep_threading(self):
        """Start master and client with fixed roles and run the workers."""
        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

        # Plain message pump; election_status and must_be_master are
        # accepted for signature compatibility but not used here.
        def thread_do(env, q, envid, election_status, must_be_master):
            while True:
                v = q.get()
                if v is None:
                    return
                env.rep_process_message(v[0], v[1], envid)

        self.thread_do = thread_do

        self._start_workers()

    # NOTE: test methods are described with comments rather than
    # docstrings so that unittest's verbose output keeps showing their names.
    def test01_basic_replication(self):
        # A put and a delete made on the master must reach the client.
        self.basic_rep_threading()

        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
        # is not generated if the master has no new transactions.
        # This is solved in BDB 4.6 (#15542).
        timeout = time.time() + 60
        while (time.time() < timeout) and \
                not (self.confirmed_master and self.client_startupdone):
            time.sleep(0.02)
        self.assertTrue(time.time() < timeout)

        # Create the database on the master ...
        self.dbMaster = db.DB(self.dbenvMaster)
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0o666, txn=txn)
        txn.commit()

        # ... and give the client up to 10 seconds to receive the file.
        client_file = os.path.join(self.homeDirClient, "test")
        timeout = time.time() + 10
        while (time.time() < timeout) and not os.path.exists(client_file):
            time.sleep(0.01)

        # Open the replica read-only.  A handle invalidated by replication
        # is discarded and the open retried with a fresh one (this retry
        # has no deadline of its own).
        self.dbClient = db.DB(self.dbenvClient)
        while True:
            txn = self.dbenvClient.txn_begin()
            try:
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
                                   mode=0o666, txn=txn)
            except db.DBRepHandleDeadError:
                txn.abort()
                self.dbClient.close()
                self.dbClient = db.DB(self.dbenvClient)
                continue

            txn.commit()
            break

        stats = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR)
        self.assertTrue("master_changes" in stats)

        # A record written on the master must appear on the client.
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.put("ABC", "123", txn=txn)
        txn.commit()
        timeout = time.time() + 10
        v = None
        while (time.time() < timeout) and (v is None):
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is None:
                time.sleep(0.02)
        self.assertTrue(time.time() < timeout)
        self.assertEqual("123", v)

        # Deleting the record on the master must remove it on the client.
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.delete("ABC", txn=txn)
        txn.commit()
        timeout = time.time() + 10
        while (time.time() < timeout) and (v is not None):
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            # Back off only while the record is still visible.
            if v is not None:
                time.sleep(0.02)
        self.assertTrue(time.time() < timeout)
        self.assertEqual(None, v)

    if db.version() >= (4, 7):
        def test02_test_request(self):
            # The request bounds must read back exactly as they were set.
            self.basic_rep_threading()
            (minimum, maximum) = self.dbenvClient.rep_get_request()
            self.dbenvClient.rep_set_request(minimum - 1, maximum + 1)
            self.assertEqual(self.dbenvClient.rep_get_request(),
                             (minimum - 1, maximum + 1))

    if db.version() >= (4, 6):
        def test03_master_election(self):
            # Get ready to hold an election: both environments start as
            # clients, so a master can only emerge from the election.
            self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
            self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

            def thread_do(env, q, envid, election_status, must_be_master):
                while True:
                    v = q.get()
                    if v is None:
                        return
                    r = env.rep_process_message(v[0], v[1], envid)
                    # Once the event callback has reported mastership,
                    # switch the designated environment to master, once.
                    if must_be_master and self.confirmed_master:
                        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
                        must_be_master = False

                    if r[0] == db.DB_REP_HOLDELECTION:
                        def elect():
                            # Retry for as long as the election reports
                            # DBRepUnavailError.
                            while True:
                                try:
                                    env.rep_elect(2, 1)
                                except db.DBRepUnavailError:
                                    continue
                                election_status[0] = False
                                break
                        # Run at most one election per environment, and
                        # none once a master is known.
                        if not election_status[0] and \
                                not self.confirmed_master:
                            election_status[0] = True
                            self._daemon_thread(elect).start()

            self.thread_do = thread_do

            self._start_workers()

            self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT,
                                             50000)
            self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT,
                                             50000)
            # Marked before electing so that the client worker does not
            # start a second, concurrent election for this environment.
            self.client_doing_election[0] = True
            while True:
                try:
                    self.dbenvClient.rep_elect(2, 1)
                except db.DBRepUnavailError:
                    continue
                self.client_doing_election[0] = False
                break

            self.assertTrue(self.confirmed_master)

    if db.version() >= (4, 7):
        def test04_test_clockskew(self):
            # The clock skew pair must read back exactly as it was set.
            fast, slow = 1234, 1230
            self.dbenvMaster.rep_set_clockskew(fast, slow)
            self.assertEqual((fast, slow),
                             self.dbenvMaster.rep_get_clockskew())
            self.basic_rep_threading()
471n/a
472n/a#----------------------------------------------------------------------
473n/a
def test_suite():
    """Return a suite holding the replication tests this build can run.

    ``DBReplicationManager`` is added only when the library is at least
    version 4.6 and a probe call to a Replication Manager method succeeds.
    ``DBBaseReplication`` is added only when threads are available.
    """
    suite = unittest.TestSuite()
    if db.version() >= (4, 6):
        # Probe with a throw-away environment: the call failing is taken
        # to mean that the Replication Manager is not usable.
        dbenv = db.DBEnv()
        try:
            dbenv.repmgr_get_ack_policy()
            ReplicationManager_available = True
        except Exception:
            # Deliberately broad, but KeyboardInterrupt/SystemExit still
            # propagate instead of being read as "feature missing".
            ReplicationManager_available = False
        finally:
            dbenv.close()
            del dbenv
        if ReplicationManager_available:
            suite.addTest(unittest.makeSuite(DBReplicationManager))

        if have_threads:
            suite.addTest(unittest.makeSuite(DBBaseReplication))

    return suite
492n/a
493n/a
# Running this file directly executes the suite built by test_suite().
if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")