CastagnaIT/plugin.video.netflix

View on GitHub
resources/lib/database/db_base_sqlite.py

Summary

Maintainability
A
35 mins
Test Coverage
# -*- coding: utf-8 -*-
"""
    Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
    Copyright (C) 2019 Stefano Gottardo - @CastagnaIT (original implementation module)
    Main functions for access to SQLite database

    SPDX-License-Identifier: MIT
    See LICENSES/MIT.md for more information.
"""
import sqlite3 as sql
import threading
from functools import wraps

import resources.lib.common as common
import resources.lib.database.db_base as db_base
import resources.lib.database.db_create_sqlite as db_create_sqlite
import resources.lib.database.db_utils as db_utils
from resources.lib.common.exceptions import DBSQLiteConnectionError, DBSQLiteError
from resources.lib.globals import G
from resources.lib.utils.logging import LOG


# Value passed as 'isolation_level' to sqlite3.connect() by the handle_connection decorator
CONN_ISOLATION_LEVEL = None  # Autocommit mode

# ---------------------------------------------------------------------------
# Pay attention to the SQLite syntax:
# SQLite is case sensitive
# Wrong upper/lower case in column and table names also causes errors
# The LIKE comparator uses ASCII, unicode chars are not comparable
# ---------------------------------------------------------------------------


