fossasia/open-event-orga-server

View on GitHub
app/api/helpers/db.py

Summary

Maintainability
A
1 hr
Test Coverage
import binascii
import logging
import os
import uuid

from flask import request
from flask_rest_jsonapi.exceptions import ObjectNotFound
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm.exc import NoResultFound

from app.models import db

# ONLY INCLUDE THOSE DB HELPERS WHICH ARE NOT SPECIFIC TO ANY MODEL


def save_to_db(item, msg="Saved to db", print_error=True):
    """Convenience function to wrap a proper DB save
    :param print_error:
    :param item: will be saved to database
    :param msg: Message to log
    """
    try:
        logging.info(msg)
        db.session.add(item)
        logging.info('added to session')
        db.session.commit()
        return True
    except Exception:
        logging.exception('DB Exception!')
        db.session.rollback()
        return False


def safe_query_by_id(model, id):
    return safe_query_by(model, id)


def safe_query_by(model, value, param='id'):
    return safe_query_without_soft_deleted_entries(model, param, value, param)


def safe_query_kwargs(model, kwargs, parameter_name, column_name='id'):
    """
    :param model: db Model to be queried
    :param kwargs: it contains parameter_name's value eg kwargs['event_id']
                where parameter_name='event_id'
    :param parameter_name: Name of parameter to be printed in json-api error
                message eg 'event_id'
    :param column_name: Name of column default is 'id'.
    :return:
    """
    return safe_query(
        model,
        column_name,
        kwargs[parameter_name],
        parameter_name,
    )


def safe_query_without_soft_deleted_entries(
    model, column_name, value, parameter_name, filter_deleted=True
):
    """
    Wrapper query to properly raise exception after filtering the soft deleted entries
    :param model: db Model to be queried
    :param column_name: name of the column to be queried for the given value
    :param value: value to be queried against the given column name, e.g view_kwargs['event_id']
    :param parameter_name: Name of parameter to be printed in json-api error message eg 'event_id'
    :param filter_deleted: Deleted records are filtered if set to true
    :return:
    """
    try:
        record = model.query.filter(getattr(model, column_name) == value)
        if filter_deleted and hasattr(model, 'deleted_at'):
            record = record.filter_by(deleted_at=None)
        record = record.one()
    except NoResultFound:
        raise ObjectNotFound(
            {'parameter': f'{parameter_name}'},
            f"{model.__name__}: {value} not found",
        )
    else:
        return record


def safe_query(model, column_name, value, parameter_name):
    """
    Wrapper query to properly raise exception
    :param model: db Model to be queried
    :param column_name: name of the column to be queried for the given value
    :param value: value to be queried against the given column name, e.g view_kwargs['event_id']
    :param parameter_name: Name of parameter to be printed in json-api error message eg 'event_id'
    :return:
    """
    return safe_query_without_soft_deleted_entries(
        model,
        column_name,
        value,
        parameter_name,
        # TODO(Areeb): Check that only admin can pass this parameter
        request.args.get('get_trashed') != 'true',
    )


def get_or_create(model, defaults=None, **kwargs):
    """
    This function queries a record in the model, if not found it will create one.
    :param model: db Model to be queried
    :param **kwargs: Arguments to the filter_by method of sqlalchemy.orm.query.Query.filter_by to be filtered by
    """
    fetch = lambda: db.session.query(model).filter_by(**kwargs).first()
    instance = fetch()
    if instance:
        return instance, False
    kwargs.update(defaults or {})
    instance = model(**kwargs)
    try:
        db.session.add(instance)
        db.session.commit()
        return instance, True
    except IntegrityError:
        db.session.rollback()
        instance = fetch()
        if not instance:
            raise
        return instance, False


def get_count(query):
    """
    Counts how many records are there in a database table/model
    :param query: <sqlalchemy.orm.query.Query> a SQLAlchemy query object
    :return: Number
    """
    count_q = query.statement.with_only_columns([func.count()]).order_by(None)
    count = query.session.execute(count_q).scalar()
    return count


def get_new_slug(model, name):
    """
    Helper function to create a new slug if required, else return orignal.
    :param model: Specify model from db.
    :param name: Identifier to generate slug.
    """
    slug = (
        name.lower()
        .replace("& ", "")
        .replace(",", "")
        .replace("/", "-")
        .replace(" ", "-")
    )
    count = get_count(model.query.filter_by(slug=slug))
    if count == 0:
        return slug
    return f'{slug}-{uuid.uuid4().hex}'


def get_new_identifier(model=None, length=None):
    if not length:
        identifier = str(uuid.uuid4())
    else:
        identifier = str(binascii.b2a_hex(os.urandom(int(length / 2))), 'utf-8')
    count = (
        0 if model is None else get_count(model.query.filter_by(identifier=identifier))
    )
    if not identifier.isdigit() and count == 0:
        return identifier
    return get_new_identifier(model)


def save_bulk_to_db(items, msg="Saved to db"):
    """Convenience function to wrap a proper DB save
    :param item: will be saved to database
    :param msg: Message to log
    """
    try:
        logging.info(msg)
        db.session.bulk_save_objects(items)
        logging.info('added to session')
        db.session.commit()
        return True
    except SQLAlchemyError:
        logging.exception('DB Exception!')
        db.session.rollback()
        return False