#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
from collections import defaultdict
import mmap
import os
import sys
import tempfile
import threading

from .context import reduction, assert_spawning
from . import util

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):
        """
        A shared memory area backed by anonymous memory (Windows).
        """

        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            """Create a new tagged anonymous mmap of *size* bytes.

            Up to 100 random tag names are tried; FileExistsError is raised
            if every one of them refers to an already existing mapping.
            """
            self.size = size
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            self._state = (self.size, self.name)

        def __getstate__(self):
            """Return the ``(size, name)`` pair; only allowed while spawning."""
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            """Restore from a ``(size, name)`` pair by reopening the mmap."""
            self.size, self.name = self._state = state
            # Reopen existing mmap
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

    class Arena(object):
        """
        A shared memory area backed by a temporary file (POSIX).
        """

        if sys.platform == 'linux':
            _dir_candidates = ['/dev/shm']
        else:
            _dir_candidates = []

        def __init__(self, size, fd=-1):
            """Map *size* bytes of a file into memory.

            With the default ``fd=-1`` a fresh temporary file is created,
            unlinked immediately and truncated to *size*; a finalizer closes
            its descriptor.  Otherwise *fd* is mapped as given.
            """
            self.size = size
            self.fd = fd
            if fd == -1:
                # Arena is created anew (if fd != -1, it means we're coming
                # from rebuild_arena() below)
                self.fd, name = tempfile.mkstemp(
                    prefix='pym-%d-'%os.getpid(),
                    dir=self._choose_dir(size))
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
                os.ftruncate(self.fd, size)
            self.buffer = mmap.mmap(self.fd, self.size)

        def _choose_dir(self, size):
            """Return the directory in which to create the backing file.

            The first entry of ``_dir_candidates`` with at least *size* bytes
            available is used; otherwise ``util.get_temp_dir()`` is returned.
            """
            # Choose a non-storage backed directory if possible,
            # to improve performance
            for d in self._dir_candidates:
                try:
                    st = os.statvfs(d)
                except OSError:
                    # The candidate is missing or cannot be queried; this is
                    # only an optimisation, so fall through to the next
                    # candidate (or the regular temp dir) instead of failing.
                    continue
                if st.f_bavail * st.f_frsize >= size:  # enough free space?
                    return d
            return util.get_temp_dir()

    def reduce_arena(a):
        """Reducer for Arena: rebuild from its size and a duplicated fd."""
        if a.fd == -1:
            raise ValueError('Arena is unpicklable because '
                             'forking was enabled when it was created')
        return rebuild_arena, (a.size, reduction.DupFd(a.fd))

    def rebuild_arena(size, dupfd):
        """Recreate an Arena from *size* and the fd detached from *dupfd*."""
        return Arena(size, dupfd.detach())

    reduction.register(Arena, reduce_arena)

