a12e2c75d132c73b556702159d535d15ed9abfd2
[SubU] /
1 import os
2 import pathlib
3 import tempfile
4 import functools
5 import contextlib
6 import types
7 import importlib
8
9 from typing import Union, Optional
10 from .abc import ResourceReader, Traversable
11
12 from ._compat import wrap_spec
13
14 Package = Union[types.ModuleType, str]
15
16
17 def files(package):
18     # type: (Package) -> Traversable
19     """
20     Get a Traversable resource from a package
21     """
22     return from_package(get_package(package))
23
24
25 def get_resource_reader(package):
26     # type: (types.ModuleType) -> Optional[ResourceReader]
27     """
28     Return the package's loader if it's a ResourceReader.
29     """
30     # We can't use
31     # a issubclass() check here because apparently abc.'s __subclasscheck__()
32     # hook wants to create a weak reference to the object, but
33     # zipimport.zipimporter does not support weak references, resulting in a
34     # TypeError.  That seems terrible.
35     spec = package.__spec__
36     reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
37     if reader is None:
38         return None
39     return reader(spec.name)  # type: ignore
40
41
42 def resolve(cand):
43     # type: (Package) -> types.ModuleType
44     return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)
45
46
47 def get_package(package):
48     # type: (Package) -> types.ModuleType
49     """Take a package name or module object and return the module.
50
51     Raise an exception if the resolved module is not a package.
52     """
53     resolved = resolve(package)
54     if wrap_spec(resolved).submodule_search_locations is None:
55         raise TypeError(f'{package!r} is not a package')
56     return resolved
57
58
59 def from_package(package):
60     """
61     Return a Traversable object for the given package.
62
63     """
64     spec = wrap_spec(package)
65     reader = spec.loader.get_resource_reader(spec.name)
66     return reader.files()
67
68
69 @contextlib.contextmanager
70 def _tempfile(reader, suffix=''):
71     # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
72     # blocks due to the need to close the temporary file to work on Windows
73     # properly.
74     fd, raw_path = tempfile.mkstemp(suffix=suffix)
75     try:
76         try:
77             os.write(fd, reader())
78         finally:
79             os.close(fd)
80         del reader
81         yield pathlib.Path(raw_path)
82     finally:
83         try:
84             os.remove(raw_path)
85         except FileNotFoundError:
86             pass
87
88
89 @functools.singledispatch
90 def as_file(path):
91     """
92     Given a Traversable object, return that object as a
93     path on the local file system in a context manager.
94     """
95     return _tempfile(path.read_bytes, suffix=path.name)
96
97
98 @as_file.register(pathlib.Path)
99 @contextlib.contextmanager
100 def _(path):
101     """
102     Degenerate behavior for pathlib.Path objects.
103     """
104     yield path