ยปCore Development>Code coverage>Lib/asyncio/queues.py

Python code coverage for Lib/asyncio/queues.py

#countcontent
1n/a"""Queues"""
2n/a
3n/a__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
4n/a
5n/aimport collections
6n/aimport heapq
7n/a
8n/afrom . import compat
9n/afrom . import events
10n/afrom . import locks
11n/afrom .coroutines import coroutine
12n/a
13n/a
14n/aclass QueueEmpty(Exception):
15n/a """Exception raised when Queue.get_nowait() is called on a Queue object
16n/a which is empty.
17n/a """
18n/a pass
19n/a
20n/a
21n/aclass QueueFull(Exception):
22n/a """Exception raised when the Queue.put_nowait() method is called on a Queue
23n/a object which is full.
24n/a """
25n/a pass
26n/a
27n/a
28n/aclass Queue:
29n/a """A queue, useful for coordinating producer and consumer coroutines.
30n/a
31n/a If maxsize is less than or equal to zero, the queue size is infinite. If it
32n/a is an integer greater than 0, then "yield from put()" will block when the
33n/a queue reaches maxsize, until an item is removed by get().
34n/a
35n/a Unlike the standard library Queue, you can reliably know this Queue's size
36n/a with qsize(), since your single-threaded asyncio application won't be
37n/a interrupted between calling qsize() and doing an operation on the Queue.
38n/a """
39n/a
40n/a def __init__(self, maxsize=0, *, loop=None):
41n/a if loop is None:
42n/a self._loop = events.get_event_loop()
43n/a else:
44n/a self._loop = loop
45n/a self._maxsize = maxsize
46n/a
47n/a # Futures.
48n/a self._getters = collections.deque()
49n/a # Futures.
50n/a self._putters = collections.deque()
51n/a self._unfinished_tasks = 0
52n/a self._finished = locks.Event(loop=self._loop)
53n/a self._finished.set()
54n/a self._init(maxsize)
55n/a
56n/a # These three are overridable in subclasses.
57n/a
58n/a def _init(self, maxsize):
59n/a self._queue = collections.deque()
60n/a
61n/a def _get(self):
62n/a return self._queue.popleft()
63n/a
64n/a def _put(self, item):
65n/a self._queue.append(item)
66n/a
67n/a # End of the overridable methods.
68n/a
69n/a def _wakeup_next(self, waiters):
70n/a # Wake up the next waiter (if any) that isn't cancelled.
71n/a while waiters:
72n/a waiter = waiters.popleft()
73n/a if not waiter.done():
74n/a waiter.set_result(None)
75n/a break
76n/a
77n/a def __repr__(self):
78n/a return '<{} at {:#x} {}>'.format(
79n/a type(self).__name__, id(self), self._format())
80n/a
81n/a def __str__(self):
82n/a return '<{} {}>'.format(type(self).__name__, self._format())
83n/a
84n/a def _format(self):
85n/a result = 'maxsize={!r}'.format(self._maxsize)
86n/a if getattr(self, '_queue', None):
87n/a result += ' _queue={!r}'.format(list(self._queue))
88n/a if self._getters:
89n/a result += ' _getters[{}]'.format(len(self._getters))
90n/a if self._putters:
91n/a result += ' _putters[{}]'.format(len(self._putters))
92n/a if self._unfinished_tasks:
93n/a result += ' tasks={}'.format(self._unfinished_tasks)
94n/a return result
95n/a
96n/a def qsize(self):
97n/a """Number of items in the queue."""
98n/a return len(self._queue)
99n/a
100n/a @property
101n/a def maxsize(self):
102n/a """Number of items allowed in the queue."""
103n/a return self._maxsize
104n/a
105n/a def empty(self):
106n/a """Return True if the queue is empty, False otherwise."""
107n/a return not self._queue
108n/a
109n/a def full(self):
110n/a """Return True if there are maxsize items in the queue.
111n/a
112n/a Note: if the Queue was initialized with maxsize=0 (the default),
113n/a then full() is never True.
114n/a """
115n/a if self._maxsize <= 0:
116n/a return False
117n/a else:
118n/a return self.qsize() >= self._maxsize
119n/a
120n/a @coroutine
121n/a def put(self, item):
122n/a """Put an item into the queue.
123n/a
124n/a Put an item into the queue. If the queue is full, wait until a free
125n/a slot is available before adding item.
126n/a
127n/a This method is a coroutine.
128n/a """
129n/a while self.full():
130n/a putter = self._loop.create_future()
131n/a self._putters.append(putter)
132n/a try:
133n/a yield from putter
134n/a except:
135n/a putter.cancel() # Just in case putter is not done yet.
136n/a if not self.full() and not putter.cancelled():
137n/a # We were woken up by get_nowait(), but can't take
138n/a # the call. Wake up the next in line.
139n/a self._wakeup_next(self._putters)
140n/a raise
141n/a return self.put_nowait(item)
142n/a
143n/a def put_nowait(self, item):
144n/a """Put an item into the queue without blocking.
145n/a
146n/a If no free slot is immediately available, raise QueueFull.
147n/a """
148n/a if self.full():
149n/a raise QueueFull
150n/a self._put(item)
151n/a self._unfinished_tasks += 1
152n/a self._finished.clear()
153n/a self._wakeup_next(self._getters)
154n/a
155n/a @coroutine
156n/a def get(self):
157n/a """Remove and return an item from the queue.
158n/a
159n/a If queue is empty, wait until an item is available.
160n/a
161n/a This method is a coroutine.
162n/a """
163n/a while self.empty():
164n/a getter = self._loop.create_future()
165n/a self._getters.append(getter)
166n/a try:
167n/a yield from getter
168n/a except:
169n/a getter.cancel() # Just in case getter is not done yet.
170n/a if not self.empty() and not getter.cancelled():
171n/a # We were woken up by put_nowait(), but can't take
172n/a # the call. Wake up the next in line.
173n/a self._wakeup_next(self._getters)
174n/a raise
175n/a return self.get_nowait()
176n/a
177n/a def get_nowait(self):
178n/a """Remove and return an item from the queue.
179n/a
180n/a Return an item if one is immediately available, else raise QueueEmpty.
181n/a """
182n/a if self.empty():
183n/a raise QueueEmpty
184n/a item = self._get()
185n/a self._wakeup_next(self._putters)
186n/a return item
187n/a
188n/a def task_done(self):
189n/a """Indicate that a formerly enqueued task is complete.
190n/a
191n/a Used by queue consumers. For each get() used to fetch a task,
192n/a a subsequent call to task_done() tells the queue that the processing
193n/a on the task is complete.
194n/a
195n/a If a join() is currently blocking, it will resume when all items have
196n/a been processed (meaning that a task_done() call was received for every
197n/a item that had been put() into the queue).
198n/a
199n/a Raises ValueError if called more times than there were items placed in
200n/a the queue.
201n/a """
202n/a if self._unfinished_tasks <= 0:
203n/a raise ValueError('task_done() called too many times')
204n/a self._unfinished_tasks -= 1
205n/a if self._unfinished_tasks == 0:
206n/a self._finished.set()
207n/a
208n/a @coroutine
209n/a def join(self):
210n/a """Block until all items in the queue have been gotten and processed.
211n/a
212n/a The count of unfinished tasks goes up whenever an item is added to the
213n/a queue. The count goes down whenever a consumer calls task_done() to
214n/a indicate that the item was retrieved and all work on it is complete.
215n/a When the count of unfinished tasks drops to zero, join() unblocks.
216n/a """
217n/a if self._unfinished_tasks > 0:
218n/a yield from self._finished.wait()
219n/a
220n/a
221n/aclass PriorityQueue(Queue):
222n/a """A subclass of Queue; retrieves entries in priority order (lowest first).
223n/a
224n/a Entries are typically tuples of the form: (priority number, data).
225n/a """
226n/a
227n/a def _init(self, maxsize):
228n/a self._queue = []
229n/a
230n/a def _put(self, item, heappush=heapq.heappush):
231n/a heappush(self._queue, item)
232n/a
233n/a def _get(self, heappop=heapq.heappop):
234n/a return heappop(self._queue)
235n/a
236n/a
237n/aclass LifoQueue(Queue):
238n/a """A subclass of Queue that retrieves most recently added entries first."""
239n/a
240n/a def _init(self, maxsize):
241n/a self._queue = []
242n/a
243n/a def _put(self, item):
244n/a self._queue.append(item)
245n/a
246n/a def _get(self):
247n/a return self._queue.pop()
248n/a
249n/a
250n/aif not compat.PY35:
251n/a JoinableQueue = Queue
252n/a """Deprecated alias for Queue."""
253n/a __all__.append('JoinableQueue')