»Core Development>Code coverage>Lib/asyncio/locks.py

Python code coverage for Lib/asyncio/locks.py

#countcontent
1n/a"""Synchronization primitives."""
2n/a
3n/a__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
4n/a
5n/aimport collections
6n/a
7n/afrom . import compat
8n/afrom . import events
9n/afrom . import futures
10n/afrom .coroutines import coroutine
11n/a
12n/a
13n/aclass _ContextManager:
14n/a """Context manager.
15n/a
16n/a This enables the following idiom for acquiring and releasing a
17n/a lock around a block:
18n/a
19n/a with (yield from lock):
20n/a <block>
21n/a
22n/a while failing loudly when accidentally using:
23n/a
24n/a with lock:
25n/a <block>
26n/a """
27n/a
28n/a def __init__(self, lock):
29n/a self._lock = lock
30n/a
31n/a def __enter__(self):
32n/a # We have no use for the "as ..." clause in the with
33n/a # statement for locks.
34n/a return None
35n/a
36n/a def __exit__(self, *args):
37n/a try:
38n/a self._lock.release()
39n/a finally:
40n/a self._lock = None # Crudely prevent reuse.
41n/a
42n/a
class _ContextManagerMixin:
    """Mixin giving a primitive its context-manager entry points.

    The host class must provide an acquire() that can be driven with
    'yield from' and a plain release() method.
    """

    def __enter__(self):
        # A bare "with lock:" is always a mistake; fail loudly.
        message = '"yield from" should be used as context manager expression'
        raise RuntimeError(message)

    def __exit__(self, *args):
        # The with-statement looks up __exit__ whenever __enter__ is
        # defined, so it has to exist even though __enter__ always raises.
        return None

    @coroutine
    def __iter__(self):
        # Not meant to be called as a coroutine by users.  It exists so
        # that
        #
        #     with (yield from lock):
        #         <block>
        #
        # can be written in place of
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        manager = _ContextManager(self)
        return manager

    if compat.PY35:

        def __await__(self):
            # Makes "with await lock" work.
            yield from self.acquire()
            manager = _ContextManager(self)
            return manager

        @coroutine
        def __aenter__(self):
            # The "as ..." clause of "async with" has nothing useful to
            # bind, so the implicit None result is what callers get.
            yield from self.acquire()

        @coroutine
        def __aexit__(self, exc_type, exc, tb):
            # Release unconditionally; the exception details are ignored.
            self.release()
