scitran/core

View on GitHub
api/auth/apikeys.py

Summary

Maintainability
A
0 mins
Test Coverage
import bson
import datetime

from .. import config, util
from ..web.errors import APIAuthProviderException

log = config.log

class APIKey(object):
    """
    Abstract API key class
    """

    @staticmethod
    def _preprocess_key(key):
        """
        Convention for API keys is that they can have arbitrary information, separated by a :,
        before the actual key. Generally, this will have a connection string in it.
        Strip this preamble, if any, before processing the key.
        """

        return key.split(":")[-1] # Get the last segment of the string after any : separators

    @staticmethod
    def validate(key):
        """
        AuthN for user accounts via api key.

        401s via APIAuthProviderException on failure.
        """
        key = APIKey._preprocess_key(key)

        timestamp = datetime.datetime.utcnow()
        api_key = config.db.apikeys.find_one_and_update({'_id': key}, {'$set': {'last_used': timestamp}})

        if api_key:

            # Some api keys may have additional requirements that must be met
            try:
                APIKeyTypes[api_key['type']].check(api_key)
            except KeyError:
                log.warning('Unknown API key type ({})'.format(api_key.get('type')))
                APIAuthProviderException('Invalid API key')

            return api_key

        else:
            raise APIAuthProviderException('Invalid API key')

    @staticmethod
    def generate_api_key(key_type):
        return {
            '_id': util.create_nonce(),
            'created': datetime.datetime.utcnow(),
            'type': key_type,
            'last_used': None
        }


class UserApiKey(APIKey):

    key_type = 'user'

    @classmethod
    def generate(cls, uid):
        """
        Generates API key for user, replaces existing API key if exists
        """
        api_key = cls.generate_api_key(cls.key_type)
        api_key['uid'] = uid
        config.db.apikeys.delete_many({'uid': uid, 'type': cls.key_type})
        config.db.apikeys.insert_one(api_key)
        return api_key['_id']

    @classmethod
    def get(cls, uid):
        return config.db.apikeys.find_one({'uid': uid, 'type': cls.key_type})

    @classmethod
    def check(cls, api_key):
        pass

class JobApiKey(APIKey):
    """
    API key that grants API access as a specified user during execution of a job
    Job must be in 'running' state to user API key
    """

    key_type = 'job'

    @classmethod
    def generate(cls, uid, job_id):
        """
        Returns an API key for user for use by a specific job.
        Re-uses such a key if it already exists.
        """

        job_id = str(job_id)

        existing_key = config.db.apikeys.find_one({
            'uid': uid,
            'job': job_id,
        })

        if existing_key is not None:
            return existing_key['_id']

        else:
            api_key = cls.generate_api_key(cls.key_type)
            api_key['uid'] = uid
            api_key['job'] = job_id

            config.db.apikeys.insert_one(api_key)
            return api_key['_id']

    @classmethod
    def remove(cls, job_id):
        config.db.apikeys.delete_many({'type': cls.key_type, 'job': str(job_id)})

    @classmethod
    def check(cls, api_key):
        job_id = api_key['job']
        if config.db.jobs.count({'_id': bson.ObjectId(job_id), 'state': 'running'}) != 1:
            raise APIAuthProviderException('Use of API key requires job to be in progress')


APIKeyTypes = {
    'user'        : UserApiKey,
    'job'          : JobApiKey
}