kuasha/cosmos

View on GitHub
cosmos/service/auth.py

Summary

Maintainability
D
2 days
Test Coverage
"""
 Copyright (C) 2014 Maruf Maniruzzaman
 Website: http://cosmosframework.com
 Author: Maruf Maniruzzaman
 License :: OSI Approved :: MIT License
"""
from builtins import bytes

from tornado.template import Template

from cosmos.rbac.object import AccessType, COSMOS_USERS_OBJECT_NAME, SYSTEM_USER, COSMOS_USERS_IDENTITY_OBJECT_NAME

__author__ = 'Maruf Maniruzzaman'

import tornado.web
from tornado import gen
from cosmos.service.requesthandler import *
import json
from cosmos.service.constants import *
import tornado.auth
from tornado.concurrent import return_future

try:
    import urlparse  # py2
except ImportError:
    import urllib.parse as urlparse  # py3

try:
    import urllib.parse as urllib_parse  # py3
except ImportError:
    import urllib as urllib_parse  # py2


LOGIN_PAGE_TEMPLATE = """
<!DOCTYPE html>
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <title>Login</title>
    <meta name="description" content="">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <link rel="stylesheet" href="http://ajax.googleapis.com/ajax/libs/angular_material/1.0.0/angular-material.min.css">
    <style>
        html,
        body {
            height: 100%;
        }
        html {
            display: table;
            margin: auto;
        }
        body {
            display: table-cell;
            vertical-align: middle;
        }
    </style>
</head>
<body ng-app="loginApp" ng-cloak>
<form action="." method="POST">
    <md-content class="md-padding" layout-xs="column" layout="row">
    <div layout="column">
      <md-card>
        <md-card-title>
          <md-card-title-text>
            <span class="md-headline">Login</span>
          </md-card-title-text>
          <md-card-title-media>
          </md-card-title-media>
        </md-card-title>
        <md-card-content>
            {% if message %}
            <span>{{message}}</span>
            {% end %}
            <input type="hidden" name="next" value="{{ next }}" />
            <md-input-container class="md-block">
                    <label>Username</label>
                    <input required id="username" type="text" name="username" />
            </md-input-container>
            <md-input-container class="md-block">
                    <label>Password</label>
                    <input required type="password" id="password" name="password" />
            </md-input-container>
        </md-card-content>
        <md-card-actions layout="row" layout-align="end center">
          <input id="loginbtn" type="submit" value="Login" class="md-button md-raised md-primary"></input>
        </md-card-actions>
      </md-card>
    <md-content>
    </form>
    <script src="http://ajax.googleapis.com/ajax/libs/angularjs/1.4.8/angular.min.js"></script>
    <script src="http://ajax.googleapis.com/ajax/libs/angularjs/1.4.8/angular-animate.min.js"></script>
    <script src="http://ajax.googleapis.com/ajax/libs/angularjs/1.4.8/angular-aria.min.js"></script>
    <script src="http://ajax.googleapis.com/ajax/libs/angularjs/1.4.8/angular-messages.min.js"></script>

    <!-- Angular Material Library -->
    <script src="http://ajax.googleapis.com/ajax/libs/angular_material/1.0.0/angular-material.min.js"></script>

    <!-- Your application bootstrap  -->
    <script type="text/javascript">
        angular.module('loginApp', ['ngMaterial']);
    </script>
</body>
</html>
"""

hmac_key = None

class LogoutHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        self.logout_current_user()
        self.redirect('/')


def get_jwt_payload_json(id_token):
    jwt_parts = str.split(str(id_token), '.')
    pad = 4 - len(jwt_parts[1]) % 4
    pads=["", "=", "==", "===", ""]
    id_values = base64.b64decode(jwt_parts[1]+pads[pad])

    return json.loads(id_values.decode("utf-8"))


