edgewall/trac

View on GitHub
trac/ticket/notification.py

Summary

Maintainability
F
5 days
Test Coverage
# -*- coding: utf-8 -*-
#
# Copyright (C) 2003-2023 Edgewall Software
# Copyright (C) 2003-2005 Daniel Lundin <daniel@edgewall.com>
# Copyright (C) 2005-2006 Emmanuel Blot <emmanuel.blot@free.fr>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/log/.
#
# Author: Daniel Lundin <daniel@edgewall.com>
#

import re

from trac.api import IEnvironmentSetupParticipant
from trac.attachment import IAttachmentChangeListener
from trac.core import *
from trac.config import *
from trac.notification.api import (IEmailDecorator, INotificationFormatter,
                                   INotificationSubscriber,
                                   NotificationEvent, NotificationSystem)
from trac.notification.mail import (RecipientMatcher, create_message_id,
                                    get_from_author, get_message_addresses,
                                    set_header)
from trac.notification.model import Subscription
from trac.perm import PermissionSystem
from trac.ticket.api import translation_deactivated
from trac.ticket.model import Ticket, sort_tickets_by_priority
from trac.util import lazy
from trac.util.datefmt import format_date_or_datetime, get_timezone
from trac.util.text import (CRLF, exception_to_unicode, jinja2template,
                            shorten_line, text_width, wrap)
from trac.util.translation import _
from trac.web.chrome import Chrome


class TicketNotificationSystem(Component):
    """Seed the default ticket subscribers when an environment is created.

    Only `environment_created` does any work; the upgrade hooks of
    `IEnvironmentSetupParticipant` are no-ops.
    """

    implements(IEnvironmentSetupParticipant)

    # IEnvironmentSetupParticipant methods

    def environment_created(self):
        """Write the default `[notification-subscriber]` options.

        Nothing is written when the section is already present in the
        configuration.
        """
        section = 'notification-subscriber'
        if section in self.config:
            return
        defaults = (
            ('always_notify_cc', 'CarbonCopySubscriber'),
            ('always_notify_updater', 'TicketUpdaterSubscriber'),
            ('always_notify_previous_updater',
             'TicketPreviousUpdatersSubscriber'),
        )
        for option, subscriber in defaults:
            self.config.set(section, option, subscriber)
        self.config.save()

    def environment_needs_upgrade(self):
        """Always report that no upgrade is required."""
        return False

    def upgrade_environment(self):
        """Do nothing: this component has no upgrade steps."""
        return None


class TicketChangeEvent(NotificationEvent):
    """Represent a ticket change `NotificationEvent`.

    The realm passed to the base class is always 'ticket'.

    :param category: event category, forwarded to the base class
    :param target: the ticket the event is about; must provide
                   `get_change(cdate=...)` when `changes` is omitted
    :param time: timestamp of the change, or `None`
    :param author: forwarded to the base class
    :param comment: optional comment, stored as `self.comment`
    :param changes: optional change dict; when `None` and `time` is
                    given, it is looked up from `target`
    :param attachment: optional attachment, stored as `self.attachment`
    """

    def __init__(self, category, target, time, author, comment=None,
                 changes=None, attachment=None):
        super().__init__('ticket', category, target, time, author)
        # Only query the ticket when the caller supplied no changes and
        # there is a timestamp to look the change up by.
        must_lookup = changes is None and time is not None
        if must_lookup:
            changes = target.get_change(cdate=time)
        self.comment = comment
        # Normalize any falsy value (None, empty dict) to a fresh dict.
        self.changes = changes if changes else {}
        self.attachment = attachment


class BatchTicketChangeEvent(NotificationEvent):
    """Represent a ticket batch modify `NotificationEvent`.

    The base class is initialised with realm 'ticket' and category
    'batchmodify'; `targets` is passed as the event target.
    """

    def __init__(self, targets, time, author, comment, new_values, action):
        super().__init__('ticket', 'batchmodify', targets, time, author)
        self.comment = comment
        self.new_values = new_values
        self.action = action

    def get_ticket_change_events(self, env):
        """Yield one 'changed' `TicketChangeEvent` per entry of the target.

        Each entry of `self.target` is used as a ticket id to load a
        `Ticket` from `env`; time, author and comment are shared with
        this batch event.
        """
        for ticket_id in self.target:
            ticket = Ticket(env, ticket_id)
            yield TicketChangeEvent('changed', ticket, self.time,
                                    self.author, self.comment)


