# Python code coverage for Lib/test/test_heapq.py

# | count | content |
---|---|---|

1 | n/a | """Unittests for heapq.""" |

2 | n/a | |

3 | n/a | import random |

4 | n/a | import unittest |

5 | n/a | |

6 | n/a | from test import support |

7 | n/a | from unittest import TestCase, skipUnless |

8 | n/a | from operator import itemgetter |

9 | n/a | |

10 | n/a | py_heapq = support.import_fresh_module('heapq', blocked=['_heapq']) |

11 | n/a | c_heapq = support.import_fresh_module('heapq', fresh=['_heapq']) |

12 | n/a | |

13 | n/a | # _heapq.nlargest/nsmallest are saved in heapq._nlargest/_nsmallest when |

14 | n/a | # _heapq is imported, so check them there |

# Names of the heap functions whose defining module is checked for each
# imported flavour of heapq.
func_names = [
    'heapify',
    'heappop',
    'heappush',
    'heappushpop',
    'heapreplace',
    '_heappop_max',
    '_heapreplace_max',
    '_heapify_max',
]

17 | n/a | |

class TestModules(TestCase):
    # Verifies that each name in func_names resolves to a function defined
    # in the expected module for the given import of heapq.

    def test_py_functions(self):
        # With _heapq blocked, every function must be defined by heapq itself.
        for name in func_names:
            func = getattr(py_heapq, name)
            self.assertEqual(func.__module__, 'heapq')

    @skipUnless(c_heapq, 'requires _heapq')
    def test_c_functions(self):
        # With _heapq available, every function must come from _heapq.
        for name in func_names:
            func = getattr(c_heapq, name)
            self.assertEqual(func.__module__, '_heapq')

27 | n/a | |

28 | n/a | |

29 | n/a | class TestHeap: |

30 | n/a | |

31 | n/a | def test_push_pop(self): |

32 | n/a | # 1) Push 256 random numbers and pop them off, verifying all's OK. |

33 | n/a | heap = [] |

34 | n/a | data = [] |

35 | n/a | self.check_invariant(heap) |

36 | n/a | for i in range(256): |

37 | n/a | item = random.random() |

38 | n/a | data.append(item) |

39 | n/a | self.module.heappush(heap, item) |

40 | n/a | self.check_invariant(heap) |

41 | n/a | results = [] |

42 | n/a | while heap: |

43 | n/a | item = self.module.heappop(heap) |

44 | n/a | self.check_invariant(heap) |

45 | n/a | results.append(item) |

46 | n/a | data_sorted = data[:] |

47 | n/a | data_sorted.sort() |

48 | n/a | self.assertEqual(data_sorted, results) |

49 | n/a | # 2) Check that the invariant holds for a sorted array |

50 | n/a | self.check_invariant(results) |

51 | n/a | |

52 | n/a | self.assertRaises(TypeError, self.module.heappush, []) |

53 | n/a | try: |

54 | n/a | self.assertRaises(TypeError, self.module.heappush, None, None) |

55 | n/a | self.assertRaises(TypeError, self.module.heappop, None) |

56 | n/a | except AttributeError: |

57 | n/a | pass |

58 | n/a | |

59 | n/a | def check_invariant(self, heap): |

60 | n/a | # Check the heap invariant. |

61 | n/a | for pos, item in enumerate(heap): |

62 | n/a | if pos: # pos 0 has no parent |

63 | n/a | parentpos = (pos-1) >> 1 |

64 | n/a | self.assertTrue(heap[parentpos] <= item) |

65 | n/a | |

66 | n/a | def test_heapify(self): |

67 | n/a | for size in list(range(30)) + [20000]: |

68 | n/a | heap = [random.random() for dummy in range(size)] |

69 | n/a | self.module.heapify(heap) |

70 | n/a | self.check_invariant(heap) |

71 | n/a | |

72 | n/a | self.assertRaises(TypeError, self.module.heapify, None) |

73 | n/a | |

74 | n/a | def test_naive_nbest(self): |

75 | n/a | data = [random.randrange(2000) for i in range(1000)] |

76 | n/a | heap = [] |

77 | n/a | for item in data: |

78 | n/a | self.module.heappush(heap, item) |

79 | n/a | if len(heap) > 10: |

80 | n/a | self.module.heappop(heap) |

81 | n/a | heap.sort() |

82 | n/a | self.assertEqual(heap, sorted(data)[-10:]) |

83 | n/a | |

