avocado-framework/avocado

View on GitHub
avocado/core/task/runtime.py

Summary

Maintainability
C
1 day
Test Coverage
B
85%
import itertools
import os
from enum import Enum

from avocado.core.dispatcher import TestPostDispatcher, TestPreDispatcher
from avocado.core.nrunner.task import TASK_DEFAULT_CATEGORY, Task
from avocado.core.test_id import TestID


class RuntimeTaskStatus(Enum):
    INTERRUPTED = "FINISHED INTERRUPTED"
    WAIT_DEPENDENCIES = "WAITING DEPENDENCIES"
    WAIT = "WAITING"
    FINISHED = "FINISHED"
    TIMEOUT = "FINISHED TIMEOUT"
    IN_CACHE = "FINISHED IN CACHE"
    FAILFAST = "FINISHED FAILFAST"
    FAIL_TRIAGE = "FINISHED WITH FAILURE ON TRIAGE"
    FAIL_START = "FINISHED FAILING TO START"
    STARTED = "STARTED"

    @staticmethod
    def finished_statuses():
        return [
            status
            for _, status in RuntimeTaskStatus.__members__.items()
            if "FINISHED" in status.value
        ]


class RuntimeTaskMixin:
    """Common utilities for RuntimeTask implementations."""

    @classmethod
    def from_runnable(
        cls,
        runnable,
        no_digits,
        index,
        base_dir,
        test_suite_name=None,
        status_server_uri=None,
        job_id=None,
        satisfiable_deps_execution_statuses=None,
    ):
        """Creates runtime task for test from runnable

        :param runnable: the "description" of what the task should run.
        :type runnable: :class:`avocado.core.nrunner.Runnable`
        :param no_digits: number of digits of the test uid
        :type no_digits: int
        :param index: index of tests inside test suite
        :type index: int
        :param base_dir: Path to the job base directory.
        :type base_dir: str
        :param test_suite_name: test suite name which this test is related to
        :type test_suite_name: str
        :param status_server_uri: the URIs for the status servers that this
                                  task should send updates to.
        :type status_server_uri: list
        :param job_id: the ID of the job, for authenticating messages that get
                       sent to the destination job's status server and will
                       make into the job's results.
        :type job_id: str
        :param satisfiable_deps_execution_statuses: The dependency result types that
                                                    satisfy the execution of this
                                                    RuntimeTask.
        :type satisfiable_deps_execution_statuses: list of test results.
        :returns: RuntimeTask of the test from runnable
        """

        # create test ID
        if test_suite_name:
            prefix = f"{test_suite_name}-{index}"
        else:
            prefix = index
        if cls.category is TASK_DEFAULT_CATEGORY:
            name = runnable.identifier
        else:
            name = f'{runnable.kind}-{runnable.kwargs.get("name")}'

        test_id = TestID(prefix, name, runnable.variant, no_digits)

        if not runnable.output_dir:
            runnable.output_dir = os.path.join(base_dir, test_id.str_filesystem)
        # handles the test task
        task = Task(
            runnable,
            identifier=test_id,
            status_uris=status_server_uri,
            category=cls.category,
            job_id=job_id,
        )
        return cls(task, satisfiable_deps_execution_statuses)


