Coverage for /wheeldirectory/casa-6.7.0-12-py3.10.el8/lib/py/lib/python3.10/site-packages/casatools/__cerberus__/schema.py: 54%

257 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-10-31 17:39 +0000

1from __future__ import absolute_import 

2 

3from collections.abc import Callable, Hashable, Iterable, Mapping, MutableMapping 

4from copy import copy 

5 

6from casatools.__cerberus__ import errors 

7from casatools.__cerberus__.platform import _str_type 

8from casatools.__cerberus__.utils import get_Validator_class, validator_factory, mapping_hash 

9 

10 

class SchemaError(Exception):
    """ Signals that a validation schema is absent, is not in the expected
        format, or contains errors. """

15 

16 

class DefinitionSchema(MutableMapping):
    """ A mutable mapping that wraps a definition-schema. The schema is
        expanded and validated when the instance is created and whenever
        it is changed through ``__setitem__`` or :meth:`update`. Hashes of
        schemas that passed validation are cached in the validator's
        ``_valid_schemas``. """

    def __new__(cls, *args, **kwargs):
        # The SchemaValidator class is created lazily on first instantiation
        # and stored as a module global; SchemaValidatorMixin is defined
        # further down in this module.
        if 'SchemaValidator' not in globals():
            global SchemaValidator
            SchemaValidator = validator_factory('SchemaValidator',
                                                SchemaValidatorMixin)
        return super(DefinitionSchema, cls).__new__(cls)

    def __init__(self, validator, schema=None):
        """
        :param validator: An instance of Validator-(sub-)class that uses this
                          schema.
        :param schema: A definition-schema as ``dict``, or a name that is
                       looked up in the validator's schema registry.
                       Defaults to a new, empty schema.
        :raises RuntimeError: If ``validator`` is not a Validator instance.
        :raises SchemaError: If ``schema`` cannot be turned into a mapping
                             or does not pass validation.
        """
        if not isinstance(validator, get_Validator_class()):
            raise RuntimeError('validator argument must be a Validator-'
                               'instance.')
        self.validator = validator

        if schema is None:
            # A fresh dict per instance: a shared ``{}`` default would be
            # stored as self.schema and mutated by __setitem__.
            schema = {}
        elif isinstance(schema, _str_type):
            # Unknown names fall through unchanged and fail the mapping
            # conversion below.
            schema = validator.schema_registry.get(schema, schema)

        if not isinstance(schema, Mapping):
            try:
                schema = dict(schema)
            except Exception:
                raise SchemaError(
                    errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema))

        self.validation_schema = SchemaValidationSchema(validator)
        self.schema_validator = SchemaValidator(
            None, allow_unknown=self.validation_schema,
            error_handler=errors.SchemaErrorHandler,
            target_schema=schema, target_validator=validator)

        schema = self.expand(schema)
        self.validate(schema)
        self.schema = schema

    def __delitem__(self, key):
        """ Remove the definition of a field.

        :raises KeyError: If the schema has no such field. """
        # The former trial deletion on a copy guarded against ValueError,
        # which a mapping never raises for a missing key; the KeyError
        # reached the caller either way.
        del self.schema[key]

    def __getitem__(self, item):
        return self.schema[item]

    def __iter__(self):
        return iter(self.schema)

    def __len__(self):
        return len(self.schema)

    def __repr__(self):
        return str(self)

    def __setitem__(self, key, value):
        """ Expand and validate the rules for a single field, then store
            them. Nothing is stored if validation raises. """
        # Wrapped in a one-field schema because expand() works on schemas.
        value = self.expand({0: value})[0]
        self.validate({key: value})
        self.schema[key] = value

    def __str__(self):
        return str(self.schema)

    def copy(self):
        """ Return a new instance bound to the same validator, built from a
            shallow copy of the wrapped schema. """
        return self.__class__(self.validator, self.schema.copy())

    @classmethod
    def expand(cls, schema):
        """ Expand logical shortcuts and nested sub-schemas of a schema.

        The expansion modifies the passed rules mappings in place and is
        best effort: if anything goes wrong the schema is returned in the
        state reached so far.

        :param schema: The schema-definition to expand.
        :return: The (possibly partially) expanded schema-definition. """
        try:
            schema = cls._expand_logical_shortcuts(schema)
            schema = cls._expand_subschemas(schema)
        except Exception:
            # Deliberately swallowed: expansion must not fail on malformed
            # input. NOTE(review): presumably such schemas are reported by
            # the validation that callers run afterwards - confirm.
            pass
        return schema

    @classmethod
    def _expand_logical_shortcuts(cls, schema):
        """ Expand agglutinated rules in a definition-schema, e.g.
            ``{'anyof_type': ['string', 'integer']}`` becomes
            ``{'anyof': [{'type': 'string'}, {'type': 'integer'}]}``.

        :param schema: The schema-definition to expand.
        :return: The expanded schema-definition.
        """
        def is_of_rule(x):
            return isinstance(x, _str_type) and \
                x.startswith(('allof_', 'anyof_', 'noneof_', 'oneof_'))

        for field in schema:
            rules = schema[field]
            # Collect the shortcut keys before changing the mapping: adding
            # and deleting keys while iterating it lazily raises
            # RuntimeError on Python 3.8+, which expand() would swallow.
            for of_rule in [x for x in rules if is_of_rule(x)]:
                # Split on the first underscore only, so rule names that
                # contain underscores (e.g. 'oneof_max_length') work.
                operator, rule = of_rule.split('_', 1)
                rules.update({operator: []})
                for value in rules[of_rule]:
                    rules[operator].append({rule: value})
                del rules[of_rule]
        return schema

    @classmethod
    def _expand_subschemas(cls, schema):
        """ Recursively expand the schemas nested in the rules ``schema``,
            ``keyschema``, ``valueschema``, ``allof``, ``anyof``, ``items``,
            ``noneof`` and ``oneof`` of every field.

        :param schema: The schema-definition to expand.
        :return: The expanded schema-definition. """
        def has_schema_rule():
            return isinstance(schema[field], Mapping) and \
                'schema' in schema[field]

        def has_mapping_schema():
            """ Tries to determine heuristically if the schema-constraints are
                aimed to mappings. """
            try:
                return all(isinstance(x, Mapping) for x
                           in schema[field]['schema'].values())
            except TypeError:
                return False

        for field in schema:
            if not has_schema_rule():
                pass
            elif has_mapping_schema():
                schema[field]['schema'] = cls.expand(schema[field]['schema'])
            else:  # assumes schema-constraints for a sequence
                schema[field]['schema'] = \
                    cls.expand({0: schema[field]['schema']})[0]

            # These rules hold a single rules set each.
            for rule in ('keyschema', 'valueschema'):
                if rule in schema[field]:
                    schema[field][rule] = \
                        cls.expand({0: schema[field][rule]})[0]

            # These rules hold a sequence of rules sets.
            for rule in ('allof', 'anyof', 'items', 'noneof', 'oneof'):
                if rule in schema[field]:
                    new_rules_definition = []
                    for item in schema[field][rule]:
                        new_rules_definition.append(cls.expand({0: item})[0])
                    schema[field][rule] = new_rules_definition
        return schema

    def update(self, schema):
        """ Merge another schema into this one. The merged result replaces
            the wrapped schema only if it passes validation.

        :param schema: The definitions to merge in.
        :raises SchemaError: If merging raises ``ValueError`` or the merged
                             schema is invalid. """
        try:
            schema = self.expand(schema)
            _new_schema = self.schema.copy()
            _new_schema.update(schema)
            self.validate(_new_schema)
        except ValueError:
            raise SchemaError(errors.SCHEMA_ERROR_DEFINITION_TYPE
                              .format(schema))
        else:
            self.schema = _new_schema

    def regenerate_validation_schema(self):
        """ Rebuild the validation schema from the validator's rules. """
        self.validation_schema = SchemaValidationSchema(self.validator)

    def validate(self, schema=None):
        """ Validate a schema unless its hash is already cached as valid.

        :param schema: The schema to validate. Defaults to the wrapped one.
        :raises SchemaError: If the schema is not valid. """
        if schema is None:
            schema = self.schema
        _hash = mapping_hash(schema)
        if _hash not in self.validator._valid_schemas:
            self._validate(schema)
            # Only reached when _validate did not raise.
            self.validator._valid_schemas.add(_hash)

    def _validate(self, schema):
        """ Validates a schema that defines rules against supported rules.

        :param schema: The schema to be validated as a legal cerberus schema
                       according to the rules of this Validator object.
        :raises SchemaError: If the schema is missing or not valid.
        """
        if isinstance(schema, _str_type):
            schema = self.validator.schema_registry.get(schema, schema)

        if schema is None:
            raise SchemaError(errors.SCHEMA_ERROR_MISSING)

        # Work on a shallow copy so resolved references are not written
        # back into the caller's schema.
        schema = copy(schema)
        # Resolve rules set references through the validator's own registry,
        # as the schema validator does, instead of the module-level one.
        registry = self.validator.rules_set_registry
        for field in schema:
            if isinstance(schema[field], _str_type):
                schema[field] = registry.get(schema[field], schema[field])

        if not self.schema_validator(schema, normalize=False):
            raise SchemaError(self.schema_validator.errors)

