#!/usr/bin/env python
# encoding: utf-8
# Copyright (C) 2016-2019 Chintalagiri Shashank
#
# This file is part of tendril.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Validation elements for Schema Controlled Files
===============================================
"""
from inspect import isclass
from tendril.validation.base import ValidatableBase
from tendril.validation.base import ValidationError
from tendril.validation.base import ValidationPolicy
[docs]class ContextualConfigError(ValidationError):
msg = "Incorrect Configuration"
def __init__(self, policy):
super(ContextualConfigError, self).__init__(policy)
[docs] def _format_path(self):
if isinstance(self._policy.path, tuple):
return '/'.join(self._policy.path)
else:
return self._policy.path
[docs] def render(self):
return {
'is_error': self.policy.is_error,
'group': self.msg,
'headline': self._policy.context.render(),
'detail': "Configuration seems to be incorrect.",
}
[docs]class ConfigKeyError(ContextualConfigError):
msg = "Configuration Key Missing"
def __init__(self, policy):
super(ConfigKeyError, self).__init__(policy)
def __repr__(self):
return "<ConfigKeyError {0} {1}>" \
"".format(self._policy.context, self._format_path())
[docs] def render(self):
if self._policy.options:
option_str = "Valid options are {0}" \
"".format(', '.join(self._policy.options))
else:
option_str = ''
return {
'is_error': self.policy.is_error,
'group': self.msg,
'headline': "{0} missing in {1}"
"".format(self._format_path(),
self._policy.context.render()),
'detail': "This required configuration option could not be "
"found in the configs file. " + option_str,
}
[docs]class ConfigValueInvalidError(ContextualConfigError):
msg = "Configuration Value Unrecognized"
def __init__(self, policy, value):
super(ConfigValueInvalidError, self).__init__(policy)
self._value = value
def __repr__(self):
return "<ConfigValueInvalidError {0} {1}>" \
"".format(self._policy.context, self._format_path())
[docs] def render(self):
if self._policy.options:
option_str = "Valid options are {0}" \
"".format(', '.join(self._policy.options))
else:
option_str = ''
return {
'is_error': self.policy.is_error,
'group': self.msg,
'headline': "'{0}' Invalid for {1} in {2}"
"".format(self._value, self._format_path(),
self._policy.context.render()),
'detail': "The value provided for this configuration option is "
"unrecognized or not allowed in this context. " +
option_str,
}
[docs]class ConfigOptionPolicy(ValidationPolicy):
def __init__(self, context, path, parser=None, parser_args=None,
required=True, options=None, default=None, is_error=True):
super(ConfigOptionPolicy, self).__init__(context, is_error)
self.path = path
self.parser = parser
self._parser_args = parser_args
self.options = options
self.default = default
self.required = required
@property
def parser_args(self):
return self._parser_args or {}
[docs] def get(self, data):
if self.path is None:
return data
try:
return get_dict_val(data, self)
except ConfigKeyError as error:
if not self.required:
if self.default is None or not self.parser:
return self.default
if isclass(self.parser) and isinstance(self.default, self.parser):
return self.default
vctx = self.context.child(self.parser.__name__)
return _parse(self.parser, self.default, vctx=vctx, **self.parser_args)
else:
raise error
[docs]def _parse(parser, value, vctx=None, **parser_args):
if isclass(parser) and \
issubclass(parser, ValidatableBase):
return parser(value, vctx=vctx, **parser_args)
else:
return parser(value)
[docs]def get_dict_val(d, policy=None):
try:
assert isinstance(d, dict)
except AssertionError:
print("Expected to get a dictionary here. This probably means the YAML "
"file is empty or unrecognizably mangled. Got {0} instead.".format(d))
print(policy.context)
raise
if isinstance(policy.path, tuple):
try:
for key in policy.path:
if key not in d.keys():
raise KeyError
d = d.get(key)
except (KeyError, AttributeError):
raise ConfigKeyError(policy=policy)
rval = d
else:
try:
if policy.path not in d.keys():
raise KeyError
rval = d.get(policy.path)
except KeyError:
raise ConfigKeyError(policy=policy)
if policy.parser:
try:
if isinstance(policy.parser, tuple):
for parser in policy.parser:
try:
vctx = policy.context.child(parser.__name__)
rval = _parse(parser, rval, vctx, **policy.parser_args)
break
except:
continue
else:
raise Exception
else:
vctx = policy.context.child(policy.parser.__name__)
rval = _parse(policy.parser, rval, vctx, **policy.parser_args)
except Exception as e:
raise ConfigValueInvalidError(policy=policy, value=rval)
if policy.options is None or rval in policy.options:
return rval
else:
raise ConfigValueInvalidError(policy=policy, value=rval)