samueldmq/infosystem

View on GitHub
infosystem/request.py

Summary

Maintainability
C
1 day
Test Coverage
import flask
import uuid

from infosystem.common import exception


class Request(flask.Request):

    # TODO(samueldmq): find a better place to put this utility method
    def _check_uuid4(self, uuid_str):
        if len(uuid_str) != 32:
            return False

        try:
            return uuid.UUID(uuid_str, version=4)
        except ValueError:
            return False

    @property
    def method(self):
        return self.environ['REQUEST_METHOD']

    @property
    def url(self):
        path_info = flask.request.environ['PATH_INFO'].rstrip('/')
        path_bits = [
            '<id>' if self._check_uuid4(i) else i for i in path_info.split('/')
        ]
        return '/'.join(path_bits)

    @property
    def token(self):
        return flask.request.headers.get('token')


class RequestManager(object):

    def __init__(self, subsystems):
        self.subsystems = subsystems

    def before_request(self):
        if flask.request.method == 'OPTIONS':
            return

        # Short-circuit if accessing the root URL,
        # which will just return the version
        # TODO(samueldmq): Do we need to create a subsystem just for this ?
        if not flask.request.url:
            return

        routes = self.subsystems['routes'].manager.list(
            url=flask.request.url, method=flask.request.method)
        if not routes:
            return flask.Response(response=None, status=404)
        route = routes[0]

        if route.sysadmin:
            # TODO(samueldmq): implement sysadmin logic
            return

        if route.bypass:
            # lookup the domain specified as domain_name in the request body
            data = flask.request.get_json()
            if data and data.get('domain_name'):
                domains = self.subsystems['domains'].manager.list(
                    name=data['domain_name'])
                if domains:
                    domain_id = domains[0].id

                    if not self.check_capability(route.id, domain_id):
                        return flask.Response(response=None, status=404)
                else:
                    return flask.Response(response=None, status=400)
            # TODO(samueldmq): THIS SHOULD BE RE-ENABLED,
            # DISABLED FOR NOW FOR /FOTOS
            # else:
            #     return flask.Response(response=None, status=400)
        else:
            token_id = flask.request.token

            if token_id:
                try:
                    token = self.subsystems['tokens'].manager.get(id=token_id)
                except exception.NotFound:
                    return flask.Response(response=None, status=401)

                user = self.subsystems['users'].manager.list(
                    id=token.user_id)[0]
                domain_id = user.domain_id

                if not self.check_capability(route.id, user.domain_id):
                    return flask.Response(response=None, status=404)

                # Now check the user grants
                grants = self.subsystems['grants'].manager.list(
                    user_id=user.id)
                user_role_ids = [g.role_id for g in grants]

                # TODO(samueldmq): sysadmin won't provide a domain,
                # so capabilities will be empty
                # treat this case here once we support sysadmin
                capability = self.subsystems['capabilities'].manager.list(
                    route_id=route.id, domain_id=domain_id)[0]

                policies = self.subsystems['policies'].manager.list(
                    capability_id=capability.id)

                # open capability
                if not policies:
                    return

                policy_role_ids = [p.role_id for p in policies]

                intersection = set(user_role_ids).intersection(policy_role_ids)
                if not intersection:
                    return flask.Response(response=None, status=401)
            else:
                return flask.Response(response=None, status=401)

    def check_capability(self, route_id, domain_id):
        capabilities = self.subsystems['capabilities'].manager.list(
            route_id=route_id, domain_id=domain_id)
        return capabilities