albertyw/req-update

View on GitHub
req_update/util.py

Summary

Maintainability
A
2 hrs
Test Coverage
A
100%
from __future__ import annotations
import os
from pathlib import Path
import re
import subprocess
from typing import Optional, Union


# Name of the git branch that dependency updates are committed to
BRANCH_NAME = 'dep-update'
# Commit message template; the placeholders are filled in with str.format
COMMIT_MESSAGE = 'Update {language} {package} package to {version}'
# Type of the value returned by Util.execute_shell: either the completed
# process or the error describing a command that exited with a nonzero code
SubprocessOutput = Union[
    subprocess.CalledProcessError,
    'subprocess.CompletedProcess[str]',  # Quoted to maintain compatibility with Python 3.7
]


class Updater:
    """
    Generic dependency updater.
    The default methods report that the updater is not applicable and that
    no updates were made.
    """

    def __init__(self, util: Util) -> None:
        # Shared helper object kept for use by the updater's methods
        self.util = util

    def check_applicable(self) -> bool:
        """
        Report whether this updater can be used on the current repository.
        The default answer is False.
        """
        return False

    def update_dependencies(self) -> bool:
        """
        Perform the dependency update.
        Returns True when something was updated; the default is False.
        """
        return False

    @property
    def language(self) -> str:
        """
        Name of the language handled by this updater, taken from the name
        of the updater's class
        """
        return self.__class__.__name__


class Util:
    def __init__(self) -> None:
        self.push = False
        self.verbose = False
        self.dry_run = True
        self.branch_exists = False

    def check_repository_cleanliness(self) -> None:
        """
        Check that the repository is ready for updating dependencies.
        Non-clean repositories will raise a RuntimeError
        """
        # Make sure there are no uncommitted files
        if self.dry_run:
            return
        command = ['git', 'status', '--porcelain']
        try:
            result = self.execute_shell(command, True)
        except subprocess.CalledProcessError as error:
            raise RuntimeError('Must run within a git repository') from error
        lines = result.stdout.split('\n')
        # Do not count untracked files when checking for repository cleanliness
        lines = [line for line in lines if line and line[:2] != '??']
        if lines:
            raise RuntimeError('Repository not clean')

    def commit_git(self, commit_message: str) -> None:
        """Create a git commit of all changed files"""
        self.log(commit_message)
        command = ['git', 'commit', '-am', commit_message]
        self.execute_shell(command, False)
        self.push_dependency_update()

    def commit_dependency_update(
        self, language: str, dependency: str, version: str
    ) -> None:
        """
        Commit a dependency update, using a message built from the
        COMMIT_MESSAGE template
        """
        template_values = {
            'language': language,
            'package': dependency,
            'version': version,
        }
        self.commit_git(COMMIT_MESSAGE.format(**template_values))

    def create_branch(self) -> None:
        """
        Switch to the dependency update branch, creating it when it does not
        exist yet.  Records in branch_exists when the branch was already there.
        """
        listing = self.execute_shell(
            ['git', 'branch', '--list', BRANCH_NAME], True
        )
        # git prints nothing when no branch matches the name
        if listing.stdout.strip():
            self.branch_exists = True
            checkout = ['git', 'checkout', BRANCH_NAME]
        else:
            checkout = ['git', 'checkout', '-b', BRANCH_NAME]
        self.execute_shell(checkout, False)

    def rollback_branch(self) -> None:
        """Delete the dependency update branch"""
        if self.branch_exists:
            return
        command = ['git', 'checkout', '-']
        self.execute_shell(command, False)
        command = ['git', 'branch', '-d', BRANCH_NAME]
        self.execute_shell(command, False)

    def reset_changes(self) -> None:
        """Reset any noncommitted changes to the branch"""
        command = ['git', 'checkout', '.']
        self.execute_shell(command, False)

    def push_dependency_update(self) -> None:
        """Git push any commits to remote"""
        if not self.push:
            return
        self.log('Pushing commit to git remote')
        command = ['git', 'push', '-u', 'origin']
        self.execute_shell(command, False)

    def compare_versions(self, current: str, proposed: str) -> bool:
        """
        Take the current version and a proposed new version and return a bool
        for whether the new version is a valid upgrade.  The new version is a
        valid upgrade if the version structure matches and the version numbers
        are greater.
        """
        structure_regex = re.compile(r"[0-9]+")
        current_structure = structure_regex.sub("", current)
        proposed_structure = structure_regex.sub("", proposed)
        if current_structure != proposed_structure:
            return False
        num_regex = re.compile(r"\d+")
        current_nums = num_regex.findall(current)
        proposed_nums = num_regex.findall(proposed)
        for compares in zip(current_nums, proposed_nums):
            if int(compares[0]) < int(compares[1]):
                return True
            if int(compares[0]) > int(compares[1]):
                return False
        return False

    def check_major_version_update(
        self, dependency: str, old_version: str, new_version: str
    ) -> Optional[bool]:
        """
        Try to parse versions as semver and compare major version numbers.
        Log a warning if the major version numbers are different.
        Returns True if there is a major version bump
        Returns False if there is not a major version bump
        Returns None if versions are not semver
        """
        old_version_parsed = old_version.lstrip('^~ ').split('.')
        new_version_parsed = new_version.lstrip('^~ ').split('.')
        if len(old_version_parsed) != 3 or len(new_version_parsed) != 3:
            return None
        try:
            old_version_major = int(old_version_parsed[0])
        except ValueError:
            return None
        try:
            new_version_major = int(new_version_parsed[0])
        except ValueError:
            return None
        if old_version_major == new_version_major:
            return False
        self.warn(
            'Warning: Major version change on %s: %s updated to %s'
            % (dependency, old_version, new_version)
        )
        return True

    def execute_shell(
        self,
        command: list[str],
        readonly: bool,
        cwd: Optional[Path] = None,
        suppress_output: bool = False,
        ignore_exit_code: bool = False,
    ) -> SubprocessOutput:
        """Helper method to execute commands in a shell and return output"""
        if self.verbose:
            self.log(' '.join(command))
        if self.dry_run and not readonly:
            return subprocess.CompletedProcess(
                command, 0, stdout='', stderr=''
            )
        try:
            result = subprocess.run(
                command,
                cwd=cwd,
                capture_output=True,
                check=True,
                encoding='utf-8',
            )
        except subprocess.CalledProcessError as error:
            if ignore_exit_code:
                return error
            if not suppress_output:
                self.log(error.stdout)
                self.warn(error.stderr)
            raise
        return result

    @staticmethod
    def is_no_color() -> bool:
        """Return if output should have no color: https://no-color.org/"""
        return 'NO_COLOR' in os.environ

    def warn(self, data: str) -> None:
        """Helper method for warn-level logs"""
        if not Util.is_no_color():
            data = f'\033[93m{data}\033[0m'
        return self.log(data)

    def log(self, data: str) -> None:
        """Helper method for logging statements; prints data to standard output"""
        print(data)