ยปCore Development>Code coverage>Lib/queue.py

Python code coverage for Lib/queue.py

#countcontent
1n/a'''A multi-producer, multi-consumer queue.'''
2n/a
3n/atry:
4n/a import threading
5n/aexcept ImportError:
6n/a import dummy_threading as threading
7n/afrom collections import deque
8n/afrom heapq import heappush, heappop
9n/afrom time import monotonic as time
10n/a
11n/a__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
12n/a
13n/aclass Empty(Exception):
14n/a 'Exception raised by Queue.get(block=0)/get_nowait().'
15n/a pass
16n/a
17n/aclass Full(Exception):
18n/a 'Exception raised by Queue.put(block=0)/put_nowait().'
19n/a pass
20n/a
21n/aclass Queue:
22n/a '''Create a queue object with a given maximum size.
23n/a
24n/a If maxsize is <= 0, the queue size is infinite.
25n/a '''
26n/a
27n/a def __init__(self, maxsize=0):
28n/a self.maxsize = maxsize
29n/a self._init(maxsize)
30n/a
31n/a # mutex must be held whenever the queue is mutating. All methods
32n/a # that acquire mutex must release it before returning. mutex
33n/a # is shared between the three conditions, so acquiring and
34n/a # releasing the conditions also acquires and releases mutex.
35n/a self.mutex = threading.Lock()
36n/a
37n/a # Notify not_empty whenever an item is added to the queue; a
38n/a # thread waiting to get is notified then.
39n/a self.not_empty = threading.Condition(self.mutex)
40n/a
41n/a # Notify not_full whenever an item is removed from the queue;
42n/a # a thread waiting to put is notified then.
43n/a self.not_full = threading.Condition(self.mutex)
44n/a
45n/a # Notify all_tasks_done whenever the number of unfinished tasks
46n/a # drops to zero; thread waiting to join() is notified to resume
47n/a self.all_tasks_done = threading.Condition(self.mutex)
48n/a self.unfinished_tasks = 0
49n/a
50n/a def task_done(self):
51n/a '''Indicate that a formerly enqueued task is complete.
52n/a
53n/a Used by Queue consumer threads. For each get() used to fetch a task,
54n/a a subsequent call to task_done() tells the queue that the processing
55n/a on the task is complete.
56n/a
57n/a If a join() is currently blocking, it will resume when all items
58n/a have been processed (meaning that a task_done() call was received
59n/a for every item that had been put() into the queue).
60n/a
61n/a Raises a ValueError if called more times than there were items
62n/a placed in the queue.
63n/a '''
64n/a with self.all_tasks_done:
65n/a unfinished = self.unfinished_tasks - 1
66n/a if unfinished <= 0:
67n/a if unfinished < 0:
68n/a raise ValueError('task_done() called too many times')
69n/a self.all_tasks_done.notify_all()
70n/a self.unfinished_tasks = unfinished
71n/a
72n/a def join(self):
73n/a '''Blocks until all items in the Queue have been gotten and processed.
74n/a
75n/a The count of unfinished tasks goes up whenever an item is added to the
76n/a queue. The count goes down whenever a consumer thread calls task_done()
77n/a to indicate the item was retrieved and all work on it is complete.
78n/a
79n/a When the count of unfinished tasks drops to zero, join() unblocks.
80n/a '''
81n/a with self.all_tasks_done:
82n/a while self.unfinished_tasks:
83n/a self.all_tasks_done.wait()
84n/a
85n/a def qsize(self):
86n/a '''Return the approximate size of the queue (not reliable!).'''
87n/a with self.mutex:
88n/a return self._qsize()
89n/a
90n/a def empty(self):
91n/a '''Return True if the queue is empty, False otherwise (not reliable!).
92n/a
93n/a This method is likely to be removed at some point. Use qsize() == 0
94n/a as a direct substitute, but be aware that either approach risks a race
95n/a condition where a queue can grow before the result of empty() or
96n/a qsize() can be used.
97n/a
98n/a To create code that needs to wait for all queued tasks to be
99n/a completed, the preferred technique is to use the join() method.
100n/a '''
101n/a with self.mutex:
102n/a return not self._qsize()
103n/a
104n/a def full(self):
105n/a '''Return True if the queue is full, False otherwise (not reliable!).
106n/a
107n/a This method is likely to be removed at some point. Use qsize() >= n
108n/a as a direct substitute, but be aware that either approach risks a race
109n/a condition where a queue can shrink before the result of full() or
110n/a qsize() can be used.
111n/a '''
112n/a with self.mutex:
113n/a return 0 < self.maxsize <= self._qsize()
114n/a
115n/a def put(self, item, block=True, timeout=None):
116n/a '''Put an item into the queue.
117n/a
118n/a If optional args 'block' is true and 'timeout' is None (the default),
119n/a block if necessary until a free slot is available. If 'timeout' is
120n/a a non-negative number, it blocks at most 'timeout' seconds and raises
121n/a the Full exception if no free slot was available within that time.
122n/a Otherwise ('block' is false), put an item on the queue if a free slot
123n/a is immediately available, else raise the Full exception ('timeout'
124n/a is ignored in that case).
125n/a '''
126n/a with self.not_full:
127n/a if self.maxsize > 0:
128n/a if not block:
129n/a if self._qsize() >= self.maxsize:
130n/a raise Full
131n/a elif timeout is None:
132n/a while self._qsize() >= self.maxsize:
133n/a self.not_full.wait()
134n/a elif timeout < 0:
135n/a raise ValueError("'timeout' must be a non-negative number")
136n/a else:
137n/a endtime = time() + timeout
138n/a while self._qsize() >= self.maxsize:
139n/a remaining = endtime - time()
140n/a if remaining <= 0.0:
141n/a raise Full
142n/a self.not_full.wait(remaining)
143n/a self._put(item)
144n/a self.unfinished_tasks += 1
145n/a self.not_empty.notify()
146n/a
147n/a def get(self, block=True, timeout=None):
148n/a '''Remove and return an item from the queue.
149n/a
150n/a If optional args 'block' is true and 'timeout' is None (the default),
151n/a block if necessary until an item is available. If 'timeout' is
152n/a a non-negative number, it blocks at most 'timeout' seconds and raises
153n/a the Empty exception if no item was available within that time.
154n/a Otherwise ('block' is false), return an item if one is immediately
155n/a available, else raise the Empty exception ('timeout' is ignored
156n/a in that case).
157n/a '''
158n/a with self.not_empty:
159n/a if not block:
160n/a if not self._qsize():
161n/a raise Empty
162n/a elif timeout is None:
163n/a while not self._qsize():
164n/a self.not_empty.wait()
165n/a elif timeout < 0:
166n/a raise ValueError("'timeout' must be a non-negative number")
167n/a else:
168n/a endtime = time() + timeout
169n/a while not self._qsize():
170n/a remaining = endtime - time()
171n/a if remaining <= 0.0:
172n/a raise Empty
173n/a self.not_empty.wait(remaining)
174n/a item = self._get()
175n/a self.not_full.notify()
176n/a return item
177n/a
178n/a def put_nowait(self, item):
179n/a '''Put an item into the queue without blocking.
180n/a
181n/a Only enqueue the item if a free slot is immediately available.
182n/a Otherwise raise the Full exception.
183n/a '''
184n/a return self.put(item, block=False)
185n/a
186n/a def get_nowait(self):
187n/a '''Remove and return an item from the queue without blocking.
188n/a
189n/a Only get an item if one is immediately available. Otherwise
190n/a raise the Empty exception.
191n/a '''
192n/a return self.get(block=False)
193n/a
194n/a # Override these methods to implement other queue organizations
195n/a # (e.g. stack or priority queue).
196n/a # These will only be called with appropriate locks held
197n/a
198n/a # Initialize the queue representation
199n/a def _init(self, maxsize):
200n/a self.queue = deque()
201n/a
202n/a def _qsize(self):
203n/a return len(self.queue)
204n/a
205n/a # Put a new item in the queue
206n/a def _put(self, item):
207n/a self.queue.append(item)
208n/a
209n/a # Get an item from the queue
210n/a def _get(self):
211n/a return self.queue.popleft()
212n/a
213n/a
214n/aclass PriorityQueue(Queue):
215n/a '''Variant of Queue that retrieves open entries in priority order (lowest first).
216n/a
217n/a Entries are typically tuples of the form: (priority number, data).
218n/a '''
219n/a
220n/a def _init(self, maxsize):
221n/a self.queue = []
222n/a
223n/a def _qsize(self):
224n/a return len(self.queue)
225n/a
226n/a def _put(self, item):
227n/a heappush(self.queue, item)
228n/a
229n/a def _get(self):
230n/a return heappop(self.queue)
231n/a
232n/a
233n/aclass LifoQueue(Queue):
234n/a '''Variant of Queue that retrieves most recently added entries first.'''
235n/a
236n/a def _init(self, maxsize):
237n/a self.queue = []
238n/a
239n/a def _qsize(self):
240n/a return len(self.queue)
241n/a
242n/a def _put(self, item):
243n/a self.queue.append(item)
244n/a
245n/a def _get(self):
246n/a return self.queue.pop()