class TicketFormatter(Component):
    """Format `TicketChangeEvent` notifications.

    Declares `INotificationFormatter` and `IEmailDecorator`: the methods
    of this class render the plain-text body of ticket e-mails and set
    their subject and message-id related headers.
    """

    implements(INotificationFormatter, IEmailDecorator)

    # Column budget passed to `wrap` and used to size the property table.
    COLS = 75
    # Splits a cc value on runs of semicolons, commas and whitespace.
    addrsep_re = re.compile(r'[;\s,]+')

    # Read through the `ambiwidth` property, which maps it to 1 or 2.
    ambiguous_char_width = Option('notification', 'ambiguous_char_width',
                                  'single',
        """Width of ambiguous characters that should be used in the table
        of the notification mail.

        If `single`, the same width as characters in US-ASCII. This is
        expected by most users. If `double`, twice the width of
        US-ASCII characters.  This is expected by CJK users.
        """)

    # Subject template used by `_format_subj_batchmodify`.
    batch_subject_template = Option('notification', 'batch_subject_template',
                                    '${prefix} Batch modify: ${tickets_descr}',
        """Like `ticket_subject_template` but for batch modifications.
        (''since 1.0'')""")

    # Subject template used by `_format_subj`.
    ticket_subject_template = Option('notification', 'ticket_subject_template',
                                     '${prefix} #${ticket.id}: ${summary}',
        """A Jinja2 text template snippet used to get the notification
        subject.

        The template variables are documented on the
        [TracNotification#Customizingthee-mailsubject TracNotification] page.
        """)

    @lazy
    def ambiwidth(self):
        """Width used for ambiguous characters: 2 for 'double', else 1."""
        if self.ambiguous_char_width == 'double':
            return 2
        return 1

    def get_supported_styles(self, transport):
        """Yield the single supported `(style, realm)` pair.

        `transport` is not consulted.
        """
        supported = ('text/plain', 'ticket')
        yield supported

    def format(self, transport, style, event):
        """Render the plain-text body for a ticket event.

        Returns `None` for events outside the 'ticket' realm.  Otherwise
        the event category selects the batch-modify, attachment or
        generic ticket formatter.  `transport` and `style` are unused.
        """
        if event.realm != 'ticket':
            return
        category = event.category
        if category == 'batchmodify':
            render = self._format_plaintext_batchmodify
        elif category in ('attachment added', 'attachment deleted'):
            render = self._format_plaintext_attachment
        else:
            render = self._format_plaintext
        return render(event)

    def _format_plaintext(self, event):
        """Format ticket change notification e-mail (untranslated).

        Builds the data for the `ticket_notify_email.txt` template from
        the ticket and, for events other than 'created' that carry a
        time, from the changelog entries recorded at `event.time`.

        :param event: ticket event; `target` is the ticket, `category`
                      and `time` select what is rendered
        :return: the value returned by `_format_body`
        """
        ticket = event.target
        newticket = event.category == 'created'
        with translation_deactivated(ticket):
            link = self.env.abs_href.ticket(ticket.id)

            # changes_body: one line per changed field (and cc diff);
            # changes_descr: old/new description block;
            # change_data: author, comment and per-field old/new values.
            changes_body = ''
            changes_descr = ''
            change_data = {}
            if not newticket and event.time:  # Ticket change
                # Imported here rather than at module level
                # (NOTE(review): presumably to avoid an import cycle with
                # trac.ticket.web_ui -- confirm).
                from trac.ticket.web_ui import TicketModule
                for change in TicketModule(self.env) \
                              .grouped_changelog_entries(ticket,
                                                         when=event.time):
                    if not change['permanent']:  # attachment with same time...
                        continue
                    author = change['author']
                    change_data.update({
                        'author': self._format_author(author),
                        'comment': wrap(change['comment'], self.COLS, ' ', ' ',
                                        '\n', self.ambiwidth)
                    })
                    # Point the link at the comment; an entry without
                    # 'cnum' yields a bare '#comment:' anchor.
                    link += '#comment:%s' % str(change.get('cnum', ''))
                    for field, values in change['fields'].items():
                        old = values['old']
                        new = values['new']
                        # Only set in the generic branch below; gates the
                        # change_data[field] entry at the end of the loop.
                        newv = ''
                        if field == 'description':
                            new_descr = wrap(new, self.COLS, ' ', ' ', '\n',
                                             self.ambiwidth)
                            # The old description is quoted with '> '.
                            old_descr = wrap(old, self.COLS, '> ', '> ', '\n',
                                             self.ambiwidth)
                            # Put a '>' on blank lines so the quote block
                            # stays contiguous.
                            old_descr = old_descr.replace(2 * '\n', '\n' + '>' +
                                                          '\n')
                            cdescr = '\n'
                            cdescr += 'Old description:' + 2 * '\n' + old_descr + \
                                      2 * '\n'
                            cdescr += 'New description:' + 2 * '\n' + new_descr + \
                                      '\n'
                            changes_descr = cdescr
                        elif field == 'cc':
                            # cc changes are reported as removed/added
                            # entries rather than old => new.
                            addcc, delcc = self._diff_cc(old, new)
                            chgcc = ''
                            if delcc:
                                chgcc += wrap(" * cc: %s (removed)" %
                                              ', '.join(delcc),
                                              self.COLS, ' ', ' ', '\n',
                                              self.ambiwidth) + '\n'
                            if addcc:
                                chgcc += wrap(" * cc: %s (added)" %
                                              ', '.join(addcc),
                                              self.COLS, ' ', ' ', '\n',
                                              self.ambiwidth) + '\n'
                            if chgcc:
                                changes_body += chgcc
                        else:
                            if field in ['owner', 'reporter']:
                                old = self._format_author(old)
                                new = self._format_author(new)
                            elif field in ticket.time_fields:
                                format = ticket.fields.by_name(field) \
                                                      .get('format')
                                old = self._format_time_field(old, format)
                                new = self._format_time_field(new, format)
                            newv = new
                            # Indent used for continuation lines; reduced
                            # to 5 when old and new do not fit on one line.
                            length = 7 + len(field)
                            spacer_old, spacer_new = ' ', ' '
                            if len(old + new) + length > self.COLS:
                                length = 5
                                # Move an over-long value onto its own line.
                                if len(old) + length > self.COLS:
                                    spacer_old = '\n'
                                if len(new) + length > self.COLS:
                                    spacer_new = '\n'
                            chg = '* %s: %s%s%s=>%s%s' \
                                  % (field, spacer_old, old,
                                     spacer_old, spacer_new, new)
                            chg = chg.replace('\n', '\n' + length * ' ')
                            chg = wrap(chg, self.COLS, '', length * ' ', '\n',
                                       self.ambiwidth)
                            changes_body += ' %s%s' % (chg, '\n')
                        # Recorded only for generic fields whose (formatted)
                        # new value is non-empty.
                        if newv:
                            change_data[field] = {'oldvalue': old,
                                                  'newvalue': new}

            # Work on a copy so the ticket's own values are not modified.
            ticket_values = ticket.values.copy()
            ticket_values['id'] = ticket.id
            ticket_values['description'] = wrap(
                ticket_values.get('description', ''), self.COLS,
                initial_indent=' ', subsequent_indent=' ', linesep='\n',
                ambiwidth=self.ambiwidth)
            ticket_values['new'] = newticket
            ticket_values['link'] = link

            data = Chrome(self.env).populate_data(None, {
                'CRLF': CRLF,
                'ticket_props': self._format_props(ticket),
                'ticket_body_hdr': self._format_hdr(ticket),
                'ticket': ticket_values,
                'changes_body': changes_body,
                'changes_descr': changes_descr,
                'change': change_data
            })
            return self._format_body(data, 'ticket_notify_email.txt')

    def _format_plaintext_attachment(self, event):
        """Format ticket attachment notification e-mail (untranslated).

        The change section names the attachment and whether it was added
        or removed, followed by the attachment description when there is
        one.  The body is rendered with `ticket_notify_email.txt`.

        :param event: ticket event whose `attachment` is set and whose
                      category is 'attachment added' or another value
                      (treated as a removal)
        :return: the value returned by `_format_body`
        """
        ticket = event.target
        verb = "added" if event.category == 'attachment added' else "removed"
        link = self.env.abs_href.ticket(ticket.id)
        author = event.attachment.author
        with translation_deactivated(ticket):
            headline = " * Attachment \"%s\" %s." % (event.attachment.filename,
                                                     verb)
            changes_body = wrap(headline, self.COLS, ' ', ' ', '\n',
                                self.ambiwidth) + "\n"
            if event.attachment.description:
                wrapped_description = wrap(event.attachment.description,
                                           self.COLS, ' ', ' ', '\n',
                                           self.ambiwidth)
                changes_body += "\n" + wrapped_description

            # Work on a copy so the ticket's own values are not modified.
            ticket_values = ticket.values.copy()
            ticket_values.update({
                'id': ticket.id,
                'description': wrap(
                    ticket_values.get('description', ''), self.COLS,
                    initial_indent=' ', subsequent_indent=' ', linesep='\n',
                    ambiwidth=self.ambiwidth),
                # An attachment event is never a ticket creation.
                'new': False,
                'link': link,
            })

            template_data = {
                'CRLF': CRLF,
                'ticket_props': self._format_props(ticket),
                'ticket_body_hdr': self._format_hdr(ticket),
                'ticket': ticket_values,
                'changes_body': changes_body,
                'changes_descr': '',
                'change': {'author': self._format_author(author)},
            }
            data = Chrome(self.env).populate_data(None, template_data)
            return self._format_body(data, 'ticket_notify_email.txt')

    def _format_plaintext_batchmodify(self, event):
        """Format batch ticket change notification e-mail (untranslated).

        Lists the tickets (sorted by priority), the new values applied
        and a query link covering all of them, rendered with
        `batch_ticket_notify_email.txt`.

        :param event: batch event providing `target`, `new_values`,
                      `comment`, `action` and `author`
        :return: the value returned by `_format_body`
        """
        with translation_deactivated():
            tickets = sort_tickets_by_priority(self.env, event.target)
            change_lines = ['%s to %s' % (name, value)
                            for name, value in event.new_values.items()]
            ticket_refs = ['#%s' % t for t in tickets]
            id_list = ','.join([str(t) for t in tickets])
            link = self.env.abs_href.query(id=id_list)
            template_data = {
                'CRLF': CRLF,
                'tickets_descr': ', '.join(ticket_refs),
                'changes_descr': '\n'.join(change_lines),
                'comment': event.comment,
                'action': event.action,
                'author': event.author,
                'ticket_query_link': link,
            }
            data = Chrome(self.env).populate_data(None, template_data)
            return self._format_body(data, 'batch_ticket_notify_email.txt')

    def _format_author(self, author):
        """Return `author` as formatted by `Chrome.format_author`.

        No request is available here, so `None` is passed as the request.
        """
        chrome = Chrome(self.env)
        return chrome.format_author(None, author)

    def _format_body(self, data, template_name):
        """Render the text template `template_name` with `data`.

        :return: the rendered body encoded as UTF-8 bytes
        """
        chrome = Chrome(self.env)
        template = chrome.load_template(template_name, text=True)
        with translation_deactivated():  # don't translate the e-mail stream
            encoded = chrome.render_template_string(
                template, data, text=True).encode('utf-8')
        return encoded

    def _format_subj(self, event):
        """Build the subject line for a single-ticket notification.

        Renders `ticket_subject_template` with `prefix`, `summary`,
        `ticket`, `changes` and `env`.  When the event records a summary
        change, the summary is shown as ``<new> (was: <old>)``.  Every
        category other than 'created' gets a "Re: " prefix.

        :param event: ticket event providing `category`, `target` and
                      `changes`
        :return: the rendered, stripped subject string
        """
        is_newticket = event.category == 'created'
        ticket = event.target

        summary = ticket['summary']
        # `changes` can be empty or, when supplied by a caller, lack the
        # 'fields' entry; treat both as "no field changed" instead of
        # raising KeyError, as the subscribers in this module do.
        field_changes = (event.changes or {}).get('fields') or {}
        if 'summary' in field_changes:
            change = field_changes['summary']
            summary = "%s (was: %s)" % (change['new'], change['old'])

        prefix = self.config.get('notification', 'smtp_subject_prefix')
        # '__default__' stands for the bracketed project name.
        if prefix == '__default__':
            prefix = '[%s]' % self.env.project_name

        data = {
            'prefix': prefix,
            'summary': summary,
            'ticket': ticket,
            'changes': event.changes,
            'env': self.env,
        }

        template = _template_from_string(self.ticket_subject_template)
        subj = template.render(**data).strip()
        if not is_newticket:
            subj = "Re: " + subj
        return subj

    def _format_subj_batchmodify(self, tickets):
        """Build the subject line for a batch-modify notification.

        Renders `batch_subject_template` with `prefix`, `tickets_descr`
        (the tickets as a comma-separated list of ``#<id>``) and `env`.

        :return: the rendered subject, stripped and passed through
                 `shorten_line`
        """
        ticket_refs = ['#%s' % t for t in tickets]
        template = _template_from_string(self.batch_subject_template)

        prefix = self.config.get('notification', 'smtp_subject_prefix')
        # '__default__' stands for the bracketed project name.
        if prefix == '__default__':
            prefix = '[%s]' % self.env.project_name

        rendered = template.render(prefix=prefix,
                                   tickets_descr=', '.join(ticket_refs),
                                   env=self.env)
        return shorten_line(rendered.strip())

    def _format_hdr(self, ticket):
        """Return the body header: ``#<id>: <wrapped summary>``."""
        wrapped_summary = wrap(ticket['summary'], self.COLS, linesep='\n',
                               ambiwidth=self.ambiwidth)
        return '#%s: %s' % (ticket.id, wrapped_summary)

    def _format_props(self, ticket):
        """Render the ticket properties as a two-column text table.

        Single-line fields are laid out alternately in a left and a
        right column separated by '|'; textarea and multi-line values
        are appended below the table, one block per field.  The fields
        'summary', 'cc', 'time' and 'changetime' are never shown, and
        'description' is skipped in the table.

        :param ticket: the ticket whose fields and values are rendered
        :return: the table as a single string using '\\n' line breaks
        """
        fields = [f for f in ticket.fields
                  if f['name'] not in ('summary', 'cc', 'time', 'changetime')]
        # First pass: measure.  width[0]/width[1] are the widest label and
        # value of the left column, width[2]/width[3] those of the right.
        width = [0, 0, 0, 0]
        i = 0
        for f in fields:
            if f['type'] == 'textarea':
                continue
            fname = f['name']
            if fname not in ticket.values:
                continue
            fval = ticket[fname] or ''
            if fname in ticket.time_fields:
                format = ticket.fields.by_name(fname).get('format')
                fval = self._format_time_field(fval, format)
            # Multi-line values do not go into the table, so they must
            # not influence the column widths.
            if fval.find('\n') != -1:
                continue
            if fname in ['owner', 'reporter']:
                fval = self._format_author(fval)
            # Even-numbered fields go left (idx 0), odd ones right (idx 2).
            idx = 2 * (i % 2)
            width[idx] = max(self._get_text_width(f['label']), width[idx])
            width[idx + 1] = max(self._get_text_width(fval), width[idx + 1])
            i += 1
        # NOTE(review): the extra 5 presumably covers the ':  ' between
        # label and value plus margins -- confirm.
        width_l = width[0] + width[1] + 5
        width_r = width[2] + width[3] + 5
        half_cols = (self.COLS - 1) // 2
        # Shrink the columns when the table would exceed COLS.
        if width_l + width_r + 1 > self.COLS:
            if ((width_l > half_cols and width_r > half_cols) or
                    (width[0] > half_cols // 2 or width[2] > half_cols // 2)):
                # Both sides too wide, or a label takes more than half of
                # a half: split evenly.
                width_l = half_cols
                width_r = half_cols
            elif width_l > width_r:
                # Cap the wider side at two thirds; the other gets the rest.
                width_l = min((self.COLS - 1) * 2 // 3, width_l)
                width_r = self.COLS - width_l - 1
            else:
                width_r = min((self.COLS - 1) * 2 // 3, width_r)
                width_l = self.COLS - width_r - 1
        sep = width_l * '-' + '+' + width_r * '-'
        txt = sep + '\n'
        # Second pass: vals_lr holds, per column, one list of wrapped
        # lines per field; `big` collects the fields shown below the table.
        vals_lr = ([], [])
        big = []
        i = 0
        width_lr = [width_l, width_r]
        for f in [f for f in fields if f['name'] != 'description']:
            fname = f['name']
            if fname not in ticket.values:
                continue
            fval = ticket[fname] or ''
            if fname in ticket.time_fields:
                format = ticket.fields.by_name(fname).get('format')
                fval = self._format_time_field(fval, format)
            if fname in ['owner', 'reporter']:
                fval = self._format_author(fval)
            if f['type'] == 'textarea' or '\n' in str(fval):
                # Normalize the line endings of the value to '\n'.
                big.append((f['label'], '\n'.join(fval.splitlines())))
            else:
                # Note: f['label'] is a Babel's LazyObject, make sure its
                # __str__ method won't be called.
                str_tmp = '%s:  %s' % (f['label'], str(fval))
                idx = i % 2
                # Right-align the label within its column's label width.
                initial_indent = ' ' * (width[2 * idx] -
                                        self._get_text_width(f['label']) +
                                        2 * idx)
                wrapped = wrap(str_tmp, width_lr[idx] - 2 + 2 * idx,
                               initial_indent, '  ', '\n', self.ambiwidth)
                vals_lr[idx].append(wrapped.splitlines())
                i += 1
        # With an odd number of fields, pair the last left cell with an
        # empty right cell.
        if len(vals_lr[0]) > len(vals_lr[1]):
            vals_lr[1].append([])

        # Flatten the cells into per-column line lists, padding the
        # shorter column so that paired fields line up.
        cell_l = []
        cell_r = []
        for i in range(len(vals_lr[0])):
            vals_l = vals_lr[0][i]
            vals_r = vals_lr[1][i]
            vals_diff = len(vals_l) - len(vals_r)
            diff = len(cell_l) - len(cell_r)
            if diff > 0:
                # add padding to right side if needed
                if vals_diff < 0:
                    diff += vals_diff
                cell_r.extend([''] * max(diff, 0))
            elif diff < 0:
                # add padding to left side if needed
                if vals_diff > 0:
                    diff += vals_diff
                cell_l.extend([''] * max(-diff, 0))
            cell_l.extend(vals_l)
            cell_r.extend(vals_r)

        for i in range(max(len(cell_l), len(cell_r))):
            if i >= len(cell_l):
                cell_l.append(width_l * ' ')
            elif i >= len(cell_r):
                cell_r.append('')
            # '%-*s' pads by character count, so correct the field width
            # by the difference between len() and the display width.
            fmt_width = width_l - self._get_text_width(cell_l[i]) \
                        + len(cell_l[i])
            txt += '%-*s|%s%s' % (fmt_width, cell_l[i], cell_r[i], '\n')
        if big:
            txt += sep
            for name, value in big:
                txt += '\n'.join(['', name + ':', value, '', ''])
        txt += sep
        return txt

    def _format_time_field(self, value, format):
        """Format a time field value in the configured default timezone.

        :return: the formatted value, or '' when `value` is falsy
        """
        tzinfo = get_timezone(self.config.get('trac', 'default_timezone'))
        if not value:
            return ''
        return format_date_or_datetime(format, value, tzinfo=tzinfo)

    def _diff_cc(self, old, new):
        """Compare two cc strings entry by entry.

        Both values are split with `addrsep_re`; empty entries are
        ignored.

        :return: `(added, removed)` lists of entries formatted with
                 `_format_author`
        """
        old_entries = self.addrsep_re.split(old)
        new_entries = self.addrsep_re.split(new)

        added = []
        for entry in new_entries:
            if entry and entry not in old_entries:
                added.append(self._format_author(entry))

        removed = []
        for entry in old_entries:
            if entry and entry not in new_entries:
                removed.append(self._format_author(entry))

        return added, removed

    def _get_text_width(self, text):
        """Return `text_width(text)` using the configured `ambiwidth`."""
        ambiwidth = self.ambiwidth
        return text_width(text, ambiwidth=ambiwidth)

    def _get_from_email(self, event):
        """Return the sender address used for building message ids.

        Uses the result of `get_from_author` (its second item when it is
        a non-empty tuple) and falls back to the `smtp_from`, then
        `smtp_replyto` options of the `[notification]` section.
        """
        author = get_from_author(self.env, event)
        if author and isinstance(author, tuple):
            address = author[1]
        else:
            address = author
        if address:
            return address
        get_option = self.config.get
        return (get_option('notification', 'smtp_from') or
                get_option('notification', 'smtp_replyto'))

    def _get_message_id(self, targetid, from_email, modtime, more=None):
        """Delegate to `create_message_id` for this environment."""
        env = self.env
        return create_message_id(env, targetid, from_email, modtime, more)

    def decorate_message(self, event, message, charset):
        """Set subject, message-id and ticket headers on `message`.

        Events outside the 'ticket' realm are ignored.  For batch
        modifications only 'Subject' and 'Message-ID' are set; for
        single-ticket events the threading headers, the
        'X-Trac-Ticket-*' headers and, when public cc is enabled, the
        'To'/'Cc' headers are adjusted as well.  `charset` is unused.

        :param event: the notification event
        :param message: the e-mail message, modified in place
        """
        if event.realm != 'ticket':
            return
        from_email = self._get_from_email(event)
        if event.category == 'batchmodify':
            tickets = sort_tickets_by_priority(self.env, event.target)
            subject = self._format_subj_batchmodify(tickets)
            targetid = ','.join(map(str, tickets))
            msgid = self._get_message_id(targetid, from_email, event.time)
        else:
            subject = self._format_subj(event)
            ticket = event.target
            targetid = '%08d' % ticket.id
            more = ticket['reporter'] or ''
            # Id built without a modification time: it is the Message-ID
            # of the 'created' mail and the thread root for later mails.
            msgid = self._get_message_id(targetid, from_email, None, more)
            url = self.env.abs_href.ticket(ticket.id)
            if event.category != 'created':
                set_header(message, 'In-Reply-To', msgid)
                set_header(message, 'References', msgid)
                # Follow-ups get their own id, derived from the event time.
                msgid = self._get_message_id(targetid, from_email, event.time,
                                             more)
                cnum = ticket.get_comment_number(event.time)
                if cnum is not None:
                    url += '#comment:%d' % cnum
            set_header(message, 'X-Trac-Ticket-ID', ticket.id)
            set_header(message, 'X-Trac-Ticket-URL', url)
            # When owner, reporter and updater are listed in the Cc header,
            # move the address to To header.
            if NotificationSystem(self.env).use_public_cc:
                to_addrs = set()
                matcher = RecipientMatcher(self.env)
                for rcpt in ticket['owner'], ticket['reporter'], event.author:
                    rcpt = matcher.match_recipient(rcpt)
                    if not rcpt:
                        continue
                    # The third item of the match is the e-mail address.
                    addr = rcpt[2]
                    if addr:
                        to_addrs.add(addr)
                if to_addrs:
                    cc_addrs = get_message_addresses(message, 'Cc')
                    # Keep only the addresses actually present in Cc.
                    to_addrs &= set(addr for name, addr in cc_addrs)
                # `cc_addrs` is bound here: the set can only be non-empty
                # now if the previous branch ran.
                if to_addrs:
                    cc_addrs = [(name, addr) for name, addr in cc_addrs
                                             if addr not in to_addrs]
                    if cc_addrs:
                        set_header(message, 'Cc', addresses=cc_addrs)
                    elif 'Cc' in message:
                        del message['Cc']
                    set_header(message, 'To', addresses=to_addrs)
        set_header(message, 'Subject', subject)
        set_header(message, 'Message-ID', msgid)


class TicketOwnerSubscriber(Component):
    """Allows ticket owners to subscribe to their tickets."""

    implements(INotificationSubscriber)

    # INotificationSubscriber methods

    def matches(self, event):
        """Return subscriptions for the current and the previous owner.

        The previous owner is included only when the event records an
        owner change.  Non ticket-change events match nobody.
        """
        owners = None
        if _is_ticket_change_event(event):
            owners = [event.target['owner']]
            # Harvest previous owner
            if 'fields' in event.changes:
                field_changes = event.changes['fields']
                if 'owner' in field_changes:
                    owners.append(field_changes['owner']['old'])
        return _ticket_change_subscribers(self, owners)

    def description(self):
        """Return the text describing this subscription rule."""
        return _("Ticket that I own is created or modified")

    def default_subscriptions(self):
        """Return the default subscriptions configured for this class."""
        class_name = self.__class__.__name__
        notifier = NotificationSystem(self.env)
        return notifier.default_subscriptions(class_name)

    def requires_authentication(self):
        """This rule requires authentication."""
        return True


class TicketUpdaterSubscriber(Component):
    """Allows updaters to subscribe to their own updates."""

    implements(INotificationSubscriber)

    # INotificationSubscriber methods

    def matches(self, event):
        """Return subscriptions for the author of a ticket change event.

        Non ticket-change events match nobody.
        """
        updater = event.author if _is_ticket_change_event(event) else None
        return _ticket_change_subscribers(self, updater)

    def description(self):
        """Return the text describing this subscription rule."""
        return _("I update a ticket")

    def default_subscriptions(self):
        """Return the default subscriptions configured for this class."""
        class_name = self.__class__.__name__
        notifier = NotificationSystem(self.env)
        return notifier.default_subscriptions(class_name)

    def requires_authentication(self):
        """This rule requires authentication."""
        return True


class TicketPreviousUpdatersSubscriber(Component):
    """Allows subscribing to future changes simply by updating a ticket."""

    implements(INotificationSubscriber)

    # INotificationSubscriber methods

    def matches(self, event):
        """Return subscriptions for everyone who changed the ticket before.

        The distinct authors recorded in `ticket_change` for the ticket
        are used, except the author of the current event.  Non
        ticket-change events match nobody.
        """
        updaters = None
        if _is_ticket_change_event(event):
            rows = self.env.db_query("""
                SELECT DISTINCT author FROM ticket_change
                WHERE ticket=%s
                """, (event.target.id,))
            updaters = []
            for author, in rows:
                if author != event.author:
                    updaters.append(author)
        return _ticket_change_subscribers(self, updaters)

    def description(self):
        """Return the text describing this subscription rule."""
        return _("Ticket that I previously updated is modified")

    def default_subscriptions(self):
        """Return the default subscriptions configured for this class."""
        class_name = self.__class__.__name__
        notifier = NotificationSystem(self.env)
        return notifier.default_subscriptions(class_name)

    def requires_authentication(self):
        """This rule requires authentication."""
        return True


class TicketReporterSubscriber(Component):
    """Allows the users to subscribe to tickets that they report."""

    implements(INotificationSubscriber)

    # INotificationSubscriber methods

    def matches(self, event):
        """Return subscriptions for the reporter of the event's ticket.

        Non ticket-change events match nobody.
        """
        if _is_ticket_change_event(event):
            reporter = event.target['reporter']
        else:
            reporter = None
        return _ticket_change_subscribers(self, reporter)

    def description(self):
        """Return the text describing this subscription rule."""
        return _("Ticket that I reported is modified")

    def default_subscriptions(self):
        """Return the default subscriptions configured for this class."""
        class_name = self.__class__.__name__
        notifier = NotificationSystem(self.env)
        return notifier.default_subscriptions(class_name)

    def requires_authentication(self):
        """This rule requires authentication."""
        return True


class NewTicketSubscriber(Component):
    """Allows the users to subscribe to new tickets."""

    implements(INotificationSubscriber)

    # INotificationSubscriber methods

    def matches(self, event):
        """Yield every stored subscription of this class for new tickets.

        Nothing is yielded unless the event is a 'created' event in the
        'ticket' realm.
        """
        if event.realm == 'ticket' and event.category == 'created':
            class_name = self.__class__.__name__
            subscriptions = Subscription.find_by_class(self.env, class_name)
            for subscription in subscriptions:
                yield subscription.subscription_tuple()

    def description(self):
        """Return the text describing this subscription rule."""
        return _("Any ticket is created")

    def default_subscriptions(self):
        """This rule has no default subscriptions."""
        return []

    def requires_authentication(self):
        """This rule does not require authentication."""
        return False


class CarbonCopySubscriber(Component):
    """Carbon copy subscriber for cc ticket field."""

    implements(INotificationSubscriber)

    # INotificationSubscriber methods

    def matches(self, event):
        """Return subscriptions for everyone listed in the cc field.

        The current cc value and, when the event records a cc change,
        the previous one are both considered.  Entries that name a
        permission group are replaced by the members of that group.
        Non ticket-change events match nobody.
        """
        cc_users = None
        if _is_ticket_change_event(event):
            chrome = Chrome(self.env)

            def parse_cc(value):
                # The cc field is stored as a string; turn it into a set.
                return set(chrome.cc_list(value))

            cc_users = parse_cc(event.target['cc'] or '')

            # Harvest previous CC field
            if 'fields' in event.changes and 'cc' in event.changes['fields']:
                previous_cc = event.changes['fields']['cc']['old']
                cc_users.update(parse_cc(previous_cc))

            # Get members of permission groups.  Iterate over a sorted
            # snapshot because the set is modified inside the loop.
            groups = PermissionSystem(self.env).get_groups_dict()
            for name in sorted(cc_users):
                if name in groups:
                    cc_users.discard(name)
                    cc_users.update(groups[name])

        return _ticket_change_subscribers(self, cc_users)

    def description(self):
        """Return the text describing this subscription rule."""
        return _("Ticket that I'm listed in the CC field is modified")

    def default_subscriptions(self):
        """Return the default subscriptions configured for this class."""
        class_name = self.__class__.__name__
        notifier = NotificationSystem(self.env)
        return notifier.default_subscriptions(class_name)

    def requires_authentication(self):
        """This rule requires authentication."""
        return True


class TicketAttachmentNotifier(Component):
    """Sends notification on attachment change."""

    implements(IAttachmentChangeListener)

    # IAttachmentChangeListener methods

    def attachment_added(self, attachment):
        """Notify about an added attachment, timed at `attachment.date`."""
        self._notify_attachment(attachment, 'attachment added',
                                attachment.date)

    def attachment_deleted(self, attachment):
        """Notify about a deleted attachment; the event carries no time."""
        self._notify_attachment(attachment, 'attachment deleted', None)

    def attachment_reparented(self, attachment, old_parent_realm,
                              old_parent_id):
        """Reparenting does not trigger a notification."""
        return None

    # Internal methods

    def _notify_attachment(self, attachment, category, time):
        """Send a `TicketChangeEvent` for an attachment of a ticket.

        Attachments whose parent resource is not a ticket are ignored.
        A failure while notifying is logged and not propagated.
        """
        parent = attachment.resource.parent
        if parent.realm != 'ticket':
            return
        ticket = Ticket(self.env, parent.id)
        # NOTE(review): the event author is the ticket reporter, not
        # `attachment.author` -- confirm this is intended.
        event = TicketChangeEvent(category, ticket, time,
                                  ticket['reporter'], attachment=attachment)
        try:
            NotificationSystem(self.env).notify(event)
        except Exception as exc:
            self.log.error("Failure sending notification when %s for "
                           "attachment '%s' to ticket #%s: %s",
                           category, attachment.filename, ticket.id,
                           exception_to_unicode(exc))


def _is_ticket_change_event(event):
    """Tell whether `event` is a single-ticket notification event.

    True for the 'ticket' realm with category 'created', 'changed',
    'attachment added' or 'attachment deleted'.
    """
    if event.realm != 'ticket':
        return False
    return event.category in ('created', 'changed', 'attachment added',
                              'attachment deleted')


def _ticket_change_subscribers(subscriber, candidates):
    """Yield the subscription tuples of `subscriber` for `candidates`.

    For every candidate that `RecipientMatcher` resolves, the default
    subscriptions of `subscriber` are yielded with the candidate's
    session id, authenticated flag and address filled in.  Afterwards
    the stored subscriptions of the subscriber's class are yielded for
    the candidates that resolved to a session id.

    :param subscriber: the subscriber component
    :param candidates: a single candidate or a list, set or tuple of
                       them; a falsy value yields nothing
    """
    if not candidates:
        return
    if not isinstance(candidates, (list, set, tuple)):
        candidates = [candidates]

    env = subscriber.env
    class_name = subscriber.__class__.__name__
    matcher = RecipientMatcher(env)
    known_sessions = set()
    for candidate in candidates:
        match = matcher.match_recipient(candidate)
        if not match:
            continue
        sid, authenticated, address = match

        # Default subscription
        for default in subscriber.default_subscriptions():
            yield (default[0], default[1], sid, authenticated, address,
                   default[2], default[3], default[4])
        if sid:
            known_sessions.add((sid, authenticated))

    stored = Subscription.find_by_sids_and_class(env, known_sessions,
                                                 class_name)
    for subscription in stored:
        yield subscription.subscription_tuple()


def _template_from_string(string):
    """Build a Jinja2 text template from `string`.

    Line statement and line comment prefixes are disabled.
    """
    options = {'text': True,
               'line_statement_prefix': None,
               'line_comment_prefix': None}
    return jinja2template(string, **options)