class CosmosAuthHandler(RequestHandler):
    @gen.coroutine
    def _authenticate_user(self, user_identity):
        self.current_identity = user_identity

        identity_type = user_identity.get("identity_type", None)

        #TODO: define methods and override in subclasses so we do not require these ifs and in functions below
        if identity_type == IDENTITY_TYPE_FB_GRAPH:
            fb_userid = user_identity["id"]
            query={"id": fb_userid, "identity_type": IDENTITY_TYPE_FB_GRAPH}
            columns = ["identity_type", "id", "user_id"]
        elif identity_type == IDENTITY_TYPE_GOOGLE_OAUTH2:
            id_token = user_identity["id_token"]
            id_values = get_jwt_payload_json(id_token)
            sub = id_values.get("sub")
            query = {"sub": sub, "identity_type": IDENTITY_TYPE_GOOGLE_OAUTH2}
            columns = ["identity_type", "sub", "user_id"]
        elif identity_type == IDENTITY_TYPE_GITHUB_OAUTH2:
            gh_userid = user_identity["id"]
            query={"id": gh_userid, "identity_type": IDENTITY_TYPE_GITHUB_OAUTH2}
            columns = ["identity_type", "id", "user_id"]
        else:
            claimed_id = user_identity.get("claimed_id")
            query = {"claimed_id":claimed_id}
            columns = ["identity_type", "claimed_id", "user_id"]

        object_service = self.settings['object_service']

        cursor = object_service.find(SYSTEM_USER, COSMOS_USERS_IDENTITY_OBJECT_NAME, query, columns)

        if (yield cursor.fetch_next):
            stored_user_id = cursor.next_object()
            user = yield object_service.load(SYSTEM_USER, COSMOS_USERS_OBJECT_NAME, str(stored_user_id["user_id"]), [])

            if not user:
                raise tornado.web.HTTPError(500, "User not found for previously stored identity of this user")

            self.set_current_user(user)
        else:
            create_result = yield self._create_user(user_identity)

            cursor = object_service.find(SYSTEM_USER, COSMOS_USERS_OBJECT_NAME, {"_id": create_result}, [])
            if (yield cursor.fetch_next):
                user = cursor.next_object()
            else:
                raise tornado.web.HTTPError(500, "Could not find created user")

            result = yield self._create_user_identity(str(user["_id"]), user_identity)

            self.set_current_user(user)

    def _create_user(self, user_identity):
        identity_type = user_identity.get("identity_type", None)

        if identity_type == IDENTITY_TYPE_FB_GRAPH:
            fb_userid = user_identity["id"]
            data = {"fb_userid": fb_userid, "roles": []}
        elif identity_type == IDENTITY_TYPE_GOOGLE_OAUTH2:
            id_token = user_identity["id_token"]
            id_values = get_jwt_payload_json(id_token)
            email = id_values.get("email")
            data = {"email": email, "roles": []}
        else:
            email = user_identity.get("email")
            data = {"email": email, "roles": []}

        object_service = self.settings['object_service']
        return object_service.save(SYSTEM_USER, COSMOS_USERS_OBJECT_NAME, data)

    def _create_user_identity(self, user_id, identity):
        data = identity
        data["user_id"] = user_id

        identity_type = identity.get("identity_type", None)

        if identity_type == IDENTITY_TYPE_GOOGLE_OAUTH2:
            id_token = identity["id_token"]
            id_values = get_jwt_payload_json(id_token)
            data.update(id_values)

        object_service = self.settings['object_service']
        return object_service.save(SYSTEM_USER, COSMOS_USERS_IDENTITY_OBJECT_NAME, data)


class OpenidLoginHandler(CosmosAuthHandler, tornado.auth.OpenIdMixin):
    @return_future
    def authorize_redirect(self, oauth_scope, callback_uri=None,
                           ax_attrs=["name", "email", "language", "username"],
                           callback=None):

        callback_uri = callback_uri or self.request.uri
        args = self._openid_args(callback_uri, ax_attrs=ax_attrs,
                                 oauth_scope=oauth_scope)
        self.redirect(self._OPENID_ENDPOINT + "?" + urllib_parse.urlencode(args))
        callback()

    @tornado.web.asynchronous
    @gen.coroutine
    def get(self):
        #TODO: Evaluate security risk of accepting openid.op_endpoint value of remote site response
        self._OPENID_ENDPOINT = self.get_argument("openid.op_endpoint", None)
        if not self._OPENID_ENDPOINT:
            raise tornado.web.HTTPError(400, "Endpoint not found")

        if self.get_argument("openid.mode", None):
            user = yield self.get_authenticated_user()
            if not user:
                raise tornado.web.HTTPError(500, "Openid auth failed")

            assert isinstance(user, dict)

            user['identity_type'] = IDENTITY_TYPE_OPEN_ID

            yield self._authenticate_user(user)
            self.redirect(self.get_argument("next", '/'))
        else:
            callback_uri = self.request.uri
            url = urlparse.urljoin(self.request.full_url(), callback_uri)
            urls = url  # url.replace('http','https')
            yield self.authenticate_redirect(callback_uri=urls)


