» Core Development > Code coverage > Lib/test/test_concurrent_futures.py

Python code coverage for Lib/test/test_concurrent_futures.py

# | count | content
1n/aimport test.support
2n/a
3n/a# Skip tests if _multiprocessing wasn't built.
4n/atest.support.import_module('_multiprocessing')
5n/a# Skip tests if sem_open implementation is broken.
6n/atest.support.import_module('multiprocessing.synchronize')
7n/a# import threading after _multiprocessing to raise a more relevant error
8n/a# message: "No module named _multiprocessing". _multiprocessing is not compiled
9n/a# without thread support.
10n/atest.support.import_module('threading')
11n/a
12n/afrom test.support.script_helper import assert_python_ok
13n/a
14n/aimport os
15n/aimport sys
16n/aimport threading
17n/aimport time
18n/aimport unittest
19n/aimport weakref
20n/a
21n/afrom concurrent import futures
22n/afrom concurrent.futures._base import (
23n/a PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
24n/afrom concurrent.futures.process import BrokenProcessPool
25n/a
26n/a
def create_future(state=PENDING, exception=None, result=None):
    """Return a new Future with its private state fields preset.

    The ``_state``, ``_exception`` and ``_result`` attributes are assigned
    directly instead of going through ``set_result``/``set_exception``, so
    the tests can fabricate a future in any state they need.
    """
    future = Future()
    for attr, value in (('_state', state),
                        ('_exception', exception),
                        ('_result', result)):
        setattr(future, attr, value)
    return future
33n/a
34n/a
# Shared, pre-built futures: one per state that the tests below inspect.
# Futures that have not finished.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
# Cancelled futures, before and after waiters were notified.
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
# Finished futures: one holding an exception, one holding the result 42.
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
41n/a
42n/a
def mul(x, y):
    """Return ``x * y``; used as a simple task submitted to executors."""
    product = x * y
    return product
45n/a
46n/a
def sleep_and_raise(t):
    """Sleep for ``t`` seconds, then always raise ``Exception``."""
    time.sleep(t)
    error = Exception('this is an exception')
    raise error
50n/a
def sleep_and_print(t, msg):
    """Sleep for ``t`` seconds, print ``msg`` and flush standard output."""
    time.sleep(t)
    print(msg)
    # Flush explicitly so the text is written out right away rather than
    # sitting in the stream's buffer.
    stdout = sys.stdout
    stdout.flush()
55n/a
56n/a
class MyObject(object):
    """Minimal object whose bound method can be submitted to an executor."""

    def my_method(self):
        # Intentionally does nothing.
        return None
60n/a
61n/a
class ExecutorMixin:
    """Fixture mixin that creates, primes and shuts down an executor per test.

    Subclasses must provide ``executor_type``: a callable accepting a
    ``max_workers`` keyword argument and returning the executor under test.
    The executor is exposed to the tests as ``self.executor``.
    """

    # Number of workers requested from the executor and number of priming
    # tasks submitted by _prime_executor().
    worker_count = 5

    def setUp(self):
        # Use a monotonic clock: the duration check in tearDown() must not be
        # affected by wall-clock adjustments made while the test is running.
        self.t1 = time.monotonic()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError as e:
            # The executor type is not available here; skip instead of fail.
            self.skipTest(str(e))
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 60, "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        priming_tasks = [self.executor.submit(time.sleep, 0.1)
                         for _ in range(self.worker_count)]

        for task in priming_tasks:
            task.result()
