byceps/byceps

View on GitHub
byceps/services/board/board_topic_query_service.py

Summary

Maintainability
A
0 mins
Test Coverage
B
82%
"""
byceps.services.board.board_topic_query_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2014-2024 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""

from collections.abc import Sequence
from datetime import datetime

from sqlalchemy import select
from sqlalchemy.sql import Select

from byceps.database import db, paginate, Pagination
from byceps.services.user import user_service
from byceps.services.user.dbmodels.user import DbUser
from byceps.services.user.models.user import User

from .dbmodels.category import DbBoardCategory
from .dbmodels.posting import DbPosting
from .dbmodels.topic import DbTopic
from .models import BoardCategoryID, BoardID, Topic, TopicID


def count_topics_for_board(board_id: BoardID) -> int:
    """Return the number of topics for that board."""
    return (
        db.session.scalar(
            select(db.func.count(DbTopic.id))
            .join(DbBoardCategory)
            .filter(DbBoardCategory.board_id == board_id)
        )
        or 0
    )


def find_topic(topic_id: TopicID) -> Topic | None:
    """Return the topic with that id, or `None` if not found."""
    db_topic = find_db_topic(topic_id)

    if db_topic is None:
        return None

    return _db_entity_to_topic(db_topic)


def find_db_topic(topic_id: TopicID) -> DbTopic | None:
    """Return the topic with that id, or `None` if not found."""
    return db.session.get(DbTopic, topic_id)


def get_db_topic(topic_id: TopicID) -> DbTopic:
    """Return the topic with that id."""
    db_topic = find_db_topic(topic_id)

    if db_topic is None:
        raise ValueError(f'Unknown topic ID "{topic_id}"')

    return db_topic


def find_topic_visible_for_user(
    topic_id: TopicID, *, include_hidden: bool = False
) -> DbTopic | None:
    """Return the topic with that id, or `None` if not found or
    invisible for the user.
    """
    stmt = (
        select(DbTopic)
        .options(
            db.joinedload(DbTopic.category),
        )
        .filter_by(id=topic_id)
    )

    if not include_hidden:
        stmt = stmt.filter_by(hidden=False)

    return db.session.scalars(stmt).unique().first()


def get_recent_topics(
    board_id: BoardID, limit: int, *, include_hidden: bool = False
) -> Sequence[DbTopic]:
    """Return recent topics in that board."""
    return (
        db.session.scalars(
            _select_topics(include_hidden=include_hidden)
            .join(DbBoardCategory)
            .filter(DbBoardCategory.board_id == board_id)
            .filter(DbBoardCategory.hidden == False)  # noqa: E712
            .order_by(DbTopic.last_updated_at.desc())
            .limit(limit)
        )
        .unique()
        .all()
    )


def paginate_topics(
    board_id: BoardID, page: int, per_page: int, *, include_hidden: bool = False
) -> Pagination:
    """Paginate topics in that board."""
    stmt = (
        _select_topics(include_hidden=include_hidden)
        .join(DbBoardCategory)
        .filter(DbBoardCategory.board_id == board_id)
        .filter(DbBoardCategory.hidden == False)  # noqa: E712
        .order_by(DbTopic.last_updated_at.desc())
    )

    return paginate(stmt, page, per_page)


def get_all_topic_ids_in_category(category_id: BoardCategoryID) -> set[TopicID]:
    """Return the IDs of all topics in the category."""
    topic_ids = db.session.scalars(
        select(DbTopic.id).filter(DbTopic.category_id == category_id)
    ).all()

    return set(topic_ids)


def paginate_topics_of_category(
    category_id: BoardCategoryID,
    page: int,
    per_page: int,
    *,
    include_hidden: bool = False,
) -> Pagination:
    """Paginate topics in that category, as visible for the user.

    Pinned topics are returned first.
    """
    stmt = (
        _select_topics(include_hidden=include_hidden)
        .filter_by(category_id=category_id)
        .order_by(DbTopic.pinned.desc(), DbTopic.last_updated_at.desc())
    )

    return paginate(stmt, page, per_page)


def _select_topics(*, include_hidden: bool = False) -> Select:
    stmt = select(DbTopic).options(
        db.joinedload(DbTopic.category),
        db.joinedload(DbTopic.last_updated_by),
        db.joinedload(DbTopic.hidden_by),
        db.joinedload(DbTopic.locked_by),
        db.joinedload(DbTopic.pinned_by),
    )

    if not include_hidden:
        stmt = stmt.filter_by(hidden=False)

    return stmt


def find_default_posting_to_jump_to(
    topic_id: TopicID, last_viewed_at: datetime, *, include_hidden: bool = False
) -> DbPosting | None:
    """Return the posting of the topic to show by default, or `None`."""
    postings_stmt = select(DbPosting).filter_by(topic_id=topic_id)

    if not include_hidden:
        postings_stmt = postings_stmt.filter_by(hidden=False)

    first_new_posting = db.session.scalars(
        postings_stmt.filter(DbPosting.created_at > last_viewed_at).order_by(
            DbPosting.created_at.asc()
        )
    ).first()

    if first_new_posting is None:
        # Current user has seen all postings so far, so show the last one.
        return db.session.scalars(
            postings_stmt.order_by(DbPosting.created_at.desc())
        ).first()

    return first_new_posting


def _db_entity_to_topic(db_topic: DbTopic) -> Topic:
    def to_user(db_user: DbUser | None) -> User | None:
        if db_user is None:
            return None

        return user_service._db_entity_to_user(db_user)

    creator = user_service.get_user(db_topic.creator_id)

    return Topic(
        id=db_topic.id,
        category_id=db_topic.category_id,
        created_at=db_topic.created_at,
        creator=creator,
        title=db_topic.title,
        posting_count=db_topic.posting_count,
        last_updated_at=db_topic.last_updated_at,
        last_updated_by=to_user(db_topic.last_updated_by),
        hidden=db_topic.hidden,
        hidden_at=db_topic.hidden_at,
        hidden_by=to_user(db_topic.hidden_by),
        locked=db_topic.locked,
        locked_at=db_topic.locked_at,
        locked_by=to_user(db_topic.locked_by),
        pinned=db_topic.pinned,
        pinned_at=db_topic.pinned_at,
        pinned_by=to_user(db_topic.pinned_by),
        initial_posting_id=db_topic.initial_posting.id,
        posting_limited_to_moderators=db_topic.posting_limited_to_moderators,
        muted=db_topic.muted,
    )