84 | n/a | def heapiter(self, heap): |

85 | n/a | # An iterator returning a heap's elements, smallest-first. |

86 | n/a | try: |

87 | n/a | while 1: |

88 | n/a | yield self.module.heappop(heap) |

89 | n/a | except IndexError: |

90 | n/a | pass |

91 | n/a | |

92 | n/a | def test_nbest(self): |

93 | n/a | # Less-naive "N-best" algorithm, much faster (if len(data) is big |

94 | n/a | # enough <wink>) than sorting all of data. However, if we had a max |

95 | n/a | # heap instead of a min heap, it could go faster still via |

96 | n/a | # heapify'ing all of data (linear time), then doing 10 heappops |

97 | n/a | # (10 log-time steps). |

98 | n/a | data = [random.randrange(2000) for i in range(1000)] |

99 | n/a | heap = data[:10] |

100 | n/a | self.module.heapify(heap) |

101 | n/a | for item in data[10:]: |

102 | n/a | if item > heap[0]: # this gets rarer the longer we run |

103 | n/a | self.module.heapreplace(heap, item) |

104 | n/a | self.assertEqual(list(self.heapiter(heap)), sorted(data)[-10:]) |

105 | n/a | |

106 | n/a | self.assertRaises(TypeError, self.module.heapreplace, None) |

107 | n/a | self.assertRaises(TypeError, self.module.heapreplace, None, None) |

108 | n/a | self.assertRaises(IndexError, self.module.heapreplace, [], None) |

109 | n/a | |

110 | n/a | def test_nbest_with_pushpop(self): |

111 | n/a | data = [random.randrange(2000) for i in range(1000)] |

112 | n/a | heap = data[:10] |

113 | n/a | self.module.heapify(heap) |

114 | n/a | for item in data[10:]: |

115 | n/a | self.module.heappushpop(heap, item) |

116 | n/a | self.assertEqual(list(self.heapiter(heap)), sorted(data)[-10:]) |

117 | n/a | self.assertEqual(self.module.heappushpop([], 'x'), 'x') |

118 | n/a | |

119 | n/a | def test_heappushpop(self): |

120 | n/a | h = [] |

121 | n/a | x = self.module.heappushpop(h, 10) |

122 | n/a | self.assertEqual((h, x), ([], 10)) |

123 | n/a | |

124 | n/a | h = [10] |

125 | n/a | x = self.module.heappushpop(h, 10.0) |

126 | n/a | self.assertEqual((h, x), ([10], 10.0)) |

127 | n/a | self.assertEqual(type(h[0]), int) |

128 | n/a | self.assertEqual(type(x), float) |

129 | n/a | |

130 | n/a | h = [10]; |

131 | n/a | x = self.module.heappushpop(h, 9) |

132 | n/a | self.assertEqual((h, x), ([10], 9)) |

133 | n/a | |

134 | n/a | h = [10]; |

135 | n/a | x = self.module.heappushpop(h, 11) |

136 | n/a | self.assertEqual((h, x), ([11], 10)) |

137 | n/a | |

138 | n/a | def test_heapsort(self): |

139 | n/a | # Exercise everything with repeated heapsort checks |

140 | n/a | for trial in range(100): |

141 | n/a | size = random.randrange(50) |

142 | n/a | data = [random.randrange(25) for i in range(size)] |

143 | n/a | if trial & 1: # Half of the time, use heapify |

144 | n/a | heap = data[:] |

145 | n/a | self.module.heapify(heap) |

146 | n/a | else: # The rest of the time, use heappush |

147 | n/a | heap = [] |

148 | n/a | for item in data: |

149 | n/a | self.module.heappush(heap, item) |

150 | n/a | heap_sorted = [self.module.heappop(heap) for i in range(size)] |

151 | n/a | self.assertEqual(heap_sorted, sorted(data)) |

152 | n/a | |

153 | n/a | def test_merge(self): |

154 | n/a | inputs = [] |

155 | n/a | for i in range(random.randrange(25)): |

156 | n/a | row = [] |

157 | n/a | for j in range(random.randrange(100)): |

158 | n/a | tup = random.choice('ABC'), random.randrange(-500, 500) |

159 | n/a | row.append(tup) |

160 | n/a | inputs.append(row) |

161 | n/a | |

162 | n/a | for key in [None, itemgetter(0), itemgetter(1), itemgetter(1, 0)]: |

163 | n/a | for reverse in [False, True]: |

