ยปCore Development>Code coverage>Doc/includes/mp_pool.py

Python code coverage for Doc/includes/mp_pool.py

#countcontent
1n/aimport multiprocessing
2n/aimport time
3n/aimport random
4n/aimport sys
5n/a
6n/a#
7n/a# Functions used by test code
8n/a#
9n/a
10n/adef calculate(func, args):
11n/a result = func(*args)
12n/a return '%s says that %s%s = %s' % (
13n/a multiprocessing.current_process().name,
14n/a func.__name__, args, result
15n/a )
16n/a
17n/adef calculatestar(args):
18n/a return calculate(*args)
19n/a
20n/adef mul(a, b):
21n/a time.sleep(0.5 * random.random())
22n/a return a * b
23n/a
24n/adef plus(a, b):
25n/a time.sleep(0.5 * random.random())
26n/a return a + b
27n/a
28n/adef f(x):
29n/a return 1.0 / (x - 5.0)
30n/a
31n/adef pow3(x):
32n/a return x ** 3
33n/a
34n/adef noop(x):
35n/a pass
36n/a
37n/a#
38n/a# Test code
39n/a#
40n/a
41n/adef test():
42n/a PROCESSES = 4
43n/a print('Creating pool with %d processes\n' % PROCESSES)
44n/a
45n/a with multiprocessing.Pool(PROCESSES) as pool:
46n/a #
47n/a # Tests
48n/a #
49n/a
50n/a TASKS = [(mul, (i, 7)) for i in range(10)] + \
51n/a [(plus, (i, 8)) for i in range(10)]
52n/a
53n/a results = [pool.apply_async(calculate, t) for t in TASKS]
54n/a imap_it = pool.imap(calculatestar, TASKS)
55n/a imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
56n/a
57n/a print('Ordered results using pool.apply_async():')
58n/a for r in results:
59n/a print('\t', r.get())
60n/a print()
61n/a
62n/a print('Ordered results using pool.imap():')
63n/a for x in imap_it:
64n/a print('\t', x)
65n/a print()
66n/a
67n/a print('Unordered results using pool.imap_unordered():')
68n/a for x in imap_unordered_it:
69n/a print('\t', x)
70n/a print()
71n/a
72n/a print('Ordered results using pool.map() --- will block till complete:')
73n/a for x in pool.map(calculatestar, TASKS):
74n/a print('\t', x)
75n/a print()
76n/a
77n/a #
78n/a # Test error handling
79n/a #
80n/a
81n/a print('Testing error handling:')
82n/a
83n/a try:
84n/a print(pool.apply(f, (5,)))
85n/a except ZeroDivisionError:
86n/a print('\tGot ZeroDivisionError as expected from pool.apply()')
87n/a else:
88n/a raise AssertionError('expected ZeroDivisionError')
89n/a
90n/a try:
91n/a print(pool.map(f, list(range(10))))
92n/a except ZeroDivisionError:
93n/a print('\tGot ZeroDivisionError as expected from pool.map()')
94n/a else:
95n/a raise AssertionError('expected ZeroDivisionError')
96n/a
97n/a try:
98n/a print(list(pool.imap(f, list(range(10)))))
99n/a except ZeroDivisionError:
100n/a print('\tGot ZeroDivisionError as expected from list(pool.imap())')
101n/a else:
102n/a raise AssertionError('expected ZeroDivisionError')
103n/a
104n/a it = pool.imap(f, list(range(10)))
105n/a for i in range(10):
106n/a try:
107n/a x = next(it)
108n/a except ZeroDivisionError:
109n/a if i == 5:
110n/a pass
111n/a except StopIteration:
112n/a break
113n/a else:
114n/a if i == 5:
115n/a raise AssertionError('expected ZeroDivisionError')
116n/a
117n/a assert i == 9
118n/a print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
119n/a print()
120n/a
121n/a #
122n/a # Testing timeouts
123n/a #
124n/a
125n/a print('Testing ApplyResult.get() with timeout:', end=' ')
126n/a res = pool.apply_async(calculate, TASKS[0])
127n/a while 1:
128n/a sys.stdout.flush()
129n/a try:
130n/a sys.stdout.write('\n\t%s' % res.get(0.02))
131n/a break
132n/a except multiprocessing.TimeoutError:
133n/a sys.stdout.write('.')
134n/a print()
135n/a print()
136n/a
137n/a print('Testing IMapIterator.next() with timeout:', end=' ')
138n/a it = pool.imap(calculatestar, TASKS)
139n/a while 1:
140n/a sys.stdout.flush()
141n/a try:
142n/a sys.stdout.write('\n\t%s' % it.next(0.02))
143n/a except StopIteration:
144n/a break
145n/a except multiprocessing.TimeoutError:
146n/a sys.stdout.write('.')
147n/a print()
148n/a print()
149n/a
150n/a
151n/aif __name__ == '__main__':
152n/a multiprocessing.freeze_support()
153n/a test()