ยปCore Development>Code coverage>Lib/test/test_multiprocessing.py

Python code coverage for Lib/test/test_multiprocessing.py

#countcontent
1n/a#!/usr/bin/env python3
2n/a
3n/a#
4n/a# Unit tests for the multiprocessing package
5n/a#
6n/a
7n/aimport unittest
8n/aimport queue as pyqueue
9n/aimport time
10n/aimport io
11n/aimport itertools
12n/aimport sys
13n/aimport os
14n/aimport gc
15n/aimport errno
16n/aimport signal
17n/aimport array
18n/aimport socket
19n/aimport random
20n/aimport logging
21n/aimport struct
22n/aimport operator
23n/aimport test.support
24n/aimport test.script_helper
25n/a
26n/a
27n/a# Skip tests if _multiprocessing wasn't built.
28n/a_multiprocessing = test.support.import_module('_multiprocessing')
29n/a# Skip tests if sem_open implementation is broken.
30n/atest.support.import_module('multiprocessing.synchronize')
31n/a# import threading after _multiprocessing to raise a more revelant error
32n/a# message: "No module named _multiprocessing". _multiprocessing is not compiled
33n/a# without thread support.
34n/aimport threading
35n/a
36n/aimport multiprocessing.dummy
37n/aimport multiprocessing.connection
38n/aimport multiprocessing.managers
39n/aimport multiprocessing.heap
40n/aimport multiprocessing.pool
41n/a
42n/afrom multiprocessing import util
43n/a
44n/atry:
45n/a from multiprocessing import reduction
46n/a HAS_REDUCTION = True
47n/aexcept ImportError:
48n/a HAS_REDUCTION = False
49n/a
50n/atry:
51n/a from multiprocessing.sharedctypes import Value, copy
52n/a HAS_SHAREDCTYPES = True
53n/aexcept ImportError:
54n/a HAS_SHAREDCTYPES = False
55n/a
56n/atry:
57n/a import msvcrt
58n/aexcept ImportError:
59n/a msvcrt = None
60n/a
61n/a#
62n/a#
63n/a#
64n/a
65n/adef latin(s):
66n/a return s.encode('latin')
67n/a
68n/a#
69n/a# Constants
70n/a#
71n/a
72n/aLOG_LEVEL = util.SUBWARNING
73n/a#LOG_LEVEL = logging.DEBUG
74n/a
75n/aDELTA = 0.1
76n/aCHECK_TIMINGS = False # making true makes tests take a lot longer
77n/a # and can sometimes cause some non-serious
78n/a # failures because some calls block a bit
79n/a # longer than expected
80n/aif CHECK_TIMINGS:
81n/a TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
82n/aelse:
83n/a TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
84n/a
85n/aHAVE_GETVALUE = not getattr(_multiprocessing,
86n/a 'HAVE_BROKEN_SEM_GETVALUE', False)
87n/a
88n/aWIN32 = (sys.platform == "win32")
89n/a
90n/afrom multiprocessing.connection import wait
91n/a
92n/adef wait_for_handle(handle, timeout):
93n/a if timeout is not None and timeout < 0.0:
94n/a timeout = None
95n/a return wait([handle], timeout)
96n/a
97n/atry:
98n/a MAXFD = os.sysconf("SC_OPEN_MAX")
99n/aexcept:
100n/a MAXFD = 256
101n/a
102n/a#
103n/a# Some tests require ctypes
104n/a#
105n/a
106n/atry:
107n/a from ctypes import Structure, c_int, c_double
108n/aexcept ImportError:
109n/a Structure = object
110n/a c_int = c_double = None
111n/a
112n/a
113n/adef check_enough_semaphores():
114n/a """Check that the system supports enough semaphores to run the test."""
115n/a # minimum number of semaphores available according to POSIX
116n/a nsems_min = 256
117n/a try:
118n/a nsems = os.sysconf("SC_SEM_NSEMS_MAX")
119n/a except (AttributeError, ValueError):
120n/a # sysconf not available or setting not available
121n/a return
122n/a if nsems == -1 or nsems >= nsems_min:
123n/a return
124n/a raise unittest.SkipTest("The OS doesn't support enough semaphores "
125n/a "to run the test (required: %d)." % nsems_min)
126n/a
127n/a
128n/a#
129n/a# Creates a wrapper for a function which records the time it takes to finish
130n/a#
131n/a
132n/aclass TimingWrapper(object):
133n/a
134n/a def __init__(self, func):
135n/a self.func = func
136n/a self.elapsed = None
137n/a
138n/a def __call__(self, *args, **kwds):
139n/a t = time.time()
140n/a try:
141n/a return self.func(*args, **kwds)
142n/a finally:
143n/a self.elapsed = time.time() - t
144n/a
145n/a#
146n/a# Base class for test cases
147n/a#
148n/a
149n/aclass BaseTestCase(object):
150n/a
151n/a ALLOWED_TYPES = ('processes', 'manager', 'threads')
152n/a
153n/a def assertTimingAlmostEqual(self, a, b):
154n/a if CHECK_TIMINGS:
155n/a self.assertAlmostEqual(a, b, 1)
156n/a
157n/a def assertReturnsIfImplemented(self, value, func, *args):
158n/a try:
159n/a res = func(*args)
160n/a except NotImplementedError:
161n/a pass
162n/a else:
163n/a return self.assertEqual(value, res)
164n/a
165n/a # For the sanity of Windows users, rather than crashing or freezing in
166n/a # multiple ways.
167n/a def __reduce__(self, *args):
168n/a raise NotImplementedError("shouldn't try to pickle a test case")
169n/a
170n/a __reduce_ex__ = __reduce__
171n/a
172n/a#
173n/a# Return the value of a semaphore
174n/a#
175n/a
176n/adef get_value(self):
177n/a try:
178n/a return self.get_value()
179n/a except AttributeError:
180n/a try:
181n/a return self._Semaphore__value
182n/a except AttributeError:
183n/a try:
184n/a return self._value
185n/a except AttributeError:
186n/a raise NotImplementedError
187n/a
188n/a#
189n/a# Testcases
190n/a#
191n/a
192n/aclass _TestProcess(BaseTestCase):
193n/a
194n/a ALLOWED_TYPES = ('processes', 'threads')
195n/a
196n/a def test_current(self):
197n/a if self.TYPE == 'threads':
198n/a return
199n/a
200n/a current = self.current_process()
201n/a authkey = current.authkey
202n/a
203n/a self.assertTrue(current.is_alive())
204n/a self.assertTrue(not current.daemon)
205n/a self.assertIsInstance(authkey, bytes)
206n/a self.assertTrue(len(authkey) > 0)
207n/a self.assertEqual(current.ident, os.getpid())
208n/a self.assertEqual(current.exitcode, None)
209n/a
210n/a def test_daemon_argument(self):
211n/a if self.TYPE == "threads":
212n/a return
213n/a
214n/a # By default uses the current process's daemon flag.
215n/a proc0 = self.Process(target=self._test)
216n/a self.assertEqual(proc0.daemon, self.current_process().daemon)
217n/a proc1 = self.Process(target=self._test, daemon=True)
218n/a self.assertTrue(proc1.daemon)
219n/a proc2 = self.Process(target=self._test, daemon=False)
220n/a self.assertFalse(proc2.daemon)
221n/a
222n/a @classmethod
223n/a def _test(cls, q, *args, **kwds):
224n/a current = cls.current_process()
225n/a q.put(args)
226n/a q.put(kwds)
227n/a q.put(current.name)
228n/a if cls.TYPE != 'threads':
229n/a q.put(bytes(current.authkey))
230n/a q.put(current.pid)
231n/a
232n/a def test_process(self):
233n/a q = self.Queue(1)
234n/a e = self.Event()
235n/a args = (q, 1, 2)
236n/a kwargs = {'hello':23, 'bye':2.54}
237n/a name = 'SomeProcess'
238n/a p = self.Process(
239n/a target=self._test, args=args, kwargs=kwargs, name=name
240n/a )
241n/a p.daemon = True
242n/a current = self.current_process()
243n/a
244n/a if self.TYPE != 'threads':
245n/a self.assertEqual(p.authkey, current.authkey)
246n/a self.assertEqual(p.is_alive(), False)
247n/a self.assertEqual(p.daemon, True)
248n/a self.assertNotIn(p, self.active_children())
249n/a self.assertTrue(type(self.active_children()) is list)
250n/a self.assertEqual(p.exitcode, None)
251n/a
252n/a p.start()
253n/a
254n/a self.assertEqual(p.exitcode, None)
255n/a self.assertEqual(p.is_alive(), True)
256n/a self.assertIn(p, self.active_children())
257n/a
258n/a self.assertEqual(q.get(), args[1:])
259n/a self.assertEqual(q.get(), kwargs)
260n/a self.assertEqual(q.get(), p.name)
261n/a if self.TYPE != 'threads':
262n/a self.assertEqual(q.get(), current.authkey)
263n/a self.assertEqual(q.get(), p.pid)
264n/a
265n/a p.join()
266n/a
267n/a self.assertEqual(p.exitcode, 0)
268n/a self.assertEqual(p.is_alive(), False)
269n/a self.assertNotIn(p, self.active_children())
270n/a
271n/a @classmethod
272n/a def _test_terminate(cls):
273n/a time.sleep(1000)
274n/a
275n/a def test_terminate(self):
276n/a if self.TYPE == 'threads':
277n/a return
278n/a
279n/a p = self.Process(target=self._test_terminate)
280n/a p.daemon = True
281n/a p.start()
282n/a
283n/a self.assertEqual(p.is_alive(), True)
284n/a self.assertIn(p, self.active_children())
285n/a self.assertEqual(p.exitcode, None)
286n/a
287n/a join = TimingWrapper(p.join)
288n/a
289n/a self.assertEqual(join(0), None)
290n/a self.assertTimingAlmostEqual(join.elapsed, 0.0)
291n/a self.assertEqual(p.is_alive(), True)
292n/a
293n/a self.assertEqual(join(-1), None)
294n/a self.assertTimingAlmostEqual(join.elapsed, 0.0)
295n/a self.assertEqual(p.is_alive(), True)
296n/a
297n/a p.terminate()
298n/a
299n/a self.assertEqual(join(), None)
300n/a self.assertTimingAlmostEqual(join.elapsed, 0.0)
301n/a
302n/a self.assertEqual(p.is_alive(), False)
303n/a self.assertNotIn(p, self.active_children())
304n/a
305n/a p.join()
306n/a
307n/a # XXX sometimes get p.exitcode == 0 on Windows ...
308n/a #self.assertEqual(p.exitcode, -signal.SIGTERM)
309n/a
310n/a def test_cpu_count(self):
311n/a try:
312n/a cpus = multiprocessing.cpu_count()
313n/a except NotImplementedError:
314n/a cpus = 1
315n/a self.assertTrue(type(cpus) is int)
316n/a self.assertTrue(cpus >= 1)
317n/a
318n/a def test_active_children(self):
319n/a self.assertEqual(type(self.active_children()), list)
320n/a
321n/a p = self.Process(target=time.sleep, args=(DELTA,))
322n/a self.assertNotIn(p, self.active_children())
323n/a
324n/a p.daemon = True
325n/a p.start()
326n/a self.assertIn(p, self.active_children())
327n/a
328n/a p.join()
329n/a self.assertNotIn(p, self.active_children())
330n/a
331n/a @classmethod
332n/a def _test_recursion(cls, wconn, id):
333n/a from multiprocessing import forking
334n/a wconn.send(id)
335n/a if len(id) < 2:
336n/a for i in range(2):
337n/a p = cls.Process(
338n/a target=cls._test_recursion, args=(wconn, id+[i])
339n/a )
340n/a p.start()
341n/a p.join()
342n/a
343n/a def test_recursion(self):
344n/a rconn, wconn = self.Pipe(duplex=False)
345n/a self._test_recursion(wconn, [])
346n/a
347n/a time.sleep(DELTA)
348n/a result = []
349n/a while rconn.poll():
350n/a result.append(rconn.recv())
351n/a
352n/a expected = [
353n/a [],
354n/a [0],
355n/a [0, 0],
356n/a [0, 1],
357n/a [1],
358n/a [1, 0],
359n/a [1, 1]
360n/a ]
361n/a self.assertEqual(result, expected)
362n/a
363n/a @classmethod
364n/a def _test_sentinel(cls, event):
365n/a event.wait(10.0)
366n/a
367n/a def test_sentinel(self):
368n/a if self.TYPE == "threads":
369n/a return
370n/a event = self.Event()
371n/a p = self.Process(target=self._test_sentinel, args=(event,))
372n/a with self.assertRaises(ValueError):
373n/a p.sentinel
374n/a p.start()
375n/a self.addCleanup(p.join)
376n/a sentinel = p.sentinel
377n/a self.assertIsInstance(sentinel, int)
378n/a self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
379n/a event.set()
380n/a p.join()
381n/a self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
382n/a
383n/a#
384n/a#
385n/a#
386n/a
387n/aclass _UpperCaser(multiprocessing.Process):
388n/a
389n/a def __init__(self):
390n/a multiprocessing.Process.__init__(self)
391n/a self.child_conn, self.parent_conn = multiprocessing.Pipe()
392n/a
393n/a def run(self):
394n/a self.parent_conn.close()
395n/a for s in iter(self.child_conn.recv, None):
396n/a self.child_conn.send(s.upper())
397n/a self.child_conn.close()
398n/a
399n/a def submit(self, s):
400n/a assert type(s) is str
401n/a self.parent_conn.send(s)
402n/a return self.parent_conn.recv()
403n/a
404n/a def stop(self):
405n/a self.parent_conn.send(None)
406n/a self.parent_conn.close()
407n/a self.child_conn.close()
408n/a
409n/aclass _TestSubclassingProcess(BaseTestCase):
410n/a
411n/a ALLOWED_TYPES = ('processes',)
412n/a
413n/a def test_subclassing(self):
414n/a uppercaser = _UpperCaser()
415n/a uppercaser.daemon = True
416n/a uppercaser.start()
417n/a self.assertEqual(uppercaser.submit('hello'), 'HELLO')
418n/a self.assertEqual(uppercaser.submit('world'), 'WORLD')
419n/a uppercaser.stop()
420n/a uppercaser.join()
421n/a
422n/a def test_stderr_flush(self):
423n/a # sys.stderr is flushed at process shutdown (issue #13812)
424n/a if self.TYPE == "threads":
425n/a return
426n/a
427n/a testfn = test.support.TESTFN
428n/a self.addCleanup(test.support.unlink, testfn)
429n/a proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
430n/a proc.start()
431n/a proc.join()
432n/a with open(testfn, 'r') as f:
433n/a err = f.read()
434n/a # The whole traceback was printed
435n/a self.assertIn("ZeroDivisionError", err)
436n/a self.assertIn("test_multiprocessing.py", err)
437n/a self.assertIn("1/0 # MARKER", err)
438n/a
439n/a @classmethod
440n/a def _test_stderr_flush(cls, testfn):
441n/a sys.stderr = open(testfn, 'w')
442n/a 1/0 # MARKER
443n/a
444n/a
445n/a @classmethod
446n/a def _test_sys_exit(cls, reason, testfn):
447n/a sys.stderr = open(testfn, 'w')
448n/a sys.exit(reason)
449n/a
450n/a def test_sys_exit(self):
451n/a # See Issue 13854
452n/a if self.TYPE == 'threads':
453n/a return
454n/a
455n/a testfn = test.support.TESTFN
456n/a self.addCleanup(test.support.unlink, testfn)
457n/a
458n/a for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
459n/a p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
460n/a p.daemon = True
461n/a p.start()
462n/a p.join(5)
463n/a self.assertEqual(p.exitcode, code)
464n/a
465n/a with open(testfn, 'r') as f:
466n/a self.assertEqual(f.read().rstrip(), str(reason))
467n/a
468n/a for reason in (True, False, 8):
469n/a p = self.Process(target=sys.exit, args=(reason,))
470n/a p.daemon = True
471n/a p.start()
472n/a p.join(5)
473n/a self.assertEqual(p.exitcode, reason)
474n/a
475n/a#
476n/a#
477n/a#
478n/a
479n/adef queue_empty(q):
480n/a if hasattr(q, 'empty'):
481n/a return q.empty()
482n/a else:
483n/a return q.qsize() == 0
484n/a
485n/adef queue_full(q, maxsize):
486n/a if hasattr(q, 'full'):
487n/a return q.full()
488n/a else:
489n/a return q.qsize() == maxsize
490n/a
491n/a
492n/aclass _TestQueue(BaseTestCase):
493n/a
494n/a
495n/a @classmethod
496n/a def _test_put(cls, queue, child_can_start, parent_can_continue):
497n/a child_can_start.wait()
498n/a for i in range(6):
499n/a queue.get()
500n/a parent_can_continue.set()
501n/a
502n/a def test_put(self):
503n/a MAXSIZE = 6
504n/a queue = self.Queue(maxsize=MAXSIZE)
505n/a child_can_start = self.Event()
506n/a parent_can_continue = self.Event()
507n/a
508n/a proc = self.Process(
509n/a target=self._test_put,
510n/a args=(queue, child_can_start, parent_can_continue)
511n/a )
512n/a proc.daemon = True
513n/a proc.start()
514n/a
515n/a self.assertEqual(queue_empty(queue), True)
516n/a self.assertEqual(queue_full(queue, MAXSIZE), False)
517n/a
518n/a queue.put(1)
519n/a queue.put(2, True)
520n/a queue.put(3, True, None)
521n/a queue.put(4, False)
522n/a queue.put(5, False, None)
523n/a queue.put_nowait(6)
524n/a
525n/a # the values may be in buffer but not yet in pipe so sleep a bit
526n/a time.sleep(DELTA)
527n/a
528n/a self.assertEqual(queue_empty(queue), False)
529n/a self.assertEqual(queue_full(queue, MAXSIZE), True)
530n/a
531n/a put = TimingWrapper(queue.put)
532n/a put_nowait = TimingWrapper(queue.put_nowait)
533n/a
534n/a self.assertRaises(pyqueue.Full, put, 7, False)
535n/a self.assertTimingAlmostEqual(put.elapsed, 0)
536n/a
537n/a self.assertRaises(pyqueue.Full, put, 7, False, None)
538n/a self.assertTimingAlmostEqual(put.elapsed, 0)
539n/a
540n/a self.assertRaises(pyqueue.Full, put_nowait, 7)
541n/a self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
542n/a
543n/a self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
544n/a self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
545n/a
546n/a self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
547n/a self.assertTimingAlmostEqual(put.elapsed, 0)
548n/a
549n/a self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
550n/a self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
551n/a
552n/a child_can_start.set()
553n/a parent_can_continue.wait()
554n/a
555n/a self.assertEqual(queue_empty(queue), True)
556n/a self.assertEqual(queue_full(queue, MAXSIZE), False)
557n/a
558n/a proc.join()
559n/a
560n/a @classmethod
561n/a def _test_get(cls, queue, child_can_start, parent_can_continue):
562n/a child_can_start.wait()
563n/a #queue.put(1)
564n/a queue.put(2)
565n/a queue.put(3)
566n/a queue.put(4)
567n/a queue.put(5)
568n/a parent_can_continue.set()
569n/a
570n/a def test_get(self):
571n/a queue = self.Queue()
572n/a child_can_start = self.Event()
573n/a parent_can_continue = self.Event()
574n/a
575n/a proc = self.Process(
576n/a target=self._test_get,
577n/a args=(queue, child_can_start, parent_can_continue)
578n/a )
579n/a proc.daemon = True
580n/a proc.start()
581n/a
582n/a self.assertEqual(queue_empty(queue), True)
583n/a
584n/a child_can_start.set()
585n/a parent_can_continue.wait()
586n/a
587n/a time.sleep(DELTA)
588n/a self.assertEqual(queue_empty(queue), False)
589n/a
590n/a # Hangs unexpectedly, remove for now
591n/a #self.assertEqual(queue.get(), 1)
592n/a self.assertEqual(queue.get(True, None), 2)
593n/a self.assertEqual(queue.get(True), 3)
594n/a self.assertEqual(queue.get(timeout=1), 4)
595n/a self.assertEqual(queue.get_nowait(), 5)
596n/a
597n/a self.assertEqual(queue_empty(queue), True)
598n/a
599n/a get = TimingWrapper(queue.get)
600n/a get_nowait = TimingWrapper(queue.get_nowait)
601n/a
602n/a self.assertRaises(pyqueue.Empty, get, False)
603n/a self.assertTimingAlmostEqual(get.elapsed, 0)
604n/a
605n/a self.assertRaises(pyqueue.Empty, get, False, None)
606n/a self.assertTimingAlmostEqual(get.elapsed, 0)
607n/a
608n/a self.assertRaises(pyqueue.Empty, get_nowait)
609n/a self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
610n/a
611n/a self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
612n/a self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
613n/a
614n/a self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
615n/a self.assertTimingAlmostEqual(get.elapsed, 0)
616n/a
617n/a self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
618n/a self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
619n/a
620n/a proc.join()
621n/a
622n/a @classmethod
623n/a def _test_fork(cls, queue):
624n/a for i in range(10, 20):
625n/a queue.put(i)
626n/a # note that at this point the items may only be buffered, so the
627n/a # process cannot shutdown until the feeder thread has finished
628n/a # pushing items onto the pipe.
629n/a
630n/a def test_fork(self):
631n/a # Old versions of Queue would fail to create a new feeder
632n/a # thread for a forked process if the original process had its
633n/a # own feeder thread. This test checks that this no longer
634n/a # happens.
635n/a
636n/a queue = self.Queue()
637n/a
638n/a # put items on queue so that main process starts a feeder thread
639n/a for i in range(10):
640n/a queue.put(i)
641n/a
642n/a # wait to make sure thread starts before we fork a new process
643n/a time.sleep(DELTA)
644n/a
645n/a # fork process
646n/a p = self.Process(target=self._test_fork, args=(queue,))
647n/a p.daemon = True
648n/a p.start()
649n/a
650n/a # check that all expected items are in the queue
651n/a for i in range(20):
652n/a self.assertEqual(queue.get(), i)
653n/a self.assertRaises(pyqueue.Empty, queue.get, False)
654n/a
655n/a p.join()
656n/a
657n/a def test_qsize(self):
658n/a q = self.Queue()
659n/a try:
660n/a self.assertEqual(q.qsize(), 0)
661n/a except NotImplementedError:
662n/a return
663n/a q.put(1)
664n/a self.assertEqual(q.qsize(), 1)
665n/a q.put(5)
666n/a self.assertEqual(q.qsize(), 2)
667n/a q.get()
668n/a self.assertEqual(q.qsize(), 1)
669n/a q.get()
670n/a self.assertEqual(q.qsize(), 0)
671n/a
672n/a @classmethod
673n/a def _test_task_done(cls, q):
674n/a for obj in iter(q.get, None):
675n/a time.sleep(DELTA)
676n/a q.task_done()
677n/a
678n/a def test_task_done(self):
679n/a queue = self.JoinableQueue()
680n/a
681n/a if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
682n/a self.skipTest("requires 'queue.task_done()' method")
683n/a
684n/a workers = [self.Process(target=self._test_task_done, args=(queue,))
685n/a for i in range(4)]
686n/a
687n/a for p in workers:
688n/a p.daemon = True
689n/a p.start()
690n/a
691n/a for i in range(10):
692n/a queue.put(i)
693n/a
694n/a queue.join()
695n/a
696n/a for p in workers:
697n/a queue.put(None)
698n/a
699n/a for p in workers:
700n/a p.join()
701n/a
702n/a def test_timeout(self):
703n/a q = multiprocessing.Queue()
704n/a start = time.time()
705n/a self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
706n/a delta = time.time() - start
707n/a self.assertGreaterEqual(delta, 0.19)
708n/a
709n/a#
710n/a#
711n/a#
712n/a
713n/aclass _TestLock(BaseTestCase):
714n/a
715n/a def test_lock(self):
716n/a lock = self.Lock()
717n/a self.assertEqual(lock.acquire(), True)
718n/a self.assertEqual(lock.acquire(False), False)
719n/a self.assertEqual(lock.release(), None)
720n/a self.assertRaises((ValueError, threading.ThreadError), lock.release)
721n/a
722n/a def test_rlock(self):
723n/a lock = self.RLock()
724n/a self.assertEqual(lock.acquire(), True)
725n/a self.assertEqual(lock.acquire(), True)
726n/a self.assertEqual(lock.acquire(), True)
727n/a self.assertEqual(lock.release(), None)
728n/a self.assertEqual(lock.release(), None)
729n/a self.assertEqual(lock.release(), None)
730n/a self.assertRaises((AssertionError, RuntimeError), lock.release)
731n/a
732n/a def test_lock_context(self):
733n/a with self.Lock():
734n/a pass
735n/a
736n/a
737n/aclass _TestSemaphore(BaseTestCase):
738n/a
739n/a def _test_semaphore(self, sem):
740n/a self.assertReturnsIfImplemented(2, get_value, sem)
741n/a self.assertEqual(sem.acquire(), True)
742n/a self.assertReturnsIfImplemented(1, get_value, sem)
743n/a self.assertEqual(sem.acquire(), True)
744n/a self.assertReturnsIfImplemented(0, get_value, sem)
745n/a self.assertEqual(sem.acquire(False), False)
746n/a self.assertReturnsIfImplemented(0, get_value, sem)
747n/a self.assertEqual(sem.release(), None)
748n/a self.assertReturnsIfImplemented(1, get_value, sem)
749n/a self.assertEqual(sem.release(), None)
750n/a self.assertReturnsIfImplemented(2, get_value, sem)
751n/a
752n/a def test_semaphore(self):
753n/a sem = self.Semaphore(2)
754n/a self._test_semaphore(sem)
755n/a self.assertEqual(sem.release(), None)
756n/a self.assertReturnsIfImplemented(3, get_value, sem)
757n/a self.assertEqual(sem.release(), None)
758n/a self.assertReturnsIfImplemented(4, get_value, sem)
759n/a
760n/a def test_bounded_semaphore(self):
761n/a sem = self.BoundedSemaphore(2)
762n/a self._test_semaphore(sem)
763n/a # Currently fails on OS/X
764n/a #if HAVE_GETVALUE:
765n/a # self.assertRaises(ValueError, sem.release)
766n/a # self.assertReturnsIfImplemented(2, get_value, sem)
767n/a
768n/a def test_timeout(self):
769n/a if self.TYPE != 'processes':
770n/a return
771n/a
772n/a sem = self.Semaphore(0)
773n/a acquire = TimingWrapper(sem.acquire)
774n/a
775n/a self.assertEqual(acquire(False), False)
776n/a self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
777n/a
778n/a self.assertEqual(acquire(False, None), False)
779n/a self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
780n/a
781n/a self.assertEqual(acquire(False, TIMEOUT1), False)
782n/a self.assertTimingAlmostEqual(acquire.elapsed, 0)
783n/a
784n/a self.assertEqual(acquire(True, TIMEOUT2), False)
785n/a self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
786n/a
787n/a self.assertEqual(acquire(timeout=TIMEOUT3), False)
788n/a self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
789n/a
790n/a
791n/aclass _TestCondition(BaseTestCase):
792n/a
793n/a @classmethod
794n/a def f(cls, cond, sleeping, woken, timeout=None):
795n/a cond.acquire()
796n/a sleeping.release()
797n/a cond.wait(timeout)
798n/a woken.release()
799n/a cond.release()
800n/a
801n/a def check_invariant(self, cond):
802n/a # this is only supposed to succeed when there are no sleepers
803n/a if self.TYPE == 'processes':
804n/a try:
805n/a sleepers = (cond._sleeping_count.get_value() -
806n/a cond._woken_count.get_value())
807n/a self.assertEqual(sleepers, 0)
808n/a self.assertEqual(cond._wait_semaphore.get_value(), 0)
809n/a except NotImplementedError:
810n/a pass
811n/a
812n/a def test_notify(self):
813n/a cond = self.Condition()
814n/a sleeping = self.Semaphore(0)
815n/a woken = self.Semaphore(0)
816n/a
817n/a p = self.Process(target=self.f, args=(cond, sleeping, woken))
818n/a p.daemon = True
819n/a p.start()
820n/a
821n/a p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
822n/a p.daemon = True
823n/a p.start()
824n/a
825n/a # wait for both children to start sleeping
826n/a sleeping.acquire()
827n/a sleeping.acquire()
828n/a
829n/a # check no process/thread has woken up
830n/a time.sleep(DELTA)
831n/a self.assertReturnsIfImplemented(0, get_value, woken)
832n/a
833n/a # wake up one process/thread
834n/a cond.acquire()
835n/a cond.notify()
836n/a cond.release()
837n/a
838n/a # check one process/thread has woken up
839n/a time.sleep(DELTA)
840n/a self.assertReturnsIfImplemented(1, get_value, woken)
841n/a
842n/a # wake up another
843n/a cond.acquire()
844n/a cond.notify()
845n/a cond.release()
846n/a
847n/a # check other has woken up
848n/a time.sleep(DELTA)
849n/a self.assertReturnsIfImplemented(2, get_value, woken)
850n/a
851n/a # check state is not mucked up
852n/a self.check_invariant(cond)
853n/a p.join()
854n/a
855n/a def test_notify_all(self):
856n/a cond = self.Condition()
857n/a sleeping = self.Semaphore(0)
858n/a woken = self.Semaphore(0)
859n/a
860n/a # start some threads/processes which will timeout
861n/a for i in range(3):
862n/a p = self.Process(target=self.f,
863n/a args=(cond, sleeping, woken, TIMEOUT1))
864n/a p.daemon = True
865n/a p.start()
866n/a
867n/a t = threading.Thread(target=self.f,
868n/a args=(cond, sleeping, woken, TIMEOUT1))
869n/a t.daemon = True
870n/a t.start()
871n/a
872n/a # wait for them all to sleep
873n/a for i in range(6):
874n/a sleeping.acquire()
875n/a
876n/a # check they have all timed out
877n/a for i in range(6):
878n/a woken.acquire()
879n/a self.assertReturnsIfImplemented(0, get_value, woken)
880n/a
881n/a # check state is not mucked up
882n/a self.check_invariant(cond)
883n/a
884n/a # start some more threads/processes
885n/a for i in range(3):
886n/a p = self.Process(target=self.f, args=(cond, sleeping, woken))
887n/a p.daemon = True
888n/a p.start()
889n/a
890n/a t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
891n/a t.daemon = True
892n/a t.start()
893n/a
894n/a # wait for them to all sleep
895n/a for i in range(6):
896n/a sleeping.acquire()
897n/a
898n/a # check no process/thread has woken up
899n/a time.sleep(DELTA)
900n/a self.assertReturnsIfImplemented(0, get_value, woken)
901n/a
902n/a # wake them all up
903n/a cond.acquire()
904n/a cond.notify_all()
905n/a cond.release()
906n/a
907n/a # check they have all woken
908n/a for i in range(10):
909n/a try:
910n/a if get_value(woken) == 6:
911n/a break
912n/a except NotImplementedError:
913n/a break
914n/a time.sleep(DELTA)
915n/a self.assertReturnsIfImplemented(6, get_value, woken)
916n/a
917n/a # check state is not mucked up
918n/a self.check_invariant(cond)
919n/a
920n/a def test_timeout(self):
921n/a cond = self.Condition()
922n/a wait = TimingWrapper(cond.wait)
923n/a cond.acquire()
924n/a res = wait(TIMEOUT1)
925n/a cond.release()
926n/a self.assertEqual(res, False)
927n/a self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
928n/a
929n/a @classmethod
930n/a def _test_waitfor_f(cls, cond, state):
931n/a with cond:
932n/a state.value = 0
933n/a cond.notify()
934n/a result = cond.wait_for(lambda : state.value==4)
935n/a if not result or state.value != 4:
936n/a sys.exit(1)
937n/a
938n/a @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
939n/a def test_waitfor(self):
940n/a # based on test in test/lock_tests.py
941n/a cond = self.Condition()
942n/a state = self.Value('i', -1)
943n/a
944n/a p = self.Process(target=self._test_waitfor_f, args=(cond, state))
945n/a p.daemon = True
946n/a p.start()
947n/a
948n/a with cond:
949n/a result = cond.wait_for(lambda : state.value==0)
950n/a self.assertTrue(result)
951n/a self.assertEqual(state.value, 0)
952n/a
953n/a for i in range(4):
954n/a time.sleep(0.01)
955n/a with cond:
956n/a state.value += 1
957n/a cond.notify()
958n/a
959n/a p.join(5)
960n/a self.assertFalse(p.is_alive())
961n/a self.assertEqual(p.exitcode, 0)
962n/a
963n/a @classmethod
964n/a def _test_waitfor_timeout_f(cls, cond, state, success, sem):
965n/a sem.release()
966n/a with cond:
967n/a expected = 0.1
968n/a dt = time.time()
969n/a result = cond.wait_for(lambda : state.value==4, timeout=expected)
970n/a dt = time.time() - dt
971n/a # borrow logic in assertTimeout() from test/lock_tests.py
972n/a if not result and expected * 0.6 < dt < expected * 10.0:
973n/a success.value = True
974n/a
975n/a @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
976n/a def test_waitfor_timeout(self):
977n/a # based on test in test/lock_tests.py
978n/a cond = self.Condition()
979n/a state = self.Value('i', 0)
980n/a success = self.Value('i', False)
981n/a sem = self.Semaphore(0)
982n/a
983n/a p = self.Process(target=self._test_waitfor_timeout_f,
984n/a args=(cond, state, success, sem))
985n/a p.daemon = True
986n/a p.start()
987n/a self.assertTrue(sem.acquire(timeout=10))
988n/a
989n/a # Only increment 3 times, so state == 4 is never reached.
990n/a for i in range(3):
991n/a time.sleep(0.01)
992n/a with cond:
993n/a state.value += 1
994n/a cond.notify()
995n/a
996n/a p.join(5)
997n/a self.assertTrue(success.value)
998n/a
999n/a @classmethod
1000n/a def _test_wait_result(cls, c, pid):
1001n/a with c:
1002n/a c.notify()
1003n/a time.sleep(1)
1004n/a if pid is not None:
1005n/a os.kill(pid, signal.SIGINT)
1006n/a
1007n/a def test_wait_result(self):
1008n/a if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1009n/a pid = os.getpid()
1010n/a else:
1011n/a pid = None
1012n/a
1013n/a c = self.Condition()
1014n/a with c:
1015n/a self.assertFalse(c.wait(0))
1016n/a self.assertFalse(c.wait(0.1))
1017n/a
1018n/a p = self.Process(target=self._test_wait_result, args=(c, pid))
1019n/a p.start()
1020n/a
1021n/a self.assertTrue(c.wait(10))
1022n/a if pid is not None:
1023n/a self.assertRaises(KeyboardInterrupt, c.wait, 10)
1024n/a
1025n/a p.join()
1026n/a
1027n/a
1028n/aclass _TestEvent(BaseTestCase):
1029n/a
1030n/a @classmethod
1031n/a def _test_event(cls, event):
1032n/a time.sleep(TIMEOUT2)
1033n/a event.set()
1034n/a
1035n/a def test_event(self):
1036n/a event = self.Event()
1037n/a wait = TimingWrapper(event.wait)
1038n/a
1039n/a # Removed temporarily, due to API shear, this does not
1040n/a # work with threading._Event objects. is_set == isSet
1041n/a self.assertEqual(event.is_set(), False)
1042n/a
1043n/a # Removed, threading.Event.wait() will return the value of the __flag
1044n/a # instead of None. API Shear with the semaphore backed mp.Event
1045n/a self.assertEqual(wait(0.0), False)
1046n/a self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1047n/a self.assertEqual(wait(TIMEOUT1), False)
1048n/a self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1049n/a
1050n/a event.set()
1051n/a
1052n/a # See note above on the API differences
1053n/a self.assertEqual(event.is_set(), True)
1054n/a self.assertEqual(wait(), True)
1055n/a self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1056n/a self.assertEqual(wait(TIMEOUT1), True)
1057n/a self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1058n/a # self.assertEqual(event.is_set(), True)
1059n/a
1060n/a event.clear()
1061n/a
1062n/a #self.assertEqual(event.is_set(), False)
1063n/a
1064n/a p = self.Process(target=self._test_event, args=(event,))
1065n/a p.daemon = True
1066n/a p.start()
1067n/a self.assertEqual(wait(), True)
1068n/a
1069n/a#
1070n/a# Tests for Barrier - adapted from tests in test/lock_tests.py
1071n/a#
1072n/a
1073n/a# Many of the tests for threading.Barrier use a list as an atomic
1074n/a# counter: a value is appended to increment the counter, and the
1075n/a# length of the list gives the value. We use the class DummyList
1076n/a# for the same purpose.
1077n/a
1078n/aclass _DummyList(object):
1079n/a
1080n/a def __init__(self):
1081n/a wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1082n/a lock = multiprocessing.Lock()
1083n/a self.__setstate__((wrapper, lock))
1084n/a self._lengthbuf[0] = 0
1085n/a
1086n/a def __setstate__(self, state):
1087n/a (self._wrapper, self._lock) = state
1088n/a self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1089n/a
1090n/a def __getstate__(self):
1091n/a return (self._wrapper, self._lock)
1092n/a
1093n/a def append(self, _):
1094n/a with self._lock:
1095n/a self._lengthbuf[0] += 1
1096n/a
1097n/a def __len__(self):
1098n/a with self._lock:
1099n/a return self._lengthbuf[0]
1100n/a
1101n/adef _wait():
1102n/a # A crude wait/yield function not relying on synchronization primitives.
1103n/a time.sleep(0.01)
1104n/a
1105n/a
1106n/aclass Bunch(object):
1107n/a """
1108n/a A bunch of threads.
1109n/a """
1110n/a def __init__(self, namespace, f, args, n, wait_before_exit=False):
1111n/a """
1112n/a Construct a bunch of `n` threads running the same function `f`.
1113n/a If `wait_before_exit` is True, the threads won't terminate until
1114n/a do_finish() is called.
1115n/a """
1116n/a self.f = f
1117n/a self.args = args
1118n/a self.n = n
1119n/a self.started = namespace.DummyList()
1120n/a self.finished = namespace.DummyList()
1121n/a self._can_exit = namespace.Event()
1122n/a if not wait_before_exit:
1123n/a self._can_exit.set()
1124n/a for i in range(n):
1125n/a p = namespace.Process(target=self.task)
1126n/a p.daemon = True
1127n/a p.start()
1128n/a
1129n/a def task(self):
1130n/a pid = os.getpid()
1131n/a self.started.append(pid)
1132n/a try:
1133n/a self.f(*self.args)
1134n/a finally:
1135n/a self.finished.append(pid)
1136n/a self._can_exit.wait(30)
1137n/a assert self._can_exit.is_set()
1138n/a
1139n/a def wait_for_started(self):
1140n/a while len(self.started) < self.n:
1141n/a _wait()
1142n/a
1143n/a def wait_for_finished(self):
1144n/a while len(self.finished) < self.n:
1145n/a _wait()
1146n/a
1147n/a def do_finish(self):
1148n/a self._can_exit.set()
1149n/a
1150n/a
1151n/aclass AppendTrue(object):
1152n/a def __init__(self, obj):
1153n/a self.obj = obj
1154n/a def __call__(self):
1155n/a self.obj.append(True)
1156n/a
1157n/a
1158n/aclass _TestBarrier(BaseTestCase):
1159n/a """
1160n/a Tests for Barrier objects.
1161n/a """
1162n/a N = 5
1163n/a defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
1164n/a
1165n/a def setUp(self):
1166n/a self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1167n/a
1168n/a def tearDown(self):
1169n/a self.barrier.abort()
1170n/a self.barrier = None
1171n/a
1172n/a def DummyList(self):
1173n/a if self.TYPE == 'threads':
1174n/a return []
1175n/a elif self.TYPE == 'manager':
1176n/a return self.manager.list()
1177n/a else:
1178n/a return _DummyList()
1179n/a
1180n/a def run_threads(self, f, args):
1181n/a b = Bunch(self, f, args, self.N-1)
1182n/a f(*args)
1183n/a b.wait_for_finished()
1184n/a
1185n/a @classmethod
1186n/a def multipass(cls, barrier, results, n):
1187n/a m = barrier.parties
1188n/a assert m == cls.N
1189n/a for i in range(n):
1190n/a results[0].append(True)
1191n/a assert len(results[1]) == i * m
1192n/a barrier.wait()
1193n/a results[1].append(True)
1194n/a assert len(results[0]) == (i + 1) * m
1195n/a barrier.wait()
1196n/a try:
1197n/a assert barrier.n_waiting == 0
1198n/a except NotImplementedError:
1199n/a pass
1200n/a assert not barrier.broken
1201n/a
1202n/a def test_barrier(self, passes=1):
1203n/a """
1204n/a Test that a barrier is passed in lockstep
1205n/a """
1206n/a results = [self.DummyList(), self.DummyList()]
1207n/a self.run_threads(self.multipass, (self.barrier, results, passes))
1208n/a
1209n/a def test_barrier_10(self):
1210n/a """
1211n/a Test that a barrier works for 10 consecutive runs
1212n/a """
1213n/a return self.test_barrier(10)
1214n/a
1215n/a @classmethod
1216n/a def _test_wait_return_f(cls, barrier, queue):
1217n/a res = barrier.wait()
1218n/a queue.put(res)
1219n/a
1220n/a def test_wait_return(self):
1221n/a """
1222n/a test the return value from barrier.wait
1223n/a """
1224n/a queue = self.Queue()
1225n/a self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1226n/a results = [queue.get() for i in range(self.N)]
1227n/a self.assertEqual(results.count(0), 1)
1228n/a
1229n/a @classmethod
1230n/a def _test_action_f(cls, barrier, results):
1231n/a barrier.wait()
1232n/a if len(results) != 1:
1233n/a raise RuntimeError
1234n/a
1235n/a def test_action(self):
1236n/a """
1237n/a Test the 'action' callback
1238n/a """
1239n/a results = self.DummyList()
1240n/a barrier = self.Barrier(self.N, action=AppendTrue(results))
1241n/a self.run_threads(self._test_action_f, (barrier, results))
1242n/a self.assertEqual(len(results), 1)
1243n/a
1244n/a @classmethod
1245n/a def _test_abort_f(cls, barrier, results1, results2):
1246n/a try:
1247n/a i = barrier.wait()
1248n/a if i == cls.N//2:
1249n/a raise RuntimeError
1250n/a barrier.wait()
1251n/a results1.append(True)
1252n/a except threading.BrokenBarrierError:
1253n/a results2.append(True)
1254n/a except RuntimeError:
1255n/a barrier.abort()
1256n/a
1257n/a def test_abort(self):
1258n/a """
1259n/a Test that an abort will put the barrier in a broken state
1260n/a """
1261n/a results1 = self.DummyList()
1262n/a results2 = self.DummyList()
1263n/a self.run_threads(self._test_abort_f,
1264n/a (self.barrier, results1, results2))
1265n/a self.assertEqual(len(results1), 0)
1266n/a self.assertEqual(len(results2), self.N-1)
1267n/a self.assertTrue(self.barrier.broken)
1268n/a
1269n/a @classmethod
1270n/a def _test_reset_f(cls, barrier, results1, results2, results3):
1271n/a i = barrier.wait()
1272n/a if i == cls.N//2:
1273n/a # Wait until the other threads are all in the barrier.
1274n/a while barrier.n_waiting < cls.N-1:
1275n/a time.sleep(0.001)
1276n/a barrier.reset()
1277n/a else:
1278n/a try:
1279n/a barrier.wait()
1280n/a results1.append(True)
1281n/a except threading.BrokenBarrierError:
1282n/a results2.append(True)
1283n/a # Now, pass the barrier again
1284n/a barrier.wait()
1285n/a results3.append(True)
1286n/a
1287n/a def test_reset(self):
1288n/a """
1289n/a Test that a 'reset' on a barrier frees the waiting threads
1290n/a """
1291n/a results1 = self.DummyList()
1292n/a results2 = self.DummyList()
1293n/a results3 = self.DummyList()
1294n/a self.run_threads(self._test_reset_f,
1295n/a (self.barrier, results1, results2, results3))
1296n/a self.assertEqual(len(results1), 0)
1297n/a self.assertEqual(len(results2), self.N-1)
1298n/a self.assertEqual(len(results3), self.N)
1299n/a
1300n/a @classmethod
1301n/a def _test_abort_and_reset_f(cls, barrier, barrier2,
1302n/a results1, results2, results3):
1303n/a try:
1304n/a i = barrier.wait()
1305n/a if i == cls.N//2:
1306n/a raise RuntimeError
1307n/a barrier.wait()
1308n/a results1.append(True)
1309n/a except threading.BrokenBarrierError:
1310n/a results2.append(True)
1311n/a except RuntimeError:
1312n/a barrier.abort()
1313n/a # Synchronize and reset the barrier. Must synchronize first so
1314n/a # that everyone has left it when we reset, and after so that no
1315n/a # one enters it before the reset.
1316n/a if barrier2.wait() == cls.N//2:
1317n/a barrier.reset()
1318n/a barrier2.wait()
1319n/a barrier.wait()
1320n/a results3.append(True)
1321n/a
1322n/a def test_abort_and_reset(self):
1323n/a """
1324n/a Test that a barrier can be reset after being broken.
1325n/a """
1326n/a results1 = self.DummyList()
1327n/a results2 = self.DummyList()
1328n/a results3 = self.DummyList()
1329n/a barrier2 = self.Barrier(self.N)
1330n/a
1331n/a self.run_threads(self._test_abort_and_reset_f,
1332n/a (self.barrier, barrier2, results1, results2, results3))
1333n/a self.assertEqual(len(results1), 0)
1334n/a self.assertEqual(len(results2), self.N-1)
1335n/a self.assertEqual(len(results3), self.N)
1336n/a
1337n/a @classmethod
1338n/a def _test_timeout_f(cls, barrier, results):
1339n/a i = barrier.wait()
1340n/a if i == cls.N//2:
1341n/a # One thread is late!
1342n/a time.sleep(1.0)
1343n/a try:
1344n/a barrier.wait(0.5)
1345n/a except threading.BrokenBarrierError:
1346n/a results.append(True)
1347n/a
1348n/a def test_timeout(self):
1349n/a """
1350n/a Test wait(timeout)
1351n/a """
1352n/a results = self.DummyList()
1353n/a self.run_threads(self._test_timeout_f, (self.barrier, results))
1354n/a self.assertEqual(len(results), self.barrier.parties)
1355n/a
1356n/a @classmethod
1357n/a def _test_default_timeout_f(cls, barrier, results):
1358n/a i = barrier.wait(cls.defaultTimeout)
1359n/a if i == cls.N//2:
1360n/a # One thread is later than the default timeout
1361n/a time.sleep(1.0)
1362n/a try:
1363n/a barrier.wait()
1364n/a except threading.BrokenBarrierError:
1365n/a results.append(True)
1366n/a
1367n/a def test_default_timeout(self):
1368n/a """
1369n/a Test the barrier's default timeout
1370n/a """
1371n/a barrier = self.Barrier(self.N, timeout=0.5)
1372n/a results = self.DummyList()
1373n/a self.run_threads(self._test_default_timeout_f, (barrier, results))
1374n/a self.assertEqual(len(results), barrier.parties)
1375n/a
1376n/a def test_single_thread(self):
1377n/a b = self.Barrier(1)
1378n/a b.wait()
1379n/a b.wait()
1380n/a
1381n/a @classmethod
1382n/a def _test_thousand_f(cls, barrier, passes, conn, lock):
1383n/a for i in range(passes):
1384n/a barrier.wait()
1385n/a with lock:
1386n/a conn.send(i)
1387n/a
1388n/a def test_thousand(self):
1389n/a if self.TYPE == 'manager':
1390n/a return
1391n/a passes = 1000
1392n/a lock = self.Lock()
1393n/a conn, child_conn = self.Pipe(False)
1394n/a for j in range(self.N):
1395n/a p = self.Process(target=self._test_thousand_f,
1396n/a args=(self.barrier, passes, child_conn, lock))
1397n/a p.start()
1398n/a
1399n/a for i in range(passes):
1400n/a for j in range(self.N):
1401n/a self.assertEqual(conn.recv(), i)
1402n/a
1403n/a#
1404n/a#
1405n/a#
1406n/a
1407n/aclass _TestValue(BaseTestCase):
1408n/a
1409n/a ALLOWED_TYPES = ('processes',)
1410n/a
1411n/a codes_values = [
1412n/a ('i', 4343, 24234),
1413n/a ('d', 3.625, -4.25),
1414n/a ('h', -232, 234),
1415n/a ('c', latin('x'), latin('y'))
1416n/a ]
1417n/a
1418n/a def setUp(self):
1419n/a if not HAS_SHAREDCTYPES:
1420n/a self.skipTest("requires multiprocessing.sharedctypes")
1421n/a
1422n/a @classmethod
1423n/a def _test(cls, values):
1424n/a for sv, cv in zip(values, cls.codes_values):
1425n/a sv.value = cv[2]
1426n/a
1427n/a
1428n/a def test_value(self, raw=False):
1429n/a if raw:
1430n/a values = [self.RawValue(code, value)
1431n/a for code, value, _ in self.codes_values]
1432n/a else:
1433n/a values = [self.Value(code, value)
1434n/a for code, value, _ in self.codes_values]
1435n/a
1436n/a for sv, cv in zip(values, self.codes_values):
1437n/a self.assertEqual(sv.value, cv[1])
1438n/a
1439n/a proc = self.Process(target=self._test, args=(values,))
1440n/a proc.daemon = True
1441n/a proc.start()
1442n/a proc.join()
1443n/a
1444n/a for sv, cv in zip(values, self.codes_values):
1445n/a self.assertEqual(sv.value, cv[2])
1446n/a
1447n/a def test_rawvalue(self):
1448n/a self.test_value(raw=True)
1449n/a
1450n/a def test_getobj_getlock(self):
1451n/a val1 = self.Value('i', 5)
1452n/a lock1 = val1.get_lock()
1453n/a obj1 = val1.get_obj()
1454n/a
1455n/a val2 = self.Value('i', 5, lock=None)
1456n/a lock2 = val2.get_lock()
1457n/a obj2 = val2.get_obj()
1458n/a
1459n/a lock = self.Lock()
1460n/a val3 = self.Value('i', 5, lock=lock)
1461n/a lock3 = val3.get_lock()
1462n/a obj3 = val3.get_obj()
1463n/a self.assertEqual(lock, lock3)
1464n/a
1465n/a arr4 = self.Value('i', 5, lock=False)
1466n/a self.assertFalse(hasattr(arr4, 'get_lock'))
1467n/a self.assertFalse(hasattr(arr4, 'get_obj'))
1468n/a
1469n/a self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1470n/a
1471n/a arr5 = self.RawValue('i', 5)
1472n/a self.assertFalse(hasattr(arr5, 'get_lock'))
1473n/a self.assertFalse(hasattr(arr5, 'get_obj'))
1474n/a
1475n/a
1476n/aclass _TestArray(BaseTestCase):
1477n/a
1478n/a ALLOWED_TYPES = ('processes',)
1479n/a
1480n/a @classmethod
1481n/a def f(cls, seq):
1482n/a for i in range(1, len(seq)):
1483n/a seq[i] += seq[i-1]
1484n/a
1485n/a @unittest.skipIf(c_int is None, "requires _ctypes")
1486n/a def test_array(self, raw=False):
1487n/a seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1488n/a if raw:
1489n/a arr = self.RawArray('i', seq)
1490n/a else:
1491n/a arr = self.Array('i', seq)
1492n/a
1493n/a self.assertEqual(len(arr), len(seq))
1494n/a self.assertEqual(arr[3], seq[3])
1495n/a self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1496n/a
1497n/a arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1498n/a
1499n/a self.assertEqual(list(arr[:]), seq)
1500n/a
1501n/a self.f(seq)
1502n/a
1503n/a p = self.Process(target=self.f, args=(arr,))
1504n/a p.daemon = True
1505n/a p.start()
1506n/a p.join()
1507n/a
1508n/a self.assertEqual(list(arr[:]), seq)
1509n/a
1510n/a @unittest.skipIf(c_int is None, "requires _ctypes")
1511n/a def test_array_from_size(self):
1512n/a size = 10
1513n/a # Test for zeroing (see issue #11675).
1514n/a # The repetition below strengthens the test by increasing the chances
1515n/a # of previously allocated non-zero memory being used for the new array
1516n/a # on the 2nd and 3rd loops.
1517n/a for _ in range(3):
1518n/a arr = self.Array('i', size)
1519n/a self.assertEqual(len(arr), size)
1520n/a self.assertEqual(list(arr), [0] * size)
1521n/a arr[:] = range(10)
1522n/a self.assertEqual(list(arr), list(range(10)))
1523n/a del arr
1524n/a
1525n/a @unittest.skipIf(c_int is None, "requires _ctypes")
1526n/a def test_rawarray(self):
1527n/a self.test_array(raw=True)
1528n/a
1529n/a @unittest.skipIf(c_int is None, "requires _ctypes")
1530n/a def test_getobj_getlock_obj(self):
1531n/a arr1 = self.Array('i', list(range(10)))
1532n/a lock1 = arr1.get_lock()
1533n/a obj1 = arr1.get_obj()
1534n/a
1535n/a arr2 = self.Array('i', list(range(10)), lock=None)
1536n/a lock2 = arr2.get_lock()
1537n/a obj2 = arr2.get_obj()
1538n/a
1539n/a lock = self.Lock()
1540n/a arr3 = self.Array('i', list(range(10)), lock=lock)
1541n/a lock3 = arr3.get_lock()
1542n/a obj3 = arr3.get_obj()
1543n/a self.assertEqual(lock, lock3)
1544n/a
1545n/a arr4 = self.Array('i', range(10), lock=False)
1546n/a self.assertFalse(hasattr(arr4, 'get_lock'))
1547n/a self.assertFalse(hasattr(arr4, 'get_obj'))
1548n/a self.assertRaises(AttributeError,
1549n/a self.Array, 'i', range(10), lock='notalock')
1550n/a
1551n/a arr5 = self.RawArray('i', range(10))
1552n/a self.assertFalse(hasattr(arr5, 'get_lock'))
1553n/a self.assertFalse(hasattr(arr5, 'get_obj'))
1554n/a
1555n/a#
1556n/a#
1557n/a#
1558n/a
1559n/aclass _TestContainers(BaseTestCase):
1560n/a
1561n/a ALLOWED_TYPES = ('manager',)
1562n/a
1563n/a def test_list(self):
1564n/a a = self.list(list(range(10)))
1565n/a self.assertEqual(a[:], list(range(10)))
1566n/a
1567n/a b = self.list()
1568n/a self.assertEqual(b[:], [])
1569n/a
1570n/a b.extend(list(range(5)))
1571n/a self.assertEqual(b[:], list(range(5)))
1572n/a
1573n/a self.assertEqual(b[2], 2)
1574n/a self.assertEqual(b[2:10], [2,3,4])
1575n/a
1576n/a b *= 2
1577n/a self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1578n/a
1579n/a self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1580n/a
1581n/a self.assertEqual(a[:], list(range(10)))
1582n/a
1583n/a d = [a, b]
1584n/a e = self.list(d)
1585n/a self.assertEqual(
1586n/a e[:],
1587n/a [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1588n/a )
1589n/a
1590n/a f = self.list([a])
1591n/a a.append('hello')
1592n/a self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1593n/a
1594n/a def test_dict(self):
1595n/a d = self.dict()
1596n/a indices = list(range(65, 70))
1597n/a for i in indices:
1598n/a d[i] = chr(i)
1599n/a self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1600n/a self.assertEqual(sorted(d.keys()), indices)
1601n/a self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1602n/a self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1603n/a
1604n/a def test_namespace(self):
1605n/a n = self.Namespace()
1606n/a n.name = 'Bob'
1607n/a n.job = 'Builder'
1608n/a n._hidden = 'hidden'
1609n/a self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1610n/a del n.job
1611n/a self.assertEqual(str(n), "Namespace(name='Bob')")
1612n/a self.assertTrue(hasattr(n, 'name'))
1613n/a self.assertTrue(not hasattr(n, 'job'))
1614n/a
1615n/a#
1616n/a#
1617n/a#
1618n/a
1619n/adef sqr(x, wait=0.0):
1620n/a time.sleep(wait)
1621n/a return x*x
1622n/a
1623n/adef mul(x, y):
1624n/a return x*y
1625n/a
1626n/aclass _TestPool(BaseTestCase):
1627n/a
1628n/a @classmethod
1629n/a def setUpClass(cls):
1630n/a super().setUpClass()
1631n/a cls.pool = cls.Pool(4)
1632n/a
1633n/a @classmethod
1634n/a def tearDownClass(cls):
1635n/a cls.pool.terminate()
1636n/a cls.pool.join()
1637n/a cls.pool = None
1638n/a super().tearDownClass()
1639n/a
1640n/a def test_apply(self):
1641n/a papply = self.pool.apply
1642n/a self.assertEqual(papply(sqr, (5,)), sqr(5))
1643n/a self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1644n/a
1645n/a def test_map(self):
1646n/a pmap = self.pool.map
1647n/a self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1648n/a self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1649n/a list(map(sqr, list(range(100)))))
1650n/a
1651n/a def test_starmap(self):
1652n/a psmap = self.pool.starmap
1653n/a tuples = list(zip(range(10), range(9,-1, -1)))
1654n/a self.assertEqual(psmap(mul, tuples),
1655n/a list(itertools.starmap(mul, tuples)))
1656n/a tuples = list(zip(range(100), range(99,-1, -1)))
1657n/a self.assertEqual(psmap(mul, tuples, chunksize=20),
1658n/a list(itertools.starmap(mul, tuples)))
1659n/a
1660n/a def test_starmap_async(self):
1661n/a tuples = list(zip(range(100), range(99,-1, -1)))
1662n/a self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1663n/a list(itertools.starmap(mul, tuples)))
1664n/a
1665n/a def test_map_async(self):
1666n/a self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1667n/a list(map(sqr, list(range(10)))))
1668n/a
1669n/a def test_map_async_callbacks(self):
1670n/a call_args = self.manager.list() if self.TYPE == 'manager' else []
1671n/a self.pool.map_async(int, ['1'],
1672n/a callback=call_args.append,
1673n/a error_callback=call_args.append).wait()
1674n/a self.assertEqual(1, len(call_args))
1675n/a self.assertEqual([1], call_args[0])
1676n/a self.pool.map_async(int, ['a'],
1677n/a callback=call_args.append,
1678n/a error_callback=call_args.append).wait()
1679n/a self.assertEqual(2, len(call_args))
1680n/a self.assertIsInstance(call_args[1], ValueError)
1681n/a
1682n/a def test_map_chunksize(self):
1683n/a try:
1684n/a self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1685n/a except multiprocessing.TimeoutError:
1686n/a self.fail("pool.map_async with chunksize stalled on null list")
1687n/a
1688n/a def test_async(self):
1689n/a res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1690n/a get = TimingWrapper(res.get)
1691n/a self.assertEqual(get(), 49)
1692n/a self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1693n/a
1694n/a def test_async_timeout(self):
1695n/a res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1696n/a get = TimingWrapper(res.get)
1697n/a self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1698n/a self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1699n/a
1700n/a def test_imap(self):
1701n/a it = self.pool.imap(sqr, list(range(10)))
1702n/a self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1703n/a
1704n/a it = self.pool.imap(sqr, list(range(10)))
1705n/a for i in range(10):
1706n/a self.assertEqual(next(it), i*i)
1707n/a self.assertRaises(StopIteration, it.__next__)
1708n/a
1709n/a it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1710n/a for i in range(1000):
1711n/a self.assertEqual(next(it), i*i)
1712n/a self.assertRaises(StopIteration, it.__next__)
1713n/a
1714n/a def test_imap_unordered(self):
1715n/a it = self.pool.imap_unordered(sqr, list(range(1000)))
1716n/a self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1717n/a
1718n/a it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1719n/a self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1720n/a
1721n/a def test_make_pool(self):
1722n/a self.assertRaises(ValueError, multiprocessing.Pool, -1)
1723n/a self.assertRaises(ValueError, multiprocessing.Pool, 0)
1724n/a
1725n/a p = multiprocessing.Pool(3)
1726n/a self.assertEqual(3, len(p._pool))
1727n/a p.close()
1728n/a p.join()
1729n/a
1730n/a def test_terminate(self):
1731n/a result = self.pool.map_async(
1732n/a time.sleep, [0.1 for i in range(10000)], chunksize=1
1733n/a )
1734n/a self.pool.terminate()
1735n/a join = TimingWrapper(self.pool.join)
1736n/a join()
1737n/a self.assertLess(join.elapsed, 0.5)
1738n/a
1739n/a def test_empty_iterable(self):
1740n/a # See Issue 12157
1741n/a p = self.Pool(1)
1742n/a
1743n/a self.assertEqual(p.map(sqr, []), [])
1744n/a self.assertEqual(list(p.imap(sqr, [])), [])
1745n/a self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1746n/a self.assertEqual(p.map_async(sqr, []).get(), [])
1747n/a
1748n/a p.close()
1749n/a p.join()
1750n/a
1751n/a def test_context(self):
1752n/a if self.TYPE == 'processes':
1753n/a L = list(range(10))
1754n/a expected = [sqr(i) for i in L]
1755n/a with multiprocessing.Pool(2) as p:
1756n/a r = p.map_async(sqr, L)
1757n/a self.assertEqual(r.get(), expected)
1758n/a self.assertRaises(ValueError, p.map_async, sqr, L)
1759n/a
1760n/a @classmethod
1761n/a def _test_traceback(cls):
1762n/a raise RuntimeError(123) # some comment
1763n/a
1764n/a def test_traceback(self):
1765n/a # We want ensure that the traceback from the child process is
1766n/a # contained in the traceback raised in the main process.
1767n/a if self.TYPE == 'processes':
1768n/a with self.Pool(1) as p:
1769n/a try:
1770n/a p.apply(self._test_traceback)
1771n/a except Exception as e:
1772n/a exc = e
1773n/a else:
1774n/a raise AssertionError('expected RuntimeError')
1775n/a self.assertIs(type(exc), RuntimeError)
1776n/a self.assertEqual(exc.args, (123,))
1777n/a cause = exc.__cause__
1778n/a self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
1779n/a self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
1780n/a
1781n/a with test.support.captured_stderr() as f1:
1782n/a try:
1783n/a raise exc
1784n/a except RuntimeError:
1785n/a sys.excepthook(*sys.exc_info())
1786n/a self.assertIn('raise RuntimeError(123) # some comment',
1787n/a f1.getvalue())
1788n/a
1789n/adef raising():
1790n/a raise KeyError("key")
1791n/a
1792n/adef unpickleable_result():
1793n/a return lambda: 42
1794n/a
1795n/aclass _TestPoolWorkerErrors(BaseTestCase):
1796n/a ALLOWED_TYPES = ('processes', )
1797n/a
1798n/a def test_async_error_callback(self):
1799n/a p = multiprocessing.Pool(2)
1800n/a
1801n/a scratchpad = [None]
1802n/a def errback(exc):
1803n/a scratchpad[0] = exc
1804n/a
1805n/a res = p.apply_async(raising, error_callback=errback)
1806n/a self.assertRaises(KeyError, res.get)
1807n/a self.assertTrue(scratchpad[0])
1808n/a self.assertIsInstance(scratchpad[0], KeyError)
1809n/a
1810n/a p.close()
1811n/a p.join()
1812n/a
1813n/a def test_unpickleable_result(self):
1814n/a from multiprocessing.pool import MaybeEncodingError
1815n/a p = multiprocessing.Pool(2)
1816n/a
1817n/a # Make sure we don't lose pool processes because of encoding errors.
1818n/a for iteration in range(20):
1819n/a
1820n/a scratchpad = [None]
1821n/a def errback(exc):
1822n/a scratchpad[0] = exc
1823n/a
1824n/a res = p.apply_async(unpickleable_result, error_callback=errback)
1825n/a self.assertRaises(MaybeEncodingError, res.get)
1826n/a wrapped = scratchpad[0]
1827n/a self.assertTrue(wrapped)
1828n/a self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1829n/a self.assertIsNotNone(wrapped.exc)
1830n/a self.assertIsNotNone(wrapped.value)
1831n/a
1832n/a p.close()
1833n/a p.join()
1834n/a
1835n/aclass _TestPoolWorkerLifetime(BaseTestCase):
1836n/a ALLOWED_TYPES = ('processes', )
1837n/a
1838n/a def test_pool_worker_lifetime(self):
1839n/a p = multiprocessing.Pool(3, maxtasksperchild=10)
1840n/a self.assertEqual(3, len(p._pool))
1841n/a origworkerpids = [w.pid for w in p._pool]
1842n/a # Run many tasks so each worker gets replaced (hopefully)
1843n/a results = []
1844n/a for i in range(100):
1845n/a results.append(p.apply_async(sqr, (i, )))
1846n/a # Fetch the results and verify we got the right answers,
1847n/a # also ensuring all the tasks have completed.
1848n/a for (j, res) in enumerate(results):
1849n/a self.assertEqual(res.get(), sqr(j))
1850n/a # Refill the pool
1851n/a p._repopulate_pool()
1852n/a # Wait until all workers are alive
1853n/a # (countdown * DELTA = 5 seconds max startup process time)
1854n/a countdown = 50
1855n/a while countdown and not all(w.is_alive() for w in p._pool):
1856n/a countdown -= 1
1857n/a time.sleep(DELTA)
1858n/a finalworkerpids = [w.pid for w in p._pool]
1859n/a # All pids should be assigned. See issue #7805.
1860n/a self.assertNotIn(None, origworkerpids)
1861n/a self.assertNotIn(None, finalworkerpids)
1862n/a # Finally, check that the worker pids have changed
1863n/a self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1864n/a p.close()
1865n/a p.join()
1866n/a
1867n/a def test_pool_worker_lifetime_early_close(self):
1868n/a # Issue #10332: closing a pool whose workers have limited lifetimes
1869n/a # before all the tasks completed would make join() hang.
1870n/a p = multiprocessing.Pool(3, maxtasksperchild=1)
1871n/a results = []
1872n/a for i in range(6):
1873n/a results.append(p.apply_async(sqr, (i, 0.3)))
1874n/a p.close()
1875n/a p.join()
1876n/a # check the results
1877n/a for (j, res) in enumerate(results):
1878n/a self.assertEqual(res.get(), sqr(j))
1879n/a
1880n/a#
1881n/a# Test of creating a customized manager class
1882n/a#
1883n/a
1884n/afrom multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1885n/a
1886n/aclass FooBar(object):
1887n/a def f(self):
1888n/a return 'f()'
1889n/a def g(self):
1890n/a raise ValueError
1891n/a def _h(self):
1892n/a return '_h()'
1893n/a
1894n/adef baz():
1895n/a for i in range(10):
1896n/a yield i*i
1897n/a
1898n/aclass IteratorProxy(BaseProxy):
1899n/a _exposed_ = ('__next__',)
1900n/a def __iter__(self):
1901n/a return self
1902n/a def __next__(self):
1903n/a return self._callmethod('__next__')
1904n/a
1905n/aclass MyManager(BaseManager):
1906n/a pass
1907n/a
1908n/aMyManager.register('Foo', callable=FooBar)
1909n/aMyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1910n/aMyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1911n/a
1912n/a
1913n/aclass _TestMyManager(BaseTestCase):
1914n/a
1915n/a ALLOWED_TYPES = ('manager',)
1916n/a
1917n/a def test_mymanager(self):
1918n/a manager = MyManager()
1919n/a manager.start()
1920n/a self.common(manager)
1921n/a manager.shutdown()
1922n/a
1923n/a # If the manager process exited cleanly then the exitcode
1924n/a # will be zero. Otherwise (after a short timeout)
1925n/a # terminate() is used, resulting in an exitcode of -SIGTERM.
1926n/a self.assertEqual(manager._process.exitcode, 0)
1927n/a
1928n/a def test_mymanager_context(self):
1929n/a with MyManager() as manager:
1930n/a self.common(manager)
1931n/a self.assertEqual(manager._process.exitcode, 0)
1932n/a
1933n/a def test_mymanager_context_prestarted(self):
1934n/a manager = MyManager()
1935n/a manager.start()
1936n/a with manager:
1937n/a self.common(manager)
1938n/a self.assertEqual(manager._process.exitcode, 0)
1939n/a
1940n/a def common(self, manager):
1941n/a foo = manager.Foo()
1942n/a bar = manager.Bar()
1943n/a baz = manager.baz()
1944n/a
1945n/a foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1946n/a bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1947n/a
1948n/a self.assertEqual(foo_methods, ['f', 'g'])
1949n/a self.assertEqual(bar_methods, ['f', '_h'])
1950n/a
1951n/a self.assertEqual(foo.f(), 'f()')
1952n/a self.assertRaises(ValueError, foo.g)
1953n/a self.assertEqual(foo._callmethod('f'), 'f()')
1954n/a self.assertRaises(RemoteError, foo._callmethod, '_h')
1955n/a
1956n/a self.assertEqual(bar.f(), 'f()')
1957n/a self.assertEqual(bar._h(), '_h()')
1958n/a self.assertEqual(bar._callmethod('f'), 'f()')
1959n/a self.assertEqual(bar._callmethod('_h'), '_h()')
1960n/a
1961n/a self.assertEqual(list(baz), [i*i for i in range(10)])
1962n/a
1963n/a
1964n/a#
1965n/a# Test of connecting to a remote server and using xmlrpclib for serialization
1966n/a#
1967n/a
1968n/a_queue = pyqueue.Queue()
1969n/adef get_queue():
1970n/a return _queue
1971n/a
1972n/aclass QueueManager(BaseManager):
1973n/a '''manager class used by server process'''
1974n/aQueueManager.register('get_queue', callable=get_queue)
1975n/a
1976n/aclass QueueManager2(BaseManager):
1977n/a '''manager class which specifies the same interface as QueueManager'''
1978n/aQueueManager2.register('get_queue')
1979n/a
1980n/a
1981n/aSERIALIZER = 'xmlrpclib'
1982n/a
1983n/aclass _TestRemoteManager(BaseTestCase):
1984n/a
1985n/a ALLOWED_TYPES = ('manager',)
1986n/a
1987n/a @classmethod
1988n/a def _putter(cls, address, authkey):
1989n/a manager = QueueManager2(
1990n/a address=address, authkey=authkey, serializer=SERIALIZER
1991n/a )
1992n/a manager.connect()
1993n/a queue = manager.get_queue()
1994n/a queue.put(('hello world', None, True, 2.25))
1995n/a
1996n/a def test_remote(self):
1997n/a authkey = os.urandom(32)
1998n/a
1999n/a manager = QueueManager(
2000n/a address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
2001n/a )
2002n/a manager.start()
2003n/a
2004n/a p = self.Process(target=self._putter, args=(manager.address, authkey))
2005n/a p.daemon = True
2006n/a p.start()
2007n/a
2008n/a manager2 = QueueManager2(
2009n/a address=manager.address, authkey=authkey, serializer=SERIALIZER
2010n/a )
2011n/a manager2.connect()
2012n/a queue = manager2.get_queue()
2013n/a
2014n/a # Note that xmlrpclib will deserialize object as a list not a tuple
2015n/a self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
2016n/a
2017n/a # Because we are using xmlrpclib for serialization instead of
2018n/a # pickle this will cause a serialization error.
2019n/a self.assertRaises(Exception, queue.put, time.sleep)
2020n/a
2021n/a # Make queue finalizer run before the server is stopped
2022n/a del queue
2023n/a manager.shutdown()
2024n/a
2025n/aclass _TestManagerRestart(BaseTestCase):
2026n/a
2027n/a @classmethod
2028n/a def _putter(cls, address, authkey):
2029n/a manager = QueueManager(
2030n/a address=address, authkey=authkey, serializer=SERIALIZER)
2031n/a manager.connect()
2032n/a queue = manager.get_queue()
2033n/a queue.put('hello world')
2034n/a
2035n/a def test_rapid_restart(self):
2036n/a authkey = os.urandom(32)
2037n/a manager = QueueManager(
2038n/a address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
2039n/a srvr = manager.get_server()
2040n/a addr = srvr.address
2041n/a # Close the connection.Listener socket which gets opened as a part
2042n/a # of manager.get_server(). It's not needed for the test.
2043n/a srvr.listener.close()
2044n/a manager.start()
2045n/a
2046n/a p = self.Process(target=self._putter, args=(manager.address, authkey))
2047n/a p.daemon = True
2048n/a p.start()
2049n/a queue = manager.get_queue()
2050n/a self.assertEqual(queue.get(), 'hello world')
2051n/a del queue
2052n/a manager.shutdown()
2053n/a manager = QueueManager(
2054n/a address=addr, authkey=authkey, serializer=SERIALIZER)
2055n/a try:
2056n/a manager.start()
2057n/a except OSError as e:
2058n/a if e.errno != errno.EADDRINUSE:
2059n/a raise
2060n/a # Retry after some time, in case the old socket was lingering
2061n/a # (sporadic failure on buildbots)
2062n/a time.sleep(1.0)
2063n/a manager = QueueManager(
2064n/a address=addr, authkey=authkey, serializer=SERIALIZER)
2065n/a manager.shutdown()
2066n/a
2067n/a#
2068n/a#
2069n/a#
2070n/a
2071n/aSENTINEL = latin('')
2072n/a
2073n/aclass _TestConnection(BaseTestCase):
2074n/a
2075n/a ALLOWED_TYPES = ('processes', 'threads')
2076n/a
2077n/a @classmethod
2078n/a def _echo(cls, conn):
2079n/a for msg in iter(conn.recv_bytes, SENTINEL):
2080n/a conn.send_bytes(msg)
2081n/a conn.close()
2082n/a
2083n/a def test_connection(self):
2084n/a conn, child_conn = self.Pipe()
2085n/a
2086n/a p = self.Process(target=self._echo, args=(child_conn,))
2087n/a p.daemon = True
2088n/a p.start()
2089n/a
2090n/a seq = [1, 2.25, None]
2091n/a msg = latin('hello world')
2092n/a longmsg = msg * 10
2093n/a arr = array.array('i', list(range(4)))
2094n/a
2095n/a if self.TYPE == 'processes':
2096n/a self.assertEqual(type(conn.fileno()), int)
2097n/a
2098n/a self.assertEqual(conn.send(seq), None)
2099n/a self.assertEqual(conn.recv(), seq)
2100n/a
2101n/a self.assertEqual(conn.send_bytes(msg), None)
2102n/a self.assertEqual(conn.recv_bytes(), msg)
2103n/a
2104n/a if self.TYPE == 'processes':
2105n/a buffer = array.array('i', [0]*10)
2106n/a expected = list(arr) + [0] * (10 - len(arr))
2107n/a self.assertEqual(conn.send_bytes(arr), None)
2108n/a self.assertEqual(conn.recv_bytes_into(buffer),
2109n/a len(arr) * buffer.itemsize)
2110n/a self.assertEqual(list(buffer), expected)
2111n/a
2112n/a buffer = array.array('i', [0]*10)
2113n/a expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2114n/a self.assertEqual(conn.send_bytes(arr), None)
2115n/a self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2116n/a len(arr) * buffer.itemsize)
2117n/a self.assertEqual(list(buffer), expected)
2118n/a
2119n/a buffer = bytearray(latin(' ' * 40))
2120n/a self.assertEqual(conn.send_bytes(longmsg), None)
2121n/a try:
2122n/a res = conn.recv_bytes_into(buffer)
2123n/a except multiprocessing.BufferTooShort as e:
2124n/a self.assertEqual(e.args, (longmsg,))
2125n/a else:
2126n/a self.fail('expected BufferTooShort, got %s' % res)
2127n/a
2128n/a poll = TimingWrapper(conn.poll)
2129n/a
2130n/a self.assertEqual(poll(), False)
2131n/a self.assertTimingAlmostEqual(poll.elapsed, 0)
2132n/a
2133n/a self.assertEqual(poll(-1), False)
2134n/a self.assertTimingAlmostEqual(poll.elapsed, 0)
2135n/a
2136n/a self.assertEqual(poll(TIMEOUT1), False)
2137n/a self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2138n/a
2139n/a conn.send(None)
2140n/a time.sleep(.1)
2141n/a
2142n/a self.assertEqual(poll(TIMEOUT1), True)
2143n/a self.assertTimingAlmostEqual(poll.elapsed, 0)
2144n/a
2145n/a self.assertEqual(conn.recv(), None)
2146n/a
2147n/a really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2148n/a conn.send_bytes(really_big_msg)
2149n/a self.assertEqual(conn.recv_bytes(), really_big_msg)
2150n/a
2151n/a conn.send_bytes(SENTINEL) # tell child to quit
2152n/a child_conn.close()
2153n/a
2154n/a if self.TYPE == 'processes':
2155n/a self.assertEqual(conn.readable, True)
2156n/a self.assertEqual(conn.writable, True)
2157n/a self.assertRaises(EOFError, conn.recv)
2158n/a self.assertRaises(EOFError, conn.recv_bytes)
2159n/a
2160n/a p.join()
2161n/a
2162n/a def test_duplex_false(self):
2163n/a reader, writer = self.Pipe(duplex=False)
2164n/a self.assertEqual(writer.send(1), None)
2165n/a self.assertEqual(reader.recv(), 1)
2166n/a if self.TYPE == 'processes':
2167n/a self.assertEqual(reader.readable, True)
2168n/a self.assertEqual(reader.writable, False)
2169n/a self.assertEqual(writer.readable, False)
2170n/a self.assertEqual(writer.writable, True)
2171n/a self.assertRaises(OSError, reader.send, 2)
2172n/a self.assertRaises(OSError, writer.recv)
2173n/a self.assertRaises(OSError, writer.poll)
2174n/a
2175n/a def test_spawn_close(self):
2176n/a # We test that a pipe connection can be closed by parent
2177n/a # process immediately after child is spawned. On Windows this
2178n/a # would have sometimes failed on old versions because
2179n/a # child_conn would be closed before the child got a chance to
2180n/a # duplicate it.
2181n/a conn, child_conn = self.Pipe()
2182n/a
2183n/a p = self.Process(target=self._echo, args=(child_conn,))
2184n/a p.daemon = True
2185n/a p.start()
2186n/a child_conn.close() # this might complete before child initializes
2187n/a
2188n/a msg = latin('hello')
2189n/a conn.send_bytes(msg)
2190n/a self.assertEqual(conn.recv_bytes(), msg)
2191n/a
2192n/a conn.send_bytes(SENTINEL)
2193n/a conn.close()
2194n/a p.join()
2195n/a
2196n/a def test_sendbytes(self):
2197n/a if self.TYPE != 'processes':
2198n/a return
2199n/a
2200n/a msg = latin('abcdefghijklmnopqrstuvwxyz')
2201n/a a, b = self.Pipe()
2202n/a
2203n/a a.send_bytes(msg)
2204n/a self.assertEqual(b.recv_bytes(), msg)
2205n/a
2206n/a a.send_bytes(msg, 5)
2207n/a self.assertEqual(b.recv_bytes(), msg[5:])
2208n/a
2209n/a a.send_bytes(msg, 7, 8)
2210n/a self.assertEqual(b.recv_bytes(), msg[7:7+8])
2211n/a
2212n/a a.send_bytes(msg, 26)
2213n/a self.assertEqual(b.recv_bytes(), latin(''))
2214n/a
2215n/a a.send_bytes(msg, 26, 0)
2216n/a self.assertEqual(b.recv_bytes(), latin(''))
2217n/a
2218n/a self.assertRaises(ValueError, a.send_bytes, msg, 27)
2219n/a
2220n/a self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2221n/a
2222n/a self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2223n/a
2224n/a self.assertRaises(ValueError, a.send_bytes, msg, -1)
2225n/a
2226n/a self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2227n/a
2228n/a @classmethod
2229n/a def _is_fd_assigned(cls, fd):
2230n/a try:
2231n/a os.fstat(fd)
2232n/a except OSError as e:
2233n/a if e.errno == errno.EBADF:
2234n/a return False
2235n/a raise
2236n/a else:
2237n/a return True
2238n/a
2239n/a @classmethod
2240n/a def _writefd(cls, conn, data, create_dummy_fds=False):
2241n/a if create_dummy_fds:
2242n/a for i in range(0, 256):
2243n/a if not cls._is_fd_assigned(i):
2244n/a os.dup2(conn.fileno(), i)
2245n/a fd = reduction.recv_handle(conn)
2246n/a if msvcrt:
2247n/a fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2248n/a os.write(fd, data)
2249n/a os.close(fd)
2250n/a
2251n/a @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2252n/a def test_fd_transfer(self):
2253n/a if self.TYPE != 'processes':
2254n/a self.skipTest("only makes sense with processes")
2255n/a conn, child_conn = self.Pipe(duplex=True)
2256n/a
2257n/a p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
2258n/a p.daemon = True
2259n/a p.start()
2260n/a self.addCleanup(test.support.unlink, test.support.TESTFN)
2261n/a with open(test.support.TESTFN, "wb") as f:
2262n/a fd = f.fileno()
2263n/a if msvcrt:
2264n/a fd = msvcrt.get_osfhandle(fd)
2265n/a reduction.send_handle(conn, fd, p.pid)
2266n/a p.join()
2267n/a with open(test.support.TESTFN, "rb") as f:
2268n/a self.assertEqual(f.read(), b"foo")
2269n/a
2270n/a @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2271n/a @unittest.skipIf(sys.platform == "win32",
2272n/a "test semantics don't make sense on Windows")
2273n/a @unittest.skipIf(MAXFD <= 256,
2274n/a "largest assignable fd number is too small")
2275n/a @unittest.skipUnless(hasattr(os, "dup2"),
2276n/a "test needs os.dup2()")
2277n/a def test_large_fd_transfer(self):
2278n/a # With fd > 256 (issue #11657)
2279n/a if self.TYPE != 'processes':
2280n/a self.skipTest("only makes sense with processes")
2281n/a conn, child_conn = self.Pipe(duplex=True)
2282n/a
2283n/a p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
2284n/a p.daemon = True
2285n/a p.start()
2286n/a self.addCleanup(test.support.unlink, test.support.TESTFN)
2287n/a with open(test.support.TESTFN, "wb") as f:
2288n/a fd = f.fileno()
2289n/a for newfd in range(256, MAXFD):
2290n/a if not self._is_fd_assigned(newfd):
2291n/a break
2292n/a else:
2293n/a self.fail("could not find an unassigned large file descriptor")
2294n/a os.dup2(fd, newfd)
2295n/a try:
2296n/a reduction.send_handle(conn, newfd, p.pid)
2297n/a finally:
2298n/a os.close(newfd)
2299n/a p.join()
2300n/a with open(test.support.TESTFN, "rb") as f:
2301n/a self.assertEqual(f.read(), b"bar")
2302n/a
2303n/a @classmethod
2304n/a def _send_data_without_fd(self, conn):
2305n/a os.write(conn.fileno(), b"\0")
2306n/a
2307n/a @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2308n/a @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2309n/a def test_missing_fd_transfer(self):
2310n/a # Check that exception is raised when received data is not
2311n/a # accompanied by a file descriptor in ancillary data.
2312n/a if self.TYPE != 'processes':
2313n/a self.skipTest("only makes sense with processes")
2314n/a conn, child_conn = self.Pipe(duplex=True)
2315n/a
2316n/a p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2317n/a p.daemon = True
2318n/a p.start()
2319n/a self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2320n/a p.join()
2321n/a
2322n/a def test_context(self):
2323n/a a, b = self.Pipe()
2324n/a
2325n/a with a, b:
2326n/a a.send(1729)
2327n/a self.assertEqual(b.recv(), 1729)
2328n/a if self.TYPE == 'processes':
2329n/a self.assertFalse(a.closed)
2330n/a self.assertFalse(b.closed)
2331n/a
2332n/a if self.TYPE == 'processes':
2333n/a self.assertTrue(a.closed)
2334n/a self.assertTrue(b.closed)
2335n/a self.assertRaises(OSError, a.recv)
2336n/a self.assertRaises(OSError, b.recv)
2337n/a
2338n/aclass _TestListener(BaseTestCase):
2339n/a
2340n/a ALLOWED_TYPES = ('processes',)
2341n/a
2342n/a def test_multiple_bind(self):
2343n/a for family in self.connection.families:
2344n/a l = self.connection.Listener(family=family)
2345n/a self.addCleanup(l.close)
2346n/a self.assertRaises(OSError, self.connection.Listener,
2347n/a l.address, family)
2348n/a
2349n/a def test_context(self):
2350n/a with self.connection.Listener() as l:
2351n/a with self.connection.Client(l.address) as c:
2352n/a with l.accept() as d:
2353n/a c.send(1729)
2354n/a self.assertEqual(d.recv(), 1729)
2355n/a
2356n/a if self.TYPE == 'processes':
2357n/a self.assertRaises(OSError, l.accept)
2358n/a
2359n/aclass _TestListenerClient(BaseTestCase):
2360n/a
2361n/a ALLOWED_TYPES = ('processes', 'threads')
2362n/a
2363n/a @classmethod
2364n/a def _test(cls, address):
2365n/a conn = cls.connection.Client(address)
2366n/a conn.send('hello')
2367n/a conn.close()
2368n/a
2369n/a def test_listener_client(self):
2370n/a for family in self.connection.families:
2371n/a l = self.connection.Listener(family=family)
2372n/a p = self.Process(target=self._test, args=(l.address,))
2373n/a p.daemon = True
2374n/a p.start()
2375n/a conn = l.accept()
2376n/a self.assertEqual(conn.recv(), 'hello')
2377n/a p.join()
2378n/a l.close()
2379n/a
2380n/a def test_issue14725(self):
2381n/a l = self.connection.Listener()
2382n/a p = self.Process(target=self._test, args=(l.address,))
2383n/a p.daemon = True
2384n/a p.start()
2385n/a time.sleep(1)
2386n/a # On Windows the client process should by now have connected,
2387n/a # written data and closed the pipe handle by now. This causes
2388n/a # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
2389n/a # 14725.
2390n/a conn = l.accept()
2391n/a self.assertEqual(conn.recv(), 'hello')
2392n/a conn.close()
2393n/a p.join()
2394n/a l.close()
2395n/a
2396n/a def test_issue16955(self):
2397n/a for fam in self.connection.families:
2398n/a l = self.connection.Listener(family=fam)
2399n/a c = self.connection.Client(l.address)
2400n/a a = l.accept()
2401n/a a.send_bytes(b"hello")
2402n/a self.assertTrue(c.poll(1))
2403n/a a.close()
2404n/a c.close()
2405n/a l.close()
2406n/a
2407n/aclass _TestPoll(BaseTestCase):
2408n/a
2409n/a ALLOWED_TYPES = ('processes', 'threads')
2410n/a
2411n/a def test_empty_string(self):
2412n/a a, b = self.Pipe()
2413n/a self.assertEqual(a.poll(), False)
2414n/a b.send_bytes(b'')
2415n/a self.assertEqual(a.poll(), True)
2416n/a self.assertEqual(a.poll(), True)
2417n/a
2418n/a @classmethod
2419n/a def _child_strings(cls, conn, strings):
2420n/a for s in strings:
2421n/a time.sleep(0.1)
2422n/a conn.send_bytes(s)
2423n/a conn.close()
2424n/a
2425n/a def test_strings(self):
2426n/a strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2427n/a a, b = self.Pipe()
2428n/a p = self.Process(target=self._child_strings, args=(b, strings))
2429n/a p.start()
2430n/a
2431n/a for s in strings:
2432n/a for i in range(200):
2433n/a if a.poll(0.01):
2434n/a break
2435n/a x = a.recv_bytes()
2436n/a self.assertEqual(s, x)
2437n/a
2438n/a p.join()
2439n/a
2440n/a @classmethod
2441n/a def _child_boundaries(cls, r):
2442n/a # Polling may "pull" a message in to the child process, but we
2443n/a # don't want it to pull only part of a message, as that would
2444n/a # corrupt the pipe for any other processes which might later
2445n/a # read from it.
2446n/a r.poll(5)
2447n/a
2448n/a def test_boundaries(self):
2449n/a r, w = self.Pipe(False)
2450n/a p = self.Process(target=self._child_boundaries, args=(r,))
2451n/a p.start()
2452n/a time.sleep(2)
2453n/a L = [b"first", b"second"]
2454n/a for obj in L:
2455n/a w.send_bytes(obj)
2456n/a w.close()
2457n/a p.join()
2458n/a self.assertIn(r.recv_bytes(), L)
2459n/a
2460n/a @classmethod
2461n/a def _child_dont_merge(cls, b):
2462n/a b.send_bytes(b'a')
2463n/a b.send_bytes(b'b')
2464n/a b.send_bytes(b'cd')
2465n/a
2466n/a def test_dont_merge(self):
2467n/a a, b = self.Pipe()
2468n/a self.assertEqual(a.poll(0.0), False)
2469n/a self.assertEqual(a.poll(0.1), False)
2470n/a
2471n/a p = self.Process(target=self._child_dont_merge, args=(b,))
2472n/a p.start()
2473n/a
2474n/a self.assertEqual(a.recv_bytes(), b'a')
2475n/a self.assertEqual(a.poll(1.0), True)
2476n/a self.assertEqual(a.poll(1.0), True)
2477n/a self.assertEqual(a.recv_bytes(), b'b')
2478n/a self.assertEqual(a.poll(1.0), True)
2479n/a self.assertEqual(a.poll(1.0), True)
2480n/a self.assertEqual(a.poll(0.0), True)
2481n/a self.assertEqual(a.recv_bytes(), b'cd')
2482n/a
2483n/a p.join()
2484n/a
2485n/a#
2486n/a# Test of sending connection and socket objects between processes
2487n/a#
2488n/a
2489n/a@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2490n/aclass _TestPicklingConnections(BaseTestCase):
2491n/a
2492n/a ALLOWED_TYPES = ('processes',)
2493n/a
2494n/a @classmethod
2495n/a def tearDownClass(cls):
2496n/a from multiprocessing.reduction import resource_sharer
2497n/a resource_sharer.stop(timeout=5)
2498n/a
2499n/a @classmethod
2500n/a def _listener(cls, conn, families):
2501n/a for fam in families:
2502n/a l = cls.connection.Listener(family=fam)
2503n/a conn.send(l.address)
2504n/a new_conn = l.accept()
2505n/a conn.send(new_conn)
2506n/a new_conn.close()
2507n/a l.close()
2508n/a
2509n/a l = socket.socket()
2510n/a l.bind(('localhost', 0))
2511n/a l.listen(1)
2512n/a conn.send(l.getsockname())
2513n/a new_conn, addr = l.accept()
2514n/a conn.send(new_conn)
2515n/a new_conn.close()
2516n/a l.close()
2517n/a
2518n/a conn.recv()
2519n/a
2520n/a @classmethod
2521n/a def _remote(cls, conn):
2522n/a for (address, msg) in iter(conn.recv, None):
2523n/a client = cls.connection.Client(address)
2524n/a client.send(msg.upper())
2525n/a client.close()
2526n/a
2527n/a address, msg = conn.recv()
2528n/a client = socket.socket()
2529n/a client.connect(address)
2530n/a client.sendall(msg.upper())
2531n/a client.close()
2532n/a
2533n/a conn.close()
2534n/a
2535n/a def test_pickling(self):
2536n/a families = self.connection.families
2537n/a
2538n/a lconn, lconn0 = self.Pipe()
2539n/a lp = self.Process(target=self._listener, args=(lconn0, families))
2540n/a lp.daemon = True
2541n/a lp.start()
2542n/a lconn0.close()
2543n/a
2544n/a rconn, rconn0 = self.Pipe()
2545n/a rp = self.Process(target=self._remote, args=(rconn0,))
2546n/a rp.daemon = True
2547n/a rp.start()
2548n/a rconn0.close()
2549n/a
2550n/a for fam in families:
2551n/a msg = ('This connection uses family %s' % fam).encode('ascii')
2552n/a address = lconn.recv()
2553n/a rconn.send((address, msg))
2554n/a new_conn = lconn.recv()
2555n/a self.assertEqual(new_conn.recv(), msg.upper())
2556n/a
2557n/a rconn.send(None)
2558n/a
2559n/a msg = latin('This connection uses a normal socket')
2560n/a address = lconn.recv()
2561n/a rconn.send((address, msg))
2562n/a new_conn = lconn.recv()
2563n/a buf = []
2564n/a while True:
2565n/a s = new_conn.recv(100)
2566n/a if not s:
2567n/a break
2568n/a buf.append(s)
2569n/a buf = b''.join(buf)
2570n/a self.assertEqual(buf, msg.upper())
2571n/a new_conn.close()
2572n/a
2573n/a lconn.send(None)
2574n/a
2575n/a rconn.close()
2576n/a lconn.close()
2577n/a
2578n/a lp.join()
2579n/a rp.join()
2580n/a
2581n/a @classmethod
2582n/a def child_access(cls, conn):
2583n/a w = conn.recv()
2584n/a w.send('all is well')
2585n/a w.close()
2586n/a
2587n/a r = conn.recv()
2588n/a msg = r.recv()
2589n/a conn.send(msg*2)
2590n/a
2591n/a conn.close()
2592n/a
2593n/a def test_access(self):
2594n/a # On Windows, if we do not specify a destination pid when
2595n/a # using DupHandle then we need to be careful to use the
2596n/a # correct access flags for DuplicateHandle(), or else
2597n/a # DupHandle.detach() will raise PermissionError. For example,
2598n/a # for a read only pipe handle we should use
2599n/a # access=FILE_GENERIC_READ. (Unfortunately
2600n/a # DUPLICATE_SAME_ACCESS does not work.)
2601n/a conn, child_conn = self.Pipe()
2602n/a p = self.Process(target=self.child_access, args=(child_conn,))
2603n/a p.daemon = True
2604n/a p.start()
2605n/a child_conn.close()
2606n/a
2607n/a r, w = self.Pipe(duplex=False)
2608n/a conn.send(w)
2609n/a w.close()
2610n/a self.assertEqual(r.recv(), 'all is well')
2611n/a r.close()
2612n/a
2613n/a r, w = self.Pipe(duplex=False)
2614n/a conn.send(r)
2615n/a r.close()
2616n/a w.send('foobar')
2617n/a w.close()
2618n/a self.assertEqual(conn.recv(), 'foobar'*2)
2619n/a
2620n/a#
2621n/a#
2622n/a#
2623n/a
2624n/aclass _TestHeap(BaseTestCase):
2625n/a
2626n/a ALLOWED_TYPES = ('processes',)
2627n/a
2628n/a def test_heap(self):
2629n/a iterations = 5000
2630n/a maxblocks = 50
2631n/a blocks = []
2632n/a
2633n/a # create and destroy lots of blocks of different sizes
2634n/a for i in range(iterations):
2635n/a size = int(random.lognormvariate(0, 1) * 1000)
2636n/a b = multiprocessing.heap.BufferWrapper(size)
2637n/a blocks.append(b)
2638n/a if len(blocks) > maxblocks:
2639n/a i = random.randrange(maxblocks)
2640n/a del blocks[i]
2641n/a
2642n/a # get the heap object
2643n/a heap = multiprocessing.heap.BufferWrapper._heap
2644n/a
2645n/a # verify the state of the heap
2646n/a all = []
2647n/a occupied = 0
2648n/a heap._lock.acquire()
2649n/a self.addCleanup(heap._lock.release)
2650n/a for L in list(heap._len_to_seq.values()):
2651n/a for arena, start, stop in L:
2652n/a all.append((heap._arenas.index(arena), start, stop,
2653n/a stop-start, 'free'))
2654n/a for arena, start, stop in heap._allocated_blocks:
2655n/a all.append((heap._arenas.index(arena), start, stop,
2656n/a stop-start, 'occupied'))
2657n/a occupied += (stop-start)
2658n/a
2659n/a all.sort()
2660n/a
2661n/a for i in range(len(all)-1):
2662n/a (arena, start, stop) = all[i][:3]
2663n/a (narena, nstart, nstop) = all[i+1][:3]
2664n/a self.assertTrue((arena != narena and nstart == 0) or
2665n/a (stop == nstart))
2666n/a
2667n/a def test_free_from_gc(self):
2668n/a # Check that freeing of blocks by the garbage collector doesn't deadlock
2669n/a # (issue #12352).
2670n/a # Make sure the GC is enabled, and set lower collection thresholds to
2671n/a # make collections more frequent (and increase the probability of
2672n/a # deadlock).
2673n/a if not gc.isenabled():
2674n/a gc.enable()
2675n/a self.addCleanup(gc.disable)
2676n/a thresholds = gc.get_threshold()
2677n/a self.addCleanup(gc.set_threshold, *thresholds)
2678n/a gc.set_threshold(10)
2679n/a
2680n/a # perform numerous block allocations, with cyclic references to make
2681n/a # sure objects are collected asynchronously by the gc
2682n/a for i in range(5000):
2683n/a a = multiprocessing.heap.BufferWrapper(1)
2684n/a b = multiprocessing.heap.BufferWrapper(1)
2685n/a # circular references
2686n/a a.buddy = b
2687n/a b.buddy = a
2688n/a
2689n/a#
2690n/a#
2691n/a#
2692n/a
2693n/aclass _Foo(Structure):
2694n/a _fields_ = [
2695n/a ('x', c_int),
2696n/a ('y', c_double)
2697n/a ]
2698n/a
2699n/aclass _TestSharedCTypes(BaseTestCase):
2700n/a
2701n/a ALLOWED_TYPES = ('processes',)
2702n/a
2703n/a def setUp(self):
2704n/a if not HAS_SHAREDCTYPES:
2705n/a self.skipTest("requires multiprocessing.sharedctypes")
2706n/a
2707n/a @classmethod
2708n/a def _double(cls, x, y, foo, arr, string):
2709n/a x.value *= 2
2710n/a y.value *= 2
2711n/a foo.x *= 2
2712n/a foo.y *= 2
2713n/a string.value *= 2
2714n/a for i in range(len(arr)):
2715n/a arr[i] *= 2
2716n/a
2717n/a def test_sharedctypes(self, lock=False):
2718n/a x = Value('i', 7, lock=lock)
2719n/a y = Value(c_double, 1.0/3.0, lock=lock)
2720n/a foo = Value(_Foo, 3, 2, lock=lock)
2721n/a arr = self.Array('d', list(range(10)), lock=lock)
2722n/a string = self.Array('c', 20, lock=lock)
2723n/a string.value = latin('hello')
2724n/a
2725n/a p = self.Process(target=self._double, args=(x, y, foo, arr, string))
2726n/a p.daemon = True
2727n/a p.start()
2728n/a p.join()
2729n/a
2730n/a self.assertEqual(x.value, 14)
2731n/a self.assertAlmostEqual(y.value, 2.0/3.0)
2732n/a self.assertEqual(foo.x, 6)
2733n/a self.assertAlmostEqual(foo.y, 4.0)
2734n/a for i in range(10):
2735n/a self.assertAlmostEqual(arr[i], i*2)
2736n/a self.assertEqual(string.value, latin('hellohello'))
2737n/a
2738n/a def test_synchronize(self):
2739n/a self.test_sharedctypes(lock=True)
2740n/a
2741n/a def test_copy(self):
2742n/a foo = _Foo(2, 5.0)
2743n/a bar = copy(foo)
2744n/a foo.x = 0
2745n/a foo.y = 0
2746n/a self.assertEqual(bar.x, 2)
2747n/a self.assertAlmostEqual(bar.y, 5.0)
2748n/a
2749n/a#
2750n/a#
2751n/a#
2752n/a
2753n/aclass _TestFinalize(BaseTestCase):
2754n/a
2755n/a ALLOWED_TYPES = ('processes',)
2756n/a
2757n/a @classmethod
2758n/a def _test_finalize(cls, conn):
2759n/a class Foo(object):
2760n/a pass
2761n/a
2762n/a a = Foo()
2763n/a util.Finalize(a, conn.send, args=('a',))
2764n/a del a # triggers callback for a
2765n/a
2766n/a b = Foo()
2767n/a close_b = util.Finalize(b, conn.send, args=('b',))
2768n/a close_b() # triggers callback for b
2769n/a close_b() # does nothing because callback has already been called
2770n/a del b # does nothing because callback has already been called
2771n/a
2772n/a c = Foo()
2773n/a util.Finalize(c, conn.send, args=('c',))
2774n/a
2775n/a d10 = Foo()
2776n/a util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2777n/a
2778n/a d01 = Foo()
2779n/a util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2780n/a d02 = Foo()
2781n/a util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2782n/a d03 = Foo()
2783n/a util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2784n/a
2785n/a util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2786n/a
2787n/a util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2788n/a
2789n/a # call multiprocessing's cleanup function then exit process without
2790n/a # garbage collecting locals
2791n/a util._exit_function()
2792n/a conn.close()
2793n/a os._exit(0)
2794n/a
2795n/a def test_finalize(self):
2796n/a conn, child_conn = self.Pipe()
2797n/a
2798n/a p = self.Process(target=self._test_finalize, args=(child_conn,))
2799n/a p.daemon = True
2800n/a p.start()
2801n/a p.join()
2802n/a
2803n/a result = [obj for obj in iter(conn.recv, 'STOP')]
2804n/a self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2805n/a
2806n/a#
2807n/a# Test that from ... import * works for each module
2808n/a#
2809n/a
2810n/aclass _TestImportStar(BaseTestCase):
2811n/a
2812n/a ALLOWED_TYPES = ('processes',)
2813n/a
2814n/a def test_import(self):
2815n/a modules = [
2816n/a 'multiprocessing', 'multiprocessing.connection',
2817n/a 'multiprocessing.heap', 'multiprocessing.managers',
2818n/a 'multiprocessing.pool', 'multiprocessing.process',
2819n/a 'multiprocessing.synchronize', 'multiprocessing.util'
2820n/a ]
2821n/a
2822n/a if HAS_REDUCTION:
2823n/a modules.append('multiprocessing.reduction')
2824n/a
2825n/a if c_int is not None:
2826n/a # This module requires _ctypes
2827n/a modules.append('multiprocessing.sharedctypes')
2828n/a
2829n/a for name in modules:
2830n/a __import__(name)
2831n/a mod = sys.modules[name]
2832n/a
2833n/a for attr in getattr(mod, '__all__', ()):
2834n/a self.assertTrue(
2835n/a hasattr(mod, attr),
2836n/a '%r does not have attribute %r' % (mod, attr)
2837n/a )
2838n/a
2839n/a#
2840n/a# Quick test that logging works -- does not test logging output
2841n/a#
2842n/a
2843n/aclass _TestLogging(BaseTestCase):
2844n/a
2845n/a ALLOWED_TYPES = ('processes',)
2846n/a
2847n/a def test_enable_logging(self):
2848n/a logger = multiprocessing.get_logger()
2849n/a logger.setLevel(util.SUBWARNING)
2850n/a self.assertTrue(logger is not None)
2851n/a logger.debug('this will not be printed')
2852n/a logger.info('nor will this')
2853n/a logger.setLevel(LOG_LEVEL)
2854n/a
2855n/a @classmethod
2856n/a def _test_level(cls, conn):
2857n/a logger = multiprocessing.get_logger()
2858n/a conn.send(logger.getEffectiveLevel())
2859n/a
2860n/a def test_level(self):
2861n/a LEVEL1 = 32
2862n/a LEVEL2 = 37
2863n/a
2864n/a logger = multiprocessing.get_logger()
2865n/a root_logger = logging.getLogger()
2866n/a root_level = root_logger.level
2867n/a
2868n/a reader, writer = multiprocessing.Pipe(duplex=False)
2869n/a
2870n/a logger.setLevel(LEVEL1)
2871n/a p = self.Process(target=self._test_level, args=(writer,))
2872n/a p.daemon = True
2873n/a p.start()
2874n/a self.assertEqual(LEVEL1, reader.recv())
2875n/a
2876n/a logger.setLevel(logging.NOTSET)
2877n/a root_logger.setLevel(LEVEL2)
2878n/a p = self.Process(target=self._test_level, args=(writer,))
2879n/a p.daemon = True
2880n/a p.start()
2881n/a self.assertEqual(LEVEL2, reader.recv())
2882n/a
2883n/a root_logger.setLevel(root_level)
2884n/a logger.setLevel(level=LOG_LEVEL)
2885n/a
2886n/a
2887n/a# class _TestLoggingProcessName(BaseTestCase):
2888n/a#
2889n/a# def handle(self, record):
2890n/a# assert record.processName == multiprocessing.current_process().name
2891n/a# self.__handled = True
2892n/a#
2893n/a# def test_logging(self):
2894n/a# handler = logging.Handler()
2895n/a# handler.handle = self.handle
2896n/a# self.__handled = False
2897n/a# # Bypass getLogger() and side-effects
2898n/a# logger = logging.getLoggerClass()(
2899n/a# 'multiprocessing.test.TestLoggingProcessName')
2900n/a# logger.addHandler(handler)
2901n/a# logger.propagate = False
2902n/a#
2903n/a# logger.warn('foo')
2904n/a# assert self.__handled
2905n/a
2906n/a#
2907n/a# Check that Process.join() retries if os.waitpid() fails with EINTR
2908n/a#
2909n/a
2910n/aclass _TestPollEintr(BaseTestCase):
2911n/a
2912n/a ALLOWED_TYPES = ('processes',)
2913n/a
2914n/a @classmethod
2915n/a def _killer(cls, pid):
2916n/a time.sleep(0.5)
2917n/a os.kill(pid, signal.SIGUSR1)
2918n/a
2919n/a @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2920n/a def test_poll_eintr(self):
2921n/a got_signal = [False]
2922n/a def record(*args):
2923n/a got_signal[0] = True
2924n/a pid = os.getpid()
2925n/a oldhandler = signal.signal(signal.SIGUSR1, record)
2926n/a try:
2927n/a killer = self.Process(target=self._killer, args=(pid,))
2928n/a killer.start()
2929n/a p = self.Process(target=time.sleep, args=(1,))
2930n/a p.start()
2931n/a p.join()
2932n/a self.assertTrue(got_signal[0])
2933n/a self.assertEqual(p.exitcode, 0)
2934n/a killer.join()
2935n/a finally:
2936n/a signal.signal(signal.SIGUSR1, oldhandler)
2937n/a
2938n/a#
2939n/a# Test to verify handle verification, see issue 3321
2940n/a#
2941n/a
2942n/aclass TestInvalidHandle(unittest.TestCase):
2943n/a
2944n/a @unittest.skipIf(WIN32, "skipped on Windows")
2945n/a def test_invalid_handles(self):
2946n/a conn = multiprocessing.connection.Connection(44977608)
2947n/a try:
2948n/a self.assertRaises((ValueError, OSError), conn.poll)
2949n/a finally:
2950n/a # Hack private attribute _handle to avoid printing an error
2951n/a # in conn.__del__
2952n/a conn._handle = None
2953n/a self.assertRaises((ValueError, OSError),
2954n/a multiprocessing.connection.Connection, -1)
2955n/a
2956n/a#
2957n/a# Functions used to create test cases from the base ones in this module
2958n/a#
2959n/a
2960n/adef create_test_cases(Mixin, type):
2961n/a result = {}
2962n/a glob = globals()
2963n/a Type = type.capitalize()
2964n/a ALL_TYPES = {'processes', 'threads', 'manager'}
2965n/a
2966n/a for name in list(glob.keys()):
2967n/a if name.startswith('_Test'):
2968n/a base = glob[name]
2969n/a assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
2970n/a if type in base.ALLOWED_TYPES:
2971n/a newname = 'With' + Type + name[1:]
2972n/a class Temp(base, Mixin, unittest.TestCase):
2973n/a pass
2974n/a result[newname] = Temp
2975n/a Temp.__name__ = Temp.__qualname__ = newname
2976n/a Temp.__module__ = Mixin.__module__
2977n/a return result
2978n/a
2979n/a#
2980n/a# Create test cases
2981n/a#
2982n/a
2983n/aclass ProcessesMixin(object):
2984n/a TYPE = 'processes'
2985n/a Process = multiprocessing.Process
2986n/a connection = multiprocessing.connection
2987n/a current_process = staticmethod(multiprocessing.current_process)
2988n/a active_children = staticmethod(multiprocessing.active_children)
2989n/a Pool = staticmethod(multiprocessing.Pool)
2990n/a Pipe = staticmethod(multiprocessing.Pipe)
2991n/a Queue = staticmethod(multiprocessing.Queue)
2992n/a JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
2993n/a Lock = staticmethod(multiprocessing.Lock)
2994n/a RLock = staticmethod(multiprocessing.RLock)
2995n/a Semaphore = staticmethod(multiprocessing.Semaphore)
2996n/a BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
2997n/a Condition = staticmethod(multiprocessing.Condition)
2998n/a Event = staticmethod(multiprocessing.Event)
2999n/a Barrier = staticmethod(multiprocessing.Barrier)
3000n/a Value = staticmethod(multiprocessing.Value)
3001n/a Array = staticmethod(multiprocessing.Array)
3002n/a RawValue = staticmethod(multiprocessing.RawValue)
3003n/a RawArray = staticmethod(multiprocessing.RawArray)
3004n/a
3005n/atestcases_processes = create_test_cases(ProcessesMixin, type='processes')
3006n/aglobals().update(testcases_processes)
3007n/a
3008n/a
3009n/aclass ManagerMixin(object):
3010n/a TYPE = 'manager'
3011n/a Process = multiprocessing.Process
3012n/a Queue = property(operator.attrgetter('manager.Queue'))
3013n/a JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
3014n/a Lock = property(operator.attrgetter('manager.Lock'))
3015n/a RLock = property(operator.attrgetter('manager.RLock'))
3016n/a Semaphore = property(operator.attrgetter('manager.Semaphore'))
3017n/a BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
3018n/a Condition = property(operator.attrgetter('manager.Condition'))
3019n/a Event = property(operator.attrgetter('manager.Event'))
3020n/a Barrier = property(operator.attrgetter('manager.Barrier'))
3021n/a Value = property(operator.attrgetter('manager.Value'))
3022n/a Array = property(operator.attrgetter('manager.Array'))
3023n/a list = property(operator.attrgetter('manager.list'))
3024n/a dict = property(operator.attrgetter('manager.dict'))
3025n/a Namespace = property(operator.attrgetter('manager.Namespace'))
3026n/a
3027n/a @classmethod
3028n/a def Pool(cls, *args, **kwds):
3029n/a return cls.manager.Pool(*args, **kwds)
3030n/a
3031n/a @classmethod
3032n/a def setUpClass(cls):
3033n/a cls.manager = multiprocessing.Manager()
3034n/a
3035n/a @classmethod
3036n/a def tearDownClass(cls):
3037n/a # only the manager process should be returned by active_children()
3038n/a # but this can take a bit on slow machines, so wait a few seconds
3039n/a # if there are other children too (see #17395)
3040n/a t = 0.01
3041n/a while len(multiprocessing.active_children()) > 1 and t < 5:
3042n/a time.sleep(t)
3043n/a t *= 2
3044n/a gc.collect() # do garbage collection
3045n/a if cls.manager._number_of_objects() != 0:
3046n/a # This is not really an error since some tests do not
3047n/a # ensure that all processes which hold a reference to a
3048n/a # managed object have been joined.
3049n/a print('Shared objects which still exist at manager shutdown:')
3050n/a print(cls.manager._debug_info())
3051n/a cls.manager.shutdown()
3052n/a cls.manager.join()
3053n/a cls.manager = None
3054n/a
3055n/atestcases_manager = create_test_cases(ManagerMixin, type='manager')
3056n/aglobals().update(testcases_manager)
3057n/a
3058n/a
3059n/aclass ThreadsMixin(object):
3060n/a TYPE = 'threads'
3061n/a Process = multiprocessing.dummy.Process
3062n/a connection = multiprocessing.dummy.connection
3063n/a current_process = staticmethod(multiprocessing.dummy.current_process)
3064n/a active_children = staticmethod(multiprocessing.dummy.active_children)
3065n/a Pool = staticmethod(multiprocessing.Pool)
3066n/a Pipe = staticmethod(multiprocessing.dummy.Pipe)
3067n/a Queue = staticmethod(multiprocessing.dummy.Queue)
3068n/a JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
3069n/a Lock = staticmethod(multiprocessing.dummy.Lock)
3070n/a RLock = staticmethod(multiprocessing.dummy.RLock)
3071n/a Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
3072n/a BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
3073n/a Condition = staticmethod(multiprocessing.dummy.Condition)
3074n/a Event = staticmethod(multiprocessing.dummy.Event)
3075n/a Barrier = staticmethod(multiprocessing.dummy.Barrier)
3076n/a Value = staticmethod(multiprocessing.dummy.Value)
3077n/a Array = staticmethod(multiprocessing.dummy.Array)
3078n/a
3079n/atestcases_threads = create_test_cases(ThreadsMixin, type='threads')
3080n/aglobals().update(testcases_threads)
3081n/a
3082n/a
3083n/aclass OtherTest(unittest.TestCase):
3084n/a # TODO: add more tests for deliver/answer challenge.
3085n/a def test_deliver_challenge_auth_failure(self):
3086n/a class _FakeConnection(object):
3087n/a def recv_bytes(self, size):
3088n/a return b'something bogus'
3089n/a def send_bytes(self, data):
3090n/a pass
3091n/a self.assertRaises(multiprocessing.AuthenticationError,
3092n/a multiprocessing.connection.deliver_challenge,
3093n/a _FakeConnection(), b'abc')
3094n/a
3095n/a def test_answer_challenge_auth_failure(self):
3096n/a class _FakeConnection(object):
3097n/a def __init__(self):
3098n/a self.count = 0
3099n/a def recv_bytes(self, size):
3100n/a self.count += 1
3101n/a if self.count == 1:
3102n/a return multiprocessing.connection.CHALLENGE
3103n/a elif self.count == 2:
3104n/a return b'something bogus'
3105n/a return b''
3106n/a def send_bytes(self, data):
3107n/a pass
3108n/a self.assertRaises(multiprocessing.AuthenticationError,
3109n/a multiprocessing.connection.answer_challenge,
3110n/a _FakeConnection(), b'abc')
3111n/a
3112n/a#
3113n/a# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3114n/a#
3115n/a
3116n/adef initializer(ns):
3117n/a ns.test += 1
3118n/a
3119n/aclass TestInitializers(unittest.TestCase):
3120n/a def setUp(self):
3121n/a self.mgr = multiprocessing.Manager()
3122n/a self.ns = self.mgr.Namespace()
3123n/a self.ns.test = 0
3124n/a
3125n/a def tearDown(self):
3126n/a self.mgr.shutdown()
3127n/a self.mgr.join()
3128n/a
3129n/a def test_manager_initializer(self):
3130n/a m = multiprocessing.managers.SyncManager()
3131n/a self.assertRaises(TypeError, m.start, 1)
3132n/a m.start(initializer, (self.ns,))
3133n/a self.assertEqual(self.ns.test, 1)
3134n/a m.shutdown()
3135n/a m.join()
3136n/a
3137n/a def test_pool_initializer(self):
3138n/a self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3139n/a p = multiprocessing.Pool(1, initializer, (self.ns,))
3140n/a p.close()
3141n/a p.join()
3142n/a self.assertEqual(self.ns.test, 1)
3143n/a
3144n/a#
3145n/a# Issue 5155, 5313, 5331: Test process in processes
3146n/a# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3147n/a#
3148n/a
3149n/adef _ThisSubProcess(q):
3150n/a try:
3151n/a item = q.get(block=False)
3152n/a except pyqueue.Empty:
3153n/a pass
3154n/a
3155n/adef _TestProcess(q):
3156n/a queue = multiprocessing.Queue()
3157n/a subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
3158n/a subProc.daemon = True
3159n/a subProc.start()
3160n/a subProc.join()
3161n/a
3162n/adef _afunc(x):
3163n/a return x*x
3164n/a
3165n/adef pool_in_process():
3166n/a pool = multiprocessing.Pool(processes=4)
3167n/a x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
3168n/a pool.close()
3169n/a pool.join()
3170n/a
3171n/aclass _file_like(object):
3172n/a def __init__(self, delegate):
3173n/a self._delegate = delegate
3174n/a self._pid = None
3175n/a
3176n/a @property
3177n/a def cache(self):
3178n/a pid = os.getpid()
3179n/a # There are no race conditions since fork keeps only the running thread
3180n/a if pid != self._pid:
3181n/a self._pid = pid
3182n/a self._cache = []
3183n/a return self._cache
3184n/a
3185n/a def write(self, data):
3186n/a self.cache.append(data)
3187n/a
3188n/a def flush(self):
3189n/a self._delegate.write(''.join(self.cache))
3190n/a self._cache = []
3191n/a
3192n/aclass TestStdinBadfiledescriptor(unittest.TestCase):
3193n/a
3194n/a def test_queue_in_process(self):
3195n/a queue = multiprocessing.Queue()
3196n/a proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
3197n/a proc.start()
3198n/a proc.join()
3199n/a
3200n/a def test_pool_in_process(self):
3201n/a p = multiprocessing.Process(target=pool_in_process)
3202n/a p.start()
3203n/a p.join()
3204n/a
3205n/a def test_flushing(self):
3206n/a sio = io.StringIO()
3207n/a flike = _file_like(sio)
3208n/a flike.write('foo')
3209n/a proc = multiprocessing.Process(target=lambda: flike.flush())
3210n/a flike.flush()
3211n/a assert sio.getvalue() == 'foo'
3212n/a
3213n/a
3214n/aclass TestWait(unittest.TestCase):
3215n/a
3216n/a @classmethod
3217n/a def _child_test_wait(cls, w, slow):
3218n/a for i in range(10):
3219n/a if slow:
3220n/a time.sleep(random.random()*0.1)
3221n/a w.send((i, os.getpid()))
3222n/a w.close()
3223n/a
3224n/a def test_wait(self, slow=False):
3225n/a from multiprocessing.connection import wait
3226n/a readers = []
3227n/a procs = []
3228n/a messages = []
3229n/a
3230n/a for i in range(4):
3231n/a r, w = multiprocessing.Pipe(duplex=False)
3232n/a p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
3233n/a p.daemon = True
3234n/a p.start()
3235n/a w.close()
3236n/a readers.append(r)
3237n/a procs.append(p)
3238n/a self.addCleanup(p.join)
3239n/a
3240n/a while readers:
3241n/a for r in wait(readers):
3242n/a try:
3243n/a msg = r.recv()
3244n/a except EOFError:
3245n/a readers.remove(r)
3246n/a r.close()
3247n/a else:
3248n/a messages.append(msg)
3249n/a
3250n/a messages.sort()
3251n/a expected = sorted((i, p.pid) for i in range(10) for p in procs)
3252n/a self.assertEqual(messages, expected)
3253n/a
3254n/a @classmethod
3255n/a def _child_test_wait_socket(cls, address, slow):
3256n/a s = socket.socket()
3257n/a s.connect(address)
3258n/a for i in range(10):
3259n/a if slow:
3260n/a time.sleep(random.random()*0.1)
3261n/a s.sendall(('%s\n' % i).encode('ascii'))
3262n/a s.close()
3263n/a
3264n/a def test_wait_socket(self, slow=False):
3265n/a from multiprocessing.connection import wait
3266n/a l = socket.socket()
3267n/a l.bind(('', 0))
3268n/a l.listen(4)
3269n/a addr = ('localhost', l.getsockname()[1])
3270n/a readers = []
3271n/a procs = []
3272n/a dic = {}
3273n/a
3274n/a for i in range(4):
3275n/a p = multiprocessing.Process(target=self._child_test_wait_socket,
3276n/a args=(addr, slow))
3277n/a p.daemon = True
3278n/a p.start()
3279n/a procs.append(p)
3280n/a self.addCleanup(p.join)
3281n/a
3282n/a for i in range(4):
3283n/a r, _ = l.accept()
3284n/a readers.append(r)
3285n/a dic[r] = []
3286n/a l.close()
3287n/a
3288n/a while readers:
3289n/a for r in wait(readers):
3290n/a msg = r.recv(32)
3291n/a if not msg:
3292n/a readers.remove(r)
3293n/a r.close()
3294n/a else:
3295n/a dic[r].append(msg)
3296n/a
3297n/a expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3298n/a for v in dic.values():
3299n/a self.assertEqual(b''.join(v), expected)
3300n/a
3301n/a def test_wait_slow(self):
3302n/a self.test_wait(True)
3303n/a
3304n/a def test_wait_socket_slow(self):
3305n/a self.test_wait_socket(True)
3306n/a
3307n/a def test_wait_timeout(self):
3308n/a from multiprocessing.connection import wait
3309n/a
3310n/a expected = 5
3311n/a a, b = multiprocessing.Pipe()
3312n/a
3313n/a start = time.time()
3314n/a res = wait([a, b], expected)
3315n/a delta = time.time() - start
3316n/a
3317n/a self.assertEqual(res, [])
3318n/a self.assertLess(delta, expected * 2)
3319n/a self.assertGreater(delta, expected * 0.5)
3320n/a
3321n/a b.send(None)
3322n/a
3323n/a start = time.time()
3324n/a res = wait([a, b], 20)
3325n/a delta = time.time() - start
3326n/a
3327n/a self.assertEqual(res, [a])
3328n/a self.assertLess(delta, 0.4)
3329n/a
3330n/a @classmethod
3331n/a def signal_and_sleep(cls, sem, period):
3332n/a sem.release()
3333n/a time.sleep(period)
3334n/a
3335n/a def test_wait_integer(self):
3336n/a from multiprocessing.connection import wait
3337n/a
3338n/a expected = 3
3339n/a sorted_ = lambda l: sorted(l, key=lambda x: id(x))
3340n/a sem = multiprocessing.Semaphore(0)
3341n/a a, b = multiprocessing.Pipe()
3342n/a p = multiprocessing.Process(target=self.signal_and_sleep,
3343n/a args=(sem, expected))
3344n/a
3345n/a p.start()
3346n/a self.assertIsInstance(p.sentinel, int)
3347n/a self.assertTrue(sem.acquire(timeout=20))
3348n/a
3349n/a start = time.time()
3350n/a res = wait([a, p.sentinel, b], expected + 20)
3351n/a delta = time.time() - start
3352n/a
3353n/a self.assertEqual(res, [p.sentinel])
3354n/a self.assertLess(delta, expected + 2)
3355n/a self.assertGreater(delta, expected - 2)
3356n/a
3357n/a a.send(None)
3358n/a
3359n/a start = time.time()
3360n/a res = wait([a, p.sentinel, b], 20)
3361n/a delta = time.time() - start
3362n/a
3363n/a self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
3364n/a self.assertLess(delta, 0.4)
3365n/a
3366n/a b.send(None)
3367n/a
3368n/a start = time.time()
3369n/a res = wait([a, p.sentinel, b], 20)
3370n/a delta = time.time() - start
3371n/a
3372n/a self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
3373n/a self.assertLess(delta, 0.4)
3374n/a
3375n/a p.terminate()
3376n/a p.join()
3377n/a
3378n/a def test_neg_timeout(self):
3379n/a from multiprocessing.connection import wait
3380n/a a, b = multiprocessing.Pipe()
3381n/a t = time.time()
3382n/a res = wait([a], timeout=-1)
3383n/a t = time.time() - t
3384n/a self.assertEqual(res, [])
3385n/a self.assertLess(t, 1)
3386n/a a.close()
3387n/a b.close()
3388n/a
3389n/a#
3390n/a# Issue 14151: Test invalid family on invalid environment
3391n/a#
3392n/a
3393n/aclass TestInvalidFamily(unittest.TestCase):
3394n/a
3395n/a @unittest.skipIf(WIN32, "skipped on Windows")
3396n/a def test_invalid_family(self):
3397n/a with self.assertRaises(ValueError):
3398n/a multiprocessing.connection.Listener(r'\\.\test')
3399n/a
3400n/a @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3401n/a def test_invalid_family_win32(self):
3402n/a with self.assertRaises(ValueError):
3403n/a multiprocessing.connection.Listener('/var/test.pipe')
3404n/a
3405n/a#
3406n/a# Issue 12098: check sys.flags of child matches that for parent
3407n/a#
3408n/a
3409n/aclass TestFlags(unittest.TestCase):
3410n/a @classmethod
3411n/a def run_in_grandchild(cls, conn):
3412n/a conn.send(tuple(sys.flags))
3413n/a
3414n/a @classmethod
3415n/a def run_in_child(cls):
3416n/a import json
3417n/a r, w = multiprocessing.Pipe(duplex=False)
3418n/a p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3419n/a p.start()
3420n/a grandchild_flags = r.recv()
3421n/a p.join()
3422n/a r.close()
3423n/a w.close()
3424n/a flags = (tuple(sys.flags), grandchild_flags)
3425n/a print(json.dumps(flags))
3426n/a
3427n/a def test_flags(self):
3428n/a import json, subprocess
3429n/a # start child process using unusual flags
3430n/a prog = ('from test.test_multiprocessing import TestFlags; ' +
3431n/a 'TestFlags.run_in_child()')
3432n/a data = subprocess.check_output(
3433n/a [sys.executable, '-E', '-S', '-O', '-c', prog])
3434n/a child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3435n/a self.assertEqual(child_flags, grandchild_flags)
3436n/a
3437n/a#
3438n/a# Test interaction with socket timeouts - see Issue #6056
3439n/a#
3440n/a
3441n/aclass TestTimeouts(unittest.TestCase):
3442n/a @classmethod
3443n/a def _test_timeout(cls, child, address):
3444n/a time.sleep(1)
3445n/a child.send(123)
3446n/a child.close()
3447n/a conn = multiprocessing.connection.Client(address)
3448n/a conn.send(456)
3449n/a conn.close()
3450n/a
3451n/a def test_timeout(self):
3452n/a old_timeout = socket.getdefaulttimeout()
3453n/a try:
3454n/a socket.setdefaulttimeout(0.1)
3455n/a parent, child = multiprocessing.Pipe(duplex=True)
3456n/a l = multiprocessing.connection.Listener(family='AF_INET')
3457n/a p = multiprocessing.Process(target=self._test_timeout,
3458n/a args=(child, l.address))
3459n/a p.start()
3460n/a child.close()
3461n/a self.assertEqual(parent.recv(), 123)
3462n/a parent.close()
3463n/a conn = l.accept()
3464n/a self.assertEqual(conn.recv(), 456)
3465n/a conn.close()
3466n/a l.close()
3467n/a p.join(10)
3468n/a finally:
3469n/a socket.setdefaulttimeout(old_timeout)
3470n/a
3471n/a#
3472n/a# Test what happens with no "if __name__ == '__main__'"
3473n/a#
3474n/a
3475n/aclass TestNoForkBomb(unittest.TestCase):
3476n/a def test_noforkbomb(self):
3477n/a name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
3478n/a if WIN32:
3479n/a rc, out, err = test.script_helper.assert_python_failure(name)
3480n/a self.assertEqual('', out.decode('ascii'))
3481n/a self.assertIn('RuntimeError', err.decode('ascii'))
3482n/a else:
3483n/a rc, out, err = test.script_helper.assert_python_ok(name)
3484n/a self.assertEqual('123', out.decode('ascii').rstrip())
3485n/a self.assertEqual('', err.decode('ascii'))
3486n/a
3487n/a#
3488n/a# Issue #17555: ForkAwareThreadLock
3489n/a#
3490n/a
3491n/aclass TestForkAwareThreadLock(unittest.TestCase):
3492n/a # We recurisvely start processes. Issue #17555 meant that the
3493n/a # after fork registry would get duplicate entries for the same
3494n/a # lock. The size of the registry at generation n was ~2**n.
3495n/a
3496n/a @classmethod
3497n/a def child(cls, n, conn):
3498n/a if n > 1:
3499n/a p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
3500n/a p.start()
3501n/a p.join()
3502n/a else:
3503n/a conn.send(len(util._afterfork_registry))
3504n/a conn.close()
3505n/a
3506n/a def test_lock(self):
3507n/a r, w = multiprocessing.Pipe(False)
3508n/a l = util.ForkAwareThreadLock()
3509n/a old_size = len(util._afterfork_registry)
3510n/a p = multiprocessing.Process(target=self.child, args=(5, w))
3511n/a p.start()
3512n/a new_size = r.recv()
3513n/a p.join()
3514n/a self.assertLessEqual(new_size, old_size)
3515n/a
3516n/a#
3517n/a# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3518n/a#
3519n/a
3520n/aclass TestIgnoreEINTR(unittest.TestCase):
3521n/a
3522n/a @classmethod
3523n/a def _test_ignore(cls, conn):
3524n/a def handler(signum, frame):
3525n/a pass
3526n/a signal.signal(signal.SIGUSR1, handler)
3527n/a conn.send('ready')
3528n/a x = conn.recv()
3529n/a conn.send(x)
3530n/a conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
3531n/a
3532n/a @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3533n/a def test_ignore(self):
3534n/a conn, child_conn = multiprocessing.Pipe()
3535n/a try:
3536n/a p = multiprocessing.Process(target=self._test_ignore,
3537n/a args=(child_conn,))
3538n/a p.daemon = True
3539n/a p.start()
3540n/a child_conn.close()
3541n/a self.assertEqual(conn.recv(), 'ready')
3542n/a time.sleep(0.1)
3543n/a os.kill(p.pid, signal.SIGUSR1)
3544n/a time.sleep(0.1)
3545n/a conn.send(1234)
3546n/a self.assertEqual(conn.recv(), 1234)
3547n/a time.sleep(0.1)
3548n/a os.kill(p.pid, signal.SIGUSR1)
3549n/a self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
3550n/a time.sleep(0.1)
3551n/a p.join()
3552n/a finally:
3553n/a conn.close()
3554n/a
3555n/a @classmethod
3556n/a def _test_ignore_listener(cls, conn):
3557n/a def handler(signum, frame):
3558n/a pass
3559n/a signal.signal(signal.SIGUSR1, handler)
3560n/a l = multiprocessing.connection.Listener()
3561n/a conn.send(l.address)
3562n/a a = l.accept()
3563n/a a.send('welcome')
3564n/a
3565n/a @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3566n/a def test_ignore_listener(self):
3567n/a conn, child_conn = multiprocessing.Pipe()
3568n/a try:
3569n/a p = multiprocessing.Process(target=self._test_ignore_listener,
3570n/a args=(child_conn,))
3571n/a p.daemon = True
3572n/a p.start()
3573n/a child_conn.close()
3574n/a address = conn.recv()
3575n/a time.sleep(0.1)
3576n/a os.kill(p.pid, signal.SIGUSR1)
3577n/a time.sleep(0.1)
3578n/a client = multiprocessing.connection.Client(address)
3579n/a self.assertEqual(client.recv(), 'welcome')
3580n/a p.join()
3581n/a finally:
3582n/a conn.close()
3583n/a
3584n/a#
3585n/a#
3586n/a#
3587n/a
3588n/adef setUpModule():
3589n/a if sys.platform.startswith("linux"):
3590n/a try:
3591n/a lock = multiprocessing.RLock()
3592n/a except OSError:
3593n/a raise unittest.SkipTest("OSError raises on RLock creation, "
3594n/a "see issue 3111!")
3595n/a check_enough_semaphores()
3596n/a util.get_temp_dir() # creates temp directory for use by all processes
3597n/a multiprocessing.get_logger().setLevel(LOG_LEVEL)
3598n/a
3599n/a
3600n/adef tearDownModule():
3601n/a # pause a bit so we don't get warning about dangling threads/processes
3602n/a time.sleep(0.5)
3603n/a
3604n/a
3605n/aif __name__ == '__main__':
3606n/a unittest.main()