def handle_connection(func):
    """
    A decorator that handles the connection status with the database

    The first positional argument of the decorated callable must be the database object.
    When the object is flagged as a MySQL database the call is passed through untouched.
    Otherwise, if the object is not already connected (for the current thread) a new SQLite
    connection is opened and stored in its 'conn' attribute before calling the function.

    When G.IS_SQLITE3_THREADSAFE is false the object mutex is held for the whole call and
    the connection opened here is closed again at the end of the call; when it is true the
    connection is left open and no locking is performed.

    :raises DBSQLiteConnectionError: when a sqlite3 error reaches this decorator
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        database = args[0]
        if database.is_mysql_database:
            # If database is mysql pass to next decorator
            return func(*args, **kwargs)
        conn = None
        # Tracked separately from 'conn': the lock is taken BEFORE connecting, so it must be
        # released even when sql.connect() fails and 'conn' is never assigned
        mutex_acquired = False
        is_not_thread_safe = not G.IS_SQLITE3_THREADSAFE
        try:
            if not database.is_connected:
                if is_not_thread_safe:
                    database.mutex.acquire()
                    mutex_acquired = True
                database.conn = sql.connect(database.db_file_path,
                                            isolation_level=CONN_ISOLATION_LEVEL,
                                            check_same_thread=is_not_thread_safe)
                database.is_connected = True
                conn = database.conn
            return func(*args, **kwargs)
        except sql.Error as exc:
            LOG.error('SQLite error {}:', exc.args[0])
            raise DBSQLiteConnectionError from exc
        finally:
            if is_not_thread_safe:
                try:
                    # Only the call that opened the connection is allowed to close it
                    if conn is not None:
                        database.is_connected = False
                        conn.close()
                finally:
                    # Release the lock even if closing the connection raised
                    if mutex_acquired:
                        database.mutex.release()
    return wrapper


class SQLiteDatabase(db_base.BaseDatabase):
    """
    Key/value access to a local SQLite database file

    The connected status is kept per thread (thread-local storage), the public methods
    obtain their connection through the 'handle_connection' decorator.
    """

    def __init__(self, db_filename):
        """
        :param db_filename: File name of the database, resolved to a full path
                            with db_utils.get_local_db_path
        """
        self.mutex = threading.Lock()
        self.local_storage = threading.local()
        self.is_mysql_database = False
        self.db_filename = db_filename
        self.db_file_path = db_utils.get_local_db_path(db_filename)
        super().__init__()

    @property
    def is_connected(self):
        """The connected status of the current thread (False if never set by this thread)"""
        return getattr(self.local_storage, 'is_connected', False)

    @is_connected.setter
    def is_connected(self, val):
        self.local_storage.is_connected = val

    def _initialize_connection(self):
        """
        Test the connection to the database file and create the database when it has no tables

        The connection opened here is always closed before returning.
        :raises DBSQLiteConnectionError: when a sqlite3 error occurs
        """
        try:
            LOG.debug('Trying connection to the SQLite database {}', self.db_filename)
            self.conn = sql.connect(self.db_file_path, check_same_thread=False)
            cur = self.conn.cursor()
            cur.execute('SELECT SQLITE_VERSION()')
            LOG.debug('Database connection {} was successful (SQLite ver. {} {})',
                      self.db_filename, cur.fetchone()[0],
                      'thread safe' if G.IS_SQLITE3_THREADSAFE else 'not thread safe')
            # From now on each fetched row is reduced to its first column
            cur.row_factory = lambda cursor, row: row[0]
            cur.execute('SELECT name FROM sqlite_master WHERE type=\'table\' '
                        'AND name NOT LIKE \'sqlite_%\'')
            list_tables = cur.fetchall()
            if not list_tables:
                # If no tables exist create a new one
                self.conn.close()
                db_create_sqlite.create_database(self.db_file_path, self.db_filename)
        except sql.Error as exc:
            LOG.error('SQLite error {}:', exc.args[0])
            raise DBSQLiteConnectionError from exc
        finally:
            # 'conn' may not exist at all if sql.connect() failed on a fresh instance
            conn = getattr(self, 'conn', None)
            if conn:
                conn.close()

    @staticmethod
    def _log_invalid_params(params):
        """Log the value and the type of the query parameters that caused a ValueError"""
        LOG.error('Value {}', str(params))
        LOG.error('Value type {}', type(params))

    @staticmethod
    def _insert_or_update_query(table):
        """
        Build the statement used to insert a key/value pair or update the value of an existing key

        :param table: Table map
        :return: tuple (query, is_upsert); when is_upsert is True the query takes the
                 parameters (key, value, value, key), otherwise (key, value)
        """
        table_name = table[0]
        table_columns = table[1]
        if common.CmpVersion(sql.sqlite_version) < '3.24.0':
            query = f'INSERT OR REPLACE INTO {table_name} ({table_columns[0]}, {table_columns[1]}) VALUES (?, ?)'
            return query, False
        # sqlite UPSERT clause exists only on sqlite >= 3.24.0
        query = (f'INSERT INTO {table_name} ({table_columns[0]}, {table_columns[1]}) VALUES (?, ?) '
                 f'ON CONFLICT({table_columns[0]}) DO UPDATE SET {table_columns[1]} = ? '
                 f'WHERE {table_columns[0]} = ?')
        return query, True

    def _executemany_non_query(self, query, params, cursor=None):
        """
        Execute a statement once for each set of parameters

        :param query: The SQL statement
        :param params: Sequence of parameter sets
        :param cursor: OPTIONAL cursor to use, when None a new cursor is requested
        :raises DBSQLiteError: when a sqlite3 error occurs
        """
        try:
            if cursor is None:
                cursor = self.get_cursor()
            cursor.executemany(query, params)
        except sql.Error as exc:
            LOG.error('SQLite error {}:', exc.args[0])
            raise DBSQLiteError from exc
        except ValueError:
            self._log_invalid_params(params)
            raise

    def _execute_non_query(self, query, params=None, cursor=None, **kwargs):  # pylint: disable=unused-argument
        """
        Execute a statement that does not return rows

        :param query: The SQL statement
        :param params: OPTIONAL parameters of the statement
        :param cursor: OPTIONAL cursor to use, when None a new cursor is requested
        :param kwargs: Accepted but not used by this implementation
        :raises DBSQLiteError: when a sqlite3 error occurs
        """
        # Same execution and error handling of a query, the cursor is simply not returned
        self._execute_query(query, params, cursor)

    def _execute_query(self, query, params=None, cursor=None):
        """
        Execute a statement and return the cursor used

        :param query: The SQL statement
        :param params: OPTIONAL parameters of the statement
        :param cursor: OPTIONAL cursor to use, when None a new cursor is requested
        :return: The cursor on which the statement has been executed
        :raises DBSQLiteError: when a sqlite3 error occurs
        """
        try:
            if cursor is None:
                cursor = self.get_cursor()
            if params is not None:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            return cursor
        except sql.Error as exc:
            LOG.error('SQLite error {}:', exc.args[0])
            raise DBSQLiteError from exc
        except ValueError:
            self._log_invalid_params(params)
            raise

    def get_cursor(self):
        """Get a new cursor of the current connection"""
        return self.conn.cursor()

    def get_cursor_for_dict_results(self):
        """Get a new cursor that returns each row as a dict of column name / value"""
        conn_cursor = self.conn.cursor()
        conn_cursor.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))
        return conn_cursor

    def get_cursor_for_list_results(self):
        """Get a new cursor that returns only the first column of each row"""
        conn_cursor = self.conn.cursor()
        conn_cursor.row_factory = lambda cursor, row: row[0]
        return conn_cursor

    def return_rows_as_list(self, conn_cursor):
        """Fetch all the remaining rows of the cursor"""
        # See note in the MySqlDatabase class on same method
        return conn_cursor.fetchall()

    @handle_connection
    def get_value(self, key, default_value=None, table=db_utils.TABLE_APP_CONF, data_type=None):
        """
        Get a single value from database
        :param key: The key to get the value
        :param default_value: When key do not exist return this default value
        :param table: Table map
        :param data_type: OPTIONAL Used to set data type conversion only when default_value is None
        :return: The value, with data type of default_value or if none, of data_type specified
        """
        table_name = table[0]
        table_columns = table[1]
        query = f'SELECT {table_columns[1]} FROM {table_name} WHERE {table_columns[0]} = ?'
        cur = self._execute_query(query, (key,))
        result = cur.fetchone()
        if result is None:
            return default_value
        if default_value is not None:
            data_type = type(default_value)
        elif data_type is None:
            data_type = str
        return common.convert_from_string(result[0], data_type)

    @handle_connection
    def get_values(self, key, default_value=None, table=db_utils.TABLE_APP_CONF):
        """
        Get multiple values from database - WARNING return row objects
        :param key: The key to get the values
        :param default_value: When key do not exist return this default value,
                              if not specified an empty list is returned
        :param table: Table map
        :return: rows
        """
        table_name = table[0]
        table_columns = table[1]
        query = f'SELECT {table_columns[1]} FROM {table_name} WHERE {table_columns[0]} = ?'
        cur = self._execute_query(query, (key,))
        result = cur.fetchall()
        # fetchall() never returns None, no match means an empty list. The empty list is
        # kept as result when no default is given, to not break callers that iterate it
        if not result and default_value is not None:
            return default_value
        return result

    @handle_connection
    def set_value(self, key, value, table=db_utils.TABLE_APP_CONF):
        """
        Store a single value to database
        :param key: The key to store the value
        :param value: Value to save
        :param table: Table map
        """
        value = common.convert_to_string(value)
        # Update or insert approach, if there is no updated row then insert new one (no id changes)
        query, is_upsert = self._insert_or_update_query(table)
        params = (key, value, value, key) if is_upsert else (key, value)
        self._execute_non_query(query, params)

    @handle_connection
    def set_values(self, dict_values, table=db_utils.TABLE_APP_CONF):
        """
        Store multiple values to database
        :param dict_values: The key/value to store
        :param table: Table map
        """
        # Doing many sqlite operations at the same makes the performance much worse (especially on Kodi 18)
        # The use of 'executemany' and 'transaction' can improve performance up to about 75% !!
        query, is_upsert = self._insert_or_update_query(table)
        records_values = []
        for key, value in dict_values.items():
            value_str = common.convert_to_string(value)
            records_values.append((key, value_str, value_str, key) if is_upsert else (key, value_str))
        cur = self.get_cursor()
        cur.execute("BEGIN TRANSACTION;")
        try:
            self._executemany_non_query(query, records_values, cur)
            cur.execute("COMMIT;")
        except Exception:
            # Do not leave the transaction open: the connection can be reused by next calls
            # and a new BEGIN would fail inside a pending transaction
            if cur.connection.in_transaction:
                cur.execute("ROLLBACK;")
            raise

    @handle_connection
    def delete_key(self, key, table=db_utils.TABLE_APP_CONF):
        """
        Delete a key record from database
        :param key: The key to delete
        :param table: Table map
        :return: Number of deleted entries
        """
        table_name = table[0]
        table_columns = table[1]
        query = f'DELETE FROM {table_name} WHERE {table_columns[0]} = ?'
        cur = self._execute_query(query, (key,))
        return cur.rowcount

    def __del__(self):
        # The attribute does not exist if the initialization failed before the first connection
        conn = getattr(self, 'conn', None)
        if conn:
            conn.close()