164 | n/a | seqs = [] |

165 | n/a | for seq in inputs: |

166 | n/a | seqs.append(sorted(seq, key=key, reverse=reverse)) |

167 | n/a | self.assertEqual(sorted(chain(*inputs), key=key, reverse=reverse), |

168 | n/a | list(self.module.merge(*seqs, key=key, reverse=reverse))) |

169 | n/a | self.assertEqual(list(self.module.merge()), []) |

170 | n/a | |

171 | n/a | def test_merge_does_not_suppress_index_error(self): |

172 | n/a | # Issue 19018: Heapq.merge suppresses IndexError from user generator |

173 | n/a | def iterable(): |

174 | n/a | s = list(range(10)) |

175 | n/a | for i in range(20): |

176 | n/a | yield s[i] # IndexError when i > 10 |

177 | n/a | with self.assertRaises(IndexError): |

178 | n/a | list(self.module.merge(iterable(), iterable())) |

179 | n/a | |

180 | n/a | def test_merge_stability(self): |

181 | n/a | class Int(int): |

182 | n/a | pass |

183 | n/a | inputs = [[], [], [], []] |

184 | n/a | for i in range(20000): |

185 | n/a | stream = random.randrange(4) |

186 | n/a | x = random.randrange(500) |

187 | n/a | obj = Int(x) |

188 | n/a | obj.pair = (x, stream) |

189 | n/a | inputs[stream].append(obj) |

190 | n/a | for stream in inputs: |

191 | n/a | stream.sort() |

192 | n/a | result = [i.pair for i in self.module.merge(*inputs)] |

193 | n/a | self.assertEqual(result, sorted(result)) |

194 | n/a | |

195 | n/a | def test_nsmallest(self): |

196 | n/a | data = [(random.randrange(2000), i) for i in range(1000)] |

197 | n/a | for f in (None, lambda x: x[0] * 547 % 2000): |

198 | n/a | for n in (0, 1, 2, 10, 100, 400, 999, 1000, 1100): |

199 | n/a | self.assertEqual(list(self.module.nsmallest(n, data)), |

200 | n/a | sorted(data)[:n]) |

201 | n/a | self.assertEqual(list(self.module.nsmallest(n, data, key=f)), |

202 | n/a | sorted(data, key=f)[:n]) |

203 | n/a | |

204 | n/a | def test_nlargest(self): |

205 | n/a | data = [(random.randrange(2000), i) for i in range(1000)] |

206 | n/a | for f in (None, lambda x: x[0] * 547 % 2000): |

207 | n/a | for n in (0, 1, 2, 10, 100, 400, 999, 1000, 1100): |

208 | n/a | self.assertEqual(list(self.module.nlargest(n, data)), |

209 | n/a | sorted(data, reverse=True)[:n]) |

210 | n/a | self.assertEqual(list(self.module.nlargest(n, data, key=f)), |

211 | n/a | sorted(data, key=f, reverse=True)[:n]) |

212 | n/a | |

213 | n/a | def test_comparison_operator(self): |

214 | n/a | # Issue 3051: Make sure heapq works with both __lt__ |

215 | n/a | # For python 3.0, __le__ alone is not enough |

216 | n/a | def hsort(data, comp): |

217 | n/a | data = [comp(x) for x in data] |

218 | n/a | self.module.heapify(data) |

219 | n/a | return [self.module.heappop(data).x for i in range(len(data))] |

220 | n/a | class LT: |

221 | n/a | def __init__(self, x): |

222 | n/a | self.x = x |

223 | n/a | def __lt__(self, other): |

224 | n/a | return self.x > other.x |

225 | n/a | class LE: |

226 | n/a | def __init__(self, x): |

227 | n/a | self.x = x |

228 | n/a | def __le__(self, other): |

229 | n/a | return self.x >= other.x |

230 | n/a | data = [random.random() for i in range(100)] |

231 | n/a | target = sorted(data, reverse=True) |

232 | n/a | self.assertEqual(hsort(data, LT), target) |

233 | n/a | self.assertRaises(TypeError, data, LE) |

234 | n/a | |

235 | n/a | |

class TestHeapPython(TestHeap, TestCase):
    # Runs the shared TestHeap tests against the heapq module imported with
    # _heapq blocked (py_heapq).
    module = py_heapq

238 | n/a | |

239 | n/a | |

