»Core Development>Code coverage>Lib/multiprocessing/queues.py

Python code coverage for Lib/multiprocessing/queues.py

#countcontent
1n/a#
2n/a# Module implementing queues
3n/a#
4n/a# multiprocessing/queues.py
5n/a#
6n/a# Copyright (c) 2006-2008, R Oudkerk
7n/a# Licensed to PSF under a Contributor Agreement.
8n/a#
9n/a
10n/a__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
11n/a
12n/aimport sys
13n/aimport os
14n/aimport threading
15n/aimport collections
16n/aimport time
17n/aimport weakref
18n/aimport errno
19n/a
20n/afrom queue import Empty, Full
21n/a
22n/aimport _multiprocessing
23n/a
24n/afrom . import connection
25n/afrom . import context
26n/a_ForkingPickler = context.reduction.ForkingPickler
27n/a
28n/afrom .util import debug, info, Finalize, register_after_fork, is_exiting
29n/a
30n/a#
31n/a# Queue type using a pipe, buffer and thread
32n/a#
33n/a
class Queue(object):
    """Queue type using a pipe, a buffer and a feeder thread.

    put() appends objects to an in-process deque; a feeder thread pickles
    them and writes them to the pipe.  get() reads one message from the
    pipe while holding the read lock and unpickles it afterwards.  A
    bounded semaphore is acquired by put() and released by get().
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, locks and semaphore.

        maxsize -- capacity of the bounded semaphore; values <= 0 are
                   replaced by SEM_VALUE_MAX.
        ctx     -- object supplying Lock() and BoundedSemaphore().
        """
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the shareable state as a tuple (only while spawning)."""
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state, then rebuild the per-process state."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """Reset everything that is local to the current process.

        The buffer, condition, feeder thread and finalizer handles are
        recreated empty; the bound pipe methods are cached for speed.
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Append obj to the buffer for the feeder thread to send.

        Raises ValueError if the queue has been closed in this process and
        Full if the semaphore cannot be acquired (per block/timeout).
        """
        # Explicit check rather than ``assert`` so that it is not stripped
        # when Python runs with -O.
        if self._closed:
            raise ValueError("Queue {0!r} is closed".format(self))
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Read one message from the pipe and return the unpickled object.

        With block true and timeout None, wait indefinitely.  Otherwise
        raise Empty if the read lock cannot be acquired or no data becomes
        available in time (immediately when block is false).
        """
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                # Monotonic clock: the deadline must not move when the
                # system clock is adjusted.
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after waiting for the lock may be
                    # spent polling the pipe.
                    timeout = deadline - time.monotonic()
                    if timeout < 0 or not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        """Return maxsize minus the current semaphore value."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if polling the read end of the pipe finds no data."""
        return not self._poll()

    def full(self):
        """Return True if the semaphore's value is zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to put(obj, False)."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and run the close finalizer."""
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                # Clear the handle first so the finalizer is only run once
                # from here.
                self._close = None
                close()

    def join_thread(self):
        """Run the join finalizer, if one was registered; requires close()."""
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being registered."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # No join finalizer has been registered (it is still None).
            pass

    def _start_thread(self):
        """Start the feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        #
        # However, if this process created the queue then all
        # processes which use the queue will be descendants of this
        # process.  Therefore waiting for the queue to be flushed
        # is pointless once all the child processes have been joined.
        created_by_this_process = (self._opid == os.getpid())
        if not self._joincancelled and not created_by_this_process:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
            )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
        )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by the weakref twr, if alive."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to buffer and wake the feeder thread."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
        """Feeder thread body: move objects from buffer to the pipe.

        Waits on notempty while buffer is empty, then drains it, pickling
        each object and sending it (under writelock when one is used).  On
        the sentinel, calls close() and returns.

        An error while handling one object is reported and the loop then
        continues, so one bad object (e.g. an unpicklable one) does not
        stop later objects from being sent.  The thread exits on EPIPE
        when ignore_epipe is true, or once the process is exiting.
        """
        debug('starting thread to feed data to pipe')
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while True:
            try:
                with notempty:
                    if not buffer:
                        notempty.wait()
                try:
                    while True:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained; go back to waiting on the condition.
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may be become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                try:
                    if is_exiting():
                        info('error in queue thread: %s', e)
                        return
                    import traceback
                    traceback.print_exc()
                except Exception:
                    # Even reporting failed; stop rather than spin on errors.
                    return


# Unique marker placed in the buffer to tell the feeder thread to exit.
_sentinel = object()
269n/a
270n/a#
271n/a# A queue type which also supports join() and task_done() methods
272n/a#
273n/a# Note that if you do not call task_done() for each finished task then
274n/a# eventually the counter's semaphore may overflow causing Bad Things
275n/a# to happen.
276n/a#
277n/a
class JoinableQueue(Queue):
    """A Queue that also supports task_done() and join().

    A semaphore counts the tasks that have been put but not yet reported
    done; a condition is used to wake join() when that count reaches zero.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Initialise the base queue plus the task counter and condition."""
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        """Return the base state extended with the condition and counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the base state from all but the last two items."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Buffer obj for sending and count it as an unfinished task.

        Raises ValueError if the queue has been closed in this process and
        Full if the semaphore cannot be acquired (per block/timeout).
        """
        # Explicit check rather than ``assert`` so that it is not stripped
        # when Python runs with -O.
        if self._closed:
            raise ValueError("Queue {0!r} is closed".format(self))
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        """Mark one task as finished; wake joiners when none remain.

        Raises ValueError if called more times than put().
        """
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait on the condition unless the unfinished-task count is zero."""
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
315n/a
316n/a#
317n/a# Simplified Queue type -- really just a locked pipe
318n/a#
319n/a
class SimpleQueue(object):
    """Simplified queue type -- really just a locked pipe.

    put() pickles the object and writes it to the pipe; get() reads one
    message while holding the read lock and unpickles it.
    """

    def __init__(self, *, ctx):
        """Create the pipe and its locks; ctx supplies Lock()."""
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def empty(self):
        """Return True if polling the read end of the pipe finds no data."""
        return not self._poll()

    def __getstate__(self):
        """Return the shareable state as a tuple (only while spawning)."""
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pipe ends and locks from state."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        # _poll is not part of the pickled state; rebind it here so that
        # empty() works on an unpickled instance.
        self._poll = self._reader.poll

    def get(self):
        """Read one message from the pipe and return the unpickled object."""
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        """Pickle obj and write it to the pipe."""
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)