» Core Development > Code coverage > Lib/multiprocessing/synchronize.py

Python code coverage for Lib/multiprocessing/synchronize.py

# | count | content
1n/a#
2n/a# Module implementing synchronization primitives
3n/a#
4n/a# multiprocessing/synchronize.py
5n/a#
6n/a# Copyright (c) 2006-2008, R Oudkerk
7n/a# Licensed to PSF under a Contributor Agreement.
8n/a#
9n/a
# Names exported by ``from multiprocessing.synchronize import *``.
__all__ = [
    'Lock',
    'RLock',
    'Semaphore',
    'BoundedSemaphore',
    'Condition',
    'Event',
]
13n/a
import sys
import tempfile
import threading
import time
import _multiprocessing

from time import time as _time

from . import context
from . import process
from . import util
24n/a
# Pull the low-level semaphore primitives out of the C extension.  When
# that import fails, re-raise ImportError with a message explaining that
# the platform has no working sem_open implementation.
# See issue 3770
try:
    from _multiprocessing import SemLock, sem_unlink
except ImportError:
    raise ImportError(
        "This platform lacks a functioning sem_open"
        " implementation, therefore, the required"
        " synchronization primitives needed will not"
        " function, see issue 3770."
    )
35n/a
36n/a#
37n/a# Constants
38n/a#
39n/a
40n/aRECURSIVE_MUTEX, SEMAPHORE = list(range(2))
41n/aSEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
42n/a
#
# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
#

class SemLock(object):
    """Shared base of the semaphore and lock classes in this module.

    The real work is done by a ``_multiprocessing.SemLock`` kept in
    ``self._semlock``.  Its ``acquire`` and ``release`` methods are bound
    directly onto the instance by ``_make_methods()``, and the context
    manager protocol is forwarded to it.
    """

    # Source of the random suffixes used by _make_name().
    _rand = tempfile._RandomNameSequence()

    def __init__(self, kind, value, maxvalue, *, ctx):
        """Create the underlying semaphore.

        ``kind``, ``value`` and ``maxvalue`` are handed to
        ``_multiprocessing.SemLock`` unchanged.  ``ctx`` supplies the start
        method; when it is None the default context is looked up.

        Raises FileExistsError when 100 generated names in a row are
        already in use.
        """
        if ctx is None:
            ctx = context._default_context.get_context()
        start_method = ctx.get_start_method()
        unlink_now = sys.platform == 'win32' or start_method == 'fork'

        # Keep drawing fresh names until one is free (at most 100 tries).
        for _attempt in range(100):
            try:
                semlock = _multiprocessing.SemLock(
                    kind, value, maxvalue, self._make_name(), unlink_now)
            except FileExistsError:
                continue
            self._semlock = semlock
            break
        else:
            raise FileExistsError('cannot find name for semaphore')

        util.debug('created semlock with handle %s' % semlock.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            util.register_after_fork(self, _after_fork)

        sem_name = self._semlock.name
        if sem_name is None:
            return

        # We only get here if we are on Unix with forking
        # disabled.  When the object is garbage collected or the
        # process shuts down we unlink the semaphore name
        from .semaphore_tracker import register
        register(sem_name)
        util.Finalize(self, SemLock._cleanup, (sem_name,), exitpriority=0)

    @staticmethod
    def _cleanup(name):
        """Unlink the named semaphore and unregister it from the tracker."""
        from .semaphore_tracker import unregister
        sem_unlink(name)
        unregister(name)

    def _make_methods(self):
        """Expose the wrapped object's acquire/release on this instance."""
        semlock = self._semlock
        self.acquire = semlock.acquire
        self.release = semlock.release

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return ``(handle, kind, maxvalue, name)`` for a child process.

        Only allowed while spawning (checked by context.assert_spawning).
        On Windows the handle is first duplicated for the child.
        """
        context.assert_spawning(self)
        semlock = self._semlock
        handle = semlock.handle
        if sys.platform == 'win32':
            popen = context.get_spawning_popen()
            handle = popen.duplicate_for_child(handle)
        return (handle, semlock.kind, semlock.maxvalue, semlock.name)

    def __setstate__(self, state):
        """Rebuild the wrapped semaphore from a __getstate__() tuple."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        util.debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

    @staticmethod
    def _make_name():
        """Return a candidate name: '<semprefix>-<random suffix>'."""
        prefix = process.current_process()._config['semprefix']
        suffix = next(SemLock._rand)
        return '%s-%s' % (prefix, suffix)
119n/a
#
# Semaphore
#

class Semaphore(SemLock):
    """Semaphore of kind SEMAPHORE whose maximum value is SEM_VALUE_MAX."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore with initial counter ``value``."""
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)

    def get_value(self):
        """Return the value reported by the underlying semaphore."""
        return self._semlock._get_value()

    def __repr__(self):
        # Any failure to read the value is shown as 'unknown'.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        cls_name = self.__class__.__name__
        return '<%s(value=%s)>' % (cls_name, current)
138n/a
#
# Bounded semaphore
#

class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore; ``value`` is both initial and maximum."""
        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)

    def __repr__(self):
        # Only reading the current value is guarded; maxvalue is read
        # outside the try block.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        cls_name = self.__class__.__name__
        limit = self._semlock.maxvalue
        return '<%s(value=%s, maxvalue=%s)>' % (cls_name, current, limit)
155n/a
#
# Non-recursive lock
#

class Lock(SemLock):
    """Non-recursive lock: a SEMAPHORE-kind SemLock with value 1, max 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

    def __repr__(self):
        # Work out a label for the current owner; any error while probing
        # the underlying semaphore is reported as 'unknown'.
        try:
            semlock = self._semlock
            if semlock._is_mine():
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner = owner + '|' + thread_name
            elif semlock._get_value() == 1:
                owner = 'None'
            elif semlock._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<%s(owner=%s)>' % (self.__class__.__name__, owner)
