avocado-framework/avocado

View on GitHub
avocado/core/nrunner/task.py

Summary

Maintainability
A
3 hrs
Test Coverage
F
50%
import base64
import json
import logging
import socket
import tempfile
import time
from uuid import uuid1

from avocado.core.nrunner.runnable import (
    RUNNERS_REGISTRY_STANDALONE_EXECUTABLE,
    Runnable,
)

LOG = logging.getLogger(__name__)

#: The default category for tasks, and the value that will cause the
#: task results to be included in the job results
TASK_DEFAULT_CATEGORY = "test"


class StatusEncoder(json.JSONEncoder):

    # pylint: disable=E0202
    def default(self, o):
        if isinstance(o, bytes):
            return {"__base64_encoded__": base64.b64encode(o).decode("ascii")}
        return json.JSONEncoder.default(self, o)


def json_dumps(data):
    return json.dumps(data, ensure_ascii=True, cls=StatusEncoder)


class TaskStatusService:
    """
    Implementation of interface that a task can use to post status updates

    TODO: make the interface generic and this just one of the implementations
    """

    def __init__(self, uri):
        self.uri = uri
        self._connection = None

    @property
    def connection(self):
        if not self._connection:
            self._create_connection()
        return self._connection

    def _create_connection(self):
        """
        Creates connection with `self.uri` based on `socket.create_connection`
        """
        if ":" in self.uri:
            host, port = self.uri.split(":")
            port = int(port)
            for _ in range(600):
                try:
                    self._connection = socket.create_connection((host, port))
                    break
                except ConnectionRefusedError as error:
                    LOG.warning(error)
                    time.sleep(1)
            else:
                self._connection = socket.create_connection((host, port))
        else:
            self._connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self._connection.connect(self.uri)

    def post(self, status):
        data = json_dumps(status)
        try:
            self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
        except BrokenPipeError:
            try:
                self._create_connection()
                self.connection.send(data.encode("ascii") + "\n".encode("ascii"))
            except ConnectionRefusedError:
                LOG.warning(f"Connection with {self.uri} has been lost.")
                return False
        return True

    def close(self):
        if self.connection is not None:
            self.connection.close()

    def __repr__(self):
        return f'<TaskStatusService uri="{self.uri}">'


class Task:
    """
    Wraps the execution of a runnable

    While a runnable describes what to be run, and gets run by a
    runner, a task should be a unique entity to track its state,
    that is, whether it is pending, is running or has finished.
    """

    def __init__(
        self,
        runnable,
        identifier=None,
        status_uris=None,
        category=TASK_DEFAULT_CATEGORY,
        job_id=None,
    ):
        """Instantiates a new Task.

        :param runnable: the "description" of what the task should run.
        :type runnable: :class:`avocado.core.nrunner.Runnable`
        :param identifier: any identifier that is guaranteed to be unique
                           within the context of a Job. A recommended value
                           is a :class:`avocado.core.test_id.TestID` instance
                           when a task represents a test, because besides the
                           uniqueness aspect, it's also descriptive.  If an
                           identifier is not given, an automatically generated
                           one will be set.
        :param status_uris: the URIs for the status servers that this task
                            should send updates to.
        :type status_uris: list
        :param category: category of this task. Defaults to
                         :data:`TASK_DEFAULT_CATEGORY`.
        :type category: str
        :param job_id: the ID of the job, for authenticating messages that get
                       sent to the destination job's status server and will make
                       into the job's results.
        :type job_id: str
        """
        # pylint: disable=W0201
        self.runnable = runnable
        self.identifier = identifier or str(uuid1())
        #: Category of the task.  If the category is not "test", it
        #: will not be accounted for on a Job's test results.
        self.category = category
        self.job_id = job_id
        self.status_services = []
        status_uris = status_uris or self.runnable.config.get("run.status_server_uri")
        if status_uris is not None:
            if not isinstance(status_uris, list):
                status_uris = [status_uris]
            for status_uri in status_uris:
                self.status_services.append(TaskStatusService(status_uri))
        self.metadata = {}

    def __repr__(self):
        fmt = (
            '<Task identifier="{}" runnable="{}" status_services="{}"'
            ' category="{}" job_id="{}">'
        )
        return fmt.format(
            self.identifier,
            self.runnable,
            self.status_services,
            self.category,
            self.job_id,
        )

    def are_dependencies_available(self, runners_registry=None):
        """Verifies if dependencies needed to run this task are available.

        This currently checks the runner command only, but can be expanded once
        the handling of other types of dependencies are implemented.  See
        :doc:`/blueprints/BP002`.
        """
        if runners_registry is None:
            runners_registry = RUNNERS_REGISTRY_STANDALONE_EXECUTABLE
        return self.runnable.runner_command(runners_registry)

    def setup_output_dir(self, output_dir=None):
        if not self.runnable.output_dir:
            output_dir = output_dir or tempfile.mkdtemp(prefix=".avocado-task-")
            self.runnable.output_dir = output_dir

    @classmethod
    def from_recipe(cls, task_path):
        """
        Creates a task (which contains a runnable) from a task recipe file

        :param task_path: Path to a recipe file

        :rtype: instance of :class:`Task`
        """
        with open(task_path, encoding="utf-8") as recipe_file:
            recipe = json.load(recipe_file)

        identifier = recipe.get("id")
        runnable_recipe = recipe.get("runnable")
        runnable = Runnable(
            runnable_recipe.get("kind"),
            runnable_recipe.get("uri"),
            *runnable_recipe.get("args", ()),
            config=runnable_recipe.get("config"),
        )
        status_uris = recipe.get("status_uris")
        category = recipe.get("category")
        return cls(runnable, identifier, status_uris, category)

    def get_command_args(self):
        """
        Returns the command arguments that adhere to the runner interface

        This is useful for building 'task-run' commands that can be
        executed on a command line interface.

        :returns: the arguments that can be used on an avocado-runner command
        :rtype: list
        """
        args = ["-i", str(self.identifier), "-j", str(self.job_id)]
        args += self.runnable.get_command_args()

        for status_service in self.status_services:
            args.append("-s")
            args.append(status_service.uri)

        return args

    def run(self):
        self.setup_output_dir()
        runner_klass = self.runnable.pick_runner_class()
        runner = runner_klass()
        running_status_services = self.status_services
        damaged_status_services = []
        for status in runner.run(self.runnable):
            if status["status"] == "started":
                status.update({"output_dir": self.runnable.output_dir})
            status.update({"id": self.identifier})
            if self.job_id is not None:
                status.update({"job_id": self.job_id})
            for status_service in running_status_services:
                if not status_service.post(status):
                    damaged_status_services.append(status_service)
            if damaged_status_services:
                running_status_services = list(
                    filter(
                        lambda s: s not in damaged_status_services,
                        running_status_services,
                    )
                )
                damaged_status_services.clear()
            yield status