»Core Development>Code coverage>Lib/sched.py

Python code coverage for Lib/sched.py

#countcontent
1n/a"""A generally useful event scheduler class.
2n/a
3n/aEach instance of this class manages its own queue.
4n/aNo multi-threading is implied; you are supposed to hack that
5n/ayourself, or use a single instance per application.
6n/a
7n/aEach instance is parametrized with two functions, one that is
8n/asupposed to return the current time, one that is supposed to
9n/aimplement a delay. You can implement real-time scheduling by
10n/asubstituting time and sleep from built-in module time, or you can
11n/aimplement simulated time by writing your own functions. This can
12n/aalso be used to integrate scheduling with STDWIN events; the delay
13n/afunction is allowed to modify the queue. Time can be expressed as
14n/aintegers or floating point numbers, as long as it is consistent.
15n/a
16n/aEvents are specified by tuples (time, priority, action, argument, kwargs).
17n/aAs in UNIX, lower priority numbers mean higher priority; in this
18n/away the queue can be maintained as a priority queue. Execution of the
19n/aevent means calling the action function, passing it the argument
20n/asequence in "argument" (remember that in Python, multiple function
21n/aarguments are packed in a sequence) and keyword parameters in "kwargs".
22n/aThe action function may be an instance method so it
23n/ahas another way to reference private data (besides global variables).
24n/a"""
25n/a
26n/aimport time
27n/aimport heapq
28n/afrom collections import namedtuple
29n/atry:
30n/a import threading
31n/aexcept ImportError:
32n/a import dummy_threading as threading
33n/afrom time import monotonic as _time
34n/a
35n/a__all__ = ["scheduler"]
36n/a
37n/aclass Event(namedtuple('Event', 'time, priority, action, argument, kwargs')):
38n/a __slots__ = []
39n/a def __eq__(s, o): return (s.time, s.priority) == (o.time, o.priority)
40n/a def __lt__(s, o): return (s.time, s.priority) < (o.time, o.priority)
41n/a def __le__(s, o): return (s.time, s.priority) <= (o.time, o.priority)
42n/a def __gt__(s, o): return (s.time, s.priority) > (o.time, o.priority)
43n/a def __ge__(s, o): return (s.time, s.priority) >= (o.time, o.priority)
44n/a
45n/aEvent.time.__doc__ = ('''Numeric type compatible with the return value of the
46n/atimefunc function passed to the constructor.''')
47n/aEvent.priority.__doc__ = ('''Events scheduled for the same time will be executed
48n/ain the order of their priority.''')
49n/aEvent.action.__doc__ = ('''Executing the event means executing
50n/aaction(*argument, **kwargs)''')
51n/aEvent.argument.__doc__ = ('''argument is a sequence holding the positional
52n/aarguments for the action.''')
53n/aEvent.kwargs.__doc__ = ('''kwargs is a dictionary holding the keyword
54n/aarguments for the action.''')
55n/a
# Marks "kwargs not supplied" so every event gets its own fresh dict.
_sentinel = object()


class scheduler:
    """Event scheduler driven by caller-supplied time and delay functions.

    Pending events are kept in a heap; every access to that heap is made
    while holding a re-entrant lock.
    """

    def __init__(self, timefunc=_time, delayfunc=time.sleep):
        """Initialize a new instance, passing the time and delay
        functions"""
        self._queue = []
        self._lock = threading.RLock()
        self.timefunc = timefunc
        self.delayfunc = delayfunc

    def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel):
        """Enter a new event in the queue at an absolute time.

        Returns an ID for the event which can be used to remove it,
        if necessary.

        """
        if kwargs is _sentinel:
            kwargs = {}
        event = Event(time, priority, action, argument, kwargs)
        with self._lock:
            heapq.heappush(self._queue, event)
        return event  # The ID

    def enter(self, delay, priority, action, argument=(), kwargs=_sentinel):
        """A variant that specifies the time as a relative time.

        The absolute time is the current value of timefunc() plus delay.
        This is actually the more commonly used interface.

        """
        when = self.timefunc() + delay
        return self.enterabs(when, priority, action, argument, kwargs)

    def cancel(self, event):
        """Remove an event from the queue.

        This must be presented the ID as returned by enter().
        If the event is not in the queue, this raises ValueError.

        """
        with self._lock:
            pending = self._queue
            # Queue entries compare equal whenever their time and priority
            # match, so an equality search could drop a different event
            # than the one requested.  Look for the exact object first.
            for index, queued in enumerate(pending):
                if queued is event:
                    del pending[index]
                    break
            else:
                # No identical object queued: fall back to an equality
                # match, which raises ValueError when nothing matches.
                pending.remove(event)
            # Removing from the middle of the list breaks the heap invariant.
            heapq.heapify(pending)

    def empty(self):
        """Check whether the queue is empty."""
        with self._lock:
            return not self._queue

    def run(self, blocking=True):
        """Execute events until the queue is empty.

        If blocking is False, execute the events that are already due
        (if any) and then return the remaining delay until the next
        scheduled event; None is returned once the queue is empty.

        When there is a positive delay until the first event, the
        delay function is called and the event is left in the queue;
        otherwise, the event is removed from the queue and executed
        (its action function is called, passing it the argument).  If
        the delay function returns prematurely, it is simply
        restarted.

        It is legal for both the delay function and the action
        function to modify the queue or to raise an exception;
        exceptions are not caught but the scheduler's state remains
        well-defined so run() may be called again.

        A questionable hack is added to allow other threads to run:
        just after an event is executed, a delay of 0 is executed, to
        avoid monopolizing the CPU when other threads are also
        runnable.

        """
        # localize variable access to minimize overhead
        # and to improve thread safety
        lock = self._lock
        q = self._queue
        delayfunc = self.delayfunc
        timefunc = self.timefunc
        pop = heapq.heappop
        while True:
            # Only inspecting/popping the head happens under the lock; the
            # delay and the action run after it has been released.
            with lock:
                if not q:
                    break
                when, _priority, action, argument, kwargs = q[0]
                now = timefunc()
                if when > now:
                    must_wait = True
                else:
                    must_wait = False
                    pop(q)
            if must_wait:
                if not blocking:
                    return when - now
                delayfunc(when - now)
            else:
                action(*argument, **kwargs)
                delayfunc(0)   # Let other threads run

    @property
    def queue(self):
        """An ordered list of upcoming events.

        Events are named tuples with fields for:
            time, priority, action, argument, kwargs

        """
        # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
        # With heapq, two events scheduled at the same time will show in
        # the actual order they would be retrieved.
        with self._lock:
            events = self._queue[:]
        # Drain the private copy; the live queue is left untouched.
        return [heapq.heappop(events) for _ in range(len(events))]