edgewall/trac

View on GitHub
trac/notification/mail.py

Summary

Maintainability
F
4 days
Test Coverage
# -*- coding: utf-8 -*-
#
# Copyright (C) 2003-2023 Edgewall Software
# Copyright (C) 2003-2005 Daniel Lundin <daniel@edgewall.com>
# Copyright (C) 2005-2006 Emmanuel Blot <emmanuel.blot@free.fr>
# Copyright (C) 2008 Stephen Hansen
# Copyright (C) 2009 Robert Corsaro
# Copyright (C) 2010-2012 Steffen Hoffmann
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/log/.

import hashlib
import hmac
import os
import re
import smtplib
from email import policy
from email.charset import BASE64, QP, SHORTEST, Charset
from email.header import Header
from email.headerregistry import Address
from email.message import EmailMessage
from email.utils import formatdate, parseaddr, getaddresses
from subprocess import Popen, PIPE

from trac.config import (BoolOption, ConfigurationError, IntOption, Option,
                         OrderedExtensionsOption)
from trac.core import Component, ExtensionPoint, TracError, implements
from trac.notification.api import (
    get_target_id, IEmailAddressResolver, IEmailDecorator, IEmailSender,
    INotificationDistributor, INotificationFormatter, INotificationSubscriber,
    NotificationSystem)
from trac.util import lazy
from trac.util.compat import close_fds
from trac.util.datefmt import time_now, to_utimestamp
from trac.util.html import tag
from trac.util.text import CRLF, exception_to_unicode, fix_eol, to_unicode
from trac.util.translation import _, tag_
from trac.web.session import get_session_attribute


__all__ = ['AlwaysEmailSubscriber', 'EMAIL_LOOKALIKE_PATTERN',
           'EmailDistributor', 'FromAuthorEmailDecorator', 'MAXHEADERLEN',
           'RecipientMatcher', 'SendmailEmailSender', 'SessionEmailResolver',
           'SmtpEmailSender', 'create_charset', 'create_header',
           'create_message_id', 'create_mime_multipart', 'create_mime_text',
           'get_message_addresses', 'get_from_author', 'set_header']


MAXHEADERLEN = 76
EMAIL_LOOKALIKE_PATTERN = (
        # the local part
        r"[a-zA-Z0-9.'+_-]+" '@'
        # the domain name part (RFC:1035)
        r'(?:[a-zA-Z0-9_-]+\.)+'  # labels (but also allow '_')
        r'[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?'  # TLD
        )

_mime_encoding_re = re.compile(r'=\?[^?]+\?[bq]\?[^?]+\?=', re.IGNORECASE)

local_hostname = None

# When default policy is used if [notification] mime_encoding is 'none'
# and a line is exceeded max_line_length (78 bytes by default), 'base64'
# or 'quoted-printable' is used for Content-Transfer-Encoding header.
# Using the custom policy avoids the behavior.
_policy_default = policy.SMTP  # newline is CRLF
_policy_8bit = _policy_default.clone(max_line_length=998)


def create_charset(mime_encoding):
    """Create an appropriate email charset for the given encoding.

    Valid options are 'base64' for Base64 encoding, 'qp' for
    Quoted-Printable, and 'none' for no encoding, in which case emails
    will be sent as 7bit if the content is all ASCII, or 8bit otherwise.
    """
    charset = Charset()
    charset.input_charset = 'utf-8'
    charset.output_charset = 'utf-8'
    charset.input_codec = 'utf-8'
    charset.output_codec = 'utf-8'
    pref = mime_encoding.lower()
    if pref == 'base64':
        charset.header_encoding = BASE64
        charset.body_encoding = BASE64
    elif pref in ('qp', 'quoted-printable'):
        charset.header_encoding = QP
        charset.body_encoding = QP
    elif pref == 'none':
        charset.header_encoding = SHORTEST
        charset.body_encoding = None
    else:
        raise TracError(_("Invalid email encoding setting: %(mime_encoding)s",
                          mime_encoding=mime_encoding))
    return charset


