c79941398a2c1d502e60cd0dd0703d8c0530a30f
[SubU] /
1 """Support for installing and building the "wheel" binary package format.
2 """
3
4 import collections
5 import compileall
6 import contextlib
7 import csv
8 import importlib
9 import logging
10 import os.path
11 import re
12 import shutil
13 import sys
14 import warnings
15 from base64 import urlsafe_b64encode
16 from email.message import Message
17 from itertools import chain, filterfalse, starmap
18 from typing import (
19     IO,
20     TYPE_CHECKING,
21     Any,
22     BinaryIO,
23     Callable,
24     Dict,
25     Generator,
26     Iterable,
27     Iterator,
28     List,
29     NewType,
30     Optional,
31     Sequence,
32     Set,
33     Tuple,
34     Union,
35     cast,
36 )
37 from zipfile import ZipFile, ZipInfo
38
39 from pip._vendor.distlib.scripts import ScriptMaker
40 from pip._vendor.distlib.util import get_export_entry
41 from pip._vendor.packaging.utils import canonicalize_name
42
43 from pip._internal.exceptions import InstallationError
44 from pip._internal.locations import get_major_minor_version
45 from pip._internal.metadata import (
46     BaseDistribution,
47     FilesystemWheel,
48     get_wheel_distribution,
49 )
50 from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
51 from pip._internal.models.scheme import SCHEME_KEYS, Scheme
52 from pip._internal.utils.filesystem import adjacent_tmp_file, replace
53 from pip._internal.utils.misc import captured_stdout, ensure_dir, hash_file, partition
54 from pip._internal.utils.unpacking import (
55     current_umask,
56     is_within_directory,
57     set_extracted_file_to_default_mode_plus_executable,
58     zip_item_is_executable,
59 )
60 from pip._internal.utils.wheel import parse_wheel
61
62 if TYPE_CHECKING:
63     from typing import Protocol
64
65     class File(Protocol):
66         src_record_path: "RecordPath"
67         dest_path: str
68         changed: bool
69
70         def save(self) -> None:
71             pass
72
73
74 logger = logging.getLogger(__name__)
75
76 RecordPath = NewType("RecordPath", str)
77 InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
78
79
80 def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]:
81     """Return (encoded_digest, length) for path using hashlib.sha256()"""
82     h, length = hash_file(path, blocksize)
83     digest = "sha256=" + urlsafe_b64encode(h.digest()).decode("latin1").rstrip("=")
84     return (digest, str(length))
85
86
87 def csv_io_kwargs(mode: str) -> Dict[str, Any]:
88     """Return keyword arguments to properly open a CSV file
89     in the given mode.
90     """
91     return {"mode": mode, "newline": "", "encoding": "utf-8"}
92
93
94 def fix_script(path: str) -> bool:
95     """Replace #!python with #!/path/to/python
96     Return True if file was changed.
97     """
98     # XXX RECORD hashes will need to be updated
99     assert os.path.isfile(path)
100
101     with open(path, "rb") as script:
102         firstline = script.readline()
103         if not firstline.startswith(b"#!python"):
104             return False
105         exename = sys.executable.encode(sys.getfilesystemencoding())
106         firstline = b"#!" + exename + os.linesep.encode("ascii")
107         rest = script.read()
108     with open(path, "wb") as script:
109         script.write(firstline)
110         script.write(rest)
111     return True
112
113
114 def wheel_root_is_purelib(metadata: Message) -> bool:
115     return metadata.get("Root-Is-Purelib", "").lower() == "true"
116
117
118 def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, str]]:
119     console_scripts = {}
120     gui_scripts = {}
121     for entry_point in dist.iter_entry_points():
122         if entry_point.group == "console_scripts":
123             console_scripts[entry_point.name] = entry_point.value
124         elif entry_point.group == "gui_scripts":
125             gui_scripts[entry_point.name] = entry_point.value
126     return console_scripts, gui_scripts
127
128
129 def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]:
130     """Determine if any scripts are not on PATH and format a warning.
131     Returns a warning message if one or more scripts are not on PATH,
132     otherwise None.
133     """
134     if not scripts:
135         return None
136
137     # Group scripts by the path they were installed in
138     grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set)
139     for destfile in scripts:
140         parent_dir = os.path.dirname(destfile)
141         script_name = os.path.basename(destfile)
142         grouped_by_dir[parent_dir].add(script_name)
143
144     # We don't want to warn for directories that are on PATH.
145     not_warn_dirs = [
146         os.path.normcase(i).rstrip(os.sep)
147         for i in os.environ.get("PATH", "").split(os.pathsep)
148     ]
149     # If an executable sits with sys.executable, we don't warn for it.
150     #     This covers the case of venv invocations without activating the venv.
151     not_warn_dirs.append(os.path.normcase(os.path.dirname(sys.executable)))
152     warn_for: Dict[str, Set[str]] = {
153         parent_dir: scripts
154         for parent_dir, scripts in grouped_by_dir.items()
155         if os.path.normcase(parent_dir) not in not_warn_dirs
156     }
157     if not warn_for:
158         return None
159
160     # Format a message
161     msg_lines = []
162     for parent_dir, dir_scripts in warn_for.items():
163         sorted_scripts: List[str] = sorted(dir_scripts)
164         if len(sorted_scripts) == 1:
165             start_text = "script {} is".format(sorted_scripts[0])
166         else:
167             start_text = "scripts {} are".format(
168                 ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1]
169             )
170
171         msg_lines.append(
172             "The {} installed in '{}' which is not on PATH.".format(
173                 start_text, parent_dir
174             )
175         )
176
177     last_line_fmt = (
178         "Consider adding {} to PATH or, if you prefer "
179         "to suppress this warning, use --no-warn-script-location."
180     )
181     if len(msg_lines) == 1:
182         msg_lines.append(last_line_fmt.format("this directory"))
183     else:
184         msg_lines.append(last_line_fmt.format("these directories"))
185
186     # Add a note if any directory starts with ~
187     warn_for_tilde = any(
188         i[0] == "~" for i in os.environ.get("PATH", "").split(os.pathsep) if i
189     )
190     if warn_for_tilde:
191         tilde_warning_msg = (
192             "NOTE: The current PATH contains path(s) starting with `~`, "
193             "which may not be expanded by all applications."
194         )
195         msg_lines.append(tilde_warning_msg)
196
197     # Returns the formatted multiline message
198     return "\n".join(msg_lines)
199
200
201 def _normalized_outrows(
202     outrows: Iterable[InstalledCSVRow],
203 ) -> List[Tuple[str, str, str]]:
204     """Normalize the given rows of a RECORD file.
205
206     Items in each row are converted into str. Rows are then sorted to make
207     the value more predictable for tests.
208
209     Each row is a 3-tuple (path, hash, size) and corresponds to a record of
210     a RECORD file (see PEP 376 and PEP 427 for details).  For the rows
211     passed to this function, the size can be an integer as an int or string,
212     or the empty string.
213     """
214     # Normally, there should only be one row per path, in which case the
215     # second and third elements don't come into play when sorting.
216     # However, in cases in the wild where a path might happen to occur twice,
217     # we don't want the sort operation to trigger an error (but still want
218     # determinism).  Since the third element can be an int or string, we
219     # coerce each element to a string to avoid a TypeError in this case.
220     # For additional background, see--
221     # https://github.com/pypa/pip/issues/5868
222     return sorted(
223         (record_path, hash_, str(size)) for record_path, hash_, size in outrows
224     )
225
226
227 def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str:
228     return os.path.join(lib_dir, record_path)
229
230
231 def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath:
232     # On Windows, do not handle relative paths if they belong to different
233     # logical disks
234     if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower():
235         path = os.path.relpath(path, lib_dir)
236
237     path = path.replace(os.path.sep, "/")
238     return cast("RecordPath", path)
239
240
241 def get_csv_rows_for_installed(
242     old_csv_rows: List[List[str]],
243     installed: Dict[RecordPath, RecordPath],
244     changed: Set[RecordPath],
245     generated: List[str],
246     lib_dir: str,
247 ) -> List[InstalledCSVRow]:
248     """
249     :param installed: A map from archive RECORD path to installation RECORD
250         path.
251     """
252     installed_rows: List[InstalledCSVRow] = []
253     for row in old_csv_rows:
254         if len(row) > 3:
255             logger.warning("RECORD line has more than three elements: %s", row)
256         old_record_path = cast("RecordPath", row[0])
257         new_record_path = installed.pop(old_record_path, old_record_path)
258         if new_record_path in changed:
259             digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
260         else:
261             digest = row[1] if len(row) > 1 else ""
262             length = row[2] if len(row) > 2 else ""
263         installed_rows.append((new_record_path, digest, length))
264     for f in generated:
265         path = _fs_to_record_path(f, lib_dir)
266         digest, length = rehash(f)
267         installed_rows.append((path, digest, length))
268     for installed_record_path in installed.values():
269         installed_rows.append((installed_record_path, "", ""))
270     return installed_rows
271
272
273 def get_console_script_specs(console: Dict[str, str]) -> List[str]:
274     """
275     Given the mapping from entrypoint name to callable, return the relevant
276     console script specs.
277     """
278     # Don't mutate caller's version
279     console = console.copy()
280
281     scripts_to_generate = []
282
283     # Special case pip and setuptools to generate versioned wrappers
284     #
285     # The issue is that some projects (specifically, pip and setuptools) use
286     # code in setup.py to create "versioned" entry points - pip2.7 on Python
287     # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into
288     # the wheel metadata at build time, and so if the wheel is installed with
289     # a *different* version of Python the entry points will be wrong. The
290     # correct fix for this is to enhance the metadata to be able to describe
291     # such versioned entry points, but that won't happen till Metadata 2.0 is
292     # available.
293     # In the meantime, projects using versioned entry points will either have
294     # incorrect versioned entry points, or they will not be able to distribute
295     # "universal" wheels (i.e., they will need a wheel per Python version).
296     #
297     # Because setuptools and pip are bundled with _ensurepip and virtualenv,
298     # we need to use universal wheels. So, as a stopgap until Metadata 2.0, we
299     # override the versioned entry points in the wheel and generate the
300     # correct ones. This code is purely a short-term measure until Metadata 2.0
301     # is available.
302     #
303     # To add the level of hack in this section of code, in order to support
304     # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment
305     # variable which will control which version scripts get installed.
306     #
307     # ENSUREPIP_OPTIONS=altinstall
308     #   - Only pipX.Y and easy_install-X.Y will be generated and installed
309     # ENSUREPIP_OPTIONS=install
310     #   - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note
311     #     that this option is technically if ENSUREPIP_OPTIONS is set and is
312     #     not altinstall
313     # DEFAULT
314     #   - The default behavior is to install pip, pipX, pipX.Y, easy_install
315     #     and easy_install-X.Y.
316     pip_script = console.pop("pip", None)
317     if pip_script:
318         if "ENSUREPIP_OPTIONS" not in os.environ:
319             scripts_to_generate.append("pip = " + pip_script)
320
321         if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
322             scripts_to_generate.append(
323                 "pip{} = {}".format(sys.version_info[0], pip_script)
324             )
325
326         scripts_to_generate.append(f"pip{get_major_minor_version()} = {pip_script}")
327         # Delete any other versioned pip entry points
328         pip_ep = [k for k in console if re.match(r"pip(\d+(\.\d+)?)?$", k)]
329         for k in pip_ep:
330             del console[k]
331     easy_install_script = console.pop("easy_install", None)
332     if easy_install_script:
333         if "ENSUREPIP_OPTIONS" not in os.environ:
334             scripts_to_generate.append("easy_install = " + easy_install_script)
335
336         scripts_to_generate.append(
337             "easy_install-{} = {}".format(
338                 get_major_minor_version(), easy_install_script
339             )
340         )
341         # Delete any other versioned easy_install entry points
342         easy_install_ep = [
343             k for k in console if re.match(r"easy_install(-\d+\.\d+)?$", k)
344         ]
345         for k in easy_install_ep:
346             del console[k]
347
348     # Generate the console entry points specified in the wheel
349     scripts_to_generate.extend(starmap("{} = {}".format, console.items()))
350
351     return scripts_to_generate
352
353
354 class ZipBackedFile:
355     def __init__(
356         self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
357     ) -> None:
358         self.src_record_path = src_record_path
359         self.dest_path = dest_path
360         self._zip_file = zip_file
361         self.changed = False
362
363     def _getinfo(self) -> ZipInfo:
364         return self._zip_file.getinfo(self.src_record_path)
365
366     def save(self) -> None:
367         # directory creation is lazy and after file filtering
368         # to ensure we don't install empty dirs; empty dirs can't be
369         # uninstalled.
370         parent_dir = os.path.dirname(self.dest_path)
371         ensure_dir(parent_dir)
372
373         # When we open the output file below, any existing file is truncated
374         # before we start writing the new contents. This is fine in most
375         # cases, but can cause a segfault if pip has loaded a shared
376         # object (e.g. from pyopenssl through its vendored urllib3)
377         # Since the shared object is mmap'd an attempt to call a
378         # symbol in it will then cause a segfault. Unlinking the file
379         # allows writing of new contents while allowing the process to
380         # continue to use the old copy.
381         if os.path.exists(self.dest_path):
382             os.unlink(self.dest_path)
383
384         zipinfo = self._getinfo()
385
386         with self._zip_file.open(zipinfo) as f:
387             with open(self.dest_path, "wb") as dest:
388                 shutil.copyfileobj(f, dest)
389
390         if zip_item_is_executable(zipinfo):
391             set_extracted_file_to_default_mode_plus_executable(self.dest_path)
392
393
394 class ScriptFile:
395     def __init__(self, file: "File") -> None:
396         self._file = file
397         self.src_record_path = self._file.src_record_path
398         self.dest_path = self._file.dest_path
399         self.changed = False
400
401     def save(self) -> None:
402         self._file.save()
403         self.changed = fix_script(self.dest_path)
404
405
406 class MissingCallableSuffix(InstallationError):
407     def __init__(self, entry_point: str) -> None:
408         super().__init__(
409             "Invalid script entry point: {} - A callable "
410             "suffix is required. Cf https://packaging.python.org/"
411             "specifications/entry-points/#use-for-scripts for more "
412             "information.".format(entry_point)
413         )
414
415
416 def _raise_for_invalid_entrypoint(specification: str) -> None:
417     entry = get_export_entry(specification)
418     if entry is not None and entry.suffix is None:
419         raise MissingCallableSuffix(str(entry))
420
421
422 class PipScriptMaker(ScriptMaker):
423     def make(
424         self, specification: str, options: Optional[Dict[str, Any]] = None
425     ) -> List[str]:
426         _raise_for_invalid_entrypoint(specification)
427         return super().make(specification, options)
428
429
430 def _install_wheel(
431     name: str,
432     wheel_zip: ZipFile,
433     wheel_path: str,
434     scheme: Scheme,
435     pycompile: bool = True,
436     warn_script_location: bool = True,
437     direct_url: Optional[DirectUrl] = None,
438     requested: bool = False,
439 ) -> None:
440     """Install a wheel.
441
442     :param name: Name of the project to install
443     :param wheel_zip: open ZipFile for wheel being installed
444     :param scheme: Distutils scheme dictating the install directories
445     :param req_description: String used in place of the requirement, for
446         logging
447     :param pycompile: Whether to byte-compile installed Python files
448     :param warn_script_location: Whether to check that scripts are installed
449         into a directory on PATH
450     :raises UnsupportedWheel:
451         * when the directory holds an unpacked wheel with incompatible
452           Wheel-Version
453         * when the .dist-info dir does not match the wheel
454     """
455     info_dir, metadata = parse_wheel(wheel_zip, name)
456
457     if wheel_root_is_purelib(metadata):
458         lib_dir = scheme.purelib
459     else:
460         lib_dir = scheme.platlib
461
462     # Record details of the files moved
463     #   installed = files copied from the wheel to the destination
464     #   changed = files changed while installing (scripts #! line typically)
465     #   generated = files newly generated during the install (script wrappers)
466     installed: Dict[RecordPath, RecordPath] = {}
467     changed: Set[RecordPath] = set()
468     generated: List[str] = []
469
470     def record_installed(
471         srcfile: RecordPath, destfile: str, modified: bool = False
472     ) -> None:
473         """Map archive RECORD paths to installation RECORD paths."""
474         newpath = _fs_to_record_path(destfile, lib_dir)
475         installed[srcfile] = newpath
476         if modified:
477             changed.add(newpath)
478
479     def is_dir_path(path: RecordPath) -> bool:
480         return path.endswith("/")
481
482     def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
483         if not is_within_directory(dest_dir_path, target_path):
484             message = (
485                 "The wheel {!r} has a file {!r} trying to install"
486                 " outside the target directory {!r}"
487             )
488             raise InstallationError(
489                 message.format(wheel_path, target_path, dest_dir_path)
490             )
491
492     def root_scheme_file_maker(
493         zip_file: ZipFile, dest: str
494     ) -> Callable[[RecordPath], "File"]:
495         def make_root_scheme_file(record_path: RecordPath) -> "File":
496             normed_path = os.path.normpath(record_path)
497             dest_path = os.path.join(dest, normed_path)
498             assert_no_path_traversal(dest, dest_path)
499             return ZipBackedFile(record_path, dest_path, zip_file)
500
501         return make_root_scheme_file
502
503     def data_scheme_file_maker(
504         zip_file: ZipFile, scheme: Scheme
505     ) -> Callable[[RecordPath], "File"]:
506         scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}
507
508         def make_data_scheme_file(record_path: RecordPath) -> "File":
509             normed_path = os.path.normpath(record_path)
510             try:
511                 _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
512             except ValueError:
513                 message = (
514                     "Unexpected file in {}: {!r}. .data directory contents"
515                     " should be named like: '<scheme key>/<path>'."
516                 ).format(wheel_path, record_path)
517                 raise InstallationError(message)
518
519             try:
520                 scheme_path = scheme_paths[scheme_key]
521             except KeyError:
522                 valid_scheme_keys = ", ".join(sorted(scheme_paths))
523                 message = (
524                     "Unknown scheme key used in {}: {} (for file {!r}). .data"
525                     " directory contents should be in subdirectories named"
526                     " with a valid scheme key ({})"
527                 ).format(wheel_path, scheme_key, record_path, valid_scheme_keys)
528                 raise InstallationError(message)
529
530             dest_path = os.path.join(scheme_path, dest_subpath)
531             assert_no_path_traversal(scheme_path, dest_path)
532             return ZipBackedFile(record_path, dest_path, zip_file)
533
534         return make_data_scheme_file
535
536     def is_data_scheme_path(path: RecordPath) -> bool:
537         return path.split("/", 1)[0].endswith(".data")
538
539     paths = cast(List[RecordPath], wheel_zip.namelist())
540     file_paths = filterfalse(is_dir_path, paths)
541     root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)
542
543     make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
544     files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)
545
546     def is_script_scheme_path(path: RecordPath) -> bool:
547         parts = path.split("/", 2)
548         return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"
549
550     other_scheme_paths, script_scheme_paths = partition(
551         is_script_scheme_path, data_scheme_paths
552     )
553
554     make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
555     other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
556     files = chain(files, other_scheme_files)
557
558     # Get the defined entry points
559     distribution = get_wheel_distribution(
560         FilesystemWheel(wheel_path),
561         canonicalize_name(name),
562     )
563     console, gui = get_entrypoints(distribution)
564
565     def is_entrypoint_wrapper(file: "File") -> bool:
566         # EP, EP.exe and EP-script.py are scripts generated for
567         # entry point EP by setuptools
568         path = file.dest_path
569         name = os.path.basename(path)
570         if name.lower().endswith(".exe"):
571             matchname = name[:-4]
572         elif name.lower().endswith("-script.py"):
573             matchname = name[:-10]
574         elif name.lower().endswith(".pya"):
575             matchname = name[:-4]
576         else:
577             matchname = name
578         # Ignore setuptools-generated scripts
579         return matchname in console or matchname in gui
580
581     script_scheme_files: Iterator[File] = map(
582         make_data_scheme_file, script_scheme_paths
583     )
584     script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
585     script_scheme_files = map(ScriptFile, script_scheme_files)
586     files = chain(files, script_scheme_files)
587
588     for file in files:
589         file.save()
590         record_installed(file.src_record_path, file.dest_path, file.changed)
591
592     def pyc_source_file_paths() -> Generator[str, None, None]:
593         # We de-duplicate installation paths, since there can be overlap (e.g.
594         # file in .data maps to same location as file in wheel root).
595         # Sorting installation paths makes it easier to reproduce and debug
596         # issues related to permissions on existing files.
597         for installed_path in sorted(set(installed.values())):
598             full_installed_path = os.path.join(lib_dir, installed_path)
599             if not os.path.isfile(full_installed_path):
600                 continue
601             if not full_installed_path.endswith(".py"):
602                 continue
603             yield full_installed_path
604
605     def pyc_output_path(path: str) -> str:
606         """Return the path the pyc file would have been written to."""
607         return importlib.util.cache_from_source(path)
608
609     # Compile all of the pyc files for the installed files
610     if pycompile:
611         with captured_stdout() as stdout:
612             with warnings.catch_warnings():
613                 warnings.filterwarnings("ignore")
614                 for path in pyc_source_file_paths():
615                     success = compileall.compile_file(path, force=True, quiet=True)
616                     if success:
617                         pyc_path = pyc_output_path(path)
618                         assert os.path.exists(pyc_path)
619                         pyc_record_path = cast(
620                             "RecordPath", pyc_path.replace(os.path.sep, "/")
621                         )
622                         record_installed(pyc_record_path, pyc_path)
623         logger.debug(stdout.getvalue())
624
625     maker = PipScriptMaker(None, scheme.scripts)
626
627     # Ensure old scripts are overwritten.
628     # See https://github.com/pypa/pip/issues/1800
629     maker.clobber = True
630
631     # Ensure we don't generate any variants for scripts because this is almost
632     # never what somebody wants.
633     # See https://bitbucket.org/pypa/distlib/issue/35/
634     maker.variants = {""}
635
636     # This is required because otherwise distlib creates scripts that are not
637     # executable.
638     # See https://bitbucket.org/pypa/distlib/issue/32/
639     maker.set_mode = True
640
641     # Generate the console and GUI entry points specified in the wheel
642     scripts_to_generate = get_console_script_specs(console)
643
644     gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))
645
646     generated_console_scripts = maker.make_multiple(scripts_to_generate)
647     generated.extend(generated_console_scripts)
648
649     generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))
650
651     if warn_script_location:
652         msg = message_about_scripts_not_on_PATH(generated_console_scripts)
653         if msg is not None:
654             logger.warning(msg)
655
656     generated_file_mode = 0o666 & ~current_umask()
657
658     @contextlib.contextmanager
659     def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
660         with adjacent_tmp_file(path, **kwargs) as f:
661             yield f
662         os.chmod(f.name, generated_file_mode)
663         replace(f.name, path)
664
665     dest_info_dir = os.path.join(lib_dir, info_dir)
666
667     # Record pip as the installer
668     installer_path = os.path.join(dest_info_dir, "INSTALLER")
669     with _generate_file(installer_path) as installer_file:
670         installer_file.write(b"pip\n")
671     generated.append(installer_path)
672
673     # Record the PEP 610 direct URL reference
674     if direct_url is not None:
675         direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
676         with _generate_file(direct_url_path) as direct_url_file:
677             direct_url_file.write(direct_url.to_json().encode("utf-8"))
678         generated.append(direct_url_path)
679
680     # Record the REQUESTED file
681     if requested:
682         requested_path = os.path.join(dest_info_dir, "REQUESTED")
683         with open(requested_path, "wb"):
684             pass
685         generated.append(requested_path)
686
687     record_text = distribution.read_text("RECORD")
688     record_rows = list(csv.reader(record_text.splitlines()))
689
690     rows = get_csv_rows_for_installed(
691         record_rows,
692         installed=installed,
693         changed=changed,
694         generated=generated,
695         lib_dir=lib_dir,
696     )
697
698     # Record details of all files installed
699     record_path = os.path.join(dest_info_dir, "RECORD")
700
701     with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
702         # Explicitly cast to typing.IO[str] as a workaround for the mypy error:
703         # "writer" has incompatible type "BinaryIO"; expected "_Writer"
704         writer = csv.writer(cast("IO[str]", record_file))
705         writer.writerows(_normalized_outrows(rows))
706
707
708 @contextlib.contextmanager
709 def req_error_context(req_description: str) -> Generator[None, None, None]:
710     try:
711         yield
712     except InstallationError as e:
713         message = "For req: {}. {}".format(req_description, e.args[0])
714         raise InstallationError(message) from e
715
716
717 def install_wheel(
718     name: str,
719     wheel_path: str,
720     scheme: Scheme,
721     req_description: str,
722     pycompile: bool = True,
723     warn_script_location: bool = True,
724     direct_url: Optional[DirectUrl] = None,
725     requested: bool = False,
726 ) -> None:
727     with ZipFile(wheel_path, allowZip64=True) as z:
728         with req_error_context(req_description):
729             _install_wheel(
730                 name=name,
731                 wheel_zip=z,
732                 wheel_path=wheel_path,
733                 scheme=scheme,
734                 pycompile=pycompile,
735                 warn_script_location=warn_script_location,
736                 direct_url=direct_url,
737                 requested=requested,
738             )