17db96d56Sopenharmony_ci"""Cache lines from Python source files. 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciThis is intended to read lines from modules imported -- hence if a filename 47db96d56Sopenharmony_ciis not found, it will look down the module search path for a file by 57db96d56Sopenharmony_cithat name. 67db96d56Sopenharmony_ci""" 77db96d56Sopenharmony_ci 87db96d56Sopenharmony_ciimport functools 97db96d56Sopenharmony_ciimport sys 107db96d56Sopenharmony_ciimport os 117db96d56Sopenharmony_ciimport tokenize 127db96d56Sopenharmony_ci 137db96d56Sopenharmony_ci__all__ = ["getline", "clearcache", "checkcache", "lazycache"] 147db96d56Sopenharmony_ci 157db96d56Sopenharmony_ci 167db96d56Sopenharmony_ci# The cache. Maps filenames to either a thunk which will provide source code, 177db96d56Sopenharmony_ci# or a tuple (size, mtime, lines, fullname) once loaded. 187db96d56Sopenharmony_cicache = {} 197db96d56Sopenharmony_ci 207db96d56Sopenharmony_ci 217db96d56Sopenharmony_cidef clearcache(): 227db96d56Sopenharmony_ci """Clear the cache entirely.""" 237db96d56Sopenharmony_ci cache.clear() 247db96d56Sopenharmony_ci 257db96d56Sopenharmony_ci 267db96d56Sopenharmony_cidef getline(filename, lineno, module_globals=None): 277db96d56Sopenharmony_ci """Get a line for a Python source file from the cache. 287db96d56Sopenharmony_ci Update the cache if it doesn't contain an entry for this file already.""" 297db96d56Sopenharmony_ci 307db96d56Sopenharmony_ci lines = getlines(filename, module_globals) 317db96d56Sopenharmony_ci if 1 <= lineno <= len(lines): 327db96d56Sopenharmony_ci return lines[lineno - 1] 337db96d56Sopenharmony_ci return '' 347db96d56Sopenharmony_ci 357db96d56Sopenharmony_ci 367db96d56Sopenharmony_cidef getlines(filename, module_globals=None): 377db96d56Sopenharmony_ci """Get the lines for a Python source file from the cache. 387db96d56Sopenharmony_ci Update the cache if it doesn't contain an entry for this file already.""" 397db96d56Sopenharmony_ci 407db96d56Sopenharmony_ci if filename in cache: 417db96d56Sopenharmony_ci entry = cache[filename] 427db96d56Sopenharmony_ci if len(entry) != 1: 437db96d56Sopenharmony_ci return cache[filename][2] 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_ci try: 467db96d56Sopenharmony_ci return updatecache(filename, module_globals) 477db96d56Sopenharmony_ci except MemoryError: 487db96d56Sopenharmony_ci clearcache() 497db96d56Sopenharmony_ci return [] 507db96d56Sopenharmony_ci 517db96d56Sopenharmony_ci 527db96d56Sopenharmony_cidef checkcache(filename=None): 537db96d56Sopenharmony_ci """Discard cache entries that are out of date. 547db96d56Sopenharmony_ci (This is not checked upon each call!)""" 557db96d56Sopenharmony_ci 567db96d56Sopenharmony_ci if filename is None: 577db96d56Sopenharmony_ci filenames = list(cache.keys()) 587db96d56Sopenharmony_ci elif filename in cache: 597db96d56Sopenharmony_ci filenames = [filename] 607db96d56Sopenharmony_ci else: 617db96d56Sopenharmony_ci return 627db96d56Sopenharmony_ci 637db96d56Sopenharmony_ci for filename in filenames: 647db96d56Sopenharmony_ci entry = cache[filename] 657db96d56Sopenharmony_ci if len(entry) == 1: 667db96d56Sopenharmony_ci # lazy cache entry, leave it lazy. 677db96d56Sopenharmony_ci continue 687db96d56Sopenharmony_ci size, mtime, lines, fullname = entry 697db96d56Sopenharmony_ci if mtime is None: 707db96d56Sopenharmony_ci continue # no-op for files loaded via a __loader__ 717db96d56Sopenharmony_ci try: 727db96d56Sopenharmony_ci stat = os.stat(fullname) 737db96d56Sopenharmony_ci except OSError: 747db96d56Sopenharmony_ci cache.pop(filename, None) 757db96d56Sopenharmony_ci continue 767db96d56Sopenharmony_ci if size != stat.st_size or mtime != stat.st_mtime: 777db96d56Sopenharmony_ci cache.pop(filename, None) 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_ci 807db96d56Sopenharmony_cidef updatecache(filename, module_globals=None): 817db96d56Sopenharmony_ci """Update a cache entry and return its list of lines. 827db96d56Sopenharmony_ci If something's wrong, print a message, discard the cache entry, 837db96d56Sopenharmony_ci and return an empty list.""" 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ci if filename in cache: 867db96d56Sopenharmony_ci if len(cache[filename]) != 1: 877db96d56Sopenharmony_ci cache.pop(filename, None) 887db96d56Sopenharmony_ci if not filename or (filename.startswith('<') and filename.endswith('>')): 897db96d56Sopenharmony_ci return [] 907db96d56Sopenharmony_ci 917db96d56Sopenharmony_ci fullname = filename 927db96d56Sopenharmony_ci try: 937db96d56Sopenharmony_ci stat = os.stat(fullname) 947db96d56Sopenharmony_ci except OSError: 957db96d56Sopenharmony_ci basename = filename 967db96d56Sopenharmony_ci 977db96d56Sopenharmony_ci # Realise a lazy loader based lookup if there is one 987db96d56Sopenharmony_ci # otherwise try to lookup right now. 997db96d56Sopenharmony_ci if lazycache(filename, module_globals): 1007db96d56Sopenharmony_ci try: 1017db96d56Sopenharmony_ci data = cache[filename][0]() 1027db96d56Sopenharmony_ci except (ImportError, OSError): 1037db96d56Sopenharmony_ci pass 1047db96d56Sopenharmony_ci else: 1057db96d56Sopenharmony_ci if data is None: 1067db96d56Sopenharmony_ci # No luck, the PEP302 loader cannot find the source 1077db96d56Sopenharmony_ci # for this module. 1087db96d56Sopenharmony_ci return [] 1097db96d56Sopenharmony_ci cache[filename] = ( 1107db96d56Sopenharmony_ci len(data), 1117db96d56Sopenharmony_ci None, 1127db96d56Sopenharmony_ci [line + '\n' for line in data.splitlines()], 1137db96d56Sopenharmony_ci fullname 1147db96d56Sopenharmony_ci ) 1157db96d56Sopenharmony_ci return cache[filename][2] 1167db96d56Sopenharmony_ci 1177db96d56Sopenharmony_ci # Try looking through the module search path, which is only useful 1187db96d56Sopenharmony_ci # when handling a relative filename. 1197db96d56Sopenharmony_ci if os.path.isabs(filename): 1207db96d56Sopenharmony_ci return [] 1217db96d56Sopenharmony_ci 1227db96d56Sopenharmony_ci for dirname in sys.path: 1237db96d56Sopenharmony_ci try: 1247db96d56Sopenharmony_ci fullname = os.path.join(dirname, basename) 1257db96d56Sopenharmony_ci except (TypeError, AttributeError): 1267db96d56Sopenharmony_ci # Not sufficiently string-like to do anything useful with. 1277db96d56Sopenharmony_ci continue 1287db96d56Sopenharmony_ci try: 1297db96d56Sopenharmony_ci stat = os.stat(fullname) 1307db96d56Sopenharmony_ci break 1317db96d56Sopenharmony_ci except OSError: 1327db96d56Sopenharmony_ci pass 1337db96d56Sopenharmony_ci else: 1347db96d56Sopenharmony_ci return [] 1357db96d56Sopenharmony_ci try: 1367db96d56Sopenharmony_ci with tokenize.open(fullname) as fp: 1377db96d56Sopenharmony_ci lines = fp.readlines() 1387db96d56Sopenharmony_ci except (OSError, UnicodeDecodeError, SyntaxError): 1397db96d56Sopenharmony_ci return [] 1407db96d56Sopenharmony_ci if lines and not lines[-1].endswith('\n'): 1417db96d56Sopenharmony_ci lines[-1] += '\n' 1427db96d56Sopenharmony_ci size, mtime = stat.st_size, stat.st_mtime 1437db96d56Sopenharmony_ci cache[filename] = size, mtime, lines, fullname 1447db96d56Sopenharmony_ci return lines 1457db96d56Sopenharmony_ci 1467db96d56Sopenharmony_ci 1477db96d56Sopenharmony_cidef lazycache(filename, module_globals): 1487db96d56Sopenharmony_ci """Seed the cache for filename with module_globals. 1497db96d56Sopenharmony_ci 1507db96d56Sopenharmony_ci The module loader will be asked for the source only when getlines is 1517db96d56Sopenharmony_ci called, not immediately. 1527db96d56Sopenharmony_ci 1537db96d56Sopenharmony_ci If there is an entry in the cache already, it is not altered. 1547db96d56Sopenharmony_ci 1557db96d56Sopenharmony_ci :return: True if a lazy load is registered in the cache, 1567db96d56Sopenharmony_ci otherwise False. To register such a load a module loader with a 1577db96d56Sopenharmony_ci get_source method must be found, the filename must be a cacheable 1587db96d56Sopenharmony_ci filename, and the filename must not be already cached. 1597db96d56Sopenharmony_ci """ 1607db96d56Sopenharmony_ci if filename in cache: 1617db96d56Sopenharmony_ci if len(cache[filename]) == 1: 1627db96d56Sopenharmony_ci return True 1637db96d56Sopenharmony_ci else: 1647db96d56Sopenharmony_ci return False 1657db96d56Sopenharmony_ci if not filename or (filename.startswith('<') and filename.endswith('>')): 1667db96d56Sopenharmony_ci return False 1677db96d56Sopenharmony_ci # Try for a __loader__, if available 1687db96d56Sopenharmony_ci if module_globals and '__name__' in module_globals: 1697db96d56Sopenharmony_ci name = module_globals['__name__'] 1707db96d56Sopenharmony_ci if (loader := module_globals.get('__loader__')) is None: 1717db96d56Sopenharmony_ci if spec := module_globals.get('__spec__'): 1727db96d56Sopenharmony_ci try: 1737db96d56Sopenharmony_ci loader = spec.loader 1747db96d56Sopenharmony_ci except AttributeError: 1757db96d56Sopenharmony_ci pass 1767db96d56Sopenharmony_ci get_source = getattr(loader, 'get_source', None) 1777db96d56Sopenharmony_ci 1787db96d56Sopenharmony_ci if name and get_source: 1797db96d56Sopenharmony_ci get_lines = functools.partial(get_source, name) 1807db96d56Sopenharmony_ci cache[filename] = (get_lines,) 1817db96d56Sopenharmony_ci return True 1827db96d56Sopenharmony_ci return False 183