def create_header(key, value, charset):
    """Create an email Header.

    The `key` is always a string and will be converted to the
    appropriate `charset`. The `value` can either be a string or a
    two-element tuple where the first item is the name and the
    second item is the email address.

    See `set_header()` for a helper that sets a header directly on a
    message.
    """
    maxlength = MAXHEADERLEN-(len(key)+2)
    # Do not sent very short headers
    if maxlength < 10:
        raise TracError(_("Header length is too short"))

    email = None
    if isinstance(value, (tuple, list)):
        value, email = value
    if not isinstance(value, str):
        value = to_unicode(value)
    if not value:
        return email

    # when it matches mime-encoding, encode as mime even if only
    # ascii characters
    header = None
    if not _mime_encoding_re.search(value):
        try:
            tmp = value.encode('ascii')
        except UnicodeEncodeError:
            pass
        else:
            header = Header(tmp, 'ascii', maxlinelen=maxlength)
    if not header:
        header = Header(value.encode(charset.output_codec), charset,
                        maxlinelen=maxlength)
    header = str(header)
    if email:
        header = header.replace('\\', r'\\').replace('"', r'\"')
        header = '"%s" <%s>' % (header, email)
    return header


def create_address_header(addresses):
    """Create address header instance to pass to `set_header`.

    The `addresses` is a list or an iterable of addresses. The item can
    either be `str`, a `(name, address)` tuple or a `(None, address)`.
    """
    if isinstance(addresses, str):
        addresses = [(None, addresses)]
    l = []
    for item in addresses:
        if isinstance(item, Address):
            l.append(item)
            continue
        if isinstance(item, str):
            name = None
            addr = item
        elif isinstance(item, (list, tuple)):
            name, addr = item
        else:
            raise ValueError('Unrecognized item %r' % item)
        if '@' in addr:
            username, domain = addr.rsplit('@', 1)
        else:
            username = addr
            domain = ''
        l.append(Address(_replace_encoded_words(name or ''), username, domain))
    return l


def set_header(message, key, value=None, charset=None, addresses=None):
    """Create and add or replace a header in a `EmailMessage`.

    The `key` is always a string. The `value` can either be `None`, a
    string or a two-element tuple where the first item is the name and
    the second item is the email address.

    The `addresses` can either be a list or an iterable of a two-element
    tuple. When the `addresses` is given, the `value` will be ignored.

    The `charset` is no longer used.

    Example::

        set_header(my_message, 'From', ('Trac', 'noreply@ourcompany.com'))

        set_header(my_message, 'To',
                   addresses=[('Foo', 'foo@example.org'),
                              ('Bar', 'bar@example.org')])
    """
    if addresses is not None:
        header = create_address_header(addresses)
    elif isinstance(value, (list, tuple)):  # a pair of name and address
        header = create_address_header([value])
    elif isinstance(value, Address):
        header = value
    elif value is None:
        header = ''
    else:
        header = _replace_encoded_words(str(value))
    if key in message:
        message.replace_header(key, header)
    else:
        message[key] = header


_encoded_words_re = re.compile(r'=\?')


def _replace_encoded_words(text):
    """Replace '=?' with '=\u200b?' to avoid decoding encoded-words by
    `EmailMessage`.
    """
    if text:
        text = _encoded_words_re.sub('=\u200b?', text)
    return text


def create_mime_multipart(subtype):
    """Create a multipart email message.

    The `subtype` is a string that describes the type of multipart
    message you are defining. You should pick one that is defined
    by the email standards. The function does not check if the `subtype`
    is valid.

    The most common examples are:

    * `related` infers that each part is in an integrated whole, like
      images that are embedded in a html part.
    * `alternative` infers that the message contains different formats
      and the client can choose which to display based on capabilities
      and user preferences, such as a text/html with an alternative
      text/plain.
    """
    msg = EmailMessage()
    if subtype == 'related':
        msg.make_related()
    elif subtype == 'alternative':
        msg.make_alternative()
    elif subtype == 'mixed':
        msg.make_mixed()
    else:
        raise ValueError("subtype must be one of ('related', 'multipart', "
                         "'mixed'), not %r" % subtype)
    msg['MIME-Version'] = '1.0'
    return msg


