» Core Development > Code coverage > Lib/test/test_itertools.py

Python code coverage for Lib/test/test_itertools.py

# | count | content
1n/aimport unittest
2n/afrom test import support
3n/afrom itertools import *
4n/aimport weakref
5n/afrom decimal import Decimal
6n/afrom fractions import Fraction
7n/aimport operator
8n/aimport random
9n/aimport copy
10n/aimport pickle
11n/afrom functools import reduce
12n/aimport sys
13n/aimport struct
# Largest Py_ssize_t value as reported by test.support, and the matching
# smallest value (one below its negation).
maxsize = support.MAX_Py_ssize_t
minsize = -maxsize-1
16n/a
def lzip(*args):
    """Return ``zip(*args)`` materialized as a list of tuples."""
    return list(zip(*args))
19n/a
def onearg(x):
    'Test function of one argument'
    # Returns 2*x, whatever multiplication means for the argument's type.
    return 2*x
23n/a
def errfunc(*args):
    'Test function that raises an error'
    # Accepts any positional arguments and always raises ValueError.
    raise ValueError
27n/a
def gen3():
    'Non-restartable source sequence'
    # A generator yielding 0, 1, 2; once exhausted it cannot be re-iterated.
    for i in (0, 1, 2):
        yield i
32n/a
def isEven(x):
    'Test predicate'
    # True when x leaves no remainder modulo 2.
    return x%2==0
36n/a
def isOdd(x):
    'Test predicate'
    # True when x leaves remainder 1 modulo 2.
    return x%2==1
40n/a
def tupleize(*args):
    """Return the positional arguments unchanged, as a tuple."""
    return args
43n/a
def irange(n):
    """Generator yielding the values of ``range(n)`` one at a time."""
    for i in range(n):
        yield i
47n/a
class StopNow:
    'Class emulating an empty iterable.'
    def __iter__(self):
        # The object is its own iterator.
        return self
    def __next__(self):
        # Signal exhaustion on the very first request.
        raise StopIteration
54n/a
def take(n, seq):
    'Convenience function for partially consuming a long or infinite iterable'
    # islice stops after n items, so seq is never consumed beyond that.
    return list(islice(seq, n))
58n/a
def prod(iterable):
    """Return the product of the items in *iterable* (1 when it is empty)."""
    return reduce(operator.mul, iterable, 1)
61n/a
def fact(n):
    'Factorial'
    # Product of 1..n; an empty range (n < 1) yields the initial value 1.
    return reduce(operator.mul, range(1, n+1), 1)
65n/a
66n/a# root level methods for pickling ability
67n/adef testR(r):
68n/a return r[0]
69n/a
70n/adef testR2(r):
71n/a return r[2]
72n/a
def underten(x):
    """Predicate: True when *x* is less than 10."""
    return x<10
75n/a
# One pickle round-trip function per protocol.  ``proto=proto`` binds the
# loop value as a default so each lambda keeps its own protocol number.
picklecopiers = [lambda s, proto=proto: pickle.loads(pickle.dumps(s, proto))
                 for proto in range(pickle.HIGHEST_PROTOCOL + 1)]