class RuntimeTask(RuntimeTaskMixin):
    """Task with extra status information on its life cycle status.

    The :class:`avocado.core.nrunner.Task` class contains information
    that is necessary to describe its persistence and execution by itself.

    This class wraps a :class:`avocado.core.nrunner.Task`, with extra
    information about its execution by a spawner within a state machine.
    """

    category = TASK_DEFAULT_CATEGORY

    def __init__(self, task, satisfiable_deps_execution_statuses=None):
        """Instantiates a new RuntimeTask.

        :param task: The task to keep additional information about
        :type task: :class:`avocado.core.nrunner.Task`
        :param satisfiable_deps_execution_statuses: The dependency result types that
                                                    satisfy the execution of this
                                                    RuntimeTask.
        :type satisfiable_deps_execution_statuses: list of test results.
        """
        #: The :class:`avocado.core.nrunner.Task`
        self.task = task
        #: The task status, a value from the enum
        #: :class:`avocado.core.task.runtime.RuntimeTaskStatus`
        self.status = None
        #: Information about task result when it is finished
        self._result = None
        #: Timeout limit for the completion of the task execution
        self.execution_timeout = None
        #: A handle that may be set by a spawner, and that may be
        #: spawner implementation specific, to keep track the task
        #: execution.  This may be a PID, a container ID, a FQDN+PID
        #: etc.
        self.spawner_handle = None
        #: The result of the spawning of a Task
        self.spawning_result = None
        self.dependencies = []
        self._satisfiable_deps_execution_statuses = ["pass"]
        if satisfiable_deps_execution_statuses:
            self._satisfiable_deps_execution_statuses = [
                status.lower() for status in satisfiable_deps_execution_statuses
            ]
        #: Flag to detect if the task should be save to cache
        self.is_cacheable = False

    def __repr__(self):
        if self.status is None:
            return f'<RuntimeTask Task Identifier: "{self.task.identifier}">'
        else:
            return (
                f'<RuntimeTask Task Identifier: "{self.task.identifier}" '
                f'Status: "{self.status}">'
            )

    def __hash__(self):
        return hash(self.task.identifier)

    def __eq__(self, other):
        if isinstance(other, RuntimeTask):
            return hash(self) == hash(other)
        return False

    @property
    def result(self):
        return self._result

    @property
    def satisfiable_deps_execution_statuses(self):
        return self._satisfiable_deps_execution_statuses

    @result.setter
    def result(self, result):
        self._result = result.lower()

    def are_dependencies_finished(self):
        for dependency in self.dependencies:
            if dependency.status not in RuntimeTaskStatus.finished_statuses():
                return False
        return True

    def get_finished_dependencies(self):
        """Returns all dependencies which already finished."""
        return [
            dep
            for dep in self.dependencies
            if dep.status in RuntimeTaskStatus.finished_statuses()
        ]

    def can_run(self):
        if not self.are_dependencies_finished():
            return False

        for dependency in self.dependencies:
            if dependency.result not in self.satisfiable_deps_execution_statuses:
                return False
        return True


class PrePostRuntimeTaskMixin(RuntimeTask):
    """Common utilities for PrePostRuntimeTask implementations."""

    @classmethod
    def get_tasks_from_test_task(
        cls,
        test_task,
        no_digits,
        base_dir,
        test_suite_name=None,
        status_server_uri=None,
        job_id=None,
        suite_config=None,
    ):
        """Creates runtime tasks for preTest task from test task.

        :param test_task: Runtime test task.
        :type test_task: :class:`avocado.core.task.runtime.RuntimeTask`
        :param no_digits: number of digits of the test uid
        :type no_digits: int
        :param base_dir: Path to the job base directory.
        :type base_dir: str
        :param test_suite_name: test suite name which this test is related to
        :type test_suite_name: str
        :param status_server_uri: the URIs for the status servers that this
                                  task should send updates to.
        :type status_server_uri: list
        :param job_id: the ID of the job, for authenticating messages that get
                       sent to the destination job's status server and will
                       make into the job's results.
        :type job_id: str
        :param suite_config: Configuration dict relevant for the whole suite.
        :type suite_config: dict
        :returns: Pre/Post RuntimeTasks of the dependencies from runnable
        :rtype: list
        """
        tasks = []
        plugins = cls.dispatcher().get_extentions_by_priority()
        runnable = test_task.task.runnable
        prefix = f"{test_task.task.identifier.str_filesystem}"
        for plugin in plugins:
            plugin = plugin.obj
            is_cacheable = getattr(plugin, "is_cacheable", False)
            test_runnables_method = getattr(plugin, f"{cls.category}_runnables")
            runnables = test_runnables_method(runnable, suite_config)
            for runnable in runnables:
                satisfiable_deps_execution_statuses = None
                if isinstance(runnable, tuple):
                    runnable, satisfiable_deps_execution_statuses = runnable
                output_dir_not_exists = runnable.output_dir is None
                task = cls.from_runnable(
                    runnable,
                    no_digits,
                    prefix,
                    base_dir,
                    test_suite_name,
                    status_server_uri,
                    job_id,
                    satisfiable_deps_execution_statuses,
                )
                if output_dir_not_exists:
                    runnable.output_dir = os.path.join(
                        os.path.abspath(os.path.join(base_dir, os.pardir)),
                        "dependencies",
                        str(task.task.identifier),
                    )
                    task.task.metadata["symlink"] = os.path.join(
                        test_task.task.runnable.output_dir,
                        "dependencies",
                        f'{runnable.kind}-{runnable.kwargs.get("name")}',
                    )
                task.is_cacheable = is_cacheable
                tasks.append(task)
        return tasks


