alot/db/message.py

Summary

Maintainability
C
1 day
Test Coverage
# Copyright (C) 2011-2012  Patrick Totzke <patricktotzke@gmail.com>
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import email
import email.charset as charset
import email.policy
import functools
from datetime import datetime

from notmuch2 import NullPointerError

from . import utils
from .utils import get_body_part, extract_body_part
from .utils import decode_header
from .attachment import Attachment
from .. import helper
from ..settings.const import settings

charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8')


@functools.total_ordering
class Message:
    """
    a persistent notmuch message object.
    It it uses a :class:`~alot.db.DBManager` for cached manipulation
    and lazy lookups.
    """
    def __init__(self, dbman, msg, thread=None):
        """
        :param dbman: db manager that is used for further lookups
        :type dbman: alot.db.DBManager
        :param msg: the wrapped message
        :type msg: notmuch2.Message
        :param thread: this messages thread (will be looked up later if `None`)
        :type thread: :class:`~alot.db.Thread` or `None`
        """
        self._dbman = dbman
        self._id = msg.messageid
        self._thread_id = msg.threadid
        self._thread = thread
        try:
            self._datetime = datetime.fromtimestamp(msg.date)
        except ValueError:
            self._datetime = None
        self._filename = str(msg.path)
        self._email = None  # will be read upon first use
        self._attachments = None  # will be read upon first use
        self._mime_part = None  # will be read upon first use
        self._mime_tree = None  # will be read upon first use
        self._tags = msg.tags

        self._session_keys = [
            value for _, value in msg.properties.getall(prefix="session-key",
                                                        exact=True)
        ]

        try:
            sender = decode_header(msg.header('From'))
            if not sender:
                sender = decode_header(msg.header('Sender'))
        except (NullPointerError, LookupError):
            sender = None
        if sender:
            self._from = sender
        elif 'draft' in self._tags:
            acc = settings.get_accounts()[0]
            self._from = '"{}" <{}>'.format(acc.realname, str(acc.address))
        else:
            self._from = '"Unknown" <>'

    def __str__(self):
        """prettyprint the message"""
        aname, aaddress = self.get_author()
        if not aname:
            aname = aaddress
        return "%s (%s)" % (aname, self.get_datestring())

    def __hash__(self):
        """needed for sets of Messages"""
        return hash(self._id)

    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id == other.get_message_id()
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, type(self)):
            return self._id != other.get_message_id()
        return NotImplemented

    def __lt__(self, other):
        if isinstance(other, type(self)):
            return self._id < other.get_message_id()
        return NotImplemented

    def get_email(self):
        """returns :class:`email.email.EmailMessage` for this message"""
        path = self.get_filename()
        warning = "Subject: Caution!\n"\
                  "Message file is no longer accessible:\n%s" % path
        if not self._email:
            try:
                with open(path, 'rb') as f:
                    self._email = utils.decrypted_message_from_bytes(
                            f.read(), self._session_keys)
            except IOError:
                self._email = email.message_from_string(
                    warning, policy=email.policy.SMTP)
        return self._email

    def get_date(self):
        """returns Date header value as :class:`~datetime.datetime`"""
        return self._datetime

    def get_filename(self):
        """returns absolute path of message files location"""
        return self._filename

    def get_message_id(self):
        """returns messages id (str)"""
        return self._id

    def get_thread_id(self):
        """returns id (str) of the thread this message belongs to"""
        return self._thread_id

    def get_message_parts(self):
        """yield all body parts of this message"""
        for msg in self.get_email().walk():
            if not msg.is_multipart():
                yield msg

    def get_tags(self):
        """returns tags attached to this message as list of strings"""
        return sorted(self._tags)

    def get_thread(self):
        """returns the :class:`~alot.db.Thread` this msg belongs to"""
        if not self._thread:
            self._thread = self._dbman.get_thread(self._thread_id)
        return self._thread

    def has_replies(self):
        """returns true if this message has at least one reply"""
        return len(self.get_replies()) > 0

    def get_replies(self):
        """returns replies to this message as list of :class:`Message`"""
        t = self.get_thread()
        return t.get_replies_to(self)

    def get_datestring(self):
        """
        returns reformated datestring for this message.

        It uses :meth:`SettingsManager.represent_datetime` to represent
        this messages `Date` header

        :rtype: str
        """
        if self._datetime is None:
            res = None
        else:
            res = settings.represent_datetime(self._datetime)
        return res

    def get_author(self):
        """
        returns realname and address of this messages author

        :rtype: (str,str)
        """
        return email.utils.parseaddr(self._from)

    def add_tags(self, tags, afterwards=None, remove_rest=False):
        """
        adds tags to message

        .. note::

            This only adds the requested operation to this objects
            :class:`DBManager's <alot.db.DBManager>` write queue.
            You need to call :meth:`~alot.db.DBManager.flush` to write out.

        :param tags: a list of tags to be added
        :type tags: list of str
        :param afterwards: callback that gets called after successful
                           application of this tagging operation
        :type afterwards: callable
        :param remove_rest: remove all other tags
        :type remove_rest: bool
        """
        def myafterwards():
            if remove_rest:
                self._tags = set(tags)
            else:
                self._tags = self._tags.union(tags)
            if callable(afterwards):
                afterwards()

        self._dbman.tag('id:' + self._id, tags, afterwards=myafterwards,
                        remove_rest=remove_rest)
        self._tags = self._tags.union(tags)

    def remove_tags(self, tags, afterwards=None):
        """remove tags from message

        .. note::

            This only adds the requested operation to this objects
            :class:`DBManager's <alot.db.DBManager>` write queue.
            You need to call :meth:`~alot.db.DBManager.flush` to actually out.

        :param tags: a list of tags to be added
        :type tags: list of str
        :param afterwards: callback that gets called after successful
                           application of this tagging operation
        :type afterwards: callable
        """
        def myafterwards():
            self._tags = self._tags.difference(tags)
            if callable(afterwards):
                afterwards()

        self._dbman.untag('id:' + self._id, tags, myafterwards)

    def get_attachments(self):
        """
        returns messages attachments

        Derived from the leaves of the email mime tree
        that and are not part of :rfc:`2015` syntax for encrypted/signed mails
        and either have :mailheader:`Content-Disposition` `attachment`
        or have :mailheader:`Content-Disposition` `inline` but specify
        a filename (as parameter to `Content-Disposition`).

        :rtype: list of :class:`Attachment`
        """
        if not self._attachments:
            self._attachments = []
            for part in self.get_message_parts():
                ct = part.get_content_type()
                # replace underspecified mime description by a better guess
                if ct in ['octet/stream', 'application/octet-stream']:
                    content = part.get_payload(decode=True)
                    ct = helper.guess_mimetype(content)
                    if (self._attachments and
                            self._attachments[-1].get_content_type() ==
                            'application/pgp-encrypted'):
                        self._attachments.pop()

                if self._is_attachment(part, ct):
                    self._attachments.append(Attachment(part))
        return self._attachments

    @staticmethod
    def _is_attachment(part, ct_override=None):
        """Takes a mimepart and returns a bool indicating if it's an attachment

        Takes an optional argument to override the content type.
        """
        cd = part.get('Content-Disposition', '')
        filename = part.get_filename()
        ct = ct_override or part.get_content_type()

        if cd.lower().startswith('attachment'):
            if ct.lower() not in ['application/pgp-signature']:
                return True
        elif cd.lower().startswith('inline'):
            if (filename is not None and ct.lower() != 'application/pgp'):
                return True

        return False

    def get_mime_part(self):
        if not self._mime_part:
            self._mime_part = get_body_part(self.get_email())
        return self._mime_part

    def set_mime_part(self, mime_part):
        self._mime_part = mime_part

    def get_body_text(self):
        """ returns bodystring extracted from this mail """
        return extract_body_part(self.get_mime_part())

    def matches(self, querystring):
        """tests if this messages is in the resultset for `querystring`"""
        searchfor = '( {} ) AND id:{}'.format(querystring, self._id)
        return self._dbman.count_messages(searchfor) > 0

    def get_mime_tree(self):
        if not self._mime_tree:
            self._mime_tree = self._get_mimetree(self.get_email())
        return self._mime_tree

    @classmethod
    def _get_mimetree(cls, message):
        label = cls._get_mime_part_info(message)
        if message.is_multipart():
            return label, [cls._get_mimetree(m) for m in message.get_payload()]
        else:
            if cls._is_attachment(message):
                message = Attachment(message)
            return label, message

    @staticmethod
    def _get_mime_part_info(mime_part):
        contenttype = mime_part.get_content_type()
        filename = mime_part.get_filename() or '(no filename)'
        charset = mime_part.get_content_charset() or ''
        size = helper.humanize_size(len(mime_part.as_string()))
        return ' '.join((contenttype, filename, charset, size))