ejplatform/ej-server

View on GitHub
src/ej_conversations/models/conversation_queryset.py

Summary

Maintainability
A
2 hrs
Test Coverage
import logging
import random

from boogie.models import F, Value, IntegerField, FieldDoesNotExist
from boogie.models.wordcloud import WordCloudQuerySet
from django.contrib.auth import get_user_model
from django.db.models import Count, Q

from .comment import Comment
from ..mixins import ConversationMixin

log = logging.getLogger("ej")


class ConversationQuerySet(ConversationMixin, WordCloudQuerySet):
    """
    A table of conversations.
    """

    conversations = lambda self: self

    def authors(self):
        """
        Return all authors from the current conversations.
        """
        return get_user_model().objects.filter(Q(conversations__in=self))

    def promoted(self):
        """
        Show only promoted conversations.
        """
        return self.filter(is_promoted=True, is_hidden=False)

    def cache_annotations(self, *values, user=None, prefix="", **kwargs):
        """
        Annotate each conversation with the progress made by the given user.
        """
        for arg in values:
            kwargs.setdefault(arg, True)

        annotations = self._get_annotations(kwargs, prefix or "", user)
        if kwargs:
            raise TypeError(f"bad attribute: {kwargs.popitem()[0]}")

        if not annotations:
            return self
        return self.annotate(**annotations)

    def _get_annotations(self, kwargs, prefix, user):
        annotations = {}

        # First tag
        if kwargs.pop("first_tag", False):
            ...
            # Fixme: does not work with Postgres. Disabling this creates a N+1
            # error fetching tags, but at least display conversations.
            # annotations[prefix + "first_tag"] = Window(FirstValue("tags__name"))

        # Count comments
        if kwargs.pop("n_comments", False):
            annotations[prefix + "n_comments"] = Count(
                "comments", filter=Q(comments__status=Comment.STATUS.approved), distinct=True
            )

        # Count favorites
        if kwargs.pop("n_favorites", False):
            annotations[prefix + "n_favorites"] = Count("favorites")

        # Count votes
        if kwargs.pop("n_votes", False):
            annotations[prefix + "n_votes"] = Count("comments__votes")

        # Count votes for user
        if kwargs.pop("n_user_votes", False):
            if user.is_authenticated:
                data = Count("comments__votes", filter=Q(comments__votes__author=user))
            else:
                data = Value(0, IntegerField())
            annotations[prefix + "n_user_votes"] = data

        # Author name
        if kwargs.pop("author_name", False):
            annotations[prefix + "author_name"] = F(AUTHOR_NAME_FIELD)
        return annotations

    def random_votes(self, users=None, probs=(0.1, 0.15, 0.25)):
        """
        Cast random votes for the list of users.

        Args:
            users (sequence of users):
                List or queryset of users. Select all users if not given.
            probs:
                List of probabilities for (disagree, skip, agree). If normalized
                for less than 1, some users will not even cast any vote.
        """
        from .vote import Vote, normalize_choice

        if not users:
            users = get_user_model().objects.filter(is_active=True)

        vote_prob = sum(probs)
        if vote_prob == 0:
            return  # Nothing to vote
        elif not 0 <= vote_prob <= 1:
            raise ValueError("sum o probabilities must be in [0, 1] interval")

        # Prepare to sample votes
        probs = [p / vote_prob for p in probs]
        choices = list(map(normalize_choice, ["disagree", "skip", "agree"]))
        comments = self.comments()
        votes = set(map(tuple, comments.votes().values_list("comment_id", "author_id")))

        # Cast random votes
        new_votes = []
        for comment in comments:
            for user in users:
                if (comment.id, user.id) not in votes and random.random() < vote_prob:
                    choice = random.choices(choices, probs)[0]
                    vote = Vote(author=user, choice=choice, comment=comment)
                    new_votes.append(vote)

        return Vote.objects.bulk_create(new_votes)


#
# Constants and configurations
#
try:
    get_user_model()._meta.get_field("name")
    AUTHOR_NAME_FIELD = "author__name"
except FieldDoesNotExist:
    AUTHOR_NAME_FIELD = "author__username"