Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatools/__cerberus__/errors.py: 76%
301 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1# -*- coding: utf-8 -*-
2""" This module contains the error-related constants and classes. """
4from __future__ import absolute_import
6from collections import defaultdict, namedtuple
7from collections.abc import MutableMapping
8from copy import copy, deepcopy
9from functools import wraps
10from pprint import pformat
12from casatools.__cerberus__.platform import PYTHON_VERSION
13from casatools.__cerberus__.utils import compare_paths_lt, quote_string
ErrorDefinition = namedtuple('cerberus_error', ['code', 'rule'])
"""
Error definition class

Every distinguishable error is described by a two-value tuple: a *unique*
integer error code and the name of the rule (a string, or ``None``) that can
cause it. The values are available as the attributes ``code`` and ``rule``.
The constants below carry no common prefix because they are meant to be
referenced through the module namespace, e.g. ``errors.CUSTOM``.
"""


# custom
CUSTOM = ErrorDefinition(0x00, None)

# existence
DOCUMENT_MISSING = ErrorDefinition(0x01, None)  # issues/141
# NOTE: the name is rebound to the plain message string right away, so the
# ErrorDefinition created on the previous line is not reachable through it.
DOCUMENT_MISSING = "document is missing"
REQUIRED_FIELD = ErrorDefinition(0x02, 'required')
UNKNOWN_FIELD = ErrorDefinition(0x03, None)
DEPENDENCIES_FIELD = ErrorDefinition(0x04, 'dependencies')
DEPENDENCIES_FIELD_VALUE = ErrorDefinition(0x05, 'dependencies')
EXCLUDES_FIELD = ErrorDefinition(0x06, 'excludes')

# shape
DOCUMENT_FORMAT = ErrorDefinition(0x21, None)  # issues/141
# NOTE: rebound to the message template, same as DOCUMENT_MISSING above.
DOCUMENT_FORMAT = "'{0}' is not a document, must be a dict"
EMPTY_NOT_ALLOWED = ErrorDefinition(0x22, 'empty')
NOT_NULLABLE = ErrorDefinition(0x23, 'nullable')
BAD_TYPE = ErrorDefinition(0x24, 'type')
BAD_TYPE_FOR_SCHEMA = ErrorDefinition(0x25, 'schema')
ITEMS_LENGTH = ErrorDefinition(0x26, 'items')
MIN_LENGTH = ErrorDefinition(0x27, 'minlength')
MAX_LENGTH = ErrorDefinition(0x28, 'maxlength')

# color (constraints on the value itself)
REGEX_MISMATCH = ErrorDefinition(0x41, 'regex')
MIN_VALUE = ErrorDefinition(0x42, 'min')
MAX_VALUE = ErrorDefinition(0x43, 'max')
UNALLOWED_VALUE = ErrorDefinition(0x44, 'allowed')
UNALLOWED_VALUES = ErrorDefinition(0x45, 'allowed')
FORBIDDEN_VALUE = ErrorDefinition(0x46, 'forbidden')
FORBIDDEN_VALUES = ErrorDefinition(0x47, 'forbidden')

# other (normalization)
NORMALIZATION = ErrorDefinition(0x60, None)
COERCION_FAILED = ErrorDefinition(0x61, 'coerce')
RENAMING_FAILED = ErrorDefinition(0x62, 'rename_handler')
READONLY_FIELD = ErrorDefinition(0x63, 'readonly')
SETTING_DEFAULT_FAILED = ErrorDefinition(0x64, 'default_setter')

# groups: errors that bundle child errors
ERROR_GROUP = ErrorDefinition(0x80, None)
MAPPING_SCHEMA = ErrorDefinition(0x81, 'schema')
SEQUENCE_SCHEMA = ErrorDefinition(0x82, 'schema')
KEYSCHEMA = ErrorDefinition(0x83, 'keyschema')
VALUESCHEMA = ErrorDefinition(0x84, 'valueschema')
BAD_ITEMS = ErrorDefinition(0x8f, 'items')

# logical groups: the *of-rules
LOGICAL = ErrorDefinition(0x90, None)
NONEOF = ErrorDefinition(0x91, 'noneof')
ONEOF = ErrorDefinition(0x92, 'oneof')
ANYOF = ErrorDefinition(0x93, 'anyof')
ALLOF = ErrorDefinition(0x94, 'allof')


""" SchemaError messages """