87n/a
88n/a
class Lock(_ContextManagerMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that has
    been blocked in acquire() the longest is the one that proceeds.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context management protocol.  '(yield from lock)'
    should be used as the context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
           yield from lock
        else:
           # lock is acquired
           ...

    """

    def __init__(self, *, loop=None):
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    @coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the waiting coroutine is cancelled, CancelledError propagates
        to the caller; when the lock is unlocked at that moment, the
        wake-up is handed on to the next waiter so it is not lost.
        """
        # Fast path: the lock is free and nobody with a live claim is
        # queued ahead of us (cancelled waiters do not count).
        if not self._locked and all(w.cancelled() for w in self._waiters):
            self._locked = True
            return True

        fut = self._loop.create_future()
        self._waiters.append(fut)
        try:
            try:
                yield from fut
            finally:
                # Leave the queue before any hand-off below, so that
                # _wake_up_first() looks at the waiter behind us.
                self._waiters.remove(fut)
        except futures.CancelledError:
            # We may have been woken by release() and then cancelled
            # before running.  In that case the lock is still unlocked
            # and the wake-up must be passed on, otherwise the remaining
            # waiters would block forever.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the first waiter if it isn't done."""
        if not self._waiters:
            return
        fut = self._waiters[0]
        # A done head waiter has either been woken already (and will take
        # the lock when it runs) or been cancelled (and will call this
        # method again from acquire() if the lock is still free).  Waking
        # anyone further back here could let two coroutines take the lock
        # for a single release.
        if not fut.done():
            fut.set_result(True)
202n/a
203n/a
class Event:
    """Asynchronous equivalent to threading.Event.

    An event holds a boolean flag.  set() turns the flag on, clear()
    turns it off again, and wait() blocks until the flag is on.  A new
    event starts with the flag off.
    """

    def __init__(self, *, loop=None):
        # Futures of coroutines currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        self._loop = loop if loop is not None else events.get_event_loop()

    def __repr__(self):
        state = 'set' if self._value else 'unset'
        pending = len(self._waiters)
        if pending:
            state = '{},waiters:{}'.format(state, pending)
        return '<{} [{}]>'.format(super().__repr__()[1:-1], state)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true and wake every waiting coroutine.

        Coroutines that call wait() while the flag is true do not block
        at all.  Calling set() when the flag is already true does nothing.
        """
        if self._value:
            return
        self._value = True

        for waiter in self._waiters:
            # Futures that are already done (e.g. cancelled) are skipped.
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Reset the internal flag to false.

        Coroutines calling wait() afterwards block until set() is called
        again.
        """
        self._value = False

    @coroutine
    def wait(self):
        """Block until the internal flag is true.

        Return True immediately if the flag is already true; otherwise
        block until another coroutine calls set(), then return True.
        """
        if self._value:
            return True

        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        try:
            yield from waiter
        finally:
            # Always leave the queue, also when cancelled.
            self._waiters.remove(waiter)
        return True
268n/a
269n/a
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects.  A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If the lock argument is omitted (or None), a new Lock object is created
    and used as the underlying lock.  A lock passed in explicitly must use
    the same event loop as the condition, otherwise ValueError is raised.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of coroutines blocked in wait(), oldest first.
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    @coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        The lock is re-acquired even when the wait is cancelled.  A
        cancellation that arrives while the lock is being re-acquired
        is not dropped: CancelledError is raised once the lock is held.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                yield from fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire lock even if wait is cancelled.  Remember
            # any cancellation seen while doing so, so that it can be
            # delivered to the caller instead of being swallowed.
            cancelled = False
            while True:
                try:
                    yield from self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    @coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            yield from self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock.  Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Waiters whose future is already done (e.g. cancelled) do
            # not count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition.  This method
        acts like notify(), but wakes up all waiting coroutines instead of
        one.  If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
385n/a
386n/a
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore keeps an internal counter.  Each acquire() call takes
    one unit from it and each release() call gives one back.  acquire()
    blocks while the counter is zero, waiting for some other coroutine
    to call release().

    Semaphores also support the context management protocol.

    The optional argument is the counter's initial value and defaults
    to 1.  A value below 0 raises ValueError.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        self._loop = loop if loop is not None else events.get_event_loop()

    def __repr__(self):
        base = super().__repr__()
        if self.locked():
            extra = 'locked'
        else:
            extra = 'unlocked,value:{}'.format(self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(base[1:-1], extra)

    def _wake_up_next(self):
        # Pop waiters from the front until one that is not done yet is
        # found, and wake exactly that one.
        queue = self._waiters
        while queue:
            candidate = queue.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    @coroutine
    def acquire(self):
        """Acquire a semaphore.

        When the internal counter is above zero, decrement it and
        return True right away.  When it is zero, block until another
        coroutine calls release(), then decrement it and return True.
        """
        while self._value <= 0:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            try:
                yield from waiter
            except BaseException:
                # See the similar code in Queue.get.
                waiter.cancel()
                # cancel() has no effect on a future that already holds
                # a result, i.e. one that was woken; hand that wake-up
                # on to the next waiter if a unit is available.
                if self._value > 0 and not waiter.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        If a coroutine is waiting for the counter to rise above zero,
        wake it up.
        """
        self._value += 1
        self._wake_up_next()
462n/a
463n/a
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    release() raises ValueError instead of letting the counter grow
    beyond the value the semaphore was created with.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the ceiling before the base class validates and
        # stores the initial counter value.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release the semaphore unless the counter is already at its bound."""
        at_bound = self._value >= self._bound_value
        if at_bound:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()