rycus86/dockerhub-proxy

View on GitHub
src/api.py

Summary

Maintainability
C
1 day
Test Coverage
import re
import logging
from functools import wraps
from threading import Lock
from agithub.base import API, ConnectionProperties, Client

# Set the root logging format as soon as this module is imported: timestamp,
# level, originating module and function, then the message.
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(module)s.%(funcName)s - %(message)s')
# Module-wide logger, named 'dockerhub-proxy'.
logger = logging.getLogger('dockerhub-proxy')


def _with_login(method):
    """Decorate an API method so it logs in first and retries once on failure.

    The decorated method must belong to an object exposing ``client`` (with a
    ``has_auth_token()`` method) and ``login()``.

    * Before the call, ``login()`` is invoked when the client has no token.
    * If the method returns a falsy value, this is treated as a possible
      authentication problem: ``login()`` is invoked again and the method is
      called a second (and last) time.

    :param method: the method to wrap; called as ``method(self, *args, **kwargs)``
    :return: the wrapping function, returning whatever the last call returned
    """
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        if not self.client.has_auth_token():
            self.login()

        response = method(self, *args, **kwargs)

        if not response:
            # ``Logger.warn`` is a deprecated alias that recent Python
            # releases no longer provide; ``warning`` is the supported name.
            logger.warning('Failed to fetch response, possible authentication issue, retrying after login...')

            self.login()
            response = method(self, *args, **kwargs)

        return response

    return wrapper


class LoginFailedException(Exception):
    """Raised when a login request does not succeed.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers can deal with it; ``BaseException`` is
    reserved for interpreter-level exits such as ``KeyboardInterrupt``.

    All positional arguments are kept unchanged in ``args``.
    """


class DockerHubClient(Client):
    """``Client`` subclass that manages a JWT auth token for each request.

    For every request it makes sure the URL path ends with a slash, sends the
    stored token (if any) in a ``JWT`` ``Authorization`` header, and forgets
    the token whenever the response status is not in the 2xx range.
    """

    def __init__(self, auth_token):
        """
        :param auth_token: initial token to use, or ``None`` when not logged in yet
        """
        super(DockerHubClient, self).__init__()
        self._auth_token = auth_token

    def has_auth_token(self):
        """Return ``True`` when a token is currently stored (i.e. it is not ``None``)."""
        return self._auth_token is not None

    def set_auth_token(self, token):
        """Store ``token`` for use on subsequent requests."""
        self._auth_token = token

    def request(self, method, url, body_data, headers):
        """Send the request through the parent class with URL and headers adjusted.

        :return: the ``(status, response)`` pair produced by the parent ``request``
        """
        final_url = self._add_trailing_slash(url)
        final_headers = self._add_authorization_header(headers)

        status, response = super(DockerHubClient, self).request(method, final_url, body_data, final_headers)

        self._drop_auth_token_if_not_successful(status)

        return status, response

    @staticmethod
    def _add_trailing_slash(url):
        """Return ``url`` with a ``/`` at the end of its path, before any query string.

        Only the part before the first ``?`` is inspected, so a path that
        already ends with ``/`` is left alone and the query string itself is
        never modified.  An empty path is returned unchanged.
        """
        path, separator, query = url.partition('?')

        if path and not path.endswith('/'):
            path += '/'

        return path + separator + query

    def _add_authorization_header(self, headers):
        """Return a copy of ``headers`` carrying the token, or ``headers`` itself when there is none."""
        if self._auth_token:
            # copy so the caller's dictionary is never mutated
            new_headers = headers.copy()
            new_headers['Authorization'] = 'JWT %s' % self._auth_token
            return new_headers

        else:
            return headers

    def _drop_auth_token_if_not_successful(self, status):
        """Forget the stored token unless ``status`` is a 2xx code."""
        # Floor division: with ``/`` Python 3 yields e.g. 2.01 for 201, which
        # would wrongly count every 2xx status other than 200 as a failure.
        if status // 100 != 2:
            self._auth_token = None


