» Core Development > Code coverage > Tools/ccbench/ccbench.py

Python code coverage for Tools/ccbench/ccbench.py

# | count | content
1n/a# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
2n/a
3n/afrom __future__ import division
4n/afrom __future__ import print_function
5n/a
6n/a"""
7n/accbench, a Python concurrency benchmark.
8n/a"""
9n/a
10n/aimport time
11n/aimport os
12n/aimport sys
13n/aimport itertools
14n/aimport threading
15n/aimport subprocess
16n/aimport socket
17n/afrom optparse import OptionParser, SUPPRESS_HELP
18n/aimport platform
19n/a
# Compatibility: this script also has to run on Python 2.6/2.7.
try:
    xrange
except NameError:
    # Python 3 has no xrange(); its range() is already lazy.
    xrange = range

try:
    # Python 2: use the iterator-returning variant of map().
    map = itertools.imap
except AttributeError:
    # Python 3: the builtin map() already returns an iterator.
    pass


# Throughput test: minimum length of each timed measurement, in seconds.
THROUGHPUT_DURATION = 2.0

# Latency test: delay between two pings, and nominal test length (seconds).
LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

# Bandwidth test: size of each UDP packet (bytes), nominal test length (s).
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0
39n/a
40n/a
def task_pidigits():
    """Pi calculation (Python)"""
    # Bind helpers to locals once; the benchmark loop calls these a lot.
    lazy_map = map
    counter = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        # Unbounded spigot algorithm, from http://shootout.alioth.debian.org/
        def gen_x():
            # Infinite stream of transformations (k, 4k+2, 0, 2k+1), k >= 1.
            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1), counter(1))

        def compose(a, b):
            # Product of two 2x2 matrices stored as flat (q, r, s, t) tuples.
            (aq, ar, as_, at), (bq, br, bs, bt) = a, b
            return (aq * bq,
                    aq * br + ar * bt,
                    as_ * bq + at * bs,
                    as_ * br + at * bt)

        def extract(z, j):
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            state = (1, 0, 0, 1)
            terms = gen_x()
            while True:
                digit = extract(state, 3)
                # Consume terms until the next digit is determined.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(take(pi_digits(), n))

    return calc_ndigits, (50, )
78n/a
def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module: matches lines starting a ``def``,
    # lines containing a ``lambda``, and lines starting with ``@``.
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # Sample text: (up to) the first 2000 characters of this file.
    with open(__file__, "r") as f:
        arg = f.read(2000)

    # The compiled pattern's findall() is benchmarked directly, so the
    # measured work is the C regex engine and nothing else.
    return pat.findall, (arg, )
95n/a
def task_sort():
    """list sorting (C)"""
    def list_sort(items):
        # Sort a reversed copy: the input list is never modified, and the
        # sort has real work to do on each call.
        work = items[::-1]
        work.sort()

    sample = list(range(1000))
    return list_sort, (sample, )
103n/a
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    # Sample data: (up to) the first 5000 bytes of this file, three times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 3

    def roundtrip(data):
        # Compress at level 5 and decompress again; the result is discarded.
        packed = zlib.compress(data, 5)
        zlib.decompress(packed)

    return roundtrip, (payload, )
113n/a
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    # Sample data: (up to) the first 3000 bytes of this file, twice.
    with open(__file__, "rb") as source:
        payload = source.read(3000) * 2

    def squeeze(data):
        # Only compression is timed; the compressed output is discarded.
        bz2.compress(data)

    return squeeze, (payload, )
123n/a
def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    # Sample data: (up to) the first 5000 bytes of this file, 30 times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 30

    def digest_sha1(data):
        # Hash the whole buffer; the digest itself is thrown away.
        hashlib.sha1(data).digest()

    return digest_sha1, (payload, )
133n/a
134n/a
# Throughput benchmark tasks: the pure-Python and regex ones always, plus one
# C-heavy compression/hashing task chosen below.
throughput_tasks = [task_pidigits, task_regex]

# Bind each optional module to a module-level name, or to None when it
# cannot be imported on this interpreter.
for mod in ('bz2', 'hashlib'):
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reason, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
throughput_tasks.append(
    task_compress_bz2 if bz2 is not None
    else task_hashing if hashlib is not None
    else task_compress_zlib
)

