f78e4838fb3a364fde4eddaf5d5b6b1557fdbe0b
[SubU] /
1 import io
2 import json
3 import logging
4 import os
5 import re
6 from contextlib import contextmanager
7 from textwrap import indent, wrap
8 from typing import Any, Dict, Iterator, List, Optional, Sequence, Union, cast
9
10 from .fastjsonschema_exceptions import JsonSchemaValueException
11
12 _logger = logging.getLogger(__name__)
13
14 _MESSAGE_REPLACEMENTS = {
15     "must be named by propertyName definition": "keys must be named by",
16     "one of contains definition": "at least one item that matches",
17     " same as const definition:": "",
18     "only specified items": "only items matching the definition",
19 }
20
21 _SKIP_DETAILS = (
22     "must not be empty",
23     "is always invalid",
24     "must not be there",
25 )
26
27 _NEED_DETAILS = {"anyOf", "oneOf", "anyOf", "contains", "propertyNames", "not", "items"}
28
29 _CAMEL_CASE_SPLITTER = re.compile(r"\W+|([A-Z][^A-Z\W]*)")
30 _IDENTIFIER = re.compile(r"^[\w_]+$", re.I)
31
32 _TOML_JARGON = {
33     "object": "table",
34     "property": "key",
35     "properties": "keys",
36     "property names": "keys",
37 }
38
39
40 class ValidationError(JsonSchemaValueException):
41     """Report violations of a given JSON schema.
42
43     This class extends :exc:`~fastjsonschema.JsonSchemaValueException`
44     by adding the following properties:
45
46     - ``summary``: an improved version of the ``JsonSchemaValueException`` error message
47       with only the necessary information)
48
49     - ``details``: more contextual information about the error like the failing schema
50       itself and the value that violates the schema.
51
52     Depending on the level of the verbosity of the ``logging`` configuration
53     the exception message will be only ``summary`` (default) or a combination of
54     ``summary`` and ``details`` (when the logging level is set to :obj:`logging.DEBUG`).
55     """
56
57     summary = ""
58     details = ""
59     _original_message = ""
60
61     @classmethod
62     def _from_jsonschema(cls, ex: JsonSchemaValueException):
63         formatter = _ErrorFormatting(ex)
64         obj = cls(str(formatter), ex.value, formatter.name, ex.definition, ex.rule)
65         debug_code = os.getenv("JSONSCHEMA_DEBUG_CODE_GENERATION", "false").lower()
66         if debug_code != "false":  # pragma: no cover
67             obj.__cause__, obj.__traceback__ = ex.__cause__, ex.__traceback__
68         obj._original_message = ex.message
69         obj.summary = formatter.summary
70         obj.details = formatter.details
71         return obj
72
73
74 @contextmanager
75 def detailed_errors():
76     try:
77         yield
78     except JsonSchemaValueException as ex:
79         raise ValidationError._from_jsonschema(ex) from None
80
81
82 class _ErrorFormatting:
83     def __init__(self, ex: JsonSchemaValueException):
84         self.ex = ex
85         self.name = f"`{self._simplify_name(ex.name)}`"
86         self._original_message = self.ex.message.replace(ex.name, self.name)
87         self._summary = ""
88         self._details = ""
89
90     def __str__(self) -> str:
91         if _logger.getEffectiveLevel() <= logging.DEBUG and self.details:
92             return f"{self.summary}\n\n{self.details}"
93
94         return self.summary
95
96     @property
97     def summary(self) -> str:
98         if not self._summary:
99             self._summary = self._expand_summary()
100
101         return self._summary
102
103     @property
104     def details(self) -> str:
105         if not self._details:
106             self._details = self._expand_details()
107
108         return self._details
109
110     def _simplify_name(self, name):
111         x = len("data.")
112         return name[x:] if name.startswith("data.") else name
113
114     def _expand_summary(self):
115         msg = self._original_message
116
117         for bad, repl in _MESSAGE_REPLACEMENTS.items():
118             msg = msg.replace(bad, repl)
119
120         if any(substring in msg for substring in _SKIP_DETAILS):
121             return msg
122
123         schema = self.ex.rule_definition
124         if self.ex.rule in _NEED_DETAILS and schema:
125             summary = _SummaryWriter(_TOML_JARGON)
126             return f"{msg}:\n\n{indent(summary(schema), '    ')}"
127
128         return msg
129
130     def _expand_details(self) -> str:
131         optional = []
132         desc_lines = self.ex.definition.pop("$$description", [])
133         desc = self.ex.definition.pop("description", None) or " ".join(desc_lines)
134         if desc:
135             description = "\n".join(
136                 wrap(
137                     desc,
138                     width=80,
139                     initial_indent="    ",
140                     subsequent_indent="    ",
141                     break_long_words=False,
142                 )
143             )
144             optional.append(f"DESCRIPTION:\n{description}")
145         schema = json.dumps(self.ex.definition, indent=4)
146         value = json.dumps(self.ex.value, indent=4)
147         defaults = [
148             f"GIVEN VALUE:\n{indent(value, '    ')}",
149             f"OFFENDING RULE: {self.ex.rule!r}",
150             f"DEFINITION:\n{indent(schema, '    ')}",
151         ]
152         return "\n\n".join(optional + defaults)
153
154
155 class _SummaryWriter:
156     _IGNORE = {"description", "default", "title", "examples"}
157
158     def __init__(self, jargon: Optional[Dict[str, str]] = None):
159         self.jargon: Dict[str, str] = jargon or {}
160         # Clarify confusing terms
161         self._terms = {
162             "anyOf": "at least one of the following",
163             "oneOf": "exactly one of the following",
164             "allOf": "all of the following",
165             "not": "(*NOT* the following)",
166             "prefixItems": f"{self._jargon('items')} (in order)",
167             "items": "items",
168             "contains": "contains at least one of",
169             "propertyNames": (
170                 f"non-predefined acceptable {self._jargon('property names')}"
171             ),
172             "patternProperties": f"{self._jargon('properties')} named via pattern",
173             "const": "predefined value",
174             "enum": "one of",
175         }
176         # Attributes that indicate that the definition is easy and can be done
177         # inline (e.g. string and number)
178         self._guess_inline_defs = [
179             "enum",
180             "const",
181             "maxLength",
182             "minLength",
183             "pattern",
184             "format",
185             "minimum",
186             "maximum",
187             "exclusiveMinimum",
188             "exclusiveMaximum",
189             "multipleOf",
190         ]
191
192     def _jargon(self, term: Union[str, List[str]]) -> Union[str, List[str]]:
193         if isinstance(term, list):
194             return [self.jargon.get(t, t) for t in term]
195         return self.jargon.get(term, term)
196
197     def __call__(
198         self,
199         schema: Union[dict, List[dict]],
200         prefix: str = "",
201         *,
202         _path: Sequence[str] = (),
203     ) -> str:
204         if isinstance(schema, list):
205             return self._handle_list(schema, prefix, _path)
206
207         filtered = self._filter_unecessary(schema, _path)
208         simple = self._handle_simple_dict(filtered, _path)
209         if simple:
210             return f"{prefix}{simple}"
211
212         child_prefix = self._child_prefix(prefix, "  ")
213         item_prefix = self._child_prefix(prefix, "- ")
214         indent = len(prefix) * " "
215         with io.StringIO() as buffer:
216             for i, (key, value) in enumerate(filtered.items()):
217                 child_path = [*_path, key]
218                 line_prefix = prefix if i == 0 else indent
219                 buffer.write(f"{line_prefix}{self._label(child_path)}:")
220                 # ^  just the first item should receive the complete prefix
221                 if isinstance(value, dict):
222                     filtered = self._filter_unecessary(value, child_path)
223                     simple = self._handle_simple_dict(filtered, child_path)
224                     buffer.write(
225                         f" {simple}"
226                         if simple
227                         else f"\n{self(value, child_prefix, _path=child_path)}"
228                     )
229                 elif isinstance(value, list) and (
230                     key != "type" or self._is_property(child_path)
231                 ):
232                     children = self._handle_list(value, item_prefix, child_path)
233                     sep = " " if children.startswith("[") else "\n"
234                     buffer.write(f"{sep}{children}")
235                 else:
236                     buffer.write(f" {self._value(value, child_path)}\n")
237             return buffer.getvalue()
238
239     def _is_unecessary(self, path: Sequence[str]) -> bool:
240         if self._is_property(path) or not path:  # empty path => instruction @ root
241             return False
242         key = path[-1]
243         return any(key.startswith(k) for k in "$_") or key in self._IGNORE
244
245     def _filter_unecessary(self, schema: dict, path: Sequence[str]):
246         return {
247             key: value
248             for key, value in schema.items()
249             if not self._is_unecessary([*path, key])
250         }
251
252     def _handle_simple_dict(self, value: dict, path: Sequence[str]) -> Optional[str]:
253         inline = any(p in value for p in self._guess_inline_defs)
254         simple = not any(isinstance(v, (list, dict)) for v in value.values())
255         if inline or simple:
256             return f"{{{', '.join(self._inline_attrs(value, path))}}}\n"
257         return None
258
259     def _handle_list(
260         self, schemas: list, prefix: str = "", path: Sequence[str] = ()
261     ) -> str:
262         if self._is_unecessary(path):
263             return ""
264
265         repr_ = repr(schemas)
266         if all(not isinstance(e, (dict, list)) for e in schemas) and len(repr_) < 60:
267             return f"{repr_}\n"
268
269         item_prefix = self._child_prefix(prefix, "- ")
270         return "".join(
271             self(v, item_prefix, _path=[*path, f"[{i}]"]) for i, v in enumerate(schemas)
272         )
273
274     def _is_property(self, path: Sequence[str]):
275         """Check if the given path can correspond to an arbitrarily named property"""
276         counter = 0
277         for key in path[-2::-1]:
278             if key not in {"properties", "patternProperties"}:
279                 break
280             counter += 1
281
282         # If the counter if even, the path correspond to a JSON Schema keyword
283         # otherwise it can be any arbitrary string naming a property
284         return counter % 2 == 1
285
286     def _label(self, path: Sequence[str]) -> str:
287         *parents, key = path
288         if not self._is_property(path):
289             norm_key = _separate_terms(key)
290             return self._terms.get(key) or " ".join(self._jargon(norm_key))
291
292         if parents[-1] == "patternProperties":
293             return f"(regex {key!r})"
294         return repr(key)  # property name
295
296     def _value(self, value: Any, path: Sequence[str]) -> str:
297         if path[-1] == "type" and not self._is_property(path):
298             type_ = self._jargon(value)
299             return (
300                 f"[{', '.join(type_)}]" if isinstance(value, list) else cast(str, type_)
301             )
302         return repr(value)
303
304     def _inline_attrs(self, schema: dict, path: Sequence[str]) -> Iterator[str]:
305         for key, value in schema.items():
306             child_path = [*path, key]
307             yield f"{self._label(child_path)}: {self._value(value, child_path)}"
308
309     def _child_prefix(self, parent_prefix: str, child_prefix: str) -> str:
310         return len(parent_prefix) * " " + child_prefix
311
312
313 def _separate_terms(word: str) -> List[str]:
314     """
315     >>> _separate_terms("FooBar-foo")
316     ['foo', 'bar', 'foo']
317     """
318     return [w.lower() for w in _CAMEL_CASE_SPLITTER.split(word) if w]