cbec59e2c6d3238afd29b4d46626a1550f849e2b
[SubU] /
1 import functools
2 import importlib.metadata
3 import logging
4 import os
5 import pathlib
6 import sys
7 import zipfile
8 import zipimport
9 from typing import Iterator, List, Optional, Sequence, Set, Tuple
10
11 from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
12
13 from pip._internal.metadata.base import BaseDistribution, BaseEnvironment
14 from pip._internal.models.wheel import Wheel
15 from pip._internal.utils.deprecation import deprecated
16 from pip._internal.utils.filetypes import WHEEL_EXTENSION
17
18 from ._compat import BadMetadata, BasePath, get_dist_name, get_info_location
19 from ._dists import Distribution
20
21 logger = logging.getLogger(__name__)
22
23
def _looks_like_wheel(location: str) -> bool:
    """Return whether *location* appears to be a wheel archive on disk.

    All of the following must hold, checked in this order (evaluation stops
    at the first failure, so the filesystem is only touched for paths that
    carry the wheel extension):

    * the path ends with ``WHEEL_EXTENSION``;
    * the path is an existing regular file;
    * its base name matches ``Wheel.wheel_file_re``;
    * the file is recognised as a ZIP archive.
    """
    filename = os.path.basename(location)
    return (
        location.endswith(WHEEL_EXTENSION)
        and os.path.isfile(location)
        and Wheel.wheel_file_re.match(filename) is not None
        and zipfile.is_zipfile(location)
    )
32
33
class _DistributionFinder:
    """Locate distributions while remembering which names were already seen.

    The finder memoizes the normalized names of the distributions it has
    yielded, so that only one distribution is returned for each package name.
    A lot of pip code assumes this (because it is setuptools's behavior), and
    not doing the same can potentially cause a distribution in a lower
    precedence path to override a higher precedence one if the caller is not
    careful.

    Eventually we probably want to make it possible to see lower precedence
    installations as well. It's a useful feature, after all.
    """

    # A raw importlib.metadata distribution paired with the location of its
    # metadata directory (``None`` when no location could be determined).
    FoundResult = Tuple[importlib.metadata.Distribution, Optional[BasePath]]

    def __init__(self) -> None:
        # Normalized names already yielded by ``_find_impl``.
        self._found_names: Set[NormalizedName] = set()

    def _find_impl(self, location: str) -> Iterator[FoundResult]:
        """Yield not-yet-seen distributions discovered at *location*.

        Each result is a ``(distribution, info_location)`` pair. Entries whose
        name cannot be read are logged and skipped; entries whose normalized
        name has already been yielded by this finder are silently skipped.
        """
        # Never look inside a wheel. A package inside a wheel is not always
        # valid (due to .data directories etc.), so its .dist-info entry
        # should not be treated as an installed distribution.
        if _looks_like_wheel(location):
            return
        # Paths are fed to importlib.metadata one at a time (rather than as a
        # whole list) so we know exactly where each distribution was found.
        for candidate in importlib.metadata.distributions(path=[location]):
            metadata_dir = get_info_location(candidate)
            try:
                declared_name = get_dist_name(candidate)
            except BadMetadata as exc:
                logger.warning("Skipping %s due to %s", metadata_dir, exc.reason)
                continue
            key = canonicalize_name(declared_name)
            if key not in self._found_names:
                # Record the name before yielding so that later lookups never
                # return a second distribution with the same name.
                self._found_names.add(key)
                yield candidate, metadata_dir

    def find(self, location: str) -> Iterator[BaseDistribution]:
        """Yield the distributions installed at *location*.

        The path can be either a directory, or a ZIP archive. The installed
        location of each distribution is the parent of its metadata
        directory, or ``None`` when the metadata directory is unknown.
        """
        for found, metadata_dir in self._find_impl(location):
            installed_location: Optional[BasePath] = (
                metadata_dir.parent if metadata_dir is not None else None
            )
            yield Distribution(found, metadata_dir, installed_location)

    def find_linked(self, location: str) -> Iterator[BaseDistribution]:
        """Yield distributions referenced by egg-link files in *location*.

        The path should be a directory; otherwise nothing is yielded. This
        mirrors how setuptools behaves, for compatibility. For every
        ``*.egg-link`` file in the directory, the first non-empty line is
        taken as a path (resolved against the directory containing the
        egg-link if it is relative), and the distributions found at that
        linked location are yielded with the egg-link's directory as their
        installed location.
        """
        base = pathlib.Path(location)
        if not base.is_dir():
            return
        for entry in base.iterdir():
            if entry.suffix != ".egg-link":
                continue
            with entry.open() as stream:
                # The pipeline is lazy, so the first non-blank line has to be
                # pulled out while the file is still open.
                stripped = (raw.strip() for raw in stream)
                target_rel = next(filter(None, stripped), "")
            if target_rel:
                linked_location = str(base / target_rel)
                for found, metadata_dir in self._find_impl(linked_location):
                    yield Distribution(found, metadata_dir, base)

    def _find_eggs_in_dir(self, location: str) -> Iterator[BaseDistribution]:
        """Yield legacy distributions for each ``*.egg`` entry in a directory."""
        # Imported lazily: these are only needed on the egg fallback path.
        from pip._vendor.pkg_resources import find_distributions

        from pip._internal.metadata import pkg_resources as legacy

        with os.scandir(location) as entries:
            for entry in entries:
                if entry.name.endswith(".egg"):
                    yield from map(
                        legacy.Distribution, find_distributions(entry.path)
                    )

    def _find_eggs_in_zip(self, location: str) -> Iterator[BaseDistribution]:
        """Yield legacy distributions for eggs found inside a ZIP archive.

        Nothing is yielded when a zip importer cannot be created for
        *location*.
        """
        # Imported lazily: these are only needed on the egg fallback path.
        from pip._vendor.pkg_resources import find_eggs_in_zip

        from pip._internal.metadata import pkg_resources as legacy

        try:
            importer = zipimport.zipimporter(location)
        except zipimport.ZipImportError:
            return
        yield from map(legacy.Distribution, find_eggs_in_zip(importer, location))

    def find_eggs(self, location: str) -> Iterator[BaseDistribution]:
        """Yield the eggs found at *location*.

        This actually uses the old *pkg_resources* backend. We likely want to
        deprecate this so we can eventually remove the *pkg_resources*
        dependency entirely. Before that, this should first emit a deprecation
        warning for some versions when using the fallback since importing
        *pkg_resources* is slow for those who don't need it.
        """
        # Each probe is evaluated only when its turn comes, in this order.
        strategies = (
            (os.path.isdir, self._find_eggs_in_dir),
            (zipfile.is_zipfile, self._find_eggs_in_zip),
        )
        for applies, search in strategies:
            if applies(location):
                yield from search(location)