def create_mime_text(body, format, charset):
    """Create a `EmailMessage` that can be added to an email message.

    :param body: a string with the body of the message.
    :param format: each text has a EmailMessage, like `text/plain`. The
        supertype is always `text`, so in the `format` parameter you
        pass the subtype, like `plain` or `html`.
    :param charset: should be created using `create_charset()`.
    """
    if isinstance(body, bytes):
        body = str(body, 'utf-8')
    cte = {BASE64: 'base64', QP: 'quoted-printable'}.get(charset.body_encoding)
    msg = EmailMessage(_policy_8bit if cte is None else _policy_default)
    msg['MIME-Version'] = '1.0'
    msg.set_content(body, subtype=format, cte=cte)
    return msg


def create_message_id(env, targetid, from_email, time, more=None):
    """Generate a predictable, but sufficiently unique message ID.

    In case you want to set the "Message ID" header, this convenience
    function will generate one by running a hash algorithm over a number
    of properties.

    :param env: the `Environment`
    :param targetid: a string that identifies the target, like
        `NotificationEvent.target`
    :param from_email: the email address that the message is sent from
    :param time: a Python `datetime`
    :param more: a string that contains additional information that
        makes this message unique
    """
    items = [env.project_url, targetid, to_utimestamp(time)]
    if more is not None:
        items.append(more.encode('ascii', 'ignore'))
    source = b'.'.join(item if isinstance(item, bytes) else
                       str(item).encode('utf-8')
                       for item in items)
    hash_type = NotificationSystem(env).message_id_hash
    try:
        h = hashlib.new(hash_type)
    except:
        raise ConfigurationError(_("Unknown hash type '%(type)s'",
                                   type=hash_type))
    h.update(source)
    host = from_email[from_email.find('@') + 1:]
    return '<%03d.%s@%s>' % (len(source), h.hexdigest(), host)


def get_message_addresses(message, name):
    return getaddresses(str(header) for header in message.get_all(name, ()))


def get_from_author(env, event):
    """Get the author name and email from a given `event`.

    The `event` parameter should be of the type `NotificationEvent`.
    If you only have the username of a Trac user, you should instead
    use the `RecipientMatcher` to find the user's details.

    The method returns a tuple that contains the name and email address
    of the user. For example: `('developer', 'developer@ourcompany.com')`.
    This tuple can be parsed by `set_header()`.
    """
    if event.author and NotificationSystem(env).smtp_from_author:
        matcher = RecipientMatcher(env)
        from_ = matcher.match_from_author(event.author)
        if from_:
            return from_


