edgewall/trac

View on GitHub
trac/ticket/default_workflow.py

Summary

Maintainability
F
4 days
Test Coverage
# -*- coding: utf-8 -*-
#
# Copyright (C) 2006-2023 Edgewall Software
# Copyright (C) 2006 Alec Thomas
# Copyright (C) 2007 Eli Carter
# Copyright (C) 2007 Christian Boos <cboos@edgewall.org>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/log/.
#
# Author: Eli Carter

import io
from configparser import ParsingError, RawConfigParser
from collections import defaultdict
from functools import partial
from pkg_resources import resource_filename

from trac.api import IEnvironmentSetupParticipant
from trac.config import ConfigSection, Configuration, ConfigurationError
from trac.core import *
from trac.perm import PermissionCache, PermissionSystem
from trac.resource import ResourceNotFound
from trac.ticket.api import ITicketActionController, TicketSystem
from trac.ticket.model import Component as TicketComponent, Resolution
from trac.util import exception_to_unicode, get_reporter_id, sub_val, to_list
from trac.util.html import tag
from trac.util.presentation import separated
from trac.util.translation import _, tag_, cleandoc_
from trac.versioncontrol.api import RepositoryManager
from trac.web.chrome import Chrome, add_script, add_script_data
from trac.wiki.formatter import MacroError, ProcessorError
from trac.wiki.macros import WikiMacroBase, parse_args


# -- Utilities for the ConfigurableTicketWorkflow

def parse_workflow_config(rawactions):
    """Given a list of options from [ticket-workflow]"""

    required_attrs = {
        'oldstates': [],
        'newstate': '',
        'name': '',
        'label': '',
        'default': 0,
        'operations': [],
        'permissions': [],
    }
    optional_attrs = {
        'set_owner': [],
        'set_resolution': [],
    }
    known_attrs = required_attrs.copy()
    known_attrs.update(optional_attrs)

    actions = defaultdict(dict)
    for option, value in rawactions:
        parts = option.split('.')
        name = parts[0]
        if len(parts) == 1:
            try:
                # Base name, of the syntax: old,states,here -> newstate
                oldstates, newstate = [x.strip() for x in value.split('->')]
            except ValueError:
                continue  # Syntax error, a warning will be logged later
            actions[name]['oldstates'] = to_list(oldstates)
            actions[name]['newstate'] = newstate
        else:
            attribute = parts[1]
            if attribute not in known_attrs or \
                    isinstance(known_attrs[attribute], str):
                actions[name][attribute] = value
            elif isinstance(known_attrs[attribute], int):
                actions[name][attribute] = int(value)
            elif isinstance(known_attrs[attribute], list):
                actions[name][attribute] = to_list(value)

    for action, attributes in actions.items():
        if 'label' not in attributes:
            if 'name' in attributes:  # backwards-compatibility, #11828
                attributes['label'] = attributes['name']
            else:
                attributes['label'] = action.replace("_", " ").strip()
        for key, val in required_attrs.items():
            attributes.setdefault(key, val)
        for val in ('<none>', '< none >'):
            sub_val(attributes['oldstates'], val, None)

    return actions


def get_workflow_config(config):
    """Usually passed self.config, this will return the parsed ticket-workflow
    section.
    """
    raw_actions = list(config.options('ticket-workflow'))
    actions = parse_workflow_config(raw_actions)
    return actions


def load_workflow_config_snippet(config, filename):
    """Loads the ticket-workflow section from the given file (expected to be in
    the 'workflows' tree) into the provided config.
    """
    filename = resource_filename('trac.ticket', 'workflows/%s' % filename)
    new_config = Configuration(filename)
    for name, value in new_config.options('ticket-workflow'):
        config.set('ticket-workflow', name, value)


