ยปCore Development>Code coverage>Lib/test/_test_multiprocessing.py

Python code coverage for Lib/test/_test_multiprocessing.py

#countcontent
1n/a#
2n/a# Unit tests for the multiprocessing package
3n/a#
4n/a
5n/aimport unittest
6n/aimport queue as pyqueue
7n/aimport time
8n/aimport io
9n/aimport itertools
10n/aimport sys
11n/aimport os
12n/aimport gc
13n/aimport errno
14n/aimport signal
15n/aimport array
16n/aimport socket
17n/aimport random
18n/aimport logging
19n/aimport struct
20n/aimport operator
21n/aimport test.support
22n/aimport test.support.script_helper
23n/a
24n/a
25n/a# Skip tests if _multiprocessing wasn't built.
26n/a_multiprocessing = test.support.import_module('_multiprocessing')
27n/a# Skip tests if sem_open implementation is broken.
28n/atest.support.import_module('multiprocessing.synchronize')
29n/a# import threading after _multiprocessing to raise a more relevant error
30n/a# message: "No module named _multiprocessing". _multiprocessing is not compiled
31n/a# without thread support.
32n/aimport threading
33n/a
34n/aimport multiprocessing.dummy
35n/aimport multiprocessing.connection
36n/aimport multiprocessing.managers
37n/aimport multiprocessing.heap
38n/aimport multiprocessing.pool
39n/a
40n/afrom multiprocessing import util
41n/a
42n/atry:
43n/a from multiprocessing import reduction
44n/a HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
45n/aexcept ImportError:
46n/a HAS_REDUCTION = False
47n/a
48n/atry:
49n/a from multiprocessing.sharedctypes import Value, copy
50n/a HAS_SHAREDCTYPES = True
51n/aexcept ImportError:
52n/a HAS_SHAREDCTYPES = False
53n/a
54n/atry:
55n/a import msvcrt
56n/aexcept ImportError:
57n/a msvcrt = None
58n/a
59n/a#
60n/a#
61n/a#
62n/a
63n/adef latin(s):
64n/a return s.encode('latin')
65n/a
66n/a#
67n/a# Constants
68n/a#
69n/a
70n/aLOG_LEVEL = util.SUBWARNING
71n/a#LOG_LEVEL = logging.DEBUG
72n/a
73n/aDELTA = 0.1
74n/aCHECK_TIMINGS = False # making true makes tests take a lot longer
75n/a # and can sometimes cause some non-serious
76n/a # failures because some calls block a bit
77n/a # longer than expected
78n/aif CHECK_TIMINGS:
79n/a TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
80n/aelse:
81n/a TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
82n/a
83n/aHAVE_GETVALUE = not getattr(_multiprocessing,
84n/a 'HAVE_BROKEN_SEM_GETVALUE', False)
85n/a
86n/aWIN32 = (sys.platform == "win32")
87n/a
88n/afrom multiprocessing.connection import wait
89n/a
90n/adef wait_for_handle(handle, timeout):
91n/a if timeout is not None and timeout < 0.0:
92n/a timeout = None
93n/a return wait([handle], timeout)
94n/a
95n/atry:
96n/a MAXFD = os.sysconf("SC_OPEN_MAX")
97n/aexcept:
98n/a MAXFD = 256
99n/a
100n/a# To speed up tests when using the forkserver, we can preload these:
101n/aPRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
102n/a
103n/a#
104n/a# Some tests require ctypes
105n/a#
106n/a
107n/atry:
108n/a from ctypes import Structure, c_int, c_double
109n/aexcept ImportError:
110n/a Structure = object
111n/a c_int = c_double = None
112n/a
113n/a
114n/adef check_enough_semaphores():
115n/a """Check that the system supports enough semaphores to run the test."""
116n/a # minimum number of semaphores available according to POSIX
117n/a nsems_min = 256
118n/a try:
119n/a nsems = os.sysconf("SC_SEM_NSEMS_MAX")
120n/a except (AttributeError, ValueError):
121n/a # sysconf not available or setting not available
122n/a return
123n/a if nsems == -1 or nsems >= nsems_min:
124n/a return
125n/a raise unittest.SkipTest("The OS doesn't support enough semaphores "
126n/a "to run the test (required: %d)." % nsems_min)
127n/a
128n/a
129n/a#
130n/a# Creates a wrapper for a function which records the time it takes to finish
131n/a#
132n/a
133n/aclass TimingWrapper(object):
134n/a
135n/a def __init__(self, func):
136n/a self.func = func
137n/a self.elapsed = None
138n/a
139n/a def __call__(self, *args, **kwds):
140n/a t = time.time()
141n/a try:
142n/a return self.func(*args, **kwds)
143n/a finally:
144n/a self.elapsed = time.time() - t
145n/a
146n/a#
147n/a# Base class for test cases
148n/a#
149n/a
150n/aclass BaseTestCase(object):
151n/a
152n/a ALLOWED_TYPES = ('processes', 'manager', 'threads')
153n/a
154n/a def assertTimingAlmostEqual(self, a, b):
155n/a if CHECK_TIMINGS:
156n/a self.assertAlmostEqual(a, b, 1)
157n/a
158n/a def assertReturnsIfImplemented(self, value, func, *args):
159n/a try:
160n/a res = func(*args)
161n/a except NotImplementedError:
162n/a pass
163n/a else:
164n/a return self.assertEqual(value, res)
165n/a
166n/a # For the sanity of Windows users, rather than crashing or freezing in
167n/a # multiple ways.
168n/a def __reduce__(self, *args):
169n/a raise NotImplementedError("shouldn't try to pickle a test case")
170n/a
171n/a __reduce_ex__ = __reduce__
172n/a
173n/a#
174n/a# Return the value of a semaphore
175n/a#
176n/a
177n/adef get_value(self):
178n/a try:
179n/a return self.get_value()
180n/a except AttributeError:
181n/a try:
182n/a return self._Semaphore__value
183n/a except AttributeError:
184n/a try:
185n/a return self._value
186n/a except AttributeError:
187n/a raise NotImplementedError
188n/a
189n/a#
190n/a# Testcases
191n/a#
192n/a
193n/aclass _TestProcess(BaseTestCase):
194n/a
195n/a ALLOWED_TYPES = ('processes', 'threads')
196n/a
197n/a def test_current(self):
198n/a if self.TYPE == 'threads':
199n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
200n/a
201n/a current = self.current_process()
202n/a authkey = current.authkey
203n/a
204n/a self.assertTrue(current.is_alive())
205n/a self.assertTrue(not current.daemon)
206n/a self.assertIsInstance(authkey, bytes)
207n/a self.assertTrue(len(authkey) > 0)
208n/a self.assertEqual(current.ident, os.getpid())
209n/a self.assertEqual(current.exitcode, None)
210n/a
211n/a def test_daemon_argument(self):
212n/a if self.TYPE == "threads":
213n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
214n/a
215n/a # By default uses the current process's daemon flag.
216n/a proc0 = self.Process(target=self._test)
217n/a self.assertEqual(proc0.daemon, self.current_process().daemon)
218n/a proc1 = self.Process(target=self._test, daemon=True)
219n/a self.assertTrue(proc1.daemon)
220n/a proc2 = self.Process(target=self._test, daemon=False)
221n/a self.assertFalse(proc2.daemon)
222n/a
223n/a @classmethod
224n/a def _test(cls, q, *args, **kwds):
225n/a current = cls.current_process()
226n/a q.put(args)
227n/a q.put(kwds)
228n/a q.put(current.name)
229n/a if cls.TYPE != 'threads':
230n/a q.put(bytes(current.authkey))
231n/a q.put(current.pid)
232n/a
233n/a def test_process(self):
234n/a q = self.Queue(1)
235n/a e = self.Event()
236n/a args = (q, 1, 2)
237n/a kwargs = {'hello':23, 'bye':2.54}
238n/a name = 'SomeProcess'
239n/a p = self.Process(
240n/a target=self._test, args=args, kwargs=kwargs, name=name
241n/a )
242n/a p.daemon = True
243n/a current = self.current_process()
244n/a
245n/a if self.TYPE != 'threads':
246n/a self.assertEqual(p.authkey, current.authkey)
247n/a self.assertEqual(p.is_alive(), False)
248n/a self.assertEqual(p.daemon, True)
249n/a self.assertNotIn(p, self.active_children())
250n/a self.assertTrue(type(self.active_children()) is list)
251n/a self.assertEqual(p.exitcode, None)
252n/a
253n/a p.start()
254n/a
255n/a self.assertEqual(p.exitcode, None)
256n/a self.assertEqual(p.is_alive(), True)
257n/a self.assertIn(p, self.active_children())
258n/a
259n/a self.assertEqual(q.get(), args[1:])
260n/a self.assertEqual(q.get(), kwargs)
261n/a self.assertEqual(q.get(), p.name)
262n/a if self.TYPE != 'threads':
263n/a self.assertEqual(q.get(), current.authkey)
264n/a self.assertEqual(q.get(), p.pid)
265n/a
266n/a p.join()
267n/a
268n/a self.assertEqual(p.exitcode, 0)
269n/a self.assertEqual(p.is_alive(), False)
270n/a self.assertNotIn(p, self.active_children())
271n/a
272n/a @classmethod
273n/a def _test_terminate(cls):
274n/a time.sleep(100)
275n/a
276n/a def test_terminate(self):
277n/a if self.TYPE == 'threads':
278n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
279n/a
280n/a p = self.Process(target=self._test_terminate)
281n/a p.daemon = True
282n/a p.start()
283n/a
284n/a self.assertEqual(p.is_alive(), True)
285n/a self.assertIn(p, self.active_children())
286n/a self.assertEqual(p.exitcode, None)
287n/a
288n/a join = TimingWrapper(p.join)
289n/a
290n/a self.assertEqual(join(0), None)
291n/a self.assertTimingAlmostEqual(join.elapsed, 0.0)
292n/a self.assertEqual(p.is_alive(), True)
293n/a
294n/a self.assertEqual(join(-1), None)
295n/a self.assertTimingAlmostEqual(join.elapsed, 0.0)
296n/a self.assertEqual(p.is_alive(), True)
297n/a
298n/a # XXX maybe terminating too soon causes the problems on Gentoo...
299n/a time.sleep(1)
300n/a
301n/a p.terminate()
302n/a
303n/a if hasattr(signal, 'alarm'):
304n/a # On the Gentoo buildbot waitpid() often seems to block forever.
305n/a # We use alarm() to interrupt it if it blocks for too long.
306n/a def handler(*args):
307n/a raise RuntimeError('join took too long: %s' % p)
308n/a old_handler = signal.signal(signal.SIGALRM, handler)
309n/a try:
310n/a signal.alarm(10)
311n/a self.assertEqual(join(), None)
312n/a finally:
313n/a signal.alarm(0)
314n/a signal.signal(signal.SIGALRM, old_handler)
315n/a else:
316n/a self.assertEqual(join(), None)
317n/a
318n/a self.assertTimingAlmostEqual(join.elapsed, 0.0)
319n/a
320n/a self.assertEqual(p.is_alive(), False)
321n/a self.assertNotIn(p, self.active_children())
322n/a
323n/a p.join()
324n/a
325n/a # XXX sometimes get p.exitcode == 0 on Windows ...
326n/a #self.assertEqual(p.exitcode, -signal.SIGTERM)
327n/a
328n/a def test_cpu_count(self):
329n/a try:
330n/a cpus = multiprocessing.cpu_count()
331n/a except NotImplementedError:
332n/a cpus = 1
333n/a self.assertTrue(type(cpus) is int)
334n/a self.assertTrue(cpus >= 1)
335n/a
336n/a def test_active_children(self):
337n/a self.assertEqual(type(self.active_children()), list)
338n/a
339n/a p = self.Process(target=time.sleep, args=(DELTA,))
340n/a self.assertNotIn(p, self.active_children())
341n/a
342n/a p.daemon = True
343n/a p.start()
344n/a self.assertIn(p, self.active_children())
345n/a
346n/a p.join()
347n/a self.assertNotIn(p, self.active_children())
348n/a
349n/a @classmethod
350n/a def _test_recursion(cls, wconn, id):
351n/a wconn.send(id)
352n/a if len(id) < 2:
353n/a for i in range(2):
354n/a p = cls.Process(
355n/a target=cls._test_recursion, args=(wconn, id+[i])
356n/a )
357n/a p.start()
358n/a p.join()
359n/a
360n/a def test_recursion(self):
361n/a rconn, wconn = self.Pipe(duplex=False)
362n/a self._test_recursion(wconn, [])
363n/a
364n/a time.sleep(DELTA)
365n/a result = []
366n/a while rconn.poll():
367n/a result.append(rconn.recv())
368n/a
369n/a expected = [
370n/a [],
371n/a [0],
372n/a [0, 0],
373n/a [0, 1],
374n/a [1],
375n/a [1, 0],
376n/a [1, 1]
377n/a ]
378n/a self.assertEqual(result, expected)
379n/a
380n/a @classmethod
381n/a def _test_sentinel(cls, event):
382n/a event.wait(10.0)
383n/a
384n/a def test_sentinel(self):
385n/a if self.TYPE == "threads":
386n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
387n/a event = self.Event()
388n/a p = self.Process(target=self._test_sentinel, args=(event,))
389n/a with self.assertRaises(ValueError):
390n/a p.sentinel
391n/a p.start()
392n/a self.addCleanup(p.join)
393n/a sentinel = p.sentinel
394n/a self.assertIsInstance(sentinel, int)
395n/a self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
396n/a event.set()
397n/a p.join()
398n/a self.assertTrue(wait_for_handle(sentinel, timeout=1))
399n/a
400n/a#
401n/a#
402n/a#
403n/a
404n/aclass _UpperCaser(multiprocessing.Process):
405n/a
406n/a def __init__(self):
407n/a multiprocessing.Process.__init__(self)
408n/a self.child_conn, self.parent_conn = multiprocessing.Pipe()
409n/a
410n/a def run(self):
411n/a self.parent_conn.close()
412n/a for s in iter(self.child_conn.recv, None):
413n/a self.child_conn.send(s.upper())
414n/a self.child_conn.close()
415n/a
416n/a def submit(self, s):
417n/a assert type(s) is str
418n/a self.parent_conn.send(s)
419n/a return self.parent_conn.recv()
420n/a
421n/a def stop(self):
422n/a self.parent_conn.send(None)
423n/a self.parent_conn.close()
424n/a self.child_conn.close()
425n/a
426n/aclass _TestSubclassingProcess(BaseTestCase):
427n/a
428n/a ALLOWED_TYPES = ('processes',)
429n/a
430n/a def test_subclassing(self):
431n/a uppercaser = _UpperCaser()
432n/a uppercaser.daemon = True
433n/a uppercaser.start()
434n/a self.assertEqual(uppercaser.submit('hello'), 'HELLO')
435n/a self.assertEqual(uppercaser.submit('world'), 'WORLD')
436n/a uppercaser.stop()
437n/a uppercaser.join()
438n/a
439n/a def test_stderr_flush(self):
440n/a # sys.stderr is flushed at process shutdown (issue #13812)
441n/a if self.TYPE == "threads":
442n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
443n/a
444n/a testfn = test.support.TESTFN
445n/a self.addCleanup(test.support.unlink, testfn)
446n/a proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
447n/a proc.start()
448n/a proc.join()
449n/a with open(testfn, 'r') as f:
450n/a err = f.read()
451n/a # The whole traceback was printed
452n/a self.assertIn("ZeroDivisionError", err)
453n/a self.assertIn("test_multiprocessing.py", err)
454n/a self.assertIn("1/0 # MARKER", err)
455n/a
456n/a @classmethod
457n/a def _test_stderr_flush(cls, testfn):
458n/a fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
459n/a sys.stderr = open(fd, 'w', closefd=False)
460n/a 1/0 # MARKER
461n/a
462n/a
463n/a @classmethod
464n/a def _test_sys_exit(cls, reason, testfn):
465n/a fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
466n/a sys.stderr = open(fd, 'w', closefd=False)
467n/a sys.exit(reason)
468n/a
469n/a def test_sys_exit(self):
470n/a # See Issue 13854
471n/a if self.TYPE == 'threads':
472n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
473n/a
474n/a testfn = test.support.TESTFN
475n/a self.addCleanup(test.support.unlink, testfn)
476n/a
477n/a for reason in (
478n/a [1, 2, 3],
479n/a 'ignore this',
480n/a ):
481n/a p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
482n/a p.daemon = True
483n/a p.start()
484n/a p.join(5)
485n/a self.assertEqual(p.exitcode, 1)
486n/a
487n/a with open(testfn, 'r') as f:
488n/a content = f.read()
489n/a self.assertEqual(content.rstrip(), str(reason))
490n/a
491n/a os.unlink(testfn)
492n/a
493n/a for reason in (True, False, 8):
494n/a p = self.Process(target=sys.exit, args=(reason,))
495n/a p.daemon = True
496n/a p.start()
497n/a p.join(5)
498n/a self.assertEqual(p.exitcode, reason)
499n/a
500n/a#
501n/a#
502n/a#
503n/a
504n/adef queue_empty(q):
505n/a if hasattr(q, 'empty'):
506n/a return q.empty()
507n/a else:
508n/a return q.qsize() == 0
509n/a
510n/adef queue_full(q, maxsize):
511n/a if hasattr(q, 'full'):
512n/a return q.full()
513n/a else:
514n/a return q.qsize() == maxsize
515n/a
516n/a
517n/aclass _TestQueue(BaseTestCase):
518n/a
519n/a
520n/a @classmethod
521n/a def _test_put(cls, queue, child_can_start, parent_can_continue):
522n/a child_can_start.wait()
523n/a for i in range(6):
524n/a queue.get()
525n/a parent_can_continue.set()
526n/a
527n/a def test_put(self):
528n/a MAXSIZE = 6
529n/a queue = self.Queue(maxsize=MAXSIZE)
530n/a child_can_start = self.Event()
531n/a parent_can_continue = self.Event()
532n/a
533n/a proc = self.Process(
534n/a target=self._test_put,
535n/a args=(queue, child_can_start, parent_can_continue)
536n/a )
537n/a proc.daemon = True
538n/a proc.start()
539n/a
540n/a self.assertEqual(queue_empty(queue), True)
541n/a self.assertEqual(queue_full(queue, MAXSIZE), False)
542n/a
543n/a queue.put(1)
544n/a queue.put(2, True)
545n/a queue.put(3, True, None)
546n/a queue.put(4, False)
547n/a queue.put(5, False, None)
548n/a queue.put_nowait(6)
549n/a
550n/a # the values may be in buffer but not yet in pipe so sleep a bit
551n/a time.sleep(DELTA)
552n/a
553n/a self.assertEqual(queue_empty(queue), False)
554n/a self.assertEqual(queue_full(queue, MAXSIZE), True)
555n/a
556n/a put = TimingWrapper(queue.put)
557n/a put_nowait = TimingWrapper(queue.put_nowait)
558n/a
559n/a self.assertRaises(pyqueue.Full, put, 7, False)
560n/a self.assertTimingAlmostEqual(put.elapsed, 0)
561n/a
562n/a self.assertRaises(pyqueue.Full, put, 7, False, None)
563n/a self.assertTimingAlmostEqual(put.elapsed, 0)
564n/a
565n/a self.assertRaises(pyqueue.Full, put_nowait, 7)
566n/a self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
567n/a
568n/a self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
569n/a self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
570n/a
571n/a self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
572n/a self.assertTimingAlmostEqual(put.elapsed, 0)
573n/a
574n/a self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
575n/a self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
576n/a
577n/a child_can_start.set()
578n/a parent_can_continue.wait()
579n/a
580n/a self.assertEqual(queue_empty(queue), True)
581n/a self.assertEqual(queue_full(queue, MAXSIZE), False)
582n/a
583n/a proc.join()
584n/a
585n/a @classmethod
586n/a def _test_get(cls, queue, child_can_start, parent_can_continue):
587n/a child_can_start.wait()
588n/a #queue.put(1)
589n/a queue.put(2)
590n/a queue.put(3)
591n/a queue.put(4)
592n/a queue.put(5)
593n/a parent_can_continue.set()
594n/a
595n/a def test_get(self):
596n/a queue = self.Queue()
597n/a child_can_start = self.Event()
598n/a parent_can_continue = self.Event()
599n/a
600n/a proc = self.Process(
601n/a target=self._test_get,
602n/a args=(queue, child_can_start, parent_can_continue)
603n/a )
604n/a proc.daemon = True
605n/a proc.start()
606n/a
607n/a self.assertEqual(queue_empty(queue), True)
608n/a
609n/a child_can_start.set()
610n/a parent_can_continue.wait()
611n/a
612n/a time.sleep(DELTA)
613n/a self.assertEqual(queue_empty(queue), False)
614n/a
615n/a # Hangs unexpectedly, remove for now
616n/a #self.assertEqual(queue.get(), 1)
617n/a self.assertEqual(queue.get(True, None), 2)
618n/a self.assertEqual(queue.get(True), 3)
619n/a self.assertEqual(queue.get(timeout=1), 4)
620n/a self.assertEqual(queue.get_nowait(), 5)
621n/a
622n/a self.assertEqual(queue_empty(queue), True)
623n/a
624n/a get = TimingWrapper(queue.get)
625n/a get_nowait = TimingWrapper(queue.get_nowait)
626n/a
627n/a self.assertRaises(pyqueue.Empty, get, False)
628n/a self.assertTimingAlmostEqual(get.elapsed, 0)
629n/a
630n/a self.assertRaises(pyqueue.Empty, get, False, None)
631n/a self.assertTimingAlmostEqual(get.elapsed, 0)
632n/a
633n/a self.assertRaises(pyqueue.Empty, get_nowait)
634n/a self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
635n/a
636n/a self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
637n/a self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
638n/a
639n/a self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
640n/a self.assertTimingAlmostEqual(get.elapsed, 0)
641n/a
642n/a self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
643n/a self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
644n/a
645n/a proc.join()
646n/a
647n/a @classmethod
648n/a def _test_fork(cls, queue):
649n/a for i in range(10, 20):
650n/a queue.put(i)
651n/a # note that at this point the items may only be buffered, so the
652n/a # process cannot shutdown until the feeder thread has finished
653n/a # pushing items onto the pipe.
654n/a
655n/a def test_fork(self):
656n/a # Old versions of Queue would fail to create a new feeder
657n/a # thread for a forked process if the original process had its
658n/a # own feeder thread. This test checks that this no longer
659n/a # happens.
660n/a
661n/a queue = self.Queue()
662n/a
663n/a # put items on queue so that main process starts a feeder thread
664n/a for i in range(10):
665n/a queue.put(i)
666n/a
667n/a # wait to make sure thread starts before we fork a new process
668n/a time.sleep(DELTA)
669n/a
670n/a # fork process
671n/a p = self.Process(target=self._test_fork, args=(queue,))
672n/a p.daemon = True
673n/a p.start()
674n/a
675n/a # check that all expected items are in the queue
676n/a for i in range(20):
677n/a self.assertEqual(queue.get(), i)
678n/a self.assertRaises(pyqueue.Empty, queue.get, False)
679n/a
680n/a p.join()
681n/a
682n/a def test_qsize(self):
683n/a q = self.Queue()
684n/a try:
685n/a self.assertEqual(q.qsize(), 0)
686n/a except NotImplementedError:
687n/a self.skipTest('qsize method not implemented')
688n/a q.put(1)
689n/a self.assertEqual(q.qsize(), 1)
690n/a q.put(5)
691n/a self.assertEqual(q.qsize(), 2)
692n/a q.get()
693n/a self.assertEqual(q.qsize(), 1)
694n/a q.get()
695n/a self.assertEqual(q.qsize(), 0)
696n/a
697n/a @classmethod
698n/a def _test_task_done(cls, q):
699n/a for obj in iter(q.get, None):
700n/a time.sleep(DELTA)
701n/a q.task_done()
702n/a
703n/a def test_task_done(self):
704n/a queue = self.JoinableQueue()
705n/a
706n/a workers = [self.Process(target=self._test_task_done, args=(queue,))
707n/a for i in range(4)]
708n/a
709n/a for p in workers:
710n/a p.daemon = True
711n/a p.start()
712n/a
713n/a for i in range(10):
714n/a queue.put(i)
715n/a
716n/a queue.join()
717n/a
718n/a for p in workers:
719n/a queue.put(None)
720n/a
721n/a for p in workers:
722n/a p.join()
723n/a
724n/a def test_no_import_lock_contention(self):
725n/a with test.support.temp_cwd():
726n/a module_name = 'imported_by_an_imported_module'
727n/a with open(module_name + '.py', 'w') as f:
728n/a f.write("""if 1:
729n/a import multiprocessing
730n/a
731n/a q = multiprocessing.Queue()
732n/a q.put('knock knock')
733n/a q.get(timeout=3)
734n/a q.close()
735n/a del q
736n/a """)
737n/a
738n/a with test.support.DirsOnSysPath(os.getcwd()):
739n/a try:
740n/a __import__(module_name)
741n/a except pyqueue.Empty:
742n/a self.fail("Probable regression on import lock contention;"
743n/a " see Issue #22853")
744n/a
745n/a def test_timeout(self):
746n/a q = multiprocessing.Queue()
747n/a start = time.time()
748n/a self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
749n/a delta = time.time() - start
750n/a # Tolerate a delta of 30 ms because of the bad clock resolution on
751n/a # Windows (usually 15.6 ms)
752n/a self.assertGreaterEqual(delta, 0.170)
753n/a
754n/a#
755n/a#
756n/a#
757n/a
758n/aclass _TestLock(BaseTestCase):
759n/a
760n/a def test_lock(self):
761n/a lock = self.Lock()
762n/a self.assertEqual(lock.acquire(), True)
763n/a self.assertEqual(lock.acquire(False), False)
764n/a self.assertEqual(lock.release(), None)
765n/a self.assertRaises((ValueError, threading.ThreadError), lock.release)
766n/a
767n/a def test_rlock(self):
768n/a lock = self.RLock()
769n/a self.assertEqual(lock.acquire(), True)
770n/a self.assertEqual(lock.acquire(), True)
771n/a self.assertEqual(lock.acquire(), True)
772n/a self.assertEqual(lock.release(), None)
773n/a self.assertEqual(lock.release(), None)
774n/a self.assertEqual(lock.release(), None)
775n/a self.assertRaises((AssertionError, RuntimeError), lock.release)
776n/a
777n/a def test_lock_context(self):
778n/a with self.Lock():
779n/a pass
780n/a
781n/a
782n/aclass _TestSemaphore(BaseTestCase):
783n/a
784n/a def _test_semaphore(self, sem):
785n/a self.assertReturnsIfImplemented(2, get_value, sem)
786n/a self.assertEqual(sem.acquire(), True)
787n/a self.assertReturnsIfImplemented(1, get_value, sem)
788n/a self.assertEqual(sem.acquire(), True)
789n/a self.assertReturnsIfImplemented(0, get_value, sem)
790n/a self.assertEqual(sem.acquire(False), False)
791n/a self.assertReturnsIfImplemented(0, get_value, sem)
792n/a self.assertEqual(sem.release(), None)
793n/a self.assertReturnsIfImplemented(1, get_value, sem)
794n/a self.assertEqual(sem.release(), None)
795n/a self.assertReturnsIfImplemented(2, get_value, sem)
796n/a
797n/a def test_semaphore(self):
798n/a sem = self.Semaphore(2)
799n/a self._test_semaphore(sem)
800n/a self.assertEqual(sem.release(), None)
801n/a self.assertReturnsIfImplemented(3, get_value, sem)
802n/a self.assertEqual(sem.release(), None)
803n/a self.assertReturnsIfImplemented(4, get_value, sem)
804n/a
805n/a def test_bounded_semaphore(self):
806n/a sem = self.BoundedSemaphore(2)
807n/a self._test_semaphore(sem)
808n/a # Currently fails on OS/X
809n/a #if HAVE_GETVALUE:
810n/a # self.assertRaises(ValueError, sem.release)
811n/a # self.assertReturnsIfImplemented(2, get_value, sem)
812n/a
813n/a def test_timeout(self):
814n/a if self.TYPE != 'processes':
815n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
816n/a
817n/a sem = self.Semaphore(0)
818n/a acquire = TimingWrapper(sem.acquire)
819n/a
820n/a self.assertEqual(acquire(False), False)
821n/a self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
822n/a
823n/a self.assertEqual(acquire(False, None), False)
824n/a self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
825n/a
826n/a self.assertEqual(acquire(False, TIMEOUT1), False)
827n/a self.assertTimingAlmostEqual(acquire.elapsed, 0)
828n/a
829n/a self.assertEqual(acquire(True, TIMEOUT2), False)
830n/a self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
831n/a
832n/a self.assertEqual(acquire(timeout=TIMEOUT3), False)
833n/a self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
834n/a
835n/a
836n/aclass _TestCondition(BaseTestCase):
837n/a
838n/a @classmethod
839n/a def f(cls, cond, sleeping, woken, timeout=None):
840n/a cond.acquire()
841n/a sleeping.release()
842n/a cond.wait(timeout)
843n/a woken.release()
844n/a cond.release()
845n/a
846n/a def check_invariant(self, cond):
847n/a # this is only supposed to succeed when there are no sleepers
848n/a if self.TYPE == 'processes':
849n/a try:
850n/a sleepers = (cond._sleeping_count.get_value() -
851n/a cond._woken_count.get_value())
852n/a self.assertEqual(sleepers, 0)
853n/a self.assertEqual(cond._wait_semaphore.get_value(), 0)
854n/a except NotImplementedError:
855n/a pass
856n/a
857n/a def test_notify(self):
858n/a cond = self.Condition()
859n/a sleeping = self.Semaphore(0)
860n/a woken = self.Semaphore(0)
861n/a
862n/a p = self.Process(target=self.f, args=(cond, sleeping, woken))
863n/a p.daemon = True
864n/a p.start()
865n/a
866n/a p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
867n/a p.daemon = True
868n/a p.start()
869n/a
870n/a # wait for both children to start sleeping
871n/a sleeping.acquire()
872n/a sleeping.acquire()
873n/a
874n/a # check no process/thread has woken up
875n/a time.sleep(DELTA)
876n/a self.assertReturnsIfImplemented(0, get_value, woken)
877n/a
878n/a # wake up one process/thread
879n/a cond.acquire()
880n/a cond.notify()
881n/a cond.release()
882n/a
883n/a # check one process/thread has woken up
884n/a time.sleep(DELTA)
885n/a self.assertReturnsIfImplemented(1, get_value, woken)
886n/a
887n/a # wake up another
888n/a cond.acquire()
889n/a cond.notify()
890n/a cond.release()
891n/a
892n/a # check other has woken up
893n/a time.sleep(DELTA)
894n/a self.assertReturnsIfImplemented(2, get_value, woken)
895n/a
896n/a # check state is not mucked up
897n/a self.check_invariant(cond)
898n/a p.join()
899n/a
900n/a def test_notify_all(self):
901n/a cond = self.Condition()
902n/a sleeping = self.Semaphore(0)
903n/a woken = self.Semaphore(0)
904n/a
905n/a # start some threads/processes which will timeout
906n/a for i in range(3):
907n/a p = self.Process(target=self.f,
908n/a args=(cond, sleeping, woken, TIMEOUT1))
909n/a p.daemon = True
910n/a p.start()
911n/a
912n/a t = threading.Thread(target=self.f,
913n/a args=(cond, sleeping, woken, TIMEOUT1))
914n/a t.daemon = True
915n/a t.start()
916n/a
917n/a # wait for them all to sleep
918n/a for i in range(6):
919n/a sleeping.acquire()
920n/a
921n/a # check they have all timed out
922n/a for i in range(6):
923n/a woken.acquire()
924n/a self.assertReturnsIfImplemented(0, get_value, woken)
925n/a
926n/a # check state is not mucked up
927n/a self.check_invariant(cond)
928n/a
929n/a # start some more threads/processes
930n/a for i in range(3):
931n/a p = self.Process(target=self.f, args=(cond, sleeping, woken))
932n/a p.daemon = True
933n/a p.start()
934n/a
935n/a t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
936n/a t.daemon = True
937n/a t.start()
938n/a
939n/a # wait for them to all sleep
940n/a for i in range(6):
941n/a sleeping.acquire()
942n/a
943n/a # check no process/thread has woken up
944n/a time.sleep(DELTA)
945n/a self.assertReturnsIfImplemented(0, get_value, woken)
946n/a
947n/a # wake them all up
948n/a cond.acquire()
949n/a cond.notify_all()
950n/a cond.release()
951n/a
952n/a # check they have all woken
953n/a for i in range(10):
954n/a try:
955n/a if get_value(woken) == 6:
956n/a break
957n/a except NotImplementedError:
958n/a break
959n/a time.sleep(DELTA)
960n/a self.assertReturnsIfImplemented(6, get_value, woken)
961n/a
962n/a # check state is not mucked up
963n/a self.check_invariant(cond)
964n/a
965n/a def test_timeout(self):
966n/a cond = self.Condition()
967n/a wait = TimingWrapper(cond.wait)
968n/a cond.acquire()
969n/a res = wait(TIMEOUT1)
970n/a cond.release()
971n/a self.assertEqual(res, False)
972n/a self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
973n/a
974n/a @classmethod
975n/a def _test_waitfor_f(cls, cond, state):
976n/a with cond:
977n/a state.value = 0
978n/a cond.notify()
979n/a result = cond.wait_for(lambda : state.value==4)
980n/a if not result or state.value != 4:
981n/a sys.exit(1)
982n/a
983n/a @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
984n/a def test_waitfor(self):
985n/a # based on test in test/lock_tests.py
986n/a cond = self.Condition()
987n/a state = self.Value('i', -1)
988n/a
989n/a p = self.Process(target=self._test_waitfor_f, args=(cond, state))
990n/a p.daemon = True
991n/a p.start()
992n/a
993n/a with cond:
994n/a result = cond.wait_for(lambda : state.value==0)
995n/a self.assertTrue(result)
996n/a self.assertEqual(state.value, 0)
997n/a
998n/a for i in range(4):
999n/a time.sleep(0.01)
1000n/a with cond:
1001n/a state.value += 1
1002n/a cond.notify()
1003n/a
1004n/a p.join(5)
1005n/a self.assertFalse(p.is_alive())
1006n/a self.assertEqual(p.exitcode, 0)
1007n/a
1008n/a @classmethod
1009n/a def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1010n/a sem.release()
1011n/a with cond:
1012n/a expected = 0.1
1013n/a dt = time.time()
1014n/a result = cond.wait_for(lambda : state.value==4, timeout=expected)
1015n/a dt = time.time() - dt
1016n/a # borrow logic in assertTimeout() from test/lock_tests.py
1017n/a if not result and expected * 0.6 < dt < expected * 10.0:
1018n/a success.value = True
1019n/a
1020n/a @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1021n/a def test_waitfor_timeout(self):
1022n/a # based on test in test/lock_tests.py
1023n/a cond = self.Condition()
1024n/a state = self.Value('i', 0)
1025n/a success = self.Value('i', False)
1026n/a sem = self.Semaphore(0)
1027n/a
1028n/a p = self.Process(target=self._test_waitfor_timeout_f,
1029n/a args=(cond, state, success, sem))
1030n/a p.daemon = True
1031n/a p.start()
1032n/a self.assertTrue(sem.acquire(timeout=10))
1033n/a
1034n/a # Only increment 3 times, so state == 4 is never reached.
1035n/a for i in range(3):
1036n/a time.sleep(0.01)
1037n/a with cond:
1038n/a state.value += 1
1039n/a cond.notify()
1040n/a
1041n/a p.join(5)
1042n/a self.assertTrue(success.value)
1043n/a
1044n/a @classmethod
1045n/a def _test_wait_result(cls, c, pid):
1046n/a with c:
1047n/a c.notify()
1048n/a time.sleep(1)
1049n/a if pid is not None:
1050n/a os.kill(pid, signal.SIGINT)
1051n/a
1052n/a def test_wait_result(self):
1053n/a if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1054n/a pid = os.getpid()
1055n/a else:
1056n/a pid = None
1057n/a
1058n/a c = self.Condition()
1059n/a with c:
1060n/a self.assertFalse(c.wait(0))
1061n/a self.assertFalse(c.wait(0.1))
1062n/a
1063n/a p = self.Process(target=self._test_wait_result, args=(c, pid))
1064n/a p.start()
1065n/a
1066n/a self.assertTrue(c.wait(10))
1067n/a if pid is not None:
1068n/a self.assertRaises(KeyboardInterrupt, c.wait, 10)
1069n/a
1070n/a p.join()
1071n/a
1072n/a
1073n/aclass _TestEvent(BaseTestCase):
1074n/a
1075n/a @classmethod
1076n/a def _test_event(cls, event):
1077n/a time.sleep(TIMEOUT2)
1078n/a event.set()
1079n/a
1080n/a def test_event(self):
1081n/a event = self.Event()
1082n/a wait = TimingWrapper(event.wait)
1083n/a
1084n/a # Removed temporarily, due to API shear, this does not
1085n/a # work with threading._Event objects. is_set == isSet
1086n/a self.assertEqual(event.is_set(), False)
1087n/a
1088n/a # Removed, threading.Event.wait() will return the value of the __flag
1089n/a # instead of None. API Shear with the semaphore backed mp.Event
1090n/a self.assertEqual(wait(0.0), False)
1091n/a self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1092n/a self.assertEqual(wait(TIMEOUT1), False)
1093n/a self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1094n/a
1095n/a event.set()
1096n/a
1097n/a # See note above on the API differences
1098n/a self.assertEqual(event.is_set(), True)
1099n/a self.assertEqual(wait(), True)
1100n/a self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1101n/a self.assertEqual(wait(TIMEOUT1), True)
1102n/a self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1103n/a # self.assertEqual(event.is_set(), True)
1104n/a
1105n/a event.clear()
1106n/a
1107n/a #self.assertEqual(event.is_set(), False)
1108n/a
1109n/a p = self.Process(target=self._test_event, args=(event,))
1110n/a p.daemon = True
1111n/a p.start()
1112n/a self.assertEqual(wait(), True)
1113n/a
1114n/a#
1115n/a# Tests for Barrier - adapted from tests in test/lock_tests.py
1116n/a#
1117n/a
1118n/a# Many of the tests for threading.Barrier use a list as an atomic
1119n/a# counter: a value is appended to increment the counter, and the
1120n/a# length of the list gives the value. We use the class DummyList
1121n/a# for the same purpose.
1122n/a
1123n/aclass _DummyList(object):
1124n/a
1125n/a def __init__(self):
1126n/a wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1127n/a lock = multiprocessing.Lock()
1128n/a self.__setstate__((wrapper, lock))
1129n/a self._lengthbuf[0] = 0
1130n/a
1131n/a def __setstate__(self, state):
1132n/a (self._wrapper, self._lock) = state
1133n/a self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1134n/a
1135n/a def __getstate__(self):
1136n/a return (self._wrapper, self._lock)
1137n/a
1138n/a def append(self, _):
1139n/a with self._lock:
1140n/a self._lengthbuf[0] += 1
1141n/a
1142n/a def __len__(self):
1143n/a with self._lock:
1144n/a return self._lengthbuf[0]
1145n/a
1146n/adef _wait():
1147n/a # A crude wait/yield function not relying on synchronization primitives.
1148n/a time.sleep(0.01)
1149n/a
1150n/a
1151n/aclass Bunch(object):
1152n/a """
1153n/a A bunch of threads.
1154n/a """
1155n/a def __init__(self, namespace, f, args, n, wait_before_exit=False):
1156n/a """
1157n/a Construct a bunch of `n` threads running the same function `f`.
1158n/a If `wait_before_exit` is True, the threads won't terminate until
1159n/a do_finish() is called.
1160n/a """
1161n/a self.f = f
1162n/a self.args = args
1163n/a self.n = n
1164n/a self.started = namespace.DummyList()
1165n/a self.finished = namespace.DummyList()
1166n/a self._can_exit = namespace.Event()
1167n/a if not wait_before_exit:
1168n/a self._can_exit.set()
1169n/a for i in range(n):
1170n/a p = namespace.Process(target=self.task)
1171n/a p.daemon = True
1172n/a p.start()
1173n/a
1174n/a def task(self):
1175n/a pid = os.getpid()
1176n/a self.started.append(pid)
1177n/a try:
1178n/a self.f(*self.args)
1179n/a finally:
1180n/a self.finished.append(pid)
1181n/a self._can_exit.wait(30)
1182n/a assert self._can_exit.is_set()
1183n/a
1184n/a def wait_for_started(self):
1185n/a while len(self.started) < self.n:
1186n/a _wait()
1187n/a
1188n/a def wait_for_finished(self):
1189n/a while len(self.finished) < self.n:
1190n/a _wait()
1191n/a
1192n/a def do_finish(self):
1193n/a self._can_exit.set()
1194n/a
1195n/a
1196n/aclass AppendTrue(object):
1197n/a def __init__(self, obj):
1198n/a self.obj = obj
1199n/a def __call__(self):
1200n/a self.obj.append(True)
1201n/a
1202n/a
1203n/aclass _TestBarrier(BaseTestCase):
1204n/a """
1205n/a Tests for Barrier objects.
1206n/a """
1207n/a N = 5
1208n/a defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
1209n/a
1210n/a def setUp(self):
1211n/a self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1212n/a
1213n/a def tearDown(self):
1214n/a self.barrier.abort()
1215n/a self.barrier = None
1216n/a
1217n/a def DummyList(self):
1218n/a if self.TYPE == 'threads':
1219n/a return []
1220n/a elif self.TYPE == 'manager':
1221n/a return self.manager.list()
1222n/a else:
1223n/a return _DummyList()
1224n/a
1225n/a def run_threads(self, f, args):
1226n/a b = Bunch(self, f, args, self.N-1)
1227n/a f(*args)
1228n/a b.wait_for_finished()
1229n/a
1230n/a @classmethod
1231n/a def multipass(cls, barrier, results, n):
1232n/a m = barrier.parties
1233n/a assert m == cls.N
1234n/a for i in range(n):
1235n/a results[0].append(True)
1236n/a assert len(results[1]) == i * m
1237n/a barrier.wait()
1238n/a results[1].append(True)
1239n/a assert len(results[0]) == (i + 1) * m
1240n/a barrier.wait()
1241n/a try:
1242n/a assert barrier.n_waiting == 0
1243n/a except NotImplementedError:
1244n/a pass
1245n/a assert not barrier.broken
1246n/a
1247n/a def test_barrier(self, passes=1):
1248n/a """
1249n/a Test that a barrier is passed in lockstep
1250n/a """
1251n/a results = [self.DummyList(), self.DummyList()]
1252n/a self.run_threads(self.multipass, (self.barrier, results, passes))
1253n/a
1254n/a def test_barrier_10(self):
1255n/a """
1256n/a Test that a barrier works for 10 consecutive runs
1257n/a """
1258n/a return self.test_barrier(10)
1259n/a
1260n/a @classmethod
1261n/a def _test_wait_return_f(cls, barrier, queue):
1262n/a res = barrier.wait()
1263n/a queue.put(res)
1264n/a
1265n/a def test_wait_return(self):
1266n/a """
1267n/a test the return value from barrier.wait
1268n/a """
1269n/a queue = self.Queue()
1270n/a self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1271n/a results = [queue.get() for i in range(self.N)]
1272n/a self.assertEqual(results.count(0), 1)
1273n/a
1274n/a @classmethod
1275n/a def _test_action_f(cls, barrier, results):
1276n/a barrier.wait()
1277n/a if len(results) != 1:
1278n/a raise RuntimeError
1279n/a
1280n/a def test_action(self):
1281n/a """
1282n/a Test the 'action' callback
1283n/a """
1284n/a results = self.DummyList()
1285n/a barrier = self.Barrier(self.N, action=AppendTrue(results))
1286n/a self.run_threads(self._test_action_f, (barrier, results))
1287n/a self.assertEqual(len(results), 1)
1288n/a
1289n/a @classmethod
1290n/a def _test_abort_f(cls, barrier, results1, results2):
1291n/a try:
1292n/a i = barrier.wait()
1293n/a if i == cls.N//2:
1294n/a raise RuntimeError
1295n/a barrier.wait()
1296n/a results1.append(True)
1297n/a except threading.BrokenBarrierError:
1298n/a results2.append(True)
1299n/a except RuntimeError:
1300n/a barrier.abort()
1301n/a
1302n/a def test_abort(self):
1303n/a """
1304n/a Test that an abort will put the barrier in a broken state
1305n/a """
1306n/a results1 = self.DummyList()
1307n/a results2 = self.DummyList()
1308n/a self.run_threads(self._test_abort_f,
1309n/a (self.barrier, results1, results2))
1310n/a self.assertEqual(len(results1), 0)
1311n/a self.assertEqual(len(results2), self.N-1)
1312n/a self.assertTrue(self.barrier.broken)
1313n/a
1314n/a @classmethod
1315n/a def _test_reset_f(cls, barrier, results1, results2, results3):
1316n/a i = barrier.wait()
1317n/a if i == cls.N//2:
1318n/a # Wait until the other threads are all in the barrier.
1319n/a while barrier.n_waiting < cls.N-1:
1320n/a time.sleep(0.001)
1321n/a barrier.reset()
1322n/a else:
1323n/a try:
1324n/a barrier.wait()
1325n/a results1.append(True)
1326n/a except threading.BrokenBarrierError:
1327n/a results2.append(True)
1328n/a # Now, pass the barrier again
1329n/a barrier.wait()
1330n/a results3.append(True)
1331n/a
1332n/a def test_reset(self):
1333n/a """
1334n/a Test that a 'reset' on a barrier frees the waiting threads
1335n/a """
1336n/a results1 = self.DummyList()
1337n/a results2 = self.DummyList()
1338n/a results3 = self.DummyList()
1339n/a self.run_threads(self._test_reset_f,
1340n/a (self.barrier, results1, results2, results3))
1341n/a self.assertEqual(len(results1), 0)
1342n/a self.assertEqual(len(results2), self.N-1)
1343n/a self.assertEqual(len(results3), self.N)
1344n/a
1345n/a @classmethod
1346n/a def _test_abort_and_reset_f(cls, barrier, barrier2,
1347n/a results1, results2, results3):
1348n/a try:
1349n/a i = barrier.wait()
1350n/a if i == cls.N//2:
1351n/a raise RuntimeError
1352n/a barrier.wait()
1353n/a results1.append(True)
1354n/a except threading.BrokenBarrierError:
1355n/a results2.append(True)
1356n/a except RuntimeError:
1357n/a barrier.abort()
1358n/a # Synchronize and reset the barrier. Must synchronize first so
1359n/a # that everyone has left it when we reset, and after so that no
1360n/a # one enters it before the reset.
1361n/a if barrier2.wait() == cls.N//2:
1362n/a barrier.reset()
1363n/a barrier2.wait()
1364n/a barrier.wait()
1365n/a results3.append(True)
1366n/a
1367n/a def test_abort_and_reset(self):
1368n/a """
1369n/a Test that a barrier can be reset after being broken.
1370n/a """
1371n/a results1 = self.DummyList()
1372n/a results2 = self.DummyList()
1373n/a results3 = self.DummyList()
1374n/a barrier2 = self.Barrier(self.N)
1375n/a
1376n/a self.run_threads(self._test_abort_and_reset_f,
1377n/a (self.barrier, barrier2, results1, results2, results3))
1378n/a self.assertEqual(len(results1), 0)
1379n/a self.assertEqual(len(results2), self.N-1)
1380n/a self.assertEqual(len(results3), self.N)
1381n/a
1382n/a @classmethod
1383n/a def _test_timeout_f(cls, barrier, results):
1384n/a i = barrier.wait()
1385n/a if i == cls.N//2:
1386n/a # One thread is late!
1387n/a time.sleep(1.0)
1388n/a try:
1389n/a barrier.wait(0.5)
1390n/a except threading.BrokenBarrierError:
1391n/a results.append(True)
1392n/a
1393n/a def test_timeout(self):
1394n/a """
1395n/a Test wait(timeout)
1396n/a """
1397n/a results = self.DummyList()
1398n/a self.run_threads(self._test_timeout_f, (self.barrier, results))
1399n/a self.assertEqual(len(results), self.barrier.parties)
1400n/a
1401n/a @classmethod
1402n/a def _test_default_timeout_f(cls, barrier, results):
1403n/a i = barrier.wait(cls.defaultTimeout)
1404n/a if i == cls.N//2:
1405n/a # One thread is later than the default timeout
1406n/a time.sleep(1.0)
1407n/a try:
1408n/a barrier.wait()
1409n/a except threading.BrokenBarrierError:
1410n/a results.append(True)
1411n/a
1412n/a def test_default_timeout(self):
1413n/a """
1414n/a Test the barrier's default timeout
1415n/a """
1416n/a barrier = self.Barrier(self.N, timeout=0.5)
1417n/a results = self.DummyList()
1418n/a self.run_threads(self._test_default_timeout_f, (barrier, results))
1419n/a self.assertEqual(len(results), barrier.parties)
1420n/a
1421n/a def test_single_thread(self):
1422n/a b = self.Barrier(1)
1423n/a b.wait()
1424n/a b.wait()
1425n/a
1426n/a @classmethod
1427n/a def _test_thousand_f(cls, barrier, passes, conn, lock):
1428n/a for i in range(passes):
1429n/a barrier.wait()
1430n/a with lock:
1431n/a conn.send(i)
1432n/a
1433n/a def test_thousand(self):
1434n/a if self.TYPE == 'manager':
1435n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
1436n/a passes = 1000
1437n/a lock = self.Lock()
1438n/a conn, child_conn = self.Pipe(False)
1439n/a for j in range(self.N):
1440n/a p = self.Process(target=self._test_thousand_f,
1441n/a args=(self.barrier, passes, child_conn, lock))
1442n/a p.start()
1443n/a
1444n/a for i in range(passes):
1445n/a for j in range(self.N):
1446n/a self.assertEqual(conn.recv(), i)
1447n/a
1448n/a#
1449n/a#
1450n/a#
1451n/a
1452n/aclass _TestValue(BaseTestCase):
1453n/a
1454n/a ALLOWED_TYPES = ('processes',)
1455n/a
1456n/a codes_values = [
1457n/a ('i', 4343, 24234),
1458n/a ('d', 3.625, -4.25),
1459n/a ('h', -232, 234),
1460n/a ('c', latin('x'), latin('y'))
1461n/a ]
1462n/a
1463n/a def setUp(self):
1464n/a if not HAS_SHAREDCTYPES:
1465n/a self.skipTest("requires multiprocessing.sharedctypes")
1466n/a
1467n/a @classmethod
1468n/a def _test(cls, values):
1469n/a for sv, cv in zip(values, cls.codes_values):
1470n/a sv.value = cv[2]
1471n/a
1472n/a
1473n/a def test_value(self, raw=False):
1474n/a if raw:
1475n/a values = [self.RawValue(code, value)
1476n/a for code, value, _ in self.codes_values]
1477n/a else:
1478n/a values = [self.Value(code, value)
1479n/a for code, value, _ in self.codes_values]
1480n/a
1481n/a for sv, cv in zip(values, self.codes_values):
1482n/a self.assertEqual(sv.value, cv[1])
1483n/a
1484n/a proc = self.Process(target=self._test, args=(values,))
1485n/a proc.daemon = True
1486n/a proc.start()
1487n/a proc.join()
1488n/a
1489n/a for sv, cv in zip(values, self.codes_values):
1490n/a self.assertEqual(sv.value, cv[2])
1491n/a
1492n/a def test_rawvalue(self):
1493n/a self.test_value(raw=True)
1494n/a
1495n/a def test_getobj_getlock(self):
1496n/a val1 = self.Value('i', 5)
1497n/a lock1 = val1.get_lock()
1498n/a obj1 = val1.get_obj()
1499n/a
1500n/a val2 = self.Value('i', 5, lock=None)
1501n/a lock2 = val2.get_lock()
1502n/a obj2 = val2.get_obj()
1503n/a
1504n/a lock = self.Lock()
1505n/a val3 = self.Value('i', 5, lock=lock)
1506n/a lock3 = val3.get_lock()
1507n/a obj3 = val3.get_obj()
1508n/a self.assertEqual(lock, lock3)
1509n/a
1510n/a arr4 = self.Value('i', 5, lock=False)
1511n/a self.assertFalse(hasattr(arr4, 'get_lock'))
1512n/a self.assertFalse(hasattr(arr4, 'get_obj'))
1513n/a
1514n/a self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1515n/a
1516n/a arr5 = self.RawValue('i', 5)
1517n/a self.assertFalse(hasattr(arr5, 'get_lock'))
1518n/a self.assertFalse(hasattr(arr5, 'get_obj'))
1519n/a
1520n/a
1521n/aclass _TestArray(BaseTestCase):
1522n/a
1523n/a ALLOWED_TYPES = ('processes',)
1524n/a
1525n/a @classmethod
1526n/a def f(cls, seq):
1527n/a for i in range(1, len(seq)):
1528n/a seq[i] += seq[i-1]
1529n/a
1530n/a @unittest.skipIf(c_int is None, "requires _ctypes")
1531n/a def test_array(self, raw=False):
1532n/a seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1533n/a if raw:
1534n/a arr = self.RawArray('i', seq)
1535n/a else:
1536n/a arr = self.Array('i', seq)
1537n/a
1538n/a self.assertEqual(len(arr), len(seq))
1539n/a self.assertEqual(arr[3], seq[3])
1540n/a self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1541n/a
1542n/a arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1543n/a
1544n/a self.assertEqual(list(arr[:]), seq)
1545n/a
1546n/a self.f(seq)
1547n/a
1548n/a p = self.Process(target=self.f, args=(arr,))
1549n/a p.daemon = True
1550n/a p.start()
1551n/a p.join()
1552n/a
1553n/a self.assertEqual(list(arr[:]), seq)
1554n/a
1555n/a @unittest.skipIf(c_int is None, "requires _ctypes")
1556n/a def test_array_from_size(self):
1557n/a size = 10
1558n/a # Test for zeroing (see issue #11675).
1559n/a # The repetition below strengthens the test by increasing the chances
1560n/a # of previously allocated non-zero memory being used for the new array
1561n/a # on the 2nd and 3rd loops.
1562n/a for _ in range(3):
1563n/a arr = self.Array('i', size)
1564n/a self.assertEqual(len(arr), size)
1565n/a self.assertEqual(list(arr), [0] * size)
1566n/a arr[:] = range(10)
1567n/a self.assertEqual(list(arr), list(range(10)))
1568n/a del arr
1569n/a
1570n/a @unittest.skipIf(c_int is None, "requires _ctypes")
1571n/a def test_rawarray(self):
1572n/a self.test_array(raw=True)
1573n/a
1574n/a @unittest.skipIf(c_int is None, "requires _ctypes")
1575n/a def test_getobj_getlock_obj(self):
1576n/a arr1 = self.Array('i', list(range(10)))
1577n/a lock1 = arr1.get_lock()
1578n/a obj1 = arr1.get_obj()
1579n/a
1580n/a arr2 = self.Array('i', list(range(10)), lock=None)
1581n/a lock2 = arr2.get_lock()
1582n/a obj2 = arr2.get_obj()
1583n/a
1584n/a lock = self.Lock()
1585n/a arr3 = self.Array('i', list(range(10)), lock=lock)
1586n/a lock3 = arr3.get_lock()
1587n/a obj3 = arr3.get_obj()
1588n/a self.assertEqual(lock, lock3)
1589n/a
1590n/a arr4 = self.Array('i', range(10), lock=False)
1591n/a self.assertFalse(hasattr(arr4, 'get_lock'))
1592n/a self.assertFalse(hasattr(arr4, 'get_obj'))
1593n/a self.assertRaises(AttributeError,
1594n/a self.Array, 'i', range(10), lock='notalock')
1595n/a
1596n/a arr5 = self.RawArray('i', range(10))
1597n/a self.assertFalse(hasattr(arr5, 'get_lock'))
1598n/a self.assertFalse(hasattr(arr5, 'get_obj'))
1599n/a
1600n/a#
1601n/a#
1602n/a#
1603n/a
1604n/aclass _TestContainers(BaseTestCase):
1605n/a
1606n/a ALLOWED_TYPES = ('manager',)
1607n/a
1608n/a def test_list(self):
1609n/a a = self.list(list(range(10)))
1610n/a self.assertEqual(a[:], list(range(10)))
1611n/a
1612n/a b = self.list()
1613n/a self.assertEqual(b[:], [])
1614n/a
1615n/a b.extend(list(range(5)))
1616n/a self.assertEqual(b[:], list(range(5)))
1617n/a
1618n/a self.assertEqual(b[2], 2)
1619n/a self.assertEqual(b[2:10], [2,3,4])
1620n/a
1621n/a b *= 2
1622n/a self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1623n/a
1624n/a self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1625n/a
1626n/a self.assertEqual(a[:], list(range(10)))
1627n/a
1628n/a d = [a, b]
1629n/a e = self.list(d)
1630n/a self.assertEqual(
1631n/a [element[:] for element in e],
1632n/a [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1633n/a )
1634n/a
1635n/a f = self.list([a])
1636n/a a.append('hello')
1637n/a self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
1638n/a
1639n/a def test_list_proxy_in_list(self):
1640n/a a = self.list([self.list(range(3)) for _i in range(3)])
1641n/a self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
1642n/a
1643n/a a[0][-1] = 55
1644n/a self.assertEqual(a[0][:], [0, 1, 55])
1645n/a for i in range(1, 3):
1646n/a self.assertEqual(a[i][:], [0, 1, 2])
1647n/a
1648n/a self.assertEqual(a[1].pop(), 2)
1649n/a self.assertEqual(len(a[1]), 2)
1650n/a for i in range(0, 3, 2):
1651n/a self.assertEqual(len(a[i]), 3)
1652n/a
1653n/a del a
1654n/a
1655n/a b = self.list()
1656n/a b.append(b)
1657n/a del b
1658n/a
1659n/a def test_dict(self):
1660n/a d = self.dict()
1661n/a indices = list(range(65, 70))
1662n/a for i in indices:
1663n/a d[i] = chr(i)
1664n/a self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1665n/a self.assertEqual(sorted(d.keys()), indices)
1666n/a self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1667n/a self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1668n/a
1669n/a def test_dict_proxy_nested(self):
1670n/a pets = self.dict(ferrets=2, hamsters=4)
1671n/a supplies = self.dict(water=10, feed=3)
1672n/a d = self.dict(pets=pets, supplies=supplies)
1673n/a
1674n/a self.assertEqual(supplies['water'], 10)
1675n/a self.assertEqual(d['supplies']['water'], 10)
1676n/a
1677n/a d['supplies']['blankets'] = 5
1678n/a self.assertEqual(supplies['blankets'], 5)
1679n/a self.assertEqual(d['supplies']['blankets'], 5)
1680n/a
1681n/a d['supplies']['water'] = 7
1682n/a self.assertEqual(supplies['water'], 7)
1683n/a self.assertEqual(d['supplies']['water'], 7)
1684n/a
1685n/a del pets
1686n/a del supplies
1687n/a self.assertEqual(d['pets']['ferrets'], 2)
1688n/a d['supplies']['blankets'] = 11
1689n/a self.assertEqual(d['supplies']['blankets'], 11)
1690n/a
1691n/a pets = d['pets']
1692n/a supplies = d['supplies']
1693n/a supplies['water'] = 7
1694n/a self.assertEqual(supplies['water'], 7)
1695n/a self.assertEqual(d['supplies']['water'], 7)
1696n/a
1697n/a d.clear()
1698n/a self.assertEqual(len(d), 0)
1699n/a self.assertEqual(supplies['water'], 7)
1700n/a self.assertEqual(pets['hamsters'], 4)
1701n/a
1702n/a l = self.list([pets, supplies])
1703n/a l[0]['marmots'] = 1
1704n/a self.assertEqual(pets['marmots'], 1)
1705n/a self.assertEqual(l[0]['marmots'], 1)
1706n/a
1707n/a del pets
1708n/a del supplies
1709n/a self.assertEqual(l[0]['marmots'], 1)
1710n/a
1711n/a outer = self.list([[88, 99], l])
1712n/a self.assertIsInstance(outer[0], list) # Not a ListProxy
1713n/a self.assertEqual(outer[-1][-1]['feed'], 3)
1714n/a
1715n/a def test_namespace(self):
1716n/a n = self.Namespace()
1717n/a n.name = 'Bob'
1718n/a n.job = 'Builder'
1719n/a n._hidden = 'hidden'
1720n/a self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1721n/a del n.job
1722n/a self.assertEqual(str(n), "Namespace(name='Bob')")
1723n/a self.assertTrue(hasattr(n, 'name'))
1724n/a self.assertTrue(not hasattr(n, 'job'))
1725n/a
1726n/a#
1727n/a#
1728n/a#
1729n/a
1730n/adef sqr(x, wait=0.0):
1731n/a time.sleep(wait)
1732n/a return x*x
1733n/a
1734n/adef mul(x, y):
1735n/a return x*y
1736n/a
1737n/adef raise_large_valuerror(wait):
1738n/a time.sleep(wait)
1739n/a raise ValueError("x" * 1024**2)
1740n/a
1741n/aclass SayWhenError(ValueError): pass
1742n/a
1743n/adef exception_throwing_generator(total, when):
1744n/a for i in range(total):
1745n/a if i == when:
1746n/a raise SayWhenError("Somebody said when")
1747n/a yield i
1748n/a
1749n/aclass _TestPool(BaseTestCase):
1750n/a
1751n/a @classmethod
1752n/a def setUpClass(cls):
1753n/a super().setUpClass()
1754n/a cls.pool = cls.Pool(4)
1755n/a
1756n/a @classmethod
1757n/a def tearDownClass(cls):
1758n/a cls.pool.terminate()
1759n/a cls.pool.join()
1760n/a cls.pool = None
1761n/a super().tearDownClass()
1762n/a
1763n/a def test_apply(self):
1764n/a papply = self.pool.apply
1765n/a self.assertEqual(papply(sqr, (5,)), sqr(5))
1766n/a self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1767n/a
1768n/a def test_map(self):
1769n/a pmap = self.pool.map
1770n/a self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1771n/a self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1772n/a list(map(sqr, list(range(100)))))
1773n/a
1774n/a def test_starmap(self):
1775n/a psmap = self.pool.starmap
1776n/a tuples = list(zip(range(10), range(9,-1, -1)))
1777n/a self.assertEqual(psmap(mul, tuples),
1778n/a list(itertools.starmap(mul, tuples)))
1779n/a tuples = list(zip(range(100), range(99,-1, -1)))
1780n/a self.assertEqual(psmap(mul, tuples, chunksize=20),
1781n/a list(itertools.starmap(mul, tuples)))
1782n/a
1783n/a def test_starmap_async(self):
1784n/a tuples = list(zip(range(100), range(99,-1, -1)))
1785n/a self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1786n/a list(itertools.starmap(mul, tuples)))
1787n/a
1788n/a def test_map_async(self):
1789n/a self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1790n/a list(map(sqr, list(range(10)))))
1791n/a
1792n/a def test_map_async_callbacks(self):
1793n/a call_args = self.manager.list() if self.TYPE == 'manager' else []
1794n/a self.pool.map_async(int, ['1'],
1795n/a callback=call_args.append,
1796n/a error_callback=call_args.append).wait()
1797n/a self.assertEqual(1, len(call_args))
1798n/a self.assertEqual([1], call_args[0])
1799n/a self.pool.map_async(int, ['a'],
1800n/a callback=call_args.append,
1801n/a error_callback=call_args.append).wait()
1802n/a self.assertEqual(2, len(call_args))
1803n/a self.assertIsInstance(call_args[1], ValueError)
1804n/a
1805n/a def test_map_unplicklable(self):
1806n/a # Issue #19425 -- failure to pickle should not cause a hang
1807n/a if self.TYPE == 'threads':
1808n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
1809n/a class A(object):
1810n/a def __reduce__(self):
1811n/a raise RuntimeError('cannot pickle')
1812n/a with self.assertRaises(RuntimeError):
1813n/a self.pool.map(sqr, [A()]*10)
1814n/a
1815n/a def test_map_chunksize(self):
1816n/a try:
1817n/a self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1818n/a except multiprocessing.TimeoutError:
1819n/a self.fail("pool.map_async with chunksize stalled on null list")
1820n/a
1821n/a def test_async(self):
1822n/a res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1823n/a get = TimingWrapper(res.get)
1824n/a self.assertEqual(get(), 49)
1825n/a self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1826n/a
1827n/a def test_async_timeout(self):
1828n/a res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
1829n/a get = TimingWrapper(res.get)
1830n/a self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1831n/a self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1832n/a
1833n/a def test_imap(self):
1834n/a it = self.pool.imap(sqr, list(range(10)))
1835n/a self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1836n/a
1837n/a it = self.pool.imap(sqr, list(range(10)))
1838n/a for i in range(10):
1839n/a self.assertEqual(next(it), i*i)
1840n/a self.assertRaises(StopIteration, it.__next__)
1841n/a
1842n/a it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1843n/a for i in range(1000):
1844n/a self.assertEqual(next(it), i*i)
1845n/a self.assertRaises(StopIteration, it.__next__)
1846n/a
1847n/a def test_imap_handle_iterable_exception(self):
1848n/a if self.TYPE == 'manager':
1849n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
1850n/a
1851n/a it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
1852n/a for i in range(3):
1853n/a self.assertEqual(next(it), i*i)
1854n/a self.assertRaises(SayWhenError, it.__next__)
1855n/a
1856n/a # SayWhenError seen at start of problematic chunk's results
1857n/a it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
1858n/a for i in range(6):
1859n/a self.assertEqual(next(it), i*i)
1860n/a self.assertRaises(SayWhenError, it.__next__)
1861n/a it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
1862n/a for i in range(4):
1863n/a self.assertEqual(next(it), i*i)
1864n/a self.assertRaises(SayWhenError, it.__next__)
1865n/a
1866n/a def test_imap_unordered(self):
1867n/a it = self.pool.imap_unordered(sqr, list(range(1000)))
1868n/a self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1869n/a
1870n/a it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1871n/a self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1872n/a
1873n/a def test_imap_unordered_handle_iterable_exception(self):
1874n/a if self.TYPE == 'manager':
1875n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
1876n/a
1877n/a it = self.pool.imap_unordered(sqr,
1878n/a exception_throwing_generator(10, 3),
1879n/a 1)
1880n/a expected_values = list(map(sqr, list(range(10))))
1881n/a with self.assertRaises(SayWhenError):
1882n/a # imap_unordered makes it difficult to anticipate the SayWhenError
1883n/a for i in range(10):
1884n/a value = next(it)
1885n/a self.assertIn(value, expected_values)
1886n/a expected_values.remove(value)
1887n/a
1888n/a it = self.pool.imap_unordered(sqr,
1889n/a exception_throwing_generator(20, 7),
1890n/a 2)
1891n/a expected_values = list(map(sqr, list(range(20))))
1892n/a with self.assertRaises(SayWhenError):
1893n/a for i in range(20):
1894n/a value = next(it)
1895n/a self.assertIn(value, expected_values)
1896n/a expected_values.remove(value)
1897n/a
1898n/a def test_make_pool(self):
1899n/a expected_error = (RemoteError if self.TYPE == 'manager'
1900n/a else ValueError)
1901n/a
1902n/a self.assertRaises(expected_error, self.Pool, -1)
1903n/a self.assertRaises(expected_error, self.Pool, 0)
1904n/a
1905n/a if self.TYPE != 'manager':
1906n/a p = self.Pool(3)
1907n/a try:
1908n/a self.assertEqual(3, len(p._pool))
1909n/a finally:
1910n/a p.close()
1911n/a p.join()
1912n/a
1913n/a def test_terminate(self):
1914n/a result = self.pool.map_async(
1915n/a time.sleep, [0.1 for i in range(10000)], chunksize=1
1916n/a )
1917n/a self.pool.terminate()
1918n/a join = TimingWrapper(self.pool.join)
1919n/a join()
1920n/a # Sanity check the pool didn't wait for all tasks to finish
1921n/a self.assertLess(join.elapsed, 2.0)
1922n/a
1923n/a def test_empty_iterable(self):
1924n/a # See Issue 12157
1925n/a p = self.Pool(1)
1926n/a
1927n/a self.assertEqual(p.map(sqr, []), [])
1928n/a self.assertEqual(list(p.imap(sqr, [])), [])
1929n/a self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1930n/a self.assertEqual(p.map_async(sqr, []).get(), [])
1931n/a
1932n/a p.close()
1933n/a p.join()
1934n/a
1935n/a def test_context(self):
1936n/a if self.TYPE == 'processes':
1937n/a L = list(range(10))
1938n/a expected = [sqr(i) for i in L]
1939n/a with self.Pool(2) as p:
1940n/a r = p.map_async(sqr, L)
1941n/a self.assertEqual(r.get(), expected)
1942n/a self.assertRaises(ValueError, p.map_async, sqr, L)
1943n/a
1944n/a @classmethod
1945n/a def _test_traceback(cls):
1946n/a raise RuntimeError(123) # some comment
1947n/a
1948n/a def test_traceback(self):
1949n/a        # We want to ensure that the traceback from the child process is
1950n/a # contained in the traceback raised in the main process.
1951n/a if self.TYPE == 'processes':
1952n/a with self.Pool(1) as p:
1953n/a try:
1954n/a p.apply(self._test_traceback)
1955n/a except Exception as e:
1956n/a exc = e
1957n/a else:
1958n/a raise AssertionError('expected RuntimeError')
1959n/a self.assertIs(type(exc), RuntimeError)
1960n/a self.assertEqual(exc.args, (123,))
1961n/a cause = exc.__cause__
1962n/a self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
1963n/a self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
1964n/a
1965n/a with test.support.captured_stderr() as f1:
1966n/a try:
1967n/a raise exc
1968n/a except RuntimeError:
1969n/a sys.excepthook(*sys.exc_info())
1970n/a self.assertIn('raise RuntimeError(123) # some comment',
1971n/a f1.getvalue())
1972n/a
1973n/a @classmethod
1974n/a def _test_wrapped_exception(cls):
1975n/a raise RuntimeError('foo')
1976n/a
1977n/a def test_wrapped_exception(self):
1978n/a # Issue #20980: Should not wrap exception when using thread pool
1979n/a with self.Pool(1) as p:
1980n/a with self.assertRaises(RuntimeError):
1981n/a p.apply(self._test_wrapped_exception)
1982n/a
1983n/a def test_map_no_failfast(self):
1984n/a # Issue #23992: the fail-fast behaviour when an exception is raised
1985n/a # during map() would make Pool.join() deadlock, because a worker
1986n/a # process would fill the result queue (after the result handler thread
1987n/a # terminated, hence not draining it anymore).
1988n/a
1989n/a t_start = time.time()
1990n/a
1991n/a with self.assertRaises(ValueError):
1992n/a with self.Pool(2) as p:
1993n/a try:
1994n/a p.map(raise_large_valuerror, [0, 1])
1995n/a finally:
1996n/a time.sleep(0.5)
1997n/a p.close()
1998n/a p.join()
1999n/a
2000n/a # check that we indeed waited for all jobs
2001n/a self.assertGreater(time.time() - t_start, 0.9)
2002n/a
2003n/a
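The _TestPool cases above cover the public Pool entry points. As a rough standalone sketch of the same calls (square is an illustrative helper, not the module's own sqr):

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(2) as pool:
        print(pool.apply(square, (5,)))                        # 25
        print(pool.map(square, range(5)))                      # [0, 1, 4, 9, 16]
        print(sorted(pool.imap_unordered(square, range(5))))   # same values, any order
        print(pool.apply_async(square, (7,)).get(timeout=30))  # 49
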
2004n/adef raising():
2005n/a raise KeyError("key")
2006n/a
2007n/adef unpickleable_result():
2008n/a return lambda: 42
2009n/a
2010n/aclass _TestPoolWorkerErrors(BaseTestCase):
2011n/a ALLOWED_TYPES = ('processes', )
2012n/a
2013n/a def test_async_error_callback(self):
2014n/a p = multiprocessing.Pool(2)
2015n/a
2016n/a scratchpad = [None]
2017n/a def errback(exc):
2018n/a scratchpad[0] = exc
2019n/a
2020n/a res = p.apply_async(raising, error_callback=errback)
2021n/a self.assertRaises(KeyError, res.get)
2022n/a self.assertTrue(scratchpad[0])
2023n/a self.assertIsInstance(scratchpad[0], KeyError)
2024n/a
2025n/a p.close()
2026n/a p.join()
2027n/a
2028n/a def test_unpickleable_result(self):
2029n/a from multiprocessing.pool import MaybeEncodingError
2030n/a p = multiprocessing.Pool(2)
2031n/a
2032n/a # Make sure we don't lose pool processes because of encoding errors.
2033n/a for iteration in range(20):
2034n/a
2035n/a scratchpad = [None]
2036n/a def errback(exc):
2037n/a scratchpad[0] = exc
2038n/a
2039n/a res = p.apply_async(unpickleable_result, error_callback=errback)
2040n/a self.assertRaises(MaybeEncodingError, res.get)
2041n/a wrapped = scratchpad[0]
2042n/a self.assertTrue(wrapped)
2043n/a self.assertIsInstance(scratchpad[0], MaybeEncodingError)
2044n/a self.assertIsNotNone(wrapped.exc)
2045n/a self.assertIsNotNone(wrapped.value)
2046n/a
2047n/a p.close()
2048n/a p.join()
2049n/a
2050n/aclass _TestPoolWorkerLifetime(BaseTestCase):
2051n/a ALLOWED_TYPES = ('processes', )
2052n/a
2053n/a def test_pool_worker_lifetime(self):
2054n/a p = multiprocessing.Pool(3, maxtasksperchild=10)
2055n/a self.assertEqual(3, len(p._pool))
2056n/a origworkerpids = [w.pid for w in p._pool]
2057n/a # Run many tasks so each worker gets replaced (hopefully)
2058n/a results = []
2059n/a for i in range(100):
2060n/a results.append(p.apply_async(sqr, (i, )))
2061n/a # Fetch the results and verify we got the right answers,
2062n/a # also ensuring all the tasks have completed.
2063n/a for (j, res) in enumerate(results):
2064n/a self.assertEqual(res.get(), sqr(j))
2065n/a # Refill the pool
2066n/a p._repopulate_pool()
2067n/a # Wait until all workers are alive
2068n/a # (countdown * DELTA = 5 seconds max startup process time)
2069n/a countdown = 50
2070n/a while countdown and not all(w.is_alive() for w in p._pool):
2071n/a countdown -= 1
2072n/a time.sleep(DELTA)
2073n/a finalworkerpids = [w.pid for w in p._pool]
2074n/a # All pids should be assigned. See issue #7805.
2075n/a self.assertNotIn(None, origworkerpids)
2076n/a self.assertNotIn(None, finalworkerpids)
2077n/a # Finally, check that the worker pids have changed
2078n/a self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
2079n/a p.close()
2080n/a p.join()
2081n/a
2082n/a def test_pool_worker_lifetime_early_close(self):
2083n/a # Issue #10332: closing a pool whose workers have limited lifetimes
2084n/a # before all the tasks completed would make join() hang.
2085n/a p = multiprocessing.Pool(3, maxtasksperchild=1)
2086n/a results = []
2087n/a for i in range(6):
2088n/a results.append(p.apply_async(sqr, (i, 0.3)))
2089n/a p.close()
2090n/a p.join()
2091n/a # check the results
2092n/a for (j, res) in enumerate(results):
2093n/a self.assertEqual(res.get(), sqr(j))
2094n/a
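The worker-lifetime tests rely on the maxtasksperchild argument; a hedged sketch of its observable effect (report_pid is an illustrative helper):

import multiprocessing
import os

def report_pid(_):
    return os.getpid()

if __name__ == '__main__':
    # Each worker exits after handling 5 tasks and is replaced, so the 40
    # results are produced by several distinct worker processes.
    with multiprocessing.Pool(2, maxtasksperchild=5) as pool:
        pids = set(pool.map(report_pid, range(40), chunksize=1))
    print(len(pids))   # typically 8 distinct pids
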
2095n/a#
2096n/a# Test of creating a customized manager class
2097n/a#
2098n/a
2099n/afrom multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2100n/a
2101n/aclass FooBar(object):
2102n/a def f(self):
2103n/a return 'f()'
2104n/a def g(self):
2105n/a raise ValueError
2106n/a def _h(self):
2107n/a return '_h()'
2108n/a
2109n/adef baz():
2110n/a for i in range(10):
2111n/a yield i*i
2112n/a
2113n/aclass IteratorProxy(BaseProxy):
2114n/a _exposed_ = ('__next__',)
2115n/a def __iter__(self):
2116n/a return self
2117n/a def __next__(self):
2118n/a return self._callmethod('__next__')
2119n/a
2120n/aclass MyManager(BaseManager):
2121n/a pass
2122n/a
2123n/aMyManager.register('Foo', callable=FooBar)
2124n/aMyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
2125n/aMyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2126n/a
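The registrations above feed _TestMyManager below. A minimal sketch of the same BaseManager.register() pattern in isolation (Counter is an illustrative class, not part of this file):

from multiprocessing.managers import BaseManager

class Counter:
    def __init__(self):
        self._n = 0
    def increment(self):
        self._n += 1
        return self._n

class CounterManager(BaseManager):
    pass

# Without exposed=..., every public method of the created object is
# available on the returned proxy.
CounterManager.register('Counter', callable=Counter)

if __name__ == '__main__':
    with CounterManager() as manager:
        counter = manager.Counter()
        print(counter.increment())   # 1
        print(counter.increment())   # 2
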
2127n/a
2128n/aclass _TestMyManager(BaseTestCase):
2129n/a
2130n/a ALLOWED_TYPES = ('manager',)
2131n/a
2132n/a def test_mymanager(self):
2133n/a manager = MyManager()
2134n/a manager.start()
2135n/a self.common(manager)
2136n/a manager.shutdown()
2137n/a
2138n/a # If the manager process exited cleanly then the exitcode
2139n/a # will be zero. Otherwise (after a short timeout)
2140n/a # terminate() is used, resulting in an exitcode of -SIGTERM.
2141n/a self.assertEqual(manager._process.exitcode, 0)
2142n/a
2143n/a def test_mymanager_context(self):
2144n/a with MyManager() as manager:
2145n/a self.common(manager)
2146n/a self.assertEqual(manager._process.exitcode, 0)
2147n/a
2148n/a def test_mymanager_context_prestarted(self):
2149n/a manager = MyManager()
2150n/a manager.start()
2151n/a with manager:
2152n/a self.common(manager)
2153n/a self.assertEqual(manager._process.exitcode, 0)
2154n/a
2155n/a def common(self, manager):
2156n/a foo = manager.Foo()
2157n/a bar = manager.Bar()
2158n/a baz = manager.baz()
2159n/a
2160n/a foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
2161n/a bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
2162n/a
2163n/a self.assertEqual(foo_methods, ['f', 'g'])
2164n/a self.assertEqual(bar_methods, ['f', '_h'])
2165n/a
2166n/a self.assertEqual(foo.f(), 'f()')
2167n/a self.assertRaises(ValueError, foo.g)
2168n/a self.assertEqual(foo._callmethod('f'), 'f()')
2169n/a self.assertRaises(RemoteError, foo._callmethod, '_h')
2170n/a
2171n/a self.assertEqual(bar.f(), 'f()')
2172n/a self.assertEqual(bar._h(), '_h()')
2173n/a self.assertEqual(bar._callmethod('f'), 'f()')
2174n/a self.assertEqual(bar._callmethod('_h'), '_h()')
2175n/a
2176n/a self.assertEqual(list(baz), [i*i for i in range(10)])
2177n/a
2178n/a
2179n/a#
2180n/a# Test of connecting to a remote server and using xmlrpclib for serialization
2181n/a#
2182n/a
2183n/a_queue = pyqueue.Queue()
2184n/adef get_queue():
2185n/a return _queue
2186n/a
2187n/aclass QueueManager(BaseManager):
2188n/a '''manager class used by server process'''
2189n/aQueueManager.register('get_queue', callable=get_queue)
2190n/a
2191n/aclass QueueManager2(BaseManager):
2192n/a '''manager class which specifies the same interface as QueueManager'''
2193n/aQueueManager2.register('get_queue')
2194n/a
2195n/a
2196n/aSERIALIZER = 'xmlrpclib'
2197n/a
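On the client side only the method names need registering; the server supplies the callables. A rough sketch of such a client (address and authkey are placeholders):

from multiprocessing.managers import BaseManager

class RemoteJobs(BaseManager):
    pass

RemoteJobs.register('get_queue')

def fetch_one(address, authkey):
    # serializer='xmlrpclib' replaces pickle with xmlrpc marshalling, which
    # only round-trips basic types (strings, numbers, lists, ...).
    mgr = RemoteJobs(address=address, authkey=authkey, serializer='xmlrpclib')
    mgr.connect()   # attach to an already-running server
    return mgr.get_queue().get()
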
2198n/aclass _TestRemoteManager(BaseTestCase):
2199n/a
2200n/a ALLOWED_TYPES = ('manager',)
2201n/a values = ['hello world', None, True, 2.25,
2202n/a 'hall\xe5 v\xe4rlden',
2203n/a '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
2204n/a b'hall\xe5 v\xe4rlden',
2205n/a ]
2206n/a result = values[:]
2207n/a
2208n/a @classmethod
2209n/a def _putter(cls, address, authkey):
2210n/a manager = QueueManager2(
2211n/a address=address, authkey=authkey, serializer=SERIALIZER
2212n/a )
2213n/a manager.connect()
2214n/a queue = manager.get_queue()
2215n/a        # Note that xmlrpclib will deserialize the object as a list, not a tuple
2216n/a queue.put(tuple(cls.values))
2217n/a
2218n/a def test_remote(self):
2219n/a authkey = os.urandom(32)
2220n/a
2221n/a manager = QueueManager(
2222n/a address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
2223n/a )
2224n/a manager.start()
2225n/a
2226n/a p = self.Process(target=self._putter, args=(manager.address, authkey))
2227n/a p.daemon = True
2228n/a p.start()
2229n/a
2230n/a manager2 = QueueManager2(
2231n/a address=manager.address, authkey=authkey, serializer=SERIALIZER
2232n/a )
2233n/a manager2.connect()
2234n/a queue = manager2.get_queue()
2235n/a
2236n/a self.assertEqual(queue.get(), self.result)
2237n/a
2238n/a # Because we are using xmlrpclib for serialization instead of
2239n/a # pickle this will cause a serialization error.
2240n/a self.assertRaises(Exception, queue.put, time.sleep)
2241n/a
2242n/a # Make queue finalizer run before the server is stopped
2243n/a del queue
2244n/a manager.shutdown()
2245n/a
2246n/aclass _TestManagerRestart(BaseTestCase):
2247n/a
2248n/a @classmethod
2249n/a def _putter(cls, address, authkey):
2250n/a manager = QueueManager(
2251n/a address=address, authkey=authkey, serializer=SERIALIZER)
2252n/a manager.connect()
2253n/a queue = manager.get_queue()
2254n/a queue.put('hello world')
2255n/a
2256n/a def test_rapid_restart(self):
2257n/a authkey = os.urandom(32)
2258n/a manager = QueueManager(
2259n/a address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
2260n/a srvr = manager.get_server()
2261n/a addr = srvr.address
2262n/a        # Close the connection.Listener socket which gets opened as part
2263n/a        # of manager.get_server(). It's not needed for the test.
2264n/a srvr.listener.close()
2265n/a manager.start()
2266n/a
2267n/a p = self.Process(target=self._putter, args=(manager.address, authkey))
2268n/a p.daemon = True
2269n/a p.start()
2270n/a queue = manager.get_queue()
2271n/a self.assertEqual(queue.get(), 'hello world')
2272n/a del queue
2273n/a manager.shutdown()
2274n/a manager = QueueManager(
2275n/a address=addr, authkey=authkey, serializer=SERIALIZER)
2276n/a try:
2277n/a manager.start()
2278n/a except OSError as e:
2279n/a if e.errno != errno.EADDRINUSE:
2280n/a raise
2281n/a # Retry after some time, in case the old socket was lingering
2282n/a # (sporadic failure on buildbots)
2283n/a time.sleep(1.0)
2284n/a manager = QueueManager(
2285n/a address=addr, authkey=authkey, serializer=SERIALIZER)
2286n/a manager.shutdown()
2287n/a
2288n/a#
2289n/a#
2290n/a#
2291n/a
2292n/aSENTINEL = latin('')
2293n/a
2294n/aclass _TestConnection(BaseTestCase):
2295n/a
2296n/a ALLOWED_TYPES = ('processes', 'threads')
2297n/a
2298n/a @classmethod
2299n/a def _echo(cls, conn):
2300n/a for msg in iter(conn.recv_bytes, SENTINEL):
2301n/a conn.send_bytes(msg)
2302n/a conn.close()
2303n/a
2304n/a def test_connection(self):
2305n/a conn, child_conn = self.Pipe()
2306n/a
2307n/a p = self.Process(target=self._echo, args=(child_conn,))
2308n/a p.daemon = True
2309n/a p.start()
2310n/a
2311n/a seq = [1, 2.25, None]
2312n/a msg = latin('hello world')
2313n/a longmsg = msg * 10
2314n/a arr = array.array('i', list(range(4)))
2315n/a
2316n/a if self.TYPE == 'processes':
2317n/a self.assertEqual(type(conn.fileno()), int)
2318n/a
2319n/a self.assertEqual(conn.send(seq), None)
2320n/a self.assertEqual(conn.recv(), seq)
2321n/a
2322n/a self.assertEqual(conn.send_bytes(msg), None)
2323n/a self.assertEqual(conn.recv_bytes(), msg)
2324n/a
2325n/a if self.TYPE == 'processes':
2326n/a buffer = array.array('i', [0]*10)
2327n/a expected = list(arr) + [0] * (10 - len(arr))
2328n/a self.assertEqual(conn.send_bytes(arr), None)
2329n/a self.assertEqual(conn.recv_bytes_into(buffer),
2330n/a len(arr) * buffer.itemsize)
2331n/a self.assertEqual(list(buffer), expected)
2332n/a
2333n/a buffer = array.array('i', [0]*10)
2334n/a expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2335n/a self.assertEqual(conn.send_bytes(arr), None)
2336n/a self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2337n/a len(arr) * buffer.itemsize)
2338n/a self.assertEqual(list(buffer), expected)
2339n/a
2340n/a buffer = bytearray(latin(' ' * 40))
2341n/a self.assertEqual(conn.send_bytes(longmsg), None)
2342n/a try:
2343n/a res = conn.recv_bytes_into(buffer)
2344n/a except multiprocessing.BufferTooShort as e:
2345n/a self.assertEqual(e.args, (longmsg,))
2346n/a else:
2347n/a self.fail('expected BufferTooShort, got %s' % res)
2348n/a
2349n/a poll = TimingWrapper(conn.poll)
2350n/a
2351n/a self.assertEqual(poll(), False)
2352n/a self.assertTimingAlmostEqual(poll.elapsed, 0)
2353n/a
2354n/a self.assertEqual(poll(-1), False)
2355n/a self.assertTimingAlmostEqual(poll.elapsed, 0)
2356n/a
2357n/a self.assertEqual(poll(TIMEOUT1), False)
2358n/a self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2359n/a
2360n/a conn.send(None)
2361n/a time.sleep(.1)
2362n/a
2363n/a self.assertEqual(poll(TIMEOUT1), True)
2364n/a self.assertTimingAlmostEqual(poll.elapsed, 0)
2365n/a
2366n/a self.assertEqual(conn.recv(), None)
2367n/a
2368n/a        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
2369n/a conn.send_bytes(really_big_msg)
2370n/a self.assertEqual(conn.recv_bytes(), really_big_msg)
2371n/a
2372n/a conn.send_bytes(SENTINEL) # tell child to quit
2373n/a child_conn.close()
2374n/a
2375n/a if self.TYPE == 'processes':
2376n/a self.assertEqual(conn.readable, True)
2377n/a self.assertEqual(conn.writable, True)
2378n/a self.assertRaises(EOFError, conn.recv)
2379n/a self.assertRaises(EOFError, conn.recv_bytes)
2380n/a
2381n/a p.join()
2382n/a
2383n/a def test_duplex_false(self):
2384n/a reader, writer = self.Pipe(duplex=False)
2385n/a self.assertEqual(writer.send(1), None)
2386n/a self.assertEqual(reader.recv(), 1)
2387n/a if self.TYPE == 'processes':
2388n/a self.assertEqual(reader.readable, True)
2389n/a self.assertEqual(reader.writable, False)
2390n/a self.assertEqual(writer.readable, False)
2391n/a self.assertEqual(writer.writable, True)
2392n/a self.assertRaises(OSError, reader.send, 2)
2393n/a self.assertRaises(OSError, writer.recv)
2394n/a self.assertRaises(OSError, writer.poll)
2395n/a
2396n/a def test_spawn_close(self):
2397n/a # We test that a pipe connection can be closed by parent
2398n/a # process immediately after child is spawned. On Windows this
2399n/a # would have sometimes failed on old versions because
2400n/a # child_conn would be closed before the child got a chance to
2401n/a # duplicate it.
2402n/a conn, child_conn = self.Pipe()
2403n/a
2404n/a p = self.Process(target=self._echo, args=(child_conn,))
2405n/a p.daemon = True
2406n/a p.start()
2407n/a child_conn.close() # this might complete before child initializes
2408n/a
2409n/a msg = latin('hello')
2410n/a conn.send_bytes(msg)
2411n/a self.assertEqual(conn.recv_bytes(), msg)
2412n/a
2413n/a conn.send_bytes(SENTINEL)
2414n/a conn.close()
2415n/a p.join()
2416n/a
2417n/a def test_sendbytes(self):
2418n/a if self.TYPE != 'processes':
2419n/a self.skipTest('test not appropriate for {}'.format(self.TYPE))
2420n/a
2421n/a msg = latin('abcdefghijklmnopqrstuvwxyz')
2422n/a a, b = self.Pipe()
2423n/a
2424n/a a.send_bytes(msg)
2425n/a self.assertEqual(b.recv_bytes(), msg)
2426n/a
2427n/a a.send_bytes(msg, 5)
2428n/a self.assertEqual(b.recv_bytes(), msg[5:])
2429n/a
2430n/a a.send_bytes(msg, 7, 8)
2431n/a self.assertEqual(b.recv_bytes(), msg[7:7+8])
2432n/a
2433n/a a.send_bytes(msg, 26)
2434n/a self.assertEqual(b.recv_bytes(), latin(''))
2435n/a
2436n/a a.send_bytes(msg, 26, 0)
2437n/a self.assertEqual(b.recv_bytes(), latin(''))
2438n/a
2439n/a self.assertRaises(ValueError, a.send_bytes, msg, 27)
2440n/a
2441n/a self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2442n/a
2443n/a self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2444n/a
2445n/a self.assertRaises(ValueError, a.send_bytes, msg, -1)
2446n/a
2447n/a self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2448n/a
2449n/a @classmethod
2450n/a def _is_fd_assigned(cls, fd):
2451n/a try:
2452n/a os.fstat(fd)
2453n/a except OSError as e:
2454n/a if e.errno == errno.EBADF:
2455n/a return False
2456n/a raise
2457n/a else:
2458n/a return True
2459n/a
2460n/a @classmethod
2461n/a def _writefd(cls, conn, data, create_dummy_fds=False):
2462n/a if create_dummy_fds:
2463n/a for i in range(0, 256):
2464n/a if not cls._is_fd_assigned(i):
2465n/a os.dup2(conn.fileno(), i)
2466n/a fd = reduction.recv_handle(conn)
2467n/a if msvcrt:
2468n/a fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2469n/a os.write(fd, data)
2470n/a os.close(fd)
2471n/a
2472n/a @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2473n/a def test_fd_transfer(self):
2474n/a if self.TYPE != 'processes':
2475n/a self.skipTest("only makes sense with processes")
2476n/a conn, child_conn = self.Pipe(duplex=True)
2477n/a
2478n/a p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
2479n/a p.daemon = True
2480n/a p.start()
2481n/a self.addCleanup(test.support.unlink, test.support.TESTFN)
2482n/a with open(test.support.TESTFN, "wb") as f:
2483n/a fd = f.fileno()
2484n/a if msvcrt:
2485n/a fd = msvcrt.get_osfhandle(fd)
2486n/a reduction.send_handle(conn, fd, p.pid)
2487n/a p.join()
2488n/a with open(test.support.TESTFN, "rb") as f:
2489n/a self.assertEqual(f.read(), b"foo")
2490n/a
2491n/a @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2492n/a @unittest.skipIf(sys.platform == "win32",
2493n/a "test semantics don't make sense on Windows")
2494n/a @unittest.skipIf(MAXFD <= 256,
2495n/a "largest assignable fd number is too small")
2496n/a @unittest.skipUnless(hasattr(os, "dup2"),
2497n/a "test needs os.dup2()")
2498n/a def test_large_fd_transfer(self):
2499n/a # With fd > 256 (issue #11657)
2500n/a if self.TYPE != 'processes':
2501n/a self.skipTest("only makes sense with processes")
2502n/a conn, child_conn = self.Pipe(duplex=True)
2503n/a
2504n/a p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
2505n/a p.daemon = True
2506n/a p.start()
2507n/a self.addCleanup(test.support.unlink, test.support.TESTFN)
2508n/a with open(test.support.TESTFN, "wb") as f:
2509n/a fd = f.fileno()
2510n/a for newfd in range(256, MAXFD):
2511n/a if not self._is_fd_assigned(newfd):
2512n/a break
2513n/a else:
2514n/a self.fail("could not find an unassigned large file descriptor")
2515n/a os.dup2(fd, newfd)
2516n/a try:
2517n/a reduction.send_handle(conn, newfd, p.pid)
2518n/a finally:
2519n/a os.close(newfd)
2520n/a p.join()
2521n/a with open(test.support.TESTFN, "rb") as f:
2522n/a self.assertEqual(f.read(), b"bar")
2523n/a
2524n/a @classmethod
2525n/a    def _send_data_without_fd(cls, conn):
2526n/a os.write(conn.fileno(), b"\0")
2527n/a
2528n/a @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2529n/a @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2530n/a def test_missing_fd_transfer(self):
2531n/a        # Check that an exception is raised when received data is not
2532n/a # accompanied by a file descriptor in ancillary data.
2533n/a if self.TYPE != 'processes':
2534n/a self.skipTest("only makes sense with processes")
2535n/a conn, child_conn = self.Pipe(duplex=True)
2536n/a
2537n/a p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2538n/a p.daemon = True
2539n/a p.start()
2540n/a self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2541n/a p.join()
2542n/a
2543n/a def test_context(self):
2544n/a a, b = self.Pipe()
2545n/a
2546n/a with a, b:
2547n/a a.send(1729)
2548n/a self.assertEqual(b.recv(), 1729)
2549n/a if self.TYPE == 'processes':
2550n/a self.assertFalse(a.closed)
2551n/a self.assertFalse(b.closed)
2552n/a
2553n/a if self.TYPE == 'processes':
2554n/a self.assertTrue(a.closed)
2555n/a self.assertTrue(b.closed)
2556n/a self.assertRaises(OSError, a.recv)
2557n/a self.assertRaises(OSError, b.recv)
2558n/a
2559n/aclass _TestListener(BaseTestCase):
2560n/a
2561n/a ALLOWED_TYPES = ('processes',)
2562n/a
2563n/a def test_multiple_bind(self):
2564n/a for family in self.connection.families:
2565n/a l = self.connection.Listener(family=family)
2566n/a self.addCleanup(l.close)
2567n/a self.assertRaises(OSError, self.connection.Listener,
2568n/a l.address, family)
2569n/a
2570n/a def test_context(self):
2571n/a with self.connection.Listener() as l:
2572n/a with self.connection.Client(l.address) as c:
2573n/a with l.accept() as d:
2574n/a c.send(1729)
2575n/a self.assertEqual(d.recv(), 1729)
2576n/a
2577n/a if self.TYPE == 'processes':
2578n/a self.assertRaises(OSError, l.accept)
2579n/a
2580n/aclass _TestListenerClient(BaseTestCase):
2581n/a
2582n/a ALLOWED_TYPES = ('processes', 'threads')
2583n/a
2584n/a @classmethod
2585n/a def _test(cls, address):
2586n/a conn = cls.connection.Client(address)
2587n/a conn.send('hello')
2588n/a conn.close()
2589n/a
2590n/a def test_listener_client(self):
2591n/a for family in self.connection.families:
2592n/a l = self.connection.Listener(family=family)
2593n/a p = self.Process(target=self._test, args=(l.address,))
2594n/a p.daemon = True
2595n/a p.start()
2596n/a conn = l.accept()
2597n/a self.assertEqual(conn.recv(), 'hello')
2598n/a p.join()
2599n/a l.close()
2600n/a
2601n/a def test_issue14725(self):
2602n/a l = self.connection.Listener()
2603n/a p = self.Process(target=self._test, args=(l.address,))
2604n/a p.daemon = True
2605n/a p.start()
2606n/a time.sleep(1)
2607n/a        # On Windows the client process should by now have connected,
2608n/a        # written data and closed the pipe handle. This causes
2609n/a        # ConnectNamedPipe() to fail with ERROR_NO_DATA. See Issue
2610n/a        # 14725.
2611n/a conn = l.accept()
2612n/a self.assertEqual(conn.recv(), 'hello')
2613n/a conn.close()
2614n/a p.join()
2615n/a l.close()
2616n/a
2617n/a def test_issue16955(self):
2618n/a for fam in self.connection.families:
2619n/a l = self.connection.Listener(family=fam)
2620n/a c = self.connection.Client(l.address)
2621n/a a = l.accept()
2622n/a a.send_bytes(b"hello")
2623n/a self.assertTrue(c.poll(1))
2624n/a a.close()
2625n/a c.close()
2626n/a l.close()
2627n/a
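A compact sketch of the Listener/Client handshake that _TestListenerClient drives through a child process, here kept self-contained with a thread (echo_upper is an illustrative helper):

import threading
from multiprocessing.connection import Client, Listener

def echo_upper(listener):
    with listener.accept() as conn:
        conn.send(conn.recv().upper())

if __name__ == '__main__':
    with Listener(('127.0.0.1', 0)) as listener:
        server = threading.Thread(target=echo_upper, args=(listener,))
        server.start()
        with Client(listener.address) as conn:
            conn.send('ping')
            print(conn.recv())   # PING
        server.join()
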
2628n/aclass _TestPoll(BaseTestCase):
2629n/a
2630n/a ALLOWED_TYPES = ('processes', 'threads')
2631n/a
2632n/a def test_empty_string(self):
2633n/a a, b = self.Pipe()
2634n/a self.assertEqual(a.poll(), False)
2635n/a b.send_bytes(b'')
2636n/a self.assertEqual(a.poll(), True)
2637n/a self.assertEqual(a.poll(), True)
2638n/a
2639n/a @classmethod
2640n/a def _child_strings(cls, conn, strings):
2641n/a for s in strings:
2642n/a time.sleep(0.1)
2643n/a conn.send_bytes(s)
2644n/a conn.close()
2645n/a
2646n/a def test_strings(self):
2647n/a strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2648n/a a, b = self.Pipe()
2649n/a p = self.Process(target=self._child_strings, args=(b, strings))
2650n/a p.start()
2651n/a
2652n/a for s in strings:
2653n/a for i in range(200):
2654n/a if a.poll(0.01):
2655n/a break
2656n/a x = a.recv_bytes()
2657n/a self.assertEqual(s, x)
2658n/a
2659n/a p.join()
2660n/a
2661n/a @classmethod
2662n/a def _child_boundaries(cls, r):
2663n/a        # Polling may "pull" a message into the child process, but we
2664n/a # don't want it to pull only part of a message, as that would
2665n/a # corrupt the pipe for any other processes which might later
2666n/a # read from it.
2667n/a r.poll(5)
2668n/a
2669n/a def test_boundaries(self):
2670n/a r, w = self.Pipe(False)
2671n/a p = self.Process(target=self._child_boundaries, args=(r,))
2672n/a p.start()
2673n/a time.sleep(2)
2674n/a L = [b"first", b"second"]
2675n/a for obj in L:
2676n/a w.send_bytes(obj)
2677n/a w.close()
2678n/a p.join()
2679n/a self.assertIn(r.recv_bytes(), L)
2680n/a
2681n/a @classmethod
2682n/a def _child_dont_merge(cls, b):
2683n/a b.send_bytes(b'a')
2684n/a b.send_bytes(b'b')
2685n/a b.send_bytes(b'cd')
2686n/a
2687n/a def test_dont_merge(self):
2688n/a a, b = self.Pipe()
2689n/a self.assertEqual(a.poll(0.0), False)
2690n/a self.assertEqual(a.poll(0.1), False)
2691n/a
2692n/a p = self.Process(target=self._child_dont_merge, args=(b,))
2693n/a p.start()
2694n/a
2695n/a self.assertEqual(a.recv_bytes(), b'a')
2696n/a self.assertEqual(a.poll(1.0), True)
2697n/a self.assertEqual(a.poll(1.0), True)
2698n/a self.assertEqual(a.recv_bytes(), b'b')
2699n/a self.assertEqual(a.poll(1.0), True)
2700n/a self.assertEqual(a.poll(1.0), True)
2701n/a self.assertEqual(a.poll(0.0), True)
2702n/a self.assertEqual(a.recv_bytes(), b'cd')
2703n/a
2704n/a p.join()
2705n/a
2706n/a#
2707n/a# Test of sending connection and socket objects between processes
2708n/a#
2709n/a
2710n/a@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2711n/aclass _TestPicklingConnections(BaseTestCase):
2712n/a
2713n/a ALLOWED_TYPES = ('processes',)
2714n/a
2715n/a @classmethod
2716n/a def tearDownClass(cls):
2717n/a from multiprocessing import resource_sharer
2718n/a resource_sharer.stop(timeout=5)
2719n/a
2720n/a @classmethod
2721n/a def _listener(cls, conn, families):
2722n/a for fam in families:
2723n/a l = cls.connection.Listener(family=fam)
2724n/a conn.send(l.address)
2725n/a new_conn = l.accept()
2726n/a conn.send(new_conn)
2727n/a new_conn.close()
2728n/a l.close()
2729n/a
2730n/a l = socket.socket()
2731n/a l.bind((test.support.HOST, 0))
2732n/a l.listen()
2733n/a conn.send(l.getsockname())
2734n/a new_conn, addr = l.accept()
2735n/a conn.send(new_conn)
2736n/a new_conn.close()
2737n/a l.close()
2738n/a
2739n/a conn.recv()
2740n/a
2741n/a @classmethod
2742n/a def _remote(cls, conn):
2743n/a for (address, msg) in iter(conn.recv, None):
2744n/a client = cls.connection.Client(address)
2745n/a client.send(msg.upper())
2746n/a client.close()
2747n/a
2748n/a address, msg = conn.recv()
2749n/a client = socket.socket()
2750n/a client.connect(address)
2751n/a client.sendall(msg.upper())
2752n/a client.close()
2753n/a
2754n/a conn.close()
2755n/a
2756n/a def test_pickling(self):
2757n/a families = self.connection.families
2758n/a
2759n/a lconn, lconn0 = self.Pipe()
2760n/a lp = self.Process(target=self._listener, args=(lconn0, families))
2761n/a lp.daemon = True
2762n/a lp.start()
2763n/a lconn0.close()
2764n/a
2765n/a rconn, rconn0 = self.Pipe()
2766n/a rp = self.Process(target=self._remote, args=(rconn0,))
2767n/a rp.daemon = True
2768n/a rp.start()
2769n/a rconn0.close()
2770n/a
2771n/a for fam in families:
2772n/a msg = ('This connection uses family %s' % fam).encode('ascii')
2773n/a address = lconn.recv()
2774n/a rconn.send((address, msg))
2775n/a new_conn = lconn.recv()
2776n/a self.assertEqual(new_conn.recv(), msg.upper())
2777n/a
2778n/a rconn.send(None)
2779n/a
2780n/a msg = latin('This connection uses a normal socket')
2781n/a address = lconn.recv()
2782n/a rconn.send((address, msg))
2783n/a new_conn = lconn.recv()
2784n/a buf = []
2785n/a while True:
2786n/a s = new_conn.recv(100)
2787n/a if not s:
2788n/a break
2789n/a buf.append(s)
2790n/a buf = b''.join(buf)
2791n/a self.assertEqual(buf, msg.upper())
2792n/a new_conn.close()
2793n/a
2794n/a lconn.send(None)
2795n/a
2796n/a rconn.close()
2797n/a lconn.close()
2798n/a
2799n/a lp.join()
2800n/a rp.join()
2801n/a
2802n/a @classmethod
2803n/a def child_access(cls, conn):
2804n/a w = conn.recv()
2805n/a w.send('all is well')
2806n/a w.close()
2807n/a
2808n/a r = conn.recv()
2809n/a msg = r.recv()
2810n/a conn.send(msg*2)
2811n/a
2812n/a conn.close()
2813n/a
2814n/a def test_access(self):
2815n/a # On Windows, if we do not specify a destination pid when
2816n/a # using DupHandle then we need to be careful to use the
2817n/a # correct access flags for DuplicateHandle(), or else
2818n/a # DupHandle.detach() will raise PermissionError. For example,
2819n/a # for a read only pipe handle we should use
2820n/a # access=FILE_GENERIC_READ. (Unfortunately
2821n/a # DUPLICATE_SAME_ACCESS does not work.)
2822n/a conn, child_conn = self.Pipe()
2823n/a p = self.Process(target=self.child_access, args=(child_conn,))
2824n/a p.daemon = True
2825n/a p.start()
2826n/a child_conn.close()
2827n/a
2828n/a r, w = self.Pipe(duplex=False)
2829n/a conn.send(w)
2830n/a w.close()
2831n/a self.assertEqual(r.recv(), 'all is well')
2832n/a r.close()
2833n/a
2834n/a r, w = self.Pipe(duplex=False)
2835n/a conn.send(r)
2836n/a r.close()
2837n/a w.send('foobar')
2838n/a w.close()
2839n/a self.assertEqual(conn.recv(), 'foobar'*2)
2840n/a
2841n/a#
2842n/a#
2843n/a#
2844n/a
2845n/aclass _TestHeap(BaseTestCase):
2846n/a
2847n/a ALLOWED_TYPES = ('processes',)
2848n/a
2849n/a def test_heap(self):
2850n/a iterations = 5000
2851n/a maxblocks = 50
2852n/a blocks = []
2853n/a
2854n/a # create and destroy lots of blocks of different sizes
2855n/a for i in range(iterations):
2856n/a size = int(random.lognormvariate(0, 1) * 1000)
2857n/a b = multiprocessing.heap.BufferWrapper(size)
2858n/a blocks.append(b)
2859n/a if len(blocks) > maxblocks:
2860n/a i = random.randrange(maxblocks)
2861n/a del blocks[i]
2862n/a
2863n/a # get the heap object
2864n/a heap = multiprocessing.heap.BufferWrapper._heap
2865n/a
2866n/a # verify the state of the heap
2867n/a all = []
2868n/a occupied = 0
2869n/a heap._lock.acquire()
2870n/a self.addCleanup(heap._lock.release)
2871n/a for L in list(heap._len_to_seq.values()):
2872n/a for arena, start, stop in L:
2873n/a all.append((heap._arenas.index(arena), start, stop,
2874n/a stop-start, 'free'))
2875n/a for arena, start, stop in heap._allocated_blocks:
2876n/a all.append((heap._arenas.index(arena), start, stop,
2877n/a stop-start, 'occupied'))
2878n/a occupied += (stop-start)
2879n/a
2880n/a all.sort()
2881n/a
2882n/a for i in range(len(all)-1):
2883n/a (arena, start, stop) = all[i][:3]
2884n/a (narena, nstart, nstop) = all[i+1][:3]
2885n/a self.assertTrue((arena != narena and nstart == 0) or
2886n/a (stop == nstart))
2887n/a
2888n/a def test_free_from_gc(self):
2889n/a # Check that freeing of blocks by the garbage collector doesn't deadlock
2890n/a # (issue #12352).
2891n/a # Make sure the GC is enabled, and set lower collection thresholds to
2892n/a # make collections more frequent (and increase the probability of
2893n/a # deadlock).
2894n/a if not gc.isenabled():
2895n/a gc.enable()
2896n/a self.addCleanup(gc.disable)
2897n/a thresholds = gc.get_threshold()
2898n/a self.addCleanup(gc.set_threshold, *thresholds)
2899n/a gc.set_threshold(10)
2900n/a
2901n/a # perform numerous block allocations, with cyclic references to make
2902n/a # sure objects are collected asynchronously by the gc
2903n/a for i in range(5000):
2904n/a a = multiprocessing.heap.BufferWrapper(1)
2905n/a b = multiprocessing.heap.BufferWrapper(1)
2906n/a # circular references
2907n/a a.buddy = b
2908n/a b.buddy = a
2909n/a
2910n/a#
2911n/a#
2912n/a#
2913n/a
2914n/aclass _Foo(Structure):
2915n/a _fields_ = [
2916n/a ('x', c_int),
2917n/a ('y', c_double)
2918n/a ]
2919n/a
2920n/aclass _TestSharedCTypes(BaseTestCase):
2921n/a
2922n/a ALLOWED_TYPES = ('processes',)
2923n/a
2924n/a def setUp(self):
2925n/a if not HAS_SHAREDCTYPES:
2926n/a self.skipTest("requires multiprocessing.sharedctypes")
2927n/a
2928n/a @classmethod
2929n/a def _double(cls, x, y, foo, arr, string):
2930n/a x.value *= 2
2931n/a y.value *= 2
2932n/a foo.x *= 2
2933n/a foo.y *= 2
2934n/a string.value *= 2
2935n/a for i in range(len(arr)):
2936n/a arr[i] *= 2
2937n/a
2938n/a def test_sharedctypes(self, lock=False):
2939n/a x = Value('i', 7, lock=lock)
2940n/a y = Value(c_double, 1.0/3.0, lock=lock)
2941n/a foo = Value(_Foo, 3, 2, lock=lock)
2942n/a arr = self.Array('d', list(range(10)), lock=lock)
2943n/a string = self.Array('c', 20, lock=lock)
2944n/a string.value = latin('hello')
2945n/a
2946n/a p = self.Process(target=self._double, args=(x, y, foo, arr, string))
2947n/a p.daemon = True
2948n/a p.start()
2949n/a p.join()
2950n/a
2951n/a self.assertEqual(x.value, 14)
2952n/a self.assertAlmostEqual(y.value, 2.0/3.0)
2953n/a self.assertEqual(foo.x, 6)
2954n/a self.assertAlmostEqual(foo.y, 4.0)
2955n/a for i in range(10):
2956n/a self.assertAlmostEqual(arr[i], i*2)
2957n/a self.assertEqual(string.value, latin('hellohello'))
2958n/a
2959n/a def test_synchronize(self):
2960n/a self.test_sharedctypes(lock=True)
2961n/a
2962n/a def test_copy(self):
2963n/a foo = _Foo(2, 5.0)
2964n/a bar = copy(foo)
2965n/a foo.x = 0
2966n/a foo.y = 0
2967n/a self.assertEqual(bar.x, 2)
2968n/a self.assertAlmostEqual(bar.y, 5.0)
2969n/a
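A short sketch of the synchronized sharedctypes wrappers that _TestSharedCTypes doubles in a child process (the bump helper is illustrative):

import multiprocessing
from multiprocessing.sharedctypes import Array, Value

def bump(counter, samples):
    with counter.get_lock():        # Value(..., lock=True) is the default
        counter.value += 1
    for i in range(len(samples)):
        samples[i] *= 2

if __name__ == '__main__':
    counter = Value('i', 0)
    samples = Array('d', [0.5, 1.5, 2.5])
    p = multiprocessing.Process(target=bump, args=(counter, samples))
    p.start()
    p.join()
    print(counter.value, list(samples))   # 1 [1.0, 3.0, 5.0]
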
2970n/a#
2971n/a#
2972n/a#
2973n/a
2974n/aclass _TestFinalize(BaseTestCase):
2975n/a
2976n/a ALLOWED_TYPES = ('processes',)
2977n/a
2978n/a @classmethod
2979n/a def _test_finalize(cls, conn):
2980n/a class Foo(object):
2981n/a pass
2982n/a
2983n/a a = Foo()
2984n/a util.Finalize(a, conn.send, args=('a',))
2985n/a del a # triggers callback for a
2986n/a
2987n/a b = Foo()
2988n/a close_b = util.Finalize(b, conn.send, args=('b',))
2989n/a close_b() # triggers callback for b
2990n/a close_b() # does nothing because callback has already been called
2991n/a del b # does nothing because callback has already been called
2992n/a
2993n/a c = Foo()
2994n/a util.Finalize(c, conn.send, args=('c',))
2995n/a
2996n/a d10 = Foo()
2997n/a util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2998n/a
2999n/a d01 = Foo()
3000n/a util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
3001n/a d02 = Foo()
3002n/a util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
3003n/a d03 = Foo()
3004n/a util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
3005n/a
3006n/a util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
3007n/a
3008n/a util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
3009n/a
3010n/a # call multiprocessing's cleanup function then exit process without
3011n/a # garbage collecting locals
3012n/a util._exit_function()
3013n/a conn.close()
3014n/a os._exit(0)
3015n/a
3016n/a def test_finalize(self):
3017n/a conn, child_conn = self.Pipe()
3018n/a
3019n/a p = self.Process(target=self._test_finalize, args=(child_conn,))
3020n/a p.daemon = True
3021n/a p.start()
3022n/a p.join()
3023n/a
3024n/a result = [obj for obj in iter(conn.recv, 'STOP')]
3025n/a self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
3026n/a
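util.Finalize is an internal helper, but the ordering the test asserts is easy to see in isolation; a hedged sketch (Resource is an illustrative class):

from multiprocessing import util

class Resource:
    pass

res = Resource()
# The callback runs when res is garbage collected, when the finalizer is
# called directly, or at interpreter exit; higher exitpriority runs earlier
# during shutdown.
finalizer = util.Finalize(res, print, args=('cleaning up',), exitpriority=10)
finalizer()   # prints 'cleaning up'; repeated calls do nothing
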
3027n/a#
3028n/a# Test that from ... import * works for each module
3029n/a#
3030n/a
3031n/aclass _TestImportStar(unittest.TestCase):
3032n/a
3033n/a def get_module_names(self):
3034n/a import glob
3035n/a folder = os.path.dirname(multiprocessing.__file__)
3036n/a pattern = os.path.join(folder, '*.py')
3037n/a files = glob.glob(pattern)
3038n/a modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
3039n/a modules = ['multiprocessing.' + m for m in modules]
3040n/a modules.remove('multiprocessing.__init__')
3041n/a modules.append('multiprocessing')
3042n/a return modules
3043n/a
3044n/a def test_import(self):
3045n/a modules = self.get_module_names()
3046n/a if sys.platform == 'win32':
3047n/a modules.remove('multiprocessing.popen_fork')
3048n/a modules.remove('multiprocessing.popen_forkserver')
3049n/a modules.remove('multiprocessing.popen_spawn_posix')
3050n/a else:
3051n/a modules.remove('multiprocessing.popen_spawn_win32')
3052n/a if not HAS_REDUCTION:
3053n/a modules.remove('multiprocessing.popen_forkserver')
3054n/a
3055n/a if c_int is None:
3056n/a # This module requires _ctypes
3057n/a modules.remove('multiprocessing.sharedctypes')
3058n/a
3059n/a for name in modules:
3060n/a __import__(name)
3061n/a mod = sys.modules[name]
3062n/a self.assertTrue(hasattr(mod, '__all__'), name)
3063n/a
3064n/a for attr in mod.__all__:
3065n/a self.assertTrue(
3066n/a hasattr(mod, attr),
3067n/a '%r does not have attribute %r' % (mod, attr)
3068n/a )
3069n/a
3070n/a#
3071n/a# Quick test that logging works -- does not test logging output
3072n/a#
3073n/a
3074n/aclass _TestLogging(BaseTestCase):
3075n/a
3076n/a ALLOWED_TYPES = ('processes',)
3077n/a
3078n/a def test_enable_logging(self):
3079n/a logger = multiprocessing.get_logger()
3080n/a logger.setLevel(util.SUBWARNING)
3081n/a self.assertTrue(logger is not None)
3082n/a logger.debug('this will not be printed')
3083n/a logger.info('nor will this')
3084n/a logger.setLevel(LOG_LEVEL)
3085n/a
3086n/a @classmethod
3087n/a def _test_level(cls, conn):
3088n/a logger = multiprocessing.get_logger()
3089n/a conn.send(logger.getEffectiveLevel())
3090n/a
3091n/a def test_level(self):
3092n/a LEVEL1 = 32
3093n/a LEVEL2 = 37
3094n/a
3095n/a logger = multiprocessing.get_logger()
3096n/a root_logger = logging.getLogger()
3097n/a root_level = root_logger.level
3098n/a
3099n/a reader, writer = multiprocessing.Pipe(duplex=False)
3100n/a
3101n/a logger.setLevel(LEVEL1)
3102n/a p = self.Process(target=self._test_level, args=(writer,))
3103n/a p.daemon = True
3104n/a p.start()
3105n/a self.assertEqual(LEVEL1, reader.recv())
3106n/a
3107n/a logger.setLevel(logging.NOTSET)
3108n/a root_logger.setLevel(LEVEL2)
3109n/a p = self.Process(target=self._test_level, args=(writer,))
3110n/a p.daemon = True
3111n/a p.start()
3112n/a self.assertEqual(LEVEL2, reader.recv())
3113n/a
3114n/a root_logger.setLevel(root_level)
3115n/a logger.setLevel(level=LOG_LEVEL)
3116n/a
3117n/a
3118n/a# class _TestLoggingProcessName(BaseTestCase):
3119n/a#
3120n/a# def handle(self, record):
3121n/a# assert record.processName == multiprocessing.current_process().name
3122n/a# self.__handled = True
3123n/a#
3124n/a# def test_logging(self):
3125n/a# handler = logging.Handler()
3126n/a# handler.handle = self.handle
3127n/a# self.__handled = False
3128n/a# # Bypass getLogger() and side-effects
3129n/a# logger = logging.getLoggerClass()(
3130n/a# 'multiprocessing.test.TestLoggingProcessName')
3131n/a# logger.addHandler(handler)
3132n/a# logger.propagate = False
3133n/a#
3134n/a# logger.warn('foo')
3135n/a# assert self.__handled
3136n/a
3137n/a#
3138n/a# Check that Process.join() retries if os.waitpid() fails with EINTR
3139n/a#
3140n/a
3141n/aclass _TestPollEintr(BaseTestCase):
3142n/a
3143n/a ALLOWED_TYPES = ('processes',)
3144n/a
3145n/a @classmethod
3146n/a def _killer(cls, pid):
3147n/a time.sleep(0.1)
3148n/a os.kill(pid, signal.SIGUSR1)
3149n/a
3150n/a @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3151n/a def test_poll_eintr(self):
3152n/a got_signal = [False]
3153n/a def record(*args):
3154n/a got_signal[0] = True
3155n/a pid = os.getpid()
3156n/a oldhandler = signal.signal(signal.SIGUSR1, record)
3157n/a try:
3158n/a killer = self.Process(target=self._killer, args=(pid,))
3159n/a killer.start()
3160n/a try:
3161n/a p = self.Process(target=time.sleep, args=(2,))
3162n/a p.start()
3163n/a p.join()
3164n/a finally:
3165n/a killer.join()
3166n/a self.assertTrue(got_signal[0])
3167n/a self.assertEqual(p.exitcode, 0)
3168n/a finally:
3169n/a signal.signal(signal.SIGUSR1, oldhandler)
3170n/a
3171n/a#
3172n/a# Test of handle verification, see issue 3321
3173n/a#
3174n/a
3175n/aclass TestInvalidHandle(unittest.TestCase):
3176n/a
3177n/a @unittest.skipIf(WIN32, "skipped on Windows")
3178n/a def test_invalid_handles(self):
3179n/a conn = multiprocessing.connection.Connection(44977608)
3180n/a # check that poll() doesn't crash
3181n/a try:
3182n/a conn.poll()
3183n/a except (ValueError, OSError):
3184n/a pass
3185n/a finally:
3186n/a # Hack private attribute _handle to avoid printing an error
3187n/a # in conn.__del__
3188n/a conn._handle = None
3189n/a self.assertRaises((ValueError, OSError),
3190n/a multiprocessing.connection.Connection, -1)
3191n/a
3192n/a
3193n/a
3194n/aclass OtherTest(unittest.TestCase):
3195n/a # TODO: add more tests for deliver/answer challenge.
3196n/a def test_deliver_challenge_auth_failure(self):
3197n/a class _FakeConnection(object):
3198n/a def recv_bytes(self, size):
3199n/a return b'something bogus'
3200n/a def send_bytes(self, data):
3201n/a pass
3202n/a self.assertRaises(multiprocessing.AuthenticationError,
3203n/a multiprocessing.connection.deliver_challenge,
3204n/a _FakeConnection(), b'abc')
3205n/a
3206n/a def test_answer_challenge_auth_failure(self):
3207n/a class _FakeConnection(object):
3208n/a def __init__(self):
3209n/a self.count = 0
3210n/a def recv_bytes(self, size):
3211n/a self.count += 1
3212n/a if self.count == 1:
3213n/a return multiprocessing.connection.CHALLENGE
3214n/a elif self.count == 2:
3215n/a return b'something bogus'
3216n/a return b''
3217n/a def send_bytes(self, data):
3218n/a pass
3219n/a self.assertRaises(multiprocessing.AuthenticationError,
3220n/a multiprocessing.connection.answer_challenge,
3221n/a _FakeConnection(), b'abc')
3222n/a
3223n/a#
3224n/a# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3225n/a#
3226n/a
3227n/adef initializer(ns):
3228n/a ns.test += 1
3229n/a
3230n/aclass TestInitializers(unittest.TestCase):
3231n/a def setUp(self):
3232n/a self.mgr = multiprocessing.Manager()
3233n/a self.ns = self.mgr.Namespace()
3234n/a self.ns.test = 0
3235n/a
3236n/a def tearDown(self):
3237n/a self.mgr.shutdown()
3238n/a self.mgr.join()
3239n/a
3240n/a def test_manager_initializer(self):
3241n/a m = multiprocessing.managers.SyncManager()
3242n/a self.assertRaises(TypeError, m.start, 1)
3243n/a m.start(initializer, (self.ns,))
3244n/a self.assertEqual(self.ns.test, 1)
3245n/a m.shutdown()
3246n/a m.join()
3247n/a
3248n/a def test_pool_initializer(self):
3249n/a self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3250n/a p = multiprocessing.Pool(1, initializer, (self.ns,))
3251n/a p.close()
3252n/a p.join()
3253n/a self.assertEqual(self.ns.test, 1)
3254n/a
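The initializer hook the tests pass to SyncManager.start() and Pool() runs once in each newly started server/worker process; a standalone sketch with illustrative names:

import multiprocessing

def init_worker(tag):
    # Executed once in every worker process before it accepts tasks.
    global WORKER_TAG
    WORKER_TAG = tag

def read_tag(_):
    return WORKER_TAG

if __name__ == '__main__':
    with multiprocessing.Pool(2, initializer=init_worker, initargs=(42,)) as pool:
        print(pool.map(read_tag, range(4)))   # [42, 42, 42, 42]
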
3255n/a#
3256n/a# Issue 5155, 5313, 5331: Test process in processes
3257n/a# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3258n/a#
3259n/a
3260n/adef _this_sub_process(q):
3261n/a try:
3262n/a item = q.get(block=False)
3263n/a except pyqueue.Empty:
3264n/a pass
3265n/a
3266n/adef _test_process(q):
3267n/a queue = multiprocessing.Queue()
3268n/a subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
3269n/a subProc.daemon = True
3270n/a subProc.start()
3271n/a subProc.join()
3272n/a
3273n/adef _afunc(x):
3274n/a return x*x
3275n/a
3276n/adef pool_in_process():
3277n/a pool = multiprocessing.Pool(processes=4)
3278n/a x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
3279n/a pool.close()
3280n/a pool.join()
3281n/a
3282n/aclass _file_like(object):
3283n/a def __init__(self, delegate):
3284n/a self._delegate = delegate
3285n/a self._pid = None
3286n/a
3287n/a @property
3288n/a def cache(self):
3289n/a pid = os.getpid()
3290n/a # There are no race conditions since fork keeps only the running thread
3291n/a if pid != self._pid:
3292n/a self._pid = pid
3293n/a self._cache = []
3294n/a return self._cache
3295n/a
3296n/a def write(self, data):
3297n/a self.cache.append(data)
3298n/a
3299n/a def flush(self):
3300n/a self._delegate.write(''.join(self.cache))
3301n/a self._cache = []
3302n/a
3303n/aclass TestStdinBadfiledescriptor(unittest.TestCase):
3304n/a
3305n/a def test_queue_in_process(self):
3306n/a queue = multiprocessing.Queue()
3307n/a proc = multiprocessing.Process(target=_test_process, args=(queue,))
3308n/a proc.start()
3309n/a proc.join()
3310n/a
3311n/a def test_pool_in_process(self):
3312n/a p = multiprocessing.Process(target=pool_in_process)
3313n/a p.start()
3314n/a p.join()
3315n/a
3316n/a def test_flushing(self):
3317n/a sio = io.StringIO()
3318n/a flike = _file_like(sio)
3319n/a flike.write('foo')
3320n/a proc = multiprocessing.Process(target=lambda: flike.flush())
3321n/a flike.flush()
3322n/a assert sio.getvalue() == 'foo'
3323n/a
3324n/a
3325n/aclass TestWait(unittest.TestCase):
3326n/a
3327n/a @classmethod
3328n/a def _child_test_wait(cls, w, slow):
3329n/a for i in range(10):
3330n/a if slow:
3331n/a time.sleep(random.random()*0.1)
3332n/a w.send((i, os.getpid()))
3333n/a w.close()
3334n/a
3335n/a def test_wait(self, slow=False):
3336n/a from multiprocessing.connection import wait
3337n/a readers = []
3338n/a procs = []
3339n/a messages = []
3340n/a
3341n/a for i in range(4):
3342n/a r, w = multiprocessing.Pipe(duplex=False)
3343n/a p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
3344n/a p.daemon = True
3345n/a p.start()
3346n/a w.close()
3347n/a readers.append(r)
3348n/a procs.append(p)
3349n/a self.addCleanup(p.join)
3350n/a
3351n/a while readers:
3352n/a for r in wait(readers):
3353n/a try:
3354n/a msg = r.recv()
3355n/a except EOFError:
3356n/a readers.remove(r)
3357n/a r.close()
3358n/a else:
3359n/a messages.append(msg)
3360n/a
3361n/a messages.sort()
3362n/a expected = sorted((i, p.pid) for i in range(10) for p in procs)
3363n/a self.assertEqual(messages, expected)
3364n/a
3365n/a @classmethod
3366n/a def _child_test_wait_socket(cls, address, slow):
3367n/a s = socket.socket()
3368n/a s.connect(address)
3369n/a for i in range(10):
3370n/a if slow:
3371n/a time.sleep(random.random()*0.1)
3372n/a s.sendall(('%s\n' % i).encode('ascii'))
3373n/a s.close()
3374n/a
3375n/a def test_wait_socket(self, slow=False):
3376n/a from multiprocessing.connection import wait
3377n/a l = socket.socket()
3378n/a l.bind((test.support.HOST, 0))
3379n/a l.listen()
3380n/a addr = l.getsockname()
3381n/a readers = []
3382n/a procs = []
3383n/a dic = {}
3384n/a
3385n/a for i in range(4):
3386n/a p = multiprocessing.Process(target=self._child_test_wait_socket,
3387n/a args=(addr, slow))
3388n/a p.daemon = True
3389n/a p.start()
3390n/a procs.append(p)
3391n/a self.addCleanup(p.join)
3392n/a
3393n/a for i in range(4):
3394n/a r, _ = l.accept()
3395n/a readers.append(r)
3396n/a dic[r] = []
3397n/a l.close()
3398n/a
3399n/a while readers:
3400n/a for r in wait(readers):
3401n/a msg = r.recv(32)
3402n/a if not msg:
3403n/a readers.remove(r)
3404n/a r.close()
3405n/a else:
3406n/a dic[r].append(msg)
3407n/a
3408n/a expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3409n/a for v in dic.values():
3410n/a self.assertEqual(b''.join(v), expected)
3411n/a
3412n/a def test_wait_slow(self):
3413n/a self.test_wait(True)
3414n/a
3415n/a def test_wait_socket_slow(self):
3416n/a self.test_wait_socket(True)
3417n/a
3418n/a def test_wait_timeout(self):
3419n/a from multiprocessing.connection import wait
3420n/a
3421n/a expected = 5
3422n/a a, b = multiprocessing.Pipe()
3423n/a
3424n/a start = time.time()
3425n/a res = wait([a, b], expected)
3426n/a delta = time.time() - start
3427n/a
3428n/a self.assertEqual(res, [])
3429n/a self.assertLess(delta, expected * 2)
3430n/a self.assertGreater(delta, expected * 0.5)
3431n/a
3432n/a b.send(None)
3433n/a
3434n/a start = time.time()
3435n/a res = wait([a, b], 20)
3436n/a delta = time.time() - start
3437n/a
3438n/a self.assertEqual(res, [a])
3439n/a self.assertLess(delta, 0.4)
3440n/a
3441n/a @classmethod
3442n/a def signal_and_sleep(cls, sem, period):
3443n/a sem.release()
3444n/a time.sleep(period)
3445n/a
3446n/a def test_wait_integer(self):
3447n/a from multiprocessing.connection import wait
3448n/a
3449n/a expected = 3
3450n/a sorted_ = lambda l: sorted(l, key=lambda x: id(x))
3451n/a sem = multiprocessing.Semaphore(0)
3452n/a a, b = multiprocessing.Pipe()
3453n/a p = multiprocessing.Process(target=self.signal_and_sleep,
3454n/a args=(sem, expected))
3455n/a
3456n/a p.start()
3457n/a self.assertIsInstance(p.sentinel, int)
3458n/a self.assertTrue(sem.acquire(timeout=20))
3459n/a
3460n/a start = time.time()
3461n/a res = wait([a, p.sentinel, b], expected + 20)
3462n/a delta = time.time() - start
3463n/a
3464n/a self.assertEqual(res, [p.sentinel])
3465n/a self.assertLess(delta, expected + 2)
3466n/a self.assertGreater(delta, expected - 2)
3467n/a
3468n/a a.send(None)
3469n/a
3470n/a start = time.time()
3471n/a res = wait([a, p.sentinel, b], 20)
3472n/a delta = time.time() - start
3473n/a
3474n/a self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
3475n/a self.assertLess(delta, 0.4)
3476n/a
3477n/a b.send(None)
3478n/a
3479n/a start = time.time()
3480n/a res = wait([a, p.sentinel, b], 20)
3481n/a delta = time.time() - start
3482n/a
3483n/a self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
3484n/a self.assertLess(delta, 0.4)
3485n/a
3486n/a p.terminate()
3487n/a p.join()
3488n/a
3489n/a def test_neg_timeout(self):
3490n/a from multiprocessing.connection import wait
3491n/a a, b = multiprocessing.Pipe()
3492n/a t = time.time()
3493n/a res = wait([a], timeout=-1)
3494n/a t = time.time() - t
3495n/a self.assertEqual(res, [])
3496n/a self.assertLess(t, 1)
3497n/a a.close()
3498n/a b.close()
3499n/a
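connection.wait() accepts Connections, sockets and process sentinels alike, which is what TestWait exercises above; a minimal sketch (worker is an illustrative function):

import multiprocessing
from multiprocessing.connection import wait

def worker(conn):
    conn.send('done')
    conn.close()

if __name__ == '__main__':
    r, w = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=worker, args=(w,))
    p.start()
    w.close()   # keep only the child's copy of the write end open
    # wait() returns whichever of the given objects are ready within the timeout.
    ready = wait([r, p.sentinel], timeout=10)
    if r in ready:
        print(r.recv())   # done
    p.join()
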
3500n/a#
3501n/a# Issue 14151: Test invalid family on invalid environment
3502n/a#
3503n/a
3504n/aclass TestInvalidFamily(unittest.TestCase):
3505n/a
3506n/a @unittest.skipIf(WIN32, "skipped on Windows")
3507n/a def test_invalid_family(self):
3508n/a with self.assertRaises(ValueError):
3509n/a multiprocessing.connection.Listener(r'\\.\test')
3510n/a
3511n/a @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3512n/a def test_invalid_family_win32(self):
3513n/a with self.assertRaises(ValueError):
3514n/a multiprocessing.connection.Listener('/var/test.pipe')
3515n/a
3516n/a#
3517n/a# Issue 12098: check sys.flags of child matches that for parent
3518n/a#
3519n/a
3520n/aclass TestFlags(unittest.TestCase):
3521n/a @classmethod
3522n/a def run_in_grandchild(cls, conn):
3523n/a conn.send(tuple(sys.flags))
3524n/a
3525n/a @classmethod
3526n/a def run_in_child(cls):
3527n/a import json
3528n/a r, w = multiprocessing.Pipe(duplex=False)
3529n/a p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3530n/a p.start()
3531n/a grandchild_flags = r.recv()
3532n/a p.join()
3533n/a r.close()
3534n/a w.close()
3535n/a flags = (tuple(sys.flags), grandchild_flags)
3536n/a print(json.dumps(flags))
3537n/a
3538n/a def test_flags(self):
3539n/a import json, subprocess
3540n/a # start child process using unusual flags
3541n/a prog = ('from test._test_multiprocessing import TestFlags; ' +
3542n/a 'TestFlags.run_in_child()')
3543n/a data = subprocess.check_output(
3544n/a [sys.executable, '-E', '-S', '-O', '-c', prog])
3545n/a child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3546n/a self.assertEqual(child_flags, grandchild_flags)
3547n/a
3548n/a#
3549n/a# Test interaction with socket timeouts - see Issue #6056
3550n/a#
3551n/a
3552n/aclass TestTimeouts(unittest.TestCase):
3553n/a @classmethod
3554n/a def _test_timeout(cls, child, address):
3555n/a time.sleep(1)
3556n/a child.send(123)
3557n/a child.close()
3558n/a conn = multiprocessing.connection.Client(address)
3559n/a conn.send(456)
3560n/a conn.close()
3561n/a
3562n/a def test_timeout(self):
3563n/a old_timeout = socket.getdefaulttimeout()
3564n/a try:
3565n/a socket.setdefaulttimeout(0.1)
3566n/a parent, child = multiprocessing.Pipe(duplex=True)
3567n/a l = multiprocessing.connection.Listener(family='AF_INET')
3568n/a p = multiprocessing.Process(target=self._test_timeout,
3569n/a args=(child, l.address))
3570n/a p.start()
3571n/a child.close()
3572n/a self.assertEqual(parent.recv(), 123)
3573n/a parent.close()
3574n/a conn = l.accept()
3575n/a self.assertEqual(conn.recv(), 456)
3576n/a conn.close()
3577n/a l.close()
3578n/a p.join(10)
3579n/a finally:
3580n/a socket.setdefaulttimeout(old_timeout)
3581n/a
3582n/a#
3583n/a# Test what happens with no "if __name__ == '__main__'"
3584n/a#
3585n/a
3586n/aclass TestNoForkBomb(unittest.TestCase):
3587n/a def test_noforkbomb(self):
3588n/a sm = multiprocessing.get_start_method()
3589n/a name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
3590n/a if sm != 'fork':
3591n/a rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
3592n/a self.assertEqual(out, b'')
3593n/a self.assertIn(b'RuntimeError', err)
3594n/a else:
3595n/a rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
3596n/a self.assertEqual(out.rstrip(), b'123')
3597n/a self.assertEqual(err, b'')
3598n/a
3599n/a#
3600n/a# Issue #17555: ForkAwareThreadLock
3601n/a#
3602n/a
3603n/aclass TestForkAwareThreadLock(unittest.TestCase):
3604n/a    # We recursively start processes. Issue #17555 meant that the
3605n/a # after fork registry would get duplicate entries for the same
3606n/a # lock. The size of the registry at generation n was ~2**n.
3607n/a
3608n/a @classmethod
3609n/a def child(cls, n, conn):
3610n/a if n > 1:
3611n/a p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
3612n/a p.start()
3613n/a conn.close()
3614n/a p.join(timeout=5)
3615n/a else:
3616n/a conn.send(len(util._afterfork_registry))
3617n/a conn.close()
3618n/a
3619n/a def test_lock(self):
3620n/a r, w = multiprocessing.Pipe(False)
3621n/a l = util.ForkAwareThreadLock()
3622n/a old_size = len(util._afterfork_registry)
3623n/a p = multiprocessing.Process(target=self.child, args=(5, w))
3624n/a p.start()
3625n/a w.close()
3626n/a new_size = r.recv()
3627n/a p.join(timeout=5)
3628n/a self.assertLessEqual(new_size, old_size)
3629n/a
3630n/a#
3631n/a# Check that non-forked child processes do not inherit unneeded fds/handles
3632n/a#
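# Hedged helper sketch (hypothetical name, not part of the test file): the
# question the child effectively asks below is "did this descriptor survive
# into my process?".  A compact POSIX-style probe for that:

import errno
import os

def _fd_is_open(fd):
    try:
        os.fstat(fd)
    except OSError as e:
        if e.errno == errno.EBADF:
            return False               # the descriptor was not inherited
        raise
    return True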
3633n/a
3634n/aclass TestCloseFds(unittest.TestCase):
3635n/a
3636n/a def get_high_socket_fd(self):
3637n/a if WIN32:
3638n/a # The child process will not have any socket handles, so
3639n/a # calling socket.fromfd() should produce WSAENOTSOCK even
3640n/a # if there is a handle of the same number.
3641n/a return socket.socket().detach()
3642n/a else:
3643n/a # We want to produce a socket with an fd high enough that a
3644n/a # freshly created child process will not have any fds as high.
3645n/a fd = socket.socket().detach()
3646n/a to_close = []
3647n/a while fd < 50:
3648n/a to_close.append(fd)
3649n/a fd = os.dup(fd)
3650n/a for x in to_close:
3651n/a os.close(x)
3652n/a return fd
3653n/a
3654n/a def close(self, fd):
3655n/a if WIN32:
3656n/a socket.socket(fileno=fd).close()
3657n/a else:
3658n/a os.close(fd)
3659n/a
3660n/a @classmethod
3661n/a def _test_closefds(cls, conn, fd):
3662n/a try:
3663n/a s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
3664n/a except Exception as e:
3665n/a conn.send(e)
3666n/a else:
3667n/a s.close()
3668n/a conn.send(None)
3669n/a
3670n/a def test_closefd(self):
3671n/a if not HAS_REDUCTION:
3672n/a raise unittest.SkipTest('requires fd pickling')
3673n/a
3674n/a reader, writer = multiprocessing.Pipe()
3675n/a fd = self.get_high_socket_fd()
3676n/a try:
3677n/a p = multiprocessing.Process(target=self._test_closefds,
3678n/a args=(writer, fd))
3679n/a p.start()
3680n/a writer.close()
3681n/a e = reader.recv()
3682n/a p.join(timeout=5)
3683n/a finally:
3684n/a self.close(fd)
3685n/a writer.close()
3686n/a reader.close()
3687n/a
3688n/a if multiprocessing.get_start_method() == 'fork':
3689n/a self.assertIs(e, None)
3690n/a else:
3691n/a WSAENOTSOCK = 10038
3692n/a self.assertIsInstance(e, OSError)
3693n/a self.assertTrue(e.errno == errno.EBADF or
3694n/a e.winerror == WSAENOTSOCK, e)
3695n/a
3696n/a#
3697n/a# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3698n/a#
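# Background sketch (hedged): after the Issue #17097 fix a signal whose
# handler returns normally no longer aborts a blocking connection call with
# EINTR; the call is retried internally.  The manual retry loop that callers
# previously needed looked roughly like this (helper name is made up):

import errno

def _recv_retrying_on_eintr(conn):
    while True:
        try:
            return conn.recv()
        except OSError as e:
            if e.errno != errno.EINTR:
                raise
            # interrupted by a signal whose handler returned: just retry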
3699n/a
3700n/aclass TestIgnoreEINTR(unittest.TestCase):
3701n/a
3702n/a @classmethod
3703n/a def _test_ignore(cls, conn):
3704n/a def handler(signum, frame):
3705n/a pass
3706n/a signal.signal(signal.SIGUSR1, handler)
3707n/a conn.send('ready')
3708n/a x = conn.recv()
3709n/a conn.send(x)
3710n/a conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
3711n/a
3712n/a @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3713n/a def test_ignore(self):
3714n/a conn, child_conn = multiprocessing.Pipe()
3715n/a try:
3716n/a p = multiprocessing.Process(target=self._test_ignore,
3717n/a args=(child_conn,))
3718n/a p.daemon = True
3719n/a p.start()
3720n/a child_conn.close()
3721n/a self.assertEqual(conn.recv(), 'ready')
3722n/a time.sleep(0.1)
3723n/a os.kill(p.pid, signal.SIGUSR1)
3724n/a time.sleep(0.1)
3725n/a conn.send(1234)
3726n/a self.assertEqual(conn.recv(), 1234)
3727n/a time.sleep(0.1)
3728n/a os.kill(p.pid, signal.SIGUSR1)
3729n/a self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
3730n/a time.sleep(0.1)
3731n/a p.join()
3732n/a finally:
3733n/a conn.close()
3734n/a
3735n/a @classmethod
3736n/a def _test_ignore_listener(cls, conn):
3737n/a def handler(signum, frame):
3738n/a pass
3739n/a signal.signal(signal.SIGUSR1, handler)
3740n/a with multiprocessing.connection.Listener() as l:
3741n/a conn.send(l.address)
3742n/a a = l.accept()
3743n/a a.send('welcome')
3744n/a
3745n/a @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3746n/a def test_ignore_listener(self):
3747n/a conn, child_conn = multiprocessing.Pipe()
3748n/a try:
3749n/a p = multiprocessing.Process(target=self._test_ignore_listener,
3750n/a args=(child_conn,))
3751n/a p.daemon = True
3752n/a p.start()
3753n/a child_conn.close()
3754n/a address = conn.recv()
3755n/a time.sleep(0.1)
3756n/a os.kill(p.pid, signal.SIGUSR1)
3757n/a time.sleep(0.1)
3758n/a client = multiprocessing.connection.Client(address)
3759n/a self.assertEqual(client.recv(), 'welcome')
3760n/a p.join()
3761n/a finally:
3762n/a conn.close()
3763n/a
3764n/aclass TestStartMethod(unittest.TestCase):
3765n/a @classmethod
3766n/a def _check_context(cls, conn):
3767n/a conn.send(multiprocessing.get_start_method())
3768n/a
3769n/a def check_context(self, ctx):
3770n/a r, w = ctx.Pipe(duplex=False)
3771n/a p = ctx.Process(target=self._check_context, args=(w,))
3772n/a p.start()
3773n/a w.close()
3774n/a child_method = r.recv()
3775n/a r.close()
3776n/a p.join()
3777n/a self.assertEqual(child_method, ctx.get_start_method())
3778n/a
3779n/a def test_context(self):
3780n/a for method in ('fork', 'spawn', 'forkserver'):
3781n/a try:
3782n/a ctx = multiprocessing.get_context(method)
3783n/a except ValueError:
3784n/a continue
3785n/a self.assertEqual(ctx.get_start_method(), method)
3786n/a self.assertIs(ctx.get_context(), ctx)
3787n/a self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
3788n/a self.assertRaises(ValueError, ctx.set_start_method, None)
3789n/a self.check_context(ctx)
3790n/a
3791n/a def test_set_get(self):
3792n/a multiprocessing.set_forkserver_preload(PRELOAD)
3793n/a count = 0
3794n/a old_method = multiprocessing.get_start_method()
3795n/a try:
3796n/a for method in ('fork', 'spawn', 'forkserver'):
3797n/a try:
3798n/a multiprocessing.set_start_method(method, force=True)
3799n/a except ValueError:
3800n/a continue
3801n/a self.assertEqual(multiprocessing.get_start_method(), method)
3802n/a ctx = multiprocessing.get_context()
3803n/a self.assertEqual(ctx.get_start_method(), method)
3804n/a self.assertTrue(type(ctx).__name__.lower().startswith(method))
3805n/a self.assertTrue(
3806n/a ctx.Process.__name__.lower().startswith(method))
3807n/a self.check_context(multiprocessing)
3808n/a count += 1
3809n/a finally:
3810n/a multiprocessing.set_start_method(old_method, force=True)
3811n/a self.assertGreaterEqual(count, 1)
3812n/a
3813n/a def test_get_all(self):
3814n/a methods = multiprocessing.get_all_start_methods()
3815n/a if sys.platform == 'win32':
3816n/a self.assertEqual(methods, ['spawn'])
3817n/a else:
3818n/a self.assertTrue(methods == ['fork', 'spawn'] or
3819n/a methods == ['fork', 'spawn', 'forkserver'])
3820n/a
3821n/a def test_preload_resources(self):
3822n/a if multiprocessing.get_start_method() != 'forkserver':
3823n/a self.skipTest("test only relevant for 'forkserver' method")
3824n/a name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
3825n/a rc, out, err = test.support.script_helper.assert_python_ok(name)
3826n/a out = out.decode()
3827n/a err = err.decode()
3828n/a if out.rstrip() != 'ok' or err != '':
3829n/a print(out)
3830n/a print(err)
3831n/a self.fail("failed spawning forkserver or grandchild")
3832n/a
3833n/a
3834n/a#
3835n/a# Check that killing a process does not leak named semaphores
3836n/a#
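# Hedged note (not from the test file): on POSIX each multiprocessing lock is
# backed by a named semaphore, and a background semaphore_tracker process
# calls sem_unlink() on names whose creator died without cleaning them up.
# The name it tracks is the one the child script below writes out:

def _semaphore_name(lock):
    # _semlock is a CPython implementation detail of multiprocessing locks
    return lock._semlock.name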
3837n/a
3838n/a@unittest.skipIf(sys.platform == "win32",
3839n/a "test semantics don't make sense on Windows")
3840n/aclass TestSemaphoreTracker(unittest.TestCase):
3841n/a def test_semaphore_tracker(self):
3842n/a import subprocess
3843n/a cmd = '''if 1:
3844n/a import multiprocessing as mp, time, os
3845n/a mp.set_start_method("spawn")
3846n/a lock1 = mp.Lock()
3847n/a lock2 = mp.Lock()
3848n/a os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
3849n/a os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
3850n/a time.sleep(10)
3851n/a '''
3852n/a r, w = os.pipe()
3853n/a p = subprocess.Popen([sys.executable,
3854n/a '-c', cmd % (w, w)],
3855n/a pass_fds=[w],
3856n/a stderr=subprocess.PIPE)
3857n/a os.close(w)
3858n/a with open(r, 'rb', closefd=True) as f:
3859n/a name1 = f.readline().rstrip().decode('ascii')
3860n/a name2 = f.readline().rstrip().decode('ascii')
3861n/a _multiprocessing.sem_unlink(name1)
3862n/a p.terminate()
3863n/a p.wait()
3864n/a time.sleep(2.0)
3865n/a with self.assertRaises(OSError) as ctx:
3866n/a _multiprocessing.sem_unlink(name2)
3867n/a # docs say it should be ENOENT, but OSX seems to give EINVAL
3868n/a self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
3869n/a err = p.stderr.read().decode('utf-8')
3870n/a p.stderr.close()
3871n/a expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
3872n/a self.assertRegex(err, expected)
3873n/a self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
3874n/a
3875n/a#
3876n/a# Mixins
3877n/a#
3878n/a
3879n/aclass ProcessesMixin(object):
3880n/a TYPE = 'processes'
3881n/a Process = multiprocessing.Process
3882n/a connection = multiprocessing.connection
3883n/a current_process = staticmethod(multiprocessing.current_process)
3884n/a active_children = staticmethod(multiprocessing.active_children)
3885n/a Pool = staticmethod(multiprocessing.Pool)
3886n/a Pipe = staticmethod(multiprocessing.Pipe)
3887n/a Queue = staticmethod(multiprocessing.Queue)
3888n/a JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
3889n/a Lock = staticmethod(multiprocessing.Lock)
3890n/a RLock = staticmethod(multiprocessing.RLock)
3891n/a Semaphore = staticmethod(multiprocessing.Semaphore)
3892n/a BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
3893n/a Condition = staticmethod(multiprocessing.Condition)
3894n/a Event = staticmethod(multiprocessing.Event)
3895n/a Barrier = staticmethod(multiprocessing.Barrier)
3896n/a Value = staticmethod(multiprocessing.Value)
3897n/a Array = staticmethod(multiprocessing.Array)
3898n/a RawValue = staticmethod(multiprocessing.RawValue)
3899n/a RawArray = staticmethod(multiprocessing.RawArray)
3900n/a
3901n/a
3902n/aclass ManagerMixin(object):
3903n/a TYPE = 'manager'
3904n/a Process = multiprocessing.Process
3905n/a Queue = property(operator.attrgetter('manager.Queue'))
3906n/a JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
3907n/a Lock = property(operator.attrgetter('manager.Lock'))
3908n/a RLock = property(operator.attrgetter('manager.RLock'))
3909n/a Semaphore = property(operator.attrgetter('manager.Semaphore'))
3910n/a BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
3911n/a Condition = property(operator.attrgetter('manager.Condition'))
3912n/a Event = property(operator.attrgetter('manager.Event'))
3913n/a Barrier = property(operator.attrgetter('manager.Barrier'))
3914n/a Value = property(operator.attrgetter('manager.Value'))
3915n/a Array = property(operator.attrgetter('manager.Array'))
3916n/a list = property(operator.attrgetter('manager.list'))
3917n/a dict = property(operator.attrgetter('manager.dict'))
3918n/a Namespace = property(operator.attrgetter('manager.Namespace'))
3919n/a
3920n/a @classmethod
3921n/a def Pool(cls, *args, **kwds):
3922n/a return cls.manager.Pool(*args, **kwds)
3923n/a
3924n/a @classmethod
3925n/a def setUpClass(cls):
3926n/a cls.manager = multiprocessing.Manager()
3927n/a
3928n/a @classmethod
3929n/a def tearDownClass(cls):
3930n/a # only the manager process should be returned by active_children()
3931n/a # but this can take a bit on slow machines, so wait a few seconds
3932n/a # if there are other children too (see #17395)
3933n/a t = 0.01
3934n/a while len(multiprocessing.active_children()) > 1 and t < 5:
3935n/a time.sleep(t)
3936n/a t *= 2
3937n/a gc.collect() # do garbage collection
3938n/a if cls.manager._number_of_objects() != 0:
3939n/a # This is not really an error since some tests do not
3940n/a # ensure that all processes which hold a reference to a
3941n/a # managed object have been joined.
3942n/a print('Shared objects which still exist at manager shutdown:')
3943n/a print(cls.manager._debug_info())
3944n/a cls.manager.shutdown()
3945n/a cls.manager.join()
3946n/a cls.manager = None
3947n/a
3948n/a
3949n/aclass ThreadsMixin(object):
3950n/a TYPE = 'threads'
3951n/a Process = multiprocessing.dummy.Process
3952n/a connection = multiprocessing.dummy.connection
3953n/a current_process = staticmethod(multiprocessing.dummy.current_process)
3954n/a active_children = staticmethod(multiprocessing.dummy.active_children)
3955n/a Pool = staticmethod(multiprocessing.dummy.Pool)
3956n/a Pipe = staticmethod(multiprocessing.dummy.Pipe)
3957n/a Queue = staticmethod(multiprocessing.dummy.Queue)
3958n/a JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
3959n/a Lock = staticmethod(multiprocessing.dummy.Lock)
3960n/a RLock = staticmethod(multiprocessing.dummy.RLock)
3961n/a Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
3962n/a BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
3963n/a Condition = staticmethod(multiprocessing.dummy.Condition)
3964n/a Event = staticmethod(multiprocessing.dummy.Event)
3965n/a Barrier = staticmethod(multiprocessing.dummy.Barrier)
3966n/a Value = staticmethod(multiprocessing.dummy.Value)
3967n/a Array = staticmethod(multiprocessing.dummy.Array)
3968n/a
3969n/a#
3970n/a# Functions used to create test cases from the base ones in this module
3971n/a#
3972n/a
3973n/adef install_tests_in_module_dict(remote_globs, start_method):
3974n/a __module__ = remote_globs['__name__']
3975n/a local_globs = globals()
3976n/a ALL_TYPES = {'processes', 'threads', 'manager'}
3977n/a
3978n/a for name, base in local_globs.items():
3979n/a if not isinstance(base, type):
3980n/a continue
3981n/a if issubclass(base, BaseTestCase):
3982n/a if base is BaseTestCase:
3983n/a continue
3984n/a assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
3985n/a for type_ in base.ALLOWED_TYPES:
3986n/a newname = 'With' + type_.capitalize() + name[1:]
3987n/a Mixin = local_globs[type_.capitalize() + 'Mixin']
3988n/a class Temp(base, Mixin, unittest.TestCase):
3989n/a pass
3990n/a Temp.__name__ = Temp.__qualname__ = newname
3991n/a Temp.__module__ = __module__
3992n/a remote_globs[newname] = Temp
3993n/a elif issubclass(base, unittest.TestCase):
3994n/a class Temp(base, object):
3995n/a pass
3996n/a Temp.__name__ = Temp.__qualname__ = name
3997n/a Temp.__module__ = __module__
3998n/a remote_globs[name] = Temp
3999n/a
4000n/a dangling = [None, None]
4001n/a old_start_method = [None]
4002n/a
4003n/a def setUpModule():
4004n/a multiprocessing.set_forkserver_preload(PRELOAD)
4005n/a multiprocessing.process._cleanup()
4006n/a dangling[0] = multiprocessing.process._dangling.copy()
4007n/a dangling[1] = threading._dangling.copy()
4008n/a old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
4009n/a try:
4010n/a multiprocessing.set_start_method(start_method, force=True)
4011n/a except ValueError:
4012n/a raise unittest.SkipTest(start_method +
4013n/a ' start method not supported')
4014n/a
4015n/a if sys.platform.startswith("linux"):
4016n/a try:
4017n/a lock = multiprocessing.RLock()
4018n/a except OSError:
4019n/a                raise unittest.SkipTest("OSError is raised on RLock creation, "
4020n/a "see issue 3111!")
4021n/a check_enough_semaphores()
4022n/a util.get_temp_dir() # creates temp directory
4023n/a multiprocessing.get_logger().setLevel(LOG_LEVEL)
4024n/a
4025n/a def tearDownModule():
4026n/a multiprocessing.set_start_method(old_start_method[0], force=True)
4027n/a # pause a bit so we don't get warning about dangling threads/processes
4028n/a time.sleep(0.5)
4029n/a multiprocessing.process._cleanup()
4030n/a gc.collect()
4031n/a tmp = set(multiprocessing.process._dangling) - set(dangling[0])
4032n/a if tmp:
4033n/a print('Dangling processes:', tmp, file=sys.stderr)
4034n/a del tmp
4035n/a tmp = set(threading._dangling) - set(dangling[1])
4036n/a if tmp:
4037n/a print('Dangling threads:', tmp, file=sys.stderr)
4038n/a
4039n/a remote_globs['setUpModule'] = setUpModule
4040n/a remote_globs['tearDownModule'] = tearDownModule
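# Hedged usage note: this file only defines the base test cases plus the
# installer above; the runnable test modules for each start method are thin
# wrappers along these lines (a sketch, not the real contents of, e.g.,
# test_multiprocessing_spawn.py):
#
#     import unittest
#     import test._test_multiprocessing
#
#     test._test_multiprocessing.install_tests_in_module_dict(globals(),
#                                                             'spawn')
#
#     if __name__ == '__main__':
#         unittest.main()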