204 

205 

class UnvalidatedSchema(DefinitionSchema):
    """ A definition-schema that is stored as given and never validated. """

    def __init__(self, schema=None):
        """
        :param schema: A mapping, or anything ``dict()`` accepts. Defaults
                       to a new, empty schema.
        """
        if schema is None:
            # A fresh dict per instance: a shared ``{}`` default would be
            # stored as self.schema and mutated through item assignment.
            schema = {}
        elif not isinstance(schema, Mapping):
            schema = dict(schema)
        self.schema = schema

    def validate(self, schema=None):
        """ Accept any schema; validation is intentionally a no-op. The
            parameter is optional to match the signature of the parent. """
        pass

    def copy(self):
        # Override ancestor's copy, because
        # UnvalidatedSchema does not have .validator:
        return self.__class__(self.schema.copy())

219 

220 

class SchemaValidationSchema(UnvalidatedSchema):
    """ An unvalidated schema with the rules set that describes a legal
        definition: a ``dict`` whose allowed content is given by the rules
        of a validator and that rejects unknown rules. """

    def __init__(self, validator):
        """
        :param validator: The object whose ``rules`` attribute is used as
                          the nested schema.
        """
        constraints = {}
        constraints['allow_unknown'] = False
        constraints['schema'] = validator.rules
        constraints['type'] = 'dict'
        self.schema = constraints