class ConfigurableTicketWorkflow(Component):
    """Ticket action controller which provides actions according to a
    workflow defined in trac.ini.

    The workflow is defined in the `[ticket-workflow]` section of the
    [wiki:TracIni#ticket-workflow-section trac.ini] configuration file.
    """

    implements(IEnvironmentSetupParticipant, ITicketActionController)

    ticket_workflow_section = ConfigSection('ticket-workflow',
        """The workflow for tickets is controlled by plugins. By default,
        there's only a `ConfigurableTicketWorkflow` component in charge.
        That component allows the workflow to be configured via this section
        in the `trac.ini` file. See TracWorkflow for more details.
        """)

    operations = ('del_owner', 'set_owner', 'set_owner_to_self',
                  'may_set_owner', 'set_resolution', 'del_resolution',
                  'leave_status', 'reset_workflow')

    def __init__(self):
        self.actions = self.get_all_actions()
        self.log.debug('Workflow actions at initialization: %s\n',
                       self.actions)

    # IEnvironmentSetupParticipant methods

    def environment_created(self):
        """When an environment is created, we provide the basic-workflow,
        unless a ticket-workflow section already exists.
        """
        if 'ticket-workflow' not in self.config.sections():
            load_workflow_config_snippet(self.config, 'basic-workflow.ini')
            self.config.save()
            self.actions = self.get_all_actions()

    def environment_needs_upgrade(self):
        pass

    def upgrade_environment(self):
        pass

    # ITicketActionController methods

    def get_ticket_actions(self, req, ticket):
        """Returns a list of (weight, action) tuples that are valid for this
        request and this ticket."""
        # Get the list of actions that can be performed

        # Determine the current status of this ticket.  If this ticket is in
        # the process of being modified, we need to base our information on the
        # pre-modified state so that we don't try to do two (or more!) steps at
        # once and get really confused.
        ticket_status = ticket._old.get('status', ticket['status'])
        exists = ticket_status is not None
        ticket_owner = ticket._old.get('owner', ticket['owner'])
        author = get_reporter_id(req, 'author')

        resource = ticket.resource
        allowed_actions = []
        for action_name, action_info in self.actions.items():
            operations = action_info['operations']
            newstate = action_info['newstate']
            if 'leave_status' not in operations and newstate == '*':
                # Some other controller handles this operation
                continue
            # Exclude action that is effectively a No-op.
            if len(operations) == 1 and \
                    'set_owner_to_self' in operations and \
                    ticket_owner == author and ticket_status == newstate:
                continue
            oldstates = action_info['oldstates']
            if exists and oldstates == ['*'] or ticket_status in oldstates:
                # This action is valid in this state.  Check permissions.
                if self._is_action_allowed(req, action_info, resource):
                    allowed_actions.append((action_info['default'],
                                            action_name))
        # Append special `_reset` action if status is invalid.
        if exists and '_reset' in self.actions and \
                ticket_status not in TicketSystem(self.env).get_all_status():
            reset = self.actions['_reset']
            if self._is_action_allowed(req, reset, resource):
                allowed_actions.append((reset['default'], '_reset'))
        return allowed_actions

    def _is_action_allowed(self, req, action, resource):
        """Returns `True` if the workflow action is allowed for the `resource`.
        """
        perm_cache = req.perm(resource)
        required_perms = action['permissions']
        if required_perms:
            for permission in required_perms:
                if permission in perm_cache:
                    break
            else:
                return False
        return True

    def get_all_status(self):
        """Return a list of all states described by the configuration.

        """
        all_status = set()
        for attributes in self.actions.values():
            all_status.update(attributes['oldstates'])
            all_status.add(attributes['newstate'])
        all_status.discard('*')
        all_status.discard('')
        all_status.discard(None)
        return all_status

    def render_ticket_action_control(self, req, ticket, action):

        self.log.debug('render_ticket_action_control: action "%s"', action)

        this_action = self.actions[action]
        label = this_action['label']
        operations = this_action['operations']
        ticket_owner = ticket._old.get('owner', ticket['owner'])
        ticket_status = ticket._old.get('status', ticket['status'])
        next_status = this_action['newstate']
        author = get_reporter_id(req, 'author')
        author_info = partial(Chrome(self.env).authorinfo, req,
                              resource=ticket.resource)
        format_author = partial(Chrome(self.env).format_author, req,
                                resource=ticket.resource)
        formatted_current_owner = author_info(ticket_owner)
        exists = ticket_status is not None

        ticket_system = TicketSystem(self.env)
        control = []  # default to nothing
        hints = []
        if 'reset_workflow' in operations:
            control.append(_("from invalid state"))
            hints.append(_("Current state no longer exists"))
        if 'del_owner' in operations:
            hints.append(_("The ticket will be disowned"))
        if 'set_owner' in operations or 'may_set_owner' in operations:
            owners = self.get_allowed_owners(req, ticket, this_action)

            if 'set_owner' in operations:
                default_owner = author
            elif 'may_set_owner' in operations:
                if not exists:
                    default_owner = ticket_system.default_owner
                else:
                    default_owner = ticket_owner or None
                if owners is not None and default_owner not in owners:
                    owners.insert(0, default_owner)
            else:
                # Protect against future modification for case that another
                # operation is added to the outer conditional
                raise AssertionError(operations)

            id = 'action_%s_reassign_owner' % action

            if not owners:
                owner = req.args.get(id, default_owner)
                control.append(
                    tag_("to %(owner)s",
                         owner=tag.input(type='text', id=id, name=id,
                                         value=owner)))
                if not exists or ticket_owner is None:
                    hints.append(_("The owner will be the specified user"))
                else:
                    hints.append(tag_("The owner will be changed from "
                                      "%(current_owner)s to the specified "
                                      "user",
                                      current_owner=formatted_current_owner))
            elif len(owners) == 1:
                owner = tag.input(type='hidden', id=id, name=id,
                                  value=owners[0])
                formatted_new_owner = author_info(owners[0])
                control.append(tag_("to %(owner)s",
                                    owner=tag(formatted_new_owner, owner)))
                if not exists or ticket_owner is None:
                    hints.append(tag_("The owner will be %(new_owner)s",
                                      new_owner=formatted_new_owner))
                elif ticket['owner'] != owners[0]:
                    hints.append(tag_("The owner will be changed from "
                                      "%(current_owner)s to %(new_owner)s",
                                      current_owner=formatted_current_owner,
                                      new_owner=formatted_new_owner))
            else:
                selected_owner = req.args.get(id, default_owner)
                control.append(tag_("to %(owner)s", owner=tag.select(
                    [tag.option(text, value=value if value is not None else '',
                                selected=(value == selected_owner or None))
                     for text, value in sorted((format_author(owner), owner)
                                                for owner in owners)],
                    id=id, name=id)))
                if not exists or ticket_owner is None:
                    hints.append(_("The owner will be the selected user"))
                else:
                    hints.append(tag_("The owner will be changed from "
                                      "%(current_owner)s to the selected user",
                                      current_owner=formatted_current_owner))
        elif 'set_owner_to_self' in operations:
            formatted_author = author_info(author)
            if not exists or ticket_owner is None:
                hints.append(tag_("The owner will be %(new_owner)s",
                                  new_owner=formatted_author))
            elif ticket_owner != author:
                hints.append(tag_("The owner will be changed from "
                                  "%(current_owner)s to %(new_owner)s",
                                  current_owner=formatted_current_owner,
                                  new_owner=formatted_author))
            elif ticket_status != next_status:
                hints.append(tag_("The owner will remain %(current_owner)s",
                                  current_owner=formatted_current_owner))
        if 'set_resolution' in operations:
            resolutions = [r.name for r in Resolution.select(self.env)]
            if 'set_resolution' in this_action:
                valid_resolutions = set(resolutions)
                resolutions = this_action['set_resolution']
                if any(x not in valid_resolutions for x in resolutions):
                    raise ConfigurationError(_(
                        "Your workflow attempts to set a resolution but uses "
                        "undefined resolutions (configuration issue, please "
                        "contact your Trac admin)."))
            if not resolutions:
                raise ConfigurationError(_(
                    "Your workflow attempts to set a resolution but none is "
                    "defined (configuration issue, please contact your Trac "
                    "admin)."))
            id = 'action_%s_resolve_resolution' % action
            if len(resolutions) == 1:
                resolution = tag.input(type='hidden', id=id, name=id,
                                       value=resolutions[0])
                control.append(tag_("as %(resolution)s",
                                    resolution=tag(resolutions[0],
                                                   resolution)))
                hints.append(tag_("The resolution will be set to %(name)s",
                                  name=resolutions[0]))
            else:
                selected_option = req.args.get(id,
                                               ticket_system.default_resolution)
                control.append(tag_("as %(resolution)s",
                                    resolution=tag.select(
                    [tag.option(x, value=x,
                                selected=(x == selected_option or None))
                     for x in resolutions],
                    id=id, name=id)))
                hints.append(_("The resolution will be set"))
        if 'del_resolution' in operations:
            hints.append(_("The resolution will be deleted"))
        if 'leave_status' in operations:
            if len(operations) == 1:
                control.append(tag_("as %(status)s", status=ticket_status))
                hints.append(tag_("The owner will remain %(current_owner)s",
                                  current_owner=formatted_current_owner)
                             if ticket_owner else
                             _("The ticket will remain with no owner"))
        elif ticket['status'] is None:  # New ticket
            hints.append(tag_("The status will be '%(name)s'",
                              name=next_status))
        elif next_status != ticket_status:
            hints.append(tag_("Next status will be '%(name)s'",
                              name=next_status))
        return (label, tag(separated(control, ' ')),
                tag(separated(hints, '. ', '.') if hints else ''))

    def get_ticket_changes(self, req, ticket, action):
        this_action = self.actions[action]

        # Enforce permissions
        if not self._is_action_allowed(req, this_action, ticket.resource):
            # The user does not have any of the listed permissions, so we won't
            # do anything.
            return {}

        updated = {}
        # Status changes
        status = this_action['newstate']
        if status != '*':
            updated['status'] = status

        for operation in this_action['operations']:
            if operation == 'del_owner':
                updated['owner'] = ''
            elif operation in ('set_owner', 'may_set_owner'):
                set_owner = this_action.get('set_owner')
                newowner = req.args.get('action_%s_reassign_owner' % action,
                                        set_owner[0] if set_owner else '')
                # If there was already an owner, we get a list, [new, old],
                # but if there wasn't we just get new.
                if type(newowner) == list:
                    newowner = newowner[0]
                updated['owner'] = self._sub_owner_keyword(newowner, ticket)
            elif operation == 'set_owner_to_self':
                updated['owner'] = get_reporter_id(req, 'author')
            elif operation == 'del_resolution':
                updated['resolution'] = ''
            elif operation == 'set_resolution':
                set_resolution = this_action.get('set_resolution')
                newresolution = req.args.get('action_%s_resolve_resolution'
                                             % action,
                                             set_resolution[0]
                                             if set_resolution else '')
                updated['resolution'] = newresolution

            # reset_workflow is just a no-op here, so we don't look for it.
            # leave_status is just a no-op here, so we don't look for it.

        # Set owner to component owner for 'new' ticket if:
        #  - ticket doesn't exist and owner is < default >
        #  - component is changed
        #  - owner isn't explicitly changed
        #  - ticket owner is equal to owner of previous component
        #  - new component has an owner
        if not ticket.exists and 'owner' not in updated:
            updated['owner'] = self._sub_owner_keyword(ticket['owner'], ticket)
        elif ticket['status'] == 'new' and \
                'component' in ticket.values and \
                'component' in ticket._old and \
                'owner' not in updated:
            try:
                old_comp = TicketComponent(self.env, ticket._old['component'])
            except ResourceNotFound:
                # If the old component has been removed from the database
                # we just leave the owner as is.
                pass
            else:
                old_owner = old_comp.owner or ''
                current_owner = ticket['owner'] or ''
                if old_owner == current_owner:
                    new_comp = TicketComponent(self.env, ticket['component'])
                    if new_comp.owner:
                        updated['owner'] = new_comp.owner

        return updated

    def apply_action_side_effects(self, req, ticket, action):
        pass

    # Public methods (for other ITicketActionControllers that want to use
    #                 our config file and provide an operation for an action)

    def get_all_actions(self):
        actions = parse_workflow_config(self.ticket_workflow_section.options())

        has_new_state = any('new' in [a['newstate']] + a['oldstates']
                            for a in actions.values())
        if has_new_state:
            # Special action that gets enabled if the current status no
            # longer exists, as no other action can then change its state.
            # (#5307/#11850)
            reset = {
                'default': 0,
                'label': 'Reset',
                'newstate': 'new',
                'oldstates': [],
                'operations': ['reset_workflow'],
                'permissions': ['TICKET_ADMIN']
            }
            for key, val in reset.items():
                actions['_reset'].setdefault(key, val)

        for name, info in actions.items():
            if not info['newstate']:
                self.log.warning("Ticket workflow action '%s' doesn't define "
                                 "any transitions", name)
        return actions

    def get_actions_by_operation(self, operation):
        """Return a list of all actions with a given operation
        (for use in the controller's get_all_status())
        """
        actions = [(info['default'], action) for action, info
                   in self.actions.items()
                   if operation in info['operations']]
        return actions

    def get_actions_by_operation_for_req(self, req, ticket, operation):
        """Return list of all actions with a given operation that are valid
        in the given state for the controller's get_ticket_actions().

        If state='*' (the default), all actions with the given operation are
        returned.
        """
        # Be sure to look at the original status.
        status = ticket._old.get('status', ticket['status'])
        actions = [(info['default'], action)
                   for action, info in self.actions.items()
                   if operation in info['operations'] and
                      ('*' in info['oldstates'] or
                       status in info['oldstates']) and
                      self._is_action_allowed(req, info, ticket.resource)]
        return actions

    # Public methods

    def get_allowed_owners(self, req, ticket, action):
        """Returns users listed in the `set_owner` field of the action or
        possessing the `TICKET_MODIFY` permission if `set_owner` is not
        specified.

        This method can be overridden in a subclasses in order to
        customize the list of users that populate the assign-to select
        box.

        :since: 1.3.2
        """
        if 'set_owner' in action:
            return self._to_users(action['set_owner'], ticket)
        elif TicketSystem(self.env).restrict_owner:
            users = PermissionSystem(self.env)\
                    .get_users_with_permission('TICKET_MODIFY')
            cache = partial(PermissionCache, self.env, resource=ticket.resource)
            return sorted(u for u in users
                            if 'TICKET_MODIFY' in cache(username=u))

    # Internal methods

    def _sub_owner_keyword(self, owner, ticket):
        """Substitute keywords from the default_owner field.

        < default > -> component owner
        """
        if owner in ('< default >', '<default>'):
            default_owner = ''
            if ticket['component']:
                try:
                    component = TicketComponent(self.env, ticket['component'])
                except ResourceNotFound:
                    pass  # No such component exists
                else:
                    default_owner = component.owner  # May be empty
            return default_owner
        return owner

    def _to_users(self, users_perms_and_groups, ticket):
        """Finds all users contained in the list of `users_perms_and_groups`
        by recursive lookup of users when a `group` is encountered.
        """
        ps = PermissionSystem(self.env)
        groups = ps.get_groups_dict()

        def append_owners(users_perms_and_groups):
            for user_perm_or_group in users_perms_and_groups:
                if user_perm_or_group == 'authenticated':
                    owners.update({u[0] for u in self.env.get_known_users()})
                elif user_perm_or_group.isupper():
                    perm = user_perm_or_group
                    for user in ps.get_users_with_permission(perm):
                        if perm in PermissionCache(self.env, user,
                                                   ticket.resource):
                            owners.add(user)
                elif user_perm_or_group not in groups:
                    owners.add(user_perm_or_group)
                else:
                    append_owners(groups[user_perm_or_group])

        owners = set()
        append_owners(users_perms_and_groups)

        return sorted(owners)