180n/a
#
# Recursive lock
#

class RLock(SemLock):
    """Recursive lock: a RECURSIVE_MUTEX-kind SemLock with value 1, max 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)

    def __repr__(self):
        # Report the owner label and the recursion count; the exact count
        # is only shown when this thread is the owner.
        try:
            semlock = self._semlock
            if semlock._is_mine():
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner = owner + '|' + thread_name
                depth = semlock._count()
            elif semlock._get_value() == 1:
                owner = 'None'
                depth = 0
            elif semlock._count() > 0:
                owner = 'SomeOtherThread'
                depth = 'nonzero'
            else:
                owner = 'SomeOtherProcess'
                depth = 'nonzero'
        except Exception:
            owner = depth = 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, owner, depth)
206n/a
#
# Condition variable
#

class Condition(object):
    """Condition variable built from a lock and three semaphores.

    ``_sleeping_count`` is released once by every waiter before it blocks,
    ``_woken_count`` is released once by every waiter after it stops
    blocking (notification or timeout), and ``_wait_semaphore`` is the
    semaphore waiters actually block on.  The ``acquire`` and ``release``
    methods of the underlying lock are exposed on the instance.
    """

    def __init__(self, lock=None, *, ctx):
        """Create the condition.

        ``lock`` is the lock to use; when it is falsy a new ``ctx.RLock()``
        is created.  ``ctx`` provides the RLock and Semaphore factories.
        """
        self._lock = lock or ctx.RLock()
        self._sleeping_count = ctx.Semaphore(0)
        self._woken_count = ctx.Semaphore(0)
        self._wait_semaphore = ctx.Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        # Only allowed while spawning (checked by context.assert_spawning).
        context.assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Expose the lock's acquire/release on this instance."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or timed out, reacquire.

        Returns the result of acquiring the wait semaphore: true when
        woken by a notification, false when ``timeout`` expired first.
        The lock must be held by the caller.
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock (once per recursion level)
        count = self._lock._semlock._count()
        for _ in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock to the same recursion level
            for _ in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up at most ``n`` waiters (default 1).

        The lock must be held by the caller.  Blocks until every waiter
        that was woken has signalled that it is awake.
        """
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for _ in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake up every current waiter; the lock must be held."""
        self.notify(n=sys.maxsize)

    def wait_for(self, predicate, timeout=None):
        """Wait until ``predicate()`` is true or ``timeout`` seconds elapse.

        Returns the last value returned by ``predicate``.  The deadline is
        tracked with the monotonic clock so that adjustments of the system
        clock cannot lengthen or shorten the timeout.
        """
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = time.monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
328n/a
#
# Event
#

class Event(object):
    """Event flag built from a condition and a semaphore.

    The flag is ``self._flag``: it can be acquired without blocking
    exactly when the event is set.  Every operation runs while holding
    ``self._cond``.
    """

    def __init__(self, *, ctx):
        self._cond = ctx.Condition(ctx.Lock())
        self._flag = ctx.Semaphore(0)

    def _probe_flag(self):
        # Non-blocking test of the flag: take it and, if that worked, put
        # it straight back.  Callers must hold self._cond.
        if not self._flag.acquire(False):
            return False
        self._flag.release()
        return True

    def is_set(self):
        """Return True if the flag is currently set, else False."""
        with self._cond:
            return self._probe_flag()

    def set(self):
        """Set the flag and wake up everybody waiting on the condition."""
        with self._cond:
            # Drain first so the flag never ends up above one.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Reset the flag."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Wait for the flag; return True if it is set afterwards.

        If the flag is not set on entry, wait on the condition once with
        the given ``timeout`` and then test the flag again.
        """
        with self._cond:
            if not self._probe_flag():
                self._cond.wait(timeout)
            return self._probe_flag()
367n/a
#
# Barrier
#

class Barrier(threading.Barrier):
    """threading.Barrier variant whose state and count live in a buffer.

    ``_state`` and ``_count`` are properties backed by a two-element 'i'
    view over the memory provided by a heap ``BufferWrapper``.
    """

    def __init__(self, parties, action=None, timeout=None, *, ctx):
        """Set up the buffer and condition, then zero state and count."""
        import struct
        from .heap import BufferWrapper
        # Room for two C ints: slot 0 is the state, slot 1 the count.
        nbytes = struct.calcsize('i') * 2
        wrapper = BufferWrapper(nbytes)
        cond = ctx.Condition()
        self.__setstate__((parties, action, timeout, cond, wrapper))
        self._state = self._count = 0

    def __setstate__(self, state):
        parties, action, timeout, cond, wrapper = state
        self._parties = parties
        self._action = action
        self._timeout = timeout
        self._cond = cond
        self._wrapper = wrapper
        # Integer view over the wrapper's memory, used by the properties.
        self._array = wrapper.create_memoryview().cast('i')

    def __getstate__(self):
        return (self._parties, self._action, self._timeout,
                self._cond, self._wrapper)

    @property
    def _state(self):
        # Slot 0 of the integer view.
        return self._array[0]

    @_state.setter
    def _state(self, new_state):
        self._array[0] = new_state

    @property
    def _count(self):
        # Slot 1 of the integer view.
        return self._array[1]

    @_count.setter
    def _count(self, new_count):
        self._array[1] = new_count