SCHEMA_ERROR_DEFINITION_TYPE = (
    "schema definition for field '{0}' must be a dict"
)
SCHEMA_ERROR_MISSING = "validation schema missing"
90""" Error representations """
class ValidationError(object):
    """ A simple class to store and query basic error information. """

    def __init__(self, document_path, schema_path, code, rule, constraint,
                 value, info):
        self.document_path = document_path
        """ The path to the field within the document that caused the error.
            Type: :class:`tuple` """
        self.schema_path = schema_path
        """ The path to the rule within the schema that caused the error.
            Type: :class:`tuple` """
        self.code = code
        """ The error's identifier code. Type: :class:`int` """
        self.rule = rule
        """ The rule that failed. Type: `string` """
        self.constraint = constraint
        """ The constraint that failed. """
        self.value = value
        """ The value that failed. """
        self.info = info
        """ May hold additional information about the error.
            Type: :class:`tuple` """

    def __eq__(self, other):
        """ Assumes the errors relate to the same document and schema.

        Equality is defined purely through :meth:`__hash__`, i.e. through
        the document path, the schema path and the error code.
        """
        return hash(self) == hash(other)

    def __hash__(self):
        """ Expects that all other properties are transitively determined. """
        return hash(self.document_path) ^ hash(self.schema_path) \
            ^ hash(self.code)

    def __lt__(self, other):
        """ Orders errors by document path first, then by schema path. """
        if self.document_path != other.document_path:
            return compare_paths_lt(self.document_path, other.document_path)
        return compare_paths_lt(self.schema_path, other.schema_path)

    def __repr__(self):
        template = (
            "{class_name} @ {memptr} ( "
            "document_path={document_path},"
            "schema_path={schema_path},"
            "code={code},"
            "constraint={constraint},"
            "value={value},"
            "info={info} )"
        )
        return template.format(
            class_name=self.__class__.__name__,
            memptr=hex(id(self)),
            document_path=self.document_path,
            schema_path=self.schema_path,
            code=hex(self.code),
            constraint=quote_string(self.constraint),
            value=quote_string(self.value),
            info=self.info,
        )

    @property
    def child_errors(self):
        """
        A list that contains the individual errors of a bulk validation error.
        :obj:`None` if this is not a group error.
        """
        return self.info[0] if self.is_group_error else None

    @property
    def definitions_errors(self):
        """ Dictionary with errors of an *of-rule mapped to the index of the
            definition it occurred in. Returns :obj:`None` if not applicable.
            """
        if not self.is_logic_error:
            return None

        # The element of a child's schema path that directly follows this
        # error's own schema path is used as the definition's index.
        index_position = len(self.schema_path)
        result = defaultdict(list)
        for error in self.child_errors:
            result[error.schema_path[index_position]].append(error)
        return result

    @property
    def field(self):
        """ Field of the contextual mapping, possibly :obj:`None`. """
        if self.document_path:
            return self.document_path[-1]
        return None

    @property
    def is_group_error(self):
        """ ``True`` for errors of bulk validations. """
        return bool(self.code & ERROR_GROUP.code)

    @property
    def is_logic_error(self):
        """ ``True`` for validation errors against different schemas with
            *of-rules. """
        # ``-`` binds tighter than ``&``; the parentheses only make explicit
        # that the code is tested against the bit that distinguishes
        # LOGICAL from ERROR_GROUP.
        return bool(self.code & (LOGICAL.code - ERROR_GROUP.code))

    @property
    def is_normalization_error(self):
        """ ``True`` for normalization errors. """
        # NORMALIZATION.code spans more than one bit, so every bit of the
        # mask has to be set. A plain truthiness test of ``code & mask``
        # would also match codes that share only one of those bits
        # (e.g. BAD_TYPE or REGEX_MISMATCH).
        mask = NORMALIZATION.code
        return (self.code & mask) == mask
class ErrorList(list):
    """ A list for :class:`~cerberus.errors.ValidationError` instances that
        supports the ``in`` keyword to ask whether any contained error
        carries the code of a given error definition. """

    def __contains__(self, error_definition):
        wanted = error_definition.code
        return any(error.code == wanted for error in self)
