Source code for tendril.schema.base

#!/usr/bin/env python
# encoding: utf-8

# Copyright (C) 2019 Chintalagiri Shashank
# This file is part of tendril.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Base Schemas (:mod:`tendril.schema.base`)
"""

import os
import warnings
from six import iteritems
from decimal import Decimal
from jinja2 import Template
from tendril.utils.files import yml as yaml

from tendril.validation.base import ValidatableBase
from tendril.validation.base import ValidationContext
from tendril.validation.schema import SchemaPolicy
from tendril.validation.schema import SchemaNotSupportedError
from tendril.validation.configs import ConfigOptionPolicy
from tendril.validation.configs import ContextualConfigError

from tendril.utils import log
logger = log.get_logger(__name__, log.DEFAULT)

class SchemaProcessorBase(ValidatableBase):
    """
    Base class mapping named policies onto a raw content object.

    Subclasses declare their policies by overriding :meth:`elements`
    and / or :meth:`schema_policies`. The resulting table is stored in
    ``self._policies`` at construction time. :meth:`_process` then
    resolves every :class:`ConfigOptionPolicy` in that table against
    ``self._raw`` and stores the result as an instance attribute.

    This class never assigns ``self._raw_content`` itself; a subclass has
    to provide it before :meth:`_process` or policy-backed attribute
    access is used.

    NOTE(review): ``self._validation_context`` and
    ``self._validation_errors`` are not assigned here; presumably they
    come from ``ValidatableBase`` - confirm.
    """

    def __init__(self, *args, **kwargs):
        super(SchemaProcessorBase, self).__init__(*args, **kwargs)
        self._policies = {}
        self._load_schema_policies()

    @property
    def _raw(self):
        """The raw content object the policies are resolved against."""
        return self._raw_content

    def _p(self, *args, **kwargs):
        """
        Shorthand to build a :class:`ConfigOptionPolicy` bound to this
        object's validation context. All arguments are passed through.
        """
        return ConfigOptionPolicy(self._validation_context, *args, **kwargs)

    def elements(self):
        """
        Return a dict of ``{attribute name: policy}`` for this class.

        The base implementation declares no elements.
        """
        return {}

    def schema_policies(self):
        """
        Return the complete policy table for this object.

        The base implementation returns exactly what :meth:`elements`
        returns.
        """
        policies = self.elements()
        return policies

    def _load_schema_policies(self):
        """Merge the output of :meth:`schema_policies` into ``_policies``."""
        self._policies.update(self.schema_policies())

    def _process_element(self, key, policy):
        """
        Resolve a single policy and store the value as attribute ``key``.

        Only :class:`ConfigOptionPolicy` instances are handled; any other
        policy type is ignored here. A value which is itself a
        ``ValidatableBase`` is validated and its validation errors are
        added to this object's. A :class:`ContextualConfigError` raised
        while resolving is recorded as a validation error instead of
        being propagated, and in that case the attribute is not set.
        """
        if isinstance(policy, ConfigOptionPolicy):
            try:
                value = policy.get(self._raw)
                if isinstance(value, ValidatableBase):
                    value.validate()
                    self._validation_errors.add(value.validation_errors)
                setattr(self, key, value)
            except ContextualConfigError as e:
                self._validation_errors.add(e)

    def _process(self):
        """Run :meth:`_process_element` for every entry in ``_policies``."""
        for key, policy in iteritems(self._policies):
            self._process_element(key, policy)

    def __getattr__(self, item):
        """
        Fallback attribute access, resolving ``item`` through its policy.

        Python only calls this when normal attribute lookup fails, so it
        is used for policy names whose value was never stored by
        :meth:`_process_element`.

        :raises AttributeError: if ``item`` is not a known policy name, or
                                if the policy table does not exist yet.
        """
        # Read the table straight from the instance dict. Going through
        # ``self._policies`` would re-enter __getattr__ whenever the table
        # has not been assigned yet (before __init__ sets it, or on
        # instances created without __init__ such as by copy / pickle),
        # and recurse until RecursionError.
        policies = self.__dict__.get('_policies')
        if policies is None or item not in policies:
            raise AttributeError("%r has no attribute %r" % (type(self), item))
        policy = policies[item]
        return policy.get(self._raw)

    def _validate(self):
        """Mark this object as validated. No checks are performed here."""
        self._validated = True
class NakedSchemaObject(SchemaProcessorBase):
    """
    Schema processor which is handed its raw content directly.

    The content is stored as ``_raw_content`` and processed immediately
    during construction. If any validation errors were collected while
    processing, a :class:`UserWarning` reporting their count is issued.

    :param content: The raw content the policies are resolved against.

    Remaining positional and keyword arguments are forwarded to the
    parent constructor.
    """

    def __init__(self, content, *args, **kwargs):
        super(NakedSchemaObject, self).__init__(*args, **kwargs)
        self._raw_content = content
        self._process()

        # Nothing to report when processing collected no errors.
        if not self.validation_errors.terrors:
            return

        message = "{0} of class {1} has {2} Validation Errors".format(
            self.ident,
            self.__class__.__name__,
            self.validation_errors.terrors,
        )
        warnings.warn(message, UserWarning)
class SchemaControlledObject(NakedSchemaObject):
    """
    Schema object whose content declares a schema name and version.

    Subclasses state what they accept through the class attributes
    below. After the regular element processing, the declared schema is
    checked against the ``schema_policy`` entry of the policy table.

    :param strict_schema: When true, an unsupported schema declaration
                          raises :class:`SchemaNotSupportedError`. When
                          false (the default), it is recorded as a
                          validation error instead.
    """

    # Alternate schema name which is rewritten to ``supports_schema_name``
    # before the schema declaration is checked.
    legacy_schema_name = None
    # Schema name accepted by this class. '*' disables the check.
    supports_schema_name = None
    # Version bounds handed to the SchemaPolicy.
    supports_schema_version_max = None
    supports_schema_version_min = None

    def __init__(self, *args, strict_schema=False, **kwargs):
        # Must be in place before the parent constructor runs, since that
        # is what triggers _process().
        self._strict_schema = strict_schema
        super(SchemaControlledObject, self).__init__(*args, **kwargs)

    def _stub_content(self):
        """
        Return the schema name and maximum supported version, as a dict
        with the keys ``schema_name`` and ``schema_version``.
        """
        stub = {}
        stub['schema_name'] = self.supports_schema_name
        stub['schema_version'] = self.supports_schema_version_max
        return stub

    def elements(self):
        """
        Extend the parent's elements with ``schema_name`` and
        ``schema_version``, read from ``('schema', 'name')`` and
        ``('schema', 'version')``. The version is parsed with
        :class:`decimal.Decimal`.
        """
        declared = super(SchemaControlledObject, self).elements()
        declared['schema_name'] = self._p(('schema', 'name'),)
        declared['schema_version'] = self._p(('schema', 'version'),
                                             parser=Decimal)
        return declared

    def schema_policies(self):
        """
        Extend the parent's policy table with a ``schema_policy`` entry
        built from the ``supports_schema_*`` class attributes.
        """
        table = super(SchemaControlledObject, self).schema_policies()
        table['schema_policy'] = SchemaPolicy(
            self._validation_context,
            self.supports_schema_name,
            self.supports_schema_version_max,
            self.supports_schema_version_min,
        )
        return table

    def _verify_schema_decl(self):
        """
        Check the declared schema name and version against the policy.

        Nothing is checked when ``supports_schema_name`` is ``'*'``. A
        declared name equal to ``legacy_schema_name`` is replaced by
        ``supports_schema_name`` before the check.

        :raises SchemaNotSupportedError: if the policy rejects the
                                         declared name / version.
        """
        schema_policy = self._policies['schema_policy']
        if self.supports_schema_name == '*':
            return

        if self.schema_name == self.legacy_schema_name:
            self.schema_name = self.supports_schema_name

        logger.debug("Validating Schema Policy : {0} {1}"
                     "".format(self.schema_name, self.schema_version))

        if schema_policy.validate(self.schema_name, self.schema_version):
            return

        raise SchemaNotSupportedError(
            schema_policy,
            '{0} v{1}'.format(self.schema_name, self.schema_version)
        )

    def _process(self):
        """
        Process all elements, then verify the schema declaration.

        An unsupported schema is re-raised in strict mode and otherwise
        added to the validation errors.
        """
        super(SchemaControlledObject, self)._process()
        try:
            self._verify_schema_decl()
        except SchemaNotSupportedError as error:
            if not self._strict_schema:
                self._validation_errors.add(error)
                return
            raise
class SchemaControlledYamlFile(SchemaControlledObject):
    """
    Schema controlled object whose raw content is loaded from a YAML file.

    :param path: Path of the YAML file to load.

    Remaining positional and keyword arguments are forwarded to the
    parent constructor. A ``ValidationContext`` built from the path is
    always passed along as ``vctx``.
    """

    # '*' makes the inherited schema declaration check a no-op.
    supports_schema_name = '*'
    # Exception class raised when the file is missing. When left as None,
    # a missing file is handed to the YAML loader as is.
    FileNotFoundExceptionType = None
    # Path of a jinja2 template used to create the file when it is missing.
    # No stub is generated when left as None.
    template = None

    def __init__(self, path, *args, **kwargs):
        self._path = path
        vctx = ValidationContext(
            self._path,
            locality=self.supports_schema_name or self.__class__.__name__
        )
        raw_content = self._get_yaml_file()
        super(SchemaControlledYamlFile, self).__init__(
            raw_content, *args, vctx=vctx, **kwargs
        )

    @property
    def path(self):
        """The path this object was constructed with."""
        return self._path

    def _generate_stub(self):
        """
        Render ``self.template`` and write the result to ``self._path``.

        The template is rendered with ``stage`` set to the return value
        of ``_stub_content()``.
        """
        # Read the template inside a context manager so that the file
        # handle is closed deterministically.
        with open(self.template) as template_file:
            template = Template(template_file.read())
        # Render before opening the target for writing. A failure while
        # rendering would otherwise leave behind an empty file, which
        # suppresses stub generation on every later attempt.
        rendered = template.render(stage=self._stub_content())
        with open(self._path, 'w') as f:
            f.write(rendered)

    def _get_yaml_file(self):
        """
        Load and return the content of the file at ``self._path``.

        If the file does not exist and a ``template`` is set, a stub file
        is generated first.

        :raises FileNotFoundExceptionType: if that attribute is set and
                                           the file still does not exist.
        """
        if self.template and not os.path.exists(self._path):
            self._generate_stub()
        if self.FileNotFoundExceptionType and not os.path.exists(self._path):
            raise self.FileNotFoundExceptionType(self._path)
        # ``yaml`` here is tendril.utils.files.yml, called with the path.
        return yaml.load(self._path)
def load(manager):
    """
    Register the schema processors defined in this module.

    :param manager: Object providing ``load_schema(name, cls, doc=...)``.
    """
    logger.debug("Loading {0}".format(__name__))
    description = "Base class for schema controlled file processors."
    manager.load_schema(
        'SchemaControlledYamlFile',
        SchemaControlledYamlFile,
        doc=description,
    )