
Python code coverage for Lib/multiprocessing/heap.py

#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
import mmap
import os
import sys
import tempfile
import threading

from .context import reduction, assert_spawning
from . import util

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):

        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            self.size = size
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            self._state = (self.size, self.name)

        def __getstate__(self):
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

    class Arena(object):

        def __init__(self, size, fd=-1):
            self.size = size
            self.fd = fd
            if fd == -1:
                self.fd, name = tempfile.mkstemp(
                    prefix='pym-%d-'%os.getpid(), dir=util.get_temp_dir())
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
                with open(self.fd, 'wb', closefd=False) as f:
                    bs = 1024 * 1024
                    if size >= bs:
                        zeros = b'\0' * bs
                        for _ in range(size // bs):
                            f.write(zeros)
                        del zeros
                    f.write(b'\0' * (size % bs))
                    assert f.tell() == size
            self.buffer = mmap.mmap(self.fd, self.size)

    def reduce_arena(a):
        if a.fd == -1:
            raise ValueError('Arena is unpicklable because '
                             'forking was enabled when it was created')
        return rebuild_arena, (a.size, reduction.DupFd(a.fd))

    def rebuild_arena(size, dupfd):
        return Arena(size, dupfd.detach())

    reduction.register(Arena, reduce_arena)

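# Illustrative sketch (not part of the original module): on POSIX platforms an
# Arena is backed by an unlinked temporary file of the requested size, exposed
# as a writable mmap; on Windows it is an anonymous mmap with a process-unique
# tagname so that a child process can reopen it by name when unpickling.
# A minimal demonstration, assuming the module is importable from the stdlib
# as multiprocessing.heap:
#
#     from multiprocessing.heap import Arena
#     a = Arena(4096)                  # one page of zero-initialised memory
#     a.buffer[:5] = b'hello'          # the underlying mmap is writable
#     assert bytes(a.buffer[:5]) == b'hello'
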
#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):

    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        self._size = size
        self._lengths = []
        self._len_to_seq = {}
        self._start_to_block = {}
        self._stop_to_block = {}
        self._allocated_blocks = set()
        self._arenas = []
        # list of pending blocks to free - see free() comment below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
        mask = alignment - 1
        return (n + mask) & ~mask

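    # Illustrative sketch (not part of the original module): _roundup rounds n
    # up to the next multiple of a power-of-two alignment by bit masking.
    # With alignment=8 the mask is 0b111, so:
    #
    #     Heap._roundup(1, 8)   ->  8     # (1 + 7) & ~7
    #     Heap._roundup(8, 8)   ->  8     # already aligned
    #     Heap._roundup(13, 8)  -> 16     # (13 + 7) & ~7
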
    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            util.info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

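    # Illustrative sketch (not part of the original module): the bisect lookup
    # in _malloc() is a best-fit search over the sorted free-block lengths.
    # For example, if self._lengths is [16, 64, 256]:
    #
    #     bisect.bisect_left([16, 64, 256], 24)   -> 1   (reuse a 64-byte block)
    #     bisect.bisect_left([16, 64, 256], 300)  -> 3   (== len: new Arena)
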
    def _free(self, block):
        # free location and try to merge with neighbours
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

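    # Illustrative sketch (not part of the original module): _free() and
    # _absorb() coalesce adjacent free blocks.  For example, if (arena, 0, 16)
    # and (arena, 32, 48) are already on the free lists and (arena, 16, 32) is
    # then freed, both neighbours are absorbed and a single free block
    # (arena, 0, 48) is recorded, so _lengths gains one entry of 48 instead of
    # a third entry of 16.
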
    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometime later from malloc() or free(), by calling
        # _free_pending_blocks() (appending and retrieving from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
        assert os.getpid() == self._lastpid
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

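    # Illustrative sketch (not part of the original module): the "trylock" in
    # free() is simply Lock.acquire(blocking=False), which returns immediately
    # instead of waiting for the current holder to release the lock:
    #
    #     import threading
    #     lock = threading.Lock()
    #     lock.acquire()
    #     # a blocking acquire here would deadlock, even in the same thread
    #     assert lock.acquire(False) is False
    #     lock.release()
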
    def malloc(self, size):
        # return a block of right size (possibly rounded up)
        assert 0 <= size < sys.maxsize
        if os.getpid() != self._lastpid:
            self.__init__()            # reinitialize after fork
        with self._lock:
            self._free_pending_blocks()
            size = self._roundup(max(size,1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block

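# Illustrative sketch (not part of the original module): requests are rounded
# up to Heap._alignment, and the unused tail of the block handed back by
# _malloc() is returned to the free lists immediately:
#
#     h = Heap()
#     arena, start, stop = h.malloc(1)     # rounded up to the 8-byte alignment
#     assert stop - start == 8
#     h.free((arena, start, stop))         # the block can now be reused
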
#
# Class representing a chunk of an mmap -- can be inherited by child process
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        assert 0 <= size < sys.maxsize
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        util.Finalize(self, BufferWrapper._heap.free, args=(block,))

    def create_memoryview(self):
        (arena, start, stop), size = self._state
        return memoryview(arena.buffer)[start:start+size]
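
# Illustrative sketch (not part of the original module): BufferWrapper is the
# building block used by multiprocessing.sharedctypes (and hence Value/Array);
# it hands out a slice of mmap-backed memory that child processes can inherit.
# A minimal single-process demonstration, assuming the module is importable
# from the stdlib as multiprocessing.heap:
#
#     from multiprocessing.heap import BufferWrapper
#     wrapper = BufferWrapper(16)
#     view = wrapper.create_memoryview()
#     view[:5] = b'hello'
#     assert bytes(view[:5]) == b'hello'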