class ErrorTreeNode(MutableMapping):
    """ A node of an error tree.

    A node keeps the errors that belong to its own path in ``errors`` and its
    child nodes in ``descendants``, keyed by the next element of the path.
    Item access (``node[key]``) addresses the child nodes, while iteration
    and ``len()`` work on the node's own errors.
    """
    __slots__ = ('descendants', 'errors', 'parent_node', 'path', 'tree_root')

    def __init__(self, path, parent_node):
        """
        :param path: The path of an error; it is truncated so that the node's
                     depth is the parent's depth plus one.
        :param parent_node: The node this one is attached to.
        """
        self.parent_node = parent_node
        self.tree_root = self.parent_node.tree_root
        self.path = path[:self.parent_node.depth + 1]
        self.errors = ErrorList()
        self.descendants = {}

    def __add__(self, error):
        """ Adds ``error`` below this node and returns the node itself, so
            that ``node += error`` works. """
        self.add(error)
        return self

    def __contains__(self, item):
        """ Tests for an error definition among the node's own errors, or for
            a key among its child nodes.

        ``__getitem__`` never raises :exc:`KeyError`, so the membership test
        inherited from the mapping base class would succeed for every key.
        """
        if isinstance(item, ErrorDefinition):
            return item in self.errors
        return item in self.descendants

    def __delitem__(self, key):
        del self.descendants[key]

    def __iter__(self):
        # Iterates the node's own errors, not the keys of its child nodes.
        return iter(self.errors)

    def __getitem__(self, item):
        """ Returns the child node for ``item`` or :obj:`None`. """
        return self.descendants.get(item)

    def __len__(self):
        return len(self.errors)

    def __setitem__(self, key, value):
        self.descendants[key] = value

    def __str__(self):
        return str(self.errors) + ',' + str(self.descendants)

    @property
    def depth(self):
        """ The number of elements in the node's path. """
        return len(self.path)

    @property
    def tree_type(self):
        """ The ``tree_type`` of the tree's root. """
        return self.tree_root.tree_type

    def add(self, error):
        """ Stores ``error`` in the subtree below this node, creating nodes
            along the error's path as needed. """
        error_path = self._path_of_(error)

        key = error_path[self.depth]
        if key not in self.descendants:
            self[key] = ErrorTreeNode(error_path, self)

        if len(error_path) == self.depth + 1:
            # The direct child is the error's final destination.
            self[key].errors.append(error)
            self[key].errors.sort()
            if error.is_group_error:
                # Children of a group error are filed from the root again,
                # each according to its own path.
                for child_error in error.child_errors:
                    self.tree_root += child_error
        else:
            # Hand the error down one level.
            self[key] += error

    def _path_of_(self, error):
        """ Returns the path attribute of ``error`` that matches the tree's
            type, e.g. ``document_path`` for the type ``'document'``. """
        return getattr(error, self.tree_type + '_path')
class ErrorTree(ErrorTreeNode):
    """ Base class for :class:`~cerberus.errors.DocumentErrorTree` and
        :class:`~cerberus.errors.SchemaErrorTree`. """

    def __init__(self, errors=()):
        """
        :param errors: An iterable of errors to add to the new tree.
        """
        # The default is an immutable empty tuple rather than a shared
        # mutable list; the argument is only iterated.
        self.parent_node = None
        self.tree_root = self
        self.path = ()
        self.errors = ErrorList()
        self.descendants = {}
        for error in errors:
            self.add(error)

    def add(self, error):
        """ Add an error to the tree.

        Errors with an empty path are kept on the root itself, all others
        are filed into the tree along their path.

        :param error: :class:`~cerberus.errors.ValidationError`
        """
        if not self._path_of_(error):
            self.errors.append(error)
            self.errors.sort()
        else:
            super(ErrorTree, self).add(error)

    def fetch_errors_from(self, path):
        """ Returns all errors for a particular path.

        :param path: :class:`tuple` of :term:`hashable` s.
        :rtype: :class:`~cerberus.errors.ErrorList`; it is empty if there
                is no node for the path.
        """
        node = self.fetch_node_from(path)
        if node is None:
            return ErrorList()
        return node.errors

    def fetch_node_from(self, path):
        """ Returns a node for a path.

        :param path: Tuple of :term:`hashable` s.
        :rtype: :class:`~cerberus.errors.ErrorTreeNode` or :obj:`None`
        """
        context = self
        for key in path:
            context = context[key]
            if context is None:
                # No node exists for this part of the path.
                return None
        return context
class DocumentErrorTree(ErrorTree):
    """ A dict-like error tree whose nodes are addressed by indexes that
        follow the structure of a validated document. """
    # Combined with '_path' to pick the error attribute used for filing,
    # i.e. ``document_path``.
    tree_type = 'document'
class SchemaErrorTree(ErrorTree):
    """ A dict-like error tree whose nodes are addressed by indexes that
        follow the structure of the used schema. """
    # Combined with '_path' to pick the error attribute used for filing,
    # i.e. ``schema_path``.
    tree_type = 'schema'