# The latency benchmark shares the very same list object.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
154n/a
155n/a
class TimedLoop:
    """Call ``func(*args)`` in batches until a minimum duration has elapsed.

    Several instances may run at once in different threads while sharing a
    single ``end_event`` list.  The first loop to reach ``min_duration``
    appends to that list, which makes every other loop stop at its next
    check.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the loop and return ``(niters, duration)``.

        ``start_time`` is the reference ``time.time()`` value that durations
        are measured from.  If ``end_event`` is non-empty when a batch
        finishes, that batch is discarded and the last complete measurement
        is returned (``(0, 0.0)`` if there is none).
        """
        func, args = self.func, self.args
        now, sleep = time.time, time.sleep
        batch = 20
        done = 0
        elapsed = 0.0
        last_check = start_time
        while True:
            for _ in range(batch):
                func(*args)
            now_t = now()
            # Another thread already finished, so this batch overlapped with
            # a shutdown: report the previous measurement instead.
            if end_event:
                return done, elapsed
            done += batch
            elapsed = now_t - start_time
            if elapsed >= min_duration:
                # Tell the other loops that the measurement is over.
                end_event.append(None)
                return done, elapsed
            if now_t - last_check < 0.01:
                # Batches are very short: grow them so the timing calls
                # interfere less with the measured work.
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that
                # thread switches have to be forced, otherwise the results
                # are useless.
                sleep(0.0001)
            last_check = now_t
192n/a
193n/a
def run_throughput_test(func, args, nthreads):
    """Measure how fast ``func(*args)`` runs with ``nthreads`` threads.

    Returns a list with one ``(niters, duration)`` tuple per thread, as
    produced by ``TimedLoop``.  With ``nthreads == 1`` the loop runs in the
    calling thread, so no thread switching or synchronization is involved.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Announce readiness, then block until the main thread has set
        # ``started`` and ``start_time`` (both read from the enclosing scope).
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for i in range(nthreads):
        threads.append(threading.Thread(target=run))
    for t in threads:
        # The ``daemon`` attribute works on 2.6+ and, unlike setDaemon(),
        # is not deprecated on Python 3.10+.
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results
246n/a
def run_throughput_tests(max_threads):
    """Benchmark every throughput task with 1 to ``max_threads`` threads.

    For each task, prints its description followed by one line per thread
    count.  The first line is in iterations/s; the following ones are a
    percentage of that single-threaded baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # Total work divided by the *slowest* thread's duration: this
            # gives pessimistic rather than optimistic figures.
            total_iters = sum(niters for niters, _ in results)
            longest = max(elapsed for _, elapsed in results)
            speed = total_iters / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" iterations/s.")
                baseline_speed = speed
        print()
267n/a
268n/a
# Line the latency client sends after its last ping.
LAT_END = "END"


def _sendto(sock, s, addr):
    """Send the text ``s``, ASCII-encoded, as one datagram to ``addr``."""
    payload = s.encode('ascii')
    sock.sendto(payload, addr)


def _recv(sock, n):
    """Receive up to ``n`` bytes from ``sock`` and decode them as ASCII."""
    data = sock.recv(n)
    return data.decode('ascii')
276n/a
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to ``addr`` (runs in the child process).

    Each ping is the ``repr()`` of the current ``time.time()`` followed by a
    newline.  A first "ready" ping is followed by ``nb_pings`` pings spaced
    ``interval`` seconds apart, then by a final ``LAT_END`` line.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        now = time.time
        sleep = time.sleep

        def ping():
            _sendto(sock, "%r\n" % now(), addr)

        # The first ping tells the parent process we are ready; then give it
        # a bit of time to notice before the timed pings start.
        ping()
        sleep(1.0)
        for _ in range(nb_pings):
            sleep(interval)
            ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