78n/a
79n/aclass TestBasicOps(unittest.TestCase):
80n/a
81n/a def pickletest(self, protocol, it, stop=4, take=1, compare=None):
82n/a """Test that an iterator is the same after pickling, also when part-consumed"""
83n/a def expand(it, i=0):
84n/a # Recursively expand iterables, within sensible bounds
85n/a if i > 10:
86n/a raise RuntimeError("infinite recursion encountered")
87n/a if isinstance(it, str):
88n/a return it
89n/a try:
90n/a l = list(islice(it, stop))
91n/a except TypeError:
92n/a return it # can't expand it
93n/a return [expand(e, i+1) for e in l]
94n/a
95n/a # Test the initial copy against the original
96n/a dump = pickle.dumps(it, protocol)
97n/a i2 = pickle.loads(dump)
98n/a self.assertEqual(type(it), type(i2))
99n/a a, b = expand(it), expand(i2)
100n/a self.assertEqual(a, b)
101n/a if compare:
102n/a c = expand(compare)
103n/a self.assertEqual(a, c)
104n/a
105n/a # Take from the copy, and create another copy and compare them.
106n/a i3 = pickle.loads(dump)
107n/a took = 0
108n/a try:
109n/a for i in range(take):
110n/a next(i3)
111n/a took += 1
112n/a except StopIteration:
113n/a pass #in case there is less data than 'take'
114n/a dump = pickle.dumps(i3, protocol)
115n/a i4 = pickle.loads(dump)
116n/a a, b = expand(i3), expand(i4)
117n/a self.assertEqual(a, b)
118n/a if compare:
119n/a c = expand(compare[took:])
120n/a self.assertEqual(a, c);
121n/a
122n/a def test_accumulate(self):
123n/a self.assertEqual(list(accumulate(range(10))), # one positional arg
124n/a [0, 1, 3, 6, 10, 15, 21, 28, 36, 45])
125n/a self.assertEqual(list(accumulate(iterable=range(10))), # kw arg
126n/a [0, 1, 3, 6, 10, 15, 21, 28, 36, 45])
127n/a for typ in int, complex, Decimal, Fraction: # multiple types
128n/a self.assertEqual(
129n/a list(accumulate(map(typ, range(10)))),
130n/a list(map(typ, [0, 1, 3, 6, 10, 15, 21, 28, 36, 45])))
131n/a self.assertEqual(list(accumulate('abc')), ['a', 'ab', 'abc']) # works with non-numeric
132n/a self.assertEqual(list(accumulate([])), []) # empty iterable
133n/a self.assertEqual(list(accumulate([7])), [7]) # iterable of length one
134n/a self.assertRaises(TypeError, accumulate, range(10), 5, 6) # too many args
135n/a self.assertRaises(TypeError, accumulate) # too few args
136n/a self.assertRaises(TypeError, accumulate, x=range(10)) # unexpected kwd arg
137n/a self.assertRaises(TypeError, list, accumulate([1, []])) # args that don't add
138n/a
139n/a s = [2, 8, 9, 5, 7, 0, 3, 4, 1, 6]
140n/a self.assertEqual(list(accumulate(s, min)),
141n/a [2, 2, 2, 2, 2, 0, 0, 0, 0, 0])
142n/a self.assertEqual(list(accumulate(s, max)),
143n/a [2, 8, 9, 9, 9, 9, 9, 9, 9, 9])
144n/a self.assertEqual(list(accumulate(s, operator.mul)),
145n/a [2, 16, 144, 720, 5040, 0, 0, 0, 0, 0])
146n/a with self.assertRaises(TypeError):
147n/a list(accumulate(s, chr)) # unary-operation
148n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
149n/a self.pickletest(proto, accumulate(range(10))) # test pickling
150n/a
151n/a def test_chain(self):
152n/a
153n/a def chain2(*iterables):
154n/a 'Pure python version in the docs'
155n/a for it in iterables:
156n/a for element in it:
157n/a yield element
158n/a
159n/a for c in (chain, chain2):
160n/a self.assertEqual(list(c('abc', 'def')), list('abcdef'))
161n/a self.assertEqual(list(c('abc')), list('abc'))
162n/a self.assertEqual(list(c('')), [])
163n/a self.assertEqual(take(4, c('abc', 'def')), list('abcd'))
164n/a self.assertRaises(TypeError, list,c(2, 3))
165n/a
166n/a def test_chain_from_iterable(self):
167n/a self.assertEqual(list(chain.from_iterable(['abc', 'def'])), list('abcdef'))
168n/a self.assertEqual(list(chain.from_iterable(['abc'])), list('abc'))
169n/a self.assertEqual(list(chain.from_iterable([''])), [])
170n/a self.assertEqual(take(4, chain.from_iterable(['abc', 'def'])), list('abcd'))
171n/a self.assertRaises(TypeError, list, chain.from_iterable([2, 3]))
172n/a
173n/a def test_chain_reducible(self):
174n/a for oper in [copy.deepcopy] + picklecopiers:
175n/a it = chain('abc', 'def')
176n/a self.assertEqual(list(oper(it)), list('abcdef'))
177n/a self.assertEqual(next(it), 'a')
178n/a self.assertEqual(list(oper(it)), list('bcdef'))
179n/a
180n/a self.assertEqual(list(oper(chain(''))), [])
181n/a self.assertEqual(take(4, oper(chain('abc', 'def'))), list('abcd'))
182n/a self.assertRaises(TypeError, list, oper(chain(2, 3)))
183n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
184n/a self.pickletest(proto, chain('abc', 'def'), compare=list('abcdef'))
185n/a
186n/a def test_chain_setstate(self):
187n/a self.assertRaises(TypeError, chain().__setstate__, ())
188n/a self.assertRaises(TypeError, chain().__setstate__, [])
189n/a self.assertRaises(TypeError, chain().__setstate__, 0)
190n/a self.assertRaises(TypeError, chain().__setstate__, ([],))
191n/a self.assertRaises(TypeError, chain().__setstate__, (iter([]), []))
192n/a it = chain()
193n/a it.__setstate__((iter(['abc', 'def']),))
194n/a self.assertEqual(list(it), ['a', 'b', 'c', 'd', 'e', 'f'])
195n/a it = chain()
196n/a it.__setstate__((iter(['abc', 'def']), iter(['ghi'])))
197n/a self.assertEqual(list(it), ['ghi', 'a', 'b', 'c', 'd', 'e', 'f'])
198n/a
199n/a def test_combinations(self):
200n/a self.assertRaises(TypeError, combinations, 'abc') # missing r argument
201n/a self.assertRaises(TypeError, combinations, 'abc', 2, 1) # too many arguments
202n/a self.assertRaises(TypeError, combinations, None) # pool is not iterable
203n/a self.assertRaises(ValueError, combinations, 'abc', -2) # r is negative
204n/a
205n/a for op in [lambda a:a] + picklecopiers:
206n/a self.assertEqual(list(op(combinations('abc', 32))), []) # r > n
207n/a
208n/a self.assertEqual(list(op(combinations('ABCD', 2))),
209n/a [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')])
210n/a testIntermediate = combinations('ABCD', 2)
211n/a next(testIntermediate)
212n/a self.assertEqual(list(op(testIntermediate)),
213n/a [('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')])
214n/a
215n/a self.assertEqual(list(op(combinations(range(4), 3))),
216n/a [(0,1,2), (0,1,3), (0,2,3), (1,2,3)])
217n/a testIntermediate = combinations(range(4), 3)
218n/a next(testIntermediate)
219n/a self.assertEqual(list(op(testIntermediate)),
220n/a [(0,1,3), (0,2,3), (1,2,3)])
221n/a
222n/a
223n/a def combinations1(iterable, r):
224n/a 'Pure python version shown in the docs'
225n/a pool = tuple(iterable)
226n/a n = len(pool)
227n/a if r > n:
228n/a return
229n/a indices = list(range(r))
230n/a yield tuple(pool[i] for i in indices)
231n/a while 1:
232n/a for i in reversed(range(r)):
233n/a if indices[i] != i + n - r:
234n/a break
235n/a else:
236n/a return
237n/a indices[i] += 1
238n/a for j in range(i+1, r):
239n/a indices[j] = indices[j-1] + 1
240n/a yield tuple(pool[i] for i in indices)
241n/a
242n/a def combinations2(iterable, r):
243n/a 'Pure python version shown in the docs'
244n/a pool = tuple(iterable)
245n/a n = len(pool)
246n/a for indices in permutations(range(n), r):
247n/a if sorted(indices) == list(indices):
248n/a yield tuple(pool[i] for i in indices)
249n/a
250n/a def combinations3(iterable, r):
251n/a 'Pure python version from cwr()'
252n/a pool = tuple(iterable)
253n/a n = len(pool)
254n/a for indices in combinations_with_replacement(range(n), r):
255n/a if len(set(indices)) == r:
256n/a yield tuple(pool[i] for i in indices)
257n/a
258n/a for n in range(7):
259n/a values = [5*x-12 for x in range(n)]
260n/a for r in range(n+2):
261n/a result = list(combinations(values, r))
262n/a self.assertEqual(len(result), 0 if r>n else fact(n) / fact(r) / fact(n-r)) # right number of combs
263n/a self.assertEqual(len(result), len(set(result))) # no repeats
264n/a self.assertEqual(result, sorted(result)) # lexicographic order
265n/a for c in result:
266n/a self.assertEqual(len(c), r) # r-length combinations
267n/a self.assertEqual(len(set(c)), r) # no duplicate elements
268n/a self.assertEqual(list(c), sorted(c)) # keep original ordering
269n/a self.assertTrue(all(e in values for e in c)) # elements taken from input iterable
270n/a self.assertEqual(list(c),
271n/a [e for e in values if e in c]) # comb is a subsequence of the input iterable
272n/a self.assertEqual(result, list(combinations1(values, r))) # matches first pure python version
273n/a self.assertEqual(result, list(combinations2(values, r))) # matches second pure python version
274n/a self.assertEqual(result, list(combinations3(values, r))) # matches second pure python version
275n/a
276n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
277n/a self.pickletest(proto, combinations(values, r)) # test pickling
278n/a
279n/a @support.bigaddrspacetest
280n/a def test_combinations_overflow(self):
281n/a with self.assertRaises((OverflowError, MemoryError)):
282n/a combinations("AA", 2**29)
283n/a
284n/a # Test implementation detail: tuple re-use
285n/a @support.impl_detail("tuple reuse is specific to CPython")
286n/a def test_combinations_tuple_reuse(self):
287n/a self.assertEqual(len(set(map(id, combinations('abcde', 3)))), 1)
288n/a self.assertNotEqual(len(set(map(id, list(combinations('abcde', 3))))), 1)
289n/a
290n/a def test_combinations_with_replacement(self):
291n/a cwr = combinations_with_replacement
292n/a self.assertRaises(TypeError, cwr, 'abc') # missing r argument
293n/a self.assertRaises(TypeError, cwr, 'abc', 2, 1) # too many arguments
294n/a self.assertRaises(TypeError, cwr, None) # pool is not iterable
295n/a self.assertRaises(ValueError, cwr, 'abc', -2) # r is negative
296n/a
297n/a for op in [lambda a:a] + picklecopiers:
298n/a self.assertEqual(list(op(cwr('ABC', 2))),
299n/a [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')])
300n/a testIntermediate = cwr('ABC', 2)
301n/a next(testIntermediate)
302n/a self.assertEqual(list(op(testIntermediate)),
303n/a [('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')])
304n/a
305n/a
306n/a def cwr1(iterable, r):
307n/a 'Pure python version shown in the docs'
308n/a # number items returned: (n+r-1)! / r! / (n-1)! when n>0
309n/a pool = tuple(iterable)
310n/a n = len(pool)
311n/a if not n and r:
312n/a return
313n/a indices = [0] * r
314n/a yield tuple(pool[i] for i in indices)
315n/a while 1:
316n/a for i in reversed(range(r)):
317n/a if indices[i] != n - 1:
318n/a break
319n/a else:
320n/a return
321n/a indices[i:] = [indices[i] + 1] * (r - i)
322n/a yield tuple(pool[i] for i in indices)
323n/a
324n/a def cwr2(iterable, r):
325n/a 'Pure python version shown in the docs'
326n/a pool = tuple(iterable)
327n/a n = len(pool)
328n/a for indices in product(range(n), repeat=r):
329n/a if sorted(indices) == list(indices):
330n/a yield tuple(pool[i] for i in indices)
331n/a
332n/a def numcombs(n, r):
333n/a if not n:
334n/a return 0 if r else 1
335n/a return fact(n+r-1) / fact(r)/ fact(n-1)
336n/a
337n/a for n in range(7):
338n/a values = [5*x-12 for x in range(n)]
339n/a for r in range(n+2):
340n/a result = list(cwr(values, r))
341n/a
342n/a self.assertEqual(len(result), numcombs(n, r)) # right number of combs
343n/a self.assertEqual(len(result), len(set(result))) # no repeats
344n/a self.assertEqual(result, sorted(result)) # lexicographic order
345n/a
346n/a regular_combs = list(combinations(values, r)) # compare to combs without replacement
347n/a if n == 0 or r <= 1:
348n/a self.assertEqual(result, regular_combs) # cases that should be identical
349n/a else:
350n/a self.assertTrue(set(result) >= set(regular_combs)) # rest should be supersets of regular combs
351n/a
352n/a for c in result:
353n/a self.assertEqual(len(c), r) # r-length combinations
354n/a noruns = [k for k,v in groupby(c)] # combo without consecutive repeats
355n/a self.assertEqual(len(noruns), len(set(noruns))) # no repeats other than consecutive
356n/a self.assertEqual(list(c), sorted(c)) # keep original ordering
357n/a self.assertTrue(all(e in values for e in c)) # elements taken from input iterable
358n/a self.assertEqual(noruns,
359n/a [e for e in values if e in c]) # comb is a subsequence of the input iterable
360n/a self.assertEqual(result, list(cwr1(values, r))) # matches first pure python version
361n/a self.assertEqual(result, list(cwr2(values, r))) # matches second pure python version
362n/a
363n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
364n/a self.pickletest(proto, cwr(values,r)) # test pickling
365n/a
366n/a @support.bigaddrspacetest
367n/a def test_combinations_with_replacement_overflow(self):
368n/a with self.assertRaises((OverflowError, MemoryError)):
369n/a combinations_with_replacement("AA", 2**30)
370n/a
371n/a # Test implementation detail: tuple re-use
372n/a @support.impl_detail("tuple reuse is specific to CPython")
373n/a def test_combinations_with_replacement_tuple_reuse(self):
374n/a cwr = combinations_with_replacement
375n/a self.assertEqual(len(set(map(id, cwr('abcde', 3)))), 1)
376n/a self.assertNotEqual(len(set(map(id, list(cwr('abcde', 3))))), 1)
377n/a
378n/a def test_permutations(self):
379n/a self.assertRaises(TypeError, permutations) # too few arguments
380n/a self.assertRaises(TypeError, permutations, 'abc', 2, 1) # too many arguments
381n/a self.assertRaises(TypeError, permutations, None) # pool is not iterable
382n/a self.assertRaises(ValueError, permutations, 'abc', -2) # r is negative
383n/a self.assertEqual(list(permutations('abc', 32)), []) # r > n
384n/a self.assertRaises(TypeError, permutations, 'abc', 's') # r is not an int or None
385n/a self.assertEqual(list(permutations(range(3), 2)),
386n/a [(0,1), (0,2), (1,0), (1,2), (2,0), (2,1)])
387n/a
388n/a def permutations1(iterable, r=None):
389n/a 'Pure python version shown in the docs'
390n/a pool = tuple(iterable)
391n/a n = len(pool)
392n/a r = n if r is None else r
393n/a if r > n:
394n/a return
395n/a indices = list(range(n))
396n/a cycles = list(range(n-r+1, n+1))[::-1]
397n/a yield tuple(pool[i] for i in indices[:r])
398n/a while n:
399n/a for i in reversed(range(r)):
400n/a cycles[i] -= 1
401n/a if cycles[i] == 0:
402n/a indices[i:] = indices[i+1:] + indices[i:i+1]
403n/a cycles[i] = n - i
404n/a else:
405n/a j = cycles[i]
406n/a indices[i], indices[-j] = indices[-j], indices[i]
407n/a yield tuple(pool[i] for i in indices[:r])
408n/a break
409n/a else:
410n/a return
411n/a
412n/a def permutations2(iterable, r=None):
413n/a 'Pure python version shown in the docs'
414n/a pool = tuple(iterable)
415n/a n = len(pool)
416n/a r = n if r is None else r
417n/a for indices in product(range(n), repeat=r):
418n/a if len(set(indices)) == r:
419n/a yield tuple(pool[i] for i in indices)
420n/a
421n/a for n in range(7):
422n/a values = [5*x-12 for x in range(n)]
423n/a for r in range(n+2):
424n/a result = list(permutations(values, r))
425n/a self.assertEqual(len(result), 0 if r>n else fact(n) / fact(n-r)) # right number of perms
426n/a self.assertEqual(len(result), len(set(result))) # no repeats
427n/a self.assertEqual(result, sorted(result)) # lexicographic order
428n/a for p in result:
429n/a self.assertEqual(len(p), r) # r-length permutations
430n/a self.assertEqual(len(set(p)), r) # no duplicate elements
431n/a self.assertTrue(all(e in values for e in p)) # elements taken from input iterable
432n/a self.assertEqual(result, list(permutations1(values, r))) # matches first pure python version
433n/a self.assertEqual(result, list(permutations2(values, r))) # matches second pure python version
434n/a if r == n:
435n/a self.assertEqual(result, list(permutations(values, None))) # test r as None
436n/a self.assertEqual(result, list(permutations(values))) # test default r
437n/a
438n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
439n/a self.pickletest(proto, permutations(values, r)) # test pickling
440n/a
441n/a @support.bigaddrspacetest
442n/a def test_permutations_overflow(self):
443n/a with self.assertRaises((OverflowError, MemoryError)):
444n/a permutations("A", 2**30)
445n/a
446n/a @support.impl_detail("tuple reuse is specific to CPython")
447n/a def test_permutations_tuple_reuse(self):
448n/a self.assertEqual(len(set(map(id, permutations('abcde', 3)))), 1)
449n/a self.assertNotEqual(len(set(map(id, list(permutations('abcde', 3))))), 1)
450n/a
451n/a def test_combinatorics(self):
452n/a # Test relationships between product(), permutations(),
453n/a # combinations() and combinations_with_replacement().
454n/a
455n/a for n in range(6):
456n/a s = 'ABCDEFG'[:n]
457n/a for r in range(8):
458n/a prod = list(product(s, repeat=r))
459n/a cwr = list(combinations_with_replacement(s, r))
460n/a perm = list(permutations(s, r))
461n/a comb = list(combinations(s, r))
462n/a
463n/a # Check size
464n/a self.assertEqual(len(prod), n**r)
465n/a self.assertEqual(len(cwr), (fact(n+r-1) / fact(r)/ fact(n-1)) if n else (not r))
466n/a self.assertEqual(len(perm), 0 if r>n else fact(n) / fact(n-r))
467n/a self.assertEqual(len(comb), 0 if r>n else fact(n) / fact(r) / fact(n-r))
468n/a
469n/a # Check lexicographic order without repeated tuples
470n/a self.assertEqual(prod, sorted(set(prod)))
471n/a self.assertEqual(cwr, sorted(set(cwr)))
472n/a self.assertEqual(perm, sorted(set(perm)))
473n/a self.assertEqual(comb, sorted(set(comb)))
474n/a
475n/a # Check interrelationships
476n/a self.assertEqual(cwr, [t for t in prod if sorted(t)==list(t)]) # cwr: prods which are sorted
477n/a self.assertEqual(perm, [t for t in prod if len(set(t))==r]) # perm: prods with no dups
478n/a self.assertEqual(comb, [t for t in perm if sorted(t)==list(t)]) # comb: perms that are sorted
479n/a self.assertEqual(comb, [t for t in cwr if len(set(t))==r]) # comb: cwrs without dups
480n/a self.assertEqual(comb, list(filter(set(cwr).__contains__, perm))) # comb: perm that is a cwr
481n/a self.assertEqual(comb, list(filter(set(perm).__contains__, cwr))) # comb: cwr that is a perm
482n/a self.assertEqual(comb, sorted(set(cwr) & set(perm))) # comb: both a cwr and a perm
483n/a
484n/a def test_compress(self):
485n/a self.assertEqual(list(compress(data='ABCDEF', selectors=[1,0,1,0,1,1])), list('ACEF'))
486n/a self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF'))
487n/a self.assertEqual(list(compress('ABCDEF', [0,0,0,0,0,0])), list(''))
488n/a self.assertEqual(list(compress('ABCDEF', [1,1,1,1,1,1])), list('ABCDEF'))
489n/a self.assertEqual(list(compress('ABCDEF', [1,0,1])), list('AC'))
490n/a self.assertEqual(list(compress('ABC', [0,1,1,1,1,1])), list('BC'))
491n/a n = 10000
492n/a data = chain.from_iterable(repeat(range(6), n))
493n/a selectors = chain.from_iterable(repeat((0, 1)))
494n/a self.assertEqual(list(compress(data, selectors)), [1,3,5] * n)
495n/a self.assertRaises(TypeError, compress, None, range(6)) # 1st arg not iterable
496n/a self.assertRaises(TypeError, compress, range(6), None) # 2nd arg not iterable
497n/a self.assertRaises(TypeError, compress, range(6)) # too few args
498n/a self.assertRaises(TypeError, compress, range(6), None) # too many args
499n/a
500n/a # check copy, deepcopy, pickle
501n/a for op in [lambda a:copy.copy(a), lambda a:copy.deepcopy(a)] + picklecopiers:
502n/a for data, selectors, result1, result2 in [
503n/a ('ABCDEF', [1,0,1,0,1,1], 'ACEF', 'CEF'),
504n/a ('ABCDEF', [0,0,0,0,0,0], '', ''),
505n/a ('ABCDEF', [1,1,1,1,1,1], 'ABCDEF', 'BCDEF'),
506n/a ('ABCDEF', [1,0,1], 'AC', 'C'),
507n/a ('ABC', [0,1,1,1,1,1], 'BC', 'C'),
508n/a ]:
509n/a
510n/a self.assertEqual(list(op(compress(data=data, selectors=selectors))), list(result1))
511n/a self.assertEqual(list(op(compress(data, selectors))), list(result1))
512n/a testIntermediate = compress(data, selectors)
513n/a if result1:
514n/a next(testIntermediate)
515n/a self.assertEqual(list(op(testIntermediate)), list(result2))
516n/a
517n/a
518n/a def test_count(self):
519n/a self.assertEqual(lzip('abc',count()), [('a', 0), ('b', 1), ('c', 2)])
520n/a self.assertEqual(lzip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)])
521n/a self.assertEqual(take(2, lzip('abc',count(3))), [('a', 3), ('b', 4)])
522n/a self.assertEqual(take(2, zip('abc',count(-1))), [('a', -1), ('b', 0)])
523n/a self.assertEqual(take(2, zip('abc',count(-3))), [('a', -3), ('b', -2)])
524n/a self.assertRaises(TypeError, count, 2, 3, 4)
525n/a self.assertRaises(TypeError, count, 'a')
526n/a self.assertEqual(take(10, count(maxsize-5)),
527n/a list(range(maxsize-5, maxsize+5)))
528n/a self.assertEqual(take(10, count(-maxsize-5)),
529n/a list(range(-maxsize-5, -maxsize+5)))
530n/a self.assertEqual(take(3, count(3.25)), [3.25, 4.25, 5.25])
531n/a self.assertEqual(take(3, count(3.25-4j)), [3.25-4j, 4.25-4j, 5.25-4j])
532n/a self.assertEqual(take(3, count(Decimal('1.1'))),
533n/a [Decimal('1.1'), Decimal('2.1'), Decimal('3.1')])
534n/a self.assertEqual(take(3, count(Fraction(2, 3))),
535n/a [Fraction(2, 3), Fraction(5, 3), Fraction(8, 3)])
536n/a BIGINT = 1<<1000
537n/a self.assertEqual(take(3, count(BIGINT)), [BIGINT, BIGINT+1, BIGINT+2])
538n/a c = count(3)
539n/a self.assertEqual(repr(c), 'count(3)')
540n/a next(c)
541n/a self.assertEqual(repr(c), 'count(4)')
542n/a c = count(-9)
543n/a self.assertEqual(repr(c), 'count(-9)')
544n/a next(c)
545n/a self.assertEqual(next(c), -8)
546n/a self.assertEqual(repr(count(10.25)), 'count(10.25)')
547n/a self.assertEqual(repr(count(10.0)), 'count(10.0)')
548n/a self.assertEqual(type(next(count(10.0))), float)
549n/a for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5):
550n/a # Test repr
551n/a r1 = repr(count(i))
552n/a r2 = 'count(%r)'.__mod__(i)
553n/a self.assertEqual(r1, r2)
554n/a
555n/a # check copy, deepcopy, pickle
556n/a for value in -3, 3, maxsize-5, maxsize+5:
557n/a c = count(value)
558n/a self.assertEqual(next(copy.copy(c)), value)
559n/a self.assertEqual(next(copy.deepcopy(c)), value)
560n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
561n/a self.pickletest(proto, count(value))
562n/a
563n/a #check proper internal error handling for large "step' sizes
564n/a count(1, maxsize+5); sys.exc_info()
565n/a
566n/a def test_count_with_stride(self):
567n/a self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)])
568n/a self.assertEqual(lzip('abc',count(start=2,step=3)),
569n/a [('a', 2), ('b', 5), ('c', 8)])
570n/a self.assertEqual(lzip('abc',count(step=-1)),
571n/a [('a', 0), ('b', -1), ('c', -2)])
572n/a self.assertRaises(TypeError, count, 'a', 'b')
573n/a self.assertEqual(lzip('abc',count(2,0)), [('a', 2), ('b', 2), ('c', 2)])
574n/a self.assertEqual(lzip('abc',count(2,1)), [('a', 2), ('b', 3), ('c', 4)])
575n/a self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)])
576n/a self.assertEqual(take(20, count(maxsize-15, 3)), take(20, range(maxsize-15, maxsize+100, 3)))
577n/a self.assertEqual(take(20, count(-maxsize-15, 3)), take(20, range(-maxsize-15,-maxsize+100, 3)))
578n/a self.assertEqual(take(3, count(10, maxsize+5)),
579n/a list(range(10, 10+3*(maxsize+5), maxsize+5)))
580n/a self.assertEqual(take(3, count(2, 1.25)), [2, 3.25, 4.5])
581n/a self.assertEqual(take(3, count(2, 3.25-4j)), [2, 5.25-4j, 8.5-8j])
582n/a self.assertEqual(take(3, count(Decimal('1.1'), Decimal('.1'))),
583n/a [Decimal('1.1'), Decimal('1.2'), Decimal('1.3')])
584n/a self.assertEqual(take(3, count(Fraction(2,3), Fraction(1,7))),
585n/a [Fraction(2,3), Fraction(17,21), Fraction(20,21)])
586n/a BIGINT = 1<<1000
587n/a self.assertEqual(take(3, count(step=BIGINT)), [0, BIGINT, 2*BIGINT])
588n/a self.assertEqual(repr(take(3, count(10, 2.5))), repr([10, 12.5, 15.0]))
589n/a c = count(3, 5)
590n/a self.assertEqual(repr(c), 'count(3, 5)')
591n/a next(c)
592n/a self.assertEqual(repr(c), 'count(8, 5)')
593n/a c = count(-9, 0)
594n/a self.assertEqual(repr(c), 'count(-9, 0)')
595n/a next(c)
596n/a self.assertEqual(repr(c), 'count(-9, 0)')
597n/a c = count(-9, -3)
598n/a self.assertEqual(repr(c), 'count(-9, -3)')
599n/a next(c)
600n/a self.assertEqual(repr(c), 'count(-12, -3)')
601n/a self.assertEqual(repr(c), 'count(-12, -3)')
602n/a self.assertEqual(repr(count(10.5, 1.25)), 'count(10.5, 1.25)')
603n/a self.assertEqual(repr(count(10.5, 1)), 'count(10.5)') # suppress step=1 when it's an int
604n/a self.assertEqual(repr(count(10.5, 1.00)), 'count(10.5, 1.0)') # do show float values lilke 1.0
605n/a self.assertEqual(repr(count(10, 1.00)), 'count(10, 1.0)')
606n/a c = count(10, 1.0)
607n/a self.assertEqual(type(next(c)), int)
608n/a self.assertEqual(type(next(c)), float)
609n/a for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5):
610n/a for j in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 1, 10, sys.maxsize-5, sys.maxsize+5):
611n/a # Test repr
612n/a r1 = repr(count(i, j))
613n/a if j == 1:
614n/a r2 = ('count(%r)' % i)
615n/a else:
616n/a r2 = ('count(%r, %r)' % (i, j))
617n/a self.assertEqual(r1, r2)
618n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
619n/a self.pickletest(proto, count(i, j))
620n/a
621n/a def test_cycle(self):
622n/a self.assertEqual(take(10, cycle('abc')), list('abcabcabca'))
623n/a self.assertEqual(list(cycle('')), [])
624n/a self.assertRaises(TypeError, cycle)
625n/a self.assertRaises(TypeError, cycle, 5)
626n/a self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0])
627n/a
628n/a # check copy, deepcopy, pickle
629n/a c = cycle('abc')
630n/a self.assertEqual(next(c), 'a')
631n/a #simple copy currently not supported, because __reduce__ returns
632n/a #an internal iterator
633n/a #self.assertEqual(take(10, copy.copy(c)), list('bcabcabcab'))
634n/a self.assertEqual(take(10, copy.deepcopy(c)), list('bcabcabcab'))
635n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
636n/a self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))),
637n/a list('bcabcabcab'))
638n/a next(c)
639n/a self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))),
640n/a list('cabcabcabc'))
641n/a next(c)
642n/a next(c)
643n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
644n/a self.pickletest(proto, cycle('abc'))
645n/a
646n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
647n/a # test with partial consumed input iterable
648n/a it = iter('abcde')
649n/a c = cycle(it)
650n/a _ = [next(c) for i in range(2)] # consume 2 of 5 inputs
651n/a p = pickle.dumps(c, proto)
652n/a d = pickle.loads(p) # rebuild the cycle object
653n/a self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab'))
654n/a
655n/a # test with completely consumed input iterable
656n/a it = iter('abcde')
657n/a c = cycle(it)
658n/a _ = [next(c) for i in range(7)] # consume 7 of 5 inputs
659n/a p = pickle.dumps(c, proto)
660n/a d = pickle.loads(p) # rebuild the cycle object
661n/a self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab'))
662n/a
663n/a def test_cycle_setstate(self):
664n/a # Verify both modes for restoring state
665n/a
666n/a # Mode 0 is efficient. It uses an incompletely consumed input
667n/a # iterator to build a cycle object and then passes in state with
668n/a # a list of previously consumed values. There is no data
669n/a # overlap between the two.
670n/a c = cycle('defg')
671n/a c.__setstate__((list('abc'), 0))
672n/a self.assertEqual(take(20, c), list('defgabcdefgabcdefgab'))
673n/a
674n/a # Mode 1 is inefficient. It starts with a cycle object built
675n/a # from an iterator over the remaining elements in a partial
676n/a # cycle and then passes in state with all of the previously
677n/a # seen values (this overlaps values included in the iterator).
678n/a c = cycle('defg')
679n/a c.__setstate__((list('abcdefg'), 1))
680n/a self.assertEqual(take(20, c), list('defgabcdefgabcdefgab'))
681n/a
682n/a # The first argument to setstate needs to be a tuple
683n/a with self.assertRaises(TypeError):
684n/a cycle('defg').__setstate__([list('abcdefg'), 0])
685n/a
686n/a # The first argument in the setstate tuple must be a list
687n/a with self.assertRaises(TypeError):
688n/a c = cycle('defg')
689n/a c.__setstate__((tuple('defg'), 0))
690n/a take(20, c)
691n/a
692n/a # The second argument in the setstate tuple must be an int
693n/a with self.assertRaises(TypeError):
694n/a cycle('defg').__setstate__((list('abcdefg'), 'x'))
695n/a
696n/a self.assertRaises(TypeError, cycle('').__setstate__, ())
697n/a self.assertRaises(TypeError, cycle('').__setstate__, ([],))
698n/a
699n/a def test_groupby(self):
700n/a # Check whether it accepts arguments correctly
701n/a self.assertEqual([], list(groupby([])))
702n/a self.assertEqual([], list(groupby([], key=id)))
703n/a self.assertRaises(TypeError, list, groupby('abc', []))
704n/a self.assertRaises(TypeError, groupby, None)
705n/a self.assertRaises(TypeError, groupby, 'abc', lambda x:x, 10)
706n/a
707n/a # Check normal input
708n/a s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22),
709n/a (2,15,22), (3,16,23), (3,17,23)]
710n/a dup = []
711n/a for k, g in groupby(s, lambda r:r[0]):
712n/a for elem in g:
713n/a self.assertEqual(k, elem[0])
714n/a dup.append(elem)
715n/a self.assertEqual(s, dup)
716n/a
717n/a # Check normal pickled
718n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
719n/a dup = []
720n/a for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)):
721n/a for elem in g:
722n/a self.assertEqual(k, elem[0])
723n/a dup.append(elem)
724n/a self.assertEqual(s, dup)
725n/a
726n/a # Check nested case
727n/a dup = []
728n/a for k, g in groupby(s, testR):
729n/a for ik, ig in groupby(g, testR2):
730n/a for elem in ig:
731n/a self.assertEqual(k, elem[0])
732n/a self.assertEqual(ik, elem[2])
733n/a dup.append(elem)
734n/a self.assertEqual(s, dup)
735n/a
736n/a # Check nested and pickled
737n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
738n/a dup = []
739n/a for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)):
740n/a for ik, ig in pickle.loads(pickle.dumps(groupby(g, testR2), proto)):
741n/a for elem in ig:
742n/a self.assertEqual(k, elem[0])
743n/a self.assertEqual(ik, elem[2])
744n/a dup.append(elem)
745n/a self.assertEqual(s, dup)
746n/a
747n/a
748n/a # Check case where inner iterator is not used
749n/a keys = [k for k, g in groupby(s, testR)]
750n/a expectedkeys = set([r[0] for r in s])
751n/a self.assertEqual(set(keys), expectedkeys)
752n/a self.assertEqual(len(keys), len(expectedkeys))
753n/a
754n/a # Exercise pipes and filters style
755n/a s = 'abracadabra'
756n/a # sort s | uniq
757n/a r = [k for k, g in groupby(sorted(s))]
758n/a self.assertEqual(r, ['a', 'b', 'c', 'd', 'r'])
759n/a # sort s | uniq -d
760n/a r = [k for k, g in groupby(sorted(s)) if list(islice(g,1,2))]
761n/a self.assertEqual(r, ['a', 'b', 'r'])
762n/a # sort s | uniq -c
763n/a r = [(len(list(g)), k) for k, g in groupby(sorted(s))]
764n/a self.assertEqual(r, [(5, 'a'), (2, 'b'), (1, 'c'), (1, 'd'), (2, 'r')])
765n/a # sort s | uniq -c | sort -rn | head -3
766n/a r = sorted([(len(list(g)) , k) for k, g in groupby(sorted(s))], reverse=True)[:3]
767n/a self.assertEqual(r, [(5, 'a'), (2, 'r'), (2, 'b')])
768n/a
769n/a # iter.__next__ failure
770n/a class ExpectedError(Exception):
771n/a pass
772n/a def delayed_raise(n=0):
773n/a for i in range(n):
774n/a yield 'yo'
775n/a raise ExpectedError
776n/a def gulp(iterable, keyp=None, func=list):
777n/a return [func(g) for k, g in groupby(iterable, keyp)]
778n/a
779n/a # iter.__next__ failure on outer object
780n/a self.assertRaises(ExpectedError, gulp, delayed_raise(0))
781n/a # iter.__next__ failure on inner object
782n/a self.assertRaises(ExpectedError, gulp, delayed_raise(1))
783n/a
784n/a # __eq__ failure
785n/a class DummyCmp:
786n/a def __eq__(self, dst):
787n/a raise ExpectedError
788n/a s = [DummyCmp(), DummyCmp(), None]
789n/a
790n/a # __eq__ failure on outer object
791n/a self.assertRaises(ExpectedError, gulp, s, func=id)
792n/a # __eq__ failure on inner object
793n/a self.assertRaises(ExpectedError, gulp, s)
794n/a
795n/a # keyfunc failure
796n/a def keyfunc(obj):
797n/a if keyfunc.skip > 0:
798n/a keyfunc.skip -= 1
799n/a return obj
800n/a else:
801n/a raise ExpectedError
802n/a
803n/a # keyfunc failure on outer object
804n/a keyfunc.skip = 0
805n/a self.assertRaises(ExpectedError, gulp, [None], keyfunc)
806n/a keyfunc.skip = 1
807n/a self.assertRaises(ExpectedError, gulp, [None, None], keyfunc)
808n/a
def test_filter(self):
    # Builtin filter(): selection results, argument validation, and
    # copy/deepcopy/pickle round-trips.
    self.assertEqual(list(filter(isEven, range(6))), [0, 2, 4])
    self.assertEqual(list(filter(None, [0, 1, 0, 2, 0])), [1, 2])
    self.assertEqual(list(filter(bool, [0, 1, 0, 2, 0])), [1, 2])
    self.assertEqual(take(4, filter(isEven, count())), [0, 2, 4, 6])

    # Wrong arity or a non-iterable second argument fails at construction.
    identity = lambda x: x
    self.assertRaises(TypeError, filter)
    self.assertRaises(TypeError, filter, identity)
    self.assertRaises(TypeError, filter, identity, range(6), 7)
    self.assertRaises(TypeError, filter, isEven, 3)
    # A non-callable predicate is accepted by the constructor; the
    # TypeError is expected from the first next() call.
    self.assertRaises(TypeError, next, filter(range(6), range(6)))

    # check copy, deepcopy, pickle
    expected = [0, 2, 4]

    self.assertEqual(list(copy.copy(filter(isEven, range(6)))), expected)
    self.assertEqual(list(copy.deepcopy(filter(isEven, range(6)))), expected)
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        evens = filter(isEven, range(6))
        untouched = pickle.loads(pickle.dumps(evens, proto))
        self.assertEqual(list(untouched), expected)
        # Pickling again after one step must resume from the second item.
        next(evens)
        advanced = pickle.loads(pickle.dumps(evens, proto))
        self.assertEqual(list(advanced), expected[1:])
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        self.pickletest(proto, filter(isEven, range(6)))
835n/a
def test_filterfalse(self):
    # filterfalse(): keeps the items the predicate rejects; also checks
    # argument validation and pickling.
    self.assertEqual(list(filterfalse(isEven, range(6))), [1, 3, 5])
    self.assertEqual(list(filterfalse(None, [0, 1, 0, 2, 0])), [0, 0, 0])
    self.assertEqual(list(filterfalse(bool, [0, 1, 0, 2, 0])), [0, 0, 0])
    self.assertEqual(take(4, filterfalse(isEven, count())), [1, 3, 5, 7])

    identity = lambda x: x
    self.assertRaises(TypeError, filterfalse)
    self.assertRaises(TypeError, filterfalse, identity)
    self.assertRaises(TypeError, filterfalse, identity, range(6), 7)
    self.assertRaises(TypeError, filterfalse, isEven, 3)
    # The non-callable predicate is expected to fail on next(), not in the
    # constructor.
    self.assertRaises(TypeError, next, filterfalse(range(6), range(6)))

    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        odds = filterfalse(isEven, range(6))
        self.pickletest(proto, odds)