class BaseErrorHandler(object):
    """ Base class for all error handlers.
        Subclasses are identified as error-handlers with an instance-test. """

    def __init__(self, *args, **kwargs):
        """ Optionally initialize a new instance; any arguments are accepted
            and ignored here. """

    def __call__(self, errors):
        """ Returns errors in a handler-specific format.

        Not implemented in the base class.

        :param errors: An object containing the errors.
        :type errors: :term:`iterable` of
                      :class:`~cerberus.errors.ValidationError` instances or a
                      :class:`~cerberus.Validator` instance
        """
        raise NotImplementedError

    def __iter__(self):
        """ Be a superhero and implement an iterator over errors.

        Not implemented in the base class.
        """
        raise NotImplementedError

    def add(self, error):
        """ Add an error to the errors' container object of a handler.

        Not implemented in the base class.

        :param error: The error to add.
        :type error: :class:`~cerberus.errors.ValidationError`
        """
        raise NotImplementedError

    def emit(self, error):
        """ Optionally emits an error in the handler's format to a stream.
            Or light a LED, or even shut down a power plant.

        Does nothing in the base class.

        :param error: The error to emit.
        :type error: :class:`~cerberus.errors.ValidationError`
        """

    def end(self, validator):
        """ Gets called when a validation ends.

        Does nothing in the base class.

        :param validator: The calling validator.
        :type validator: :class:`~cerberus.Validator` """

    def extend(self, errors):
        """ Adds all errors to the handler's container object by passing
            each one to :meth:`add`.

        :param errors: The errors to add.
        :type errors: :term:`iterable` of
                      :class:`~cerberus.errors.ValidationError` instances
        """
        for item in errors:
            self.add(item)

    def start(self, validator):
        """ Gets called when a validation starts.

        Does nothing in the base class.

        :param validator: The calling validator.
        :type validator: :class:`~cerberus.Validator`
        """
class ToyErrorHandler(BaseErrorHandler):
    """ A handler that cannot be called for a result: calling it raises
        :exc:`RuntimeError`, and clearing it does nothing. """

    def __call__(self, *args, **kwargs):
        raise RuntimeError('This is not supposed to happen.')

    def clear(self):
        """ Nothing to clear. """
def encode_unicode(f):
    """Cerberus error messages expect regular binary strings.
    If unicode is used in a ValidationError message can't be printed.

    On legacy Python (``PYTHON_VERSION < 3``) the decorated function is
    wrapped so that it receives a shallow copy of the error whose unicode
    attributes were encoded to UTF-8; otherwise ``f`` is returned unchanged.
    """
    encoded_attributes = ('document_path', 'schema_path', 'constraint',
                          'value', 'info')

    def _encode(value):
        """Helper encoding unicode strings into binary utf-8"""
        if isinstance(value, unicode):  # noqa: F821
            return value.encode('utf-8')
        return value

    @wraps(f)
    def wrapped(obj, error):
        # Work on a copy so the caller's error object stays untouched.
        error = copy(error)
        for name in encoded_attributes:
            setattr(error, name, _encode(getattr(error, name)))
        return f(obj, error)

    return wrapped if PYTHON_VERSION < 3 else f