@skipUnless(c_heapq, 'requires _heapq')
class TestHeapC(TestHeap, TestCase):
    # Runs the shared TestHeap tests against the heapq module imported with
    # a fresh _heapq (c_heapq); skipped when that import is unavailable.
    module = c_heapq

243 | n/a | |

244 | n/a | |

245 | n/a | #============================================================================== |

246 | n/a | |

class LenOnly:
    """Dummy sequence class that has a length but cannot be indexed."""

    def __len__(self):
        # Fixed size; no __getitem__ is defined on purpose.
        return 10

251 | n/a | |

class GetOnly:
    """Dummy sequence class that can be indexed but has no length."""

    def __getitem__(self, ndx):
        # Every index yields the same value; no __len__ is defined.
        return 10

256 | n/a | |

class CmpErr:
    """Dummy element for which every rich comparison raises an error."""

    def __lt__(self, other):
        raise ZeroDivisionError

    # All other rich comparisons share the same always-failing method.
    __eq__ = __ne__ = __le__ = __gt__ = __ge__ = __lt__

262 | n/a | |

def R(seqn):
    """Regular generator: yield the items of *seqn* one at a time."""
    yield from seqn

267 | n/a | |

class G:
    """Sequence wrapper that exposes only __getitem__."""

    def __init__(self, seqn):
        self.seqn = seqn

    def __getitem__(self, i):
        # Delegate indexing, including any IndexError, to the wrapped object.
        wrapped = self.seqn
        return wrapped[i]

274 | n/a | |

class I:
    """Sequence wrapper implementing the iterator protocol by hand."""

    def __init__(self, seqn):
        self.seqn = seqn
        self.i = 0

    def __iter__(self):
        return self

    def __next__(self):
        position = self.i
        if position >= len(self.seqn):
            raise StopIteration
        # Read the item first so a failing lookup leaves the cursor in place.
        item = self.seqn[position]
        self.i = position + 1
        return item

287 | n/a | |

class Ig:
    """Sequence wrapper whose __iter__ is written as a generator."""

    def __init__(self, seqn):
        self.seqn = seqn
        self.i = 0

    def __iter__(self):
        # Each call produces a fresh generator over the wrapped object.
        yield from self.seqn

296 | n/a | |

class X:
    """Has __next__ but neither __getitem__ nor __iter__."""

    def __init__(self, seqn):
        self.seqn = seqn
        self.i = 0

    def __next__(self):
        position = self.i
        if position >= len(self.seqn):
            raise StopIteration
        # Read the item first so a failing lookup leaves the cursor in place.
        item = self.seqn[position]
        self.i = position + 1
        return item

307 | n/a | |

class N:
    """Claims to be its own iterator but defines no __next__()."""

    def __init__(self, seqn):
        self.i = 0
        self.seqn = seqn

    def __iter__(self):
        # Returns an object without __next__, i.e. not a real iterator.
        return self

315 | n/a | |

class E:
    """Iterator whose __next__ always fails, to test exception propagation."""

    def __init__(self, seqn):
        self.i = 0
        self.seqn = seqn

    def __iter__(self):
        return self

    def __next__(self):
        # Integer division by zero raises ZeroDivisionError on every call.
        3 // 0

325 | n/a | |

class S:
    """Iterator that is exhausted from the very first call."""

    def __init__(self, seqn):
        # The argument is accepted for interface parity and ignored.
        pass

    def __iter__(self):
        return self

    def __next__(self):
        raise StopIteration

334 | n/a | |

335 | n/a | from itertools import chain |

def L(seqn):
    """Wrap *seqn* in several stacked iterator layers."""
    # Innermost to outermost: G (indexing), Ig (generator __iter__),
    # R (plain generator), map with an identity function, then chain.
    layered = R(Ig(G(seqn)))
    identity_mapped = map(lambda x: x, layered)
    return chain(identity_mapped)

339 | n/a | |

340 | n/a | |

class SideEffectLT:
    """Element whose ``<`` comparison empties the list it was given."""

    def __init__(self, value, heap):
        self.value = value
        self.heap = heap

    def __lt__(self, other):
        # Empty the shared list in place before answering the comparison.
        del self.heap[:]
        return self.value < other.value

349 | n/a | |

350 | n/a | |

