oplik0/cherrydoor

View on GitHub
cherrydoor/database/mongo.py

Summary

Maintainability
C
1 day
Test Coverage
"""Functions to simplify interacting with database."""
import datetime as dt
from math import ceil

from bson.objectid import ObjectId
from bson import SON
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import WriteConcern, IndexModel, ASCENDING, ReturnDocument


def init_db(config, loop):
    """Initiate the database connection.

    Parameters
    ----------
    config : AttrDict
        The app configuration
    loop : asyncio.AbstractEventLoop
        The event loop to use for database connection
    Returns
    -------
    db : motor.motor_asyncio.AsyncIOMotorClient
        The database connection object
    """
    db = AsyncIOMotorClient(
        f"mongodb://{config.get('mongo', {}).get('url', 'localhost:27017')}/{config.get('mongo', {}).get('name', 'cherrydoor')}",
        username=config.get("mongo", {}).get("username", None),
        password=config.get("mongo", {}).get("password", None),
        io_loop=loop,
    )[config.get("mongo", {}).get("name", "cherrydoor")]
    return db


async def setup_db(app):
    """Set up database indexes and collections.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    """
    user_indexes = [
        IndexModel([("username", ASCENDING)], name="username_index", unique=True),
        IndexModel([("cards", ASCENDING)], name="cards_index", sparse=True),
        IndexModel([("tokens.token", ASCENDING)], name="token_index", sparse=True),
    ]
    command_log_options = await app["db"].terminal.options()
    if (
        "capped" not in command_log_options
        or not command_log_options["capped"]
        or "timeseries" not in command_log_options
        or command_log_options["timeseries"]["timeField"] != "timestamp"
    ):
        await app["db"].drop_collection("terminal")
        await app["db"].create_collection(
            "terminal",
            size=app["config"].get("command_log_size", 100000000),
            capped=True,
            max=10000,
#             timeseries={
#                 "timeField": "timestamp",
#                 "metaField": "command",
#                 "granularity": "seconds",
#             },
#             expireAfterSeconds=app["config"].get(
#                 "command_log_expire_after", 604800
#             ),
        )
    await app["db"].users.create_indexes(user_indexes)


async def close_db(app):
    """Close the database connection.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    """
    app["db"].close()


async def list_permissions(app, username):
    """List the permissions for a user.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    username : str
        The username of the user
    Returns
    -------
    permissions : list
        The list of permissions user has
    """
    user = await app["db"].users.find_one(
        {"username": username}, {"permissions": 1, "_id": 0}
    )
    if user is not None:
        return user.get("permissions", [])
    return []


async def user_exists(app, username):
    """Check if a user exists by username.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    username : str
        The username of the user
    Returns
    -------
    exists : bool
        True if the user exists, False otherwise
    """
    count = app["db"].users.count_documents({"username": username})
    return count > 0


async def create_user(
    app, username, hashed_password=None, permissions=None, cards=None
):
    """Create a single new user.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    username : str
        The username for the new user
    hashed_password : str
        The argon2id hashed password for the new user
    permissions : list
        The list of permissions the new user will have
    cards : list
        The list of cards assigned to the new user
    Returns
    -------
    uid : str
        The uid of the new user
    """
    permissions = permissions if permissions else []
    cards = cards if cards else []
    result = (
        await app["db"]
        .users.with_options(write_concern=WriteConcern(w="majority"))
        .insert_one(
            {
                "username": username,
                "password": hashed_password,
                "permissions": permissions,
                "cards": cards,
            }
        )
    )
    return str(result.inserted_id)


async def change_user_password(app, identity, new_password_hash):
    """Change user's password.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    identity : str
        The uid of the user whose password is to be changed
    new_password_hash : str
        The new argon2id hashed password
    """
    await app["db"].users.update_one(
        {"_id": ObjectId(identity)}, {"$set": {"password": new_password_hash}}
    )


async def add_token_to_user(app, identity, token_name, token):
    """Assign an API token to a user.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    identity : str
        The uid of the user to whom the token is to be assigned
    token_name : str
        The frontend name of the token to be assigned
    token : str
        The branca token to be assigned
    """
    await app["db"].users.update_one(
        {"_id": ObjectId(identity)},
        {"$addToSet": {"tokens": {"name": token_name, "token": token}}},
    )


async def find_user_by_uid(app, identity, fields=["username"]):
    """Find a user by their uid.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    identity : str
        The uid to search for
    fields : list, default=["username"]
        The fields to be returned in the user document
    Returns
    -------
    user : dict
        The user document
    """
    return await find_user_by(app, "_id", ObjectId(identity), fields)