848n/a
def test_zip(self):
    # XXX This is rather silly now that builtin zip() calls zip()...
    numbered = [(letter, index) for letter, index in zip('abc', count())]
    self.assertEqual(numbered, [('a', 0), ('b', 1), ('c', 2)])

    # zip() stops at the shortest input, whichever position it is in.
    self.assertEqual(list(zip('abc', range(6))), lzip('abc', range(6)))
    self.assertEqual(list(zip('abcdef', range(3))), lzip('abcdef', range(3)))
    self.assertEqual(take(3, zip('abcdef', count())), lzip('abcdef', range(3)))
    self.assertEqual(list(zip('abcdef')), lzip('abcdef'))
    self.assertEqual(list(zip()), lzip())

    # Non-iterable arguments are rejected.
    self.assertRaises(TypeError, zip, 3)
    self.assertRaises(TypeError, zip, range(3), 3)

    expected_pairs = lzip('abc', 'def')
    rebuilt = [tuple(list(pair)) for pair in zip('abc', 'def')]
    self.assertEqual(rebuilt, expected_pairs)
    as_yielded = [pair for pair in zip('abc', 'def')]
    self.assertEqual(as_yielded, expected_pairs)
864n/a
@support.impl_detail("tuple reuse is specific to CPython")
def test_zip_tuple_reuse(self):
    # When each result is dropped before the next is requested, every
    # yielded tuple reports the same id().
    transient_ids = list(map(id, zip('abc', 'def')))
    self.assertEqual(min(transient_ids), max(transient_ids))
    # When all results are kept alive in a list, the ids are all distinct.
    retained_ids = list(map(id, list(zip('abc', 'def'))))
    self.assertEqual(len(dict.fromkeys(retained_ids)), len(retained_ids))

    # check copy, deepcopy, pickle
    all_pairs = [('a', 0), ('b', 1), ('c', 2)]

    shallow = [(x, y) for x, y in copy.copy(zip('abc', count()))]
    self.assertEqual(shallow, all_pairs)

    deep = [(x, y) for x, y in copy.deepcopy(zip('abc', count()))]
    self.assertEqual(deep, all_pairs)

    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        restored = pickle.loads(pickle.dumps(zip('abc', count()), proto))
        self.assertEqual([(x, y) for x, y in restored], all_pairs)

    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        # Pickle a zip object that has already produced its first pair.
        testIntermediate = zip('abc', count())
        next(testIntermediate)
        restored = pickle.loads(pickle.dumps(testIntermediate, proto))
        self.assertEqual([(x, y) for x, y in restored], [('b', 1), ('c', 2)])

    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        self.pickletest(proto, zip('abc', count()))
