byceps/byceps

View on GitHub
byceps/services/ticketing/ticket_attendance_service.py

Summary

Maintainability
A
0 mins
Test Coverage
B
88%
"""
byceps.services.ticketing.ticket_attendance_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2014-2024 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""

from collections import Counter
from collections.abc import Sequence
from datetime import datetime
from itertools import chain

from sqlalchemy import delete, select

from byceps.database import db, insert_ignore_on_conflict
from byceps.services.brand.models import BrandID
from byceps.services.party import party_service
from byceps.services.party.dbmodels import DbParty
from byceps.services.party.models import Party, PartyID
from byceps.services.user.models.user import UserID

from .dbmodels.archived_attendance import DbArchivedAttendance
from .dbmodels.category import DbTicketCategory
from .dbmodels.ticket import DbTicket


def create_archived_attendance(user_id: UserID, party_id: PartyID) -> None:
    """Create an archived attendance of the user at the party."""
    table = DbArchivedAttendance.__table__

    values = {
        'user_id': str(user_id),
        'party_id': str(party_id),
    }

    insert_ignore_on_conflict(table, values)


def delete_archived_attendance(user_id: UserID, party_id: PartyID) -> None:
    """Delete the archived attendance of the user at the party."""
    db.session.execute(
        delete(DbArchivedAttendance).filter_by(
            user_id=user_id, party_id=party_id
        )
    )
    db.session.commit()


def get_attended_parties(
    user_id: UserID, limit_to_brand_id: BrandID | None = None
) -> list[Party]:
    """Return the parties the user has attended in the past."""
    ticket_attendance_party_ids = _get_attended_party_ids(
        user_id, limit_to_brand_id
    )
    archived_attendance_party_ids = _get_archived_attendance_party_ids(
        user_id, limit_to_brand_id
    )

    party_ids = set(
        chain(ticket_attendance_party_ids, archived_attendance_party_ids)
    )

    return party_service.get_parties(party_ids)


def _get_attended_party_ids(
    user_id: UserID, limit_to_brand_id: BrandID | None = None
) -> set[PartyID]:
    """Return the IDs of the non-legacy parties the user has attended."""
    stmt = (
        select(DbParty.id)
        .filter(DbParty.ends_at < datetime.utcnow())
        .filter(DbParty.canceled == False)  # noqa: E712
        .join(DbTicketCategory)
        .join(DbTicket)
        .filter(DbTicket.revoked == False)  # noqa: E712
        .filter(DbTicket.used_by_id == user_id)
    )

    if limit_to_brand_id is not None:
        stmt = stmt.filter(DbParty.brand_id == limit_to_brand_id)

    party_ids = db.session.scalars(stmt).all()

    return set(party_ids)


def _get_archived_attendance_party_ids(
    user_id: UserID, limit_to_brand_id: BrandID | None = None
) -> set[PartyID]:
    """Return the IDs of the legacy parties the user has attended."""
    stmt = select(DbArchivedAttendance.party_id).filter(
        DbArchivedAttendance.user_id == user_id
    )

    if limit_to_brand_id is not None:
        stmt = stmt.join(DbParty).filter(DbParty.brand_id == limit_to_brand_id)

    party_ids = db.session.scalars(stmt).all()

    return set(party_ids)


def get_attendee_ids_for_party(party_id: PartyID) -> set[UserID]:
    """Return the party's attendees' IDs."""
    ticket_user_ids = db.session.scalars(
        select(DbTicket.used_by_id)
        .join(DbTicketCategory)
        .filter(DbTicketCategory.party_id == party_id)
        .filter(DbTicket.revoked == False)  # noqa: E712
        .filter(DbTicket.used_by_id.is_not(None))
    ).all()

    archived_attendance_user_ids = db.session.scalars(
        select(DbArchivedAttendance.user_id).filter(
            DbArchivedAttendance.party_id == party_id
        )
    ).all()

    return set(
        list(filter(None, ticket_user_ids)) + list(archived_attendance_user_ids)
    )


def get_top_attendees_for_brand(brand_id: BrandID) -> list[tuple[UserID, int]]:
    """Return the attendees with the highest number of parties of this
    brand visited.
    """
    top_ticket_attendance_counts = _get_top_ticket_attendees_for_parties(
        brand_id
    )

    top_archived_attendance_counts = _get_top_archived_attendees_for_parties(
        brand_id
    )

    top_attendance_counts = _merge_top_attendance_counts(
        [top_ticket_attendance_counts, top_archived_attendance_counts]
    )

    # Select top attendees with more than one attendance.
    top_attendees = top_attendance_counts.most_common(50)
    top_attendees = [
        (user_id, attendance_count)
        for user_id, attendance_count in top_attendees
        if attendance_count > 1
    ]

    return top_attendees


def _get_top_ticket_attendees_for_parties(
    brand_id: BrandID,
) -> Sequence[tuple[UserID, int]]:
    user_id_column = db.aliased(DbTicket).used_by_id

    attendance_count = (
        select(
            db.func.count(DbTicketCategory.party_id.distinct()),
        )
        .join(DbParty)
        .filter(DbParty.brand_id == brand_id)
        .join(DbTicket)
        .filter(DbTicket.revoked == False)  # noqa: E712
        .filter(DbTicket.used_by_id == user_id_column)
        .scalar_subquery()
    )

    return (
        db.session.execute(
            select(
                user_id_column.distinct(),
                attendance_count,
            )
            .filter(user_id_column.is_not(None))
            .filter(attendance_count > 0)
            .order_by(attendance_count.desc())
        )
        .tuples()
        .all()
    )


def _get_top_archived_attendees_for_parties(
    brand_id: BrandID,
) -> Sequence[tuple[UserID, int]]:
    attendance_count_column = db.func.count(DbArchivedAttendance.user_id).label(
        'attendance_count'
    )

    return (
        db.session.execute(
            select(
                DbArchivedAttendance.user_id,
                attendance_count_column,
            )
            .join(DbParty)
            .filter(DbParty.brand_id == brand_id)
            .group_by(DbArchivedAttendance.user_id)
            .order_by(attendance_count_column.desc())
        )
        .tuples()
        .all()
    )


def _merge_top_attendance_counts(
    xs: list[Sequence[tuple[UserID, int]]],
) -> Counter[UserID]:
    counter: Counter = Counter()

    for x in xs:
        counter.update(dict(x))

    return counter