ejplatform/ej-server

View on GitHub
src/ej_conversations/models/util.py

Summary

Maintainability
A
0 mins
Test Coverage
import datetime
from django.contrib.auth import get_user_model
from django.db.models import Count, Q
from django.db.models.functions import TruncDay
from sidekick import import_later
from sidekick import property as property

from .comment import Comment
from .vote import VoteChannels

models = import_later("ej_conversations.models")


def make_clean(cls, commit=True, **kwargs):
    """
    Create an instance of the cls model (or execute the cls callable) and
    call the resulting object .full_clean() method and later decide to save it
    (if commit=True) or not.
    """

    obj = cls(**kwargs)
    obj.full_clean()
    if commit:
        obj.save()
    return obj


def patch_user_model(model):
    def conversations_with_votes(user):
        return models.Conversation.objects.filter(comments__votes__author=user).distinct()

    model.conversations_with_votes = property(conversations_with_votes)


#
# Conversation statistics
#
def vote_count(conversation, which=None):
    """
    Return the number of votes of a given type.
    """
    kwargs = {"comment__conversation_id": conversation.id}
    if which is not None:
        kwargs["choice"] = which
    return models.Vote.objects.filter(**kwargs).count()


def statistics(conversation, cache=True):
    """
    Return a dictionary with basic statistics about conversation.
    """
    if cache:
        try:
            return conversation._cached_statistics
        except AttributeError:
            conversation._cached_statistics = conversation.statistics(False)
            return conversation._cached_statistics

    return {
        # Vote counts
        "votes": conversation.votes.aggregate(
            agree=Count("choice", filter=Q(choice=models.Choice.AGREE)),
            disagree=Count("choice", filter=Q(choice=models.Choice.DISAGREE)),
            skip=Count("choice", filter=Q(choice=models.Choice.SKIP)),
            total=Count("choice"),
        ),
        # Comment counts
        "comments": conversation.comments.aggregate(
            approved=Count("status", filter=Q(status=models.Comment.STATUS.approved)),
            rejected=Count("status", filter=Q(status=models.Comment.STATUS.rejected)),
            pending=Count("status", filter=Q(status=models.Comment.STATUS.pending)),
            total=Count("status"),
        ),
        # Participants count
        "participants": {
            "voters": (
                get_user_model()
                .objects.filter(votes__comment__conversation_id=conversation.id)
                .distinct()
                .count()
            ),
            "commenters": (
                get_user_model()
                .objects.filter(
                    comments__conversation_id=conversation.id,
                    comments__status=Comment.STATUS.approved,
                )
                .distinct()
                .count()
            ),
        },
        "channel_votes": conversation.votes.aggregate(
            webchat=Count("channel", filter=Q(channel=VoteChannels.RASA_WEBCHAT)),
            telegram=Count("channel", filter=Q(channel=VoteChannels.TELEGRAM)),
            whatsapp=Count("channel", filter=Q(channel=VoteChannels.WHATSAPP)),
            opinion_component=Count("channel", filter=Q(channel=VoteChannels.OPINION_COMPONENT)),
            unknown=Count("channel", filter=Q(channel=VoteChannels.UNKNOWN)),
            ej=Count("channel", filter=Q(channel=VoteChannels.EJ)),
            rocketchat=Count("channel", filter=Q(channel=VoteChannels.ROCKETCHAT)),
        ),
        "channel_participants": conversation.votes.aggregate(
            webchat=Count(
                "author",
                filter=Q(channel=VoteChannels.RASA_WEBCHAT),
                distinct="author",
            ),
            telegram=Count(
                "author",
                filter=Q(channel=VoteChannels.TELEGRAM),
                distinct="author",
            ),
            whatsapp=Count(
                "author",
                filter=Q(channel=VoteChannels.WHATSAPP),
                distinct="author",
            ),
            opinion_component=Count(
                "author",
                filter=Q(channel=VoteChannels.OPINION_COMPONENT),
                distinct="author",
            ),
            unknown=Count(
                "author",
                filter=Q(channel=VoteChannels.UNKNOWN),
                distinct="author",
            ),
            ej=Count("author", filter=Q(channel=VoteChannels.EJ), distinct="author"),
            rocketchat=Count(
                "author",
                filter=Q(channel=VoteChannels.ROCKETCHAT),
                distinct="author",
            ),
        ),
    }


def statistics_for_user(conversation, user):
    """
    Get information about user.
    """
    approved_comments_count = conversation.comments.filter(status=Comment.STATUS.approved).count()
    given_votes = (
        0
        if user.id is None
        else (
            models.Vote.objects.filter(comment__conversation_id=conversation.id, author=user)
            .exclude(choice=0)
            .count()
        )
    )

    e = 1e-50  # for numerical stability
    return {
        "votes": given_votes,
        "missing_votes": approved_comments_count - given_votes,
        "participation_ratio": given_votes / (approved_comments_count + e),
        "total_comments": approved_comments_count,
        "comments": given_votes,
    }


def set_date_range(start_date, end_date):
    """
    Set all date range values ​​with 0.
    """
    date_range = end_date - start_date
    initial_values = [
        {"date": start_date + datetime.timedelta(days=i), "value": 0} for i in range(date_range.days + 1)
    ]
    return initial_values


def get_all_interval_dates(start_date, end_date, date_votes):
    initial_values = set_date_range(start_date, end_date)
    for data in date_votes:
        index = (data["date"] - start_date).days
        initial_values[index]["value"] = data["value"]
    return initial_values


def vote_distribution_over_time(conversation, start_date, end_date):
    """
    Returns the total votes for each day in a time interval.
    """
    # contains total votes only for days on which votes occurred.
    date_votes = conversation.votes.filter(
        created__range=(start_date, end_date + datetime.timedelta(days=1))
    )
    date_votes = (
        date_votes.annotate(date=TruncDay("created"))
        .values("date")
        .annotate(value=Count("id"))
        .order_by("-date")
    )
    return get_all_interval_dates(start_date, end_date, date_votes)