Stibbons/dopplerr

View on GitHub
dopplerr/db.py

Summary

Maintainability
A
1 hr
Test Coverage
# coding: utf-8

# Standard Libraries
import datetime
from pathlib import Path

# Third Party Libraries
from peewee import BooleanField
from peewee import CharField
from peewee import DateTimeField
from peewee import DoesNotExist
from peewee import ForeignKeyField
from peewee import IntegerField
from peewee import Model
from peewee import PrimaryKeyField
from peewee import TextField
from peewee import Using
from playhouse.sqliteq import SqliteQueueDatabase

# Dopplerr
from dopplerr.singleton import singleton


# pylint: disable=invalid-name
class Events(Model):
    """Row of the event log: a timestamp, an event type and a message.

    Rows are written by ``DopplerrDb.insert_event`` and read back, newest
    first, by ``DopplerrDb.get_recent_events``.
    """

    # Defaults to the ``datetime.datetime.now`` callable (naive datetime).
    timestamp = DateTimeField(default=datetime.datetime.now)
    # Event type string, as passed to ``DopplerrDb.insert_event``.
    type = CharField()
    # Event text, as passed to ``DopplerrDb.insert_event``.
    message = TextField()


class SeriesMedias(Model):
    """Row describing one series episode media file.

    All descriptive columns are nullable; ``media_filename`` is the only
    column declared ``null=False`` without a default, and it is indexed.
    Subtitles attached to a row are reachable through the ``subtitles``
    back-reference declared on ``SeriesSubtitles.series_media``.
    """

    id = PrimaryKeyField()
    # NOTE(review): presumably the TheTVDB identifier of the series -- confirm.
    tv_db_id = IntegerField(null=True)
    season_number = IntegerField(null=True)
    episode_number = IntegerField(null=True)
    # Defaults to the ``datetime.datetime.now`` callable (naive datetime).
    added_timestamp = DateTimeField(default=datetime.datetime.now)
    series_title = CharField(null=True)
    episode_title = CharField(null=True)
    quality = CharField(null=True)
    video_languages = CharField(null=True)
    # New rows start dirty; the flag is overwritten by the ``dirty`` argument
    # of ``DopplerrDb.update_series_media`` and
    # ``DopplerrDb.update_fetched_series_subtitles``.
    # NOTE(review): presumably means "subtitles still to be fetched" -- confirm.
    dirty = BooleanField(default=True)
    # Indexed because ``DopplerrDb.media_exists`` filters on this column.
    media_filename = TextField(null=False, index=True)


class SeriesSubtitles(Model):
    """Row recording one subtitle language attached to a ``SeriesMedias`` row."""

    # Defaults to the ``datetime.datetime.now`` callable (naive datetime).
    added_timestamp = DateTimeField(default=datetime.datetime.now)
    # Owning media row; exposes the reverse accessor ``SeriesMedias.subtitles``.
    series_media = ForeignKeyField(SeriesMedias, related_name='subtitles')
    # Subtitle language value, stored as given by the caller.
    language = CharField()


# pylint: enable=invalid-name