class RecipientMatcher(object):
    """Matches user names and email addresses.

    :param env: The `trac.env.Enviroment`
    """
    nodomaddr_re = re.compile(r"^[-A-Za-z0-9!*+/=_.]+$")

    def __init__(self, env):
        self.env = env
        addrfmt = EMAIL_LOOKALIKE_PATTERN
        self.notify_sys = NotificationSystem(env)
        admit_domains = self.notify_sys.admit_domains_list
        if admit_domains:
            localfmt, domainfmt = addrfmt.split('@')
            domains = [domainfmt]
            domains.extend(re.escape(x) for x in admit_domains)
            addrfmt = r'%s@(?:%s)' % (localfmt, '|'.join(domains))
        self.shortaddr_re = re.compile(r'<?(%s)>?$' % addrfmt, re.IGNORECASE)
        self.longaddr_re = re.compile(r'(.*)\s+<\s*(%s)\s*>$' % addrfmt,
                                      re.IGNORECASE)
        self.ignore_domains = set(x.lower()
                                  for x in self.notify_sys.ignore_domains_list)

    @lazy
    def use_short_addr(self):
        return self.notify_sys.use_short_addr

    @lazy
    def smtp_default_domain(self):
        return self.notify_sys.smtp_default_domain

    @lazy
    def users(self):
        return self.env.get_known_users(as_dict=True)

    def is_email(self, address):
        """Check if an email address is valid.

        This method checks against the list of domains that are
        to be ignored, which is controlled by the `ignore_domains_list`
        configuration option.

        :param address: the address to validate
        :return: `True` if it is a valid email address that is not in
            the ignore list.
        """
        if not address:
            return False
        match = self.shortaddr_re.match(address)
        if match:
            domain = address[address.find('@') + 1:].lower()
            if domain not in self.ignore_domains:
                return True
        return False

    def match_recipient(self, address):
        """Convenience function to check for an email address

        The parameter `address` can either be a valid user name,
        or an email address. The method first checks if the parameter
        is a valid user name. If so, it will look up the address. If
        there is no match, the function will check if it is a valid
        email address.

        :return: A tuple with a session id, a `1` or `0` to indicate
            whether the user is authenticated, and the matched address.
            Returns `None` when `address` does not match a valid user,
            nor a valid email address. When `address` is an email address,
            the sid will be `None` and the authentication parameter
            will always be `0`
        """
        if not address or address == 'anonymous':
            return None

        if address in self.users:
            sid = address
            auth = 1
            address = (self.users[address][1] or '').strip() or sid
        else:
            sid = None
            auth = 0
            address = address.strip()

        if self.nodomaddr_re.match(address):
            if self.use_short_addr:
                return sid, auth, address
            if self.smtp_default_domain:
                address = "%s@%s" % (address, self.smtp_default_domain)
                return sid, auth, address
            self.env.log.debug("Email address w/o domain: %s", address)
            return None

        mo = self.shortaddr_re.match(address)
        if mo:
            address = mo.group(1)
        else:
            mo = self.longaddr_re.match(address)
            if mo:
                address = mo.group(2)
        if not self.is_email(address):
            self.env.log.debug("Invalid email address: %s", address)
            return None
        return sid, auth, address

    def match_from_author(self, author):
        """Find a name and email address for a specific user

        :param author: The username that you want to query.
        :return: On success, a two-item tuple is returned, with the
            real name and the email address of the user.
        """
        if author:
            author = author.strip()
        recipient = self.match_recipient(author)
        if not recipient:
            return None
        sid, authenticated, address = recipient
        if not address:
            return None
        from_name = None
        if sid and authenticated and sid in self.users:
            from_name = self.users[sid][0]
        if not from_name:
            mo = self.longaddr_re.match(author)
            if mo:
                from_name = mo.group(1)
        return (from_name, address) if from_name else address