class TestErrorHandling:
    # Error-path tests shared by every heapq implementation.
    #
    # This mixin is not a TestCase on its own.  A concrete subclass combines
    # it with TestCase and sets the class attribute ``module`` to the heapq
    # module under test.

    def test_non_sequence(self):
        # A plain int is neither a heap nor an iterable.
        for f in (self.module.heapify, self.module.heappop):
            self.assertRaises((TypeError, AttributeError), f, 10)
        for f in (self.module.heappush, self.module.heapreplace,
                  self.module.nlargest, self.module.nsmallest):
            self.assertRaises((TypeError, AttributeError), f, 10, 10)

    def test_len_only(self):
        # An object with __len__ but no __getitem__ is not a usable heap.
        for f in (self.module.heapify, self.module.heappop):
            self.assertRaises((TypeError, AttributeError), f, LenOnly())
        for f in (self.module.heappush, self.module.heapreplace):
            self.assertRaises((TypeError, AttributeError), f, LenOnly(), 10)
        for f in (self.module.nlargest, self.module.nsmallest):
            self.assertRaises(TypeError, f, 2, LenOnly())

    def test_get_only(self):
        # An object with __getitem__ but no __len__ is not a usable heap.
        # AttributeError is accepted alongside TypeError, as in
        # test_len_only, because an implementation may call list methods on
        # its argument before any type check happens.
        for f in (self.module.heapify, self.module.heappop):
            self.assertRaises((TypeError, AttributeError), f, GetOnly())
        for f in (self.module.heappush, self.module.heapreplace):
            self.assertRaises((TypeError, AttributeError), f, GetOnly(), 10)
        # nlargest/nsmallest are deliberately not exercised here:
        # GetOnly.__getitem__ never raises IndexError, so iterating an
        # instance through the sequence protocol never terminates.

    def test_cmp_err(self):
        # Errors raised by element comparisons must propagate unchanged.
        # (This test used to be a second ``test_get_only``, which replaced
        # the definition above in the class namespace.)
        seq = [CmpErr(), CmpErr(), CmpErr()]
        for f in (self.module.heapify, self.module.heappop):
            self.assertRaises(ZeroDivisionError, f, seq)
        for f in (self.module.heappush, self.module.heapreplace):
            self.assertRaises(ZeroDivisionError, f, seq, 10)
        for f in (self.module.nlargest, self.module.nsmallest):
            self.assertRaises(ZeroDivisionError, f, 2, seq)

    def test_arg_parsing(self):
        # A single int argument is invalid for every one of these functions.
        for f in (self.module.heapify, self.module.heappop,
                  self.module.heappush, self.module.heapreplace,
                  self.module.nlargest, self.module.nsmallest):
            self.assertRaises((TypeError, AttributeError), f, 10)

    def test_iterable_args(self):
        # nlargest/nsmallest must accept any well-behaved iterable and
        # reject or propagate errors from the misbehaving ones.
        for f in (self.module.nlargest, self.module.nsmallest):
            for s in ("123", "", range(1000), (1, 1.2), range(2000, 2200, 5)):
                for g in (G, I, Ig, L, R):
                    self.assertEqual(list(f(2, g(s))), list(f(2, s)))
                self.assertEqual(list(f(2, S(s))), [])
                self.assertRaises(TypeError, f, 2, X(s))
                self.assertRaises(TypeError, f, 2, N(s))
                self.assertRaises(ZeroDivisionError, f, 2, E(s))

    # Issue #17278: the heap may change size while it's being walked.

    def test_heappush_mutating_heap(self):
        heap = []
        heap.extend(SideEffectLT(i, heap) for i in range(200))
        # Python version raises IndexError, C version RuntimeError
        with self.assertRaises((IndexError, RuntimeError)):
            self.module.heappush(heap, SideEffectLT(5, heap))

    def test_heappop_mutating_heap(self):
        heap = []
        heap.extend(SideEffectLT(i, heap) for i in range(200))
        # Python version raises IndexError, C version RuntimeError
        with self.assertRaises((IndexError, RuntimeError)):
            self.module.heappop(heap)

416 | n/a | |

417 | n/a | |

class TestErrorHandlingPython(TestErrorHandling, TestCase):
    # Runs the shared TestErrorHandling tests against the heapq module
    # imported with _heapq blocked (py_heapq).
    module = py_heapq

420 | n/a | |

@skipUnless(c_heapq, 'requires _heapq')
class TestErrorHandlingC(TestErrorHandling, TestCase):
    # Runs the shared TestErrorHandling tests against the heapq module
    # imported with a fresh _heapq (c_heapq); skipped when unavailable.
    module = c_heapq

424 | n/a | |

425 | n/a | |

# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()