226 

227 

class SchemaValidatorMixin(object):
    """ Mixin that extends a validator so that it can validate the schemas
        which are passed to a Cerberus validator. """

    @property
    def known_rules_set_refs(self):
        # Names of the rules set references that were already looked at;
        # an empty tuple until something has been stored.
        return self._config.get('known_rules_set_refs', ())

    @known_rules_set_refs.setter
    def known_rules_set_refs(self, value):
        self._config['known_rules_set_refs'] = value

    @property
    def known_schema_refs(self):
        # Names of the schema references that were already looked at;
        # an empty tuple until something has been stored.
        return self._config.get('known_schema_refs', ())

    @known_schema_refs.setter
    def known_schema_refs(self, value):
        self._config['known_schema_refs'] = value

    @property
    def target_schema(self):
        """ The schema that is being validated. """
        return self._config['target_schema']

    @property
    def target_validator(self):
        """ The validator whose schema is being validated. """
        return self._config['target_validator']

    def _validate_logical(self, rule, field, value):
        """ {'allowed': ('allof', 'anyof', 'noneof', 'oneof')} """
        # NOTE(review): the docstring above is kept verbatim; it looks like
        # a constraint definition that the framework may read at runtime -
        # confirm before rewording it.
        child = self._get_child_validator(
            document_crumb=rule,
            schema=self.root_allow_unknown['schema'],
            allow_unknown=self.root_allow_unknown['allow_unknown']
        )

        for constraints in value:
            digest = mapping_hash({'turing': constraints})
            # Rules sets that are cached as valid are not checked again.
            if digest not in self.target_validator._valid_schemas:
                child(constraints, normalize=False)
                if child._errors:
                    self._error(child._errors)
                else:
                    self.target_validator._valid_schemas.add(digest)

    def _validate_type_callable(self, value):
        # True for callables, otherwise (implicitly) None.
        return True if isinstance(value, Callable) else None

    def _validate_type_hashable(self, value):
        # True for hashables, otherwise (implicitly) None.
        return True if isinstance(value, Hashable) else None

    def _validate_type_hashables(self, value):
        # Only a list can qualify; every member has to be hashable.
        if not self._validate_type_list(value):
            return None
        return all(self._validate_type_hashable(member) for member in value)

    def _validator_bulk_schema(self, field, value):
        # Checks one rules set, given directly or as a name that refers to
        # an entry in the target validator's rules set registry.
        if isinstance(value, _str_type):
            if value in self.known_rules_set_refs:
                return
            # Remember the reference so it is resolved only once.
            self.known_rules_set_refs += (value,)
            definition = self.target_validator.rules_set_registry.get(value)
            if definition is None:
                path = self.document_path + (field,)
                self._error(path, 'Rules set definition %s not found.' % value)
                return
            value = definition

        digest = mapping_hash({'turing': value})
        if digest in self.target_validator._valid_schemas:
            return

        child = self._get_child_validator(
            document_crumb=field,
            schema=self.root_allow_unknown['schema'],
            allow_unknown=self.root_allow_unknown['allow_unknown'])
        child(value, normalize=False)
        if child._errors:
            self._error(child._errors)
            return
        self.target_validator._valid_schemas.add(digest)

    def _validator_handler(self, field, value):
        # Accepts a callable, a name known to the target validator, or an
        # iterable of either; each member is checked recursively.
        if isinstance(value, Callable):
            return
        if isinstance(value, _str_type):
            known_names = self.target_validator.validators + \
                self.target_validator.coercers
            if value not in known_names:
                self._error(field, '%s is no valid coercer' % value)
            return
        if isinstance(value, Iterable):
            for handler in value:
                self._validator_handler(field, handler)

    def _validator_items(self, field, value):
        # Every item is a rules set; its position becomes part of the field.
        for position, rules_set in enumerate(value):
            self._validator_bulk_schema((field, position), rules_set)

    def _validator_schema(self, field, value):
        # Checks a nested schema, given directly or as a name that refers
        # to an entry in the target validator's schema registry.
        if isinstance(value, _str_type):
            if value in self.known_schema_refs:
                return
            # Remember the reference so it is resolved only once.
            self.known_schema_refs += (value,)
            definition = self.target_validator.schema_registry.get(value)
            if definition is None:
                path = self.document_path + (field,)
                self._error(path, 'Schema definition %s not found.' % value)
                return
            value = definition

        digest = mapping_hash(value)
        if digest in self.target_validator._valid_schemas:
            return

        child = self._get_child_validator(
            document_crumb=field,
            schema=None, allow_unknown=self.root_allow_unknown)
        child(value, normalize=False)
        if child._errors:
            self._error(child._errors)
            return
        self.target_validator._valid_schemas.add(digest)

