1 """Support for installing and building the "wheel" binary package format.
15 from base64 import urlsafe_b64encode
16 from email.message import Message
17 from itertools import chain, filterfalse, starmap
37 from zipfile import ZipFile, ZipInfo
39 from pip._vendor.distlib.scripts import ScriptMaker
40 from pip._vendor.distlib.util import get_export_entry
41 from pip._vendor.packaging.utils import canonicalize_name
43 from pip._internal.exceptions import InstallationError
44 from pip._internal.locations import get_major_minor_version
45 from pip._internal.metadata import (
48 get_wheel_distribution,
50 from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
51 from pip._internal.models.scheme import SCHEME_KEYS, Scheme
52 from pip._internal.utils.filesystem import adjacent_tmp_file, replace
53 from pip._internal.utils.misc import captured_stdout, ensure_dir, hash_file, partition
54 from pip._internal.utils.unpacking import (
57 set_extracted_file_to_default_mode_plus_executable,
58 zip_item_is_executable,
60 from pip._internal.utils.wheel import parse_wheel
63 from typing import Protocol
66 src_record_path: "RecordPath"
70 def save(self) -> None:
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Distinct string type for paths as they are written in a RECORD file.
RecordPath = NewType("RecordPath", str)
# One RECORD row: (path, hash, size). The size may be an int or a string.
InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]:
    """Hash the file at ``path`` and format the result for a RECORD row.

    :param path: File to hash.
    :param blocksize: Passed straight through to ``hash_file``.
    :return: ``(digest, length)``. ``digest`` is ``"sha256="`` followed by the
        URL-safe base64 encoding of the hash with trailing ``=`` padding
        removed; ``length`` is the length reported by ``hash_file``, as a str.
    """
    hasher, size = hash_file(path, blocksize)
    encoded = urlsafe_b64encode(hasher.digest()).decode("latin1")
    return ("sha256=" + encoded.rstrip("="), str(size))
def csv_io_kwargs(mode: str) -> Dict[str, Any]:
    """Build the keyword arguments for opening a CSV file in ``mode``.

    :param mode: File mode to open with (for example ``"r"`` or ``"w"``).
    :return: A dict with ``mode``, an empty ``newline`` and UTF-8 ``encoding``.
    """
    kwargs: Dict[str, Any] = dict(mode=mode, newline="", encoding="utf-8")
    return kwargs
def fix_script(path: str) -> bool:
    """Replace a ``#!python`` shebang with ``#!<path to this interpreter>``.

    Only the first line is rewritten; the remainder of the script is written
    back unchanged.

    :param path: Path of an existing script file.
    :return: True if the file was changed, False if its first line does not
        start with ``#!python`` (the file is then left untouched).
    """
    # XXX RECORD hashes will need to be updated
    assert os.path.isfile(path)

    with open(path, "rb") as script:
        firstline = script.readline()
        if not firstline.startswith(b"#!python"):
            return False
        exename = sys.executable.encode(sys.getfilesystemencoding())
        firstline = b"#!" + exename + os.linesep.encode("ascii")
        # Read the body before reopening for writing: "wb" truncates the file.
        rest = script.read()
    with open(path, "wb") as script:
        script.write(firstline)
        script.write(rest)
    return True
def wheel_root_is_purelib(metadata: Message) -> bool:
    """Tell whether the metadata's ``Root-Is-Purelib`` field is true.

    :param metadata: Parsed metadata message to inspect.
    :return: True when the field equals ``"true"`` ignoring case; False
        otherwise, including when the field is absent.
    """
    flag = metadata.get("Root-Is-Purelib", "")
    return flag.lower() == "true"
def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Collect the console and GUI script entry points of ``dist``.

    :param dist: Distribution whose ``iter_entry_points()`` is consulted.
    :return: ``(console_scripts, gui_scripts)``, each mapping an entry point
        name to its value. Entry points in any other group are ignored.
    """
    console_scripts: Dict[str, str] = {}
    gui_scripts: Dict[str, str] = {}
    for entry_point in dist.iter_entry_points():
        if entry_point.group == "console_scripts":
            console_scripts[entry_point.name] = entry_point.value
        elif entry_point.group == "gui_scripts":
            gui_scripts[entry_point.name] = entry_point.value
    return console_scripts, gui_scripts
