cogniteev/easy-upgrade

View on GitHub
easy_upgrade/api.py

Summary

Maintainability
C
1 day
Test Coverage
import inspect
import logging
import operator

import pkg_resources

from .toolbox import temp_dir


def parse_version(version):
    """Parse a human-readable version string into a comparable object.

    Leading 'v' characters are stripped before the string is handed to
    `pkg_resources.parse_version`. `None` is passed through unchanged.
    """
    if version is not None:
        return pkg_resources.parse_version(version.lstrip('v'))
    return None


class _ActionMeta(type):
    """Metaclass registering every concrete action class when it is defined.

    It is attached to `Action` through an intermediate base class rather
    than a `__metaclass__` attribute, because the latter is silently
    ignored by Python 3 (no action would ever be registered there).
    """

    def __new__(mcs, name, bases, attrs):
        new_type = type.__new__(mcs, name, bases, attrs)
        # `Action` itself and the abstract bases are not registered. The
        # first test also short-circuits while the `Action` name is not
        # bound yet (i.e. while `Action` itself is being created).
        if name != 'Action' and name not in Action.bases:
            action_name = Action.get_action_name(new_type, attrs)
            providers = Action.get_action_providers(new_type, attrs)
            basename = Action.get_action_basename(new_type)
            Action.register(basename, new_type, action_name, providers)
        return new_type


class Action(type.__new__(_ActionMeta, '_ActionBase', (dict,), {})):
    """Base class of all actions; a dict holding the action configuration.

    Subclasses of one of the abstract bases listed in `bases` are recorded
    in the `actions` registry as soon as they are defined:
    `actions[base name][action name] = (providers, action class)`.
    A registered class must declare a `name` static member and may declare
    `providers` (string, list or tuple) to restrict where it is available.
    """
    actions = {}
    bases = ['Fetcher', 'Installer', 'PostInstaller']

    def __init__(self, provider, release, config):
        super(Action, self).__init__(config)
        self.provider = provider
        self.release = release

    @classmethod
    def clear(cls):
        """Forget every registered action."""
        cls.actions.clear()

    def jinja_eval(self, text):
        """Return `text` unchanged (template evaluation not implemented)."""
        # FIXME
        return text

    @classmethod
    def get_action_basename(cls, new_type):
        """Return the name of the abstract base `new_type` derives from.

        Raises `Exception` when `new_type` extends none, or more than one,
        of the classes named in `bases`.
        """
        pending = list(new_type.__bases__)
        # A set, so that a base reached through several inheritance paths
        # (diamond / mixin hierarchies) is only counted once.
        found = set()
        while pending:
            clazz = pending.pop()
            if clazz.__name__ in cls.bases:
                found.add(clazz.__name__)
            pending.extend(clazz.__bases__)
        if not found:
            raise Exception(
                "Action class must extend one of those classes: " +
                ", ".join(cls.bases)
            )
        if len(found) != 1:
            raise Exception(
                "Action class must extend exactly one of those classes: " +
                ", ".join(cls.bases)
            )
        return found.pop()

    @classmethod
    def get_action_name(cls, new_type, attrs):
        """Return the `name` static member declared in the class body.

        Raises `Exception` when `attrs` has no 'name' entry; an inherited
        `name` is deliberately not taken into account.
        """
        if 'name' not in attrs:
            raise Exception(
                "class '{}' misses 'name' static member".format(
                    new_type.__name__))
        return attrs['name']

    @classmethod
    def get_action_providers(cls, new_type, attrs):
        """Return the `providers` of `new_type`, own or inherited.

        Returns None when neither the class body nor any ancestor defines
        a `providers` attribute.
        """
        if 'providers' in attrs:
            return attrs['providers']
        # Regular attribute lookup already walks the ancestors.
        return getattr(new_type, 'providers', None)

    @classmethod
    def get_action(cls, base, name, provider):
        """Look up the action class registered as `name` under `base`.

        `base` is an abstract base class or its name; `provider` is a
        `ReleaseProvider` or a provider name. Raises `Exception` when the
        action is unknown, or is restricted to providers that do not
        include `provider`.
        """
        if isinstance(provider, ReleaseProvider):
            provider = provider.name
        base = base.__name__ if inspect.isclass(base) else base
        providers, action = cls.actions.get(base, {}).get(name, (None, None))
        if action is None:
            raise Exception("Unknown {} action {}".format(base, name))
        # `providers` set to None means: available in every provider.
        if providers is not None and provider not in providers:
            raise Exception(
                "{} Action '{}' is not available in provider '{}'".format(
                    base, name, provider)
            )
        return action

    @classmethod
    def register(cls, base_name, action_cls, name, providers):
        """Record `action_cls` as action `name` of base `base_name`.

        `providers` may be a string, a list or a tuple, and is stored as a
        tuple; a falsy value is stored as is. Raises `Exception` when
        `name` is already registered under `base_name` or when `providers`
        has another type.
        """
        try:
            string_types = basestring  # Python 2: both str and unicode
        except NameError:
            string_types = str
        if name in cls.actions.get(base_name, {}):
            raise Exception("Action '{}' is already registered".format(name))
        if isinstance(providers, string_types):
            providers = (providers,)
        elif isinstance(providers, list):
            providers = tuple(providers)
        elif providers and not isinstance(providers, tuple):
            raise Exception(
                "Error in class {}".format(action_cls.__name__) +
                ", 'providers' static member must be either "
                "a string, a list, or a tuple")
        cls.actions.setdefault(base_name, {})[name] = (providers, action_cls)


