17db96d56Sopenharmony_ciimport collections 27db96d56Sopenharmony_ciimport operator 37db96d56Sopenharmony_ciimport pathlib 47db96d56Sopenharmony_ciimport zipfile 57db96d56Sopenharmony_ci 67db96d56Sopenharmony_cifrom . import abc 77db96d56Sopenharmony_ci 87db96d56Sopenharmony_cifrom ._itertools import unique_everseen 97db96d56Sopenharmony_ci 107db96d56Sopenharmony_ci 117db96d56Sopenharmony_cidef remove_duplicates(items): 127db96d56Sopenharmony_ci return iter(collections.OrderedDict.fromkeys(items)) 137db96d56Sopenharmony_ci 147db96d56Sopenharmony_ci 157db96d56Sopenharmony_ciclass FileReader(abc.TraversableResources): 167db96d56Sopenharmony_ci def __init__(self, loader): 177db96d56Sopenharmony_ci self.path = pathlib.Path(loader.path).parent 187db96d56Sopenharmony_ci 197db96d56Sopenharmony_ci def resource_path(self, resource): 207db96d56Sopenharmony_ci """ 217db96d56Sopenharmony_ci Return the file system path to prevent 227db96d56Sopenharmony_ci `resources.path()` from creating a temporary 237db96d56Sopenharmony_ci copy. 247db96d56Sopenharmony_ci """ 257db96d56Sopenharmony_ci return str(self.path.joinpath(resource)) 267db96d56Sopenharmony_ci 277db96d56Sopenharmony_ci def files(self): 287db96d56Sopenharmony_ci return self.path 297db96d56Sopenharmony_ci 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ciclass ZipReader(abc.TraversableResources): 327db96d56Sopenharmony_ci def __init__(self, loader, module): 337db96d56Sopenharmony_ci _, _, name = module.rpartition('.') 347db96d56Sopenharmony_ci self.prefix = loader.prefix.replace('\\', '/') + name + '/' 357db96d56Sopenharmony_ci self.archive = loader.archive 367db96d56Sopenharmony_ci 377db96d56Sopenharmony_ci def open_resource(self, resource): 387db96d56Sopenharmony_ci try: 397db96d56Sopenharmony_ci return super().open_resource(resource) 407db96d56Sopenharmony_ci except KeyError as exc: 417db96d56Sopenharmony_ci raise FileNotFoundError(exc.args[0]) 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ci def is_resource(self, path): 447db96d56Sopenharmony_ci # workaround for `zipfile.Path.is_file` returning true 457db96d56Sopenharmony_ci # for non-existent paths. 467db96d56Sopenharmony_ci target = self.files().joinpath(path) 477db96d56Sopenharmony_ci return target.is_file() and target.exists() 487db96d56Sopenharmony_ci 497db96d56Sopenharmony_ci def files(self): 507db96d56Sopenharmony_ci return zipfile.Path(self.archive, self.prefix) 517db96d56Sopenharmony_ci 527db96d56Sopenharmony_ci 537db96d56Sopenharmony_ciclass MultiplexedPath(abc.Traversable): 547db96d56Sopenharmony_ci """ 557db96d56Sopenharmony_ci Given a series of Traversable objects, implement a merged 567db96d56Sopenharmony_ci version of the interface across all objects. Useful for 577db96d56Sopenharmony_ci namespace packages which may be multihomed at a single 587db96d56Sopenharmony_ci name. 597db96d56Sopenharmony_ci """ 607db96d56Sopenharmony_ci 617db96d56Sopenharmony_ci def __init__(self, *paths): 627db96d56Sopenharmony_ci self._paths = list(map(pathlib.Path, remove_duplicates(paths))) 637db96d56Sopenharmony_ci if not self._paths: 647db96d56Sopenharmony_ci message = 'MultiplexedPath must contain at least one path' 657db96d56Sopenharmony_ci raise FileNotFoundError(message) 667db96d56Sopenharmony_ci if not all(path.is_dir() for path in self._paths): 677db96d56Sopenharmony_ci raise NotADirectoryError('MultiplexedPath only supports directories') 687db96d56Sopenharmony_ci 697db96d56Sopenharmony_ci def iterdir(self): 707db96d56Sopenharmony_ci files = (file for path in self._paths for file in path.iterdir()) 717db96d56Sopenharmony_ci return unique_everseen(files, key=operator.attrgetter('name')) 727db96d56Sopenharmony_ci 737db96d56Sopenharmony_ci def read_bytes(self): 747db96d56Sopenharmony_ci raise FileNotFoundError(f'{self} is not a file') 757db96d56Sopenharmony_ci 767db96d56Sopenharmony_ci def read_text(self, *args, **kwargs): 777db96d56Sopenharmony_ci raise FileNotFoundError(f'{self} is not a file') 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_ci def is_dir(self): 807db96d56Sopenharmony_ci return True 817db96d56Sopenharmony_ci 827db96d56Sopenharmony_ci def is_file(self): 837db96d56Sopenharmony_ci return False 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ci def joinpath(self, child): 867db96d56Sopenharmony_ci # first try to find child in current paths 877db96d56Sopenharmony_ci for file in self.iterdir(): 887db96d56Sopenharmony_ci if file.name == child: 897db96d56Sopenharmony_ci return file 907db96d56Sopenharmony_ci # if it does not exist, construct it with the first path 917db96d56Sopenharmony_ci return self._paths[0] / child 927db96d56Sopenharmony_ci 937db96d56Sopenharmony_ci __truediv__ = joinpath 947db96d56Sopenharmony_ci 957db96d56Sopenharmony_ci def open(self, *args, **kwargs): 967db96d56Sopenharmony_ci raise FileNotFoundError(f'{self} is not a file') 977db96d56Sopenharmony_ci 987db96d56Sopenharmony_ci @property 997db96d56Sopenharmony_ci def name(self): 1007db96d56Sopenharmony_ci return self._paths[0].name 1017db96d56Sopenharmony_ci 1027db96d56Sopenharmony_ci def __repr__(self): 1037db96d56Sopenharmony_ci paths = ', '.join(f"'{path}'" for path in self._paths) 1047db96d56Sopenharmony_ci return f'MultiplexedPath({paths})' 1057db96d56Sopenharmony_ci 1067db96d56Sopenharmony_ci 1077db96d56Sopenharmony_ciclass NamespaceReader(abc.TraversableResources): 1087db96d56Sopenharmony_ci def __init__(self, namespace_path): 1097db96d56Sopenharmony_ci if 'NamespacePath' not in str(namespace_path): 1107db96d56Sopenharmony_ci raise ValueError('Invalid path') 1117db96d56Sopenharmony_ci self.path = MultiplexedPath(*list(namespace_path)) 1127db96d56Sopenharmony_ci 1137db96d56Sopenharmony_ci def resource_path(self, resource): 1147db96d56Sopenharmony_ci """ 1157db96d56Sopenharmony_ci Return the file system path to prevent 1167db96d56Sopenharmony_ci `resources.path()` from creating a temporary 1177db96d56Sopenharmony_ci copy. 1187db96d56Sopenharmony_ci """ 1197db96d56Sopenharmony_ci return str(self.path.joinpath(resource)) 1207db96d56Sopenharmony_ci 1217db96d56Sopenharmony_ci def files(self): 1227db96d56Sopenharmony_ci return self.path 123