async def find_user_by_username(app, username, fields=["_id"]):
    """Find a user by their username.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    username : str
        The username to search for
    fields : list, default=["_id"]
        The fields to be returned in the user document
    Returns
    -------
    user : dict
        The user document
    """
    return await find_user_by(app, "username", username, fields)


async def find_user_by_token(app, token, fields=["username"]):
    """Find a user by an API token assigned to them.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    token : str
        The API token to search for
    fields : list, default=["username"]
        The fields to be returned in the user document
    Returns
    -------
    user : dict
        The user document
    """
    return await find_user_by(app, "tokens.token", token, fields)


async def find_user_by_cards(app, cards, fields=["username"]):
    """Find a user by a list of cards assigned to them.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    cards : list
        The list of cards to search for
    fields : list, default=["username"]
        The fields to be returned in the user document
    Returns
    -------
    user : dict
        The user document
    """
    if not isinstance(cards, list):
        cards = [cards]
    projection = {}
    for field in fields:
        projection[field] = 1
    if "_id" not in fields:
        projection["_id"] = 0
    return await app["db"].users.find_one({"cards": cards}, projection)


async def find_user_by(app, search_fields, values, return_fields):
    """Find a user by arbritary search fields.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    search_fields : list
        The fields to search for
    values : list
        The values to search for
    return_fields : list
        The fields to return in the user document
    Returns
    -------
    user : dict
        The user document
    """
    if not isinstance(search_fields, list):
        search_fields = [search_fields]
    if not isinstance(values, list):
        values = [values]
    if "api_key" in search_fields:
        search_fields[search_fields.index("api_key")] = "tokens.token"
    projection = {}
    for field in return_fields:
        projection[field] = 1
    if "_id" not in return_fields:
        projection["_id"] = 0
    user = await app["db"].users.find_one(
        SON(dict(zip(search_fields, values))), projection=projection
    )
    return user


#
async def get_grouped_logs(app, datetime_from, datetime_to, granularity):
    """Get logs between two datetimes.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    datetime_from : datetime.datetime
        The start datetime
    datetime_to : datetime.datetime
        The end datetime
    granularity : str
        The granularity of the logs to be returned
    Returns
    -------
    logs : list
        The list of logs between the two datetimes with a given granularity
    """
    if not isinstance(granularity, dt.timedelta):
        granularity = dt.timedelta(seconds=granularity)
    boundries = [
        datetime_from + granularity * i
        for i in range(ceil((datetime_to - datetime_from) / granularity))
    ]
    boundries[-1] = datetime_to + dt.timedelta(seconds=1)
    pipeline = [
        {"$match": {"timestamp": {"$gte": datetime_from, "$lte": datetime_to}}},
        {
            "$project": {
                "timestamp": 1,
                "successful": {"$toInt": "$success"},
                "during_break": {
                    "$toInt": {"$eq": ["$auth_mode", "Manufacturer code"]}
                },
            }
        },
        {
            "$bucket": {
                "groupBy": "$timestamp",
                "boundaries": boundries,
                "default": "no_match",
                "output": {
                    "count": {"$sum": 1},
                    "successful": {"$sum": "$successful"},
                    "during_break": {"$sum": "$during_break"},
                },
            }
        },
    ]
    logs = []
    async for doc in app["db"].logs.aggregate(pipeline):
        logs.append(
            {
                "date_from": doc["_id"].isoformat(),
                "date_to": (doc["_id"] + granularity).isoformat(),
                "count": doc["count"],
                "successful": doc["successful"],
                "during_break": doc["during_break"],
            }
        )

    return logs


async def modify_user(app, uid=None, current_username=None, **kwargs):
    """Change user's properties.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    uid : str, default=None
        The uid of the user to be modified
    current_username : str, default=None
        The current username of the user
    **kwargs : dict
        The properties to be changed.
        username will override the current username
        cards will override the current card list
        permissions will override the current permissions
        card will add a card to the user's card list
        permission will add a permission to the user's permissions
    Returns
    -------
    user : dict
        The user document
    """
    overwrite_keys = ["username", "cards", "permissions"]
    append_keys = ["card", "permission"]
    pipeline = []
    if bool(set(overwrite_keys) & set(kwargs.keys())):
        pipeline.append(
            {
                "$set": {
                    key: value
                    for key, value in kwargs.items()
                    if key in overwrite_keys and len(value) > 0
                }
            }
        )
    if bool(set(append_keys) & set(kwargs.keys())):
        pipeline.append(
            {
                "$set": {
                    key: {"$concatArrays": [f"${key}", [value]]}
                    for key, value in kwargs.items()
                    if key in append_keys and len(value) > 0
                }
            }
        )
    if len(pipeline) <= 0:
        return None
    user = await app["db"].users.find_one_and_update(
        {
            "_id"
            if uid is not None
            else "username": uid
            if uid is not None
            else current_username
        },
        pipeline,
        projection={"_id": 0, "username": 1, "permissions": 1, "cards": 1},
        return_document=ReturnDocument.AFTER,
    )
    return user