class Fetcher(Action):
    """Abstract base of the actions fetching a release."""

    def candidate_version(self):
        """Return the version the fetcher would like to install.

        This base implementation knows no candidate and returns None.
        """
        return None

    def fetch(self, output_directory):
        """Fetch the release into `output_directory` (no-op here)."""


class Installer(Action):
    """Abstract base of the actions installing fetched items."""

    def installed_version(self):
        """Return the installed version; to be provided by subclasses."""
        raise NotImplementedError

    def install(self, fetched_items_path, version):
        """Install `version` from `fetched_items_path`; to be overridden."""
        raise NotImplementedError()


class PostInstaller(Action):
    """Abstract base of the actions executed once a release is installed."""

    def execute(self, fetched_items_path, version):
        """Run the post-install step; to be provided by subclasses."""
        raise NotImplementedError()


class Release(dict):
    """Configuration of one release, plus the actions able to install it.

    The dict content is the raw release configuration. It must provide a
    'fetch' and an 'install' entry (exactly one action each) and may
    provide 'post-install' (one action or a list of actions). An action
    is written as a one-item dictionary `{action name: action config}`.
    """

    def __init__(self, provider, name, config):
        super(Release, self).__init__(config)
        self.provider = provider
        self.name = name
        self.fetcher = self.__extract_action('fetch', Fetcher)
        self.installer = self.__extract_action('install', Installer)
        self.post_installers = self.__extract_action(
            'post-install', PostInstaller, unique=False, default=[]
        )
        self.logger = logging.getLogger('{}:{}'.format(
            self.provider.name,
            self.name
        ))

    def pkg_name(self):
        """Return the name of the release."""
        return self.name

    def get_versions(self):
        """Return the installed and candidate versions of the release.

        The result maps 'installed' and/or 'candidate' to a dictionary
        holding the version as reported ('human') and parsed ('tuple').
        A version the installer / fetcher does not report is left out.
        """
        installed = self.installer.installed_version()
        candidate = self.fetcher.candidate_version()
        versions = dict()
        if installed:
            versions['installed'] = {
                'human': installed,
                'tuple': parse_version(installed),
            }
        if candidate:
            versions['candidate'] = {
                'human': candidate,
                'tuple': parse_version(candidate),
            }
        return versions

    def __version(self, version):
        """Return `(version, parsed version)`; parsed is None if falsy."""
        if version:
            version_tuple = parse_version(version)
        else:
            version_tuple = None
        return version, version_tuple

    def __is_bidder_newer(self, installed, bidder, bidder_str):
        """Tell whether the candidate (`bidder`) has to be installed.

        Returns False, after logging the reason, when the installed
        version equals or is greater than the candidate. Returns True
        otherwise, including when nothing is installed yet.
        """
        if installed:
            if installed == bidder:
                self.logger.info(
                    "candidate version ({}) ".format(bidder_str) +
                    "is already installed. Nothing to do."
                )
                return False
            elif installed > bidder:
                self.logger.info(
                    "installed version ({}) ".format(installed) +
                    "is more recent than " +
                    "candidate version ({})".format(bidder_str)
                )
                return False
        return True

    def install(self):
        """Fetch and install the candidate version when it is needed.

        Returns False when the installed version is already equal to or
        more recent than the candidate, True once the release has been
        fetched, installed and every post-installer executed.
        """
        _, version = self.__version(self.installer.installed_version())
        bidder_str, bidder = self.__version(self.fetcher.candidate_version())
        if not self.__is_bidder_newer(version, bidder, bidder_str):
            return False
        self.logger.info("starting installation of version {}".format(
            bidder_str
        ))
        top_config = self.provider.top_config
        cleanup_temp_dir = top_config.get('cleanup-temp-dir', True)
        # Every step works on the same temporary directory.
        with temp_dir(cleanup=cleanup_temp_dir) as d:
            self.logger.info("fetching release")
            self.fetcher.fetch(d)
            self.logger.info("installing release")
            self.installer.install(d, bidder_str)
            for post_installer in self.post_installers:
                post_installer.execute(d, bidder_str)
        return True

    def __get_raw_config(self, config_key, default, unique):
        """Return the entries found under `config_key`, always as a list.

        A missing (or None) value yields `default`, or raises `Exception`
        when `default` is None. A list holding more or fewer than one item
        raises `Exception` when `unique` is set.
        """
        raw_configs = self.get(config_key)
        if raw_configs is None:
            if default is not None:
                return default
            raise Exception(
                "Expecting key '{}' in release configuration"
                .format(config_key)
            )
        if not isinstance(raw_configs, list):
            # A single entry may be written without the enclosing list.
            return [raw_configs]
        if unique and len(raw_configs) != 1:
            raise Exception(
                "Release config key '{}' ".format(config_key) +
                "expect only one item"
            )
        return raw_configs

    def __instantiate_action(self, base_action_cls, name, config):
        """Build the action registered as `name` for this provider."""
        action_cls = Action.get_action(
            base_action_cls,
            name,
            self.provider
        )
        return action_cls(self.provider, self, config)

    @staticmethod
    def __split_action_entry(config_key, entry):
        """Validate a `{name: config}` entry, return `(name, config)`.

        Raises `Exception` when `entry` is not a one-item dictionary.
        """
        if not isinstance(entry, dict):
            raise Exception(
                "Release config key '{}' ".format(config_key) +
                "expect dictionary values"
            )
        if len(entry) != 1:
            raise Exception(
                "Release config-key '{}' ".format(config_key) +
                "expect a dictionary values of 1 element (name => config)"
            )
        # Dictionary views cannot be indexed on Python 3, hence no
        # `items()[0]` here.
        return next(iter(entry.items()))

    def __extract_action(self, config_key, base_action_cls,
                         unique=True, default=None):
        """Instantiate the action(s) configured under `config_key`.

        Returns a single action when `unique` is set, a list of actions
        otherwise.
        """
        raw_configs = self.__get_raw_config(config_key, default, unique)
        actions = []
        for config in raw_configs:
            action_name, action_config = self.__split_action_entry(
                config_key, config
            )
            actions.append(self.__instantiate_action(
                base_action_cls,
                action_name,
                action_config
            ))
        if unique:
            return actions[0]
        return actions