#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):
    """
    Allocator handing out `(Arena, start, stop)` blocks carved from arenas.
    """

    # Minimum malloc() alignment
    _alignment = 8

    _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2  # 4 MB
    _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2

    def __init__(self, size=mmap.PAGESIZE):
        """Set up an empty heap whose first arena is at least *size* bytes."""
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        # Current arena allocation size
        self._size = size
        # A sorted list of available block sizes in arenas
        self._lengths = []

        # Free block management:
        # - map each block size to a list of `(Arena, start, stop)` blocks
        self._len_to_seq = {}
        # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
        #   starting at that offset
        self._start_to_block = {}
        # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
        #   ending at that offset
        self._stop_to_block = {}

        # Map arenas to their `(start, stop)` blocks in use
        self._allocated_blocks = defaultdict(set)
        self._arenas = []

        # List of pending blocks to free - see comment in free() below
        self._pending_free_blocks = []

        # Statistics
        self._n_mallocs = 0
        self._n_frees = 0

    @staticmethod
    def _roundup(n, alignment):
        """Round *n* up to the next multiple of *alignment*."""
        # alignment must be a power of 2
        mask = alignment - 1
        return (n + mask) & ~mask

    def _new_arena(self, size):
        """Create an arena and return it as one `(arena, 0, length)` block."""
        # Create a new arena with at least the given *size*
        length = self._roundup(max(self._size, size), mmap.PAGESIZE)
        # We carve larger and larger arenas, for efficiency, until we
        # reach a large-ish size (roughly L3 cache-sized)
        if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
            self._size *= 2
        util.info('allocating a new mmap of length %d', length)
        arena = Arena(length)
        self._arenas.append(arena)
        return (arena, 0, length)

    def _discard_arena(self, arena):
        """Drop the bookkeeping for an unused *arena* if it is large enough."""
        # Possibly delete the given (unused) arena
        length = arena.size
        # Reusing an existing arena is faster than creating a new one, so
        # we only reclaim space if it's large enough.
        if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
            return
        blocks = self._allocated_blocks.pop(arena)
        assert not blocks
        # The arena is expected to be registered as one free block that
        # spans it entirely; remove that block from every index.
        del self._start_to_block[(arena, 0)]
        del self._stop_to_block[(arena, length)]
        self._arenas.remove(arena)
        seq = self._len_to_seq[length]
        seq.remove((arena, 0, length))
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

    def _malloc(self, size):
        """Return a free block of at least *size* bytes, unregistering it."""
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            return self._new_arena(size)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _add_free_block(self, block):
        """Register *block* as free, merging it with adjacent free blocks."""
        # make block available and try to merge with its neighbours in the arena
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        """Unregister free *block* and return its ``(start, stop)`` offsets."""
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _remove_allocated_block(self, block):
        """Forget that *block* is in use; maybe discard its now-empty arena."""
        arena, start, stop = block
        blocks = self._allocated_blocks[arena]
        blocks.remove((start, stop))
        if not blocks:
            # Arena is entirely free, discard it from this process
            self._discard_arena(arena)

    def _free_pending_blocks(self):
        """Release every block queued by free() while the lock was busy."""
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._add_free_block(block)
            self._remove_allocated_block(block)

    def free(self, block):
        """Free a block returned by malloc().

        Raises ValueError when called from a process other than the one in
        which the heap was last (re)initialized.
        """
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously some time later from malloc() or free(), by calling
        # _free_pending_blocks() (appending and retrieving from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
        if os.getpid() != self._lastpid:
            raise ValueError(
                "My pid ({0:n}) is not last pid {1:n}".format(
                    os.getpid(),self._lastpid))
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._n_frees += 1
                self._free_pending_blocks()
                self._add_free_block(block)
                self._remove_allocated_block(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        """Return an `(arena, start, stop)` block of at least *size* bytes.

        The size is rounded up to a multiple of ``_alignment`` (minimum one
        aligned unit).  Raises ValueError for a negative size and
        OverflowError when *size* is not below ``sys.maxsize``.
        """
        # return a block of right size (possibly rounded up)
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        with self._lock:
            self._n_mallocs += 1
            # allow pending blocks to be marked available
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            real_stop = start + size
            if real_stop < stop:
                # if the returned block is larger than necessary, mark
                # the remainder available
                self._add_free_block((arena, real_stop, stop))
            self._allocated_blocks[arena].add((start, real_stop))
            return (arena, start, real_stop)

#
# Class wrapping a block allocated out of a Heap -- can be inherited by child process
#

class BufferWrapper(object):
    """
    A block of *size* bytes allocated from the class-wide Heap.
    """

    _heap = Heap()

    def __init__(self, size):
        """Allocate *size* bytes; the block is freed by a finalizer.

        Raises ValueError for a negative size and OverflowError when *size*
        is not below ``sys.maxsize``.
        """
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        util.Finalize(self, BufferWrapper._heap.free, args=(block,))

    def create_memoryview(self):
        """Return a memoryview over exactly the requested *size* bytes."""
        (arena, start, stop), size = self._state
        return memoryview(arena.buffer)[start:start+size]