avocado-framework/avocado

View on GitHub
selftests/unit/status_repo.py

Summary

Maintainability
A
1 hr
Test Coverage
from unittest import TestCase

from avocado.core.status import repo, utils


class StatusRepo(TestCase):
    def setUp(self):
        job_id = "0000000000000000000000000000000000000000"
        self.status_repo = repo.StatusRepo(job_id)

    def test_process_raw_message_invalid(self):
        with self.assertRaises(utils.StatusMsgInvalidJSONError):
            self.status_repo.process_raw_message("+-+-InvalidJSON-AFAICT-+-+")

    def test_process_raw_message_no_id(self):
        msg = (
            '{"status": "finished", "time": 1597774000.5140226, '
            '"returncode": 0, '
            '"job_id": "0000000000000000000000000000000000000000"}'
        )
        with self.assertRaises(repo.StatusMsgMissingDataError):
            self.status_repo.process_raw_message(msg)

    def test_process_raw_message_no_job_id(self):
        msg = (
            '{"status": "finished", "time": 1597774000.5140226, '
            '"returncode": 0, "id": "1-foo"}'
        )
        with self.assertRaises(repo.StatusMsgMissingDataError):
            self.status_repo.process_raw_message(msg)

    def test_set_task_data(self):
        self.status_repo._set_task_data({"id": "1-foo", "status": "started"})
        self.assertEqual(self.status_repo._all_data["1-foo"], [{"status": "started"}])

    def test_handle_task_started(self):
        msg = {"id": "1-foo", "status": "started", "output_dir": "/fake/path"}
        self.status_repo._handle_task_started(msg)
        self.assertEqual(
            self.status_repo.get_all_task_data("1-foo"),
            [{"status": "started", "output_dir": "/fake/path"}],
        )

    def test_handle_task_started_no_output_dir(self):
        msg = {"id": "1-foo", "status": "started"}
        with self.assertRaises(repo.StatusMsgMissingDataError):
            self.status_repo._handle_task_started(msg)

    def test_handle_task_finished_no_result(self):
        msg = {"id": "1-foo", "status": "finished"}
        self.status_repo._handle_task_finished(msg)
        self.assertEqual(
            self.status_repo.get_all_task_data("1-foo"), [{"status": "finished"}]
        )
        self.assertEqual(self.status_repo._by_result.get(None), ["1-foo"])

    def test_handle_task_finished_result(self):
        msg = {"id": "1-foo", "status": "finished", "result": "pass"}
        self.status_repo._handle_task_finished(msg)
        self.assertEqual(
            self.status_repo.get_all_task_data("1-foo"),
            [{"status": "finished", "result": "pass"}],
        )
        self.assertEqual(self.status_repo._by_result.get("pass"), ["1-foo"])

    def test_process_message_running(self):
        msg = {
            "id": "1-foo",
            "status": "running",
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        self.assertEqual(
            self.status_repo.get_all_task_data("1-foo"), [{"status": "running"}]
        )

    def test_process_raw_message_task_started(self):
        msg = (
            '{"id": "1-foo", "status": "started", '
            '"output_dir": "/fake/path", '
            '"job_id": "0000000000000000000000000000000000000000"}'
        )
        self.status_repo.process_raw_message(msg)
        self.assertEqual(
            self.status_repo.get_all_task_data("1-foo"),
            [{"status": "started", "output_dir": "/fake/path"}],
        )

    def test_process_raw_message_task_running(self):
        msg = (
            '{"id": "1-foo", "status": "running", '
            '"job_id": "0000000000000000000000000000000000000000"}'
        )
        self.status_repo.process_raw_message(msg)
        self.assertEqual(
            self.status_repo.get_all_task_data("1-foo"), [{"status": "running"}]
        )

    def test_process_messages_running(self):
        msg = {
            "id": "1-foo",
            "status": "running",
            "time": 1597894378.6080744,
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        msg = {
            "id": "1-foo",
            "status": "running",
            "time": 1597894378.6103745,
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        self.assertEqual(
            self.status_repo.get_latest_task_data("1-foo"),
            {"status": "running", "time": 1597894378.6103745},
        )

    def test_task_status_time(self):
        msg = {
            "id": "1-foo",
            "status": "running",
            "time": 1597894378.0000002,
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        self.assertEqual(
            self.status_repo._status["1-foo"], ("running", 1597894378.0000002)
        )
        msg = {
            "id": "1-foo",
            "status": "started",
            "time": 1597894378.0000001,
            "output_dir": "/fake/path",
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        self.assertEqual(
            self.status_repo._status["1-foo"], ("running", 1597894378.0000002)
        )

    def test_no_task_status(self):
        self.assertIsNone(self.status_repo.get_task_status("1-no-existing-id"))

    def test_get_task_status(self):
        msg = {
            "id": "1-foo",
            "status": "running",
            "time": 1597894378.0000002,
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        self.assertEqual(self.status_repo.get_task_status("1-foo"), "running")
        msg = {
            "id": "1-foo",
            "status": "started",
            "time": 1597894378.0000001,
            "output_dir": "/fake/path",
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        self.assertEqual(self.status_repo.get_task_status("1-foo"), "running")

    def test_get_task_status_journal_summary(self):
        msg = {
            "id": "1-foo",
            "status": "running",
            "time": 1000000003.0,
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        msg = {
            "id": "1-foo",
            "status": "running",
            "time": 1000000002.0,
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        msg = {
            "id": "1-foo",
            "status": "started",
            "time": 1000000001.0,
            "output_dir": "/fake/path",
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        msg = {
            "id": "1-foo",
            "status": "finished",
            "time": 1000000004.0,
            "result": "pass",
            "job_id": "0000000000000000000000000000000000000000",
        }
        self.status_repo.process_message(msg)
        self.assertEqual(self.status_repo.get_task_status("1-foo"), "finished")
        self.assertEqual(
            self.status_repo.status_journal_summary_pop(),
            (1000000001.0, "1-foo", "started", 2),
        )
        self.assertEqual(
            self.status_repo.status_journal_summary_pop(),
            (1000000002.0, "1-foo", "running", 1),
        )
        self.assertEqual(
            self.status_repo.status_journal_summary_pop(),
            (1000000003.0, "1-foo", "running", 0),
        )
        self.assertEqual(
            self.status_repo.status_journal_summary_pop(),
            (1000000004.0, "1-foo", "finished", 3),
        )
        with self.assertRaises(IndexError):
            self.status_repo.status_journal_summary_pop()