zalando/connexion

View on GitHub
connexion/operations/secure.py

Summary

Maintainability
D
2 days
Test Coverage
import functools
import logging

from ..decorators.decorator import RequestResponseDecorator

logger = logging.getLogger("connexion.operations.secure")

DEFAULT_MIMETYPE = 'application/json'


class SecureOperation(object):

    def __init__(self, api, security, security_schemes):
        """
        :param security: list of security rules the application uses by default
        :type security: list
        :param security_definitions: `Security Definitions Object
            <https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#security-definitions-object>`_
        :type security_definitions: dict
        """
        self._api = api
        self._security = security
        self._security_schemes = security_schemes

    @property
    def api(self):
        return self._api

    @property
    def security(self):
        return self._security

    @property
    def security_schemes(self):
        return self._security_schemes

    @property
    def security_decorator(self):
        """
        Gets the security decorator for operation

        From Swagger Specification:

        **Security Definitions Object**

        A declaration of the security schemes available to be used in the specification.

        This does not enforce the security schemes on the operations and only serves to provide the relevant details
        for each scheme.


        **Operation Object -> security**

        A declaration of which security schemes are applied for this operation. The list of values describes alternative
        security schemes that can be used (that is, there is a logical OR between the security requirements).
        This definition overrides any declared top-level security. To remove a top-level security declaration,
        an empty array can be used.


        **Security Requirement Object**

        Lists the required security schemes to execute this operation. The object can have multiple security schemes
        declared in it which are all required (that is, there is a logical AND between the schemes).

        The name used for each property **MUST** correspond to a security scheme declared in the Security Definitions.

        :rtype: types.FunctionType
        """
        logger.debug('... Security: %s', self.security, extra=vars(self))
        if not self.security:
            return self._api.security_handler_factory.security_passthrough

        auth_funcs = []
        required_scopes = None
        for security_req in self.security:
            if not security_req:
                auth_funcs.append(self._api.security_handler_factory.verify_none())
                continue
            elif len(security_req) > 1:
                logger.warning("... More than one security scheme in security requirement defined. "
                               "**DENYING ALL REQUESTS**", extra=vars(self))
                return self._api.security_handler_factory.security_deny

            scheme_name, scopes = next(iter(security_req.items()))
            security_scheme = self.security_schemes[scheme_name]

            if security_scheme['type'] == 'oauth2':
                required_scopes = scopes
                token_info_func = self._api.security_handler_factory.get_tokeninfo_func(security_scheme)
                scope_validate_func = self._api.security_handler_factory.get_scope_validate_func(security_scheme)
                if not token_info_func:
                    logger.warning("... x-tokenInfoFunc missing", extra=vars(self))
                    continue

                auth_funcs.append(self._api.security_handler_factory.verify_oauth(token_info_func, scope_validate_func))

            # Swagger 2.0
            elif security_scheme['type'] == 'basic':
                basic_info_func = self._api.security_handler_factory.get_basicinfo_func(security_scheme)
                if not basic_info_func:
                    logger.warning("... x-basicInfoFunc missing", extra=vars(self))
                    continue

                auth_funcs.append(self._api.security_handler_factory.verify_basic(basic_info_func))

            # OpenAPI 3.0.0
            elif security_scheme['type'] == 'http':
                scheme = security_scheme['scheme'].lower()
                if scheme == 'basic':
                    basic_info_func = self._api.security_handler_factory.get_basicinfo_func(security_scheme)
                    if not basic_info_func:
                        logger.warning("... x-basicInfoFunc missing", extra=vars(self))
                        continue

                    auth_funcs.append(self._api.security_handler_factory.verify_basic(basic_info_func))
                elif scheme == 'bearer':
                    bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme)
                    if not bearer_info_func:
                        logger.warning("... x-bearerInfoFunc missing", extra=vars(self))
                        continue
                    auth_funcs.append(self._api.security_handler_factory.verify_bearer(bearer_info_func))
                else:
                    logger.warning("... Unsupported http authorization scheme %s" % scheme, extra=vars(self))

            elif security_scheme['type'] == 'apiKey':
                scheme = security_scheme.get('x-authentication-scheme', '').lower()
                if scheme == 'bearer':
                    bearer_info_func = self._api.security_handler_factory.get_bearerinfo_func(security_scheme)
                    if not bearer_info_func:
                        logger.warning("... x-bearerInfoFunc missing", extra=vars(self))
                        continue
                    auth_funcs.append(self._api.security_handler_factory.verify_bearer(bearer_info_func))
                else:
                    apikey_info_func = self._api.security_handler_factory.get_apikeyinfo_func(security_scheme)
                    if not apikey_info_func:
                        logger.warning("... x-apikeyInfoFunc missing", extra=vars(self))
                        continue

                    auth_funcs.append(self._api.security_handler_factory.verify_api_key(
                        apikey_info_func, security_scheme['in'], security_scheme['name']
                        ))

            else:
                logger.warning("... Unsupported security scheme type %s" % security_scheme['type'], extra=vars(self))

        return functools.partial(self._api.security_handler_factory.verify_security, auth_funcs, required_scopes)

    def get_mimetype(self):
        return DEFAULT_MIMETYPE

    @property
    def _request_response_decorator(self):
        """
        Guarantees that instead of the internal representation of the
        operation handler response
        (connexion.lifecycle.ConnexionRequest) a framework specific
        object is returned.
        :rtype: types.FunctionType
        """
        return RequestResponseDecorator(self.api, self.get_mimetype())