6621549b8449130d2d01ebac0a3649d8b70c4f91
[SubU] /
1 import contextlib
2 import hashlib
3 import logging
4 import os
5 from types import TracebackType
6 from typing import Dict, Generator, Optional, Set, Type, Union
7
8 from pip._internal.models.link import Link
9 from pip._internal.req.req_install import InstallRequirement
10 from pip._internal.utils.temp_dir import TempDirectory
11
12 logger = logging.getLogger(__name__)
13
14
15 @contextlib.contextmanager
16 def update_env_context_manager(**changes: str) -> Generator[None, None, None]:
17     target = os.environ
18
19     # Save values from the target and change them.
20     non_existent_marker = object()
21     saved_values: Dict[str, Union[object, str]] = {}
22     for name, new_value in changes.items():
23         try:
24             saved_values[name] = target[name]
25         except KeyError:
26             saved_values[name] = non_existent_marker
27         target[name] = new_value
28
29     try:
30         yield
31     finally:
32         # Restore original values in the target.
33         for name, original_value in saved_values.items():
34             if original_value is non_existent_marker:
35                 del target[name]
36             else:
37                 assert isinstance(original_value, str)  # for mypy
38                 target[name] = original_value
39
40
41 @contextlib.contextmanager
42 def get_build_tracker() -> Generator["BuildTracker", None, None]:
43     root = os.environ.get("PIP_BUILD_TRACKER")
44     with contextlib.ExitStack() as ctx:
45         if root is None:
46             root = ctx.enter_context(TempDirectory(kind="build-tracker")).path
47             ctx.enter_context(update_env_context_manager(PIP_BUILD_TRACKER=root))
48             logger.debug("Initialized build tracking at %s", root)
49
50         with BuildTracker(root) as tracker:
51             yield tracker
52
53
54 class BuildTracker:
55     def __init__(self, root: str) -> None:
56         self._root = root
57         self._entries: Set[InstallRequirement] = set()
58         logger.debug("Created build tracker: %s", self._root)
59
60     def __enter__(self) -> "BuildTracker":
61         logger.debug("Entered build tracker: %s", self._root)
62         return self
63
64     def __exit__(
65         self,
66         exc_type: Optional[Type[BaseException]],
67         exc_val: Optional[BaseException],
68         exc_tb: Optional[TracebackType],
69     ) -> None:
70         self.cleanup()
71
72     def _entry_path(self, link: Link) -> str:
73         hashed = hashlib.sha224(link.url_without_fragment.encode()).hexdigest()
74         return os.path.join(self._root, hashed)
75
76     def add(self, req: InstallRequirement) -> None:
77         """Add an InstallRequirement to build tracking."""
78
79         assert req.link
80         # Get the file to write information about this requirement.
81         entry_path = self._entry_path(req.link)
82
83         # Try reading from the file. If it exists and can be read from, a build
84         # is already in progress, so a LookupError is raised.
85         try:
86             with open(entry_path) as fp:
87                 contents = fp.read()
88         except FileNotFoundError:
89             pass
90         else:
91             message = "{} is already being built: {}".format(req.link, contents)
92             raise LookupError(message)
93
94         # If we're here, req should really not be building already.
95         assert req not in self._entries
96
97         # Start tracking this requirement.
98         with open(entry_path, "w", encoding="utf-8") as fp:
99             fp.write(str(req))
100         self._entries.add(req)
101
102         logger.debug("Added %s to build tracker %r", req, self._root)
103
104     def remove(self, req: InstallRequirement) -> None:
105         """Remove an InstallRequirement from build tracking."""
106
107         assert req.link
108         # Delete the created file and the corresponding entries.
109         os.unlink(self._entry_path(req.link))
110         self._entries.remove(req)
111
112         logger.debug("Removed %s from build tracker %r", req, self._root)
113
114     def cleanup(self) -> None:
115         for req in set(self._entries):
116             self.remove(req)
117
118         logger.debug("Removed build tracker: %r", self._root)
119
120     @contextlib.contextmanager
121     def track(self, req: InstallRequirement) -> Generator[None, None, None]:
122         self.add(req)
123         yield
124         self.remove(req)