cogniteev/easy-upgrade

View on GitHub
easy_upgrade/lib/github.py

Summary

Maintainability
A
1 hr
Test Coverage

from datetime import datetime
from fnmatch import fnmatch
import logging
import os
import os.path as osp
import shutil
import stat

import requests

from .. api import (
    Fetcher,
    Release,
    ReleaseProvider,
)
from .. toolbox import (
    download_http_url,
    temp_dir,
)

# strptime() layout of the timestamps handled by parse_date(); the trailing
# %Z directive matches a timezone *name* such as "UTC".
GITHUB_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%Z'
# Base URL prepended to every GitHub REST API path built in this module.
GITHUB_ROOT_API = "https://api.github.com"


def parse_date(date):
    """Parse a timestamp string laid out as ``GITHUB_DATE_FORMAT``.

    A trailing ``Z`` (zulu) suffix is rewritten to ``UTC`` beforehand,
    because the ``%Z`` directive expects a timezone name.

    :param date: timestamp string, e.g. ``2015-03-01T12:34:56Z``
    :return: the ``datetime`` produced by ``datetime.strptime``
    :raises ValueError: when the string does not match the format
    """
    normalized = date[:-1] + 'UTC' if date.endswith('Z') else date
    return datetime.strptime(normalized, GITHUB_DATE_FORMAT)


class GitHubRelease(Release):
    """Release whose versions are read from a GitHub repository.

    ``name`` must look like ``organization/repository``; both halves are
    used to build the GitHub API URL queried by :meth:`get_releases`.
    """

    def __init__(self, provider, name, config):
        """Split ``name`` and read the release filtering options.

        :param provider: provider object; its ``basic_auth`` attribute is
            passed to ``requests`` when querying the API
        :param name: ``organization/repository`` string
        :param config: mapping read for the optional ``name`` glob filter
        :raises ValueError: when ``name`` contains no ``/``
        """
        self.organization, self.repository = name.split('/', 1)
        super(GitHubRelease, self).__init__(provider, name, config)
        # NOTE(review): ``self.get`` is inherited and not visible here;
        # presumably it reads the release configuration -- confirm.
        self.with_prerelease = self.get('with-prerelease', False)
        self.with_draft = self.get('with-draft', False)
        self.name_filter = config.get('name')

    @property
    def pkg_name(self):
        """Repository part of ``organization/repository``."""
        return self.repository

    def get_releases(self):
        """Return the decoded JSON body of the repository's releases.

        A single GET request is issued; no further pages are requested.

        :raises requests.HTTPError: when the API answers with an error
            status (via ``raise_for_status``)
        """
        url = "{root}/repos/{org}/{repo}/releases".format(
            root=GITHUB_ROOT_API,
            org=self.organization,
            repo=self.repository
        )
        response = requests.get(url, auth=self.provider.basic_auth)
        response.raise_for_status()
        return response.json()

    def get_latest_release(self):
        """Return the most recent release passing the configured filters.

        Pre-releases and drafts are skipped unless ``with-prerelease`` /
        ``with-draft`` are enabled, and releases whose name does not match
        the ``name`` glob are ignored.

        :return: the release dict with the greatest date, or ``None`` when
            no release passes the filters
        """
        latest = None
        latest_date = None
        for release in self.get_releases():
            if not self._is_candidate(release):
                continue
            date = self._release_date(release)
            if latest is None:
                # First acceptable release: keep it even without a date.
                latest, latest_date = release, date
            elif date is not None and (
                    latest_date is None or date > latest_date):
                latest, latest_date = release, date
        return latest

    def _is_candidate(self, release):
        """Tell whether ``release`` passes the prerelease/draft/name filters."""
        if not self.with_prerelease and release['prerelease']:
            return False
        if not self.with_draft and release['draft']:
            return False
        if self.name_filter:
            # The API may report a null name; fnmatch() only accepts
            # strings, so match against an empty name instead of crashing.
            return fnmatch(release.get('name') or '', self.name_filter)
        return True

    @staticmethod
    def _release_date(release):
        """Return the date used to order ``release``, or ``None``.

        ``published_at`` is null for draft releases, in which case the
        creation date is used instead.
        """
        timestamp = release.get('published_at') or release.get('created_at')
        return parse_date(timestamp) if timestamp else None


