avocado-framework/avocado

View on GitHub
avocado/core/dependencies/requirements/cache/backends/sqlite.py

Summary

Maintainability
B
4 hrs
Test Coverage
F
57%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2020
# Authors: Cleber Rosa <crosa@redhat.com>

"""
Test requirements module.
"""

import os
import sqlite3

from avocado.core.data_dir import get_datafile_path

#: The location of the requirements cache database
CACHE_DATABASE_PATH = get_datafile_path("cache", "requirements.sqlite")

sqlite3.register_adapter(bool, int)
sqlite3.register_converter("BOOLEAN", lambda v: bool(int(v)))

#: The definition of the database schema
SCHEMA = [
    "CREATE TABLE IF NOT EXISTS requirement_type (requirement_type TEXT UNIQUE)",
    "CREATE TABLE IF NOT EXISTS environment_type (environment_type TEXT UNIQUE)",
    (
        "CREATE TABLE IF NOT EXISTS environment ("
        "environment_type TEXT,"
        "environment TEXT,"
        "FOREIGN KEY(environment_type) REFERENCES "
        "environment_type(environment_type)"
        ")"
    ),
    (
        "CREATE UNIQUE INDEX IF NOT EXISTS "
        "environment_idx ON environment (environment, environment_type)"
    ),
    (
        "CREATE TABLE IF NOT EXISTS requirement ("
        "environment_type TEXT,"
        "environment TEXT,"
        "requirement_type TEXT,"
        "requirement TEXT,"
        "saved BOOLEAN,"
        "FOREIGN KEY(environment_type) REFERENCES environment(environment_type),"
        "FOREIGN KEY(environment) REFERENCES environment(environment),"
        "FOREIGN KEY(requirement_type) REFERENCES requirement_type(requirement_type)"
        ")"
    ),
    (
        "CREATE UNIQUE INDEX IF NOT EXISTS requirement_idx ON requirement "
        "(environment_type, environment, requirement_type, requirement)"
    ),
]


def _create_requirement_cache_db():
    os.makedirs(os.path.dirname(CACHE_DATABASE_PATH), exist_ok=True)
    with sqlite3.connect(CACHE_DATABASE_PATH) as conn:
        cursor = conn.cursor()
        for entry in SCHEMA:
            _ = cursor.execute(entry)
        conn.commit()


def set_requirement(
    environment_type, environment, requirement_type, requirement, saved=True
):
    if not os.path.exists(CACHE_DATABASE_PATH):
        _create_requirement_cache_db()

    with sqlite3.connect(CACHE_DATABASE_PATH) as conn:
        cursor = conn.cursor()
        sql = "INSERT OR IGNORE INTO environment_type VALUES (?)"
        cursor.execute(sql, (environment_type,))
        sql = "INSERT OR IGNORE INTO environment VALUES (?, ?)"
        cursor.execute(sql, (environment_type, environment))
        sql = "INSERT OR IGNORE INTO requirement_type VALUES (?)"
        cursor.execute(sql, (requirement_type,))
        sql = "INSERT OR IGNORE INTO requirement VALUES (?, ?, ?, ?, ?)"
        cursor.execute(
            sql, (environment_type, environment, requirement_type, requirement, saved)
        )
        conn.commit()


def is_requirement_in_cache(
    environment_type, environment, requirement_type, requirement
):
    """Checks if requirement is in cache.

    :rtype: True if requirement is in cache
            False if requirement is not in cache
            None if requirement is in cache but it is not saved yet.
    """
    if not os.path.exists(CACHE_DATABASE_PATH):
        return False

    sql = (
        "SELECT r.saved FROM requirement r WHERE ("
        "environment_type = ? AND "
        "environment = ? AND "
        "requirement_type = ? AND "
        "requirement = ?)"
    )

    with sqlite3.connect(CACHE_DATABASE_PATH) as conn:
        cursor = conn.cursor()
        result = cursor.execute(
            sql, (environment_type, environment, requirement_type, requirement)
        )
        row = result.fetchone()
        if row is not None:
            if row[0]:
                return True
            return None
    return False


def is_environment_prepared(environment):
    """Checks if environment has all requirements saved."""

    if not os.path.exists(CACHE_DATABASE_PATH):
        return False

    sql = (
        "SELECT COUNT(*) FROM requirement r JOIN "
        "environment e ON e.environment = r.environment "
        "WHERE (r.environment = ? AND "
        "r.saved = 0)"
    )

    with sqlite3.connect(
        CACHE_DATABASE_PATH, detect_types=sqlite3.PARSE_DECLTYPES
    ) as conn:
        cursor = conn.cursor()
        result = cursor.execute(sql, (environment,))

        row = result.fetchone()
        if row is not None:
            return row[0] == 0
    return False


def update_environment(environment_type, old_environment, new_environment):
    """Updates environment information for each requirement in one environment.

    It will remove the old environment and add the new one to the cache.

    :param environment_type: Type of fetched environment
    :type environment_type: str
    :param old_environment: Environment which should be updated
    :type environment: str
    :param new_environment: Environment, which will be a reimbursement for the
                            old one.
    :type environment: str
    """
    if not os.path.exists(CACHE_DATABASE_PATH):
        return False

    with sqlite3.connect(CACHE_DATABASE_PATH) as conn:
        cursor = conn.cursor()
        sql = "INSERT OR IGNORE INTO environment VALUES (?, ?)"
        cursor.execute(sql, (environment_type, new_environment))

        sql = (
            "UPDATE requirement SET environment = ? WHERE ("
            "environment_type = ? AND "
            "environment = ? )"
        )

        cursor.execute(sql, (new_environment, environment_type, old_environment))

        sql = (
            "DELETE FROM environment WHERE ("
            "environment_type = ? AND "
            "environment = ? )"
        )

        cursor.execute(sql, (environment_type, old_environment))
        conn.commit()


