oesah/bexio-api-python-client

View on GitHub
bexiopy/api.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
from __future__ import absolute_import

try:
    # For Python 3.0 and later
    from urllib.parse import urlencode
except ImportError:
    # Fall back to Python 2's urllib2
    from urllib import urlencode

import datetime
import logging
import os
import requests
import time
import uuid
import json

from django.core.files.storage import default_storage

from .settings import get_setting
from .resources import contacts, general, invoices


# Package-wide logger; everything this module reports goes through it.
logger = logging.getLogger('Bexiopy')

# Every record is rendered on its own paragraph (blank line before and
# after) so API chatter stays readable in a busy console.
formatter = logging.Formatter(
    '\n%(asctime)s - %(levelname)s - %(name)s - %(message)s\n'
)

# Console handler carrying the formatter above.
ch = logging.StreamHandler()
ch.setFormatter(formatter)

# Neither the logger nor its handler filters anything below DEBUG.
ch.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)

# Attach the handler last, once it is fully configured.
logger.addHandler(ch)


class OAuth2(object):
    """
    Build and send the token requests used to authenticate with Bexio.

    The instance keeps its settings in ``self.config`` (a plain dict) and
    an optional, explicitly chosen ``grant_type``. As long as no grant type
    was chosen explicitly, :func:`get_grant_type` infers one from whatever
    credentials are currently stored in the config.

    To see how to instantiate this class properly, take a look at
    :func:`~bexiopy.api.Client.get_oauth2_service`.

    If you want to create an :code:`access_token` with a :code:`code`,
    you can take a look at
    :func:`~bexiopy.api.Client.fetch_access_token_with_auth_code`.
    """

    # Keyword arguments accepted by ``__init__``; each one becomes an entry
    # in ``self.config`` and defaults to an empty string.
    _CONFIG_KEYS = (
        'client_id',
        'client_secret',
        'authorization_uri',
        'refresh_token_credential_uri',
        'token_credential_uri',
        'redirect_uri',
        'issuer',
        'code',
        'refresh_token',
        'username',
        'password',
    )

    def __init__(self, *args, **kwargs):
        """
        Store the given keyword arguments in ``self.config``.

        Args:
            *args: ignored
            **kwargs: any of the keys listed in ``_CONFIG_KEYS``; missing
                keys default to ``''``
        """
        # Deliberately left empty: an inferred grant type must not be
        # frozen here, otherwise a later ``set_code()`` or
        # ``set_refresh_token()`` could never influence it.
        self.grant_type = ''
        self.config = dict(
            (key, kwargs.get(key, '')) for key in self._CONFIG_KEYS
        )

    def set_refresh_token(self, refresh_token):
        """
        Set :code:`refresh_token` in config and return :code:`None`.

        Args:
            refresh_token (str): the refresh_token from our token

        Returns:
            None: nothing to return
        """
        self.config['refresh_token'] = refresh_token

    def get_refresh_token(self):
        """
        Get and return :code:`refresh_token` from config.

        Returns:
            str: :code:`refresh_token`
        """
        return self.config['refresh_token']

    def get_client_id(self):
        """
        Get and return :code:`client_id` from config.

        Returns:
            str: :code:`client_id`
        """
        return self.config['client_id']

    def set_client_id(self, client_id):
        """
        Set :code:`client_id` in config and return :code:`None`.

        Args:
            client_id (str): the client_id of the application

        Returns:
            None: nothing to return
        """
        self.config['client_id'] = client_id

    def get_client_secret(self):
        """
        Get and return :code:`client_secret` from config.

        Returns:
            str: :code:`client_secret`
        """
        return self.config['client_secret']

    def set_client_secret(self, client_secret):
        """
        Set :code:`client_secret` in config and return :code:`None`.

        Args:
            client_secret (str): the client_secret of the application

        Returns:
            None: nothing to return
        """
        self.config['client_secret'] = client_secret

    def get_code(self):
        """
        Get and return :code:`code` from config.

        Returns:
            str: :code:`code`
        """
        return self.config['code']

    def set_code(self, code):
        """
        Set :code:`code` in config and return :code:`None`.

        Args:
            code (str): the authorization code to exchange for a token

        Returns:
            None: nothing to return
        """
        self.config['code'] = code

    def get_redirect_uri(self):
        """
        Get and return :code:`redirect_uri` from config.

        Returns:
            str: :code:`redirect_uri`
        """
        return self.config['redirect_uri']

    def set_redirect_uri(self, redirect_uri):
        """
        Set :code:`redirect_uri` in config and return :code:`None`.

        Args:
            redirect_uri (str): the redirect_uri sent with the token request

        Returns:
            None: nothing to return
        """
        self.config['redirect_uri'] = redirect_uri

    def get_grant_type(self):
        """
        Return the grant type to use for the next token request.

        A grant type set through :func:`set_grant_type` always wins.
        Otherwise it is inferred from the config, in this order:

        * a :code:`code` is present -> ``'authorization_code'``
        * a :code:`refresh_token` is present -> ``'refresh_token'``
        * :code:`username` and :code:`password` are both present ->
          ``'password'``
        * nothing usable is present -> ``''``

        If we want to refresh the token while a :code:`code` is still
        stored, we need to set :code:`grant_type` explicitly:

        .. code-block:: python

            auth = Client().get_oauth2_service()

            # refresh token
            auth.set_grant_type('refresh_token')  # important!
            auth.set_refresh_token(refresh_token)

            credentials = auth.generate_credentials_request()

        Returns:
            str: :code:`grant_type` or empty string
        """
        if self.grant_type:
            return self.grant_type

        # Infer from the credentials we actually hold. (These checks used
        # to be negated, which picked a grant type for credentials that
        # were *missing*.)
        if self.config['code']:
            return 'authorization_code'

        if self.config['refresh_token']:
            return 'refresh_token'

        if self.config['username'] and self.config['password']:
            return 'password'

        return ''

    def set_grant_type(self, grant_type):
        """
        Explicitly choose the :code:`grant_type` and return :code:`None`.

        Passing an empty string switches back to inference (see
        :func:`get_grant_type`).

        Args:
            grant_type (str): the grant type to use for token requests

        Returns:
            None: nothing to return
        """
        self.grant_type = grant_type

    def generate_credentials_request(self):
        """
        POST a token request matching the current grant type and return
        the decoded JSON response.

        * ``'authorization_code'``: posts :code:`code`,
          :code:`redirect_uri` and the client credentials to
          :code:`token_credential_uri`.
        * ``'refresh_token'``: posts :code:`refresh_token` and the client
          credentials to :code:`refresh_token_credential_uri`.
        * anything else: posts only :code:`grant_type` to
          :code:`token_credential_uri`.

        Returns:
            dict: whatever ``response.json()`` yields for the token request
        """
        uri = self.get_token_credential_uri()
        grant_type = self.get_grant_type()
        params = {
            'grant_type': grant_type
        }

        if grant_type == 'authorization_code':
            params['code'] = self.get_code()
            params['redirect_uri'] = self.get_redirect_uri()
            self.add_client_credentials(params)

        elif grant_type == 'refresh_token':
            # refreshing uses its own endpoint
            uri = self.get_refresh_token_credential_uri()
            params['refresh_token'] = self.get_refresh_token()
            self.add_client_credentials(params)

        headers = {
            'Cache-Control': 'no-store',
            'Content-Type': 'application/x-www-form-urlencoded'
        }

        response = requests.post(uri, data=params, headers=headers)
        return response.json()

    def get_token_credential_uri(self):
        """
        Get and return :code:`token_credential_uri` from config.

        Returns:
            str: :code:`token_credential_uri`
        """
        return self.config['token_credential_uri']

    def get_refresh_token_credential_uri(self):
        """
        Get and return :code:`refresh_token_credential_uri` from config.

        Returns:
            str: :code:`refresh_token_credential_uri`
        """
        return self.config['refresh_token_credential_uri']

    def add_client_credentials(self, params):
        """
        Add :code:`client_id` and :code:`client_secret` to ``params``.

        The dictionary is modified in place and also returned.

        Args:
            params (dict): dictionary of parameters that is used for requests

        Returns:
            dict: the same, modified :code:`params` object
        """
        params['client_id'] = self.get_client_id()
        params['client_secret'] = self.get_client_secret()

        return params