@singleton
class DopplerrDb(object):
    """Access layer for the Dopplerr SQLite database.

    The model classes of this module do not declare a database of their own:
    every query method binds the models it needs to :attr:`database` for the
    duration of the call through ``Using(..., with_transaction=False)``.

    :meth:`init` must be called before :attr:`database` is first accessed,
    because the database object is built lazily from the path stored there.

    NOTE(review): ``@singleton`` comes from ``dopplerr.singleton``; it is
    presumably what makes every caller share one instance -- confirm.
    """

    # Format used for every timestamp returned to callers.
    _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S'

    def __init__(self):
        self.__sqlite_db_path = None
        # Kept for compatibility; not read anywhere in this class.
        self.__conn = None
        self.__database = None

    @property
    def database(self):
        """Return the ``SqliteQueueDatabase``, creating it on first access.

        The database is created with the ``foreign_keys`` pragma turned on and
        points at the path recorded by :meth:`init`.
        """
        if self.__database is None:
            self.__database = SqliteQueueDatabase(
                self.__sqlite_db_path, pragmas=(('foreign_keys', 'on'), ))
        return self.__database

    @property
    def conn(self):
        """Return the connection object of :attr:`database`."""
        return self.database.get_conn()

    def init(self, sqlite_db_path: Path, reset_db=False):
        """Record where the SQLite file lives and optionally delete it.

        Args:
            sqlite_db_path: location of the SQLite database file.
            reset_db: when true, remove the existing database file so that the
                next access starts from an empty database.
        """
        self.__sqlite_db_path = sqlite_db_path.as_posix()
        if reset_db:
            try:
                sqlite_db_path.unlink()
            except FileNotFoundError:
                # Nothing to reset (e.g. very first run): not an error.
                pass

    def create_tables(self):
        """Create the tables of all models if they do not exist yet."""
        self.database.create_table(Events, safe=True)
        # SeriesSubtitles has a foreign key to SeriesMedias, so the referenced
        # table is created first.
        self.database.create_table(SeriesMedias, safe=True)
        self.database.create_table(SeriesSubtitles, safe=True)

    def insert_event(self, thetype: str, message: str):
        """Append one row to the event log.

        Args:
            thetype: value stored in ``Events.type``.
            message: value stored in ``Events.message``.
        """
        with Using(self.database, [Events], with_transaction=False):
            Events.create(type=thetype, message=message)

    def get_recent_events(self, limit: int):
        """Return the newest events, most recent first.

        Args:
            limit: maximum number of events to return.

        Returns:
            A list of dicts with the keys ``timestamp`` (formatted string),
            ``type`` and ``message``.
        """
        with Using(self.database, [Events], with_transaction=False):
            events = (Events.select().order_by(Events.timestamp.desc()).limit(limit).execute())
            return [{
                "timestamp": e.timestamp.strftime(self._TIMESTAMP_FORMAT),
                "type": e.type,
                "message": e.message
            } for e in events]

    def update_series_media(self,
                            series_title,
                            tv_db_id,
                            season_number,
                            episode_number,
                            episode_title,
                            quality,
                            video_languages,
                            media_filename,
                            dirty=True):
        """Create or refresh the ``SeriesMedias`` row of a media file.

        The row is looked up by ``tv_db_id``, ``season_number``,
        ``episode_number`` and ``media_filename``; it is created when no such
        row exists. The remaining columns are then overwritten and saved.

        Raises:
            AssertionError: if ``media_filename`` is falsy.
        """
        # Kept as an assert so the exception type seen by callers is unchanged.
        assert media_filename, "media_filename cannot be None"
        with Using(self.database, [SeriesMedias], with_transaction=False):
            media, _ = self._get_or_create(
                SeriesMedias,
                tv_db_id=tv_db_id,
                season_number=season_number,
                episode_number=episode_number,
                media_filename=media_filename)
            # media_filename is part of the lookup key above, so the row
            # already carries it and it does not need to be set again.
            media.series_title = series_title
            media.episode_title = episode_title
            media.quality = quality
            media.video_languages = video_languages
            media.dirty = dirty
            media.save()

    @staticmethod
    def _get_or_create(model, **kwargs):
        """Fetch the row of ``model`` matching ``kwargs``, creating it if absent.

        Returns:
            A ``(instance, created)`` tuple; ``created`` is True when the row
            had to be inserted.

        NOTE(review): presumably hand-rolled instead of peewee's
        ``Model.get_or_create`` because the latter opens a transaction, which
        ``SqliteQueueDatabase`` does not support -- confirm.
        """
        try:
            return model.get(**kwargs), False
        except DoesNotExist:
            return model.create(**kwargs), True

    def update_fetched_series_subtitles(self, series_episode_uid, subtitles_languages, dirty=True):
        """Record fetched subtitle languages for every media of an episode.

        Every ``SeriesMedias`` row matching the episode gets one
        ``SeriesSubtitles`` row per language (existing rows are reused) and
        has its ``dirty`` flag set to ``dirty``. Nothing happens when no media
        row matches.

        Args:
            series_episode_uid: object exposing ``tv_db_id``,
                ``season_number`` and ``episode_number``.
            subtitles_languages: iterable of language values.
            dirty: value written to the ``dirty`` flag of each matching row.
        """
        # Materialised once: the languages are walked once per matching media.
        languages = list(subtitles_languages)
        with Using(self.database, [SeriesMedias, SeriesSubtitles], with_transaction=False):
            # Run the query a single time and work on concrete rows, so the
            # foreign key always receives a model instance, never a query.
            matching_medias = list(SeriesMedias.select().where(
                SeriesMedias.tv_db_id == series_episode_uid.tv_db_id,
                SeriesMedias.season_number == series_episode_uid.season_number,
                SeriesMedias.episode_number == series_episode_uid.episode_number))
            for media in matching_medias:
                for lang in languages:
                    self._get_or_create(
                        SeriesSubtitles,
                        series_media=media,
                        language=lang,
                    )
                media.dirty = dirty
                media.save()

    def get_last_fetched_series(self, limit: int):
        """Return the most recently recorded subtitles, newest first.

        Args:
            limit: maximum number of subtitle rows to return.

        Returns:
            A list of dicts combining the subtitle's ``added_timestamp`` and
            ``subtitle_language`` with the descriptive columns of its media.
        """
        with Using(self.database, [SeriesMedias, SeriesSubtitles], with_transaction=False):
            subtitles = (SeriesSubtitles.select().order_by(
                SeriesSubtitles.added_timestamp.desc()).limit(limit).execute())
            return [{
                "added_timestamp": sub.added_timestamp.strftime(self._TIMESTAMP_FORMAT),
                "series_title": sub.series_media.series_title,
                "season_number": sub.series_media.season_number,
                "episode_number": sub.series_media.episode_number,
                "episode_title": sub.series_media.episode_title,
                "quality": sub.series_media.quality,
                "video_languages": sub.series_media.video_languages,
                "subtitle_language": sub.language,
            } for sub in subtitles]

    def get_medias_series(self, limit: int = 100):
        """Return known series medias ordered by descending series title.

        Args:
            limit: maximum number of media rows to return (defaults to 100).

        Returns:
            A list of dicts with the media's descriptive columns, its
            ``dirty`` flag and the sorted list of its subtitle languages.
        """
        with Using(self.database, [SeriesMedias, SeriesSubtitles], with_transaction=False):
            medias = (SeriesMedias.select().order_by(
                SeriesMedias.series_title.desc()).limit(limit).execute())
            return [{
                "added_timestamp": med.added_timestamp.strftime(self._TIMESTAMP_FORMAT),
                "series_title": med.series_title,
                "season_number": med.season_number,
                "episode_number": med.episode_number,
                "episode_title": med.episode_title,
                "quality": med.quality,
                "video_languages": med.video_languages,
                "subtitle_languages": sorted(s.language for s in med.subtitles),
                "dirty": med.dirty,
            } for med in medias]

    def media_exists(self, media_filename):
        """Tell whether a ``SeriesMedias`` row exists for ``media_filename``.

        Returns:
            True when at least one row has this exact filename, else False.
        """
        with Using(self.database, [SeriesMedias], with_transaction=False):
            # Let the database answer the existence question directly rather
            # than depending on the truthiness of an executed result wrapper.
            return (SeriesMedias.select(SeriesMedias.media_filename)
                    .where(SeriesMedias.media_filename == media_filename)
                    .exists())