
Python code coverage for Lib/concurrent/futures/thread.py

All lines in this file report a coverage count of n/a; the per-line counts are omitted from the listing below.
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
from concurrent.futures import _base
import queue
import threading
import weakref
import os

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)

def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)

class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = thread_name_prefix

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
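
For reference, the code paths in this file are exercised through the public ThreadPoolExecutor API: submit() wraps the callable in a _WorkItem and may start a new daemon worker via _adjust_thread_count(), idle workers block on the shared queue, and exiting a with-block calls shutdown(wait=True), which puts the None sentinel on the queue so workers wake up and exit. The following is a minimal usage sketch, not part of the covered file; the simulated_io function and the chosen pool size are illustrative assumptions.

    # Minimal usage sketch (illustrative only; not part of Lib/concurrent/futures/thread.py).
    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def simulated_io(n):
        # Stand-in for an I/O-bound task; ThreadPoolExecutor's default worker
        # count of (os.cpu_count() or 1) * 5 targets this kind of workload.
        time.sleep(0.1)
        return n * n

    if __name__ == '__main__':
        # Each submit() call enqueues a _WorkItem; leaving the with-block
        # calls shutdown(wait=True) and joins the daemon worker threads.
        with ThreadPoolExecutor(max_workers=4, thread_name_prefix='demo') as executor:
            futures = [executor.submit(simulated_io, n) for n in range(10)]
            for future in as_completed(futures):
                print(future.result())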