891n/a
def test_ziplongest(self):
    # zip_longest(): results are padded out to the longest input, the pad
    # defaults to None and can be overridden with fillvalue=, and invalid
    # arguments are rejected with TypeError.
    for args in [
            ['abc', range(6)],
            [range(6), 'abc'],
            [range(1000), range(2000,2100), range(3000,3050)],
            [range(1000), range(0), range(3000,3050), range(1200), range(1500)],
            [range(1000), range(0), range(3000,3050), range(1200), range(1500), range(0)],
        ]:
        # Expected rows are built by indexing, with None past each end.
        target = [tuple([arg[i] if i < len(arg) else None for arg in args])
                  for i in range(max(map(len, args)))]
        self.assertEqual(list(zip_longest(*args)), target)
        self.assertEqual(list(zip_longest(*args, **{})), target)
        # Replace None fills with 'X'.  A conditional expression is used
        # instead of "cond and 'X' or e" so the outcome never hinges on the
        # truthiness of the replacement value.
        target = [tuple('X' if e is None else e for e in t) for t in target]
        self.assertEqual(list(zip_longest(*args, **dict(fillvalue='X'))), target)

    # take 3 from infinite input
    self.assertEqual(take(3,zip_longest('abcdef', count())), list(zip('abcdef', range(3))))

    self.assertEqual(list(zip_longest()), list(zip()))
    self.assertEqual(list(zip_longest([])), list(zip([])))
    self.assertEqual(list(zip_longest('abcdef')), list(zip('abcdef')))

    # empty keyword dict
    self.assertEqual(list(zip_longest('abc', 'defg', **{})),
                     list(zip(list('abc')+[None], 'defg')))
    self.assertRaises(TypeError, zip_longest, 3)
    self.assertRaises(TypeError, zip_longest, range(3), 3)

    # Unknown keyword arguments must be rejected at construction time.
    # assertRaises forwards the keywords to zip_longest, so no eval() of
    # source strings is needed.
    self.assertRaises(TypeError, zip_longest, 'abc', fv=1)
    self.assertRaises(TypeError, zip_longest, 'abc',
                      fillvalue=1, bogus_keyword=None)

    self.assertEqual([tuple(list(pair)) for pair in zip_longest('abc', 'def')],
                     list(zip('abc', 'def')))
    self.assertEqual([pair for pair in zip_longest('abc', 'def')],
                     list(zip('abc', 'def')))
933n/a
@support.impl_detail("tuple reuse is specific to CPython")
def test_zip_longest_tuple_reuse(self):
    # When each result is dropped before the next is requested, every
    # yielded tuple reports the same id().
    transient_ids = list(map(id, zip_longest('abc', 'def')))
    self.assertEqual(min(transient_ids), max(transient_ids))
    # When all results are kept alive in a list, the ids are all distinct.
    retained_ids = list(map(id, list(zip_longest('abc', 'def'))))
    self.assertEqual(len(dict.fromkeys(retained_ids)), len(retained_ids))
940n/a
def test_zip_longest_pickling(self):
    # Pickle zip_longest objects over equal-length, unequal-length,
    # custom-fillvalue and empty-first-input arguments, for every protocol.
    cases = [
        (("abc", "def"), {}),
        (("abc", "defgh"), {}),
        (("abc", "defgh"), {'fillvalue': 1}),
        (("", "defgh"), {}),
    ]
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        for positional, keywords in cases:
            self.pickletest(proto, zip_longest(*positional, **keywords))
947n/a
def test_bug_7244(self):
    # zip_longest() over hand-written iterators that raise a chosen
    # exception once they run out.

    class Repeater:
        # this class is similar to itertools.repeat
        def __init__(self, o, t, e):
            self.value = o
            self.remaining = int(t)
            self.exc = e
        def __iter__(self):
            # its iterator is itself
            return self
        def __next__(self):
            if self.remaining <= 0:
                raise self.exc
            self.remaining -= 1
            return self.value

    def run(r1, r2):
        # Drain zip_longest(r1, r2), printing each pair to a captured
        # stdout, and return the pairs seen.
        seen = []
        for i, j in zip_longest(r1, r2, fillvalue=0):
            with support.captured_output('stdout'):
                print((i, j))
            seen.append((i, j))
        return seen

    # Formerly this code would fail in debug mode
    # with Undetected Error and Stop Iteration
    shorter = Repeater(1, 3, StopIteration)
    longer = Repeater(2, 4, StopIteration)
    self.assertEqual(run(shorter, longer), [(1,2), (1,2), (1,2), (0,2)])

    # Formerly, the RuntimeError would be lost
    # and StopIteration would stop as expected
    failing = Repeater(1, 3, RuntimeError)
    longer = Repeater(2, 4, StopIteration)
    it = zip_longest(failing, longer, fillvalue=0)
    for _ in range(3):
        self.assertEqual(next(it), (1, 2))
    self.assertRaises(RuntimeError, next, it)
987n/a
def test_product(self):
    # product(): fixed examples, the repeat= keyword, and randomized
    # comparison against two pure-Python reference implementations.
    fixed_cases = [
        ([], [()]),                     # zero iterables
        (['ab'], [('a',), ('b',)]),     # one iterable
        ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]),     # two iterables
        ([range(0), range(2), range(3)], []),   # first iterable with zero length
        ([range(2), range(0), range(3)], []),   # middle iterable with zero length
        ([range(2), range(3), range(0)], []),   # last iterable with zero length
    ]
    for args, result in fixed_cases:
        self.assertEqual(list(product(*args)), result)
        for r in range(4):
            # Repeating the arguments by hand must match repeat=r.
            by_hand = list(product(*(args*r)))
            by_keyword = list(product(*args, **dict(repeat=r)))
            self.assertEqual(by_hand, by_keyword)
    self.assertEqual(len(list(product(*[range(7)]*6))), 7**6)
    self.assertRaises(TypeError, product, range(6), None)

    def product1(*args, **kwds):
        # Reference implementation that steps a list of per-pool indices.
        pools = list(map(tuple, args)) * kwds.get('repeat', 1)
        if not pools:
            yield ()
            return
        if not all(pools):
            # an empty pool means there is nothing to yield
            return
        n = len(pools)
        indices = [0] * n
        yield tuple(pool[k] for pool, k in zip(pools, indices))
        while True:
            # right to left: advance the last index that is not at its end
            for i in range(n - 1, -1, -1):
                if indices[i] != len(pools[i]) - 1:
                    indices[i] += 1
                    # reset everything to the right of the advanced slot
                    indices[i+1:] = [0] * (n - i - 1)
                    yield tuple(pool[k] for pool, k in zip(pools, indices))
                    break
            else:
                return

    def product2(*args, **kwds):
        'Pure python version used in docs'
        pools = list(map(tuple, args)) * kwds.get('repeat', 1)
        partials = [[]]
        for pool in pools:
            extended = []
            for prefix in partials:
                for item in pool:
                    extended.append(prefix + [item])
            partials = extended
        for combo in partials:
            yield tuple(combo)

    argtypes = ['', 'abc', '', range(0), range(4), dict(a=1, b=2, c=3),
                set('abcdefg'), range(11), tuple(range(13))]
    for _ in range(100):
        how_many = random.randrange(5)
        chosen = [random.choice(argtypes) for _ in range(how_many)]
        expected_len = prod(map(len, chosen))
        self.assertEqual(len(list(product(*chosen))), expected_len)
        self.assertEqual(list(product(*chosen)), list(product1(*chosen)))
        self.assertEqual(list(product(*chosen)), list(product2(*chosen)))
        # Same inputs again, this time supplied as one-shot iterators.
        as_iterators = map(iter, chosen)
        self.assertEqual(len(list(product(*as_iterators))), expected_len)
1045n/a
@support.bigaddrspacetest
def test_product_overflow(self):
    # 2**5 pools repeated 2**25 times: constructing this product must
    # raise OverflowError or MemoryError.
    pools = ['ab'] * 2**5
    with self.assertRaises((OverflowError, MemoryError)):
        product(*pools, repeat=2**25)
1050n/a
@support.impl_detail("tuple reuse is specific to CPython")
def test_product_tuple_reuse(self):
    # When results are dropped one by one, only one distinct id() is seen.
    transient_ids = set(map(id, product('abc', 'def')))
    self.assertEqual(len(transient_ids), 1)
    # When all results are kept alive in a list, more than one id is seen.
    retained_ids = set(map(id, list(product('abc', 'def'))))
    self.assertNotEqual(len(retained_ids), 1)
