Godley/Music-Library

View on GitHub
implementation/primaries/ExtractMetadata/classes/DataLayer/querylayer.py

Summary

Maintainability
C
1 day
Test Coverage
from sqlalchemy import create_engine, update
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.expression import exists, alias, select, or_

from sqlalchemy import Table, Column, Integer, String, MetaData, \
    ForeignKey, Boolean
from .exceptions import BadTableException


def col_or_none(data, col):
    if len(data) > 0:
        return data[0][col]


class QueryLayer(object):
    tables = {}
    join_tables = {"keys": "keys_ins_piece",
                   "clefs": "clefs_ins_piece",
                   "instruments": "clefs_ins_piece",
                   "tempos": "tempos_piece",
                   "time_signatures": "time_signatures_piece",
                   "playlists": "playlist_join"}
    fixtures = {
        "clefs": [{"name": "treble", "sign": "G", "line": 2},
                  {"name": "french", "sign": "G", "line": 1},
                  {"name": "varbaritone", "sign": "F", "line": 1},
                  {"name": "subbass", "sign": "F", "line": 5},
                  {"name": "bass", "sign": "F", "line": 4},
                  {"name": "alto", "sign": "C", "line": 3},
                  {"name": "percussion", "sign": "percussion", "line": -1},
                  {"name": "tenor", "sign": "C", "line": 4},
                  {"name": "baritone", "sign": "C", "line": 5},
                  {"name": "mezzosoprano", "sign": "C", "line": 2},
                  {"name": "soprano", "sign": "C", "line": 1},
                  {"name": "varC", "sign": "VARC", "line": -1},
                  {"name": "alto varC", "sign": "VARC", "line": 3},
                  {"name": "tenor varC", "sign": "VARC", "line": 4},
                  {"name": "baritone varC", "sign": "VARC", "line": 5}],

        "keys": [{"name": "E double flat major", "fifths": -10, "mode": "major"},
                 {"name": "B double flat major", "fifths": -9, "mode": "major"},
                 {"name": "F flat major", "fifths": -8, "mode": "major"},
                 {"name": "C flat major", "fifths": -7, "mode": "major"},
                 {"name": "G flat major", "fifths": -6, "mode": "major"},
                 {"name": "D flat major", "fifths": -5, "mode": "major"},
                 {"name": "A flat major", "fifths": -4, "mode": "major"},
                 {"name": "E flat major", "fifths": -3, "mode": "major"},
                 {"name": "B flat major", "fifths": -2, "mode": "major"},
                 {"name": "F major", "fifths": -1, "mode": "major"},
                 {"name": "C major", "fifths": 0, "mode": "major"},
                 {"name": "G major", "fifths": 1, "mode": "major"},
                 {"name": "D major", "fifths": 2, "mode": "major"},
                 {"name": "A major", "fifths": 3, "mode": "major"},
                 {"name": "E major", "fifths": 4, "mode": "major"},
                 {"name": "B major", "fifths": 5, "mode": "major"},
                 {"name": "F# major", "fifths": 6, "mode": "major"},
                 {"name": "C# major", "fifths": 7, "mode": "major"},
                 {"name": "G# major", "fifths": 8, "mode": "major"},
                 {"name": "D# major", "fifths": 9, "mode": "major"},
                 {"name": "A# major", "fifths": 10, "mode": "major"},

                 {"name": "C flat minor", "fifths": -10, "mode": "minor"},
                 {"name": "G flat minor", "fifths": -9, "mode": "minor"},
                 {"name": "D flat minor", "fifths": -8, "mode": "minor"},
                 {"name": "A flat minor", "fifths": -7, "mode": "minor"},
                 {"name": "E flat minor", "fifths": -6, "mode": "minor"},
                 {"name": "B flat minor", "fifths": -5, "mode": "minor"},
                 {"name": "F minor", "fifths": -4, "mode": "minor"},
                 {"name": "C minor", "fifths": -3, "mode": "minor"},
                 {"name": "G minor", "fifths": -2, "mode": "minor"},
                 {"name": "D minor", "fifths": -1, "mode": "minor"},
                 {"name": "A minor", "fifths": 0, "mode": "minor"},
                 {"name": "E minor", "fifths": 1, "mode": "minor"},
                 {"name": "B minor", "fifths": 2, "mode": "minor"},
                 {"name": "F# minor", "fifths": 3, "mode": "minor"},
                 {"name": "C# minor", "fifths": 4, "mode": "minor"},
                 {"name": "G# minor", "fifths": 5, "mode": "minor"},
                 {"name": "D# minor", "fifths": 6, "mode": "minor"},
                 {"name": "A# minor", "fifths": 7, "mode": "minor"},
                 {"name": "E# minor", "fifths": 8, "mode": "minor"},
                 {"name": "B# major", "fifths": 9, "mode": "minor"},
                 {"name": "F double# major", "fifths": 10, "mode": "minor"}]
    }

    def __init__(self, db_path):
        self.engine = create_engine(db_path, echo=True)

    def get_session(self):
        Session = sessionmaker(bind=self.engine)
        return Session()

    def init_tables(self):
        metadata = MetaData()
        self.tables["creators"] = Table(
            'creators', metadata, Column(
                'id', Integer, primary_key=True), Column(
                'name', String))

        self.tables["instruments"] = Table(
            'instruments', metadata, Column(
                'id', Integer, primary_key=True), Column(
                'name', String), Column(
                'chromatic', Integer), Column(
                    'diatonic', Integer))

        self.tables["keys"] = Table('keys', metadata,
                                    Column('id', Integer, primary_key=True),
                                    Column('name', String, unique=True),
                                    Column('mode', String),
                                    Column('fifths', Integer))

        self.tables["clefs"] = Table('clefs', metadata,
                                     Column('id', Integer, primary_key=True),
                                     Column('name', String, unique=True),
                                     Column('sign', String),
                                     Column('line', Integer),
                                     Column('clef-octave-change', Integer))

        self.tables["tempos"] = Table('tempos', metadata,
                                      Column('id', Integer, primary_key=True),
                                      Column('beat', String),
                                      Column('minute', Integer),
                                      Column('beat_2', String))

        self.tables["time_signatures"] = Table(
            'time_signatures', metadata, Column(
                'id', Integer, primary_key=True), Column(
                'beat', Integer), Column(
                'beat_type', Integer))

        self.tables["pieces"] = Table(
            'pieces', metadata,
            Column('id', Integer, primary_key=True),
            Column('name', String),
            Column('filename', String, unique=True),
            Column('archived', Boolean),
            Column('composer.id', None, ForeignKey('creators.id')),
            Column('lyricist.id', None, ForeignKey('creators.id')),
            Column('source', String),
            Column('secret', String),
            Column('license', String))

        self.tables["playlists"] = Table(
            'playlists', metadata, Column(
                'id', Integer, primary_key=True), Column(
                'name', String))
        self.tables["playlist_join"] = Table(
            'playlist_join', metadata, Column(
                'playlist.id', Integer, ForeignKey('playlists.id')), Column(
                'piece.id', Integer, ForeignKey('pieces.id')))

        self.tables["keys_ins_piece"] = Table(
            'key_ins_piece_join', metadata, Column(
                'piece.id', Integer, ForeignKey('pieces.id')), Column(
                'keys.id', Integer, ForeignKey('keys.id')), Column(
                'instruments.id', Integer, ForeignKey('instruments.id')))

        self.tables["clefs_ins_piece"] = Table(
            'clef_ins_piece_join', metadata, Column(
                'piece.id', Integer, ForeignKey('pieces.id')), Column(
                'clefs.id', Integer, ForeignKey('clefs.id')), Column(
                'instruments.id', Integer, ForeignKey('instruments.id')))

        self.tables["tempos_piece"] = Table(
            'tempo_piece_join', metadata, Column(
                'piece.id', Integer, ForeignKey('pieces.id')), Column(
                'tempos.id', Integer, ForeignKey('tempos.id')))

        self.tables["time_signatures_piece"] = Table(
            'time_piece_join', metadata, Column(
                'piece.id', Integer, ForeignKey('pieces.id')),
            Column('time_signatures.id', Integer,
                   ForeignKey('time_signatures.id')))

        metadata.create_all(self.engine)

    def setup(self):
        self.init_tables()
        self.add_fixtures()

    def validate_table(self, table):
        return table in self.tables

    def get_or_add(self, data, table="instruments"):
        elem = self.query(data, table=table)
        if elem is None or len(elem) == 0:
            self.add(data, table=table)
            elem = self.query(data, table=table)
        return elem

    def add_and_link(self, data, piece_id, table="tempos"):
        elem = self.get_or_add(data, table=table)[0]
        self.add({table + ".id": elem['id'],
                  "piece.id": piece_id},
                 table=table + "_piece")

    def add_multiple(self, data_list, table="pieces"):
        ids = []
        for elem in data_list:
            ids.append(self.add(elem, table=table))

    def add(self, data_dict, table="pieces"):
        if self.validate_table(table):
            query = self.tables[table].insert().values(**data_dict)
            return self.execute(query).inserted_primary_key
        else:
            raise BadTableException(
                "table {} not in {}".format(
                    table, self.tables.keys()))

    def like_query(self, data, columns, query):
        if data is not None:
            for key in data:
                col = getattr(columns, key)
                query = query.filter(col.like(data[key]))
        return query

    def to_dict(self, table, query):
        if self.validate_table(table):
            _table = self.tables[table]
            columns = [col.name for col in _table.columns]
            return [{key: value for key, value in zip(columns, entry)}
                    for entry in query]
        else:
            raise BadTableException(
                "Table {} not in {}".format(
                    table, self.tables.keys()))

    def mk_or_expr(self, elems, column):
        expr = column == elems[0]
        for e in elems:
            expr = or_(expr, (column == e))
        return expr

    def query_multiple(
            self,
            data,
            filter_col="piece.id",
            table="clefs_ins_piece"):
        if self.validate_table(table):
            _table = self.tables[table]
            _filter_col = getattr(_table.columns, filter_col)
            q = select([_filter_col])
            for elem in data:
                query = _table.select()
                nxtalias = alias(_table)
                for key in elem:
                    col = getattr(nxtalias.columns, key)
                    expr = self.mk_or_expr(elem[key], col)
                    query = query.where(expr)

                alias_filter = getattr(nxtalias.columns, filter_col)
                query = query.where(alias_filter == _filter_col)
                q = q.where(exists(query))

            result_prox = self.execute(q)
            return set([elem[0] for elem in result_prox])
        else:
            raise BadTableException(
                "table {} not in {}".format(
                    table, self.tables.keys()))

    def get_ids_for_like(self, data, table="pieces"):
        result = self.query(likedata=data, table=table)
        ids = [elem['id'] for elem in result]
        return ids

    def get_row_id(self, data, table="pieces"):
        result = self.query(data, table=table)
        if len(result) > 0:
            row = result[0]
            row = row['id']
        else:
            row = None
        return row

    def query_similar_rows(
            self,
            data,
            match_cols=[],
            excl_cols=[],
            table='pieces'):
        '''SELECT i2.ROWID, i2.name FROM instruments i1, instruments i2
                              WHERE i1.name = ? AND i2.diatonic = i1.diatonic
                              AND i2.chromatic = i1.chromatic
                              AND i2.name != i1.name'''
        if self.validate_table(table):
            _table = self.tables[table]
            query = _table.select()
            tbl_alias = alias(_table)
            for key in data:
                col = getattr(_table.columns, key)
                col2 = getattr(tbl_alias.columns, key)
                expr2 = col != col2
                expr = col2 == data[key]
                query = query.where((expr) & (expr2))
            for col in match_cols:
                column = getattr(_table.columns, col)
                col2 = getattr(tbl_alias.columns, col)
                expr = column == col2
                query = query.where(expr)
            for exc in excl_cols:
                column = getattr(_table.columns, exc)
                col2 = getattr(tbl_alias.columns, exc)
                expr = column != col2
                query = query.where(expr)
            res = list(self.execute(query))
            res = self.to_dict(table, res)
            return res
        else:
            raise BadTableException(
                "table {} not in {}".format(
                    table, self.tables.keys()))

    def query(self, data=None, notdata=None, likedata=None, table="pieces"):
        if self.validate_table(table):
            columns = self.tables[table].columns
            query = self.mk_query(data, table=table)
            query = self.not_query(notdata, columns, query)
            query = self.like_query(likedata, columns, query)
            res = query.all()
            return self.to_dict(table, res)
        else:
            raise BadTableException(
                "table {} not in {}".format(
                    table, self.tables.keys()))

    def mk_query(self, data, table="pieces"):
        if self.validate_table(table):
            session = self.get_session()
            query = session.query(self.tables[table])
            if data is not None:
                query = query.filter_by(**data)
            return query
        else:
            raise BadTableException(
                "table {} not in {}".format(
                    table, self.tables.keys()))

    def get_all(self, table="pieces"):
        if self.validate_table(table):
            query = self.tables[table].select()
            return self.execute(query)
        else:
            raise BadTableException(
                "table {} not in {}".format(
                    table, self.tables.keys()))

    def update(self, id, data, table="pieces"):
        if self.validate_table(table):
            query = update(
                self.tables[table]).where(
                self.tables[table].c.id == id).values(
                **data)
            self.execute(query)
        else:
            raise BadTableException(
                "table {} not in {}".format(
                    table, self.tables.keys()))

    def execute(self, query):
        conn = self.engine.connect()
        return conn.execute(query)

    def add_fixtures(self):
        for table in self.fixtures:
            for elem in self.fixtures[table]:
                self.get_or_add(elem, table=table)

    def order_by(self, data, store_val=None, column="piece.id"):
        result = {}
        for elem in data:
            if elem[column] not in result:
                result[elem[column]] = []
            if store_val is None:
                result[elem[column]].append(elem)
            else:
                result[elem[column]].append(elem[store_val])
        return result

    def get_join(self, table):
        return self.join_tables[table]

    def remove(self, id, table="pieces", column='id'):
        col = getattr(self.tables[table].columns, column)
        query = self.tables[table].delete().where(
            col == id)
        self.execute(query)

    def not_query(self, data, columns, query):
        if data is not None:
            for key in data:
                col = getattr(columns, key)
                query = query.filter(col != data[key])
        return query