88n/a
89n/a
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin bound to ``futures.ThreadPoolExecutor``."""

    executor_type = futures.ThreadPoolExecutor
92n/a
93n/a
class ProcessPoolMixin(ExecutorMixin):
    """ExecutorMixin bound to ``futures.ProcessPoolExecutor``."""

    executor_type = futures.ProcessPoolExecutor
96n/a
97n/a
class ExecutorShutdownTest:
    """Shutdown tests shared by the thread pool and process pool cases.

    Expects the concrete test class to provide ``self.executor`` and
    ``self.executor_type``.
    """

    def test_run_after_shutdown(self):
        # Submitting to an executor that was already shut down must raise.
        self.executor.shutdown()
        with self.assertRaises(RuntimeError):
            self.executor.submit(pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        # The leading "if 1:" lets the script body be indented.
        script = """if 1:
 from concurrent.futures import {executor_type}
 from time import sleep
 from test.test_concurrent_futures import sleep_and_print
 t = {executor_type}(5)
 t.submit(sleep_and_print, 1.0, "apple")
 """.format(executor_type=self.executor_type.__name__)
        rc, out, err = assert_python_ok('-c', script)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_issue12364(self):
        # Results of already-submitted tasks must still be retrievable after
        # shutdown() has been called.
        submitted = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for future in submitted:
            future.result()
124n/a
125n/a
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    """Shutdown tests that inspect ThreadPoolExecutor worker threads."""

    def _prime_executor(self):
        # No priming here. test_threads_terminate checks the exact number of
        # threads created, which priming would presumably change.
        return None

    def test_threads_terminate(self):
        for operands in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *operands)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for worker in self.executor._threads:
            worker.join()

    def test_context_manager_shutdown(self):
        # Leaving the "with" block shuts the executor down; the name bound by
        # "as" stays available afterwards for joining the workers.
        with futures.ThreadPoolExecutor(max_workers=5) as executor:
            mapped = list(executor.map(abs, range(-5, 5)))
            self.assertEqual(mapped,
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for worker in executor._threads:
            worker.join()

    def test_del_shutdown(self):
        pool = futures.ThreadPoolExecutor(max_workers=5)
        pool.map(abs, range(-5, 5))
        # Keep the thread set so the workers can be joined once the executor
        # object itself has been deleted.
        workers = pool._threads
        del pool

        for worker in workers:
            worker.join()

    def test_thread_names_assigned(self):
        pool = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        pool.map(abs, range(-5, 5))
        workers = pool._threads
        del pool

        for worker in workers:
            self.assertRegex(worker.name, r'^SpecialPool_[0-4]$')
            worker.join()

    def test_thread_names_default(self):
        pool = futures.ThreadPoolExecutor(max_workers=5)
        pool.map(abs, range(-5, 5))
        workers = pool._threads
        del pool

        for worker in workers:
            # We don't particularly care what the default name is, just that
            # it has a default name implying that it is a ThreadPoolExecutor
            # followed by what looks like a thread number.
            self.assertRegex(worker.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
            worker.join()
180n/a
181n/a
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    """Shutdown tests that inspect ProcessPoolExecutor worker processes."""

    def _prime_executor(self):
        # No priming for these tests.
        return None

    def test_processes_terminate(self):
        for operands in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *operands)
        self.assertEqual(len(self.executor._processes), 5)
        # Grab the mapping before shutdown so the workers can be joined.
        workers = self.executor._processes
        self.executor.shutdown()

        for process in workers.values():
            process.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as pool:
            workers = pool._processes
            mapped = list(pool.map(abs, range(-5, 5)))
            self.assertEqual(mapped,
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for process in workers.values():
            process.join()

    def test_del_shutdown(self):
        pool = futures.ProcessPoolExecutor(max_workers=5)
        list(pool.map(abs, range(-5, 5)))
        # Keep references to the internals that must wind down after the
        # executor object itself is deleted.
        manager_thread = pool._queue_management_thread
        workers = pool._processes
        del pool

        manager_thread.join()
        for process in workers.values():
            process.join()
216n/a
217n/a
class WaitTests:
    """Tests for ``futures.wait()`` shared by both executor kinds.

    Expects the concrete test class to provide ``self.executor``.
    """

    def test_first_completed(self):
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, quick, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual({quick}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_completed_some_already_completed(self):
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({slow}, pending)

    def test_first_exception(self):
        succeeds = self.executor.submit(mul, 2, 21)
        raises = self.executor.submit(sleep_and_raise, 1.5)
        still_running = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [succeeds, raises, still_running],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({succeeds, raises}, finished)
        self.assertEqual({still_running}, pending)

    def test_first_exception_some_already_complete(self):
        # divmod(21, 0) raises ZeroDivisionError in the worker.
        raises = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             raises, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          raises}, finished)
        self.assertEqual({CANCELLED_FUTURE, slow}, pending)

    def test_first_exception_one_already_failed(self):
        slow = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({slow}, pending)

    def test_all_completed(self):
        raises = self.executor.submit(divmod, 2, 0)
        succeeds = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             raises,
             succeeds],
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          raises,
                          succeeds}, finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        quick = self.executor.submit(mul, 6, 7)
        # Sleeps longer than the 5 second timeout passed to wait() below.
        too_slow = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             quick, too_slow],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          quick}, finished)
        self.assertEqual({too_slow}, pending)
317n/a
318n/a
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
    """WaitTests run against a thread pool, plus a thread-specific race test."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        release = threading.Event()

        def blocked_until_released():
            release.wait()

        saved_interval = sys.getswitchinterval()
        # A tiny switch interval makes thread switches far more frequent.
        sys.setswitchinterval(1e-6)
        try:
            pending = {self.executor.submit(blocked_until_released)
                       for _ in range(100)}
            release.set()
            futures.wait(pending, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(saved_interval)
335n/a
336n/a
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
    """WaitTests run against a process pool; adds no tests of its own."""
339n/a
340n/a
class AsCompletedTests:
    """Tests for ``futures.as_completed()`` shared by both executor kinds.

    Expects the concrete test class to provide ``self.executor``.
    """

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        first = self.executor.submit(mul, 2, 21)
        second = self.executor.submit(mul, 7, 6)
        everything = [CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      SUCCESSFUL_FUTURE,
                      first, second]

        completed = set(futures.as_completed(everything))
        self.assertEqual(set(everything), completed)

    def test_zero_timeout(self):
        slow = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     slow],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            # A TimeoutError is tolerated; only the futures yielded before
            # it are checked below.
            pass

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE},
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        slow = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed([slow, slow]))
        self.assertEqual(len(completed), 1)