294n/a
def run_latency_client(**kwargs):
    """Spawn a child copy of this script that acts as the latency client.

    ``kwargs`` are forwarded as their ``repr()`` through the hidden
    ``--latclient`` option.  Returns the ``subprocess.Popen`` handle.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
300n/a
def run_latency_test(func, args, nthreads):
    """Measure ping latency while ``nthreads`` threads run ``func(*args)``.

    A child process (see ``run_latency_client``) sends timestamped UDP pings
    to a socket bound here, while the background threads keep the CPU busy.
    Returns a list of ``(send_time, recv_time)`` pairs, one per ping
    received after the initial "ready" ping.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                # Loop for up to 1.5x the nominal duration; end_event (set
                # once LAT_END has arrived) stops it earlier.
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            t = _time()
            chunks.append((t, s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # Parse with float() rather than eval(): any local process
                # can send datagrams to this port.
                send_time = float(line)
                results.append((send_time, recv_time))

    return results
381n/a
def run_latency_tests(max_threads):
    """Print ping latency statistics for every latency task.

    Each task is measured with 0 to ``max_threads`` background CPU threads.
    Prints the mean latency and its population standard deviation, in
    milliseconds.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            # Latencies in milliseconds.
            lats = [1000 * (received - sent) for sent, received in results]
            count = len(lats)
            avg = sum(lats) / count
            variance = sum((x - avg) ** 2 for x in lats) / count
            dev = variance ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev))
        print()
401n/a
402n/a
# Message the bandwidth client sends after its last packet.
BW_END = "END"


def bandwidth_client(addr, packet_size, duration):
    """Exchange fixed-size UDP packets with the parent at ``addr``.

    Runs in the child process.  Each packet is ``"<repr(local addr)>#<msg>\\n"``
    right-justified with spaces to ``packet_size`` characters.  After each
    send, the client blocks until a packet comes back.  Packets are
    exchanged for ``2 * duration`` seconds, then ``BW_END`` is sent.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Everything after socket creation sits inside try/finally, so the
    # socket is closed even if bind() or the initial sleep is interrupted.
    try:
        sock.bind(("127.0.0.1", 0))
        local_addr = sock.getsockname()
        _time = time.time
        _sleep = time.sleep

        def _send_chunk(msg):
            _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)

        # We give the parent some time to be ready.
        _sleep(1.0)
        start_time = _time()
        end_time = start_time + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            # Wait for the parent to echo a packet back.
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()
427n/a
def run_bandwidth_client(**kwargs):
    """Spawn a child copy of this script that acts as the bandwidth client.

    ``kwargs`` are forwarded as their ``repr()`` through the hidden
    ``--bwclient`` option.  Returns the ``subprocess.Popen`` handle.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
433n/a
def run_bandwidth_test(func, args, nthreads):
    """Measure the UDP echo rate while ``nthreads`` threads run ``func(*args)``.

    A child process (see ``run_bandwidth_client``) sends fixed-size packets.
    They are echoed back from here as fast as possible while the background
    threads keep the CPU busy.  Returns the number of packets per second:
    packets after the first one, divided by the time since the first reply
    arrived.  Returns ``0.0`` when too few packets were exchanged to compute
    a rate.
    """
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.  It is closed explicitly instead of with
    # a ``with`` statement, because sockets are context managers only on
    # Python 3.2+ and this file must stay compatible with 2.6.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # Blocks until the client's first packet arrives.
        s = _recv(sock, packet_size)
        # The packet starts with the client's address repr, left-padded with
        # spaces.  Strip the padding, then parse it with literal_eval rather
        # than eval: this is data read from the network.
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()

        # Stop the background threads, if they have not stopped already.
        end_event.append(None)
        for t in threads:
            t.join()
        # The client may still be blocked waiting for an echo that will never
        # come.  Kill it, then reap it so no zombie process is left behind.
        process.kill()
        process.wait()
    finally:
        sock.close()

    if first_time is None or end_time <= first_time:
        # Not enough packets were exchanged to compute a rate.
        return 0.0
    return (n - 1) / (end_time - first_time)
508n/a
def run_bandwidth_tests(max_threads):
    """Print the UDP echo rate for every bandwidth task.

    Each task is measured with 0 to ``max_threads`` background CPU threads.
    The first figure is in packets/s; later ones are a percentage of it.
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" packets/s.")
                baseline_speed = speed
        print()
528n/a
529n/a
def main():
    """Command-line entry point.

    Parses the options, then does one of two things.  It either runs a
    hidden client mode (``--latclient`` / ``--bwclient``, used by the child
    processes this script spawns), or runs the selected benchmarks.  When
    no benchmark is selected, all three are run.
    """
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client options carry the repr() of a kwargs dict made only of
    # literals; literal_eval parses it back without executing any code.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    if options.check_interval:
        # sys.setcheckinterval() was removed in Python 3.9.
        setcheckinterval = getattr(sys, "setcheckinterval", None)
        if setcheckinterval is None:
            parser.error("--interval is not supported by this Python version "
                         "(use --switch-interval instead)")
        setcheckinterval(options.check_interval)
    if options.switch_interval:
        # sys.setswitchinterval() only exists from Python 3.2 onwards.
        setswitchinterval = getattr(sys, "setswitchinterval", None)
        if setswitchinterval is None:
            parser.error("--switch-interval is not supported by this Python "
                         "version (use --interval instead)")
        setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)

if __name__ == "__main__":
    main()