1055n/a
def test_product_pickling(self):
    # check copy, deepcopy, pickle
    cases = [
        ([], [()]),                     # zero iterables
        (['ab'], [('a',), ('b',)]),     # one iterable
        ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]),     # two iterables
        ([range(0), range(2), range(3)], []),   # first iterable with zero length
        ([range(2), range(0), range(3)], []),   # middle iterable with zero length
        ([range(2), range(3), range(0)], []),   # last iterable with zero length
    ]
    for pools, expected in cases:
        shallow = copy.copy(product(*pools))
        self.assertEqual(list(shallow), expected)
        deep = copy.deepcopy(product(*pools))
        self.assertEqual(list(deep), expected)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            self.pickletest(proto, product(*pools))
1070n/a
1071n/a def test_product_issue_25021(self):
1072n/a # test that indices are properly clamped to the length of the tuples
1073n/a p = product((1, 2),(3,))
1074n/a p.__setstate__((0, 0x1000)) # will access tuple element 1 if not clamped
1075n/a self.assertEqual(next(p), (2, 3))
1076n/a # test that empty tuple in the list will result in an immediate StopIteration
1077n/a p = product((1, 2), (), (3,))
1078n/a p.__setstate__((0, 0, 0x1000)) # will access tuple element 1 if not clamped
1079n/a self.assertRaises(StopIteration, next, p)
1080n/a
def test_repeat(self):
    # repeat(): bounded and unbounded repetition, argument validation,
    # repr(), and copy/deepcopy/pickle round-trips.
    three_a = ['a', 'a', 'a']
    self.assertEqual(list(repeat(object='a', times=3)), three_a)
    self.assertEqual(lzip(range(3), repeat('a')),
                     [(0, 'a'), (1, 'a'), (2, 'a')])
    self.assertEqual(list(repeat('a', 3)), three_a)
    self.assertEqual(take(3, repeat('a')), three_a)
    # Zero or negative counts produce nothing.
    self.assertEqual(list(repeat('a', 0)), [])
    self.assertEqual(list(repeat('a', -3)), [])
    self.assertRaises(TypeError, repeat)
    self.assertRaises(TypeError, repeat, None, 3, 4)
    self.assertRaises(TypeError, repeat, None, 'a')

    # repr() omits the count when unbounded and otherwise shows how many
    # repetitions are still to come.
    unbounded = repeat(1+0j)
    self.assertEqual(repr(unbounded), 'repeat((1+0j))')
    bounded = repeat(1+0j, 5)
    self.assertEqual(repr(bounded), 'repeat((1+0j), 5)')
    list(bounded)
    self.assertEqual(repr(bounded), 'repeat((1+0j), 0)')

    # check copy, deepcopy, pickle
    started = repeat(object='a', times=10)
    self.assertEqual(next(started), 'a')
    self.assertEqual(take(2, copy.copy(started)), list('a' * 2))
    self.assertEqual(take(2, copy.deepcopy(started)), list('a' * 2))
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        self.pickletest(proto, repeat(object='a', times=10))
1106n/a
1107n/a def test_repeat_with_negative_times(self):
1108n/a self.assertEqual(repr(repeat('a', -1)), "repeat('a', 0)")
1109n/a self.assertEqual(repr(repeat('a', -2)), "repeat('a', 0)")
1110n/a self.assertEqual(repr(repeat('a', times=-1)), "repeat('a', 0)")
1111n/a self.assertEqual(repr(repeat('a', times=-2)), "repeat('a', 0)")
1112n/a
def test_map(self):
    # Builtin map(): multi-iterable results, argument validation, and
    # copy/deepcopy/pickle round-trips.
    powers = [0**1, 1**2, 2**3]
    numbered = [('a', 0), ('b', 1), ('c', 2)]

    self.assertEqual(list(map(operator.pow, range(3), range(1, 7))), powers)
    self.assertEqual(list(map(tupleize, 'abc', range(5))), numbered)
    self.assertEqual(list(map(tupleize, 'abc', count())), numbered)
    self.assertEqual(take(2, map(tupleize, 'abc', count())), numbered[:2])
    self.assertEqual(list(map(operator.pow, [])), [])

    self.assertRaises(TypeError, map)
    self.assertRaises(TypeError, list, map(None, range(3), range(3)))
    self.assertRaises(TypeError, map, operator.neg)
    # Errors from calling the mapped function surface on next().
    self.assertRaises(TypeError, next, map(10, range(5)))
    self.assertRaises(ValueError, next, map(errfunc, [4], [5]))
    self.assertRaises(TypeError, next, map(onearg, [4], [5]))

    # check copy, deepcopy, pickle
    self.assertEqual(list(copy.copy(map(tupleize, 'abc', count()))), numbered)

    self.assertEqual(list(copy.deepcopy(map(tupleize, 'abc', count()))), numbered)

    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        self.pickletest(proto, map(tupleize, 'abc', count()))
1142n/a
def test_starmap(self):
    # starmap(): argument unpacking, argument validation, and
    # copy/deepcopy/pickle round-trips.
    powers = [0**1, 1**2, 2**3]

    def finite_pairs():
        # a fresh (base, exponent) source for each check
        return zip(range(3), range(1, 7))

    self.assertEqual(list(starmap(operator.pow, finite_pairs())), powers)
    self.assertEqual(take(3, starmap(operator.pow, zip(count(), count(1)))),
                     powers)
    self.assertEqual(list(starmap(operator.pow, [])), [])
    # Each argument "tuple" may be any iterable, but not None.
    self.assertEqual(list(starmap(operator.pow, [iter([4,5])])), [4**5])
    self.assertRaises(TypeError, list, starmap(operator.pow, [None]))
    self.assertRaises(TypeError, starmap)
    self.assertRaises(TypeError, starmap, operator.pow, [(4,5)], 'extra')
    self.assertRaises(TypeError, next, starmap(10, [(4,5)]))
    self.assertRaises(ValueError, next, starmap(errfunc, [(4,5)]))
    self.assertRaises(TypeError, next, starmap(onearg, [(4,5)]))

    # check copy, deepcopy, pickle
    shallow = copy.copy(starmap(operator.pow, finite_pairs()))
    self.assertEqual(list(shallow), powers)

    deep = copy.deepcopy(starmap(operator.pow, finite_pairs()))
    self.assertEqual(list(deep), powers)

    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        self.pickletest(proto, starmap(operator.pow, finite_pairs()))
1169n/a
def test_islice(self):
    # islice(): agreement with range(), stop=None, consumption of the
    # source, argument validation, copy/deepcopy/pickle, and release of
    # the source iterator once exhausted.

    # islice(args) should agree with range(args)
    range_like_args = [
        (10, 20, 3),
        (10, 3, 20),
        (10, 20),
        (10, 3),
        (20,),
    ]
    for args in range_like_args:
        self.assertEqual(list(islice(range(100), *args)),
                         list(range(*args)))

    # Stop when seqn is exhausted
    clipped_cases = [
        ((10, 110, 3), (10, 100, 3)),
        ((10, 110), (10, 100)),
        ((110,), (100,)),
    ]
    for args, tgtargs in clipped_cases:
        self.assertEqual(list(islice(range(100), *args)),
                         list(range(*tgtargs)))

    # Test stop=None
    none_cases = [
        ((None,), range(10)),
        ((None, None), range(10)),
        ((None, None, None), range(10)),
        ((2, None), range(2, 10)),
        ((1, None, 2), range(1, 10, 2)),
    ]
    for args, expected in none_cases:
        self.assertEqual(list(islice(range(10), *args)), list(expected))

    # Test number of items consumed     SF #1171417
    source = iter(range(10))
    self.assertEqual(list(islice(source, 3)), list(range(3)))
    self.assertEqual(list(source), list(range(3, 10)))

    # Test invalid arguments
    ra = range(10)
    self.assertRaises(TypeError, islice, ra)
    self.assertRaises(TypeError, islice, ra, 1, 2, 3, 4)
    bad_value_args = [
        (-5, 10, 1),
        (1, -5, -1),
        (1, 10, -1),
        (1, 10, 0),
        ('a',),
        ('a', 1),
        (1, 'a'),
        ('a', 1, 1),
        (1, 'a', 1),
    ]
    for args in bad_value_args:
        self.assertRaises(ValueError, islice, ra, *args)
    self.assertEqual(len(list(islice(count(), 1, 10, maxsize))), 1)

    # Issue #10323:  Leave islice in a predictable state
    c = count()
    self.assertEqual(list(islice(c, 1, 3, 50)), [1])
    self.assertEqual(next(c), 3)

    # check copy, deepcopy, pickle
    for args in range_like_args:
        expected = list(range(*args))
        self.assertEqual(list(copy.copy(islice(range(100), *args))), expected)
        self.assertEqual(list(copy.deepcopy(islice(range(100), *args))),
                         expected)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            self.pickletest(proto, islice(range(100), *args))

    # Issue #21321: check source iterator is not referenced
    # from islice() after the latter has been exhausted
    gen = (x for x in (1, 2))
    wr = weakref.ref(gen)
    sliced = islice(gen, 1)
    del gen     # from here on only the islice object refers to the generator
    self.assertIsNotNone(wr())
    list(sliced)    # exhaust the iterator
    support.gc_collect()
    self.assertIsNone(wr())
1245n/a
def test_takewhile(self):
    # takewhile(): leading run selection, argument validation, behaviour
    # after exhaustion, and copy/deepcopy/pickle round-trips.
    data = [1, 3, 5, 20, 2, 4, 6, 8]
    leading = [1, 3, 5]
    self.assertEqual(list(takewhile(underten, data)), leading)
    self.assertEqual(list(takewhile(underten, [])), [])
    self.assertRaises(TypeError, takewhile)
    self.assertRaises(TypeError, takewhile, operator.pow)
    self.assertRaises(TypeError, takewhile, operator.pow, [(4,5)], 'extra')
    self.assertRaises(TypeError, next, takewhile(10, [(4,5)]))
    self.assertRaises(ValueError, next, takewhile(errfunc, [(4,5)]))

    # Once the predicate has failed, further next() calls keep raising
    # StopIteration.
    truthy_run = takewhile(bool, [1, 1, 1, 0, 0, 0])
    self.assertEqual(list(truthy_run), [1, 1, 1])
    self.assertRaises(StopIteration, next, truthy_run)

    # check copy, deepcopy, pickle
    self.assertEqual(list(copy.copy(takewhile(underten, data))), leading)
    self.assertEqual(list(copy.deepcopy(takewhile(underten, data))), leading)
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        self.pickletest(proto, takewhile(underten, data))
1265n/a
def test_dropwhile(self):
    # dropwhile(): skipping the leading run, argument validation, and
    # copy/deepcopy/pickle round-trips.
    data = [1, 3, 5, 20, 2, 4, 6, 8]
    remainder = [20, 2, 4, 6, 8]
    self.assertEqual(list(dropwhile(underten, data)), remainder)
    self.assertEqual(list(dropwhile(underten, [])), [])
    self.assertRaises(TypeError, dropwhile)
    self.assertRaises(TypeError, dropwhile, operator.pow)
    self.assertRaises(TypeError, dropwhile, operator.pow, [(4,5)], 'extra')
    self.assertRaises(TypeError, next, dropwhile(10, [(4,5)]))
    self.assertRaises(ValueError, next, dropwhile(errfunc, [(4,5)]))

    # check copy, deepcopy, pickle
    self.assertEqual(list(copy.copy(dropwhile(underten, data))), remainder)
    self.assertEqual(list(copy.deepcopy(dropwhile(underten, data))), remainder)
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        self.pickletest(proto, dropwhile(underten, data))
1282n/a
def test_tee(self):
    # tee(): interleaving patterns, deallocation of one branch, argument
    # validation, direct instantiation of the tee type, weak references,
    # and copy/deepcopy/pickle of fresh and partially consumed branches.
    n = 200

    a, b = tee([])        # test empty iterator
    self.assertEqual(list(a), [])
    self.assertEqual(list(b), [])

    a, b = tee(irange(n)) # test 100% interleaved
    self.assertEqual(lzip(a,b), lzip(range(n), range(n)))

    a, b = tee(irange(n)) # test 0% interleaved
    self.assertEqual(list(a), list(range(n)))
    self.assertEqual(list(b), list(range(n)))

    a, b = tee(irange(n)) # test dealloc of leading iterator
    for i in range(100):
        self.assertEqual(next(a), i)
    del a
    self.assertEqual(list(b), list(range(n)))

    a, b = tee(irange(n)) # test dealloc of trailing iterator
    for i in range(100):
        self.assertEqual(next(a), i)
    del b
    self.assertEqual(list(a), list(range(100, n)))

    for _ in range(5):   # test randomly interleaved
        order = [0]*n + [1]*n
        random.shuffle(order)
        lists = ([], [])
        its = tee(irange(n))
        for i in order:
            value = next(its[i])
            lists[i].append(value)
        self.assertEqual(lists[0], list(range(n)))
        self.assertEqual(lists[1], list(range(n)))

    # test argument format checking
    self.assertRaises(TypeError, tee)
    self.assertRaises(TypeError, tee, 3)
    self.assertRaises(TypeError, tee, [1,2], 'x')
    self.assertRaises(TypeError, tee, [1,2], 3, 'x')

    # tee object should be instantiable
    a, b = tee('abc')
    c = type(a)('def')
    self.assertEqual(list(c), list('def'))

    # test long-lagged and multi-way split
    a, b, c = tee(range(2000), 3)
    for i in range(100):
        self.assertEqual(next(a), i)
    self.assertEqual(list(b), list(range(2000)))
    self.assertEqual([next(c), next(c)], list(range(2)))
    self.assertEqual(list(a), list(range(100,2000)))
    self.assertEqual(list(c), list(range(2,2000)))

    # test values of n
    self.assertRaises(TypeError, tee, 'abc', 'invalid')
    self.assertRaises(ValueError, tee, [], -1)
    # A separate name is used so the method-level n above is not rebound.
    for copies in range(5):
        result = tee('abc', copies)
        self.assertEqual(type(result), tuple)
        self.assertEqual(len(result), copies)
        self.assertEqual([list(x) for x in result], [list('abc')]*copies)

    # tee pass-through to copyable iterator
    a, b = tee('abc')
    c, d = tee(a)
    self.assertIs(a, c)

    # test tee_new
    t1, t2 = tee('abc')
    tnew = type(t1)
    self.assertRaises(TypeError, tnew)
    self.assertRaises(TypeError, tnew, 10)
    t3 = tnew(t1)
    # Separate assertions so a failure reports which branch differed.
    self.assertEqual(list(t1), list('abc'))
    self.assertEqual(list(t2), list('abc'))
    self.assertEqual(list(t3), list('abc'))

    # test that tee objects are weak referencable
    a, b = tee(range(10))
    p = weakref.proxy(a)
    self.assertEqual(getattr(p, '__class__'), type(b))
    del a
    self.assertRaises(ReferenceError, getattr, p, '__class__')

    ans = list('abc')
    long_ans = list(range(10000))

    # check copy
    a, b = tee('abc')
    self.assertEqual(list(copy.copy(a)), ans)
    self.assertEqual(list(copy.copy(b)), ans)
    a, b = tee(list(range(10000)))
    self.assertEqual(list(copy.copy(a)), long_ans)
    self.assertEqual(list(copy.copy(b)), long_ans)

    # check partially consumed copy
    a, b = tee('abc')
    take(2, a)
    take(1, b)
    self.assertEqual(list(copy.copy(a)), ans[2:])
    self.assertEqual(list(copy.copy(b)), ans[1:])
    self.assertEqual(list(a), ans[2:])
    self.assertEqual(list(b), ans[1:])
    a, b = tee(range(10000))
    take(100, a)
    take(60, b)
    self.assertEqual(list(copy.copy(a)), long_ans[100:])
    self.assertEqual(list(copy.copy(b)), long_ans[60:])
    self.assertEqual(list(a), long_ans[100:])
    self.assertEqual(list(b), long_ans[60:])

    # check deepcopy
    a, b = tee('abc')
    self.assertEqual(list(copy.deepcopy(a)), ans)
    self.assertEqual(list(copy.deepcopy(b)), ans)
    self.assertEqual(list(a), ans)
    self.assertEqual(list(b), ans)
    a, b = tee(range(10000))
    self.assertEqual(list(copy.deepcopy(a)), long_ans)
    self.assertEqual(list(copy.deepcopy(b)), long_ans)
    self.assertEqual(list(a), long_ans)
    self.assertEqual(list(b), long_ans)

    # check partially consumed deepcopy
    a, b = tee('abc')
    take(2, a)
    take(1, b)
    self.assertEqual(list(copy.deepcopy(a)), ans[2:])
    self.assertEqual(list(copy.deepcopy(b)), ans[1:])
    self.assertEqual(list(a), ans[2:])
    self.assertEqual(list(b), ans[1:])
    a, b = tee(range(10000))
    take(100, a)
    take(60, b)
    self.assertEqual(list(copy.deepcopy(a)), long_ans[100:])
    self.assertEqual(list(copy.deepcopy(b)), long_ans[60:])
    self.assertEqual(list(a), long_ans[100:])
    self.assertEqual(list(b), long_ans[60:])

    # check pickle
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        self.pickletest(proto, iter(tee('abc')))
        a, b = tee('abc')
        self.pickletest(proto, a, compare=ans)
        self.pickletest(proto, b, compare=ans)
