byceps/byceps

View on GitHub
byceps/services/authz/authz_service.py

Summary

Maintainability
A
0 mins
Test Coverage
A
90%
"""
byceps.services.authz.authz_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2014-2024 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""

from collections import defaultdict
from collections.abc import Iterable

from sqlalchemy import delete, select
from sqlalchemy.exc import IntegrityError

from byceps.database import db
from byceps.events.authz import (
    RoleAssignedToUserEvent,
    RoleDeassignedFromUserEvent,
)
from byceps.services.user import user_log_service, user_service
from byceps.services.user.models.log import UserLogEntry
from byceps.services.user.models.user import User, UserID
from byceps.util.result import Err, Ok, Result

from . import authz_domain_service
from .dbmodels import DbRole, DbRolePermission, DbUserRole
from .models import PermissionID, Role, RoleID


def create_role(role_id: RoleID, title: str) -> Result[Role, IntegrityError]:
    """Create a role."""
    db_role = DbRole(role_id, title)

    db.session.add(db_role)

    try:
        db.session.commit()
    except IntegrityError as e:
        db.session.rollback()
        return Err(e)

    return Ok(db_role).map(_db_entity_to_role)


def delete_role(role_id: RoleID) -> None:
    """Delete a role."""
    db.session.execute(
        delete(DbRolePermission).where(DbRolePermission.role_id == role_id)
    )
    db.session.execute(delete(DbRole).where(DbRole.id == role_id))
    db.session.commit()


def find_role(role_id: RoleID) -> Role | None:
    """Return the role with that id, or `None` if not found."""
    db_role = db.session.get(DbRole, role_id)

    if db_role is None:
        return None

    return _db_entity_to_role(db_role)


def find_role_ids_for_user(user_id: UserID) -> set[RoleID]:
    """Return the IDs of the roles assigned to the user."""
    db_roles = (
        db.session.scalars(
            select(DbRole)
            .join(DbUserRole)
            .filter(DbUserRole.user_id == user_id)
        )
        .unique()
        .all()
    )

    return {db_role.id for db_role in db_roles}


def find_user_ids_for_role(role_id: RoleID) -> set[UserID]:
    """Return the IDs of the users that have this role assigned."""
    user_id_rows = db.session.scalars(
        select(DbUserRole.user_id).filter(DbUserRole.role_id == role_id)
    ).all()

    return set(user_id_rows)


def assign_permission_to_role(
    permission_id: PermissionID, role_id: RoleID
) -> None:
    """Assign the permission to the role."""
    db_role_permission = DbRolePermission(role_id, permission_id)

    db.session.add(db_role_permission)
    db.session.commit()


def deassign_permission_from_role(
    permission_id: PermissionID, role_id: RoleID
) -> Result[None, str]:
    """Deassign the permission from the role."""
    db_role_permission = db.session.get(
        DbRolePermission, (role_id, permission_id)
    )

    if db_role_permission is None:
        return Err('Unknown role ID and/or permission ID')

    db.session.delete(db_role_permission)
    db.session.commit()

    return Ok(None)


def assign_role_to_user(
    role_id: RoleID, user: User, *, initiator: User | None = None
) -> RoleAssignedToUserEvent | None:
    """Assign the role to the user."""
    if _is_role_assigned_to_user(role_id, user.id):
        # Role is already assigned to user. Nothing to do.
        return None

    event, log_entry = authz_domain_service.assign_role_to_user(
        role_id, user, initiator=initiator
    )

    _persist_role_assignment_to_user(role_id, user, log_entry)

    return event


def _persist_role_assignment_to_user(
    role_id: RoleID, user: User, log_entry: UserLogEntry
) -> None:
    db_user_role = DbUserRole(user.id, role_id)
    db.session.add(db_user_role)

    db_log_entry = user_log_service.to_db_entry(log_entry)
    db.session.add(db_log_entry)

    db.session.commit()


def deassign_role_from_user(
    role_id: RoleID, user: User, *, initiator: User | None = None
) -> Result[RoleDeassignedFromUserEvent, str]:
    """Deassign the role from the user."""
    db_user_role = db.session.get(DbUserRole, (user.id, role_id))

    if db_user_role is None:
        return Err(f'Unknown role ID "{role_id}" and/or user ID "{user.id}"')

    event, log_entry = authz_domain_service.deassign_role_from_user(
        role_id, user, initiator=initiator
    )

    _persist_role_deassignment_from_user(db_user_role, log_entry)

    return Ok(event)


def _persist_role_deassignment_from_user(
    db_user_role: DbUserRole, log_entry: UserLogEntry
) -> None:
    db.session.delete(db_user_role)

    db_log_entry = user_log_service.to_db_entry(log_entry)
    db.session.add(db_log_entry)

    db.session.commit()


def deassign_all_roles_from_user(
    user: User, *, initiator: User | None = None, commit: bool = True
) -> None:
    """Deassign all roles from the user."""
    db.session.execute(delete(DbUserRole).where(DbUserRole.user_id == user.id))

    if commit:
        db.session.commit()


def _is_role_assigned_to_user(role_id: RoleID, user_id: UserID) -> bool:
    """Determine if the role is assigned to the user or not."""
    return (
        db.session.scalar(
            select(
                db.exists()
                .where(DbUserRole.role_id == role_id)
                .where(DbUserRole.user_id == user_id)
            )
        )
        or False
    )


def get_permission_ids_for_user(user_id: UserID) -> set[PermissionID]:
    """Return the IDs of all permissions the user has through the roles
    assigned to it.
    """
    db_role_permissions = db.session.scalars(
        select(DbRolePermission)
        .join(DbRole)
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
    ).all()

    return {rp.permission_id for rp in db_role_permissions}


def get_assigned_roles_for_permissions() -> dict[PermissionID, set[RoleID]]:
    """Return the IDs of roles that have permissions assigned, indexed
    by permission ID.
    """
    role_ids_by_permission_id = defaultdict(set)

    rows = db.session.execute(
        select(DbRolePermission.permission_id, DbRolePermission.role_id)
    ).all()

    permission_ids_and_role_ids = [
        (PermissionID(permission_id), RoleID(role_id))
        for permission_id, role_id in rows
    ]

    for permission_id, role_id in permission_ids_and_role_ids:
        role_ids_by_permission_id[permission_id].add(role_id)

    return dict(role_ids_by_permission_id)


def get_all_role_ids() -> set[RoleID]:
    """Return all role IDs."""
    role_ids = db.session.scalars(select(DbRole.id)).all()
    return set(role_ids)


def get_all_roles_with_permissions_and_users() -> (
    list[tuple[Role, set[PermissionID], set[User]]]
):
    """Return all roles with titles, permission IDs, and assigned users."""
    db_roles = (
        db.session.scalars(
            select(DbRole).options(
                db.undefer(DbRole.title),
                db.joinedload(DbRole.user_roles).joinedload(DbUserRole.user),
            )
        )
        .unique()
        .all()
    )

    return [
        (
            _db_entity_to_role(db_role),
            {
                db_role_permission.permission_id
                for db_role_permission in db_role.role_permissions
            },
            {
                user_service._db_entity_to_user(db_user)
                for db_user in db_role.users
            },
        )
        for db_role in db_roles
    ]


def get_permission_ids_by_role() -> dict[Role, frozenset[PermissionID]]:
    """Return all roles with their assigned permission IDs.

    Role titles are undeferred to avoid lots of additional queries.
    """
    db_roles = (
        db.session.scalars(select(DbRole).options(db.undefer(DbRole.title)))
        .unique()
        .all()
    )

    role_ids_and_permission_ids = (
        db.session.execute(
            select(DbRolePermission.role_id, DbRolePermission.permission_id)
        )
        .tuples()
        .all()
    )

    return _index_permission_ids_by_role(role_ids_and_permission_ids, db_roles)


def get_permission_ids_by_role_for_user(
    user_id: UserID,
) -> dict[Role, frozenset[PermissionID]]:
    """Return permission IDs grouped by their respective roles for that
    user.

    Role titles are undeferred to avoid lots of additional queries.
    """
    db_roles = (
        db.session.scalars(
            select(DbRole)
            .options(db.undefer(DbRole.title))
            .join(DbUserRole)
            .filter(DbUserRole.user_id == user_id)
        )
        .unique()
        .all()
    )

    role_ids_and_permission_ids = (
        db.session.execute(
            select(DbRolePermission.role_id, DbRolePermission.permission_id)
            .join(DbRole)
            .join(DbUserRole)
            .filter(DbUserRole.user_id == user_id)
        )
        .tuples()
        .all()
    )

    return _index_permission_ids_by_role(role_ids_and_permission_ids, db_roles)


def _index_permission_ids_by_role(
    role_ids_and_permission_ids: Iterable[tuple[RoleID, PermissionID]],
    db_roles: Iterable[DbRole],
) -> dict[Role, frozenset[PermissionID]]:
    """Index permission IDs by role."""
    permission_ids_by_role_id = defaultdict(set)
    for role_id, permission_id in role_ids_and_permission_ids:
        permission_ids_by_role_id[role_id].add(permission_id)

    permission_ids_by_role = {}

    for db_role in db_roles:
        role = _db_entity_to_role(db_role)
        permission_ids = frozenset(permission_ids_by_role_id[role.id])
        permission_ids_by_role[role] = permission_ids

    return permission_ids_by_role


def get_permission_ids_for_role(role_id: RoleID) -> set[PermissionID]:
    """Return the permission IDs assigned to the role."""
    permission_ids = db.session.scalars(
        select(DbRolePermission.permission_id).filter_by(role_id=role_id)
    ).all()

    return {PermissionID(permission_id) for permission_id in permission_ids}


def _db_entity_to_role(db_role: DbRole) -> Role:
    return Role(
        id=db_role.id,
        title=db_role.title,
    )