def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]:
    """Determine if any scripts are not on PATH and format a warning.

    :param scripts: Paths of the installed scripts.
    :return: A warning message if one or more scripts are installed in a
        directory that is not on PATH, otherwise None.
    """
    if not scripts:
        return None

    # Group scripts by the path they were installed in
    grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set)
    for destfile in scripts:
        parent_dir = os.path.dirname(destfile)
        script_name = os.path.basename(destfile)
        grouped_by_dir[parent_dir].add(script_name)

    path_entries = os.environ.get("PATH", "").split(os.pathsep)

    # We don't want to warn for directories that are on PATH.
    not_warn_dirs = {os.path.normcase(i).rstrip(os.sep) for i in path_entries}
    # If an executable sits with sys.executable, we don't warn for it.
    # This covers the case of venv invocations without activating the venv.
    not_warn_dirs.add(os.path.normcase(os.path.dirname(sys.executable)))
    warn_for: Dict[str, Set[str]] = {
        parent_dir: dir_scripts
        for parent_dir, dir_scripts in grouped_by_dir.items()
        if os.path.normcase(parent_dir) not in not_warn_dirs
    }
    if not warn_for:
        return None

    # Format a message
    msg_lines = []
    for parent_dir, dir_scripts in warn_for.items():
        sorted_scripts: List[str] = sorted(dir_scripts)
        if len(sorted_scripts) == 1:
            start_text = "script {} is".format(sorted_scripts[0])
        else:
            start_text = "scripts {} are".format(
                ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1]
            )

        msg_lines.append(
            "The {} installed in '{}' which is not on PATH.".format(
                start_text, parent_dir
            )
        )

    last_line_fmt = (
        "Consider adding {} to PATH or, if you prefer "
        "to suppress this warning, use --no-warn-script-location."
    )
    if len(msg_lines) == 1:
        msg_lines.append(last_line_fmt.format("this directory"))
    else:
        msg_lines.append(last_line_fmt.format("these directories"))

    # Add a note if any directory starts with ~
    warn_for_tilde = any(i[0] == "~" for i in path_entries if i)
    if warn_for_tilde:
        tilde_warning_msg = (
            "NOTE: The current PATH contains path(s) starting with `~`, "
            "which may not be expanded by all applications."
        )
        msg_lines.append(tilde_warning_msg)

    # Returns the formatted multiline message
    return "\n".join(msg_lines)