1430n/a
1431n/a # Issue 13454: Crash when deleting backward iterator from tee()
def test_tee_del_backward(self):
    # Run one tee() branch to the end of a 20,000,000-item source, then
    # delete the other branch, which has not been advanced at all.
    forward, backward = tee(repeat(None, 20000000))
    try:
        # exhaust the iterator (every item is None, so any() never stops early)
        any(forward)
        del backward
    except BaseException:
        # Drop both branches before re-raising.
        del forward, backward
        raise
1440n/a
def test_StopIteration(self):
    # Every iterator tool must raise StopIteration straight away when fed
    # an empty source: either an empty list or a StopNow instance.
    empty_sources = (list, StopNow)

    self.assertRaises(StopIteration, next, zip())

    for f in (chain, cycle, zip, groupby):
        for make_empty in empty_sources:
            self.assertRaises(StopIteration, next, f(make_empty()))

    for make_empty in empty_sources:
        self.assertRaises(StopIteration, next, islice(make_empty(), None))

    for make_empty in empty_sources:
        p, q = tee(make_empty())
        self.assertRaises(StopIteration, next, p)
        self.assertRaises(StopIteration, next, q)

    self.assertRaises(StopIteration, next, repeat(None, 0))

    for f in (filter, filterfalse, map, takewhile, dropwhile, starmap):
        for make_empty in empty_sources:
            self.assertRaises(StopIteration, next, f(lambda x:x, make_empty()))
