ยปCore Development>Code coverage>Lib/Queue.py

Python code coverage for Lib/Queue.py

#countcontent
11"""A multi-producer, multi-consumer queue."""
2n/a
31from time import time as _time
41try:
51 import threading as _threading
60except ImportError:
70 import dummy_threading as _threading
81from collections import deque
91import heapq
10n/a
111__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
12n/a
132class Empty(Exception):
141 "Exception raised by Queue.get(block=0)/get_nowait()."
151 pass
16n/a
172class Full(Exception):
181 "Exception raised by Queue.put(block=0)/put_nowait()."
191 pass
20n/a
212class Queue:
22n/a """Create a queue object with a given maximum size.
23n/a
24n/a If maxsize is <= 0, the queue size is infinite.
251 """
261 def __init__(self, maxsize=0):
27151 self.maxsize = maxsize
28151 self._init(maxsize)
29n/a # mutex must be held whenever the queue is mutating. All methods
30n/a # that acquire mutex must release it before returning. mutex
31n/a # is shared between the three conditions, so acquiring and
32n/a # releasing the conditions also acquires and releases mutex.
33151 self.mutex = _threading.Lock()
34n/a # Notify not_empty whenever an item is added to the queue; a
35n/a # thread waiting to get is notified then.
36151 self.not_empty = _threading.Condition(self.mutex)
37n/a # Notify not_full whenever an item is removed from the queue;
38n/a # a thread waiting to put is notified then.
39151 self.not_full = _threading.Condition(self.mutex)
40n/a # Notify all_tasks_done whenever the number of unfinished tasks
41n/a # drops to zero; thread waiting to join() is notified to resume
42151 self.all_tasks_done = _threading.Condition(self.mutex)
43151 self.unfinished_tasks = 0
44n/a
451 def task_done(self):
46n/a """Indicate that a formerly enqueued task is complete.
47n/a
48n/a Used by Queue consumer threads. For each get() used to fetch a task,
49n/a a subsequent call to task_done() tells the queue that the processing
50n/a on the task is complete.
51n/a
52n/a If a join() is currently blocking, it will resume when all items
53n/a have been processed (meaning that a task_done() call was received
54n/a for every item that had been put() into the queue).
55n/a
56n/a Raises a ValueError if called more times than there were items
57n/a placed in the queue.
58n/a """
59671 self.all_tasks_done.acquire()
60671 try:
61671 unfinished = self.unfinished_tasks - 1
62671 if unfinished <= 0:
6375 if unfinished < 0:
646 raise ValueError('task_done() called too many times')
6569 self.all_tasks_done.notify_all()
66665 self.unfinished_tasks = unfinished
67n/a finally:
68671 self.all_tasks_done.release()
69n/a
701 def join(self):
71n/a """Blocks until all items in the Queue have been gotten and processed.
72n/a
73n/a The count of unfinished tasks goes up whenever an item is added to the
74n/a queue. The count goes down whenever a consumer thread calls task_done()
75n/a to indicate the item was retrieved and all work on it is complete.
76n/a
77n/a When the count of unfinished tasks drops to zero, join() unblocks.
78n/a """
7956 self.all_tasks_done.acquire()
8056 try:
81110 while self.unfinished_tasks:
8254 self.all_tasks_done.wait()
83n/a finally:
8456 self.all_tasks_done.release()
85n/a
861 def qsize(self):
87n/a """Return the approximate size of the queue (not reliable!)."""
8819 self.mutex.acquire()
8919 n = self._qsize()
9019 self.mutex.release()
9119 return n
92n/a
931 def empty(self):
94n/a """Return True if the queue is empty, False otherwise (not reliable!)."""
95123 self.mutex.acquire()
96123 n = not self._qsize()
97123 self.mutex.release()
98123 return n
99n/a
1001 def full(self):
101n/a """Return True if the queue is full, False otherwise (not reliable!)."""
10223 self.mutex.acquire()
10323 n = 0 < self.maxsize == self._qsize()
10423 self.mutex.release()
10523 return n
106n/a
1071 def put(self, item, block=True, timeout=None):
108n/a """Put an item into the queue.
109n/a
110n/a If optional args 'block' is true and 'timeout' is None (the default),
111n/a block if necessary until a free slot is available. If 'timeout' is
112n/a a positive number, it blocks at most 'timeout' seconds and raises
113n/a the Full exception if no free slot was available within that time.
114n/a Otherwise ('block' is false), put an item on the queue if a free slot
115n/a is immediately available, else raise the Full exception ('timeout'
116n/a is ignored in that case).
117n/a """
1183134 self.not_full.acquire()
1193134 try:
1203134 if self.maxsize > 0:
121138 if not block:
12215 if self._qsize() == self.maxsize:
12310 raise Full
124123 elif timeout is None:
125117 while self._qsize() == self.maxsize:
12612 self.not_full.wait()
12718 elif timeout < 0:
1280 raise ValueError("'timeout' must be a positive number")
129n/a else:
13018 endtime = _time() + timeout
13134 while self._qsize() == self.maxsize:
13224 remaining = endtime - _time()
13324 if remaining <= 0.0:
1348 raise Full
13516 self.not_full.wait(remaining)
1363116 self._put(item)
1373108 self.unfinished_tasks += 1
1383108 self.not_empty.notify()
139n/a finally:
1403134 self.not_full.release()
141n/a
1421 def put_nowait(self, item):
143n/a """Put an item into the queue without blocking.
144n/a
145n/a Only enqueue the item if a free slot is immediately available.
146n/a Otherwise raise the Full exception.
147n/a """
1482 return self.put(item, False)
149n/a
1501 def get(self, block=True, timeout=None):
151n/a """Remove and return an item from the queue.
152n/a
153n/a If optional args 'block' is true and 'timeout' is None (the default),
154n/a block if necessary until an item is available. If 'timeout' is
155n/a a positive number, it blocks at most 'timeout' seconds and raises
156n/a the Empty exception if no item was available within that time.
157n/a Otherwise ('block' is false), return an item if one is immediately
158n/a available, else raise the Empty exception ('timeout' is ignored
159n/a in that case).
160n/a """
1613121 self.not_empty.acquire()
1623121 try:
1633121 if not block:
16412 if not self._qsize():
16511 raise Empty
1663109 elif timeout is None:
1673507 while not self._qsize():
168458 self.not_empty.wait()
16960 elif timeout < 0:
1700 raise ValueError("'timeout' must be a positive number")
171n/a else:
17260 endtime = _time() + timeout
17374 while not self._qsize():
17422 remaining = endtime - _time()
17522 if remaining <= 0.0:
1768 raise Empty
17714 self.not_empty.wait(remaining)
1783102 item = self._get()
1793096 self.not_full.notify()
1803096 return item
181n/a finally:
1823121 self.not_empty.release()
183n/a
1841 def get_nowait(self):
185n/a """Remove and return an item from the queue without blocking.
186n/a
187n/a Only get an item if one is immediately available. Otherwise
188n/a raise the Empty exception.
189n/a """
1902 return self.get(False)
191n/a
192n/a # Override these methods to implement other queue organizations
193n/a # (e.g. stack or priority queue).
194n/a # These will only be called with appropriate locks held
195n/a
196n/a # Initialize the queue representation
1971 def _init(self, maxsize):
198145 self.queue = deque()
199n/a
2001 def _qsize(self, len=len):
2013321 return len(self.queue)
202n/a
203n/a # Put a new item in the queue
2041 def _put(self, item):
2052652 self.queue.append(item)
206n/a
207n/a # Get an item from the queue
2081 def _get(self):
2092640 return self.queue.popleft()
210n/a
211n/a
2122class PriorityQueue(Queue):
213n/a '''Variant of Queue that retrieves open entries in priority order (lowest first).
214n/a
215n/a Entries are typically tuples of the form: (priority number, data).
2161 '''
217n/a
2181 def _init(self, maxsize):
2193 self.queue = []
220n/a
2211 def _qsize(self, len=len):
222298 return len(self.queue)
223n/a
2241 def _put(self, item, heappush=heapq.heappush):
225228 heappush(self.queue, item)
226n/a
2271 def _get(self, heappop=heapq.heappop):
228228 return heappop(self.queue)
229n/a
230n/a
2312class LifoQueue(Queue):
2321 '''Variant of Queue that retrieves most recently added entries first.'''
233n/a
2341 def _init(self, maxsize):
2353 self.queue = []
236n/a
2371 def _qsize(self, len=len):
238305 return len(self.queue)
239n/a
2401 def _put(self, item):
241228 self.queue.append(item)
242n/a
2431 def _get(self):
244228 return self.queue.pop()