384n/a
385n/a
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
    """AsCompletedTests run against a thread pool; adds no tests of its own."""
388n/a
389n/a
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
    """AsCompletedTests run against a process pool; adds no tests of its own."""
392n/a
393n/a
class ExecutorTest:
    """Tests of ``submit()`` and ``map()`` shared by both executor kinds.

    Expects the concrete test class to provide ``self.executor``,
    ``self.executor_type`` and ``self.worker_count``.
    """

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        power = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, power.result())

    def test_submit_keyword(self):
        product = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, product.result())

    def test_map(self):
        expected = list(map(pow, range(10), range(10)))
        actual = list(self.executor.map(pow, range(10), range(10)))
        self.assertEqual(actual, expected)

    def test_map_exception(self):
        # The third pair divides by zero; the first two results must still
        # be delivered before the error surfaces.
        results = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(results), (0, 1))
        self.assertEqual(next(results), (0, 1))
        with self.assertRaises(ZeroDivisionError):
            next(results)

    def test_map_timeout(self):
        results = []
        try:
            # The last sleep (6s) outlasts the 5s timeout.
            for value in self.executor.map(time.sleep,
                                           [0, 0, 6],
                                           timeout=5):
                results.append(value)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        task_count = self.worker_count + 1
        self.executor.map(str, [2] * task_count)
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        # The weakref must stay bound to a name, otherwise it is discarded
        # and its callback can never fire.
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        expected_message = "max_workers must be greater than 0"
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError, expected_message):
                self.executor_type(max_workers=number)
459n/a
460n/a
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
    """ExecutorTest run against a thread pool, plus thread-specific tests."""

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []

        # The result of map() is never iterated; every call must run anyway.
        self.executor.map(finished.append, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        # With no max_workers given, the default is five workers per CPU
        # (falling back to one CPU when the count is unknown).
        cpu_count = os.cpu_count() or 1
        executor = self.executor_type()
        self.assertEqual(executor._max_workers,
                         cpu_count * 5)
476n/a
477n/a
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
    """ExecutorTest run against a process pool, plus process-specific tests."""

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        victim = next(iter(self.executor._processes.values()))
        victim.terminate()
        for fut in pending:
            with self.assertRaises(BrokenProcessPool):
                fut.result()
        # Submitting other jobs fails as well.
        with self.assertRaises(BrokenProcessPool):
            self.executor.submit(pow, 2, 8)

    def test_map_chunksize(self):
        ref = list(map(pow, range(40), range(40)))
        # Chunk sizes smaller than, larger than and equal to the input size.
        for chunksize in (6, 50, 40):
            mapped = self.executor.map(pow, range(40), range(40),
                                       chunksize=chunksize)
            self.assertEqual(list(mapped), ref)
        # A negative chunk size is rejected.
        with self.assertRaises(ValueError):
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

    @classmethod
    def _test_traceback(cls):
        # NOTE: test_traceback looks for the exact text of the next line
        # (trailing comment included) in the remote traceback; do not
        # reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        expected_line = 'raise RuntimeError(123) # some comment'
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn(expected_line, cause.tb)

        # The remote traceback must also show up when the exception is
        # reported through the regular excepthook.
        with test.support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn(expected_line,
                      f1.getvalue())
532n/a
533n/a
class FutureTests(unittest.TestCase):
    """Tests of the Future object itself, independent of any executor.

    Most tests use either a fresh ``Future()`` or the module-level fixture
    futures built with ``create_future()``.
    """

    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        # A callback that raises must not prevent later callbacks from
        # running; the error is expected on stderr instead.
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        # A callback added after completion is invoked right away.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        # Fresh futures are used because cancel() mutates their state.
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper even if the assertion fails, so that no thread
        # outlives the test.
        self.addCleanup(t.join)

        self.assertEqual(f1.result(timeout=5), 42)

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper even if the assertion fails, so that no thread
        # outlives the test.
        self.addCleanup(t.join)

        self.assertRaises(futures.CancelledError, f1.result, timeout=5)

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0),
                              OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            # Set the outcome by hand while holding the future's condition,
            # then wake up the waiter.
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper even if the assertion fails, so that no thread
        # outlives the test.
        self.addCleanup(t.join)

        self.assertIsInstance(f1.exception(timeout=5), OSError)
755n/a
@test.support.reap_threads
def test_main():
    """Run every test in this module, then reap leftover child processes."""
    try:
        test.support.run_unittest(__name__)
    finally:
        # Runs whether or not the test run raised.
        test.support.reap_children()


# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()