byceps/byceps

View on GitHub
byceps/services/authn/identity_tag/authn_identity_tag_service.py

Summary

Maintainability
A
0 mins
Test Coverage
F
33%
"""
byceps.services.authn.identity_tag.authn_identity_tag_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2022-2024 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""

from uuid import UUID

from sqlalchemy import delete, select

from byceps.database import db
from byceps.services.user import user_log_service, user_service
from byceps.services.user.models.log import UserLogEntry
from byceps.services.user.models.user import User

from . import authn_identity_tag_domain_service
from .dbmodels import DbUserIdentityTag
from .models import UserIdentityTag


def create_tag(
    creator: User,
    identifier: str,
    user: User,
    *,
    note: str | None = None,
    suspended: bool = False,
) -> UserIdentityTag:
    """Create a tag."""
    tag, event, log_entry = authn_identity_tag_domain_service.create_tag(
        creator, identifier, user, note=note, suspended=suspended
    )

    _persist_tag_creation(tag, log_entry)

    return tag


def _persist_tag_creation(
    tag: UserIdentityTag, log_entry: UserLogEntry
) -> None:
    db_tag = DbUserIdentityTag(
        tag.id,
        tag.created_at,
        tag.creator.id,
        tag.identifier,
        tag.user.id,
        tag.note,
        tag.suspended,
    )
    db.session.add(db_tag)

    db_log_entry = user_log_service.to_db_entry(log_entry)
    db.session.add(db_log_entry)

    db.session.commit()


def delete_tag(tag: UserIdentityTag, initiator: User) -> None:
    """Delete a tag."""
    event, log_entry = authn_identity_tag_domain_service.delete_tag(
        tag, initiator
    )

    db.session.execute(
        delete(DbUserIdentityTag).where(DbUserIdentityTag.id == tag.id)
    )

    db_log_entry = user_log_service.to_db_entry(log_entry)
    db.session.add(db_log_entry)

    db.session.commit()


def find_tag(tag_id: UUID) -> UserIdentityTag | None:
    """Return the tag, if found."""
    db_tag = db.session.get(DbUserIdentityTag, tag_id)

    if db_tag is None:
        return None

    creator = user_service.get_user(db_tag.creator_id, include_avatar=True)
    user = user_service.get_user(db_tag.user_id, include_avatar=True)

    return _db_entity_to_tag(db_tag, creator, user)


def find_tag_by_identifier(identifier: str) -> UserIdentityTag | None:
    """Return the tag with this identifier."""
    db_tag = db.session.scalars(
        select(DbUserIdentityTag).filter(
            db.func.lower(DbUserIdentityTag.identifier) == identifier.lower()
        )
    ).one_or_none()

    if db_tag is None:
        return None

    creator = user_service.get_user(db_tag.creator_id, include_avatar=True)
    user = user_service.get_user(db_tag.user_id, include_avatar=True)

    return _db_entity_to_tag(db_tag, creator, user)


def get_all_tags() -> list[UserIdentityTag]:
    """Return all tags."""
    db_tags = db.session.scalars(select(DbUserIdentityTag)).all()

    creator_ids = {db_tag.creator_id for db_tag in db_tags}
    user_ids = {db_tag.user_id for db_tag in db_tags}
    creator_and_user_ids = user_ids.union(creator_ids)
    creators_and_users_by_id = user_service.get_users_indexed_by_id(
        creator_and_user_ids, include_avatars=True
    )

    return [
        _db_entity_to_tag(
            db_tag,
            creators_and_users_by_id[db_tag.creator_id],
            creators_and_users_by_id[db_tag.user_id],
        )
        for db_tag in db_tags
    ]


def _db_entity_to_tag(
    db_tag: DbUserIdentityTag, creator: User, user: User
) -> UserIdentityTag:
    return UserIdentityTag(
        id=db_tag.id,
        created_at=db_tag.created_at,
        creator=creator,
        identifier=db_tag.identifier,
        user=user,
        note=db_tag.note,
        suspended=db_tag.suspended,
    )