6300dfc57f051e461776b82591471c7dc7fc486d
[SubU] /
1 import collections
2 import math
3 from typing import (
4     TYPE_CHECKING,
5     Dict,
6     Iterable,
7     Iterator,
8     Mapping,
9     Sequence,
10     TypeVar,
11     Union,
12 )
13
14 from pip._vendor.resolvelib.providers import AbstractProvider
15
16 from .base import Candidate, Constraint, Requirement
17 from .candidates import REQUIRES_PYTHON_IDENTIFIER
18 from .factory import Factory
19
20 if TYPE_CHECKING:
21     from pip._vendor.resolvelib.providers import Preference
22     from pip._vendor.resolvelib.resolvers import RequirementInformation
23
24     PreferenceInformation = RequirementInformation[Requirement, Candidate]
25
26     _ProviderBase = AbstractProvider[Requirement, Candidate, str]
27 else:
28     _ProviderBase = AbstractProvider
29
30 # Notes on the relationship between the provider, the factory, and the
31 # candidate and requirement classes.
32 #
33 # The provider is a direct implementation of the resolvelib class. Its role
34 # is to deliver the API that resolvelib expects.
35 #
36 # Rather than work with completely abstract "requirement" and "candidate"
37 # concepts as resolvelib does, pip has concrete classes implementing these two
38 # ideas. The API of Requirement and Candidate objects are defined in the base
39 # classes, but essentially map fairly directly to the equivalent provider
40 # methods. In particular, `find_matches` and `is_satisfied_by` are
41 # requirement methods, and `get_dependencies` is a candidate method.
42 #
43 # The factory is the interface to pip's internal mechanisms. It is stateless,
44 # and is created by the resolver and held as a property of the provider. It is
45 # responsible for creating Requirement and Candidate objects, and provides
46 # services to those objects (access to pip's finder and preparer).
47
48
49 D = TypeVar("D")
50 V = TypeVar("V")
51
52
53 def _get_with_identifier(
54     mapping: Mapping[str, V],
55     identifier: str,
56     default: D,
57 ) -> Union[D, V]:
58     """Get item from a package name lookup mapping with a resolver identifier.
59
60     This extra logic is needed when the target mapping is keyed by package
61     name, which cannot be directly looked up with an identifier (which may
62     contain requested extras). Additional logic is added to also look up a value
63     by "cleaning up" the extras from the identifier.
64     """
65     if identifier in mapping:
66         return mapping[identifier]
67     # HACK: Theoretically we should check whether this identifier is a valid
68     # "NAME[EXTRAS]" format, and parse out the name part with packaging or
69     # some regular expression. But since pip's resolver only spits out three
70     # kinds of identifiers: normalized PEP 503 names, normalized names plus
71     # extras, and Requires-Python, we can cheat a bit here.
72     name, open_bracket, _ = identifier.partition("[")
73     if open_bracket and name in mapping:
74         return mapping[name]
75     return default
76
77
78 class PipProvider(_ProviderBase):
79     """Pip's provider implementation for resolvelib.
80
81     :params constraints: A mapping of constraints specified by the user. Keys
82         are canonicalized project names.
83     :params ignore_dependencies: Whether the user specified ``--no-deps``.
84     :params upgrade_strategy: The user-specified upgrade strategy.
85     :params user_requested: A set of canonicalized package names that the user
86         supplied for pip to install/upgrade.
87     """
88
89     def __init__(
90         self,
91         factory: Factory,
92         constraints: Dict[str, Constraint],
93         ignore_dependencies: bool,
94         upgrade_strategy: str,
95         user_requested: Dict[str, int],
96     ) -> None:
97         self._factory = factory
98         self._constraints = constraints
99         self._ignore_dependencies = ignore_dependencies
100         self._upgrade_strategy = upgrade_strategy
101         self._user_requested = user_requested
102         self._known_depths: Dict[str, float] = collections.defaultdict(lambda: math.inf)
103
104     def identify(self, requirement_or_candidate: Union[Requirement, Candidate]) -> str:
105         return requirement_or_candidate.name
106
107     def get_preference(  # type: ignore
108         self,
109         identifier: str,
110         resolutions: Mapping[str, Candidate],
111         candidates: Mapping[str, Iterator[Candidate]],
112         information: Mapping[str, Iterable["PreferenceInformation"]],
113         backtrack_causes: Sequence["PreferenceInformation"],
114     ) -> "Preference":
115         """Produce a sort key for given requirement based on preference.
116
117         The lower the return value is, the more preferred this group of
118         arguments is.
119
120         Currently pip considers the following in order:
121
122         * Prefer if any of the known requirements is "direct", e.g. points to an
123           explicit URL.
124         * If equal, prefer if any requirement is "pinned", i.e. contains
125           operator ``===`` or ``==``.
126         * If equal, calculate an approximate "depth" and resolve requirements
127           closer to the user-specified requirements first.
128         * Order user-specified requirements by the order they are specified.
129         * If equal, prefers "non-free" requirements, i.e. contains at least one
130           operator, such as ``>=`` or ``<``.
131         * If equal, order alphabetically for consistency (helps debuggability).
132         """
133         lookups = (r.get_candidate_lookup() for r, _ in information[identifier])
134         candidate, ireqs = zip(*lookups)
135         operators = [
136             specifier.operator
137             for specifier_set in (ireq.specifier for ireq in ireqs if ireq)
138             for specifier in specifier_set
139         ]
140
141         direct = candidate is not None
142         pinned = any(op[:2] == "==" for op in operators)
143         unfree = bool(operators)
144
145         try:
146             requested_order: Union[int, float] = self._user_requested[identifier]
147         except KeyError:
148             requested_order = math.inf
149             parent_depths = (
150                 self._known_depths[parent.name] if parent is not None else 0.0
151                 for _, parent in information[identifier]
152             )
153             inferred_depth = min(d for d in parent_depths) + 1.0
154         else:
155             inferred_depth = 1.0
156         self._known_depths[identifier] = inferred_depth
157
158         requested_order = self._user_requested.get(identifier, math.inf)
159
160         # Requires-Python has only one candidate and the check is basically
161         # free, so we always do it first to avoid needless work if it fails.
162         requires_python = identifier == REQUIRES_PYTHON_IDENTIFIER
163
164         # HACK: Setuptools have a very long and solid backward compatibility
165         # track record, and extremely few projects would request a narrow,
166         # non-recent version range of it since that would break a lot things.
167         # (Most projects specify it only to request for an installer feature,
168         # which does not work, but that's another topic.) Intentionally
169         # delaying Setuptools helps reduce branches the resolver has to check.
170         # This serves as a temporary fix for issues like "apache-airflow[all]"
171         # while we work on "proper" branch pruning techniques.
172         delay_this = identifier == "setuptools"
173
174         # Prefer the causes of backtracking on the assumption that the problem
175         # resolving the dependency tree is related to the failures that caused
176         # the backtracking
177         backtrack_cause = self.is_backtrack_cause(identifier, backtrack_causes)
178
179         return (
180             not requires_python,
181             delay_this,
182             not direct,
183             not pinned,
184             not backtrack_cause,
185             inferred_depth,
186             requested_order,
187             not unfree,
188             identifier,
189         )
190
191     def find_matches(
192         self,
193         identifier: str,
194         requirements: Mapping[str, Iterator[Requirement]],
195         incompatibilities: Mapping[str, Iterator[Candidate]],
196     ) -> Iterable[Candidate]:
197         def _eligible_for_upgrade(identifier: str) -> bool:
198             """Are upgrades allowed for this project?
199
200             This checks the upgrade strategy, and whether the project was one
201             that the user specified in the command line, in order to decide
202             whether we should upgrade if there's a newer version available.
203
204             (Note that we don't need access to the `--upgrade` flag, because
205             an upgrade strategy of "to-satisfy-only" means that `--upgrade`
206             was not specified).
207             """
208             if self._upgrade_strategy == "eager":
209                 return True
210             elif self._upgrade_strategy == "only-if-needed":
211                 user_order = _get_with_identifier(
212                     self._user_requested,
213                     identifier,
214                     default=None,
215                 )
216                 return user_order is not None
217             return False
218
219         constraint = _get_with_identifier(
220             self._constraints,
221             identifier,
222             default=Constraint.empty(),
223         )
224         return self._factory.find_candidates(
225             identifier=identifier,
226             requirements=requirements,
227             constraint=constraint,
228             prefers_installed=(not _eligible_for_upgrade(identifier)),
229             incompatibilities=incompatibilities,
230         )
231
232     def is_satisfied_by(self, requirement: Requirement, candidate: Candidate) -> bool:
233         return requirement.is_satisfied_by(candidate)
234
235     def get_dependencies(self, candidate: Candidate) -> Sequence[Requirement]:
236         with_requires = not self._ignore_dependencies
237         return [r for r in candidate.iter_dependencies(with_requires) if r is not None]
238
239     @staticmethod
240     def is_backtrack_cause(
241         identifier: str, backtrack_causes: Sequence["PreferenceInformation"]
242     ) -> bool:
243         for backtrack_cause in backtrack_causes:
244             if identifier == backtrack_cause.requirement.name:
245                 return True
246             if backtrack_cause.parent and identifier == backtrack_cause.parent.name:
247                 return True
248         return False