201 def _normalized_outrows(
202 outrows: Iterable[InstalledCSVRow],
203 ) -> List[Tuple[str, str, str]]:
204 """Normalize the given rows of a RECORD file.
206 Items in each row are converted into str. Rows are then sorted to make
207 the value more predictable for tests.
209 Each row is a 3-tuple (path, hash, size) and corresponds to a record of
210 a RECORD file (see PEP 376 and PEP 427 for details). For the rows
211 passed to this function, the size can be an integer as an int or string,
214 # Normally, there should only be one row per path, in which case the
215 # second and third elements don't come into play when sorting.
216 # However, in cases in the wild where a path might happen to occur twice,
217 # we don't want the sort operation to trigger an error (but still want
218 # determinism). Since the third element can be an int or string, we
219 # coerce each element to a string to avoid a TypeError in this case.
220 # For additional background, see--
221 # https://github.com/pypa/pip/issues/5868
223 (record_path, hash_, str(size)) for record_path, hash_, size in outrows
227 def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str:
228 return os.path.join(lib_dir, record_path)
231 def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath:
232 # On Windows, do not handle relative paths if they belong to different
234 if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower():
235 path = os.path.relpath(path, lib_dir)
237 path = path.replace(os.path.sep, "/")
238 return cast("RecordPath", path)
def get_csv_rows_for_installed(
    old_csv_rows: List[List[str]],
    installed: Dict[RecordPath, RecordPath],
    changed: Set[RecordPath],
    generated: List[str],
    lib_dir: str,
) -> List[InstalledCSVRow]:
    """Build the RECORD rows describing what was actually installed.

    :param old_csv_rows: Rows read from the wheel's own RECORD file.
    :param installed: A map from archive RECORD path to installation RECORD
        path. It is not modified.
    :param changed: Installation RECORD paths whose contents were modified
        during installation; these are re-hashed from disk.
    :param generated: Filesystem paths of files created during installation;
        each gets a freshly computed row.
    :param lib_dir: Directory that RECORD paths are relative to.
    :return: Rows for the old RECORD entries (with paths remapped), then rows
        for generated files, then hash-less rows for installed files that the
        old RECORD did not mention.
    """
    # Work on a copy so the caller's mapping is not emptied as a side effect.
    unrecorded = dict(installed)
    installed_rows: List[InstalledCSVRow] = []
    for row in old_csv_rows:
        if not row:
            # A blank line in RECORD parses to an empty row; nothing to record.
            continue
        if len(row) > 3:
            logger.warning("RECORD line has more than three elements: %s", row)
        old_record_path = cast("RecordPath", row[0])
        new_record_path = unrecorded.pop(old_record_path, old_record_path)
        if new_record_path in changed:
            digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
        else:
            # Tolerate short rows: missing hash/size columns become "".
            digest = row[1] if len(row) > 1 else ""
            length = row[2] if len(row) > 2 else ""
        installed_rows.append((new_record_path, digest, length))
    for f in generated:
        path = _fs_to_record_path(f, lib_dir)
        digest, length = rehash(f)
        installed_rows.append((path, digest, length))
    # Whatever is left was installed but never listed in the old RECORD.
    installed_rows.extend(
        (installed_record_path, "", "")
        for installed_record_path in unrecorded.values()
    )
    return installed_rows
def get_console_script_specs(console: Dict[str, str]) -> List[str]:
    """
    Given the mapping from entrypoint name to callable, return the relevant
    console script specs.

    :param console: Entry point name -> callable; it is not modified.
    :return: ``"name = callable"`` specs. ``pip`` and ``easy_install`` entries
        are replaced by wrappers versioned for the running interpreter.
    """
    # Don't mutate caller's version
    console = console.copy()

    scripts_to_generate = []

    # Special case pip and setuptools to generate versioned wrappers
    #
    # Some projects (specifically, pip and setuptools) create "versioned"
    # entry points - pip2.7 on Python 2.7, pip3.3 on Python 3.3, etc. These
    # are baked into the wheel metadata at build time, so if the wheel is
    # installed with a *different* version of Python the entry points will be
    # wrong. Because setuptools and pip are bundled with _ensurepip and
    # virtualenv, we need to use universal wheels, so as a stopgap we override
    # the versioned entry points in the wheel and generate the correct ones.
    #
    # In order to support ensurepip this code looks for an
    # ``ENSUREPIP_OPTIONS`` environment variable which controls which version
    # scripts get installed.
    #
    # ENSUREPIP_OPTIONS=altinstall
    #   - Only pipX.Y and easy_install-X.Y will be generated and installed
    # ENSUREPIP_OPTIONS=install
    #   - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note
    #     that this applies whenever ENSUREPIP_OPTIONS is set and is not
    #     altinstall
    # DEFAULT
    #   - The default behavior is to install pip, pipX, pipX.Y, easy_install
    #     and easy_install-X.Y.
    ensurepip_options = os.environ.get("ENSUREPIP_OPTIONS")  # None when unset

    pip_script = console.pop("pip", None)
    if pip_script:
        if ensurepip_options is None:
            scripts_to_generate.append("pip = " + pip_script)

        if ensurepip_options != "altinstall":
            scripts_to_generate.append(
                "pip{} = {}".format(sys.version_info[0], pip_script)
            )

        scripts_to_generate.append(f"pip{get_major_minor_version()} = {pip_script}")
        # Delete any other versioned pip entry points
        pip_ep = [k for k in console if re.match(r"pip(\d+(\.\d+)?)?$", k)]
        for k in pip_ep:
            del console[k]

    easy_install_script = console.pop("easy_install", None)
    if easy_install_script:
        if ensurepip_options is None:
            scripts_to_generate.append("easy_install = " + easy_install_script)

        scripts_to_generate.append(
            "easy_install-{} = {}".format(
                get_major_minor_version(), easy_install_script
            )
        )
        # Delete any other versioned easy_install entry points
        easy_install_ep = [
            k for k in console if re.match(r"easy_install(-\d+\.\d+)?$", k)
        ]
        for k in easy_install_ep:
            del console[k]

    # Generate the console entry points specified in the wheel
    scripts_to_generate.extend(starmap("{} = {}".format, console.items()))

    return scripts_to_generate
356 self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
358 self.src_record_path = src_record_path
359 self.dest_path = dest_path
360 self._zip_file = zip_file
363 def _getinfo(self) -> ZipInfo:
364 return self._zip_file.getinfo(self.src_record_path)
def save(self) -> None:
    """Extract this archive member to ``self.dest_path``.

    The parent directory is created first, any file already at the
    destination is unlinked, the member's bytes are copied out, and the
    result is marked executable when the archive entry is executable.
    """
    target = self.dest_path

    # Directories are only created here, once a file is actually written,
    # so that empty directories are never installed.
    ensure_dir(os.path.dirname(target))

    # Opening the output file truncates any existing file before the new
    # contents are written. That is usually fine, but it can cause a segfault
    # if pip has loaded a shared object from that path (e.g. from pyopenssl
    # through its vendored urllib3): the object is mmap'd, so calling a
    # symbol in it afterwards crashes. Unlinking first lets the new contents
    # be written while the process keeps using the old copy.
    if os.path.exists(target):
        os.unlink(target)

    info = self._getinfo()

    with self._zip_file.open(info) as src, open(target, "wb") as out:
        shutil.copyfileobj(src, out)

    if zip_item_is_executable(info):
        set_extracted_file_to_default_mode_plus_executable(target)
def __init__(self, file: "File") -> None:
    """Wrap ``file``, mirroring its source and destination paths.

    :param file: The file object this script wrapper delegates to.
    """
    # Must be assigned before the attribute reads below.
    self._file = file
    self.src_record_path = self._file.src_record_path
    self.dest_path = self._file.dest_path
    # Nothing has been rewritten yet; save() updates this.
    self.changed = False
def save(self) -> None:
    """Write the wrapped file, then fix its shebang line.

    ``self.changed`` records whether ``fix_script`` modified the file.
    """
    # The file has to exist on disk before fix_script can inspect it.
    self._file.save()
    self.changed = fix_script(self.dest_path)
class MissingCallableSuffix(InstallationError):
    """Raised for a script entry point that names no callable."""

    def __init__(self, entry_point: str) -> None:
        """Build the error message for the offending ``entry_point``."""
        super().__init__(
            "Invalid script entry point: {} - A callable "
            "suffix is required. Cf https://packaging.python.org/"
            "specifications/entry-points/#use-for-scripts for more "
            "information.".format(entry_point)
        )
def _raise_for_invalid_entrypoint(specification: str) -> None:
    """Reject an entry point specification that has no callable suffix.

    :param specification: Entry point spec, parsed with ``get_export_entry``.
    :raises MissingCallableSuffix: if the spec parses to an entry whose
        ``suffix`` is None. A spec that parses to None is accepted.
    """
    entry = get_export_entry(specification)
    if entry is None:
        return
    if entry.suffix is None:
        raise MissingCallableSuffix(str(entry))
class PipScriptMaker(ScriptMaker):
    """ScriptMaker that validates each entry point before generating it."""

    def make(
        self, specification: str, options: Optional[Dict[str, Any]] = None
    ) -> List[str]:
        """Validate ``specification``, then delegate to ``ScriptMaker.make``.

        :raises MissingCallableSuffix: if the entry point names no callable.
        """
        _raise_for_invalid_entrypoint(specification)
        return super().make(specification, options)
435 pycompile: bool = True,
436 warn_script_location: bool = True,
437 direct_url: Optional[DirectUrl] = None,
438 requested: bool = False,
442 :param name: Name of the project to install
443 :param wheel_zip: open ZipFile for wheel being installed
444 :param scheme: Distutils scheme dictating the install directories
445 :param req_description: String used in place of the requirement, for
447 :param pycompile: Whether to byte-compile installed Python files
448 :param warn_script_location: Whether to check that scripts are installed
449 into a directory on PATH
450 :raises UnsupportedWheel:
451 * when the directory holds an unpacked wheel with incompatible
453 * when the .dist-info dir does not match the wheel
455 info_dir, metadata = parse_wheel(wheel_zip, name)
457 if wheel_root_is_purelib(metadata):
458 lib_dir = scheme.purelib
460 lib_dir = scheme.platlib
462 # Record details of the files moved
463 # installed = files copied from the wheel to the destination
464 # changed = files changed while installing (scripts #! line typically)
465 # generated = files newly generated during the install (script wrappers)
466 installed: Dict[RecordPath, RecordPath] = {}
467 changed: Set[RecordPath] = set()
468 generated: List[str] = []
470 def record_installed(
471 srcfile: RecordPath, destfile: str, modified: bool = False
473 """Map archive RECORD paths to installation RECORD paths."""
474 newpath = _fs_to_record_path(destfile, lib_dir)
475 installed[srcfile] = newpath
479 def is_dir_path(path: RecordPath) -> bool:
480 return path.endswith("/")
482 def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
483 if not is_within_directory(dest_dir_path, target_path):
485 "The wheel {!r} has a file {!r} trying to install"
486 " outside the target directory {!r}"
488 raise InstallationError(
489 message.format(wheel_path, target_path, dest_dir_path)
492 def root_scheme_file_maker(
493 zip_file: ZipFile, dest: str
494 ) -> Callable[[RecordPath], "File"]:
495 def make_root_scheme_file(record_path: RecordPath) -> "File":
496 normed_path = os.path.normpath(record_path)
497 dest_path = os.path.join(dest, normed_path)
498 assert_no_path_traversal(dest, dest_path)
499 return ZipBackedFile(record_path, dest_path, zip_file)
501 return make_root_scheme_file
503 def data_scheme_file_maker(
504 zip_file: ZipFile, scheme: Scheme
505 ) -> Callable[[RecordPath], "File"]:
506 scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}
508 def make_data_scheme_file(record_path: RecordPath) -> "File":
509 normed_path = os.path.normpath(record_path)
511 _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
514 "Unexpected file in {}: {!r}. .data directory contents"
515 " should be named like: '<scheme key>/<path>'."
516 ).format(wheel_path, record_path)
517 raise InstallationError(message)
520 scheme_path = scheme_paths[scheme_key]
522 valid_scheme_keys = ", ".join(sorted(scheme_paths))
524 "Unknown scheme key used in {}: {} (for file {!r}). .data"
525 " directory contents should be in subdirectories named"
526 " with a valid scheme key ({})"
527 ).format(wheel_path, scheme_key, record_path, valid_scheme_keys)
528 raise InstallationError(message)
530 dest_path = os.path.join(scheme_path, dest_subpath)
531 assert_no_path_traversal(scheme_path, dest_path)
532 return ZipBackedFile(record_path, dest_path, zip_file)
534 return make_data_scheme_file
536 def is_data_scheme_path(path: RecordPath) -> bool:
537 return path.split("/", 1)[0].endswith(".data")
539 paths = cast(List[RecordPath], wheel_zip.namelist())
540 file_paths = filterfalse(is_dir_path, paths)
541 root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)
543 make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
544 files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)
546 def is_script_scheme_path(path: RecordPath) -> bool:
547 parts = path.split("/", 2)
548 return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"
550 other_scheme_paths, script_scheme_paths = partition(
551 is_script_scheme_path, data_scheme_paths
554 make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
555 other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
556 files = chain(files, other_scheme_files)
558 # Get the defined entry points
559 distribution = get_wheel_distribution(
560 FilesystemWheel(wheel_path),
561 canonicalize_name(name),
563 console, gui = get_entrypoints(distribution)
565 def is_entrypoint_wrapper(file: "File") -> bool:
566 # EP, EP.exe and EP-script.py are scripts generated for
567 # entry point EP by setuptools
568 path = file.dest_path
569 name = os.path.basename(path)
570 if name.lower().endswith(".exe"):
571 matchname = name[:-4]
572 elif name.lower().endswith("-script.py"):
573 matchname = name[:-10]
574 elif name.lower().endswith(".pya"):
575 matchname = name[:-4]
578 # Ignore setuptools-generated scripts
579 return matchname in console or matchname in gui
581 script_scheme_files: Iterator[File] = map(
582 make_data_scheme_file, script_scheme_paths
584 script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
585 script_scheme_files = map(ScriptFile, script_scheme_files)
586 files = chain(files, script_scheme_files)
590 record_installed(file.src_record_path, file.dest_path, file.changed)
592 def pyc_source_file_paths() -> Generator[str, None, None]:
593 # We de-duplicate installation paths, since there can be overlap (e.g.
594 # file in .data maps to same location as file in wheel root).
595 # Sorting installation paths makes it easier to reproduce and debug
596 # issues related to permissions on existing files.
597 for installed_path in sorted(set(installed.values())):
598 full_installed_path = os.path.join(lib_dir, installed_path)
599 if not os.path.isfile(full_installed_path):
601 if not full_installed_path.endswith(".py"):
603 yield full_installed_path
605 def pyc_output_path(path: str) -> str:
606 """Return the path the pyc file would have been written to."""
607 return importlib.util.cache_from_source(path)
609 # Compile all of the pyc files for the installed files
611 with captured_stdout() as stdout:
612 with warnings.catch_warnings():
613 warnings.filterwarnings("ignore")
614 for path in pyc_source_file_paths():
615 success = compileall.compile_file(path, force=True, quiet=True)
617 pyc_path = pyc_output_path(path)
618 assert os.path.exists(pyc_path)
619 pyc_record_path = cast(
620 "RecordPath", pyc_path.replace(os.path.sep, "/")
622 record_installed(pyc_record_path, pyc_path)
623 logger.debug(stdout.getvalue())
625 maker = PipScriptMaker(None, scheme.scripts)
627 # Ensure old scripts are overwritten.
628 # See https://github.com/pypa/pip/issues/1800
631 # Ensure we don't generate any variants for scripts because this is almost
632 # never what somebody wants.
633 # See https://bitbucket.org/pypa/distlib/issue/35/
634 maker.variants = {""}
636 # This is required because otherwise distlib creates scripts that are not
638 # See https://bitbucket.org/pypa/distlib/issue/32/
639 maker.set_mode = True
641 # Generate the console and GUI entry points specified in the wheel
642 scripts_to_generate = get_console_script_specs(console)
644 gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))
646 generated_console_scripts = maker.make_multiple(scripts_to_generate)
647 generated.extend(generated_console_scripts)
649 generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))
651 if warn_script_location:
652 msg = message_about_scripts_not_on_PATH(generated_console_scripts)
656 generated_file_mode = 0o666 & ~current_umask()
658 @contextlib.contextmanager
659 def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
660 with adjacent_tmp_file(path, **kwargs) as f:
662 os.chmod(f.name, generated_file_mode)
663 replace(f.name, path)
665 dest_info_dir = os.path.join(lib_dir, info_dir)
667 # Record pip as the installer
668 installer_path = os.path.join(dest_info_dir, "INSTALLER")
669 with _generate_file(installer_path) as installer_file:
670 installer_file.write(b"pip\n")
671 generated.append(installer_path)
673 # Record the PEP 610 direct URL reference
674 if direct_url is not None:
675 direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
676 with _generate_file(direct_url_path) as direct_url_file:
677 direct_url_file.write(direct_url.to_json().encode("utf-8"))
678 generated.append(direct_url_path)
680 # Record the REQUESTED file
682 requested_path = os.path.join(dest_info_dir, "REQUESTED")
683 with open(requested_path, "wb"):
685 generated.append(requested_path)
687 record_text = distribution.read_text("RECORD")
688 record_rows = list(csv.reader(record_text.splitlines()))
690 rows = get_csv_rows_for_installed(
698 # Record details of all files installed
699 record_path = os.path.join(dest_info_dir, "RECORD")
701 with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
702 # Explicitly cast to typing.IO[str] as a workaround for the mypy error:
703 # "writer" has incompatible type "BinaryIO"; expected "_Writer"
704 writer = csv.writer(cast("IO[str]", record_file))
705 writer.writerows(_normalized_outrows(rows))
@contextlib.contextmanager
def req_error_context(req_description: str) -> Generator[None, None, None]:
    """Prefix any InstallationError raised in the body with the requirement.

    :param req_description: Text identifying the requirement being installed.
    :raises InstallationError: re-raised with the message
        ``"For req: <req_description>. <original message>"``, chained to the
        original error.
    """
    try:
        yield
    except InstallationError as e:
        # An exception raised without arguments has no args[0] to report.
        detail = e.args[0] if e.args else ""
        message = "For req: {}. {}".format(req_description, detail)
        raise InstallationError(message) from e
721 req_description: str,
722 pycompile: bool = True,
723 warn_script_location: bool = True,
724 direct_url: Optional[DirectUrl] = None,
725 requested: bool = False,
727 with ZipFile(wheel_path, allowZip64=True) as z:
728 with req_error_context(req_description):
732 wheel_path=wheel_path,
735 warn_script_location=warn_script_location,
736 direct_url=direct_url,