class BasicErrorHandler(BaseErrorHandler):
    """ Models cerberus' legacy. Returns a :class:`dict`.

    Errors are collected in ``tree``, a dictionary that maps each field to a
    list of messages whose last item is a dictionary of the same kind for
    the errors below that field.
    """
    # Message templates by error code; they are formatted with the error's
    # ``info`` as positional arguments and ``constraint``, ``field`` and
    # ``value`` as keyword arguments.
    messages = {0x00: "{0}",

                0x01: "document is missing",
                0x02: "required field",
                0x03: "unknown field",
                0x04: "field '{0}' is required",
                0x05: "depends on these values: {constraint}",
                0x06: "{0} must not be present with '{field}'",

                0x21: "'{0}' is not a document, must be a dict",
                0x22: "empty values not allowed",
                0x23: "null value not allowed",
                0x24: "must be of {constraint} type",
                0x25: "must be of dict type",
                0x26: "length of list should be {constraint}, it is {0}",
                0x27: "min length is {constraint}",
                0x28: "max length is {constraint}",

                0x41: "value does not match regex '{constraint}'",
                0x42: "min value is {constraint}",
                0x43: "max value is {constraint}",
                0x44: "unallowed value {value}",
                0x45: "unallowed values {0}",
                0x46: "unallowed value {value}",
                0x47: "unallowed values {0}",

                0x61: "field '{field}' cannot be coerced: {0}",
                0x62: "field '{field}' cannot be renamed: {0}",
                0x63: "field is read-only",
                0x64: "default value for '{field}' cannot be set: {0}",

                0x81: "mapping doesn't validate subschema: {0}",
                0x82: "one or more sequence-items don't validate: {0}",
                0x83: "one or more keys of a mapping don't validate: "
                      "{0}",
                0x84: "one or more values in a mapping don't validate: {0}",
                0x85: "one or more sequence-items don't validate: {0}",

                0x91: "one or more definitions validate",
                0x92: "none or more than one rule validate",
                0x93: "no definitions validate",
                0x94: "one or more definitions don't validate"
                }

    def __init__(self, tree=None):
        """
        :param tree: An existing error tree to continue with; a new empty
                     one is used if omitted.
        """
        self.tree = {} if tree is None else tree

    def __call__(self, errors=None):
        """ Returns the purged error tree.

        :param errors: If given, the handler is cleared and refilled with
                       these errors first.
        """
        if errors is not None:
            self.clear()
            self.extend(errors)
        return self.pretty_tree

    def __str__(self):
        return pformat(self.pretty_tree)

    @encode_unicode
    def add(self, error):
        """ Inserts an error into the tree.

        Errors that are neither logic nor group errors and whose code has no
        message template are ignored.
        """
        if error.is_logic_error:
            self.insert_logic_error(error)
        elif error.is_group_error:
            self.insert_group_error(error)
        elif error.code in self.messages:
            self.insert_error(error.document_path,
                              self.format_message(error.field, error))

    def clear(self):
        """ Discards all collected errors. """
        self.tree = {}

    def format_message(self, field, error):
        """ Renders the message template registered for the error's code. """
        return self.messages[error.code].format(
            *error.info, constraint=error.constraint,
            field=field, value=error.value)

    def insert_error(self, path, node):
        """ Adds an error or sub-tree to :attr:tree.

        :param path: Path to the error.
        :type path: Tuple of strings and integers.
        :param node: An error message or a sub-tree.
        :type node: String or dictionary.
        """
        field = path[0]
        if len(path) == 1:
            if field in self.tree:
                # Keep the sub-tree dictionary as the last list item.
                subtree = self.tree[field].pop()
                self.tree[field] += [node, subtree]
            else:
                self.tree[field] = [node, {}]
        else:
            if field not in self.tree:
                self.tree[field] = [{}]
            subtree = self.tree[field][-1]

            # Let a temporary handler insert the remaining path, then merge
            # its tree back into the sub-tree.
            if subtree:
                new = self.__class__(tree=copy(subtree))
            else:
                new = self.__class__()
            new.insert_error(path[1:], node)
            subtree.update(new.tree)

    def insert_group_error(self, error):
        """ Inserts the child errors of a group error, descending into
            nested logic and group errors. """
        for child_error in error.child_errors:
            if child_error.is_logic_error:
                self.insert_logic_error(child_error)
            elif child_error.is_group_error:
                self.insert_group_error(child_error)
            else:
                self.insert_error(
                    child_error.document_path,
                    self.format_message(child_error.field, child_error))

    def insert_logic_error(self, error):
        """ Inserts the error of an *of-rule and, per definition, the
            messages of its child errors.

        :raises NotImplementedError: If a child error is itself a logic or
                                     group error.
        """
        path = error.document_path + (error.rule, )
        field = error.field

        self.insert_error(path, self.format_message(field, error))

        # The property builds a new mapping on every access, so it is
        # evaluated once here rather than once per iteration.
        definitions_errors = error.definitions_errors
        for i in definitions_errors:
            child_errors = definitions_errors[i]
            if not child_errors:
                continue
            nodename = '%s definition %s' % (error.rule, i)
            for child_error in child_errors:
                if child_error.is_logic_error:
                    raise NotImplementedError
                elif child_error.is_group_error:
                    raise NotImplementedError
                else:
                    self.insert_error(path + (nodename,),
                                      self.format_message(field, child_error))

    @property
    def pretty_tree(self):
        """ A deep copy of the tree with the empty trailing dictionaries
            removed. """
        pretty = deepcopy(self.tree)
        for field in pretty:
            self._purge_empty_dicts(pretty[field])
        return pretty

    def _purge_empty_dicts(self, error_list):
        """ Drops the last item of ``error_list`` if it is empty, otherwise
            recurses into each of its values. """
        subtree = error_list[-1]
        if not subtree:
            error_list.pop()
        else:
            for key in subtree:
                self._purge_empty_dicts(subtree[key])

    def start(self, validator):
        """ Clears the collected errors when a validation starts. """
        self.clear()
class SchemaErrorHandler(BasicErrorHandler):
    """ A :class:`BasicErrorHandler` whose message for code ``0x03`` reads
        "unknown rule". """
    # Shallow copy, so the parent's message table is left unchanged.
    messages = dict(BasicErrorHandler.messages)
    messages[0x03] = "unknown rule"