class PreRuntimeTask(PrePostRuntimeTaskMixin):
    """Runtime task for tasks run before test"""

    category = "pre_test"
    dispatcher = TestPreDispatcher


class PostRuntimeTask(PrePostRuntimeTaskMixin):
    """Runtime task for tasks run after test"""

    category = "post_test"
    dispatcher = TestPostDispatcher


class RuntimeTaskGraph:
    """Graph representing dependencies between runtime tasks."""

    def __init__(
        self,
        tests,
        test_suite_name,
        status_server_uri,
        job_id,
        base_dir,
        suite_config=None,
    ):
        """Instantiates a new RuntimeTaskGraph.

        From the list of tests, it will create runtime tasks and connects them
        inside the graph by its dependencies.

        :param tests: runnables from test suite
        :type tests: list
        :param test_suite_name: test suite name which this test is related to
        :type test_suite_name: str
        :param status_server_uri: the URIs for the status servers that this
                                  task should send updates to.
        :type status_server_uri: list
        :param job_id: the ID of the job, for authenticating messages that get
                       sent to the destination job's status server and will
                       make into the job's results.
        :type job_id: str
        :param base_dir: Path to the job base directory.
        :type base_dir: str
        :param suite_config: Configuration dict relevant for the whole suite.
        :type suite_config: dict
        """
        self.graph = {}
        # create graph
        no_digits = len(str(len(tests)))
        for index, runnable in enumerate(tests, start=1):
            runtime_test = RuntimeTask.from_runnable(
                runnable,
                no_digits,
                index,
                base_dir,
                test_suite_name,
                status_server_uri,
                job_id,
            )
            self.graph[runtime_test] = runtime_test

            # with --dry-run we don't want to run dependencies
            if runnable.kind != "dry-run":
                pre_tasks = PreRuntimeTask.get_tasks_from_test_task(
                    runtime_test,
                    no_digits,
                    base_dir,
                    test_suite_name,
                    status_server_uri,
                    job_id,
                    suite_config,
                )
                post_tasks = PostRuntimeTask.get_tasks_from_test_task(
                    runtime_test,
                    no_digits,
                    base_dir,
                    test_suite_name,
                    status_server_uri,
                    job_id,
                    suite_config,
                )
                if pre_tasks or post_tasks:
                    self._connect_tasks(pre_tasks, [runtime_test], post_tasks)

    def _connect_tasks(self, pre_tasks, tasks, post_tasks):
        connections = list(itertools.product(pre_tasks, tasks))
        connections += list(itertools.product(tasks, post_tasks))
        for dependency, task in connections:
            self.graph[task] = task
            self.graph[dependency] = dependency
            task.dependencies.append(dependency)

    def get_tasks_in_topological_order(self):
        """Computes the topological order of runtime tasks in graph

        :returns: runtime tasks in topological order
        :rtype: list
        """

        def topological_order_util(vertex, visited, topological_order):
            visited[vertex] = True
            for v in vertex.dependencies:
                if not visited[v]:
                    topological_order_util(v, visited, topological_order)
            topological_order.append(vertex)

        visited = dict.fromkeys(self.graph, False)
        topological_order = []

        for vertex in self.graph.values():
            if not visited[vertex]:
                topological_order_util(vertex, visited, topological_order)
        return topological_order