»Core Development>Code coverage>Doc/includes/mp_synchronize.py

Python code coverage for Doc/includes/mp_synchronize.py

# count content
1n/a#
2n/a# A test file for the `multiprocessing` package
3n/a#
4n/a# Copyright (c) 2006-2008, R Oudkerk
5n/a# All rights reserved.
6n/a#
7n/a
8n/aimport time
9n/aimport sys
10n/aimport random
11n/afrom queue import Empty
12n/a
13n/aimport multiprocessing # may get overwritten
14n/a
15n/a
16n/a#### TEST_VALUE
17n/a
def value_func(running, mutex):
    """Sleep for a random time, then decrement the shared counter.

    running -- shared counter object; its ``value`` attribute is
               decremented by one.
    mutex   -- lock held while the message is printed and the counter
               is updated.
    """
    # Re-seed so that each worker draws its own delay.
    random.seed()
    time.sleep(random.random()*4)

    # ``with`` releases the lock even if printing or the update raises,
    # so one failing worker cannot leave the others blocked forever.
    with mutex:
        print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
        running.value -= 1
26n/a
def test_value():
    """Start TASKS workers and poll a shared counter until it reaches zero.

    Each worker runs value_func(), which decrements ``running`` once.
    The parent prints the current counter value roughly every 0.08
    seconds while it is still positive.
    """
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    processes = [
        multiprocessing.Process(target=value_func, args=(running, mutex))
        for _ in range(TASKS)
    ]
    for p in processes:
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        # Hold the mutex so the counter is not printed in the middle of
        # a worker's "has finished" message; ``with`` guarantees release.
        with mutex:
            print(running.value, end=' ')
            sys.stdout.flush()

    # The counter hitting zero only means every worker has decremented
    # it; join so that the workers have really exited before returning.
    for p in processes:
        p.join()

    print()
    print('No more running processes')
45n/a
46n/a
47n/a#### TEST_QUEUE
48n/a
def queue_func(queue):
    """Put the squares of 0..29 on *queue*, followed by 'STOP'.

    Before each square is queued the function sleeps for a random time
    shorter than half a second.
    """
    for n in range(30):
        pause = 0.5 * random.random()
        time.sleep(pause)
        queue.put(n * n)

    # Sentinel telling the consumer that nothing more will arrive.
    queue.put('STOP')
54n/a
def test_queue():
    """Consume items produced by queue_func() until 'STOP' is received.

    Every received item (including the final 'STOP') is printed on one
    line; whenever no item arrives within 0.3 seconds, 'TIMEOUT' is
    printed instead.
    """
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    item = None
    while item != 'STOP':
        # Only the get() call can raise Empty, so keep the try minimal.
        try:
            item = q.get(timeout=0.3)
        except Empty:
            print('TIMEOUT')
        else:
            print(item, end=' ')
            sys.stdout.flush()

    print()

    # Everything the producer queued has been consumed, so it can be
    # joined safely; this matches the other tests, which all wait for
    # their children before returning.
    p.join()
71n/a
72n/a
73n/a#### TEST_CONDITION
74n/a
def condition_func(cond):
    """Acquire *cond*, wait two seconds, then notify one waiter.

    The condition is printed right after it is acquired and again just
    before notify() is called.
    """
    # The context manager releases the condition even if one of the
    # statements below raises.
    with cond:
        print('\t' + str(cond))
        time.sleep(2)
        print('\tchild is notifying')
        print('\t' + str(cond))
        cond.notify()
83n/a
def test_condition():
    """Wait on a condition that a child process notifies.

    The parent acquires the condition twice (nested) before starting the
    child, waits on it, and prints the condition after every change of
    state so the effect of each acquire/release is visible.
    """
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print(cond)

    # Nested ``with`` blocks acquire the condition twice and release it
    # twice in reverse order, even on error. The double acquisition
    # relies on the condition's lock being re-entrant.
    with cond:
        print(cond)
        with cond:
            print(cond)

            p.start()

            print('main is waiting')
            cond.wait()
            print('main has woken up')

            print(cond)
        # Inner acquisition released; one level is still held here.
        print(cond)

    p.join()
    print(cond)
108n/a
109n/a
110n/a#### TEST_SEMAPHORE
111n/a
def semaphore_func(sema, mutex, running):
    """Run one semaphore-limited task and keep a shared task count.

    sema    -- semaphore held for the whole duration of the task.
    mutex   -- lock guarding updates of ``running`` and the printing.
    running -- shared counter object; its ``value`` is incremented when
               the task starts and decremented when it finishes.
    """
    # Context managers guarantee that the semaphore slot and the mutex
    # are given back even if the task body raises.
    with sema:
        with mutex:
            running.value += 1
            print(running.value, 'tasks are running')

        # Re-seed so that each worker draws its own delay.
        random.seed()
        time.sleep(random.random()*2)

        with mutex:
            running.value -= 1
            print('%s has finished' % multiprocessing.current_process())
129n/a
def test_semaphore():
    """Run ten semaphore_func() workers sharing a semaphore of size 3."""
    # Order matters: (semaphore, mutex, running counter) is exactly the
    # argument tuple semaphore_func() expects.
    shared = (
        multiprocessing.Semaphore(3),
        multiprocessing.RLock(),
        multiprocessing.Value('i', 0),
    )

    workers = []
    for _ in range(10):
        workers.append(
            multiprocessing.Process(target=semaphore_func, args=shared)
        )

    # Start every worker before waiting for any of them.
    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()
