65c043c87eff27e9405316fdbc0c695f2b347441
[SubU] /
1 import email.message
2 import importlib.metadata
3 import os
4 import pathlib
5 import zipfile
6 from typing import (
7     Collection,
8     Dict,
9     Iterable,
10     Iterator,
11     Mapping,
12     Optional,
13     Sequence,
14     cast,
15 )
16
17 from pip._vendor.packaging.requirements import Requirement
18 from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
19 from pip._vendor.packaging.version import parse as parse_version
20
21 from pip._internal.exceptions import InvalidWheel, UnsupportedWheel
22 from pip._internal.metadata.base import (
23     BaseDistribution,
24     BaseEntryPoint,
25     DistributionVersion,
26     InfoPath,
27     Wheel,
28 )
29 from pip._internal.utils.misc import normalize_path
30 from pip._internal.utils.packaging import safe_extra
31 from pip._internal.utils.temp_dir import TempDirectory
32 from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
33
34 from ._compat import BasePath, get_dist_name
35
36
37 class WheelDistribution(importlib.metadata.Distribution):
38     """An ``importlib.metadata.Distribution`` read from a wheel.
39
40     Although ``importlib.metadata.PathDistribution`` accepts ``zipfile.Path``,
41     its implementation is too "lazy" for pip's needs (we can't keep the ZipFile
42     handle open for the entire lifetime of the distribution object).
43
44     This implementation eagerly reads the entire metadata directory into the
45     memory instead, and operates from that.
46     """
47
48     def __init__(
49         self,
50         files: Mapping[pathlib.PurePosixPath, bytes],
51         info_location: pathlib.PurePosixPath,
52     ) -> None:
53         self._files = files
54         self.info_location = info_location
55
56     @classmethod
57     def from_zipfile(
58         cls,
59         zf: zipfile.ZipFile,
60         name: str,
61         location: str,
62     ) -> "WheelDistribution":
63         info_dir, _ = parse_wheel(zf, name)
64         paths = (
65             (name, pathlib.PurePosixPath(name.split("/", 1)[-1]))
66             for name in zf.namelist()
67             if name.startswith(f"{info_dir}/")
68         )
69         files = {
70             relpath: read_wheel_metadata_file(zf, fullpath)
71             for fullpath, relpath in paths
72         }
73         info_location = pathlib.PurePosixPath(location, info_dir)
74         return cls(files, info_location)
75
76     def iterdir(self, path: InfoPath) -> Iterator[pathlib.PurePosixPath]:
77         # Only allow iterating through the metadata directory.
78         if pathlib.PurePosixPath(str(path)) in self._files:
79             return iter(self._files)
80         raise FileNotFoundError(path)
81
82     def read_text(self, filename: str) -> Optional[str]:
83         try:
84             data = self._files[pathlib.PurePosixPath(filename)]
85         except KeyError:
86             return None
87         try:
88             text = data.decode("utf-8")
89         except UnicodeDecodeError as e:
90             wheel = self.info_location.parent
91             error = f"Error decoding metadata for {wheel}: {e} in {filename} file"
92             raise UnsupportedWheel(error)
93         return text
94
95
96 class Distribution(BaseDistribution):
97     def __init__(
98         self,
99         dist: importlib.metadata.Distribution,
100         info_location: Optional[BasePath],
101         installed_location: Optional[BasePath],
102     ) -> None:
103         self._dist = dist
104         self._info_location = info_location
105         self._installed_location = installed_location
106
107     @classmethod
108     def from_directory(cls, directory: str) -> BaseDistribution:
109         info_location = pathlib.Path(directory)
110         dist = importlib.metadata.Distribution.at(info_location)
111         return cls(dist, info_location, info_location.parent)
112
113     @classmethod
114     def from_metadata_file_contents(
115         cls,
116         metadata_contents: bytes,
117         filename: str,
118         project_name: str,
119     ) -> BaseDistribution:
120         # Generate temp dir to contain the metadata file, and write the file contents.
121         temp_dir = pathlib.Path(
122             TempDirectory(kind="metadata", globally_managed=True).path
123         )
124         metadata_path = temp_dir / "METADATA"
125         metadata_path.write_bytes(metadata_contents)
126         # Construct dist pointing to the newly created directory.
127         dist = importlib.metadata.Distribution.at(metadata_path.parent)
128         return cls(dist, metadata_path.parent, None)
129
130     @classmethod
131     def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
132         try:
133             with wheel.as_zipfile() as zf:
134                 dist = WheelDistribution.from_zipfile(zf, name, wheel.location)
135         except zipfile.BadZipFile as e:
136             raise InvalidWheel(wheel.location, name) from e
137         except UnsupportedWheel as e:
138             raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
139         return cls(dist, dist.info_location, pathlib.PurePosixPath(wheel.location))
140
141     @property
142     def location(self) -> Optional[str]:
143         if self._info_location is None:
144             return None
145         return str(self._info_location.parent)
146
147     @property
148     def info_location(self) -> Optional[str]:
149         if self._info_location is None:
150             return None
151         return str(self._info_location)
152
153     @property
154     def installed_location(self) -> Optional[str]:
155         if self._installed_location is None:
156             return None
157         return normalize_path(str(self._installed_location))
158
159     def _get_dist_name_from_location(self) -> Optional[str]:
160         """Try to get the name from the metadata directory name.
161
162         This is much faster than reading metadata.
163         """
164         if self._info_location is None:
165             return None
166         stem, suffix = os.path.splitext(self._info_location.name)
167         if suffix not in (".dist-info", ".egg-info"):
168             return None
169         return stem.split("-", 1)[0]
170
171     @property
172     def canonical_name(self) -> NormalizedName:
173         name = self._get_dist_name_from_location() or get_dist_name(self._dist)
174         return canonicalize_name(name)
175
176     @property
177     def version(self) -> DistributionVersion:
178         return parse_version(self._dist.version)
179
180     def is_file(self, path: InfoPath) -> bool:
181         return self._dist.read_text(str(path)) is not None
182
183     def iter_distutils_script_names(self) -> Iterator[str]:
184         # A distutils installation is always "flat" (not in e.g. egg form), so
185         # if this distribution's info location is NOT a pathlib.Path (but e.g.
186         # zipfile.Path), it can never contain any distutils scripts.
187         if not isinstance(self._info_location, pathlib.Path):
188             return
189         for child in self._info_location.joinpath("scripts").iterdir():
190             yield child.name
191
192     def read_text(self, path: InfoPath) -> str:
193         content = self._dist.read_text(str(path))
194         if content is None:
195             raise FileNotFoundError(path)
196         return content
197
198     def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
199         # importlib.metadata's EntryPoint structure sasitfies BaseEntryPoint.
200         return self._dist.entry_points
201
202     def _metadata_impl(self) -> email.message.Message:
203         # From Python 3.10+, importlib.metadata declares PackageMetadata as the
204         # return type. This protocol is unfortunately a disaster now and misses
205         # a ton of fields that we need, including get() and get_payload(). We
206         # rely on the implementation that the object is actually a Message now,
207         # until upstream can improve the protocol. (python/cpython#94952)
208         return cast(email.message.Message, self._dist.metadata)
209
210     def iter_provided_extras(self) -> Iterable[str]:
211         return (
212             safe_extra(extra) for extra in self.metadata.get_all("Provides-Extra", [])
213         )
214
215     def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
216         contexts: Sequence[Dict[str, str]] = [{"extra": safe_extra(e)} for e in extras]
217         for req_string in self.metadata.get_all("Requires-Dist", []):
218             req = Requirement(req_string)
219             if not req.marker:
220                 yield req
221             elif not extras and req.marker.evaluate({"extra": ""}):
222                 yield req
223             elif any(req.marker.evaluate(context) for context in contexts):
224                 yield req