api/validators.py
import copy
import json
import jsonschema
import os
from . import config
from .web.errors import DBValidationException, InputValidationException
log = config.log
def validate_data(data, schema_json, schema_type, verb, optional=False):
"""
Convenience method to validate a JSON schema against some action.
If optional is set, validate_data won't complain about null data.
"""
if optional and data is None:
return
suri = schema_uri(schema_type, schema_json)
validator = from_schema_path(suri)
validator(data, verb)
def _validate_json(json_data, schema, resolver):
jsonschema.validate(json_data, schema, resolver=resolver, format_checker=jsonschema.FormatChecker())
def _resolve_schema(schema_file_uri):
with open(schema_file_uri) as schema_file:
base_uri = os.path.dirname(schema_file_uri)
schema = json.load(schema_file)
resolver = jsonschema.RefResolver('file://'+base_uri+'/', schema)
return (schema, resolver)
def no_op(g, *args): # pylint: disable=unused-argument
return g
def schema_uri(type_, schema_name):
return os.path.join(
config.schema_path,
type_,
schema_name
)
def decorator_from_schema_path(schema_url):
if schema_url is None:
return no_op
schema, resolver = _resolve_schema(schema_url)
def g(exec_op):
def validator(method, **kwargs):
payload = kwargs['payload']
if method == 'PUT' and schema.get('required'):
_schema = copy.copy(schema)
_schema.pop('required')
else:
_schema = schema
if method in ['POST', 'PUT']:
try:
_validate_json(payload, _schema, resolver)
except jsonschema.ValidationError as e:
raise DBValidationException(str(e))
return exec_op(method, **kwargs)
return validator
return g
def from_schema_path(schema_url):
if schema_url is None:
return no_op
# split the url in base_uri and schema_name
schema, resolver = _resolve_schema(schema_url)
def g(payload, method):
if method == 'PUT' and schema.get('required'):
_schema = copy.copy(schema)
_schema.pop('required')
else:
_schema = schema
if method in ['POST', 'PUT']:
try:
_validate_json(payload, _schema, resolver)
except jsonschema.ValidationError as e:
raise InputValidationException(str(e))
return g
def key_check(schema_url):
"""
for sublists of mongo container there is no automatic key check when creating, updating or deleting an object.
We are adding a custom array field to the json schemas ("key_fields").
The uniqueness is checked on the combination of all the elements of "key_fields".
For an example check api/schemas/input/permission.json:
The key fields are the _id. Uniqueness is checked on the combination
of the values of the _id of the permissions.
So this method ensures that:
1. after a POST and PUT request we don't have two items with the same values for the key set
2. a GET will retrieve a single item
3. a DELETE (most importantly) will delete a single item
"""
if schema_url is None:
return no_op
schema, _ = _resolve_schema(schema_url)
if schema.get('key_fields') is None:
return no_op
def g(exec_op):
def f(method, _id, query_params = None, payload = None, exclude_params=None):
if method == 'POST':
try:
exclude_params = _post_exclude_params(schema.get('key_fields', []), payload)
except KeyError as e:
raise InputValidationException('missing key {} in payload'.format(e.args))
else:
_check_query_params(schema.get('key_fields'), query_params)
if method == 'PUT' and schema.get('key_fields'):
exclude_params = _put_exclude_params(schema['key_fields'], query_params, payload)
return exec_op(method, _id=_id, query_params=query_params, payload=payload, exclude_params=exclude_params)
return f
return g
def _put_exclude_params(keys, query_params, payload):
exclude_params = None
_eqp = {}
for k in keys:
value_p = payload.get(k)
if value_p and value_p != query_params.get(k):
_eqp[k] = value_p
exclude_params = _eqp
else:
_eqp[k] = query_params.get(k)
return exclude_params
def _post_exclude_params(keys, payload):
return {
k: payload[k] for k in keys
}
def _check_query_params(keys, query_params):
assert set(keys) == set(query_params.keys()), """
{}
is different from expected:
{}
""".format(query_params.keys(), keys)
def verify_payload_exists(handler_method):
"""
A decorator to ensure the json payload exists and is not empty
Useful for POST and PUT handler operations
"""
def verify_payload_dec(self, *args, **kwargs):
try:
if not self.request.json_body:
raise InputValidationException("Empty Payload")
except ValueError:
raise InputValidationException("Empty Payload")
return handler_method(self, *args, **kwargs)
return verify_payload_dec