147
148
@functools.lru_cache(maxsize=None)  # One warning per distinct location.
def _emit_egg_deprecation(location: Optional[str]) -> None:
    """Report that loading the egg at *location* is deprecated.

    The call is memoized on *location*, so the deprecation is reported at
    most once for any given location value.
    """
    message = f"Loading egg at {location} is deprecated."
    deprecated(
        reason=message,
        replacement="to use pip for package installation.",
        gone_in=None,
    )
156
157
class Environment(BaseEnvironment):
    """Environment that discovers distributions on a sequence of paths.

    Distributions are located with ``_DistributionFinder``, visiting the
    paths in the order given.
    """

    def __init__(self, paths: Sequence[str]) -> None:
        """Store *paths* as the search locations.

        The sequence is kept by reference, not copied.
        """
        self._paths = paths

    @classmethod
    def default(cls) -> BaseEnvironment:
        """Return an environment that searches ``sys.path``."""
        return cls(sys.path)

    @classmethod
    def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
        """Return an environment for *paths*, or ``sys.path`` if it is None."""
        if paths is None:
            return cls(sys.path)
        return cls(paths)

    def _iter_distributions(self) -> Iterator[BaseDistribution]:
        """Yield the distributions found on every configured path.

        A single finder is shared across all paths so that a name yielded by
        ``find`` or ``find_linked`` for an earlier path is not yielded again
        by those lookups for a later one.
        """
        finder = _DistributionFinder()
        for location in self._paths:
            yield from finder.find(location)
            for dist in finder.find_eggs(location):
                # _emit_egg_deprecation(dist.location)  # TODO: Enable this.
                yield dist
            # This must go last because that's how pkg_resources tie-breaks.
            yield from finder.find_linked(location)

    def get_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Return the first distribution whose canonical name matches *name*.

        Returns ``None`` when no distribution matches. Candidates come from
        ``iter_all_distributions``, which is not defined in this class
        (presumably inherited from ``BaseEnvironment``).
        """
        # Normalize the requested name once up front instead of once per
        # candidate distribution.
        canonical_name = canonicalize_name(name)
        matches = (
            distribution
            for distribution in self.iter_all_distributions()
            if distribution.canonical_name == canonical_name
        )
        return next(matches, None)