class EmailDistributor(Component):
    """Distributes notification events as emails."""

    implements(INotificationDistributor)

    formatters = ExtensionPoint(INotificationFormatter)
    decorators = ExtensionPoint(IEmailDecorator)

    resolvers = OrderedExtensionsOption('notification',
        'email_address_resolvers', IEmailAddressResolver,
        'SessionEmailResolver',
        include_missing=False,
        doc="""Comma separated list of email resolver components in the order
        they will be called. If an email address is resolved, the remaining
        resolvers will not be called.
        """)

    default_format = Option('notification', 'default_format.email',
        'text/plain', doc="Default format to distribute email notifications.")

    def __init__(self):
        self._charset = create_charset(self.config.get('notification',
                                                       'mime_encoding'))

    # INotificationDistributor methods

    def transports(self):
        yield 'email'

    def distribute(self, transport, recipients, event):
        if transport != 'email':
            return
        if not self.config.getbool('notification', 'smtp_enabled'):
            self.log.debug("%s skipped because smtp_enabled set to false",
                           self.__class__.__name__)
            return

        formats = {}
        for f in self.formatters:
            for style, realm in f.get_supported_styles(transport):
                if realm == event.realm:
                    formats[style] = f
        if not formats:
            self.log.error("%s No formats found for %s %s",
                           self.__class__.__name__, transport, event.realm)
            return
        self.log.debug("%s has found the following formats capable of "
                       "handling '%s' of '%s': %s", self.__class__.__name__,
                       transport, event.realm, ', '.join(formats))

        matcher = RecipientMatcher(self.env)
        notify_sys = NotificationSystem(self.env)
        always_cc = set(notify_sys.smtp_always_cc_list)
        addresses = {}
        for sid, auth, addr, fmt in recipients:
            if fmt not in formats:
                self.log.debug("%s format %s not available for %s %s",
                               self.__class__.__name__, fmt, transport,
                               event.realm)
                continue

            if sid and not addr:
                for resolver in self.resolvers:
                    addr = resolver.get_address_for_session(sid, auth) or None
                    if addr:
                        self.log.debug(
                            "%s found the address '%s' for '%s [%s]' via %s",
                            self.__class__.__name__, addr, sid, auth,
                            resolver.__class__.__name__)
                        break
            if sid and auth and not addr:
                addr = sid
            if notify_sys.smtp_default_domain and \
                    not notify_sys.use_short_addr and \
                    addr and matcher.nodomaddr_re.match(addr):
                addr = '%s@%s' % (addr, notify_sys.smtp_default_domain)
            if not addr:
                self.log.debug("%s was unable to find an address for "
                               "'%s [%s]'", self.__class__.__name__, sid, auth)
            elif matcher.is_email(addr) or \
                    notify_sys.use_short_addr and \
                    matcher.nodomaddr_re.match(addr):
                addresses.setdefault(fmt, set()).add(addr)
                if sid and auth and sid in always_cc:
                    always_cc.discard(sid)
                    always_cc.add(addr)
                elif notify_sys.use_public_cc:
                    always_cc.add(addr)
            else:
                self.log.debug("%s was unable to use an address '%s' for '%s "
                               "[%s]'", self.__class__.__name__, addr, sid,
                               auth)

        outputs = {}
        failed = []
        for fmt, formatter in formats.items():
            if fmt not in addresses and fmt != 'text/plain':
                continue
            try:
                outputs[fmt] = formatter.format(transport, fmt, event)
            except Exception as e:
                self.log.warning('%s caught exception while '
                                 'formatting %s to %s for %s: %s%s',
                                 self.__class__.__name__, event.realm, fmt,
                                 transport, formatter.__class__,
                                 exception_to_unicode(e, traceback=True))
                failed.append(fmt)

        # Fallback to text/plain when formatter is broken
        if failed and 'text/plain' in outputs:
            for fmt in failed:
                addresses.setdefault('text/plain', set()) \
                         .update(addresses.pop(fmt, ()))

        for fmt, addrs in addresses.items():
            self.log.debug("%s is sending event as '%s' to: %s",
                           self.__class__.__name__, fmt, ', '.join(addrs))
            message = self._create_message(fmt, outputs)
            if message:
                addrs = set(addrs)
                cc_addrs = sorted(addrs & always_cc)
                bcc_addrs = sorted(addrs - always_cc)
                self._do_send(transport, event, message, cc_addrs, bcc_addrs)
            else:
                self.log.warning("%s cannot send event '%s' as '%s': %s",
                                 self.__class__.__name__, event.realm, fmt,
                                 ', '.join(addrs))

    def _create_message(self, format, outputs):
        if format not in outputs:
            return None
        message = create_mime_multipart('related')
        maintype, subtype = format.split('/')
        preferred = create_mime_text(outputs[format], subtype, self._charset)
        if format != 'text/plain' and 'text/plain' in outputs:
            text = create_mime_text(outputs['text/plain'], 'plain',
                                    self._charset)
            alternative = create_mime_multipart('alternative')
            alternative.attach(text)
            alternative.attach(preferred)
            preferred = alternative
        message.attach(preferred)
        return message

    def _do_send(self, transport, event, message, cc_addrs, bcc_addrs):
        notify_sys = NotificationSystem(self.env)
        smtp_from = notify_sys.smtp_from
        smtp_from_name = notify_sys.smtp_from_name or self.env.project_name
        smtp_replyto = notify_sys.smtp_replyto
        if not notify_sys.use_short_addr and notify_sys.smtp_default_domain:
            if smtp_from and '@' not in smtp_from:
                smtp_from = '%s@%s' % (smtp_from,
                                       notify_sys.smtp_default_domain)
            if smtp_replyto and '@' not in smtp_replyto:
                smtp_replyto = '%s@%s' % (smtp_replyto,
                                          notify_sys.smtp_default_domain)

        headers = {}
        headers['X-Mailer'] = 'Trac %s, by Edgewall Software'\
                              % self.env.trac_version
        headers['X-Trac-Version'] = self.env.trac_version
        headers['X-Trac-Project'] = self.env.project_name
        headers['X-URL'] = self.env.project_url
        headers['X-Trac-Realm'] = event.realm
        headers['Precedence'] = 'bulk'
        headers['Auto-Submitted'] = 'auto-generated'
        if isinstance(event.target, (list, tuple)):
            targetid = ','.join(map(get_target_id, event.target))
        else:
            targetid = get_target_id(event.target)
        rootid = create_message_id(self.env, targetid, smtp_from, None,
                                   more=event.realm)
        if event.category == 'created':
            headers['Message-ID'] = rootid
        else:
            headers['Message-ID'] = create_message_id(self.env, targetid,
                                                      smtp_from, event.time,
                                                      more=event.realm)
            headers['In-Reply-To'] = rootid
            headers['References'] = rootid
        headers['Date'] = formatdate()
        headers['To'] = 'undisclosed-recipients: ;'
        for k, v in headers.items():
            set_header(message, k, v)

        set_header(message, 'From', (smtp_from_name, smtp_from)
                                    if smtp_from_name else smtp_from)
        if cc_addrs:
            set_header(message, 'Cc', addresses=cc_addrs)
        if bcc_addrs:
            set_header(message, 'Bcc', addresses=bcc_addrs)
        set_header(message, 'Reply-To', addresses=[smtp_replyto])

        for decorator in self.decorators:
            decorator.decorate_message(event, message, self._charset)

        from_name, from_addr = parseaddr(str(message['From']))
        to_addrs = set()
        for name in ('To', 'Cc', 'Bcc'):
            values = map(str, message.get_all(name, ()))
            to_addrs.update(addr for name, addr in getaddresses(values)
                                 if addr)
        del message['Bcc']
        notify_sys.send_email(from_addr, list(to_addrs), message.as_bytes())


