ea363d86a564b5450666aa00aecd46353326a75a
[SubU] /
1 from contextlib import suppress
2 from io import TextIOWrapper
3
4 from . import abc
5
6
7 class SpecLoaderAdapter:
8     """
9     Adapt a package spec to adapt the underlying loader.
10     """
11
12     def __init__(self, spec, adapter=lambda spec: spec.loader):
13         self.spec = spec
14         self.loader = adapter(spec)
15
16     def __getattr__(self, name):
17         return getattr(self.spec, name)
18
19
20 class TraversableResourcesLoader:
21     """
22     Adapt a loader to provide TraversableResources.
23     """
24
25     def __init__(self, spec):
26         self.spec = spec
27
28     def get_resource_reader(self, name):
29         return CompatibilityFiles(self.spec)._native()
30
31
32 def _io_wrapper(file, mode='r', *args, **kwargs):
33     if mode == 'r':
34         return TextIOWrapper(file, *args, **kwargs)
35     elif mode == 'rb':
36         return file
37     raise ValueError(
38         "Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode)
39     )
40
41
42 class CompatibilityFiles:
43     """
44     Adapter for an existing or non-existent resource reader
45     to provide a compatibility .files().
46     """
47
48     class SpecPath(abc.Traversable):
49         """
50         Path tied to a module spec.
51         Can be read and exposes the resource reader children.
52         """
53
54         def __init__(self, spec, reader):
55             self._spec = spec
56             self._reader = reader
57
58         def iterdir(self):
59             if not self._reader:
60                 return iter(())
61             return iter(
62                 CompatibilityFiles.ChildPath(self._reader, path)
63                 for path in self._reader.contents()
64             )
65
66         def is_file(self):
67             return False
68
69         is_dir = is_file
70
71         def joinpath(self, other):
72             if not self._reader:
73                 return CompatibilityFiles.OrphanPath(other)
74             return CompatibilityFiles.ChildPath(self._reader, other)
75
76         @property
77         def name(self):
78             return self._spec.name
79
80         def open(self, mode='r', *args, **kwargs):
81             return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs)
82
83     class ChildPath(abc.Traversable):
84         """
85         Path tied to a resource reader child.
86         Can be read but doesn't expose any meaningful children.
87         """
88
89         def __init__(self, reader, name):
90             self._reader = reader
91             self._name = name
92
93         def iterdir(self):
94             return iter(())
95
96         def is_file(self):
97             return self._reader.is_resource(self.name)
98
99         def is_dir(self):
100             return not self.is_file()
101
102         def joinpath(self, other):
103             return CompatibilityFiles.OrphanPath(self.name, other)
104
105         @property
106         def name(self):
107             return self._name
108
109         def open(self, mode='r', *args, **kwargs):
110             return _io_wrapper(
111                 self._reader.open_resource(self.name), mode, *args, **kwargs
112             )
113
114     class OrphanPath(abc.Traversable):
115         """
116         Orphan path, not tied to a module spec or resource reader.
117         Can't be read and doesn't expose any meaningful children.
118         """
119
120         def __init__(self, *path_parts):
121             if len(path_parts) < 1:
122                 raise ValueError('Need at least one path part to construct a path')
123             self._path = path_parts
124
125         def iterdir(self):
126             return iter(())
127
128         def is_file(self):
129             return False
130
131         is_dir = is_file
132
133         def joinpath(self, other):
134             return CompatibilityFiles.OrphanPath(*self._path, other)
135
136         @property
137         def name(self):
138             return self._path[-1]
139
140         def open(self, mode='r', *args, **kwargs):
141             raise FileNotFoundError("Can't open orphan path")
142
143     def __init__(self, spec):
144         self.spec = spec
145
146     @property
147     def _reader(self):
148         with suppress(AttributeError):
149             return self.spec.loader.get_resource_reader(self.spec.name)
150
151     def _native(self):
152         """
153         Return the native reader if it supports files().
154         """
155         reader = self._reader
156         return reader if hasattr(reader, 'files') else self
157
158     def __getattr__(self, attr):
159         return getattr(self._reader, attr)
160
161     def files(self):
162         return CompatibilityFiles.SpecPath(self.spec, self._reader)
163
164
165 def wrap_spec(package):
166     """
167     Construct a package spec with traversable compatibility
168     on the spec/loader/reader.
169     """
170     return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)