» Core Development > Code coverage > Lib/threading.py

Python code coverage for Lib/threading.py

#  count  content
1n/a"""Thread module emulating a subset of Java's threading model."""
2n/a
3n/aimport sys as _sys
4n/aimport _thread
5n/a
6n/afrom time import monotonic as _time
7n/afrom traceback import format_exc as _format_exc
8n/afrom _weakrefset import WeakSet
9n/afrom itertools import islice as _islice, count as _count
10n/atry:
11n/a from _collections import deque as _deque
12n/aexcept ImportError:
13n/a from collections import deque as _deque
14n/a
# Note regarding PEP 8 compliant names
#  This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
# language. Those original names are not in any imminent danger of
# being deprecated (even for Py3k), so this module provides them as an
# alias for the PEP 8 compliant names.
# Note that using the new PEP 8 compliant names facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java inspired names.
24n/a
# Public API exported by "from threading import *".
__all__ = [
    'get_ident', 'active_count', 'Condition', 'current_thread',
    'enumerate', 'main_thread', 'TIMEOUT_MAX',
    'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
    'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
    'setprofile', 'settrace', 'local', 'stack_size',
]
30n/a
# Rename some stuff so "from threading import *" is safe: bind the pieces of
# the low-level _thread module that are needed under module-level names, then
# drop the module reference itself.
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
ThreadError = _thread.error
# The C implementation of RLock is optional; None selects the pure-Python
# fallback in the RLock() factory.
_CRLock = getattr(_thread, "RLock", None)
TIMEOUT_MAX = _thread.TIMEOUT_MAX
del _thread
43n/a
44n/a
45n/a# Support for profile and trace hooks
46n/a
# Hooks installed by setprofile()/settrace(); None means "no hook".
_profile_hook = None
_trace_hook = None
49n/a
def setprofile(func):
    """Set a profile function for all threads started from the threading module.

    The func is stored in the module-level ``_profile_hook`` and is passed to
    sys.setprofile() by each thread before its run() method is called.

    """
    global _profile_hook
    _profile_hook = func
59n/a
def settrace(func):
    """Set a trace function for all threads started from the threading module.

    The func is stored in the module-level ``_trace_hook`` and is passed to
    sys.settrace() by each thread before its run() method is called.

    """
    global _trace_hook
    _trace_hook = func
69n/a
70n/a# Synchronization classes
71n/a
# Lock is simply the low-level lock factory under its public name.
Lock = _allocate_lock
73n/a
def RLock(*args, **kwargs):
    """Factory function that returns a new reentrant lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it again
    without blocking; the thread must release it once for each time it has
    acquired it.

    The C implementation is used when available; otherwise the pure-Python
    _PyRLock class is instantiated. All arguments are forwarded unchanged.

    """
    if _CRLock is None:
        return _PyRLock(*args, **kwargs)
    return _CRLock(*args, **kwargs)
86n/a
class _RLock:
    """This class implements reentrant lock objects.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it
    again without blocking; the thread must release it once for each time it
    has acquired it.

    """

    def __init__(self):
        self._block = _allocate_lock()  # the underlying non-reentrant lock
        self._owner = None              # ident of the owning thread, or None
        self._count = 0                 # recursion level held by the owner

    def __repr__(self):
        owner = self._owner
        try:
            # Show the owning thread's name when it is a registered thread.
            owner = _active[owner].name
        except KeyError:
            pass
        return "<%s %s.%s object owner=%r count=%d at %s>" % (
            "locked" if self._block.locked() else "unlocked",
            self.__class__.__module__,
            self.__class__.__qualname__,
            owner,
            self._count,
            hex(id(self))
        )

    def acquire(self, blocking=True, timeout=-1):
        """Acquire a lock, blocking or non-blocking.

        If this thread already owns the lock, increment the recursion level
        by one and return True immediately. Otherwise the underlying lock is
        acquired with the given *blocking* and *timeout* arguments; on
        success this thread becomes the owner with a recursion level of one.

        When invoked with the blocking argument set to false, do not block:
        return false immediately if the lock is owned by another thread.

        When invoked with the floating-point timeout argument set to a
        positive value, block for at most the number of seconds specified by
        timeout and as long as the lock cannot be acquired.

        Return true if the lock has been acquired, false otherwise.

        """
        me = get_ident()
        if self._owner == me:
            self._count += 1
            # Same type as the first-acquisition path below.
            return True
        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = me
            self._count = 1
        return rc

    __enter__ = acquire

    def release(self):
        """Release a lock, decrementing the recursion level.

        If after the decrement it is zero, reset the lock to unlocked (not
        owned by any thread) and release the underlying lock. If after the
        decrement the recursion level is still nonzero, the lock remains
        locked and owned by the calling thread.

        Only call this method when the calling thread owns the lock. A
        RuntimeError is raised if the calling thread is not the owner.

        There is no return value.

        """
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            self._owner = None
            self._block.release()

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, state):
        # Re-take the underlying lock and restore the (count, owner) pair
        # previously returned by _release_save().
        self._block.acquire()
        self._count, self._owner = state

    def _release_save(self):
        # Fully release the lock regardless of recursion level and return
        # the (count, owner) state needed to restore it later.
        if self._count == 0:
            raise RuntimeError("cannot release un-acquired lock")
        count = self._count
        self._count = 0
        owner = self._owner
        self._owner = None
        self._block.release()
        return (count, owner)

    def _is_owned(self):
        # True when the calling thread is the current owner.
        return self._owner == get_ident()