# Workaround for SMTP with unicode credentials
# https://bugs.python.org/issue29750
class SMTPWithUnicodeCredsFixup(smtplib.SMTP):

    def auth_plain(self, challenge=None):
        rv = super().auth_plain(challenge=challenge)
        return self._patch_encode_method(rv)

    def auth_login(self, challenge=None):
        rv = super().auth_login(challenge=challenge)
        return self._patch_encode_method(rv)

    def auth_cram_md5(self, challenge=None):
        if challenge is None:
            return None
        rv = self.user + ' ' + hmac.HMAC(self.password.encode('utf-8'),
                                         challenge, 'md5').hexdigest()
        return self._patch_encode_method(rv)

    @classmethod
    def _patch_encode_method(cls, value):
        try:
            value.encode('ascii')
        except UnicodeEncodeError:
            return _SMTPAuthResponse(value)
        else:
            return value


class _SMTPAuthResponse(object):

    __slots__ = ('response',)

    def __init__(self, response):
        self.response = response

    def encode(self, encoding='utf-8', errors='strict'):
        return self.response.encode('utf-8')


class SmtpEmailSender(Component):
    """E-mail sender connecting to an SMTP server."""

    implements(IEmailSender)

    smtp_server = Option('notification', 'smtp_server', 'localhost',
        """SMTP server hostname to use for email notifications.""")

    smtp_port = IntOption('notification', 'smtp_port', 25,
        """SMTP server port to use for email notification.""")

    smtp_user = Option('notification', 'smtp_user', '',
        """Username for authenticating with SMTP server.""")

    smtp_password = Option('notification', 'smtp_password', '',
        """Password for authenticating with SMTP server.""")

    use_tls = BoolOption('notification', 'use_tls', 'false',
        """Use SSL/TLS to send notifications over SMTP.""")

    @lazy
    def smtp_class(self):
        try:
            for value in (self.smtp_user, self.smtp_password):
                value.encode('ascii')
        except UnicodeEncodeError:
            return SMTPWithUnicodeCredsFixup
        else:
            return smtplib.SMTP

    def send(self, from_addr, recipients, message):
        global local_hostname

        # Ensure the message complies with RFC2822: use CRLF line endings
        message = fix_eol(message, CRLF)

        self.log.info("Sending notification through SMTP at %s:%d to %s",
                      self.smtp_server, self.smtp_port, recipients)
        try:
            server = self.smtp_class(self.smtp_server, self.smtp_port,
                                     local_hostname)
        except smtplib.socket.error as e:
            raise ConfigurationError(
                tag_("SMTP server connection error (%(error)s). Please "
                     "modify %(option1)s or %(option2)s in your "
                     "configuration.",
                     error=to_unicode(e),
                     option1=tag.code("[notification] smtp_server"),
                     option2=tag.code("[notification] smtp_port")))
        else:
            local_hostname = server.local_hostname

        try:
            # server.set_debuglevel(True)
            if self.use_tls:
                server.ehlo()
                if 'starttls' not in server.esmtp_features:
                    raise ConfigurationError(_("TLS enabled but server does "
                                               "not support TLS"))
                server.starttls()
                server.ehlo()
            if self.smtp_user:
                server.login(self.smtp_user, self.smtp_password)
            start = time_now()
            server.sendmail(from_addr, recipients, message)
            t = time_now() - start
            if t > 5:
                self.log.warning("Slow mail submission (%.2f s), "
                                 "check your mail setup", t)
            if self.use_tls:
                # avoid false failure detection when the server closes
                # the SMTP connection with TLS enabled
                import socket
                try:
                    server.quit()
                except socket.sslerror:
                    pass
            else:
                server.quit()
        except smtplib.SMTPException:
            raise
        except Exception as e:
            self.log.error("Exception caught while sending notification "
                           "through SMTP at %s:%d%s",
                           self.smtp_server, self.smtp_port,
                           exception_to_unicode(e, traceback=True))
            raise
        finally:
            server.close()


