dev-pipeline/devpipeline-core

lib/devpipeline_core/command.py

#!/usr/bin/python3

"""
Classes and functions related to executing commands.
"""

import argparse

import devpipeline_core.configinfo
import devpipeline_core.env
import devpipeline_core.resolve
import devpipeline_core.version


def setup_target_parser(parser):
    """Add an optional list of positional targets to an argument parser."""
    parser.add_argument(
        "targets",
        nargs="*",
        default=argparse.SUPPRESS,
        help="The targets to operate on",
    )


def setup_task_parser(parser):
    """Add the arguments shared by task-based commands: targets, dependency
    resolution, executor selection, and failure handling."""
    setup_target_parser(parser)
    parser.add_argument(
        "--dependencies",
        help="Control how build dependencies are handled.",
        default="deep",
    )
    parser.add_argument(
        "--executor", help="The method to execute commands.", default="quiet"
    )
    parser.add_argument(
        "--keep-going",
        action="store_true",
        help="If a task fails, continue executing as many remaining tasks as possible.",
    )


def add_version_info(argparser, version):
    """Add a --version argument that reports the given version string."""
    argparser.add_argument(
        "--version", action="version", version="%(prog)s {}".format(version)
    )
    return argparser


def _setup_targets(parsed_args, components):
    # Use the explicitly requested targets if any were given; otherwise
    # operate on every component and force deep dependency resolution.
    if "targets" in parsed_args:
        return parsed_args.targets
    parsed_args.dependencies = "deep"
    return components.keys()


def process_targets(arguments, work_fn, config_fn):
    """Load the full configuration, determine the targets, and run work_fn
    against them."""
    full_config = config_fn()
    targets = _setup_targets(arguments, full_config)
    work_fn(targets, full_config)


def _execute_targets(task_dict, task_queue, config_info, full_config, fail_function):
    # Walk the resolved task queue, running each (component, task) pair with a
    # per-component configuration and environment.  Failures are handed to
    # fail_function, which either aborts or records the failure and continues.
    for component_tasks in task_queue:
        for component_task in component_tasks:
            task_heading = "  {} ({})".format(component_task[0], component_task[1])
            config_info.executor.message(task_heading)
            config_info.executor.message("-" * (2 + len(task_heading)))

            config_info.config = full_config.get(component_task[0])
            config_info.env = devpipeline_core.env.create_environment(
                config_info.config
            )
            try:
                task_dict[component_task[1]](config_info)
                task_queue.resolve(component_task)
            except Exception as failure:  # pylint: disable=broad-except
                fail_function(failure, component_task)
            config_info.executor.message("")


class _PartialFailureException(Exception):
    """Raised when --keep-going is set and at least one task failed."""

    def __init__(self, failed):
        super().__init__(failed)
        self.failed = failed


def process_tasks(arguments, tasks, config_fn):
    """Resolve dependencies for the requested targets and execute every task.

    tasks is an ordered sequence of (name, function) pairs; config_fn returns
    the full project configuration.
    """

    def _work_fn(targets, full_config):
        task_order = []
        task_dict = {}
        for name, fn in tasks:
            task_order.append(name)
            task_dict[name] = fn

        executor = _get_executor(arguments)
        resolver = _get_resolver(arguments)
        dep_manager = resolver(targets, full_config, task_order)
        task_queue = dep_manager.get_queue()
        config_info = devpipeline_core.configinfo.ConfigInfo(executor)

        try:
            failed = []

            def _keep_going_on_failure(failure, task):
                skipped = task_queue.fail(task)
                failed.append((task, skipped, str(failure)))

            def _fail_immediately(failure, task):
                del task
                raise failure

            fail_fn = _fail_immediately
            if arguments.keep_going:
                fail_fn = _keep_going_on_failure

            _execute_targets(task_dict, task_queue, config_info, full_config, fail_fn)
            if failed:
                raise _PartialFailureException(failed)
        finally:
            full_config.write()

    process_targets(arguments, _work_fn, config_fn)


def _get_executor(parsed_args):
    # Look up the requested executor among the registered executor types.
    helper_fn = devpipeline_core.EXECUTOR_TYPES.get(parsed_args.executor)
    if helper_fn:
        return helper_fn[0]()
    raise Exception("{} isn't a valid executor".format(parsed_args.executor))


def _get_resolver(parsed_args):
    # Look up the requested dependency resolver, defaulting to deep resolution.
    if "dependencies" not in parsed_args:
        parsed_args.dependencies = "deep"
    resolver = devpipeline_core.DEPENDENCY_RESOLVERS.get(parsed_args.dependencies)
    if resolver:
        return resolver[0]
    raise Exception("{} isn't a valid resolver".format(parsed_args.dependencies))
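
A minimal sketch of how a front-end tool might wire these helpers together. The task function, the configuration loader, and the "0.0.0" version string below are hypothetical placeholders; a real caller would supply its own task functions and a configuration object providing get(), keys(), and write().

#!/usr/bin/python3

"""Hypothetical front end built on devpipeline_core.command (sketch only)."""

import argparse

import devpipeline_core.command


def _do_build(config_info):
    # Hypothetical task: a real tool would call into its build plugin here.
    config_info.executor.message("build task placeholder")


def _load_full_config():
    # Hypothetical loader: a real tool would return its resolved project
    # configuration (an object offering get(), keys(), and write()).
    raise NotImplementedError("supply a real configuration loader")


def main():
    parser = argparse.ArgumentParser(description="Run build tasks")
    devpipeline_core.command.setup_task_parser(parser)
    # "0.0.0" stands in for the tool's real version string.
    devpipeline_core.command.add_version_info(parser, "0.0.0")
    arguments = parser.parse_args()
    devpipeline_core.command.process_tasks(
        arguments, [("build", _do_build)], _load_full_config
    )


if __name__ == "__main__":
    main()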