1463n/a
1464n/aclass TestExamples(unittest.TestCase):
1465n/a
1466n/a def test_accumulate(self):
1467n/a self.assertEqual(list(accumulate([1,2,3,4,5])), [1, 3, 6, 10, 15])
1468n/a
1469n/a def test_accumulate_reducible(self):
1470n/a # check copy, deepcopy, pickle
1471n/a data = [1, 2, 3, 4, 5]
1472n/a accumulated = [1, 3, 6, 10, 15]
1473n/a
1474n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1475n/a it = accumulate(data)
1476n/a self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[:])
1477n/a self.assertEqual(next(it), 1)
1478n/a self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[1:])
1479n/a it = accumulate(data)
1480n/a self.assertEqual(next(it), 1)
1481n/a self.assertEqual(list(copy.deepcopy(it)), accumulated[1:])
1482n/a self.assertEqual(list(copy.copy(it)), accumulated[1:])
1483n/a
1484n/a def test_accumulate_reducible_none(self):
1485n/a # Issue #25718: total is None
1486n/a it = accumulate([None, None, None], operator.is_)
1487n/a self.assertEqual(next(it), None)
1488n/a for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1489n/a it_copy = pickle.loads(pickle.dumps(it, proto))
1490n/a self.assertEqual(list(it_copy), [True, False])
1491n/a self.assertEqual(list(copy.deepcopy(it)), [True, False])
1492n/a self.assertEqual(list(copy.copy(it)), [True, False])
1493n/a
1494n/a def test_chain(self):
1495n/a self.assertEqual(''.join(chain('ABC', 'DEF')), 'ABCDEF')
1496n/a
1497n/a def test_chain_from_iterable(self):
1498n/a self.assertEqual(''.join(chain.from_iterable(['ABC', 'DEF'])), 'ABCDEF')
1499n/a
1500n/a def test_combinations(self):
1501n/a self.assertEqual(list(combinations('ABCD', 2)),
1502n/a [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')])
1503n/a self.assertEqual(list(combinations(range(4), 3)),
1504n/a [(0,1,2), (0,1,3), (0,2,3), (1,2,3)])
1505n/a
1506n/a def test_combinations_with_replacement(self):
1507n/a self.assertEqual(list(combinations_with_replacement('ABC', 2)),
1508n/a [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')])
1509n/a
1510n/a def test_compress(self):
1511n/a self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF'))
1512n/a
1513n/a def test_count(self):
1514n/a self.assertEqual(list(islice(count(10), 5)), [10, 11, 12, 13, 14])
1515n/a
1516n/a def test_cycle(self):
1517n/a self.assertEqual(list(islice(cycle('ABCD'), 12)), list('ABCDABCDABCD'))
1518n/a
1519n/a def test_dropwhile(self):
1520n/a self.assertEqual(list(dropwhile(lambda x: x<5, [1,4,6,4,1])), [6,4,1])
1521n/a
1522n/a def test_groupby(self):
1523n/a self.assertEqual([k for k, g in groupby('AAAABBBCCDAABBB')],
1524n/a list('ABCDAB'))
1525n/a self.assertEqual([(list(g)) for k, g in groupby('AAAABBBCCD')],
1526n/a [list('AAAA'), list('BBB'), list('CC'), list('D')])
1527n/a
1528n/a def test_filter(self):
1529n/a self.assertEqual(list(filter(lambda x: x%2, range(10))), [1,3,5,7,9])
1530n/a
1531n/a def test_filterfalse(self):
1532n/a self.assertEqual(list(filterfalse(lambda x: x%2, range(10))), [0,2,4,6,8])
1533n/a
1534n/a def test_map(self):
1535n/a self.assertEqual(list(map(pow, (2,3,10), (5,2,3))), [32, 9, 1000])
1536n/a
1537n/a def test_islice(self):
1538n/a self.assertEqual(list(islice('ABCDEFG', 2)), list('AB'))
1539n/a self.assertEqual(list(islice('ABCDEFG', 2, 4)), list('CD'))
1540n/a self.assertEqual(list(islice('ABCDEFG', 2, None)), list('CDEFG'))
1541n/a self.assertEqual(list(islice('ABCDEFG', 0, None, 2)), list('ACEG'))
1542n/a
1543n/a def test_zip(self):
1544n/a self.assertEqual(list(zip('ABCD', 'xy')), [('A', 'x'), ('B', 'y')])
1545n/a
1546n/a def test_zip_longest(self):
1547n/a self.assertEqual(list(zip_longest('ABCD', 'xy', fillvalue='-')),
1548n/a [('A', 'x'), ('B', 'y'), ('C', '-'), ('D', '-')])
1549n/a
1550n/a def test_permutations(self):
1551n/a self.assertEqual(list(permutations('ABCD', 2)),
1552n/a list(map(tuple, 'AB AC AD BA BC BD CA CB CD DA DB DC'.split())))
1553n/a self.assertEqual(list(permutations(range(3))),
1554n/a [(0,1,2), (0,2,1), (1,0,2), (1,2,0), (2,0,1), (2,1,0)])
1555n/a
1556n/a def test_product(self):
1557n/a self.assertEqual(list(product('ABCD', 'xy')),
1558n/a list(map(tuple, 'Ax Ay Bx By Cx Cy Dx Dy'.split())))
1559n/a self.assertEqual(list(product(range(2), repeat=3)),
1560n/a [(0,0,0), (0,0,1), (0,1,0), (0,1,1),
1561n/a (1,0,0), (1,0,1), (1,1,0), (1,1,1)])
1562n/a
1563n/a def test_repeat(self):
1564n/a self.assertEqual(list(repeat(10, 3)), [10, 10, 10])
1565n/a
1566n/a def test_stapmap(self):
1567n/a self.assertEqual(list(starmap(pow, [(2,5), (3,2), (10,3)])),
1568n/a [32, 9, 1000])
1569n/a
1570n/a def test_takewhile(self):
1571n/a self.assertEqual(list(takewhile(lambda x: x<5, [1,4,6,4,1])), [1,4])
1572n/a
1573n/a
class TestGC(unittest.TestCase):
    """Build reference cycles that run through itertools objects.

    Each test hands makecycle() an iterator together with a container that
    the iterator's arguments refer to, so that storing the iterator in the
    container closes a cycle.
    """

    def makecycle(self, iterator, container):
        # Store the iterator in the container, advance it once, then drop
        # the local references so only the cycle keeps the objects alive.
        container.append(iterator)
        next(iterator)
        del container, iterator

    def test_accumulate(self):
        a = []
        self.makecycle(accumulate([1,2,a,3]), a)

    def test_chain(self):
        a = []
        self.makecycle(chain(a), a)

    def test_chain_from_iterable(self):
        a = []
        self.makecycle(chain.from_iterable([a]), a)

    def test_combinations(self):
        a = []
        self.makecycle(combinations([1,2,a,3], 3), a)

    def test_combinations_with_replacement(self):
        a = []
        self.makecycle(combinations_with_replacement([1,2,a,3], 3), a)

    def test_compress(self):
        a = []
        # The data must contain `a`; with plain 'ABCDEF' the compress object
        # never referred to the container and no cycle was formed.
        self.makecycle(compress(['A', 'B', a, 'D', 'E', 'F'], [1,0,1,0,1,0]), a)

    def test_count(self):
        a = []
        # An int subclass whose class dict refers to the container.
        Int = type('Int', (int,), dict(x=a))
        self.makecycle(count(Int(0), Int(1)), a)

    def test_cycle(self):
        a = []
        self.makecycle(cycle([a]*2), a)

    def test_dropwhile(self):
        a = []
        self.makecycle(dropwhile(bool, [0, a, a]), a)

    def test_groupby(self):
        a = []
        self.makecycle(groupby([a]*2, lambda x:x), a)

    def test_issue2246(self):
        # Issue 2246 -- the _grouper iterator was not included in GC
        n = 10
        keyfunc = lambda x: x
        for _key, group in groupby(range(n), key=keyfunc):
            # Each group is stored on the key function that groupby holds.
            keyfunc.__dict__.setdefault('x',[]).append(group)

    def test_filter(self):
        a = []
        self.makecycle(filter(lambda x:True, [a]*2), a)

    def test_filterfalse(self):
        a = []
        self.makecycle(filterfalse(lambda x:False, a), a)

    def test_zip(self):
        a = []
        self.makecycle(zip([a]*2, [a]*3), a)

    def test_zip_longest(self):
        a = []
        self.makecycle(zip_longest([a]*2, [a]*3), a)
        # Second cycle: the fill value also refers to the container.
        b = [a, None]
        self.makecycle(zip_longest([a]*2, [a]*3, fillvalue=b), a)

    def test_map(self):
        a = []
        self.makecycle(map(lambda x:x, [a]*2), a)

    def test_islice(self):
        a = []
        self.makecycle(islice([a]*2, None), a)

    def test_permutations(self):
        a = []
        self.makecycle(permutations([1,2,a,3], 3), a)

    def test_product(self):
        a = []
        self.makecycle(product([1,2,a,3], repeat=3), a)

    def test_repeat(self):
        a = []
        self.makecycle(repeat(a), a)

    def test_starmap(self):
        a = []
        self.makecycle(starmap(lambda *t: t, [(a,a)]*2), a)

    def test_takewhile(self):
        a = []
        self.makecycle(takewhile(bool, [1, 0, a, a]), a)
1674n/a
def R(seqn):
    """Regular generator: yield every item of *seqn* in order."""
    for item in seqn:
        yield item
1679n/a
class G:
    """Sequence using __getitem__ (it defines no __iter__)."""

    def __init__(self, seqn):
        self.seqn = seqn

    def __getitem__(self, i):
        # Indexing, including any IndexError, is delegated to the wrapped
        # sequence.
        wrapped = self.seqn
        return wrapped[i]
1686n/a
class I:
    """Sequence using iterator protocol (__iter__ returns self)."""

    def __init__(self, seqn):
        self.seqn = seqn
        self.i = 0          # index of the next item to hand out

    def __iter__(self):
        return self

    def __next__(self):
        position = self.i
        if position < len(self.seqn):
            value = self.seqn[position]
            self.i = position + 1
            return value
        raise StopIteration
1699n/a
class Ig:
    """Sequence using iterator protocol defined with a generator."""

    def __init__(self, seqn):
        self.seqn = seqn
        self.i = 0

    def __iter__(self):
        # Being a generator function, each call starts a fresh pass.
        for item in self.seqn:
            yield item
1708n/a
class X:
    """Missing __getitem__ and __iter__ (only __next__ is defined)."""

    def __init__(self, seqn):
        self.seqn = seqn
        self.i = 0          # index of the next item to hand out

    def __next__(self):
        position = self.i
        if position < len(self.seqn):
            value = self.seqn[position]
            self.i = position + 1
            return value
        raise StopIteration
1719n/a
class N:
    """Iterator missing __next__() (its __iter__ returns self)."""

    def __init__(self, seqn):
        self.seqn = seqn
        self.i = 0

    def __iter__(self):
        return self
1727n/a
class E:
    """Test propagation of exceptions: __next__ always fails."""

    def __init__(self, seqn):
        self.seqn = seqn
        self.i = 0

    def __iter__(self):
        return self

    def __next__(self):
        # Deliberate ZeroDivisionError on every call.
        3 // 0
1737n/a
class S:
    """Test immediate stop: an iterator that is always exhausted."""

    def __init__(self, seqn):
        # The sequence argument is accepted and ignored.
        pass

    def __iter__(self):
        return self

    def __next__(self):
        raise StopIteration
1746n/a
def L(seqn):
    """Test multiple tiers of iterators."""
    # Innermost first: __getitem__ wrapper, generator-based iterable,
    # plain generator, identity map, and finally chain.
    layered = R(Ig(G(seqn)))
    passthrough = map(lambda x:x, layered)
    return chain(passthrough)
1750n/a
1751n/a
class TestVariousIteratorArgs(unittest.TestCase):
    """Feed each tool inputs that implement iteration in different ways.

    G, I, Ig, L and R wrap a sequence and must give the same results as the
    sequence itself.  S is always empty.  X (no __iter__/__getitem__) and
    N (no __next__) must be rejected with TypeError, and the
    ZeroDivisionError raised by E.__next__ must propagate.
    """

    def test_accumulate(self):
        values = [1,2,3,4,5]
        running_totals = [1,3,6,10,15]
        for wrap in (G, I, Ig, L, R):
            self.assertEqual(list(accumulate(wrap(values))), running_totals)
        self.assertEqual(list(accumulate(S(values))), [])
        self.assertRaises(TypeError, accumulate, X(values))
        self.assertRaises(TypeError, accumulate, N(values))
        self.assertRaises(ZeroDivisionError, list, accumulate(E(values)))

    def test_chain(self):
        for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                self.assertEqual(list(chain(g(s))), list(g(s)))
                self.assertEqual(list(chain(g(s), g(s))), list(g(s))+list(g(s)))
            # chain() is lazy, so the errors only surface on consumption.
            self.assertRaises(TypeError, list, chain(X(s)))
            self.assertRaises(TypeError, list, chain(N(s)))
            self.assertRaises(ZeroDivisionError, list, chain(E(s)))

    def test_compress(self):
        for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                # An endless run of true selectors keeps every item.
                self.assertEqual(list(compress(g(s), repeat(1))), list(g(s)))
            self.assertRaises(TypeError, compress, X(s), repeat(1))
            self.assertRaises(TypeError, compress, N(s), repeat(1))
            self.assertRaises(ZeroDivisionError, list, compress(E(s), repeat(1)))

    def test_product(self):
        for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
            # All three errors are expected from the product() call itself.
            self.assertRaises(TypeError, product, X(s))
            self.assertRaises(TypeError, product, N(s))
            self.assertRaises(ZeroDivisionError, product, E(s))

    def test_cycle(self):
        for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
            tgtlen = len(s) * 3
            for g in (G, I, Ig, S, L, R):
                expected = list(g(s))*3
                actual = list(islice(cycle(g(s)), tgtlen))
                self.assertEqual(actual, expected)
            self.assertRaises(TypeError, cycle, X(s))
            self.assertRaises(TypeError, cycle, N(s))
            self.assertRaises(ZeroDivisionError, list, cycle(E(s)))

    def test_groupby(self):
        for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                # The inputs have no repeats, so the keys are the items.
                self.assertEqual([k for k, sb in groupby(g(s))], list(g(s)))
            self.assertRaises(TypeError, groupby, X(s))
            self.assertRaises(TypeError, groupby, N(s))
            self.assertRaises(ZeroDivisionError, list, groupby(E(s)))

    def test_filter(self):
        for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                self.assertEqual(list(filter(isEven, g(s))),
                                 [x for x in g(s) if isEven(x)])
            self.assertRaises(TypeError, filter, isEven, X(s))
            self.assertRaises(TypeError, filter, isEven, N(s))
            self.assertRaises(ZeroDivisionError, list, filter(isEven, E(s)))

    def test_filterfalse(self):
        for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                self.assertEqual(list(filterfalse(isEven, g(s))),
                                 [x for x in g(s) if isOdd(x)])
            self.assertRaises(TypeError, filterfalse, isEven, X(s))
            self.assertRaises(TypeError, filterfalse, isEven, N(s))
            self.assertRaises(ZeroDivisionError, list, filterfalse(isEven, E(s)))

    def test_zip(self):
        for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                self.assertEqual(list(zip(g(s))), lzip(g(s)))
                self.assertEqual(list(zip(g(s), g(s))), lzip(g(s), g(s)))
            self.assertRaises(TypeError, zip, X(s))
            self.assertRaises(TypeError, zip, N(s))
            self.assertRaises(ZeroDivisionError, list, zip(E(s)))

    def test_ziplongest(self):
        for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                # With equally long inputs zip_longest() must match zip().
                self.assertEqual(list(zip_longest(g(s))), list(zip(g(s))))
                self.assertEqual(list(zip_longest(g(s), g(s))), list(zip(g(s), g(s))))
            self.assertRaises(TypeError, zip_longest, X(s))
            self.assertRaises(TypeError, zip_longest, N(s))
            self.assertRaises(ZeroDivisionError, list, zip_longest(E(s)))

    def test_map(self):
        for s in (range(10), range(0), range(100), (7,11), range(20,50,5)):
            for g in (G, I, Ig, S, L, R):
                self.assertEqual(list(map(onearg, g(s))),
                                 [onearg(x) for x in g(s)])
                self.assertEqual(list(map(operator.pow, g(s), g(s))),
                                 [x**x for x in g(s)])
            self.assertRaises(TypeError, map, onearg, X(s))
            self.assertRaises(TypeError, map, onearg, N(s))
            self.assertRaises(ZeroDivisionError, list, map(onearg, E(s)))

    def test_islice(self):
        for s in ("12345", "", range(1000), ('do', 1.2), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                self.assertEqual(list(islice(g(s),1,None,2)), list(g(s))[1::2])
            self.assertRaises(TypeError, islice, X(s), 10)
            self.assertRaises(TypeError, islice, N(s), 10)
            self.assertRaises(ZeroDivisionError, list, islice(E(s), 10))

    def test_starmap(self):
        for s in (range(10), range(0), range(100), (7,11), range(20,50,5)):
            # (x, x) argument pairs; the same list serves every wrapper.
            ss = lzip(s, s)
            for g in (G, I, Ig, S, L, R):
                self.assertEqual(list(starmap(operator.pow, g(ss))),
                                 [x**x for x in g(s)])
            self.assertRaises(TypeError, starmap, operator.pow, X(ss))
            self.assertRaises(TypeError, starmap, operator.pow, N(ss))
            self.assertRaises(ZeroDivisionError, list, starmap(operator.pow, E(ss)))

    def test_takewhile(self):
        for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                # Reference result: the leading run of even items.
                tgt = []
                for elem in g(s):
                    if not isEven(elem): break
                    tgt.append(elem)
                self.assertEqual(list(takewhile(isEven, g(s))), tgt)
            self.assertRaises(TypeError, takewhile, isEven, X(s))
            self.assertRaises(TypeError, takewhile, isEven, N(s))
            self.assertRaises(ZeroDivisionError, list, takewhile(isEven, E(s)))

    def test_dropwhile(self):
        for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                # Reference result: skip odd items until one has been kept.
                tgt = []
                for elem in g(s):
                    if not tgt and isOdd(elem): continue
                    tgt.append(elem)
                self.assertEqual(list(dropwhile(isOdd, g(s))), tgt)
            self.assertRaises(TypeError, dropwhile, isOdd, X(s))
            self.assertRaises(TypeError, dropwhile, isOdd, N(s))
            self.assertRaises(ZeroDivisionError, list, dropwhile(isOdd, E(s)))

    def test_tee(self):
        for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
            for g in (G, I, Ig, S, L, R):
                # Both branches must replay the full input.
                it1, it2 = tee(g(s))
                self.assertEqual(list(it1), list(g(s)))
                self.assertEqual(list(it2), list(g(s)))
            self.assertRaises(TypeError, tee, X(s))
            self.assertRaises(TypeError, tee, N(s))
            self.assertRaises(ZeroDivisionError, list, tee(E(s))[0])
1906n/a
class LengthTransparency(unittest.TestCase):
    """Check operator.length_hint() on repeat() objects."""

    def test_repeat(self):
        hint = operator.length_hint
        self.assertEqual(hint(repeat(None, 50)), 50)
        self.assertEqual(hint(repeat(None, 0)), 0)
        # For an unbounded repeat() the supplied default comes back.
        self.assertEqual(hint(repeat(None), 12), 12)

    def test_repeat_with_negative_times(self):
        hint = operator.length_hint
        # Positional form first, then the keyword form.
        for times in (-1, -2):
            self.assertEqual(hint(repeat(None, times)), 0)
        for times in (-1, -2):
            self.assertEqual(hint(repeat(None, times=times)), 0)
1919n/a
class RegressionTests(unittest.TestCase):
    """Regression checks named after the tracker items they cover."""

    def test_sf_793826(self):
        # Fix Armin Rigo's successful efforts to wreak havoc

        def mutatingtuple(tuple1, f, tuple2):
            # this builds a tuple t which is a copy of tuple1,
            # then calls f(t), then mutates t to be equal to tuple2
            # (needs len(tuple1) == len(tuple2)).
            def g(value, pending=[1]):
                # The mutable default is deliberate: it is emptied on the
                # first call, so the re-entrant next(z) happens only once.
                if pending:
                    del pending[:]
                    f(next(z))
                return value
            # First item of tuple2, then all of tuple1, then the rest of
            # tuple2.
            items = list(tuple2[:1]) + list(tuple1) + list(tuple2[1:])
            shared = map(g, items)
            z = zip(*[shared] * len(tuple1))
            next(z)

        def f(t):
            # Keep the tuple itself (module global T) and a snapshot of its
            # contents at the moment it was received.
            global T
            T = t
            first[:] = list(T)

        first = []
        mutatingtuple((1,2,3), f, (4,5,6))
        # The captured tuple must still hold what f() saw.
        second = list(T)
        self.assertEqual(first, second)


    def test_sf_950057(self):
        # Make sure that chain() and cycle() catch exceptions immediately
        # rather than when shifting between input sources

        def gen1():
            hist.append(0)
            yield 1
            hist.append(1)
            raise AssertionError
            hist.append(2)

        def gen2(x):
            hist.append(3)
            yield 2
            hist.append(4)

        builders = (
            lambda: chain(gen1(), gen2(False)),
            lambda: chain(gen1(), gen2(True)),
            lambda: cycle(gen1()),
        )
        for build in builders:
            # Rebinding hist gives every scenario a fresh history; the
            # generators look the name up when they run.
            hist = []
            self.assertRaises(AssertionError, list, build())
            self.assertEqual(hist, [0,1])
1978n/a
class SubclassWithKwargsTest(unittest.TestCase):
    """Subclasses of the iterator types may add keyword arguments."""

    def test_keywords_in_subclass(self):
        # count is not subclassable...
        bases = (repeat, zip, filter, filterfalse, chain, map,
                 starmap, islice, takewhile, dropwhile, cycle, compress)
        for cls in bases:
            class Subclass(cls):
                def __init__(self, newarg=None, *args):
                    cls.__init__(self, *args)
            try:
                Subclass(newarg=1)
            except TypeError as err:
                # we expect type errors because of wrong argument count
                message = err.args[0]
                self.assertNotIn("does not take keyword arguments", message)
1992n/a
@support.cpython_only
class SizeofTest(unittest.TestCase):
    """Compare sizes of combinatoric iterators with struct-based estimates.

    Every expectation is a fixed base size, computed by
    support.calcobjsize(), plus a number of Py_ssize_t-sized slots.
    """

    def setUp(self):
        # Size in bytes of one Py_ssize_t (struct format 'n').
        self.ssize_t = struct.calcsize('n')

    check_sizeof = support.check_sizeof

    def test_product_sizeof(self):
        base = support.calcobjsize('3Pi')
        slot = self.ssize_t
        # One slot per input pool.
        self.check_sizeof(product('ab', '12'), base + 2 * slot)
        self.check_sizeof(product(*(('abc',) * 10)), base + 10 * slot)

    def test_combinations_sizeof(self):
        base = support.calcobjsize('3Pni')
        slot = self.ssize_t
        # One slot per element of a result tuple (r).
        self.check_sizeof(combinations('abcd', 3), base + 3 * slot)
        self.check_sizeof(combinations(range(10), 4), base + 4 * slot)

    def test_combinations_with_replacement_sizeof(self):
        cwr = combinations_with_replacement
        base = support.calcobjsize('3Pni')
        slot = self.ssize_t
        self.check_sizeof(cwr('abcd', 3), base + 3 * slot)
        self.check_sizeof(cwr(range(10), 4), base + 4 * slot)

    def test_permutations_sizeof(self):
        base = support.calcobjsize('4Pni')
        slot = self.ssize_t
        # Slots: one per input element plus one per result element.
        self.check_sizeof(permutations('abcd'), base + (4 + 4) * slot)
        self.check_sizeof(permutations('abcd', 3), base + (4 + 3) * slot)
        self.check_sizeof(permutations('abcde', 3), base + (5 + 3) * slot)
        self.check_sizeof(permutations(range(10), 4), base + (10 + 4) * slot)
2030n/a
2031n/a
2032n/alibreftest = """ Doctest for examples in the library reference: libitertools.tex
2033n/a
2034n/a
2035n/a>>> amounts = [120.15, 764.05, 823.14]
2036n/a>>> for checknum, amount in zip(count(1200), amounts):
2037n/a... print('Check %d is for $%.2f' % (checknum, amount))
2038n/a...
2039n/aCheck 1200 is for $120.15
2040n/aCheck 1201 is for $764.05
2041n/aCheck 1202 is for $823.14
2042n/a
2043n/a>>> import operator
2044n/a>>> for cube in map(operator.pow, range(1,4), repeat(3)):
2045n/a... print(cube)
2046n/a...
2047n/a1
2048n/a8
2049n/a27
2050n/a
2051n/a>>> reportlines = ['EuroPython', 'Roster', '', 'alex', '', 'laura', '', 'martin', '', 'walter', '', 'samuele']
2052n/a>>> for name in islice(reportlines, 3, None, 2):
2053n/a... print(name.title())
2054n/a...
2055n/aAlex
2056n/aLaura
2057n/aMartin
2058n/aWalter
2059n/aSamuele
2060n/a
2061n/a>>> from operator import itemgetter
2062n/a>>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3)
2063n/a>>> di = sorted(sorted(d.items()), key=itemgetter(1))
2064n/a>>> for k, g in groupby(di, itemgetter(1)):
2065n/a... print(k, list(map(itemgetter(0), g)))
2066n/a...
2067n/a1 ['a', 'c', 'e']
2068n/a2 ['b', 'd', 'f']
2069n/a3 ['g']
2070n/a
2071n/a# Find runs of consecutive numbers using groupby. The key to the solution
2072n/a# is differencing with a range so that consecutive numbers all appear in
2073n/a# same group.
2074n/a>>> data = [ 1, 4,5,6, 10, 15,16,17,18, 22, 25,26,27,28]
2075n/a>>> for k, g in groupby(enumerate(data), lambda t:t[0]-t[1]):
2076n/a... print(list(map(operator.itemgetter(1), g)))
2077n/a...
2078n/a[1]
2079n/a[4, 5, 6]
2080n/a[10]
2081n/a[15, 16, 17, 18]
2082n/a[22]
2083n/a[25, 26, 27, 28]
2084n/a
2085n/a>>> def take(n, iterable):
2086n/a... "Return first n items of the iterable as a list"
2087n/a... return list(islice(iterable, n))
2088n/a
2089n/a>>> def enumerate(iterable, start=0):
2090n/a... return zip(count(start), iterable)
2091n/a
2092n/a>>> def tabulate(function, start=0):
2093n/a... "Return function(0), function(1), ..."
2094n/a... return map(function, count(start))
2095n/a
2096n/a>>> def nth(iterable, n, default=None):
2097n/a... "Returns the nth item or a default value"
2098n/a... return next(islice(iterable, n, None), default)
2099n/a
2100n/a>>> def all_equal(iterable):
2101n/a... "Returns True if all the elements are equal to each other"
2102n/a... g = groupby(iterable)
2103n/a... return next(g, True) and not next(g, False)
2104n/a
2105n/a>>> def quantify(iterable, pred=bool):
2106n/a... "Count how many times the predicate is true"
2107n/a... return sum(map(pred, iterable))
2108n/a
2109n/a>>> def padnone(iterable):
2110n/a... "Returns the sequence elements and then returns None indefinitely"
2111n/a... return chain(iterable, repeat(None))
2112n/a
2113n/a>>> def ncycles(iterable, n):
2114n/a... "Returns the sequence elements n times"
2115n/a... return chain(*repeat(iterable, n))
2116n/a
2117n/a>>> def dotproduct(vec1, vec2):
2118n/a... return sum(map(operator.mul, vec1, vec2))
2119n/a
2120n/a>>> def flatten(listOfLists):
2121n/a... return list(chain.from_iterable(listOfLists))
2122n/a
2123n/a>>> def repeatfunc(func, times=None, *args):
2124n/a... "Repeat calls to func with specified arguments."
2125n/a... " Example: repeatfunc(random.random)"
2126n/a... if times is None:
2127n/a... return starmap(func, repeat(args))
2128n/a... else:
2129n/a... return starmap(func, repeat(args, times))
2130n/a
2131n/a>>> def pairwise(iterable):
2132n/a... "s -> (s0,s1), (s1,s2), (s2, s3), ..."
2133n/a... a, b = tee(iterable)
2134n/a... try:
2135n/a... next(b)
2136n/a... except StopIteration:
2137n/a... pass
2138n/a... return zip(a, b)
2139n/a
2140n/a>>> def grouper(n, iterable, fillvalue=None):
2141n/a... "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
2142n/a... args = [iter(iterable)] * n
2143n/a... return zip_longest(*args, fillvalue=fillvalue)
2144n/a
2145n/a>>> def roundrobin(*iterables):
2146n/a... "roundrobin('ABC', 'D', 'EF') --> A D E B F C"
2147n/a... # Recipe credited to George Sakkis
2148n/a... pending = len(iterables)
2149n/a... nexts = cycle(iter(it).__next__ for it in iterables)
2150n/a... while pending:
2151n/a... try:
2152n/a... for next in nexts:
2153n/a... yield next()
2154n/a... except StopIteration:
2155n/a... pending -= 1
2156n/a... nexts = cycle(islice(nexts, pending))
2157n/a
2158n/a>>> def powerset(iterable):
2159n/a... "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
2160n/a... s = list(iterable)
2161n/a... return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
2162n/a
2163n/a>>> def unique_everseen(iterable, key=None):
2164n/a... "List unique elements, preserving order. Remember all elements ever seen."
2165n/a... # unique_everseen('AAAABBBCCDAABBB') --> A B C D
2166n/a... # unique_everseen('ABBCcAD', str.lower) --> A B C D
2167n/a... seen = set()
2168n/a... seen_add = seen.add
2169n/a... if key is None:
2170n/a... for element in iterable:
2171n/a... if element not in seen:
2172n/a... seen_add(element)
2173n/a... yield element
2174n/a... else:
2175n/a... for element in iterable:
2176n/a... k = key(element)
2177n/a... if k not in seen:
2178n/a... seen_add(k)
2179n/a... yield element
2180n/a
2181n/a>>> def unique_justseen(iterable, key=None):
2182n/a... "List unique elements, preserving order. Remember only the element just seen."
2183n/a... # unique_justseen('AAAABBBCCDAABBB') --> A B C D A B
2184n/a... # unique_justseen('ABBCcAD', str.lower) --> A B C A D
2185n/a... return map(next, map(itemgetter(1), groupby(iterable, key)))
2186n/a
2187n/a>>> def first_true(iterable, default=False, pred=None):
2188n/a... '''Returns the first true value in the iterable.
2189n/a...
2190n/a... If no true value is found, returns *default*
2191n/a...
2192n/a... If *pred* is not None, returns the first item
2193n/a... for which pred(item) is true.
2194n/a...
2195n/a... '''
2196n/a... # first_true([a,b,c], x) --> a or b or c or x
2197n/a... # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x
2198n/a... return next(filter(pred, iterable), default)
2199n/a
2200n/aThis is not part of the examples but it tests to make sure the definitions
2201n/aperform as purported.
2202n/a
2203n/a>>> take(10, count())
2204n/a[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2205n/a
2206n/a>>> list(enumerate('abc'))
2207n/a[(0, 'a'), (1, 'b'), (2, 'c')]
2208n/a
2209n/a>>> list(islice(tabulate(lambda x: 2*x), 4))
2210n/a[0, 2, 4, 6]
2211n/a
2212n/a>>> nth('abcde', 3)
2213n/a'd'
2214n/a
2215n/a>>> nth('abcde', 9) is None
2216n/aTrue
2217n/a
2218n/a>>> [all_equal(s) for s in ('', 'A', 'AAAA', 'AAAB', 'AAABA')]
2219n/a[True, True, True, False, False]
2220n/a
2221n/a>>> quantify(range(99), lambda x: x%2==0)
2222n/a50
2223n/a
2224n/a>>> a = [[1, 2, 3], [4, 5, 6]]
2225n/a>>> flatten(a)
2226n/a[1, 2, 3, 4, 5, 6]
2227n/a
2228n/a>>> list(repeatfunc(pow, 5, 2, 3))
2229n/a[8, 8, 8, 8, 8]
2230n/a
2231n/a>>> import random
2232n/a>>> take(5, map(int, repeatfunc(random.random)))
2233n/a[0, 0, 0, 0, 0]
2234n/a
2235n/a>>> list(pairwise('abcd'))
2236n/a[('a', 'b'), ('b', 'c'), ('c', 'd')]
2237n/a
2238n/a>>> list(pairwise([]))
2239n/a[]
2240n/a
2241n/a>>> list(pairwise('a'))
2242n/a[]
2243n/a
2244n/a>>> list(islice(padnone('abc'), 0, 6))
2245n/a['a', 'b', 'c', None, None, None]
2246n/a
2247n/a>>> list(ncycles('abc', 3))
2248n/a['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c']
2249n/a
2250n/a>>> dotproduct([1,2,3], [4,5,6])
2251n/a32
2252n/a
2253n/a>>> list(grouper(3, 'abcdefg', 'x'))
2254n/a[('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'x', 'x')]
2255n/a
2256n/a>>> list(roundrobin('abc', 'd', 'ef'))
2257n/a['a', 'd', 'e', 'b', 'f', 'c']
2258n/a
2259n/a>>> list(powerset([1,2,3]))
2260n/a[(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)]
2261n/a
2262n/a>>> all(len(list(powerset(range(n)))) == 2**n for n in range(18))
2263n/aTrue
2264n/a
2265n/a>>> list(powerset('abcde')) == sorted(sorted(set(powerset('abcde'))), key=len)
2266n/aTrue
2267n/a
2268n/a>>> list(unique_everseen('AAAABBBCCDAABBB'))
2269n/a['A', 'B', 'C', 'D']
2270n/a
2271n/a>>> list(unique_everseen('ABBCcAD', str.lower))
2272n/a['A', 'B', 'C', 'D']
2273n/a
2274n/a>>> list(unique_justseen('AAAABBBCCDAABBB'))
2275n/a['A', 'B', 'C', 'D', 'A', 'B']
2276n/a
2277n/a>>> list(unique_justseen('ABBCcAD', str.lower))
2278n/a['A', 'B', 'C', 'A', 'D']
2279n/a
2280n/a>>> first_true('ABC0DEF1', '9', str.isdigit)
2281n/a'0'
2282n/a
2283n/a"""
2284n/a
# Expose the library-reference examples under the module's __test__ mapping.
__test__ = dict(libreftest=libreftest)
2286n/a
def test_main(verbose=None):
    """Run every test class, then the doctests of this module.

    With a true *verbose* on an interpreter that provides
    sys.gettotalrefcount, the classes are run five more times and the
    total reference count after each run is printed.
    """
    suites = (TestBasicOps, TestVariousIteratorArgs, TestGC,
              RegressionTests, LengthTransparency,
              SubclassWithKwargsTest, TestExamples,
              SizeofTest)
    support.run_unittest(*suites)

    # verify reference counting
    if verbose and hasattr(sys, "gettotalrefcount"):
        import gc
        # The list is allocated up front, so recording a count does not
        # grow it between measurements.
        counts = [None] * 5
        for slot, _unused in enumerate(counts):
            support.run_unittest(*suites)
            gc.collect()
            counts[slot] = sys.gettotalrefcount()
        print(counts)

    # doctest the examples in the library reference
    this_module = sys.modules[__name__]
    support.run_doctest(this_module, verbose)
2306n/a
# Running this file as a script runs the whole suite verbosely.
if __name__ == '__main__':
    test_main(verbose=True)