17db96d56Sopenharmony_ciimport collections
27db96d56Sopenharmony_ciimport operator
37db96d56Sopenharmony_ciimport pathlib
47db96d56Sopenharmony_ciimport zipfile
57db96d56Sopenharmony_ci
67db96d56Sopenharmony_cifrom . import abc
77db96d56Sopenharmony_ci
87db96d56Sopenharmony_cifrom ._itertools import unique_everseen
97db96d56Sopenharmony_ci
107db96d56Sopenharmony_ci
def remove_duplicates(items):
    """
    Return an iterator over the distinct entries of ``items``,
    each one appearing at the position of its first occurrence.
    """
    # Keying an ordered mapping on the entries drops the repeats
    # while remembering the order in which entries were first seen.
    first_seen = collections.OrderedDict.fromkeys(items)
    return iter(first_seen)
137db96d56Sopenharmony_ci
147db96d56Sopenharmony_ci
class FileReader(abc.TraversableResources):
    """
    Resource reader rooted at the directory that contains the
    file named by ``loader.path``.
    """

    def __init__(self, loader):
        module_file = pathlib.Path(loader.path)
        # Resources are looked up next to the module file.
        self.path = module_file.parent

    def resource_path(self, resource):
        """
        Give back the resource's location on the file system as a
        string, so that `resources.path()` does not need to create
        a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """
        Return the directory from which resources are served.
        """
        return self.path
297db96d56Sopenharmony_ci
307db96d56Sopenharmony_ci
class ZipReader(abc.TraversableResources):
    """
    Resource reader for a package whose files live inside a zip
    archive, below ``<loader prefix><package name>/``.
    """

    def __init__(self, loader, module):
        """
        ``loader`` must expose ``prefix`` and ``archive`` attributes
        (presumably a zip importer; confirm against the caller).
        ``module`` is a dotted module name, of which only the last
        component is used.
        """
        _, _, name = module.rpartition('.')
        # Backslashes in the loader prefix are turned into forward
        # slashes before the package directory is appended.
        self.prefix = loader.prefix.replace('\\', '/') + name + '/'
        self.archive = loader.archive

    def open_resource(self, resource):
        """
        Open ``resource`` using the inherited implementation.

        A ``KeyError`` raised while opening is reported to the
        caller as ``FileNotFoundError`` carrying the same first
        argument.
        """
        try:
            return super().open_resource(resource)
        except KeyError as exc:
            # Chain explicitly so the traceback shows the KeyError as
            # the direct cause rather than as an unrelated failure
            # that happened during handling.
            raise FileNotFoundError(exc.args[0]) from exc

    def is_resource(self, path):
        """
        Return whether ``path`` names an existing file below the
        package directory.
        """
        # workaround for `zipfile.Path.is_file` returning true
        # for non-existent paths.
        target = self.files().joinpath(path)
        return target.is_file() and target.exists()

    def files(self):
        """
        Return a fresh ``zipfile.Path`` for the package directory
        inside the archive.
        """
        return zipfile.Path(self.archive, self.prefix)
517db96d56Sopenharmony_ci
527db96d56Sopenharmony_ci
class MultiplexedPath(abc.Traversable):
    """
    Present several directories as a single Traversable whose
    interface is merged across all of them. Useful for namespace
    packages, which may be multihomed at a single name.
    """

    def __init__(self, *paths):
        """
        Store the distinct ``paths`` in the order given.

        Raises ``FileNotFoundError`` when no path is supplied and
        ``NotADirectoryError`` when any path is not a directory.
        """
        self._paths = [pathlib.Path(entry) for entry in remove_duplicates(paths)]
        if not self._paths:
            message = 'MultiplexedPath must contain at least one path'
            raise FileNotFoundError(message)
        if any(not directory.is_dir() for directory in self._paths):
            raise NotADirectoryError('MultiplexedPath only supports directories')

    def iterdir(self):
        """
        Iterate over the children of every directory, keeping only
        the first child seen for any given name.
        """
        by_name = operator.attrgetter('name')
        entries = (
            entry
            for directory in self._paths
            for entry in directory.iterdir()
        )
        return unique_everseen(entries, key=by_name)

    def read_bytes(self):
        """Always fails: this object stands for directories."""
        raise FileNotFoundError(f'{self} is not a file')

    def read_text(self, *args, **kwargs):
        """Always fails: this object stands for directories."""
        raise FileNotFoundError(f'{self} is not a file')

    def is_dir(self):
        return True

    def is_file(self):
        return False

    def joinpath(self, child):
        """
        Return the existing child named ``child`` if one of the
        directories has it; otherwise return a path of that name
        below the first directory.
        """
        # look among the children that are already present
        matches = (entry for entry in self.iterdir() if entry.name == child)
        found = next(matches, None)
        if found is not None:
            return found
        # nothing by that name, so build it on the first directory
        return self._paths[0] / child

    __truediv__ = joinpath

    def open(self, *args, **kwargs):
        """Always fails: this object stands for directories."""
        raise FileNotFoundError(f'{self} is not a file')

    @property
    def name(self):
        """Name of the first directory."""
        primary = self._paths[0]
        return primary.name

    def __repr__(self):
        quoted = (f"'{path}'" for path in self._paths)
        paths = ', '.join(quoted)
        return f'MultiplexedPath({paths})'
1057db96d56Sopenharmony_ci
1067db96d56Sopenharmony_ci
class NamespaceReader(abc.TraversableResources):
    """
    Resource reader that serves the locations of a namespace path
    through a single ``MultiplexedPath``.
    """

    def __init__(self, namespace_path):
        """
        Raises ``ValueError`` unless the string form of
        ``namespace_path`` mentions ``NamespacePath``.
        """
        looks_like_namespace = 'NamespacePath' in str(namespace_path)
        if not looks_like_namespace:
            raise ValueError('Invalid path')
        locations = list(namespace_path)
        self.path = MultiplexedPath(*locations)

    def resource_path(self, resource):
        """
        Give back the resource's location on the file system as a
        string, so that `resources.path()` does not need to create
        a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """
        Return the merged path that resources are served from.
        """
        return self.path
123