class GitHubProvider(ReleaseProvider):
    """Release provider holding the optional GitHub credentials."""

    def __init__(self, name, top_config, release_cls=GitHubRelease):
        """Initialise the provider and read the ``basic-auth`` option.

        A non-empty ``basic-auth`` value is split on its first ``:`` and
        stored as a tuple in ``self.basic_auth``; an empty or missing value
        is stored unchanged.
        """
        super(GitHubProvider, self).__init__(name, top_config, release_cls)
        credentials = self.get('basic-auth')
        if credentials:
            # Only the first colon separates the two parts, so the second
            # part may itself contain colons.
            credentials = tuple(credentials.split(':', 1))
        self.basic_auth = credentials


class GitHubAsset(Fetcher):
    """Fetcher downloading the assets attached to a GitHub release."""

    providers = 'github'
    name = 'asset'

    def candidate_version(self):
        """Return the name of the latest release, or ``None`` if there is none.

        The release dict is looked up through
        ``self.release.get_latest_release()`` and cached on
        ``self.grelease`` for later use by :meth:`fetch`.
        """
        if not getattr(self, 'grelease', None):
            self.grelease = self.release.get_latest_release()
        if self.grelease is None:
            return None
        return self.grelease['name']

    def fetch(self, output_directory):
        """Download the assets of the latest release.

        When the ``name`` option is set, it is evaluated with
        ``jinja_eval`` and only assets bearing exactly that name are
        downloaded.

        :param output_directory: directory the assets are written under
        :raises Exception: when no release or no matching asset is found
        """
        name = self.get('name')
        if name is not None:
            name = self.jinja_eval(name)
        # Make sure the release has been resolved even when the caller did
        # not invoke candidate_version() beforehand.
        self.candidate_version()
        if self.grelease is None:
            raise Exception("Could not find any matching release")
        assets = self.grelease.get('assets', [])
        if not assets:
            logging.warning("Could not find any asset in release")
        elif name is not None:
            # FIXME implement additional filters...
            # A real list is required: a lazy filter() object would be
            # exhausted by the download loop and then look empty.
            assets = [asset for asset in assets if asset['name'] == name]
        if not assets:
            raise Exception("Didn't find asset matching requirements")
        for asset in assets:
            self.download_asset(asset, output_directory)

    def download_asset(self, asset, output_directory):
        """Download one asset and move it to its final location.

        The destination is ``self['file']`` joined to ``output_directory``;
        missing parent directories are created. Files served as
        ``application/octet-stream`` are additionally made executable.

        :param asset: asset dict providing ``browser_download_url``
        :param output_directory: directory the file is written under
        """
        output_file = osp.join(output_directory, self['file'])
        # The session is only needed for this download; close it afterwards.
        with requests.Session() as session:
            session.auth = self.provider.basic_auth
            # NOTE(review): cleanup=False presumably leaves the temporary
            # directory on disk once the file has been moved -- confirm
            # against toolbox.temp_dir.
            with temp_dir(cleanup=False) as download_dir:
                file_path, content_type = download_http_url(
                    asset['browser_download_url'],
                    session,
                    download_dir,
                    filename=self['file']
                )
                dir_path = osp.dirname(output_file)
                if not osp.isdir(dir_path):
                    os.makedirs(dir_path)
                shutil.move(file_path, output_file)
        if content_type in ['application/octet-stream']:
            # Add the execute bit for user, group and others.
            st = os.stat(output_file)
            os.chmod(
                output_file, st.st_mode |
                stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH
            )