django_magnificent_messages/models.py
"""
Models for django_magnificent_messages
"""
from django.db import models
from django.db.models import Q, QuerySet
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ValidationError
from model_utils.models import TimeStampedModel
from django_magnificent_messages import constants
from django_magnificent_messages.fields import JSONField
from .conf import settings
from .storage.message_storage.db_signals import message_archived, message_read, message_unarchived, message_unread
class MessageNotSentToUserError(Exception):
pass
class Message(TimeStampedModel):
"""
Main model for app.
Stores one record for every message in system
"""
level = models.IntegerField()
subject = models.TextField(blank=True, null=True)
text = models.TextField()
extra = JSONField(blank=True, null=True)
author = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, related_name="outbox")
user_generated = models.BooleanField()
reply_to = models.ForeignKey('django_magnificent_messages.Message', on_delete=models.PROTECT,
related_name="replies", null=True)
sent_to_users = models.ManyToManyField(settings.AUTH_USER_MODEL, related_name="messages",
db_table="mm_message_sent_to_user")
sent_to_groups = models.ManyToManyField('auth.Group', related_name="messages", db_table="mm_message_sent_to_group")
read_by = models.ManyToManyField(settings.AUTH_USER_MODEL, related_name="read_messages",
db_table="mm_message_read_by_user")
archived_by = models.ManyToManyField(settings.AUTH_USER_MODEL, related_name="archived_messages",
db_table="mm_message_archived_by_user")
html_safe = models.BooleanField(default=False)
# @property
# def text(self):
# if self.html_safe:
# return mark_safe(self.raw_text)
# return self.raw_text
class Meta:
db_table = "mm_message"
default_permissions = ()
permissions = (
("send_message", "Send message"),
("view_all_message", "View all messages"),
("delete_any_message", "Delete any messages"),
)
ordering = ("-created",)
def _is_user_in_recipients(self, user):
if user in self.sent_to_users.all():
return True
if hasattr(user, "groups"):
for group in user.groups.all():
if group in self.sent_to_groups.all():
return True
return False
def archive(self, user):
if self._is_user_in_recipients(user):
self.archived_by.add(user)
message_archived.send(sender=self.__class__, message=self, user=user)
else:
raise MessageNotSentToUserError(
"{0} was not sent to user, who tried to archive it".format(self)
)
def unarchive(self, user):
if self._is_user_in_recipients(user):
self.archived_by.remove(user)
message_unarchived.send(sender=self.__class__, message=self, user=user)
else:
raise MessageNotSentToUserError(
"{0} was not sent to user, who tried to unarchive ir".format(self)
)
def mark_read(self, user):
if self._is_user_in_recipients(user):
self.read_by.add(user)
message_read.send(sender=self.__class__, message=self, user=user)
else:
raise MessageNotSentToUserError(
"{0} was not sent to user, who tried to mark it as read".format(self)
)
def mark_unread(self, user):
if self._is_user_in_recipients(user):
self.read_by.remove(user)
message_unread.send(sender=self.__class__, message=self, user=user)
else:
raise MessageNotSentToUserError(
"{0} was not sent to user, who tried to mark it as unread".format(self)
)
def __str__(self):
return "<Message: {0}>".format(self.pk)
class Inbox(models.Model):
"""
Inbox model.
Represents user inbox. Stores time, when inbox was checked last and provides methods for getting messages in inbox.
It's possible that in future django-magnificent-messages will support multiple inboxes, so we use ForeignKey
instead of OneToOneField and Inbox has field ``name`` with default="Inbox"
"""
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, related_name="inboxes")
name = models.CharField(max_length=200, default="Inbox")
main = models.BooleanField(default=False)
desc = models.TextField(blank=True)
last_checked = models.DateTimeField(default=constants.MIN_DATETIME)
@property
def all(self) -> QuerySet:
return self._all()
@property
def all_count(self) -> int:
return self._all().count()
@property
def read(self) -> QuerySet:
return self._read()
@property
def read_count(self) -> int:
return self._read().count()
@property
def unread(self) -> QuerySet:
return self._unread()
@property
def unread_count(self) -> int:
return self._unread().count()
@property
def archived(self) -> QuerySet:
return self._get_messages(archived=True)
@property
def archived_count(self) -> int:
return self.archived.count()
@property
def new(self) -> QuerySet:
return self._new()
@property
def new_count(self) -> int:
return self._new().count()
class Meta:
db_table = "mm_inbox"
unique_together = (
("user", "name")
)
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
"""
Preventing more than one main inbox
"""
if self.main and Inbox.objects.filter(user=self.user, main=True).exclude(pk=self.pk).exists():
raise ValidationError(
_("User %(user)s already has main inbox"),
code="invalid",
params={
"user": str(self.user)
}
)
super(Inbox, self).save(force_insert, force_update, using, update_fields)
def _get_messages(self, archived: bool = False, q: Q = Q()) -> QuerySet:
"""
Get messages in this inbox and filter them with q.
Get messages sent to user directly or through some of user groups. Excepts archived messages
Uses two queries to avoid duplication of messages sent to user directly and through the groups, or through
two and more groups. Can't use distinct() because Oracle does not support distinct on NCLOB fields and
Message model has such fields (subject, text and extra)
**You should not use this method directly. Use properties instead**
:param archived: If False (default) - exclude archived. If true - notifications_show only archived
:param q: Q object to filter messages
:return: Messages QuerySet
"""
# Get distinct messages pks
to_user_q = Q(sent_to_users=self.user)
if hasattr(self.user, "groups") and hasattr(self.user.groups, "all") and callable(self.user.groups.all):
to_user_q = to_user_q | Q(sent_to_groups__pk__in=self.user.groups.values_list("pk", flat=True))
message_pks = Message.objects.filter(to_user_q).values("id").distinct()
# Get messages with pk in message_pks
messages = Message.objects.filter(pk__in=message_pks)
if archived:
messages = messages.filter(archived_by=self.user)
else:
messages = messages.exclude(archived_by=self.user)
messages = messages.filter(q)
messages = messages.select_related("author", "reply_to")
messages = messages.prefetch_related("sent_to_users", "sent_to_groups", "read_by", "archived_by")
return messages
def _all(self) -> QuerySet:
"""
Returns all messages in inbox except archived
"""
return self._get_messages()
def _read(self) -> QuerySet:
return self._get_messages(q=Q(pk__in=self.user.read_messages.values("pk")))
def _unread(self) -> QuerySet:
return self._get_messages(q=~Q(pk__in=self.user.read_messages.values("pk")))
def _new(self) -> QuerySet:
return self._get_messages(q=Q(created__gt=self.last_checked))
def update_last_checked(self):
self.last_checked = timezone.now()
self.save()