class GoogleOAuth2LoginHandler(CosmosAuthHandler, tornado.auth.GoogleOAuth2Mixin):
    @tornado.web.asynchronous
    @gen.coroutine
    def get(self):

        #callback_uri = self.request.uri
        #url = urlparse.urljoin(self.request.full_url(), callback_uri)
        #urls = url.replace('http','https')
        urls = self.settings[self._OAUTH_SETTINGS_KEY]['redirect_uri']

        if self.get_argument('code', False):
            # The url must match what was used during authorize phase.
            # It can not accept if there is any mismatch including parameters
            user = yield self.get_authenticated_user(redirect_uri=urls, code=self.get_argument('code'))

            if not user:
                raise tornado.web.HTTPError(500, "Google auth failed")

            user["identity_type"] = IDENTITY_TYPE_GOOGLE_OAUTH2
            yield self._authenticate_user(user)
            self.redirect(self.get_argument("next", '/'))
        else:
            yield self.authorize_redirect(
                redirect_uri=urls,
                client_id=self.settings[self._OAUTH_SETTINGS_KEY]['key'],
                client_secret=self.settings[self._OAUTH_SETTINGS_KEY]['secret'],
                scope=['profile', 'email'],
                response_type='code',
                extra_params={'approval_prompt': 'auto'})

class GithubOAuth2LoginHandler(CosmosAuthHandler, tornado.auth.OAuth2Mixin):
    _OAUTH_AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
    _OAUTH_ACCESS_TOKEN_URL = "https://github.com/login/oauth/access_token"
    _OAUTH_SETTINGS_KEY = 'github_oauth'

    @gen.coroutine
    def get_authenticated_user(self, redirect_uri, code, callback=None):
        body = urllib_parse.urlencode({
            "redirect_uri": redirect_uri,
            "code": code,
            "client_id": self.settings[self._OAUTH_SETTINGS_KEY]['client_id'],
            "client_secret": self.settings[self._OAUTH_SETTINGS_KEY]['secret'],
            "grant_type": "authorization_code",
        })
        http = tornado.httpclient.AsyncHTTPClient()

        response = yield http.fetch(self._OAUTH_ACCESS_TOKEN_URL,
                   method="POST", headers={'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json'}, body=body)

        if response.error:
            tornado.AuthError('Github auth error: %s' % str(response))

        user_token = escape.json_decode(response.body)
        url = "https://api.github.com/user?access_token="+user_token.get("access_token")
        response = yield http.fetch(url,
                   method="GET", headers={"User-Agent": "Cosmos", 'Accept': 'application/json'})

        result = escape.json_decode(response.body)
        result.update(user_token)

        raise gen.Return(result)

    @tornado.web.asynchronous
    @gen.coroutine
    def get(self):

        #callback_uri = self.request.uri
        #url = urlparse.urljoin(self.request.full_url(), callback_uri)
        #urls = url.replace('http','https')
        urls = self.settings[self._OAUTH_SETTINGS_KEY]['redirect_uri']

        if self.get_argument('code', False):
            # The url must match what was used during authorize phase.
            # It can not accept if there is any mismatch including parameters
            user = yield self.get_authenticated_user(redirect_uri=urls, code=self.get_argument('code'))

            if not user:
                raise tornado.web.HTTPError(500, "Github auth failed")

            user["identity_type"] = IDENTITY_TYPE_GITHUB_OAUTH2
            yield self._authenticate_user(user)
            self.redirect(self.get_argument("next", '/'))
        else:
            yield self.authorize_redirect(
                redirect_uri=urls,
                client_id=self.settings[self._OAUTH_SETTINGS_KEY]['client_id'],
                client_secret=self.settings[self._OAUTH_SETTINGS_KEY]['secret'],
                scope=['user'],
                response_type='code',
                extra_params={'approval_prompt': 'auto'})

def add_params(url, params):
    url_parts = list(urlparse.urlparse(url))
    query = dict(urlparse.parse_qsl(url_parts[4]))
    query.update(params)
    url_parts[4] = urllib_parse.urlencode(query)
    return urlparse.urlunparse(url_parts)