class Client(object):
    """
    The client that is used to communicate with Bexio over the API. Once
    authentication has happened, the client will take over most of the
    work.

    Use the client to perform actions like refreshing the token, making
    a call to the API, get the authentication URL that is needed to create
    an access token, etc.

    The token data is kept in ``self.access_token`` (a dict) and mirrored
    to the storage file named by the ``BEXIO_CREDENTIALS_FILENAME`` setting.

    Usage:

    .. code-block:: python

        >>> from bexiopy.api import Client

        >>> client = Client()
        >>> client.get_oauth2_auth_url()
        Out: https://office.bexio.com/oauth/authorize?client_secret=....

        >>> client.refresh_token()
        Out: {'access_token': '...', ...}

        >>> client.call('GET', 'salutation')
        Out: [{'id': 1, 'name': 'Herr'}, {'id': 2, 'name': 'Frau'}, ...]

    """

    # ``created`` is written to the token file via ``str(datetime)``, which
    # omits the fractional part when microseconds happen to be zero, so
    # both spellings must be accepted when reading it back.
    _CREATED_FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")

    def __init__(self, *args, **kwargs):
        """
        Read the Bexio settings and load a previously stored token.

        Args:
            *args: ignored
            **kwargs: ignored
        """
        # construct auth data
        self.API_URL = get_setting('BEXIO_API_URL')
        self.OAUTH2_AUTH_URL = get_setting('BEXIO_AUTH_URL')
        self.OAUTH2_TOKEN_URI = get_setting('BEXIO_TOKEN_URL')
        self.OAUTH2_REFRESH_TOKEN_URI = get_setting('BEXIO_TOKEN_REFRESH_URL')

        # construct client data
        self.auth = None
        self.config = {
            'client_id': get_setting('BEXIO_CLIENT_ID'),
            'client_secret': get_setting('BEXIO_CLIENT_SECRET'),
            'redirect_uri': get_setting('BEXIO_APPLICATION_REDIRECTION_URL'),
            'scope': get_setting('BEXIO_APPLICATION_SCOPES'),
            'state': uuid.uuid4()
        }
        # get access_token data from file, if exists
        self.load_access_token_from_file()

    def set_client_id(self, client_id):
        """
        Set :code:`client_id` in config and return :code:`None`.

        Args:
            client_id (str): the client_id of the application

        Returns:
            None: nothing to return
        """
        self.config['client_id'] = client_id

    def get_client_id(self):
        """
        Get and return :code:`client_id` from config.

        Returns:
            str: :code:`client_id`
        """
        return self.config['client_id']

    def set_client_secret(self, client_secret):
        """
        Set :code:`client_secret` in config and return :code:`None`.

        Args:
            client_secret (str): the client_secret of the application

        Returns:
            None: nothing to return
        """
        self.config['client_secret'] = client_secret

    def get_client_secret(self):
        """
        Get and return :code:`client_secret` from config.

        Returns:
            str: :code:`client_secret`
        """
        return self.config['client_secret']

    def set_redirect_uri(self, redirect_uri):
        """
        Set :code:`redirect_uri` in config and return :code:`None`.

        Args:
            redirect_uri (str): the redirect_uri of the application

        Returns:
            None: nothing to return
        """
        self.config['redirect_uri'] = redirect_uri

    def get_redirect_uri(self):
        """
        Get and return :code:`redirect_uri` from config.

        Returns:
            str: :code:`redirect_uri`
        """
        return self.config['redirect_uri']

    def get_org(self):
        """
        Get and return :code:`org` from the current access token data.

        Returns:
            str: :code:`org`

        Raises:
            KeyError: if the token data has no :code:`org` entry
        """
        return self.access_token['org']

    def file_put_contents(self, access_token):
        """
        Serialise the token data as JSON into the credentials file
        (``BEXIO_CREDENTIALS_FILENAME``) of the default storage.

        Values JSON cannot encode natively (e.g. the ``created``
        datetime) are written as ``str(value)``.

        Args:
            access_token (dict): access token dict we receive from bexio

        Returns:
            None: nothing to return
        """
        filename = get_setting('BEXIO_CREDENTIALS_FILENAME')

        # the context manager closes the file even if serialising fails
        with default_storage.open(filename, 'w') as token_file:
            json.dump(access_token, token_file,
                      indent=4, sort_keys=True, default=str)

    def set_access_token(self, access_token):
        """
        Validate the token data, keep it on the instance and persist it
        with :func:`file_put_contents`.

        An empty dict is accepted and clears the stored token.

        Args:
            access_token (dict): token data as returned by bexio

        Returns:
            None: nothing to return

        Raises:
            ValueError: if :code:`access_token` is not a dict, or is a
                non-empty dict without an :code:`access_token` value
        """
        if not isinstance(access_token, dict):
            raise ValueError('Invalid token format: Need dict instead of '
                             '%s' % type(access_token))

        # ``.get`` so that a missing key is reported as the documented
        # ValueError rather than a KeyError
        if access_token and (not access_token.get('access_token')):
            raise ValueError('Invalid token: Missing "access_token" in '
                             '"self.access_token".')

        self.access_token = access_token
        self.file_put_contents(access_token)

    def get_access_token(self):
        """
        Get and return the :code:`access_token` string from the token data.

        Returns:
            str: :code:`access_token`

        Raises:
            KeyError: if no token has been loaded or set yet
        """
        return self.access_token['access_token']

    def _parse_created(self, created):
        """
        Turn the stored ``created`` value into a ``datetime``.

        Args:
            created: a ``datetime``, a string in one of
                ``_CREATED_FORMATS``, or a falsy value

        Returns:
            datetime.datetime: the parsed value, or :code:`None` if it is
            missing or cannot be parsed
        """
        # freshly set tokens still carry the datetime object itself
        if isinstance(created, datetime.datetime):
            return created

        if not created:
            return None

        for fmt in self._CREATED_FORMATS:
            try:
                return datetime.datetime.strptime(created, fmt)
            except (TypeError, ValueError):
                continue
        return None

    def is_access_token_expired(self):
        """
        Return :code:`True`, if :code:`access_token` expired else
        :code:`False` depending on the :code:`created` and
        :code:`expires_in` values of the token data.

        A token without a usable :code:`created` value is reported as
        expired, because its age cannot be determined.

        Returns:
            bool: True or False depending on date
        """
        token = self.access_token
        if not token:
            return True

        created = self._parse_created(token.get('created'))
        if created is None:
            return True

        expires_in = token.get('expires_in') or 0

        # treat the token as expired 30 seconds early so that a request
        # started right before the deadline does not fail
        deadline = created + datetime.timedelta(seconds=expires_in - 30)
        expired = deadline < datetime.datetime.now()
        if expired:
            logger.warning("\n\nTOKEN EXPIRED!\n\n")
        return expired

    def get_refresh_token(self):
        """
        Return the refresh_token that is part of the current token data
        (loaded from our local file or set during authentication).

        Returns:
            str: :code:`refresh_token`, or :code:`None` if there is none
        """
        return (self.access_token or {}).get('refresh_token') or None

    def fetch_access_token_with_auth_code(self, code):
        """
        Get a code and create an `access_token` from that code.

        On success the token data is stamped with a ``created`` time and
        stored via :func:`set_access_token`.

        Args:
            code (str): the code that we receive in our URL once authenticated

        Returns:
            dict: the response of the token request (token data on
            success, otherwise whatever bexio answered)

        Raises:
            ValueError: if :code:`code` is empty
        """
        if not code:
            raise ValueError("Invalid code")

        auth = self.get_oauth2_service()
        # The service object is shared; an earlier ``refresh_token()``
        # call leaves it on the refresh grant, so choose explicitly.
        auth.set_grant_type('authorization_code')
        auth.set_code(code)
        auth.set_redirect_uri(self.get_redirect_uri())

        credentials = auth.generate_credentials_request()
        # error responses carry no ``access_token`` key at all
        if credentials and credentials.get('access_token'):
            credentials['created'] = datetime.datetime.now()
            self.set_access_token(credentials)

        return credentials

    def get_oauth2_service(self):
        """
        Return the OAuth2 service used for authentication, creating it on
        first use and reusing the same instance afterwards.

        Returns:
            class: :class:`~bexiopy.api.OAuth2` instance
        """
        if not self.auth:
            self.auth = OAuth2(**{
                'client_id': self.get_client_id(),
                'client_secret': self.get_client_secret(),
                'authorization_uri': self.OAUTH2_AUTH_URL,
                'token_credential_uri': self.OAUTH2_TOKEN_URI,
                'refresh_token_credential_uri': self.OAUTH2_REFRESH_TOKEN_URI,
                'refresh_token': self.get_refresh_token(),
                'redirect_uri': self.get_redirect_uri(),
                'issuer': self.config['client_id']
            })
        return self.auth

    def get_oauth2_auth_url(self):
        """
        Return the URL for authentication with Bexio.

        Returns:
            str: URL for verifying the account
        """
        # NOTE(review): the client secret ends up in a URL that is logged
        # and opened in a browser; confirm that bexio really requires it
        # here before removing it.
        params = {
            'client_id': self.config['client_id'],
            'client_secret': self.config['client_secret'],
            'redirect_uri': self.config['redirect_uri'],
            'scope': self.config['scope'],
            'state': self.config['state']
        }
        redirect_to = self.OAUTH2_AUTH_URL + '?' + urlencode(params)
        return redirect_to

    def get_request_headers(self):
        """
        Return the headers needed to make a regular request, given we
        are already authenticated.

        Returns:
            dict: map of request headers
        """
        return {
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + self.get_access_token()
        }

    def load_access_token_from_file(self):
        """
        Load the token data from the credentials file into
        ``self.access_token``.

        If the file does not exist, a warning containing the
        authorization URL is logged and the token data is set to an
        empty dict.

        Returns:
            dict: :code:`access_token` loaded from file (may be empty)
        """
        token_file = get_setting('BEXIO_CREDENTIALS_FILENAME')
        if default_storage.exists(token_file):
            # the ``with`` block already closes the file
            with default_storage.open(token_file, 'r') as access_token:
                self.access_token = json.load(access_token)
        else:
            # let the user know they need to authenticate with bexio
            logger.warning(
                "You need to authorize this app with your "
                "bexio instance. Open the following link and "
                "you will be redirected back.\n\n%s" %
                self.get_oauth2_auth_url())
            self.access_token = {}
        return self.access_token

    def refresh_token(self, refresh_token=''):
        """
        Get an (optional) `refresh_token` and create a new 'access_token'
        from that.

        If no `refresh_token` is passed, the one from the current token
        data is used. The grant type of the OAuth2 service is switched to
        **refresh_token** for the request.

        Args:
            refresh_token (str): refresh token to use instead of the
                stored one

        Returns:
            dict: :code:`access_token` data on success, :code:`None` if
            bexio answered with an error (which is logged)

        Raises:
            ValueError: if no :code:`refresh_token` can be found or the
                token could not be processed correctly.
        """
        logger.info("refreshing token...\n")
        if not refresh_token:
            refresh_token = self.get_refresh_token()
            if not refresh_token:
                raise ValueError('Refresh token must be passed or set as part '
                                 'of the access_token')

        auth = self.get_oauth2_service()
        auth.set_grant_type('refresh_token')  # important!
        auth.set_refresh_token(refresh_token)
        credentials = auth.generate_credentials_request()

        # NOTE(review): the request above has already returned at this
        # point, so this pause looks unnecessary; kept to preserve timing
        # until someone confirms bexio does not need it.
        logger.info("Waiting for token from Bexio...")
        time.sleep(2)

        # set token once we received it
        if credentials and ('error' not in credentials):
            credentials['created'] = datetime.datetime.now()
            # keep using the old refresh token if bexio did not send a
            # new one (the key may be absent altogether)
            if not credentials.get('refresh_token'):
                credentials['refresh_token'] = refresh_token
            self.set_access_token(credentials)
            logger.info("\nSuccessfully refreshed token...\n")
            return credentials

        logger.error(
            'Illegal access token received when token was refreshed!')
        return None

    def call(self, method, path, data=None):
        """
        Get `method`, `path` and optionally `data`, make a request
        to the Bexio API and return the response as `json`.

        The token is re-read from the credentials file on every call and
        refreshed first if it has expired.

        Usage:

        .. code-block:: python

            call('POST', 'salutation', {'param1': 'test'})
            call('GET', 'salutation')

        Args:
            method (str): The request method to use ('GET', 'POST', etc.)
            path (str): The endpoint for the API call
            data (dict): Data for 'POST' and other requests; dicts and
                lists are sent JSON-encoded, anything else as given

        Returns:
            dict: The response of the request (:code:`response.json()`),
            or :code:`None` if the app is not authenticated yet
        """
        self.load_access_token_from_file()
        if not self.access_token:
            print(
                'You must authenticate with Bexio. Open the '
                'following URL and authenticate: \n\n %s' %
                self.get_oauth2_auth_url())
            return None

        if self.is_access_token_expired():
            self.refresh_token()

        kwargs = {
            'headers': self.get_request_headers()
        }

        if data:
            if isinstance(data, (dict, list)):
                data = json.dumps(data)
            kwargs['data'] = data

        url = self.API_URL + '/' + self.get_org() + '/' + path
        logger.info('[%s] %s' % (method.upper(), url))
        # dispatch to requests.get / requests.post / ... by method name
        response = getattr(requests, method.lower())(url, **kwargs)
        response_json = response.json()
        if 'error_code' in response_json:
            logger.error(response_json)
        return response_json


class Bexiopy(object):
    """
    Entry point for querying the Bexio API: every resource is reachable
    as an attribute of this class.

    See API Resources section for available queries.

    Usage:

    .. code-block:: python

        from bexiopy.api import Bexiopy

        bexio = Bexiopy()

        # get all contacts
        contacts = bexio.contacts.all()

        # get a specific invoice
        invoice = bexio.invoices.get(2)
    """
    # The resources are class attributes: they are created once, when the
    # class body runs, and shared by every ``Bexiopy`` instance. On the
    # right-hand side each name still refers to the imported resource
    # module, because the class-level name is only bound afterwards.
    contacts = contacts.ContactsResource()
    general = general.GeneralResource()
    invoices = invoices.InvoicesResource()