_PyRLock = _RLock
201n/a
202n/a
class Condition:
    """Class that implements a condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock). Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # One private lock per waiting thread, in arrival order.
        self._waiters = _deque()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if _lock doesn't have _is_owned().
        if self._lock.acquire(0):
            self._lock.release()
            return False
        return True

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns True if it
        was notified, False if the timeout elapsed first.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof). A timeout that is not positive polls once
        without blocking.

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # The waiter lock starts out held; notify() releases it to wake us.
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            elif timeout > 0:
                gotit = waiter.acquire(True, timeout)
            else:
                gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                # Not notified: withdraw our waiter so a later notify()
                # does not count it. It may already have been removed.
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None):
        """Wait until a condition evaluates to True.

        predicate should be a callable which result will be interpreted as a
        boolean value. A timeout may be provided giving the maximum time to
        wait. The last value returned by predicate is returned.

        """
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        # Snapshot the first n waiters so the deque can be mutated below.
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notify_all(self):
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.

        """
        self.notify(len(self._waiters))

    notifyAll = notify_all
367n/a
368n/a
class Semaphore:
    """This class implements semaphore objects.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.

    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore, decrementing the internal counter by one.

        When invoked without arguments: if the internal counter is larger than
        zero on entry, decrement it by one and return True immediately. If it
        is zero on entry, block, waiting until some other thread has called
        release() to make it larger than zero, then decrement it and return
        True. The order in which blocked threads are awakened should not be
        relied on.

        When invoked with blocking set to false, do not block. If a call without
        an argument would block, return false immediately; otherwise, do the
        same thing as when called without arguments, and return true.

        When invoked with a timeout other than None, it will block for at
        most timeout seconds. If acquire does not complete successfully in
        that interval, return false. Return true otherwise.

        A ValueError is raised if a timeout is given together with
        blocking=False.

        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the absolute deadline.
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without break: the counter is positive.
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        """
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
447n/a
448n/a
class BoundedSemaphore(Semaphore):
    """Implements a bounded semaphore.

    A bounded semaphore checks to make sure its current value doesn't exceed its
    initial value. If it does, ValueError is raised. In most situations
    semaphores are used to guard resources with limited capacity.

    If the semaphore is released too many times it's a sign of a bug. If not
    given, value defaults to 1.

    Like regular semaphores, bounded semaphores manage a counter representing
    the number of release() calls minus the number of acquire() calls, plus an
    initial value. The acquire() method blocks if necessary until it can return
    without making the counter negative.

    """

    def __init__(self, value=1):
        super().__init__(value)
        # Upper bound that release() must never push the counter past.
        self._initial_value = value

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        If the number of releases exceeds the number of acquires,
        raise a ValueError.

        """
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value += 1
            self._cond.notify()
485n/a
486n/a
class Event:
    """Class implementing event objects.

    Events manage a flag that can be set to true with the set() method and reset
    to false with the clear() method. The wait() method blocks until the flag is
    true. The flag is initially false.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

    def _reset_internal_locks(self):
        # private!  called by Thread._reset_internal_locks by _after_fork()
        # Re-initialise the condition in place with a fresh lock.
        self._cond.__init__(Lock())

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    isSet = is_set

    def set(self):
        """Set the internal flag to true.

        All threads waiting for it to become true are awakened. Threads
        that call wait() once the flag is true will not block at all.

        """
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    def clear(self):
        """Reset the internal flag to false.

        Subsequently, threads calling wait() will block until set() is called to
        set the internal flag to true again.

        """
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return immediately. Otherwise,
        block until another thread calls set() to set the flag to true, or until
        the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        This method returns the internal flag on exit, so it will always return
        True except if a timeout is given and the operation times out.

        """
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled
553n/a
554n/a
555n/a# A barrier class. Inspired in part by the pthread_barrier_* api and
556n/a# the CyclicBarrier class from Java. See
557n/a# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
558n/a# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
559n/a# CyclicBarrier.html
560n/a# for information.
561n/a# We maintain two main states, 'filling' and 'draining' enabling the barrier
562n/a# to be cyclic. Threads are not allowed into it until it has fully drained
563n/a# since the previous cycle. In addition, a 'resetting' state exists which is
564n/a# similar to 'draining' except that threads leave with a BrokenBarrierError,
565n/a# and a 'broken' state in which all threads get the exception.
class Barrier:
    """Implements a Barrier.

    Useful for synchronizing a fixed number of threads at known synchronization
    points. Threads block on 'wait()' and are simultaneously awoken once they
    have all made that call.

    """

    def __init__(self, parties, action=None, timeout=None):
        """Create a barrier, initialised to 'parties' threads.

        'action' is a callable which, when supplied, will be called by one of
        the threads after they have all entered the barrier and just prior to
        releasing them all. If a 'timeout' is provided, it is used as the
        default for all subsequent 'wait()' calls.

        Raises ValueError if 'parties' is less than 1, since such a barrier
        could never be tripped and every wait() would block.

        """
        if parties < 1:
            raise ValueError("parties must be >= 1")
        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        self._state = 0  # 0 filling, 1 draining, -1 resetting, -2 broken
        self._count = 0  # number of threads currently inside wait()

    def wait(self, timeout=None):
        """Wait for the barrier.

        When the specified number of threads have started waiting, they are all
        simultaneously awoken. If an 'action' was provided for the barrier, one
        of the threads will have executed that callback prior to returning.
        Returns an individual index number from 0 to 'parties-1'.

        Raises BrokenBarrierError if the barrier is broken or reset while
        waiting, or if the wait times out.

        """
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            self._enter()  # Block while the barrier drains.
            index = self._count
            self._count += 1
            try:
                if index + 1 == self._parties:
                    # We release the barrier
                    self._release()
                else:
                    # We wait until someone releases us
                    self._wait(timeout)
                return index
            finally:
                self._count -= 1
                # Wake up any threads waiting for barrier to drain.
                self._exit()

    # Block until the barrier is ready for us, or raise an exception
    # if it is broken.
    def _enter(self):
        while self._state in (-1, 1):
            # It is draining or resetting, wait until done
            self._cond.wait()
        # see if the barrier is in a broken state
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 0

    # Optionally run the 'action' and release the threads waiting
    # in the barrier.
    def _release(self):
        try:
            if self._action:
                self._action()
            # enter draining state
            self._state = 1
            self._cond.notify_all()
        except BaseException:
            # an exception during the _action handler.  Break and reraise
            self._break()
            raise

    # Wait in the barrier until we are released.  Raise an exception
    # if the barrier is reset or broken.
    def _wait(self, timeout):
        if not self._cond.wait_for(lambda: self._state != 0, timeout):
            # timed out.  Break the barrier
            self._break()
            raise BrokenBarrierError
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 1

    # If we are the last thread to exit the barrier, signal any threads
    # waiting for the barrier to drain.
    def _exit(self):
        if self._count == 0:
            if self._state in (-1, 1):
                # resetting or draining
                self._state = 0
                self._cond.notify_all()

    def reset(self):
        """Reset the barrier to the initial state.

        Any threads currently waiting will get the BrokenBarrierError
        exception raised.

        """
        with self._cond:
            if self._count > 0:
                if self._state == 0:
                    # reset the barrier, waking up threads
                    self._state = -1
                elif self._state == -2:
                    # was broken, set it to reset state
                    # which clears when the last thread exits
                    self._state = -1
            else:
                self._state = 0
            self._cond.notify_all()

    def abort(self):
        """Place the barrier into a 'broken' state.

        Useful in case of error.  Any currently waiting threads and threads
        attempting to 'wait()' will have BrokenBarrierError raised.

        """
        with self._cond:
            self._break()

    def _break(self):
        # An internal error was detected.  The barrier is set to
        # a broken state and all parties are awakened.
        self._state = -2
        self._cond.notify_all()

    @property
    def parties(self):
        """Return the number of threads required to trip the barrier."""
        return self._parties

    @property
    def n_waiting(self):
        """Return the number of threads currently waiting at the barrier."""
        # We don't need synchronization here since this is an ephemeral result
        # anyway.  It returns the correct value in the steady state.
        if self._state == 0:
            return self._count
        return 0

    @property
    def broken(self):
        """Return True if the barrier is in a broken state."""
        return self._state == -2
718n/a
# exception raised by the Barrier class
class BrokenBarrierError(RuntimeError):
    """Raised by Barrier when it is broken, reset, or a wait times out."""
722n/a
723n/a
# Helper to generate new thread names
_counter = _count().__next__
_counter()  # Consume 0 so first non-main thread has id 1.


def _newname(template="Thread-%d"):
    """Return *template* formatted with the next value of the name counter."""
    return template % _counter()
729n/a
# Active thread administration
_active_limbo_lock = _allocate_lock()  # guards _active and _limbo
_active = {}    # maps thread id to Thread object
_limbo = {}     # threads for which start() was called but not yet running
_dangling = WeakSet()  # every constructed Thread, held weakly
735n/a
736n/a# Main class for threads
737n/a
738n/aclass Thread:
739n/a """A class that represents a thread of control.
740n/a
741n/a This class can be safely subclassed in a limited fashion. There are two ways
742n/a to specify the activity: by passing a callable object to the constructor, or
743n/a by overriding the run() method in a subclass.
744n/a
745n/a """
746n/a
747n/a _initialized = False
748n/a # Need to store a reference to sys.exc_info for printing
749n/a # out exceptions when a thread tries to use a global var. during interp.
750n/a # shutdown and thus raises an exception about trying to perform some
751n/a # operation on/with a NoneType
752n/a _exc_info = _sys.exc_info
753n/a # Keep sys.exc_clear too to clear the exception just before
754n/a # allowing .join() to return.
755n/a #XXX __exc_clear = _sys.exc_clear
756n/a
757n/a def __init__(self, group=None, target=None, name=None,
758n/a args=(), kwargs=None, *, daemon=None):
759n/a """This constructor should always be called with keyword arguments. Arguments are:
760n/a
761n/a *group* should be None; reserved for future extension when a ThreadGroup
762n/a class is implemented.
763n/a
764n/a *target* is the callable object to be invoked by the run()
765n/a method. Defaults to None, meaning nothing is called.
766n/a
767n/a *name* is the thread name. By default, a unique name is constructed of
768n/a the form "Thread-N" where N is a small decimal number.
769n/a
770n/a *args* is the argument tuple for the target invocation. Defaults to ().
771n/a
772n/a *kwargs* is a dictionary of keyword arguments for the target
773n/a invocation. Defaults to {}.
774n/a
775n/a If a subclass overrides the constructor, it must make sure to invoke
776n/a the base class constructor (Thread.__init__()) before doing anything
777n/a else to the thread.
778n/a
779n/a """
780n/a assert group is None, "group argument must be None for now"
781n/a if kwargs is None:
782n/a kwargs = {}
783n/a self._target = target
784n/a self._name = str(name or _newname())
785n/a self._args = args
786n/a self._kwargs = kwargs
787n/a if daemon is not None:
788n/a self._daemonic = daemon
789n/a else:
790n/a self._daemonic = current_thread().daemon
791n/a self._ident = None
792n/a self._tstate_lock = None
793n/a self._started = Event()
794n/a self._is_stopped = False
795n/a self._initialized = True
796n/a # sys.stderr is not stored in the class like
797n/a # sys.exc_info since it can be changed between instances
798n/a self._stderr = _sys.stderr
799n/a # For debugging and _after_fork()
800n/a _dangling.add(self)
801n/a
802n/a def _reset_internal_locks(self, is_alive):
803n/a # private! Called by _after_fork() to reset our internal locks as
804n/a # they may be in an invalid state leading to a deadlock or crash.
805n/a self._started._reset_internal_locks()
806n/a if is_alive:
807n/a self._set_tstate_lock()
808n/a else:
809n/a # The thread isn't alive after fork: it doesn't have a tstate
810n/a # anymore.
811n/a self._is_stopped = True
812n/a self._tstate_lock = None
813n/a
814n/a def __repr__(self):
815n/a assert self._initialized, "Thread.__init__() was not called"
816n/a status = "initial"
817n/a if self._started.is_set():
818n/a status = "started"
819n/a self.is_alive() # easy way to get ._is_stopped set when appropriate
820n/a if self._is_stopped:
821n/a status = "stopped"
822n/a if self._daemonic:
823n/a status += " daemon"
824n/a if self._ident is not None:
825n/a status += " %s" % self._ident
826n/a return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
827n/a
828n/a def start(self):
829n/a """Start the thread's activity.
830n/a
831n/a It must be called at most once per thread object. It arranges for the
832n/a object's run() method to be invoked in a separate thread of control.
833n/a
834n/a This method will raise a RuntimeError if called more than once on the
835n/a same thread object.
836n/a
837n/a """
838n/a if not self._initialized:
839n/a raise RuntimeError("thread.__init__() not called")
840n/a
841n/a if self._started.is_set():
842n/a raise RuntimeError("threads can only be started once")
843n/a with _active_limbo_lock:
844n/a _limbo[self] = self
845n/a try:
846n/a _start_new_thread(self._bootstrap, ())
847n/a except Exception:
848n/a with _active_limbo_lock:
849n/a del _limbo[self]
850n/a raise
851n/a self._started.wait()
852n/a
853n/a def run(self):
854n/a """Method representing the thread's activity.
855n/a
856n/a You may override this method in a subclass. The standard run() method
857n/a invokes the callable object passed to the object's constructor as the
858n/a target argument, if any, with sequential and keyword arguments taken
859n/a from the args and kwargs arguments, respectively.
860n/a
861n/a """
862n/a try:
863n/a if self._target:
864n/a self._target(*self._args, **self._kwargs)
865n/a finally:
866n/a # Avoid a refcycle if the thread is running a function with
867n/a # an argument that has a member that points to the thread.
868n/a del self._target, self._args, self._kwargs
869n/a
870n/a def _bootstrap(self):
871n/a # Wrapper around the real bootstrap code that ignores
872n/a # exceptions during interpreter cleanup. Those typically
873n/a # happen when a daemon thread wakes up at an unfortunate
874n/a # moment, finds the world around it destroyed, and raises some
875n/a # random exception *** while trying to report the exception in
876n/a # _bootstrap_inner() below ***. Those random exceptions
877n/a # don't help anybody, and they confuse users, so we suppress
878n/a # them. We suppress them only when it appears that the world
879n/a # indeed has already been destroyed, so that exceptions in
880n/a # _bootstrap_inner() during normal business hours are properly
881n/a # reported. Also, we only suppress them for daemonic threads;
882n/a # if a non-daemonic encounters this, something else is wrong.
883n/a try:
884n/a self._bootstrap_inner()
885n/a except:
886n/a if self._daemonic and _sys is None:
887n/a return
888n/a raise
889n/a
890n/a def _set_ident(self):
891n/a self._ident = get_ident()
892n/a
893n/a def _set_tstate_lock(self):
894n/a """
895n/a Set a lock object which will be released by the interpreter when
896n/a the underlying thread state (see pystate.h) gets deleted.
897n/a """
898n/a self._tstate_lock = _set_sentinel()
899n/a self._tstate_lock.acquire()
900n/a
def _bootstrap_inner(self):
    """Run the thread: register it, call run(), report errors, unregister.

    Executed in the new thread.  SystemExit raised by run() is silently
    ignored; any other exception is printed to a stderr stream rather
    than propagated.  The thread's entry in _active is removed on the
    way out regardless of how run() finished.
    """
    try:
        # Start-up bookkeeping: record identity, take the tstate lock,
        # signal the _started event, and move from _limbo to _active.
        self._set_ident()
        self._set_tstate_lock()
        self._started.set()
        with _active_limbo_lock:
            _active[self._ident] = self
            del _limbo[self]

        # Install the module-wide trace/profile hooks, if any, in this thread.
        if _trace_hook:
            _sys.settrace(_trace_hook)
        if _profile_hook:
            _sys.setprofile(_profile_hook)

        try:
            self.run()
        except SystemExit:
            # SystemExit from run() is ignored here; nothing is printed.
            pass
        except:
            # If sys.stderr is no more (most likely from interpreter
            # shutdown) use self._stderr.  Otherwise still use sys (as in
            # _sys) in case sys.stderr was redefined since the creation of
            # self.
            if _sys and _sys.stderr is not None:
                print("Exception in thread %s:\n%s" %
                      (self.name, _format_exc()), file=_sys.stderr)
            elif self._stderr is not None:
                # Do the best job possible w/o a huge amt. of code to
                # approximate a traceback (code ideas from
                # Lib/traceback.py)
                exc_type, exc_value, exc_tb = self._exc_info()
                try:
                    print((
                        "Exception in thread " + self.name +
                        " (most likely raised during interpreter shutdown):"), file=self._stderr)
                    print((
                        "Traceback (most recent call last):"), file=self._stderr)
                    # Walk the traceback chain by hand, one frame per line.
                    # NOTE(review): the frame-line literal below is kept
                    # exactly as it appears in this listing -- confirm its
                    # leading whitespace against the canonical source.
                    while exc_tb:
                        print((
                            ' File "%s", line %s, in %s' %
                            (exc_tb.tb_frame.f_code.co_filename,
                                exc_tb.tb_lineno,
                                exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
                        exc_tb = exc_tb.tb_next
                    print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
                # Make sure that exc_tb gets deleted since it is a memory
                # hog; deleting everything else is just for thoroughness
                finally:
                    del exc_type, exc_value, exc_tb
        finally:
            # Prevent a race in
            # test_threading.test_no_refcycle_through_target when
            # the exception keeps the target alive past when we
            # assert that it's dead.
            #XXX self._exc_clear()
            pass
    finally:
        with _active_limbo_lock:
            try:
                # We don't call self._delete() because it also
                # grabs _active_limbo_lock.
                del _active[get_ident()]
            except:
                # Best-effort removal: any failure here is ignored.
                pass
965n/a
966n/a def _stop(self):
967n/a # After calling ._stop(), .is_alive() returns False and .join() returns
968n/a # immediately. ._tstate_lock must be released before calling ._stop().
969n/a #
970n/a # Normal case: C code at the end of the thread's life
971n/a # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
972n/a # that's detected by our ._wait_for_tstate_lock(), called by .join()
973n/a # and .is_alive(). Any number of threads _may_ call ._stop()
974n/a # simultaneously (for example, if multiple threads are blocked in
975n/a # .join() calls), and they're not serialized. That's harmless -
976n/a # they'll just make redundant rebindings of ._is_stopped and
977n/a # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the
978n/a # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
979n/a # (the assert is executed only if ._tstate_lock is None).
980n/a #
981n/a # Special case: _main_thread releases ._tstate_lock via this
982n/a # module's _shutdown() function.
983n/a lock = self._tstate_lock
984n/a if lock is not None:
985n/a assert not lock.locked()
986n/a self._is_stopped = True
987n/a self._tstate_lock = None
988n/a
def _delete(self):
    "Remove current thread from the dict of currently running threads."

    # Background on running under _dummy_thread:
    #
    # This method has to stay quiet when the module is serving as
    # dummy_threading on top of _dummy_thread.  In that configuration
    # _dummy_thread.get_ident() always returns -1 because only one
    # thread exists, so len(_active) never exceeds 1 and every new
    # Thread instance replaces whatever was registered in _active.
    #
    # 'threading' always creates a _MainThread instance.  With
    # _dummy_thread it is overwritten as soon as any Thread is created,
    # since both map to the key -1.  When that _MainThread later cleans
    # up via atexit, its entry is already gone and the deletion below
    # raises KeyError.
    #
    # So under dummy_threading a KeyError here is a red herring and is
    # ignored; when dummy_threading is *not* in use it indicates a real
    # problem and is allowed to propagate.

    try:
        with _active_limbo_lock:
            del _active[get_ident()]
            # No Python code may run between the deletion above and the
            # release of the lock.  A tracing function could otherwise
            # try to take the same lock again in this thread (through
            # current_thread()) and block.
    except KeyError:
        if 'dummy_threading' in _sys.modules:
            return
        raise
1023n/a
def join(self, timeout=None):
    """Wait until the thread terminates.

    The calling thread blocks until the thread whose join() method was
    called has terminated -- normally or through an unhandled exception --
    or until the optional timeout expires.

    If timeout is given and is not None, it should be a floating point
    number giving the maximum time to wait, in seconds (or fractions
    thereof).  join() always returns None, so call is_alive() afterwards
    to find out whether a timeout happened: if the thread is still alive,
    the join() call timed out.

    If timeout is omitted or None, the call blocks until the thread
    terminates.

    A thread can be join()ed many times.

    join() raises RuntimeError when asked to join the current thread,
    since that would deadlock.  Joining a thread that has not been
    started yet is also an error and raises the same exception.

    """
    if not self._initialized:
        raise RuntimeError("Thread.__init__() not called")
    if not self._started.is_set():
        raise RuntimeError("cannot join thread before it is started")
    if self is current_thread():
        raise RuntimeError("cannot join current thread")

    if timeout is not None:
        # Negative timeouts are undocumented; historically
        # .join(timeout=x) with x < 0 has behaved like timeout=0.
        self._wait_for_tstate_lock(timeout=max(timeout, 0))
    else:
        self._wait_for_tstate_lock()
1061n/a
1062n/a def _wait_for_tstate_lock(self, block=True, timeout=-1):
1063n/a # Issue #18808: wait for the thread state to be gone.
1064n/a # At the end of the thread's life, after all knowledge of the thread
1065n/a # is removed from C data structures, C code releases our _tstate_lock.
1066n/a # This method passes its arguments to _tstate_lock.acquire().
1067n/a # If the lock is acquired, the C code is done, and self._stop() is
1068n/a # called. That sets ._is_stopped to True, and ._tstate_lock to None.
1069n/a lock = self._tstate_lock
1070n/a if lock is None: # already determined that the C code is done
1071n/a assert self._is_stopped
1072n/a elif lock.acquire(block, timeout):
1073n/a lock.release()
1074n/a self._stop()
1075n/a
@property
def name(self):
    """The thread's name, a string used for identification purposes only.

    The name carries no semantics, and several threads may share the same
    name.  The constructor sets the initial name.  Values assigned to
    this attribute are converted with str().

    """
    assert self._initialized, "Thread.__init__() not called"
    return self._name

@name.setter
def name(self, name):
    assert self._initialized, "Thread.__init__() not called"
    as_text = str(name)
    self._name = as_text
1091n/a
@property
def ident(self):
    """The thread identifier, or None if the thread has not been started.

    The identifier is a nonzero integer; see the thread.get_ident()
    function.  An identifier may be recycled once a thread has exited and
    another thread is created.  It remains available on this object even
    after the thread has exited.

    """
    assert self._initialized, "Thread.__init__() not called"
    thread_ident = self._ident
    return thread_ident
1103n/a
def is_alive(self):
    """Return whether the thread is alive.

    The result is True from just before run() starts until just after
    run() terminates.  The module function enumerate() returns a list of
    all alive threads.

    """
    assert self._initialized, "Thread.__init__() not called"
    if self._is_stopped:
        return False
    if not self._started.is_set():
        return False
    # Non-blocking probe of the tstate lock; if the thread has finished
    # this flips _is_stopped through _stop().
    self._wait_for_tstate_lock(False)
    return not self._is_stopped

isAlive = is_alive
1119n/a
@property
def daemon(self):
    """Boolean flag telling whether this thread is a daemon thread.

    The flag has to be set before start() is called; setting it later
    raises RuntimeError.  The initial value is inherited from the
    creating thread.  The main thread is not a daemon thread, so threads
    created in the main thread default to daemon = False.

    The entire Python program exits when no alive non-daemon threads are
    left.

    """
    assert self._initialized, "Thread.__init__() not called"
    return self._daemonic

@daemon.setter
def daemon(self, daemonic):
    if not self._initialized:
        raise RuntimeError("Thread.__init__() not called")
    already_started = self._started.is_set()
    if already_started:
        raise RuntimeError("cannot set daemon status of active thread")
    self._daemonic = daemonic
1143n/a
def isDaemon(self):
    """Return the daemon attribute (camelCase spelling of the getter)."""
    flag = self.daemon
    return flag
1146n/a
def setDaemon(self, daemonic):
    """Assign to the daemon attribute (camelCase spelling of the setter)."""
    self.daemon = daemonic
1149n/a
def getName(self):
    """Return the name attribute (camelCase spelling of the getter)."""
    current_name = self.name
    return current_name
1152n/a
def setName(self, name):
    """Assign to the name attribute (camelCase spelling of the setter)."""
    self.name = name
1155n/a
1156n/a# The timer class was contributed by Itamar Shtull-Trauring
1157n/a
class Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # A fresh container is created per instance when none is supplied.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        # Wait out the interval; cancel() sets the event and ends the wait
        # early.
        self.finished.wait(self.interval)
        cancelled = self.finished.is_set()
        if not cancelled:
            self.function(*self.args, **self.kwargs)
        self.finished.set()
1184n/a
1185n/a# Special thread class to represent the main thread
1186n/a# This is garbage collected through an exit handler
1187n/a
class _MainThread(Thread):
    """Thread object standing in for the thread that creates it."""

    def __init__(self):
        Thread.__init__(self, name="MainThread", daemon=False)
        # Nothing is spawned here, so the bookkeeping a started thread
        # would do for itself is performed directly: take the tstate
        # lock, flag the thread as started and record its identifier.
        self._set_tstate_lock()
        self._started.set()
        self._set_ident()
        # Register under the recorded ident in the table of active threads.
        with _active_limbo_lock:
            _active[self._ident] = self
1197n/a
1198n/a
1199n/a# Dummy thread class to represent threads not started here.
1200n/a# These aren't garbage collected when they die, nor can they be waited for.
1201n/a# If they invoke anything in threading.py that calls current_thread(), they
1202n/a# leave an entry in the _active dict forever after.
1203n/a# Their purpose is to return *something* from current_thread().
1204n/a# They are marked as daemon threads so we won't wait for them
1205n/a# when we exit (conform previous semantics).
1206n/a
class _DummyThread(Thread):
    """Placeholder Thread object for a thread not started by this module.

    Instances register themselves in _active under the calling thread's
    ident.  They are created as daemon threads and cannot be joined.
    """

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)

        # The thread is already running, so mark it started and record its
        # ident.  No tstate lock is installed for a dummy thread.
        self._started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[self._ident] = self

    def _stop(self):
        # A dummy thread is never marked as stopped.
        pass

    def is_alive(self):
        """Return True: a dummy thread is always reported as alive.

        The inherited implementation goes through _wait_for_tstate_lock(),
        which asserts that the thread is stopped when it has no tstate
        lock.  A dummy thread never installs that lock and its _stop() is
        a no-op, so the inherited check would fail with AssertionError
        instead of returning a result.
        """
        return True

    def join(self, timeout=None):
        assert False, "cannot join a dummy thread"
1222n/a
1223n/a
1224n/a# Global API functions
1225n/a
def current_thread():
    """Return the Thread object for the caller's thread of control.

    When the calling thread was not created through the threading module,
    a dummy thread object with limited functionality is returned instead.

    """
    try:
        thread = _active[get_ident()]
    except KeyError:
        # Unknown ident: hand back a dummy thread object.
        thread = _DummyThread()
    return thread

currentThread = current_thread
1239n/a
def active_count():
    """Return the number of Thread objects currently alive.

    The value equals the length of the list returned by enumerate().

    """
    with _active_limbo_lock:
        running = len(_active)
        starting = len(_limbo)
        return running + starting

activeCount = active_count
1251n/a
def _enumerate():
    # Lock-free twin of enumerate(), for internal use only: the caller is
    # responsible for any locking.
    threads = list(_active.values())
    threads.extend(_limbo.values())
    return threads
1255n/a
def enumerate():
    """Return a list of all Thread objects currently alive.

    Daemonic threads, dummy thread objects created by current_thread() and
    the main thread are all included.  Terminated threads and threads that
    have not yet been started are not.

    """
    with _active_limbo_lock:
        threads = list(_active.values())
        threads.extend(_limbo.values())
        return threads
1266n/a
1267n/afrom _thread import stack_size
1268n/a
1269n/a# Create the main thread object,
1270n/a# and make it available for the interpreter
1271n/a# (Py_Main) as threading._shutdown.
1272n/a
1273n/a_main_thread = _MainThread()
1274n/a
def _shutdown():
    # Obscure: some code has other threads waiting to join _main_thread.
    # That is dubious, but it happens.  Waiting for C code to release the
    # main thread's tstate_lock is not an option, because that only
    # happens when the interpreter is nearly dead, so the lock is
    # released here instead.  Calling _stop() alone would not be enough:
    # other threads may already be blocked on _tstate_lock.
    main_lock = _main_thread._tstate_lock
    # The main thread has not finished yet, so its thread state lock
    # cannot have been released.
    assert main_lock is not None
    assert main_lock.locked()
    main_lock.release()
    _main_thread._stop()
    # Join the remaining non-daemon threads one at a time until none is
    # left alive.
    while True:
        pending = _pickSomeNonDaemonThread()
        if not pending:
            break
        pending.join()
    _main_thread._delete()
1293n/a
def _pickSomeNonDaemonThread():
    """Return the first alive non-daemon thread from enumerate(), or None."""
    # enumerate here is this module's function, not the builtin.
    candidates = (t for t in enumerate() if not t.daemon and t.is_alive())
    return next(candidates, None)
1299n/a
def main_thread():
    """Return the main thread object.

    Under normal conditions this is the thread from which the Python
    interpreter was started.
    """
    main = _main_thread
    return main
1307n/a
1308n/a# get thread-local implementation, either from the thread
1309n/a# module, or from the python fallback
1310n/a
1311n/atry:
1312n/a from _thread import _local as local
1313n/aexcept ImportError:
1314n/a from _threading_local import local
1315n/a
1316n/a
def _after_fork():
    """Reset module state in a forked child.

    Replaces _active_limbo_lock with a new lock, makes the calling thread
    the new _main_thread, reinitializes the internal locks of every known
    thread, marks all threads other than the caller as stopped, and
    leaves the caller as the single entry in _active.
    """
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
    # that should not exist after a fork.

    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock, _main_thread
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}
    current = current_thread()
    # The calling thread becomes the main thread from here on.
    _main_thread = current
    with _active_limbo_lock:
        # Dangling thread instances must still have their locks reset,
        # because someone may join() them.
        threads = set(_enumerate())
        threads.update(_dangling)
        for thread in threads:
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if thread is current:
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
                thread._reset_internal_locks(True)
                ident = get_ident()
                thread._ident = ident
                new_active[ident] = thread
            else:
                # All the others are already stopped.
                thread._reset_internal_locks(False)
                thread._stop()

        # Rebuild the registries in place so that they contain only the
        # calling thread.
        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1