class SendmailEmailSender(Component):
    """E-mail sender using a locally-installed sendmail program."""

    implements(IEmailSender)

    sendmail_path = Option('notification', 'sendmail_path', 'sendmail',
        """Path to the sendmail executable.

        The sendmail program must accept the `-i` and `-f` options.
        """)

    def send(self, from_addr, recipients, message):
        # Use native line endings in message
        message = fix_eol(message, os.linesep)

        self.log.info("Sending notification through sendmail at %s to %s",
                      self.sendmail_path, recipients)
        cmdline = [self.sendmail_path, '-i', '-f', from_addr] + recipients
        self.log.debug("Sendmail command line: %s", cmdline)
        try:
            child = Popen(cmdline, bufsize=-1, stdin=PIPE, stdout=PIPE,
                          stderr=PIPE, close_fds=close_fds)
        except OSError as e:
            raise ConfigurationError(
                tag_("Sendmail error (%(error)s). Please modify %(option)s "
                     "in your configuration.",
                     error=to_unicode(e),
                     option=tag.code("[notification] sendmail_path")))
        out, err = child.communicate(message)
        if child.returncode or err:
            raise Exception("Sendmail failed with (%s, %s), command: '%s'"
                            % (child.returncode, err.strip(), cmdline))


class SessionEmailResolver(Component):
    """Gets the email address from the user preferences / session."""

    implements(IEmailAddressResolver)

    def get_address_for_session(self, sid, authenticated):
        return get_session_attribute(self.env, sid, authenticated, 'email')


class AlwaysEmailSubscriber(Component):
    """Implement a policy to -always- send an email to a certain address.

    Controlled via the smtp_always_cc and smtp_always_bcc option in the
    notification section of trac.ini.
    """

    implements(INotificationSubscriber)

    def matches(self, event):
        matcher = RecipientMatcher(self.env)
        klass = self.__class__.__name__
        format = None
        priority = 0
        for address in self._get_address_list():
            recipient = matcher.match_recipient(address)
            if recipient:
                sid, authenticated, address = recipient
                yield (klass, 'email', sid, authenticated, address, format,
                       priority, 'always')

    def description(self):
        return None  # not configurable

    def requires_authentication(self):
        return False

    def default_subscriptions(self):
        return ()

    def _get_address_list(self):
        section = self.config['notification']
        def getlist(name):
            return section.getlist(name, sep=(',', ' '), keep_empty=False)
        return set(getlist('smtp_always_cc')) | \
               set(getlist('smtp_always_bcc'))


class FromAuthorEmailDecorator(Component):
    """Implement a policy to use the author of the event as the sender in
    notification emails.

    Controlled via the smtp_from_author option in the notification section
    of trac.ini.
    """

    implements(IEmailDecorator)

    def decorate_message(self, event, message, charset):
        from_ = get_from_author(self.env, event)
        if from_:
            set_header(message, 'From', addresses=[from_])