def update_requirement_status(
    environment_type, environment, requirement_type, requirement, new_status
):
    """Updates status of selected requirement in cache.

    The status has two values, save=True or not_save=False.

    :param environment_type: Type of fetched environment
    :type environment_type: str
    :param environment: Environment where the requirement is
    :type environment: str
    :param requirement_type: Type of the requirement in environment
    :type requirement_type: str
    :param requirement: Name of requirement
    :type requirement: str
    :param new_status: Requirement status which will be updated
    :type new_status: bool
    """

    if not os.path.exists(CACHE_DATABASE_PATH):
        return False

    sql = (
        "UPDATE requirement SET saved = ? WHERE ("
        "environment_type = ? AND "
        "environment = ? AND "
        "requirement_type = ? AND "
        "requirement = ?)"
    )

    with sqlite3.connect(CACHE_DATABASE_PATH) as conn:
        cursor = conn.cursor()
        cursor.execute(
            sql,
            (new_status, environment_type, environment, requirement_type, requirement),
        )
        conn.commit()

    return True


def delete_environment(environment_type, environment):
    """Deletes environment with all its requirements from cache.

    :param environment_type: Type of environment
    :type environment_type: str
    :param environment: Environment which will be deleted
    :type environment: str
    """

    if not os.path.exists(CACHE_DATABASE_PATH):
        return False

    with sqlite3.connect(CACHE_DATABASE_PATH) as conn:
        sql = (
            "DELETE FROM requirement WHERE ("
            "environment_type = ? AND "
            "environment = ? )"
        )
        cursor = conn.cursor()
        cursor.execute(sql, (environment_type, environment))
        sql = (
            "DELETE FROM environment WHERE ("
            "environment_type = ? AND "
            "environment = ? )"
        )
        cursor.execute(sql, (environment_type, environment))
        conn.commit()


def delete_requirement(environment_type, environment, requirement_type, requirement):
    """Deletes requirement from cache.

    :param environment_type: Type of environment
    :type environment_type: str
    :param environment: Environment where the requirement is.
    :type environment: str
    :param requirement_type: Type of the requirement in environment
    :type requirement_type: str
    :param requirement: Name of requirement which will be deleted
    :type requirement: str
    """

    if not os.path.exists(CACHE_DATABASE_PATH):
        return False

    with sqlite3.connect(CACHE_DATABASE_PATH) as conn:
        sql = (
            "DELETE FROM requirement WHERE ("
            "environment_type = ? AND "
            "environment = ? AND "
            "requirement_type = ? AND "
            "requirement = ?)"
        )
        cursor = conn.cursor()
        cursor.execute(
            sql, (environment_type, environment, requirement_type, requirement)
        )
        conn.commit()


def get_all_environments_with_requirement(
    environment_type, requirement_type, requirement
):
    """Fetches all environments with selected requirement from cache.

    :param environment_type: Type of fetched environment
    :type environment_type: str
    :param requirement_type: Type of the requirement in environment
    :type requirement_type: str
    :param requirement: Name of requirement
    :type requirement: str
    :return: Dict with all environments which has selected requirements.

    """
    requirements = {}
    if not os.path.exists(CACHE_DATABASE_PATH):
        return requirements

    environment_select = (
        "SELECT e.environment FROM requirement r JOIN "
        "environment e ON e.environment = r.environment "
        "WHERE (r.environment_type = ? AND "
        "r.requirement_type = ? AND "
        "r.requirement = ?)"
    )
    sql = (
        f"SELECT r.environment, r.requirement_type, r.requirement "
        f"FROM requirement AS r, ({environment_select}) AS e "
        f"WHERE r.environment = e.environment"
    )

    with sqlite3.connect(
        CACHE_DATABASE_PATH, detect_types=sqlite3.PARSE_DECLTYPES
    ) as conn:
        cursor = conn.cursor()
        result = cursor.execute(sql, (environment_type, requirement_type, requirement))

        for row in result.fetchall():
            if row[0] in requirements:
                requirements[row[0]].append((row[1], row[2]))
            else:
                requirements[row[0]] = [(row[1], row[2])]
    return requirements


def get_all_requirements():
    """Fetches all requirements from database.

    :return: Dict with all environments which has requirements.

    """
    requirements = {}
    if not os.path.exists(CACHE_DATABASE_PATH):
        return requirements

    sql = "SELECT * FROM requirement"

    with sqlite3.connect(
        CACHE_DATABASE_PATH, detect_types=sqlite3.PARSE_DECLTYPES
    ) as conn:
        cursor = conn.cursor()
        result = cursor.execute(sql)

        for row in result.fetchall():
            environment_type = row[0]
            if environment_type not in requirements:
                requirements[environment_type] = []
            requirements[environment_type].append(
                {
                    "environment": row[1],
                    "requirement_type": row[2],
                    "requirement": row[3],
                }
            )
    return requirements