class ReleaseProvider(dict):
    """Provider configuration, plus the releases it declares.

    The dict content is the section of `top_config` stored under the
    provider name. Each entry of its 'releases' mapping is wrapped in a
    `release_cls` instance, available in `self.releases` by release name.
    """

    def __init__(self, name, top_config, release_cls=Release):
        self.name = name
        self.top_config = top_config
        super(ReleaseProvider, self).__init__(top_config.get(name))
        self.release_cls = release_cls
        self.releases = {}
        for release_name, raw_config in self.get('releases', {}).items():
            self.releases[release_name] = release_cls(
                self, release_name, raw_config
            )

    def install(self, *releases):
        """Install the named releases, or all of them when none is named.

        Names matching no known release are ignored. Returns True when
        every selected release reported an installation, which is also the
        case when nothing was selected.
        """
        if any(releases):
            selected = [release
                        for name, release in self.releases.items()
                        if name in releases]
        else:
            selected = list(self.releases.values())
        # Build the full list first: every release must be processed, even
        # once one of them returned False. Unlike `reduce` without an
        # initial value, `all` also copes with an empty selection.
        results = [release.install() for release in selected]
        return all(results)


class EasyUpgrade(object):
    """Entry point: the release providers enabled by a configuration.

    `config` maps provider names to their configuration. A provider
    published in the 'easy_upgrade.providers' entry-point group is
    instantiated when its name is a key of `config`.
    """

    @classmethod
    def from_yaml(cls, path):
        """Build an instance from the YAML configuration file at `path`."""
        # `cls`, not `EasyUpgrade`, so that subclasses get their own type.
        return cls(cls.load_yaml(path))

    @classmethod
    def load_yaml(cls, path):
        """Return the content of the YAML file at `path`."""
        # NOTE: `safe_load` replaces the former `yaml.load(istr)` call,
        # which can construct arbitrary Python objects and is rejected by
        # recent PyYAML releases when no Loader is given.
        from yaml import safe_load
        with open(path) as istr:
            return safe_load(istr)

    def get_packages_version(self):
        """Yield provider name, release name and versions of each release."""
        for provider in self.providers.values():
            for release in provider.releases.values():
                yield {
                    'provider': provider.name,
                    'release': release.name,
                    'versions': release.get_versions()
                }

    def get_outdated_packages(self):
        """Yield the packages whose candidate version should be installed.

        A package is outdated when a candidate version is known and either
        nothing is installed or the installed version is older.
        """
        for pkg in self.get_packages_version():
            versions = pkg['versions']
            if 'candidate' not in versions:
                continue
            installed = versions.get('installed')
            if (not installed or
                    installed['tuple'] < versions['candidate']['tuple']):
                yield pkg

    def __init__(self, config):
        self.config = config
        self.providers = {}
        _actions = 'easy_upgrade.actions'
        _providers = 'easy_upgrade.providers'
        # Loading an action module is enough: defining the action classes
        # is what makes them available.
        for entrypoint in pkg_resources.iter_entry_points(group=_actions):
            entrypoint.load()
        for entrypoint in pkg_resources.iter_entry_points(group=_providers):
            provider = entrypoint.load()
            name = entrypoint.name
            if name in config:
                self.providers[name] = provider(name, config)