class FacebookGraphLoginHandler(CosmosAuthHandler, tornado.auth.FacebookGraphMixin):
    @tornado.web.asynchronous
    @gen.coroutine
    def get(self):
        default_next = self.settings.get("default_login_next_uri", "/")
        nextpg = self.get_argument("next", default_next)
        params = {'next': nextpg}
        callback_uri = self.request.uri

        redirect_uri_base = self.settings.get("facebook_redirect_uri", self.request.full_url())

        url = urlparse.urljoin(redirect_uri_base, callback_uri)

        redirect_uri = add_params(url, params)

        redirect_uris = redirect_uri #redirect_uri.replace('http','https')

        if self.get_argument("code", False):
            user = yield self.get_authenticated_user(
                                        redirect_uri= redirect_uris,
                                        client_id=self.settings["facebook_api_key"],
                                        client_secret=self.settings["facebook_secret"],
                                        code=self.get_argument("code")
                                        )
            if user:
                user['identity_type'] = IDENTITY_TYPE_FB_GRAPH
                yield self._authenticate_user(user)
                self.redirect(self.get_argument("next", '/'))
            else:
                raise tornado.web.HTTPError(500, "Facebook auth failed")


        else:
            yield self.authorize_redirect(redirect_uri=redirect_uri,
                              client_id= self.settings["facebook_api_key"],
                              extra_params={"scope": self.settings["facebook_scope"]})


class BasicLoginHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        next = self.get_argument("next", '/')
        self._show_login_window(next)

    def _show_login_window(self, next="/", message=None, login_template=None):
        if not login_template:
            login_template = self.settings.get('login_template', LOGIN_PAGE_TEMPLATE)
        t = Template(login_template)
        html = t.generate(next=next, message=message)
        self.write(html)
        self.finish()

    @gen.coroutine
    def post(self):
        username = self.get_argument("username", default=None)
        password = self.get_argument("password", default=None)
        redirect_url = self.get_argument("next", default='/')

        if not username or len(username) < 1:
            body = self.request.body
            if type(body) is bytes:
                body = body.decode('utf-8')
            if body and len(body) > 32:
                data = json.loads(body)
                assert isinstance(data, dict)
                username = data.get("username")
                password = data.get("password")
                redirect_url = data.get("next", "/")

        if not username or not password:
            self._show_login_window(message="Missing required values.")
            return

        object_name = COSMOS_USERS_OBJECT_NAME
        obj_serv = self.settings['object_service']

        columns = ["username", "password", "roles"]
        query = {"username": username}

        cursor = obj_serv.find(SYSTEM_USER, object_name, query, columns)

        found = yield cursor.fetch_next
        if not found:
            raise tornado.web.HTTPError(401, "Unauthorized")
        else:
            user = cursor.next_object()
            loaded_password_hash = user.get(PASSWORD_COLUMN_NAME)
            hmac_key= self.settings["hmac_key"]
            validate_password(password, loaded_password_hash, hmac_key)
            del user["password"]
            self.set_current_user(user)
            self.redirect(redirect_url)


def validate_password(password, saved_password_hash, hmac_key):
    hmac_password = get_hmac_password(password, hmac_key)
    try:
        # FIXME: WARNING possible attack: use hmac.compare_digest(a, b) instead (https://docs.python.org/2/library/hmac.html)
        if not saved_password_hash == hmac_password:
            raise tornado.web.HTTPError(401, "Unauthorized")
    except:
        raise tornado.web.HTTPError(401, "Unauthorized")

def get_hmac_password(password, hmac_key):
    hmac_hex = hmac.new(hmac_key.encode("utf-8"), password.encode("utf-8")).hexdigest()
    hmac_password = "{0}{1}".format(PASSWORD_HMAC_SIGNATURE, hmac_hex)
    return hmac_password

@gen.coroutine
def before_user_insert(object_service, object_name, data, access_type):
    assert isinstance(hmac_key, object)
    assert object_name == COSMOS_USERS_OBJECT_NAME
    assert isinstance(data, dict)
    assert access_type == AccessType.INSERT or access_type == AccessType.UPDATE

    username = data.get(USERNAME_COLUMN_NAME)

    if username and len(username) > 0:
        username = username.lower()

        data["username"] = username
        query = {"username": username}
        columns=["username"]
        cursor = object_service.find(SYSTEM_USER, COSMOS_USERS_OBJECT_NAME, query, columns)

        if(yield cursor.fetch_next):
            user = cursor.next_object()
            if user:
                raise tornado.web.HTTPError(409, "Conflict: Duplicate username")

    password = data.get(PASSWORD_COLUMN_NAME)

    if not password:
        return

    if password.find(PASSWORD_HMAC_SIGNATURE) > 0:
        return

    hmac_password = get_hmac_password(password, hmac_key)

    data[PASSWORD_COLUMN_NAME] = hmac_password