# async def add_api_open_to_logs(app, )


async def user_exists(app, **kwargs):
    """Check if a user exists by arbitrary keys.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    **kwargs : dict
        The properties and values to be seached for
    Returns
    -------
    exists : bool
        True if the user exists, False otherwise
    """
    return app["db"].users.count_documents(kwargs) > 0


async def delete_user(app, uid=None, username=None):
    """Delete a specified user.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    uid : str, default=None
        The uid of the user to be deleted. If None will use username
    username : str, default=None
        The username of the user to be deleted if no uid is specified.
        If also None won't delete anything.
    Returns
    -------
    deleted : bool
        True if the user was deleted, False otherwise
    """
    if uid:
        return await app["db"].users.delete_one({"_id": uid})

    if username:
        return await app["db"].users.delete_one({"username": username})
    return False


async def add_cards_to_user(app, uid, cards):
    """Add cards to a user.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    uid : str
        The uid of the user to add cards to
    cards : list
        The cards to be added to the user
    Returns
    -------
    user : dict
        The modified user document
    """
    user = await app["db"].find_one_and_update(
        {"_id": uid},
        {"$addToSet": {"cards": {"$each": cards}}},
        projection={"_id": 0, "username": 1, "permissions": 1, "cards": 1},
        return_document=ReturnDocument.AFTER,
    )
    return user


async def delete_cards_from_user(app, uid, cards):
    """Delete specified cards from a user.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    uid : str
        The uid of the user to delete cards from
    cards : list
        The cards to be deleted from the user
    Returns
    -------
    user : dict
        The modified user document
    """
    if not isinstance(cards, list):
        cards = [cards]
    user = await app["db"].users.find_one_and_update(
        {"_id": uid},
        {"$pullAll": {"cards": cards}},
        projection={"_id": 0, "username": 1, "permissions": 1, "cards": 1},
        return_document=ReturnDocument.AFTER,
    )
    return user


async def create_users(app, users):
    """Create multiple users.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    users : list
        A list of dictionaries with user data to be created
    """
    return (
        await app["db"]
        .users.with_options(write_concern=WriteConcern(w="majority"))
        .insert_many(
            [
                {
                    "username": user.get("username"),
                    "password": user.get("password", None),
                    "permissions": user.get("permissions", []),
                    "cards": user.get("cards", []),
                }
                for user in users
            ]
        )
    )


async def get_users(app, return_fields=["username", "permissions", "cards"]):
    """Retrieve all users from the database.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    return_fields : list, default=["username", "permissions", "cards"]
        The fields to be returned for each user.
    Returns
    -------
    users : list
        A list of user documents
    """
    projection = {}
    for field in return_fields:
        projection[field] = 1
    if "_id" not in return_fields:
        projection["_id"] = 0
    return app["db"].users.find({}, projection=projection)


async def set_default_permissions(app, username):
    """Set user permissions to the default ("enter").

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    username : str
        The username of the user to be modified
    Returns
    -------
    user : dict
        The modified user document with usernae, permissions and cards fields
    """
    user = await app["db"].users.find_one_and_update(
        {"username": username},
        {"$set": {"permissions": ["enter"]}},
        projection={"_id": 0, "username": 1, "permissions": 1, "cards": 1},
        return_document=ReturnDocument.AFTER,
    )
    return user


async def get_settings(app):
    """Retrieve breaks settings from the database.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    Returns
    -------
    breaks : dict
        A dictionary with the breaks settings
    """
    breaks = (await app["db"].settings.find_one({"setting": "break_times"})).get(
        "value", []
    )
    return {"breaks": breaks}


async def save_settings(app, settings):
    """Save settings to the database.

    Parameters
    ----------
    app : aiohttp.web.Application
        The aiohttp application instance
    settings : dict
        A dictionary with all settings to set
    """
    # TODO optimize into a single query
    for setting, value in settings.items():
        await app["db"].settings.find_one_and_update(
            {"setting": setting}, {"$set": {"value": value}}
        )