avocado-framework/avocado

View on GitHub
avocado/plugins/journal.py

Summary

Maintainability
A
0 mins
Test Coverage
A
97%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2014
# Author: Cleber Rosa <cleber@redhat.com>

"""Journal Plugin"""

import datetime
import os
import sqlite3

from avocado.core.plugin_interfaces import CLI, ResultEvents
from avocado.core.settings import settings

JOURNAL_FILENAME = ".journal.sqlite"

SCHEMA = {
    "job_info": "CREATE TABLE job_info (unique_id TEXT UNIQUE)",
    "test_journal": (
        "CREATE TABLE test_journal ("
        "tag TEXT, "
        "time TEXT, "
        "action TEXT, "
        "status TEXT, "
        "flushed BOOLEAN DEFAULT 0)"
    ),
}


class JournalResult(ResultEvents):
    """
    Test Result Journal class.

    This class keeps a log of the test updates: started running, finished, etc.
    This information can be forwarded live to an avocado server and provide
    feedback to users from a central place.
    """

    name = "journal"
    description = "Journal event based results implementation"

    def __init__(self, config):  # pylint: disable=W0613, W0231
        """
        Creates an instance of ResultJournal.

        :param job: an instance of :class:`avocado.core.job.Job`.
        """
        self.journal_initialized = False
        self.journal_path = ""
        self.journal = None
        self.journal_cursor = None
        self.config = config
        self.enabled = config.get("run.journal.enabled")

    def _init_journal(self, logdir):
        self.journal_path = os.path.join(logdir, JOURNAL_FILENAME)
        self.journal = sqlite3.connect(self.journal_path)
        self.journal_cursor = self.journal.cursor()
        for table in SCHEMA:  # pylint: disable=C0206
            res = self.journal_cursor.execute(f"PRAGMA table_info('{table}')")
            if res.fetchone() is None:
                self.journal_cursor.execute(SCHEMA[table])
        self.journal.commit()

    def lazy_init_journal(self, state):
        # lazy init because we need the toplevel logdir for the job
        if not self.journal_initialized:
            self._init_journal(state["job_logdir"])
            self._record_job_info(state)
            self.journal_initialized = True

    def _shutdown_journal(self):
        if self.journal_initialized:
            self.journal.close()

    def _record_job_info(self, state):
        res = self.journal_cursor.execute("SELECT unique_id FROM job_info")
        if res.fetchone() is None:
            sql = "INSERT INTO job_info (unique_id) VALUES (?)"
            self.journal_cursor.execute(sql, (state["job_unique_id"],))
            self.journal.commit()

    def _record_status(self, state, action):
        sql = "INSERT INTO test_journal (tag, time, action, status) VALUES (?, ?, ?, ?)"

        # This shouldn't be required
        if action == "ENDED":
            status = state["status"]
        else:
            status = None

        self.journal_cursor.execute(
            sql,
            (
                str(state["name"]),
                datetime.datetime(1, 1, 1).now().isoformat(),
                action,
                status,
            ),
        )
        self.journal.commit()

    def pre_tests(self, job):
        pass

    def start_test(self, result, state):
        if not self.enabled:
            return
        self.lazy_init_journal(state)
        self._record_status(state, "STARTED")

    def test_progress(self, progress=False):
        pass

    def end_test(self, result, state):
        if not self.enabled:
            return
        self.lazy_init_journal(state)
        self._record_status(state, "ENDED")

    def post_tests(self, job):
        if not self.enabled:
            return
        self._shutdown_journal()


class Journal(CLI):
    """
    Test journal
    """

    name = "journal"
    description = "Journal options for the 'run' subcommand"

    def configure(self, parser):
        run_subcommand_parser = parser.subcommands.choices.get("run", None)
        if run_subcommand_parser is None:
            return

        help_msg = (
            "Records test status changes (for use with "
            "avocado-journal-replay and avocado-server)"
        )
        settings.register_option(
            section="run.journal",
            key="enabled",
            default=False,
            key_type=bool,
            help_msg=help_msg,
            parser=run_subcommand_parser,
            long_arg="--journal",
        )

    def run(self, config):
        pass