class DockerHub(API):
    """API wrapper exposing read-only lookups under the ``v2`` endpoints.

    Requests go to ``hub.docker.com`` over HTTPS unless another ``api_url``
    is supplied.  Every public ``get_*`` method is wrapped by ``_with_login``
    and returns ``None`` when the final response status is not 200.
    """

    # noinspection PyMissingConstructor
    def __init__(self, username=None, password=None, auth_token=None, **kwargs):
        """
        :param username: account name used by ``login()``; optional
        :param password: account password used by ``login()``; optional
        :param auth_token: pre-existing token handed to the client; optional
        :param kwargs: only ``api_url`` is read (default ``'hub.docker.com'``);
                       any other keyword is ignored
        """
        self._username = username
        self._password = password

        # serializes login() calls coming from different threads
        self._login_lock = Lock()

        props = ConnectionProperties(
            api_url=kwargs.pop('api_url', 'hub.docker.com'),
            secure_http=True
        )

        self.setClient(DockerHubClient(auth_token))
        self.setConnectionProperties(props)

    def login(self):
        """Request a token with the configured credentials and store it on the client.

        Does nothing when either the username or the password is not set.

        :raises LoginFailedException: when the login response status is not 200
        """
        with self._login_lock:
            if self._username and self._password:
                status, response = self.v2.users.login.post(body={
                    'username': self._username, 'password': self._password
                })

                if status == 200:
                    self.client.set_auth_token(response.get('token'))

                else:
                    raise LoginFailedException('HTTP %s' % status, response)

    @_with_login
    def get_user_details(self, username):
        """Return the response of ``v2/users/<username>``."""
        return self._check_and_return(self.v2.users[username].get)

    @_with_login
    def get_repositories(self, username, page_size=100, limit=10000):
        """Return the paged ``v2/repositories/<username>`` listing (see ``_fetch_all_pages``)."""
        return self._fetch_all_pages(self.v2.repositories[username].get,
                                     page=1, page_size=page_size, limit=limit)

    @_with_login
    def get_repository(self, username, repository_name):
        """Return the response of ``v2/repositories/<username>/<repository_name>``."""
        return self._check_and_return(self.v2.repositories[username][repository_name].get)

    @_with_login
    def get_tags(self, username, repository_name, page_size=100, limit=1000):
        """Return the paged ``tags`` listing of a repository (see ``_fetch_all_pages``)."""
        return self._fetch_all_pages(self.v2.repositories[username][repository_name].tags.get,
                                     page=1, page_size=page_size, limit=limit)

    @_with_login
    def get_dockerfile(self, username, repository_name):
        """Return the response of the repository's ``dockerfile`` endpoint."""
        return self._check_and_return(self.v2.repositories[username][repository_name].dockerfile.get)

    @_with_login
    def get_autobuild_settings(self, username, repository_name):
        """Return the response of the repository's ``autobuild`` endpoint."""
        return self._check_and_return(self.v2.repositories[username][repository_name].autobuild.get)

    @_with_login
    def get_comments(self, username, repository_name, page_size=100, limit=1000):
        """Return the paged ``comments`` listing of a repository (see ``_fetch_all_pages``)."""
        return self._fetch_all_pages(self.v2.repositories[username][repository_name].comments.get,
                                     page=1, page_size=page_size, limit=limit)

    @_with_login
    def get_build_history(self, username, repository_name, page_size=100, limit=1000):
        """Return the paged ``buildhistory`` listing of a repository (see ``_fetch_all_pages``)."""
        return self._fetch_all_pages(self.v2.repositories[username][repository_name].buildhistory.get,
                                     page=1, page_size=page_size, limit=limit)

    @_with_login
    def get_build_details(self, username, repository_name, build_code):
        """Return the response of ``buildhistory/<build_code>`` for a repository."""
        return self._check_and_return(self.v2.repositories[username][repository_name].buildhistory[build_code].get)

    @staticmethod
    def _check_and_return(incomplete_request):
        """Execute ``incomplete_request()`` and return its response on HTTP 200.

        :param incomplete_request: zero-argument callable returning ``(status, response)``
        :return: the response, or ``None`` for any other status
        """
        status, response = incomplete_request()

        if status == 200:
            return response

        return None

    @staticmethod
    def _fetch_all_pages(incomplete_request, page, page_size, limit):
        """Collect the ``results`` of consecutive pages into one list.

        Pages are requested while the response carries a truthy ``next`` and
        fewer than ``limit`` items have been gathered.  The limit is only
        checked between pages and nothing is truncated, so the returned list
        may hold more than ``limit`` items.  A falsy ``limit`` disables the check.

        :param incomplete_request: callable accepting ``page_size`` (and ``page``)
                                   keyword arguments, returning ``(status, response)``
        :param page: number of the first page; only sent when greater than 1
        :param page_size: value passed as ``page_size`` on every request
        :param limit: stop requesting further pages once this many items are collected
        :return: ``{'count': <count of the last response>, 'results': [...]}``,
                 or ``None`` as soon as any page answers with a status other than 200
        """
        results = list()

        while True:
            kwargs = {'page_size': page_size}

            if page > 1:
                kwargs['page'] = page

            status, response = incomplete_request(**kwargs)

            if status != 200:
                return None

            # a missing or null 'results' entry counts as an empty page
            # instead of failing inside extend()
            results.extend(response.get('results') or [])

            has_more = bool(response.get('next'))
            below_limit = not limit or len(results) < limit

            if has_more and below_limit:
                page += 1
                continue

            return {
                'count': response.get('count'),
                'results': results
            }