avocado-framework/avocado

View on GitHub
avocado/core/status/repo.py

Summary

Maintainability
A
35 mins
Test Coverage
A
95%
import heapq
import logging

from avocado.core.status.utils import json_loads
from avocado.core.teststatus import STATUSES

LOG = logging.getLogger(__name__)


class StatusMsgMissingDataError(Exception):
    """Status message does not contain the required data."""


class StatusRepo:
    """Maintains tasks' status related data and provides aggregated info."""

    def __init__(self, job_id):
        """Initializes a new StatusRepo

        :param job_id: the job unique identification for which the
                       messages are destined to.
        :type job_id: str
        """
        self.job_id = job_id
        #: Contains all received messages by a given task (by its ID)
        self._all_data = {}
        #: Contains the most up to date status of a task, and the time
        #: it was set in a tuple (status, time).  This is keyed
        #: by the task ID, and the most up to date status is determined by
        #: the status type of message.
        self._status = {}
        #: Contains a global journal of status updates to be picked, each
        #: entry containing a tuple with (task_id, status, time).  It discards
        #: status that have been superseded by newer status.
        self._status_journal_summary = []
        #: Contains the task IDs keyed by the result received
        self._by_result = {}

    def _handle_task_finished(self, message):
        task_id = message["id"]

        result = message.get("result")
        if result is not None and result.upper() not in STATUSES:
            overridden = "error"
            message["result"] = overridden
            message["fail_reason"] = (
                f'Runner error occurred: Test reports unsupported status "{result}"'
            )
            LOG.error(
                'Task "%s" finished message with unsupported status '
                '"%s", changing to "%s"',
                task_id,
                result,
                overridden,
            )

        self._set_by_result(message)
        self._set_task_data(message)
        LOG.debug('Task "%s" finished message: "%s"', task_id, message)

    def _handle_task_started(self, message):
        if "output_dir" not in message:
            raise StatusMsgMissingDataError("output_dir")
        task_id = message["id"]
        LOG.debug('Task "%s" started message: "%s"', task_id, message)
        self._set_task_data(message)

    def _set_by_result(self, message):
        """Sets an entry in the aggregate by result.

        For messages that include a "result" key, expected for example,
        from a "finished" status message, this will allow users to query
        for tasks with a given result."""
        result = message.get("result")
        if result not in self._by_result:
            self._by_result[result] = []
        if message["id"] not in self._by_result[result]:
            self._by_result[result].append(message["id"])

    def _set_task_data(self, message):
        """Appends all data on message to an entry keyed by the task's ID."""
        task_id = message.pop("id")
        if task_id not in self._all_data:
            self._all_data[task_id] = []
        self._all_data[task_id].append(message)

    def get_all_task_data(self, task_id):
        """Returns all data on a given task, by its ID."""
        return self._all_data.get(task_id)

    def get_task_data(self, task_id, index):
        """Returns the data on the index of a given task, by its ID."""
        task_data = self._all_data.get(task_id)
        return task_data[index]

    def get_latest_task_data(self, task_id):
        """Returns the latest data on a given task, by its ID."""
        task_data = self._all_data.get(task_id)
        if task_data is None:
            return None
        return task_data[-1]

    def status_journal_summary_pop(self):
        return heapq.heappop(self._status_journal_summary)

    def _update_status(self, message):
        """Update the latest status of a task (by message)."""
        task_id = message.get("id")
        status = message.get("status")
        time = message.get("time")
        if not all((task_id, status, time)):
            return
        if task_id not in self._status:
            self._status[task_id] = (status, time)
            heapq.heappush(self._status_journal_summary, (time, task_id, status, 0))
        else:
            current_status, _ = self._status[task_id]
            if current_status == "finished":
                LOG.warning(
                    "Received a %s message after finished message: %s", status, message
                )
            elif status == "started":
                LOG.warning(
                    "Received a started message when the status is already %s: %s",
                    current_status,
                    message,
                )
            else:
                self._status[task_id] = (status, time)
            index = len(self.get_all_task_data(task_id))
            heapq.heappush(self._status_journal_summary, (time, task_id, status, index))

    def process_message(self, message):
        for required_field in ("id", "job_id"):
            if required_field not in message:
                raise StatusMsgMissingDataError(required_field)

        job_id = message.get("job_id")
        if job_id != self.job_id:
            LOG.warning("Received a message destined for a different job: %s", message)
            return
        message.pop("job_id")

        self._update_status(message)
        handlers = {
            "started": self._handle_task_started,
            "finished": self._handle_task_finished,
        }
        meth = handlers.get(message.get("status"), self._set_task_data)
        meth(message)

    def process_raw_message(self, raw_message):
        raw_message = raw_message.strip()
        message = json_loads(raw_message)
        self.process_message(message)

    @property
    def result_stats(self):
        return {key: len(value) for key, value in self._by_result.items()}

    def get_task_status(self, task_id):
        return self._status.get(task_id, (None, None))[0]

    @staticmethod
    def _is_in_task(tasks, task_ids):
        """Returns True if any of the tasks is in task_ids."""
        return any([True for task_id in task_ids if task_id in tasks])

    def get_result_set_for_tasks(self, task_ids):
        """Returns a set of results for the given tasks."""
        results = [
            key
            for key, value in self._by_result.items()
            if self._is_in_task(value, task_ids)
        ]
        return set(results)