Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatools/__cerberus__/schema.py: 54%
257 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-01 07:19 +0000
1from __future__ import absolute_import
3from collections.abc import Callable, Hashable, Iterable, Mapping, MutableMapping
4from copy import copy
6from casatools.__cerberus__ import errors
7from casatools.__cerberus__.platform import _str_type
8from casatools.__cerberus__.utils import get_Validator_class, validator_factory, mapping_hash
11class SchemaError(Exception):
12 """ Raised when the validation schema is missing, has the wrong format or
13 contains errors. """
14 pass
17class DefinitionSchema(MutableMapping):
18 """ A dict-subclass for caching of validated schemas. """
20 def __new__(cls, *args, **kwargs):
21 if 'SchemaValidator' not in globals():
22 global SchemaValidator
23 SchemaValidator = validator_factory('SchemaValidator',
24 SchemaValidatorMixin)
25 return super(DefinitionSchema, cls).__new__(cls)
27 def __init__(self, validator, schema={}):
28 """
29 :param validator: An instance of Validator-(sub-)class that uses this
30 schema.
31 :param schema: A definition-schema as ``dict``. Defaults to an empty
32 one.
33 """
34 if not isinstance(validator, get_Validator_class()):
35 raise RuntimeError('validator argument must be a Validator-'
36 'instance.')
37 self.validator = validator
39 if isinstance(schema, _str_type):
40 schema = validator.schema_registry.get(schema, schema)
42 if not isinstance(schema, Mapping):
43 try:
44 schema = dict(schema)
45 except Exception:
46 raise SchemaError(
47 errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema))
49 self.validation_schema = SchemaValidationSchema(validator)
50 self.schema_validator = SchemaValidator(
51 None, allow_unknown=self.validation_schema,
52 error_handler=errors.SchemaErrorHandler,
53 target_schema=schema, target_validator=validator)
55 schema = self.expand(schema)
56 self.validate(schema)
57 self.schema = schema
59 def __delitem__(self, key):
60 _new_schema = self.schema.copy()
61 try:
62 del _new_schema[key]
63 except ValueError:
64 raise SchemaError("Schema has no field '%s' defined" % key)
65 except Exception as e:
66 raise e
67 else:
68 del self.schema[key]
70 def __getitem__(self, item):
71 return self.schema[item]
73 def __iter__(self):
74 return iter(self.schema)
76 def __len__(self):
77 return len(self.schema)
79 def __repr__(self):
80 return str(self)
82 def __setitem__(self, key, value):
83 value = self.expand({0: value})[0]
84 self.validate({key: value})
85 self.schema[key] = value
87 def __str__(self):
88 return str(self.schema)
90 def copy(self):
91 return self.__class__(self.validator, self.schema.copy())
93 @classmethod
94 def expand(cls, schema):
95 try:
96 schema = cls._expand_logical_shortcuts(schema)
97 schema = cls._expand_subschemas(schema)
98 except Exception:
99 pass
100 return schema
102 @classmethod
103 def _expand_logical_shortcuts(cls, schema):
104 """ Expand agglutinated rules in a definition-schema.
106 :param schema: The schema-definition to expand.
107 :return: The expanded schema-definition.
108 """
109 def is_of_rule(x):
110 return isinstance(x, _str_type) and \
111 x.startswith(('allof_', 'anyof_', 'noneof_', 'oneof_'))
113 for field in schema:
114 for of_rule in (x for x in schema[field] if is_of_rule(x)):
115 operator, rule = of_rule.split('_')
116 schema[field].update({operator: []})
117 for value in schema[field][of_rule]:
118 schema[field][operator].append({rule: value})
119 del schema[field][of_rule]
120 return schema
122 @classmethod
123 def _expand_subschemas(cls, schema):
124 def has_schema_rule():
125 return isinstance(schema[field], Mapping) and \
126 'schema' in schema[field]
128 def has_mapping_schema():
129 """ Tries to determine heuristically if the schema-constraints are
130 aimed to mappings. """
131 try:
132 return all(isinstance(x, Mapping) for x
133 in schema[field]['schema'].values())
134 except TypeError:
135 return False
137 for field in schema:
138 if not has_schema_rule():
139 pass
140 elif has_mapping_schema():
141 schema[field]['schema'] = cls.expand(schema[field]['schema'])
142 else: # assumes schema-constraints for a sequence
143 schema[field]['schema'] = \
144 cls.expand({0: schema[field]['schema']})[0]
146 for rule in ('keyschema', 'valueschema'):
147 if rule in schema[field]:
148 schema[field][rule] = \
149 cls.expand({0: schema[field][rule]})[0]
151 for rule in ('allof', 'anyof', 'items', 'noneof', 'oneof'):
152 if rule in schema[field]:
153 new_rules_definition = []
154 for item in schema[field][rule]:
155 new_rules_definition.append(cls.expand({0: item})[0])
156 schema[field][rule] = new_rules_definition
157 return schema
159 def update(self, schema):
160 try:
161 schema = self.expand(schema)
162 _new_schema = self.schema.copy()
163 _new_schema.update(schema)
164 self.validate(_new_schema)
165 except ValueError:
166 raise SchemaError(errors.SCHEMA_ERROR_DEFINITION_TYPE
167 .format(schema))
168 except Exception as e:
169 raise e
170 else:
171 self.schema = _new_schema
173 def regenerate_validation_schema(self):
174 self.validation_schema = SchemaValidationSchema(self.validator)
176 def validate(self, schema=None):
177 if schema is None:
178 schema = self.schema
179 _hash = mapping_hash(schema)
180 if _hash not in self.validator._valid_schemas:
181 self._validate(schema)
182 self.validator._valid_schemas.add(_hash)
184 def _validate(self, schema):
185 """ Validates a schema that defines rules against supported rules.
187 :param schema: The schema to be validated as a legal cerberus schema
188 according to the rules of this Validator object.
189 """
190 if isinstance(schema, _str_type):
191 schema = self.validator.schema_registry.get(schema, schema)
193 if schema is None:
194 raise SchemaError(errors.SCHEMA_ERROR_MISSING)
196 schema = copy(schema)
197 for field in schema:
198 if isinstance(schema[field], _str_type):
199 schema[field] = rules_set_registry.get(schema[field],
200 schema[field])
202 if not self.schema_validator(schema, normalize=False):
203 raise SchemaError(self.schema_validator.errors)
206class UnvalidatedSchema(DefinitionSchema):
207 def __init__(self, schema={}):
208 if not isinstance(schema, Mapping):
209 schema = dict(schema)
210 self.schema = schema
212 def validate(self, schema):
213 pass
215 def copy(self):
216 # Override ancestor's copy, because
217 # UnvalidatedSchema does not have .validator:
218 return self.__class__(self.schema.copy())
221class SchemaValidationSchema(UnvalidatedSchema):
222 def __init__(self, validator):
223 self.schema = {'allow_unknown': False,
224 'schema': validator.rules,
225 'type': 'dict'}
228class SchemaValidatorMixin(object):
229 """ This validator is extended to validate schemas passed to a Cerberus
230 validator. """
231 @property
232 def known_rules_set_refs(self):
233 return self._config.get('known_rules_set_refs', ())
235 @known_rules_set_refs.setter
236 def known_rules_set_refs(self, value):
237 self._config['known_rules_set_refs'] = value
239 @property
240 def known_schema_refs(self):
241 return self._config.get('known_schema_refs', ())
243 @known_schema_refs.setter
244 def known_schema_refs(self, value):
245 self._config['known_schema_refs'] = value
247 @property
248 def target_schema(self):
249 """ The schema that is being validated. """
250 return self._config['target_schema']
252 @property
253 def target_validator(self):
254 """ The validator whose schema is being validated. """
255 return self._config['target_validator']
257 def _validate_logical(self, rule, field, value):
258 """ {'allowed': ('allof', 'anyof', 'noneof', 'oneof')} """
259 validator = self._get_child_validator(
260 document_crumb=rule,
261 schema=self.root_allow_unknown['schema'],
262 allow_unknown=self.root_allow_unknown['allow_unknown']
263 )
265 for constraints in value:
266 _hash = mapping_hash({'turing': constraints})
267 if _hash in self.target_validator._valid_schemas:
268 continue
270 validator(constraints, normalize=False)
271 if validator._errors:
272 self._error(validator._errors)
273 else:
274 self.target_validator._valid_schemas.add(_hash)
276 def _validate_type_callable(self, value):
277 if isinstance(value, Callable):
278 return True
280 def _validate_type_hashable(self, value):
281 if isinstance(value, Hashable):
282 return True
284 def _validate_type_hashables(self, value):
285 if self._validate_type_list(value):
286 return all(self._validate_type_hashable(x) for x in value)
288 def _validator_bulk_schema(self, field, value):
289 if isinstance(value, _str_type):
290 if value in self.known_rules_set_refs:
291 return
292 else:
293 self.known_rules_set_refs += (value,)
294 definition = self.target_validator.rules_set_registry.get(value)
295 if definition is None:
296 path = self.document_path + (field,)
297 self._error(path, 'Rules set definition %s not found.' % value)
298 return
299 else:
300 value = definition
302 _hash = mapping_hash({'turing': value})
303 if _hash in self.target_validator._valid_schemas:
304 return
306 validator = self._get_child_validator(
307 document_crumb=field,
308 schema=self.root_allow_unknown['schema'],
309 allow_unknown=self.root_allow_unknown['allow_unknown'])
310 validator(value, normalize=False)
311 if validator._errors:
312 self._error(validator._errors)
313 else:
314 self.target_validator._valid_schemas.add(_hash)
316 def _validator_handler(self, field, value):
317 if isinstance(value, Callable):
318 return
319 if isinstance(value, _str_type):
320 if value not in self.target_validator.validators + \
321 self.target_validator.coercers:
322 self._error(field, '%s is no valid coercer' % value)
323 elif isinstance(value, Iterable):
324 for handler in value:
325 self._validator_handler(field, handler)
327 def _validator_items(self, field, value):
328 for i, schema in enumerate(value):
329 self._validator_bulk_schema((field, i), schema)
331 def _validator_schema(self, field, value):
332 if isinstance(value, _str_type):
333 if value in self.known_schema_refs:
334 return
335 else:
336 self.known_schema_refs += (value,)
337 definition = self.target_validator.schema_registry.get(value)
338 if definition is None:
339 path = self.document_path + (field,)
340 self._error(path, 'Schema definition %s not found.' % value)
341 return
342 else:
343 value = definition
345 _hash = mapping_hash(value)
346 if _hash in self.target_validator._valid_schemas:
347 return
349 validator = self._get_child_validator(
350 document_crumb=field,
351 schema=None, allow_unknown=self.root_allow_unknown)
352 validator(value, normalize=False)
353 if validator._errors:
354 self._error(validator._errors)
355 else:
356 self.target_validator._valid_schemas.add(_hash)
359####
362class Registry(object):
363 """ A registry to store and retrieve schemas and parts of it by a name
364 that can be used in validation schemas.
366 :param definitions: Optional, initial definitions.
367 :type definitions: any :term:`mapping` """
369 def __init__(self, definitions={}):
370 self._storage = {}
371 self.extend(definitions)
373 def add(self, name, definition):
374 """ Register a definition to the registry. Existing definitions are
375 replaced silently.
377 :param name: The name which can be used as reference in a validation
378 schema.
379 :type name: :class:`str`
380 :param definition: The definition.
381 :type definition: any :term:`mapping` """
382 self._storage[name] = self._expand_definition(definition)
384 def all(self):
385 """ Returns a :class:`dict` with all registered definitions mapped to
386 their name. """
387 return self._storage
389 def extend(self, definitions):
390 """ Add several definitions at once. Existing definitions are
391 replaced silently.
393 :param definitions: The names and definitions.
394 :type definitions: a :term:`mapping` or an :term:`iterable` with
395 two-value :class:`tuple` s """
396 for name, definition in dict(definitions).items():
397 self.add(name, definition)
399 def clear(self):
400 """ Purge all definitions in the registry. """
401 self._storage.clear()
403 def get(self, name, default=None):
404 """ Retrieve a definition from the registry.
406 :param name: The reference that points to the definition.
407 :type name: :class:`str`
408 :param default: Return value if the reference isn't registered. """
409 return self._storage.get(name, default)
411 def remove(self, *names):
412 """ Unregister definitions from the registry.
414 :param names: The names of the definitions that are to be
415 unregistered. """
416 for name in names:
417 self._storage.pop(name, None)
420class SchemaRegistry(Registry):
421 @classmethod
422 def _expand_definition(cls, definition):
423 return DefinitionSchema.expand(definition)
426class RulesSetRegistry(Registry):
427 @classmethod
428 def _expand_definition(cls, definition):
429 return DefinitionSchema.expand({0: definition})[0]
432schema_registry, rules_set_registry = SchemaRegistry(), RulesSetRegistry()