146n/a
147n/a
148n/a#### TEST_JOIN_TIMEOUT
149n/a
def join_timeout_func():
    """Announce, sleep for 5.5 seconds, then announce termination."""
    nap_seconds = 5.5

    print('\tchild sleeping')
    time.sleep(nap_seconds)
    print('\n\tchild terminating')
154n/a
def test_join_timeout():
    """Join a sleeping child with one-second timeouts until it exits.

    A dot is printed after every join attempt that times out while the
    child is still alive.
    """
    child = multiprocessing.Process(target=join_timeout_func)
    child.start()

    print('waiting for process to finish')

    child.join(timeout=1)
    while child.is_alive():
        print('.', end=' ')
        sys.stdout.flush()
        child.join(timeout=1)
167n/a
168n/a
169n/a#### TEST_EVENT
170n/a
def event_func(event):
    """Block on *event*, reporting before waiting and after waking."""
    me = multiprocessing.current_process()

    print('\t%r is waiting' % me)
    event.wait()
    print('\t%r has woken up' % me)
175n/a
def test_event():
    """Start five event_func() waiters and release them with one set()."""
    event = multiprocessing.Event()

    waiters = []
    for _ in range(5):
        waiters.append(
            multiprocessing.Process(target=event_func, args=(event,))
        )

    for waiter in waiters:
        waiter.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for waiter in waiters:
        waiter.join()
193n/a
194n/a
195n/a#### TEST_SHAREDVALUES
196n/a
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    """Assert that shared objects hold the expected contents.

    values        -- sequence of entries whose item [1] is the expected
                     value.
    arrays        -- sequence of entries whose item [1] is the expected
                     list of elements.
    shared_values -- objects whose ``value`` attribute is compared with
                     the matching entry of ``values``.
    shared_arrays -- sliceable objects compared, as lists, with the
                     matching entry of ``arrays``.

    Raises AssertionError on any mismatch, including differing lengths
    of the paired sequences. Prints 'Tests passed' on success.
    """
    # zip() stops at the shorter input, so check the lengths explicitly
    # rather than silently skipping unmatched entries.
    assert len(values) == len(shared_values)
    assert len(arrays) == len(shared_arrays)

    for entry, shared in zip(values, shared_values):
        assert entry[1] == shared.value

    # Iterate over the arrays themselves: bounding this loop by
    # len(values) left some arrays unchecked whenever there were more
    # arrays than values.
    for entry, shared in zip(arrays, shared_arrays):
        assert entry[1] == list(shared[:])

    print('Tests passed')
209n/a
def test_sharedvalues():
    """Check in a child process that shared values and arrays are intact.

    Shared objects are created from (typecode, initial) pairs and handed
    to sharedvalues_func() in a child, together with the pairs
    themselves; the child must exit with code 0.
    """
    values = [('i', 10), ('h', -2), ('d', 1.25)]
    arrays = [
        ('i', list(range(100))),
        ('d', [0.25 * i for i in range(100)]),
        ('H', list(range(1000))),
    ]

    shared_values = [multiprocessing.Value(*spec) for spec in values]
    shared_arrays = [multiprocessing.Array(*spec) for spec in arrays]

    checker = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays),
    )
    checker.start()
    checker.join()

    assert checker.exitcode == 0
233n/a
234n/a
235n/a####
236n/a
def test(namespace=multiprocessing):
    """Run every test function against the API exposed by *namespace*.

    The module-level name ``multiprocessing`` is rebound to *namespace*
    so that all test functions pick it up. Raises ValueError if the
    namespace offers ``_debug_info()`` and it returns something truthy
    once the tests have run.
    """
    global multiprocessing

    multiprocessing = namespace

    suite = (
        test_value, test_queue, test_condition,
        test_semaphore, test_join_timeout, test_event,
        test_sharedvalues,
    )
    for func in suite:
        print('\n\t######## %s\n' % func.__name__)
        func()

    multiprocessing.active_children()   # cleanup any old processes

    if not hasattr(multiprocessing, '_debug_info'):
        return

    info = multiprocessing._debug_info()
    if info:
        print(info)
        raise ValueError('there should be no positive refcounts left')
255n/a
256n/a
if __name__ == '__main__':
    # Script entry point: choose which implementation of the
    # multiprocessing API the tests run against, then run them.
    multiprocessing.freeze_support()

    usage = 'Usage:\n\t%s [processes | manager | threads]'
    args = sys.argv[1:]

    # Validate explicitly instead of using ``assert``: assertions are
    # stripped under -O, and a traceback is less helpful than the usage
    # text when too many arguments are given.
    if len(args) > 1:
        print(usage % sys.argv[0])
        raise SystemExit(2)

    # With no argument the tests run against real processes.
    mode = args[0] if args else 'processes'

    if mode == 'processes':
        print(' Using processes '.center(79, '-'))
        namespace = multiprocessing
    elif mode == 'manager':
        print(' Using processes and a manager '.center(79, '-'))
        namespace = multiprocessing.Manager()
        # The tests also use these names, so attach them to the manager
        # object that stands in for the module.
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif mode == 'threads':
        print(' Using threads '.center(79, '-'))
        import multiprocessing.dummy as namespace
    else:
        print(usage % sys.argv[0])
        raise SystemExit(2)

    test(namespace)