
Python code coverage for Demo/threads/sync.py

# Defines classes that provide synchronization objects.  Note that use of
# this module requires that your Python support threads.
#
#    condition(lock=None)       # a POSIX-like condition-variable object
#    barrier(n)                 # an n-thread barrier
#    event()                    # an event object
#    semaphore(n=1)             # a semaphore object, with initial count n
#    mrsw()                     # a multiple-reader single-writer lock
#
# CONDITIONS
#
# A condition object is created via
#   import this_module
#   your_condition_object = this_module.condition(lock=None)
#
# As explained below, a condition object has a lock associated with it,
# used in the protocol to protect condition data.  You can specify a
# lock to use in the constructor, else the constructor will allocate
# an anonymous lock for you.  Specifying a lock explicitly can be useful
# when more than one condition keys off the same set of shared data.
#
# Methods:
#   .acquire()
#      acquire the lock associated with the condition
#   .release()
#      release the lock associated with the condition
#   .wait()
#      block the thread until such time as some other thread does a
#      .signal or .broadcast on the same condition, and release the
#      lock associated with the condition.  The lock associated with
#      the condition MUST be in the acquired state at the time
#      .wait is invoked.
#   .signal()
#      wake up exactly one thread (if any) that previously did a .wait
#      on the condition; that thread will awaken with the lock associated
#      with the condition in the acquired state.  If no threads are
#      .wait'ing, this is a nop.  If more than one thread is .wait'ing on
#      the condition, any of them may be awakened.
#   .broadcast()
#      wake up all threads (if any) that are .wait'ing on the condition;
#      the threads are woken up serially, each with the lock in the
#      acquired state, so should .release() as soon as possible.  If no
#      threads are .wait'ing, this is a nop.
#
# Note that if a thread does a .wait *while* a signal/broadcast is
# in progress, it's guaranteed to block until a subsequent
# signal/broadcast.
#
# Secret feature:  `broadcast' actually takes an integer argument,
# and will wake up exactly that many waiting threads (or the total
# number waiting, if that's less).  Use of this is dubious, though,
# and probably won't be supported if this form of condition is
# reimplemented in C.
#
# DIFFERENCES FROM POSIX
#
# + A separate mutex is not needed to guard condition data.  Instead, a
#   condition object can (must) be .acquire'ed and .release'ed directly.
#   This eliminates a common error in using POSIX conditions.
#
# + Because of implementation difficulties, a POSIX `signal' wakes up
#   _at least_ one .wait'ing thread.  Race conditions make it difficult
#   to stop that.  This implementation guarantees to wake up only one,
#   but you probably shouldn't rely on that.
#
# PROTOCOL
#
# Condition objects are used to block threads until "some condition" is
# true.  E.g., a thread may wish to wait until a producer pumps out data
# for it to consume, or a server may wish to wait until someone requests
# its services, or perhaps a whole bunch of threads want to wait until a
# preceding pass over the data is complete.  Early models for conditions
# relied on some other thread figuring out when a blocked thread's
# condition was true, and made the other thread responsible both for
# waking up the blocked thread and guaranteeing that it woke up with all
# data in a correct state.  This proved to be very delicate in practice,
# and gave conditions a bad name in some circles.
#
# The POSIX model addresses these problems by making a thread responsible
# for ensuring that its own state is correct when it wakes, and relies
# on a rigid protocol to make this easy; so long as you stick to the
# protocol, POSIX conditions are easy to "get right":
#
# A) The thread that's waiting for some arbitrarily-complex condition
#    (ACC) to become true does:
#
#       condition.acquire()
#       while not (code to evaluate the ACC):
#           condition.wait()
#           # That blocks the thread, *and* releases the lock.  When a
#           # condition.signal() happens, it will wake up some thread that
#           # did a .wait, *and* acquire the lock again before .wait
#           # returns.
#           #
#           # Because the lock is acquired at this point, the state used
#           # in evaluating the ACC is frozen, so it's safe to go back &
#           # reevaluate the ACC.
#
#       # At this point, ACC is true, and the thread has the condition
#       # locked.
#       # So code here can safely muck with the shared state that
#       # went into evaluating the ACC -- if it wants to.
#       # When done mucking with the shared state, do
#       condition.release()
#
# B) Threads that are mucking with shared state that may affect the
#    ACC do:
#
#       condition.acquire()
#       # muck with shared state
#       condition.release()
#       if it's possible that ACC is true now:
#           condition.signal()  # or .broadcast()
#
#    Note:  You may prefer to put the "if" clause before the release().
#    That's fine, but do note that anyone waiting on the signal will
#    stay blocked until the release() is done (since acquiring the
#    condition is part of what .wait() does before it returns).
#
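The A/B protocol above can be exercised with the standard library's `threading.Condition`, which bundles a lock and a condition the same way this module does. A minimal producer/consumer sketch (not part of this module):

```python
import threading

condition = threading.Condition()   # lock + condition in one object
data = []                           # shared state guarded by the condition

def consumer(results):
    with condition:                 # condition.acquire()
        while not data:             # re-test the ACC after every wakeup
            condition.wait()        # releases the lock while blocked
        results.append(data.pop())  # lock held: safe to touch shared state
                                    # leaving 'with' does condition.release()

def producer():
    with condition:
        data.append(42)             # muck with the shared state
        condition.notify()          # .signal() in this module's terms

results = []
t = threading.Thread(target=consumer, args=(results,))
t.start()
producer()
t.join()
print(results)  # [42]
```

Whether the consumer parks in `wait()` before or after the producer runs, the `while` loop guarantees it only proceeds once the ACC (`data` non-empty) is true.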
# TRICK OF THE TRADE
#
# With simpler forms of conditions, it can be impossible to know when
# a thread that's supposed to do a .wait has actually done it.  But
# because this form of condition releases a lock as _part_ of doing a
# wait, the state of that lock can be used to guarantee it.
#
# E.g., suppose thread A spawns thread B and later wants to wait for B to
# complete:
#
#    In A:                               In B:
#
#    B_done = condition()                ... do work ...
#    B_done.acquire()                    B_done.acquire(); B_done.release()
#    spawn B                             B_done.signal()
#    ... some time later ...             ... and B exits ...
#    B_done.wait()
#
# Because B_done was in the acquire'd state at the time B was spawned,
# B's attempt to acquire B_done can't succeed until A has done its
# B_done.wait() (which releases B_done).  So B's B_done.signal() is
# guaranteed to be seen by the .wait().  Without the lock trick, B
# may signal before A .waits, and then A would wait forever.
#
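As a sketch, the same trick works with `threading.Condition`, whose `wait()` also releases the underlying lock: because A holds the lock before spawning B, B's `notify()` cannot happen until A is parked inside `wait()`:

```python
import threading

B_done = threading.Condition()
log = []

def thread_B():
    log.append('B worked')      # ... do work ...
    with B_done:                # can't acquire until A is inside .wait()
        B_done.notify()         # guaranteed to be seen by A's wait()

with B_done:                    # A acquires *before* spawning B
    b = threading.Thread(target=thread_B)
    b.start()
    B_done.wait()               # releases B_done, letting B's notify through
b.join()
print(log)  # ['B worked']
```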
# BARRIERS
#
# A barrier object is created via
#   import this_module
#   your_barrier = this_module.barrier(num_threads)
#
# Methods:
#   .enter()
#      the thread blocks until num_threads threads in all have done
#      .enter().  Then the num_threads threads that .enter'ed resume,
#      and the barrier resets to capture the next num_threads threads
#      that .enter it.
#
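The standard library's `threading.Barrier` provides the same behavior (`.enter()` is there called `wait()`), including the automatic reset. A sketch of the blocking property:

```python
import threading

bar = threading.Barrier(3)          # stdlib analogue of barrier(3)
order = []                          # list appends are serialized by the GIL

def worker(i):
    order.append(('arrived', i))
    bar.wait()                      # like .enter(): block until 3 arrive
    order.append(('resumed', i))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# every 'arrived' entry precedes every 'resumed' entry, in any run
```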
# EVENTS
#
# An event object is created via
#   import this_module
#   your_event = this_module.event()
#
# An event has two states, `posted' and `cleared'.  An event is
# created in the cleared state.
#
# Methods:
#
#   .post()
#      Put the event in the posted state, and resume all threads
#      .wait'ing on the event (if any).
#
#   .clear()
#      Put the event in the cleared state.
#
#   .is_posted()
#      Returns 0 if the event is in the cleared state, or 1 if the event
#      is in the posted state.
#
#   .wait()
#      If the event is in the posted state, returns immediately.
#      If the event is in the cleared state, blocks the calling thread
#      until the event is .post'ed by another thread.
#
# Note that an event, once posted, remains posted until explicitly
# cleared.  Relative to conditions, this is both the strength & weakness
# of events.  It's a strength because the .post'ing thread doesn't have to
# worry about whether the threads it's trying to communicate with have
# already done a .wait (a condition .signal is seen only by threads that
# do a .wait _prior_ to the .signal; a .signal does not persist).  But
# it's a weakness because .clear'ing an event is error-prone:  it's easy
# to mistakenly .clear an event before all the threads you intended to
# see the event get around to .wait'ing on it.  But so long as you don't
# need to .clear an event, events are easy to use safely.
#
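The standard library's `threading.Event` behaves the same way (`.post`/`.clear`/`.is_posted` are there called `set`/`clear`/`is_set`). A sketch of the "stays posted" property:

```python
import threading

evt = threading.Event()         # starts 'cleared', like event() here
seen = []

def waiter():
    evt.wait()                  # blocks until the event is posted
    seen.append(evt.is_set())

t = threading.Thread(target=waiter)
t.start()
evt.set()                       # .post(): wakes the waiter ...
t.join()
print(seen)          # [True]
print(evt.is_set())  # True: ... and stays posted until .clear()ed
```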
# SEMAPHORES
#
# A semaphore object is created via
#   import this_module
#   your_semaphore = this_module.semaphore(count=1)
#
# A semaphore has an integer count associated with it.  The initial value
# of the count is specified by the optional argument (which defaults to
# 1) passed to the semaphore constructor.
#
# Methods:
#
#   .p()
#      If the semaphore's count is greater than 0, decrements the count
#      by 1 and returns.
#      Else if the semaphore's count is 0, blocks the calling thread
#      until a subsequent .v() increases the count.  When that happens,
#      the count will be decremented by 1 and the calling thread resumed.
#
#   .v()
#      Increments the semaphore's count by 1, and wakes up a thread (if
#      any) blocked by a .p().  It's a (detected) error for a .v() to
#      increase the semaphore's count to a value larger than the initial
#      count.
#
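The standard library's `threading.BoundedSemaphore` mirrors this interface (`.p()`/`.v()` are there called `acquire()`/`release()`), including the detected error on a `.v()` past the initial count; a sketch:

```python
import threading

sem = threading.BoundedSemaphore(1)   # like semaphore(1)
sem.acquire()                         # .p(): count 1 -> 0
sem.release()                         # .v(): count 0 -> 1
try:
    sem.release()                     # would push count above the initial 1
except ValueError:
    print('over-release detected')    # prints "over-release detected"
```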
# MULTIPLE-READER SINGLE-WRITER LOCKS
#
# A mrsw lock is created via
#   import this_module
#   your_mrsw_lock = this_module.mrsw()
#
# This kind of lock is often useful with complex shared data structures.
# The object lets any number of "readers" proceed, so long as no thread
# wishes to "write".  When one or more threads declare an intention
# to "write" (e.g., to update a shared structure), all current readers
# are allowed to finish, and then a writer gets exclusive access; all
# other readers & writers are blocked until the current writer completes.
# Finally, if some thread is waiting to write and another is waiting to
# read, the writer takes precedence.
#
# Methods:
#
#   .read_in()
#      If no thread is writing or waiting to write, returns immediately.
#      Else blocks until no thread is writing or waiting to write.  So
#      long as some thread has completed a .read_in but not a .read_out,
#      writers are blocked.
#
#   .read_out()
#      Use sometime after a .read_in to declare that the thread is done
#      reading.  When all threads complete reading, a writer can proceed.
#
#   .write_in()
#      If no thread is writing (has completed a .write_in, but hasn't yet
#      done a .write_out) or reading (similarly), returns immediately.
#      Else blocks the calling thread, and threads waiting to read, until
#      the current writer completes writing or all the current readers
#      complete reading; if then more than one thread is waiting to
#      write, one of them is allowed to proceed, but which one is not
#      specified.
#
#   .write_out()
#      Use sometime after a .write_in to declare that the thread is done
#      writing.  Then if some other thread is waiting to write, it's
#      allowed to proceed.  Else all threads (if any) waiting to read are
#      allowed to proceed.
#
#   .write_to_read()
#      Use instead of a .write_out to declare that the thread is done
#      writing but wants to continue reading without other writers
#      intervening.  If there are other threads waiting to write, they
#      are allowed to proceed only if the current thread calls
#      .read_out; threads waiting to read are only allowed to proceed
#      if there are no threads waiting to write.  (This is a
#      weakness of the interface!)
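The standard library has no direct mrsw equivalent, but the read_in/read_out/write_in/write_out protocol can be sketched with a single `threading.Condition`. This is a simplification (the module below uses two conditions sharing one mutex), and the class name `MRSW` is made up for the sketch:

```python
import threading

class MRSW:
    """Writer-priority reader/writer lock, sketched with one Condition."""
    def __init__(self):
        self.cond = threading.Condition()
        self.nr = 0            # readers actively reading
        self.nw = 0            # writers waiting to write or writing
        self.writing = False   # True iff some thread is writing

    def read_in(self):
        with self.cond:
            while self.nw:             # writers take precedence
                self.cond.wait()
            self.nr += 1

    def read_out(self):
        with self.cond:
            self.nr -= 1
            if self.nr == 0:
                self.cond.notify_all()  # a waiting writer may proceed

    def write_in(self):
        with self.cond:
            self.nw += 1                # blocks later readers immediately
            while self.writing or self.nr:
                self.cond.wait()
            self.writing = True

    def write_out(self):
        with self.cond:
            self.writing = False
            self.nw -= 1
            self.cond.notify_all()      # wake writers and readers; the
                                        # while-loops re-check who may go

rw = MRSW()
rw.read_in(); rw.read_in()   # multiple readers proceed together
rw.read_out(); rw.read_out()
rw.write_in()                # exclusive once the readers are out
rw.write_out()
```

Using `notify_all()` plus re-checked `while` loops trades a few spurious wakeups for simplicity; the two-condition version below wakes only the threads that can actually proceed.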

import _thread as thread

class condition:
    def __init__(self, lock=None):
        # the lock actually used by .acquire() and .release()
        if lock is None:
            self.mutex = thread.allocate_lock()
        else:
            if hasattr(lock, 'acquire') and \
               hasattr(lock, 'release'):
                self.mutex = lock
            else:
                raise TypeError('condition constructor requires ' \
                                'a lock argument')

        # lock used to block threads until a signal
        self.checkout = thread.allocate_lock()
        self.checkout.acquire()

        # internal critical-section lock, & the data it protects
        self.idlock = thread.allocate_lock()
        self.id = 0
        self.waiting = 0    # num waiters subject to current release
        self.pending = 0    # num waiters awaiting next signal
        self.torelease = 0  # num waiters to release
        self.releasing = 0  # 1 iff release is in progress

    def acquire(self):
        self.mutex.acquire()

    def release(self):
        self.mutex.release()

    def wait(self):
        mutex, checkout, idlock = self.mutex, self.checkout, self.idlock
        if not mutex.locked():
            raise ValueError("condition must be .acquire'd when .wait() invoked")

        idlock.acquire()
        myid = self.id
        self.pending = self.pending + 1
        idlock.release()

        mutex.release()

        while 1:
            checkout.acquire(); idlock.acquire()
            if myid < self.id:
                break
            checkout.release(); idlock.release()

        self.waiting = self.waiting - 1
        self.torelease = self.torelease - 1
        if self.torelease:
            checkout.release()
        else:
            self.releasing = 0
            if self.waiting == self.pending == 0:
                self.id = 0
        idlock.release()
        mutex.acquire()

    def signal(self):
        self.broadcast(1)

    def broadcast(self, num = -1):
        if num < -1:
            raise ValueError('.broadcast called with num %r' % (num,))
        if num == 0:
            return
        self.idlock.acquire()
        if self.pending:
            self.waiting = self.waiting + self.pending
            self.pending = 0
        self.id = self.id + 1
        if num == -1:
            self.torelease = self.waiting
        else:
            self.torelease = min( self.waiting,
                                  self.torelease + num )
        if self.torelease and not self.releasing:
            self.releasing = 1
            self.checkout.release()
        self.idlock.release()

class barrier:
    def __init__(self, n):
        self.n = n
        self.togo = n
        self.full = condition()

    def enter(self):
        full = self.full
        full.acquire()
        self.togo = self.togo - 1
        if self.togo:
            full.wait()
        else:
            self.togo = self.n
            full.broadcast()
        full.release()

class event:
    def __init__(self):
        self.state = 0
        self.posted = condition()

    def post(self):
        self.posted.acquire()
        self.state = 1
        self.posted.broadcast()
        self.posted.release()

    def clear(self):
        self.posted.acquire()
        self.state = 0
        self.posted.release()

    def is_posted(self):
        self.posted.acquire()
        answer = self.state
        self.posted.release()
        return answer

    def wait(self):
        self.posted.acquire()
        if not self.state:
            self.posted.wait()
        self.posted.release()

class semaphore:
    def __init__(self, count=1):
        if count <= 0:
            raise ValueError('semaphore count %d; must be >= 1' % count)
        self.count = count
        self.maxcount = count
        self.nonzero = condition()

    def p(self):
        self.nonzero.acquire()
        while self.count == 0:
            self.nonzero.wait()
        self.count = self.count - 1
        self.nonzero.release()

    def v(self):
        self.nonzero.acquire()
        if self.count == self.maxcount:
            raise ValueError('.v() tried to raise semaphore count above ' \
                             'initial value %r' % self.maxcount)
        self.count = self.count + 1
        self.nonzero.signal()
        self.nonzero.release()

class mrsw:
    def __init__(self):
        # critical-section lock & the data it protects
        self.rwOK = thread.allocate_lock()
        self.nr = 0       # number readers actively reading (not just waiting)
        self.nw = 0       # number writers either waiting to write or writing
        self.writing = 0  # 1 iff some thread is writing

        # conditions
        self.readOK = condition(self.rwOK)   # OK to unblock readers
        self.writeOK = condition(self.rwOK)  # OK to unblock writers

    def read_in(self):
        self.rwOK.acquire()
        while self.nw:
            self.readOK.wait()
        self.nr = self.nr + 1
        self.rwOK.release()

    def read_out(self):
        self.rwOK.acquire()
        if self.nr <= 0:
            raise ValueError('.read_out() invoked without an active reader')
        self.nr = self.nr - 1
        if self.nr == 0:
            self.writeOK.signal()
        self.rwOK.release()

    def write_in(self):
        self.rwOK.acquire()
        self.nw = self.nw + 1
        while self.writing or self.nr:
            self.writeOK.wait()
        self.writing = 1
        self.rwOK.release()

    def write_out(self):
        self.rwOK.acquire()
        if not self.writing:
            raise ValueError('.write_out() invoked without an active writer')
        self.writing = 0
        self.nw = self.nw - 1
        if self.nw:
            self.writeOK.signal()
        else:
            self.readOK.broadcast()
        self.rwOK.release()

    def write_to_read(self):
        self.rwOK.acquire()
        if not self.writing:
            raise ValueError('.write_to_read() invoked without an active writer')
        self.writing = 0
        self.nw = self.nw - 1
        self.nr = self.nr + 1
        if not self.nw:
            self.readOK.broadcast()
        self.rwOK.release()

# The rest of the file is a test case, that runs a number of parallelized
# quicksorts in parallel.  If it works, you'll get about 600 lines of
# tracing output, with a line like
#     test passed! 209 threads created in all
# as the last line.  The content and order of preceding lines will
# vary across runs.

def _new_thread(func, *args):
    global TID
    tid.acquire(); id = TID = TID+1; tid.release()
    io.acquire(); alive.append(id); \
                  print('starting thread', id, '--', len(alive), 'alive'); \
                  io.release()
    thread.start_new_thread( func, (id,) + args )

def _qsort(tid, a, l, r, finished):
    # sort a[l:r]; post finished when done
    io.acquire(); print('thread', tid, 'qsort', l, r); io.release()
    if r-l > 1:
        pivot = a[l]
        j = l+1   # make a[l:j] <= pivot, and a[j:r] > pivot
        for i in range(j, r):
            if a[i] <= pivot:
                a[j], a[i] = a[i], a[j]
                j = j + 1
        a[l], a[j-1] = a[j-1], pivot

        l_subarray_sorted = event()
        r_subarray_sorted = event()
        _new_thread(_qsort, a, l, j-1, l_subarray_sorted)
        _new_thread(_qsort, a, j, r, r_subarray_sorted)
        l_subarray_sorted.wait()
        r_subarray_sorted.wait()

    io.acquire(); print('thread', tid, 'qsort done'); \
                  alive.remove(tid); io.release()
    finished.post()

def _randarray(tid, a, finished):
    io.acquire(); print('thread', tid, 'randomizing array'); \
                  io.release()
    for i in range(1, len(a)):
        wh.acquire(); j = randint(0,i); wh.release()
        a[i], a[j] = a[j], a[i]
    io.acquire(); print('thread', tid, 'randomizing done'); \
                  alive.remove(tid); io.release()
    finished.post()

def _check_sort(a):
    if a != list(range(len(a))):
        raise ValueError('a not sorted', a)

def _run_one_sort(tid, a, bar, done):
    # randomize a, and quicksort it
    # for variety, all the threads running this enter a barrier
    # at the end, and post `done' after the barrier exits
    io.acquire(); print('thread', tid, 'randomizing', a); \
                  io.release()
    finished = event()
    _new_thread(_randarray, a, finished)
    finished.wait()

    io.acquire(); print('thread', tid, 'sorting', a); io.release()
    finished.clear()
    _new_thread(_qsort, a, 0, len(a), finished)
    finished.wait()
    _check_sort(a)

    io.acquire(); print('thread', tid, 'entering barrier'); \
                  io.release()
    bar.enter()
    io.acquire(); print('thread', tid, 'leaving barrier'); \
                  io.release()
    io.acquire(); alive.remove(tid); io.release()
    bar.enter()  # make sure they've all removed themselves from alive
                 # before 'done' is posted
    bar.enter()  # just to be cruel
    done.post()

def test():
    global TID, tid, io, wh, randint, alive
    import random
    randint = random.randint

    TID = 0                        # thread ID (1, 2, ...)
    tid = thread.allocate_lock()   # for changing TID
    io = thread.allocate_lock()    # for printing, and 'alive'
    wh = thread.allocate_lock()    # for calls to random
    alive = []                     # IDs of active threads

    NSORTS = 5
    arrays = []
    for i in range(NSORTS):
        arrays.append( list(range( (i+1)*10 )) )

    bar = barrier(NSORTS)
    finished = event()
    for i in range(NSORTS):
        _new_thread(_run_one_sort, arrays[i], bar, finished)
    finished.wait()

    print('all threads done, and checking results ...')
    if alive:
        raise ValueError('threads still alive at end', alive)
    for i in range(NSORTS):
        a = arrays[i]
        if len(a) != (i+1)*10:
            raise ValueError('length of array', i, 'screwed up')
        _check_sort(a)

    print('test passed!', TID, 'threads created in all')

if __name__ == '__main__':
    test()

# end of module