357 

358 

359#### 

360 

361 

class Registry(object):
    """ Named storage for schemas and parts of them; the names can be used
        as references in validation schemas. Subclasses provide
        ``_expand_definition``, which is applied to every definition before
        it is stored.

    :param definitions: Optional, initial definitions.
    :type definitions: any :term:`mapping` """

    def __init__(self, definitions={}):
        # The default mapping is only read: extend() copies it into a dict.
        self._storage = {}
        self.extend(definitions)

    def add(self, name, definition):
        """ Store a definition under a name, silently replacing a
            definition that is already registered under that name.

        :param name: The name which can be used as reference in a validation
                     schema.
        :type name: :class:`str`
        :param definition: The definition.
        :type definition: any :term:`mapping` """
        expanded = self._expand_definition(definition)
        self._storage[name] = expanded

    def all(self):
        """ Returns the :class:`dict` that maps every registered name to
            its definition. This is the internal storage, not a copy. """
        return self._storage

    def extend(self, definitions):
        """ Store several definitions at once, silently replacing the ones
            that are already registered under the same name.

        :param definitions: The names and definitions.
        :type definitions: a :term:`mapping` or an :term:`iterable` with
                           two-value :class:`tuple` s """
        named_definitions = dict(definitions)
        for name in named_definitions:
            self.add(name, named_definitions[name])

    def clear(self):
        """ Drop every definition from the registry. """
        self._storage.clear()

    def get(self, name, default=None):
        """ Look up a definition by its name.

        :param name: The reference that points to the definition.
        :type name: :class:`str`
        :param default: Return value if the reference isn't registered. """
        return self._storage.get(name, default)

    def remove(self, *names):
        """ Drop the definitions with the given names; names that are not
            registered are ignored.

        :param names: The names of the definitions that are to be
                      unregistered. """
        storage = self._storage
        for name in names:
            if name in storage:
                del storage[name]

418 

419 

class SchemaRegistry(Registry):
    """ A registry whose definitions are complete schemas. """

    @classmethod
    def _expand_definition(cls, definition):
        # A schema can be handed to the expansion as it is.
        expanded = DefinitionSchema.expand(definition)
        return expanded

424 

425 

class RulesSetRegistry(Registry):
    """ A registry whose definitions are rules sets for a single field. """

    @classmethod
    def _expand_definition(cls, definition):
        # The expansion works on schemas, so the rules set is wrapped as
        # the only field (0) of a schema and unwrapped afterwards.
        wrapped = {0: definition}
        return DefinitionSchema.expand(wrapped)[0]

430 

431 

# Module-level registry instances, one for schemas and one for rules sets.
schema_registry = SchemaRegistry()
rules_set_registry = RulesSetRegistry()