class WorkflowMacro(WikiMacroBase):
    _domain = 'messages'
    _description = cleandoc_(
    """Render a workflow graph.

    This macro accepts a TracWorkflow configuration and renders the states
    and transitions as a directed graph. If no parameters are given, the
    current ticket workflow is rendered.

    In [WikiProcessors WikiProcessor] mode the `width` and `height`
    arguments can be specified (Defaults: `width = 800` and `height = 600`).

    The repository-scoped path of a workflow file can be specified as either
    a macro or !WikiProcessor `file` argument. The file revision can be
    specified by appending `@<rev>` to the path. The `file` argument value
    must be enclosed in single or double quotes. //(Since 1.3.2)//.

    Examples:
    {{{
    [[Workflow()]]

    [[Workflow(go = here -> there; return = there -> here)]]

    [[Workflow(file=/contrib/workflow/enterprise-workflow.ini@1)]]

    {{{#!Workflow file="/contrib/workflow/enterprise-workflow.ini"
    }}}

    {{{#!Workflow width=700 height=700
    leave = * -> *
    leave.operations = leave_status
    leave.default = 1

    create = <none> -> new
    create.default = 1

    create_and_assign = <none> -> assigned
    create_and_assign.label = assign
    create_and_assign.permissions = TICKET_MODIFY
    create_and_assign.operations = may_set_owner

    accept = new,assigned,accepted,reopened -> accepted
    accept.permissions = TICKET_MODIFY
    accept.operations = set_owner_to_self

    resolve = new,assigned,accepted,reopened -> closed
    resolve.permissions = TICKET_MODIFY
    resolve.operations = set_resolution

    reassign = new,assigned,accepted,reopened -> assigned
    reassign.permissions = TICKET_MODIFY
    reassign.operations = set_owner

    reopen = closed -> reopened
    reopen.permissions = TICKET_CREATE
    reopen.operations = del_resolution
    }}}
    }}}
    """)

    def expand_macro(self, formatter, name, content, args=None):
        if content is not None:
            content = content.strip()
        if not args and not content:
            raw_actions = self.config.options('ticket-workflow')
        else:
            is_macro = args is None
            if is_macro:
                kwargs = parse_args(content)[1]
                file = kwargs.get('file')
            else:
                file = args.get('file')
                if not file and not content:
                    raise ProcessorError("Invalid argument(s).")

            if file:
                text = RepositoryManager(self.env).read_file_by_path(file)
                if text is None:
                    raise ProcessorError(
                        tag_("The file %(file)s does not exist.",
                             file=tag.code(file)))
            elif is_macro:
                text = '\n'.join(line.lstrip() for line in content.split(';'))
            else:
                text = content

            if '[ticket-workflow]' not in text:
                text = '[ticket-workflow]\n' + text
            parser = RawConfigParser()
            try:
                parser.readfp(io.StringIO(text))
            except ParsingError as e:
                if is_macro:
                    raise MacroError(exception_to_unicode(e)) from e
                else:
                    raise ProcessorError(exception_to_unicode(e)) from e
            raw_actions = list(parser.items('ticket-workflow'))
        actions = parse_workflow_config(raw_actions)
        states = list(
            {state for action in actions.values()
                   for state in action['oldstates']} |
            {action['newstate'] for action in actions.values()})
        action_labels = [attrs['label'] for attrs in actions.values()]
        action_names = list(actions)
        edges = []
        for name, action in actions.items():
            new_index = states.index(action['newstate'])
            name_index = action_names.index(name)
            for old_state in action['oldstates']:
                old_index = states.index(old_state)
                edges.append((old_index, new_index, name_index))

        args = args or {}
        width = args.get('width', 800)
        height = args.get('height', 600)
        graph = {'nodes': states, 'actions': action_labels, 'edges': edges,
                 'width': width, 'height': height}
        graph_id = '%012x' % id(graph)
        req = formatter.req
        add_script(req, 'common/js/workflow_graph.js')
        add_script_data(req, {'graph_%s' % graph_id: graph})
        return tag(
            tag.div('', class_='trac-workflow-graph trac-noscript',
                    id='trac-workflow-graph-%s' % graph_id,
                    style="display:inline-block;width:%spx;height:%spx" %
                          (width, height)),
            tag.noscript(
                tag.div(_("Enable JavaScript to display the workflow graph."),
                        class_='system-message')))