Shoobx/shoobx.wfmc

View on GitHub
src/shoobx/wfmc/xpdl.py

Summary

Maintainability
F
3 days
Test Coverage
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""XPDL reader for process definitions
"""
from __future__ import absolute_import
import logging
import sys
import xml.sax
import xml.sax.xmlreader
import xml.sax.handler

import shoobx.wfmc.process
from zope import interface
from shoobx.wfmc import interfaces

xpdlns10 = "http://www.wfmc.org/2002/XPDL1.0"
xpdlns21 = "http://www.wfmc.org/2008/XPDL2.1"

RAISE_ON_DUPLICATE_ERROR = False

log = logging.getLogger(__name__)


class HandlerError(Exception):

    def __init__(self, orig, tag, locator):
        self.orig = orig
        self.tag = tag
        self.xml = locator.getSystemId()
        self.line = locator.getLineNumber()

    def __repr__(self):
        return ('%r\nFile "%s", line %s. in %s'
                % (self.orig, self.xml, self.line, self.tag))

    def __str__(self):
        return ('%s\nFile "%s", line %s. in %s'
                % (self.orig, self.xml, self.line, self.tag))

    def __call__(self):
        return self


class Package(dict):

    def __init__(self):
        self.applications = {}
        self.participants = {}
        self.pools = {}
        self.script = None
        self.parseErrors = []

    def defineApplications(self, **applications):
        for id, application in applications.items():
            application.id = id
            self.applications[id] = application

    def definePools(self, **pools):
        for id, pool in pools.items():
            pool.id = id
            self.pools[id] = pool

    def defineParticipants(self, **participants):
        for id, participant in participants.items():
            participant.id = id
            self.participants[id] = participant

    def addScript(self, script):
        self.script = script


@interface.implementer(interfaces.IPoolDefinition)
class PoolDefinition:

    def __init__(self, name=None, process_def=None):
        self.__name__ = name
        self.process_def = process_def
        self.lanes = {}

    def defineLanes(self, **lanes):
        for id, lane in lanes.items():
            lane.id = id
            self.lanes[id] = lane

    def __repr__(self):
        return "Pool(%r, %r)" % (self.__name__, self.process_def)


@interface.implementer(interfaces.ILaneDefinition)
class LaneDefinition:

    def __init__(self, name=None):
        self.__name__ = name
        self.performers = ()

    def definePerformer(self, performer):
        self.performers += (performer,)

    def __repr__(self):
        return "Lane(%r)" % (self.__name__, )


class DeadlineDefinition(object):
    def __init__(self, duration, act_def, exceptionName=None,
                 execution='SYNCHR'):
        self.duration = duration
        self.exceptionName = exceptionName
        self.execution = execution

        # The definition ID must be the list index, since there is no other
        # uniquely identifying information (0 for the first, etc.)
        self.id = len(act_def.deadlines)
        if self.id > 0:
            raise NotImplementedError(
                'Current implementation only supports one deadline per activity'
            )
        act_def.deadlines.append(self)


class XPDLHandler(xml.sax.handler.ContentHandler):

    start_handlers = {}
    end_handlers = {}
    text = ''

    ProcessDefinitionFactory = shoobx.wfmc.process.ProcessDefinition
    PoolDefinitionFactory = PoolDefinition
    LaneDefinitionFactory = LaneDefinition
    ParticipantFactory = shoobx.wfmc.process.Participant
    DataFieldFactory = shoobx.wfmc.process.DataField
    ApplicationFactory = shoobx.wfmc.process.Application
    ActivityDefinitionFactory = shoobx.wfmc.process.ActivityDefinition
    TransitionDefinitionFactory = shoobx.wfmc.process.TransitionDefinition
    TextCondition = shoobx.wfmc.process.TextCondition
    DeadlineDefinitionFactory = DeadlineDefinition

    def __init__(self, package):
        self.package = package
        self.stack = []
        self.textstack = []

    @property
    def text(self):
        return self.textstack[-1]

    def startElementNS(self, name, qname, attrs):
        handler = self.start_handlers.get(name)
        if handler:
            try:
                result = handler(self, attrs)
            except:
                raise HandlerError(sys.exc_info()[1], name[1], self.locator)
        else:
            result = None

        if result is None:
            # Just dup the top of the stack
            result = self.stack[-1]

        self.stack.append(result)
        self.textstack.append('')

    def endElementNS(self, name, qname):
        last = self.stack.pop()
        handler = self.end_handlers.get(name)
        if handler:
            try:
                handler(self, last)
            except:
                raise HandlerError(sys.exc_info()[1], name[1], self.locator)

        self.textstack.pop()

    def characters(self, text):
        self.textstack[-1] += text

    def setDocumentLocator(self, locator):
        self.locator = locator

    ######################################################################
    # Application handlers

    # Pointless container elements that we want to "ignore" by having them
    # dupe their containers:
    def Package(self, attrs):
        package = self.package
        package.id = attrs[(None, 'Id')]
        package.__name__ = attrs.get((None, 'Name'))
        return package
    start_handlers[(xpdlns10, 'Package')] = Package
    start_handlers[(xpdlns21, 'Package')] = Package

    def WorkflowProcess(self, attrs):
        id = attrs[(None, 'Id')]
        process = self.ProcessDefinitionFactory(id)
        process.__name__ = attrs.get((None, 'Name'))

        # Copy package data:
        process.defineApplications(**self.package.applications)
        process.defineParticipants(**self.package.participants)

        self.package[id] = process
        return process
    start_handlers[(xpdlns10, 'WorkflowProcess')] = WorkflowProcess
    start_handlers[(xpdlns21, 'WorkflowProcess')] = WorkflowProcess

    parameter_types = {
        'IN': shoobx.wfmc.process.InputParameter,
        'OUT': shoobx.wfmc.process.OutputParameter,
        'INOUT': shoobx.wfmc.process.InputOutputParameter,
        }

    def FormalParameter(self, attrs):
        mode = attrs.get((None, 'Mode'), 'IN')
        id = attrs[(None, 'Id')]
        parameter = self.parameter_types[mode](id)
        self.stack[-1].defineParameters(parameter)
        return parameter
    start_handlers[(xpdlns10, 'FormalParameter')] = FormalParameter
    start_handlers[(xpdlns21, 'FormalParameter')] = FormalParameter

    def Pool(self, attrs):
        id = attrs[(None, 'Id')]
        name = attrs.get((None, 'Name'))
        process_def = attrs.get((None, 'Process'))
        pool = self.PoolDefinitionFactory(name, process_def)
        self.stack[-1].definePools(**{str(id): pool})
        return pool
    start_handlers[(xpdlns10, 'Pool')] = Pool
    start_handlers[(xpdlns21, 'Pool')] = Pool

    def Lane(self, attrs):
        id = attrs[(None, 'Id')]
        name = attrs.get((None, 'Name'))
        lane = self.LaneDefinitionFactory(name)
        self.stack[-1].defineLanes(**{str(id): lane})
        return lane
    start_handlers[(xpdlns10, 'Lane')] = Lane
    start_handlers[(xpdlns21, 'Lane')] = Lane

    def Participant(self, attrs):
        id = attrs[(None, 'Id')]
        name = attrs.get((None, 'Name'))
        participant = self.ParticipantFactory(name)
        self.stack[-1].defineParticipants(**{str(id): participant})
        return participant
    start_handlers[(xpdlns10, 'Participant')] = Participant
    start_handlers[(xpdlns21, 'Participant')] = Participant

    def ParticipantType(self, attrs):
        tp = attrs.get((None, 'Type'))
        participant = self.stack[-1]
        participant.type = tp
        return participant
    start_handlers[(xpdlns10, 'ParticipantType')] = ParticipantType
    start_handlers[(xpdlns21, 'ParticipantType')] = ParticipantType

    def DataField(self, attrs):
        id = attrs[(None, 'Id')]
        name = attrs.get((None, 'Name'))
        datafield = self.DataFieldFactory(id, name)
        self.stack[-1].defineDataFields(**{str(id): datafield})
        return datafield
    start_handlers[(xpdlns10, 'DataField')] = DataField
    start_handlers[(xpdlns21, 'DataField')] = DataField

    def initialValue(self, datafield):
        self.stack[-1].initialValue = self.text.strip()
    end_handlers[(xpdlns10, 'InitialValue')] = initialValue
    end_handlers[(xpdlns21, 'InitialValue')] = initialValue

    def Application(self, attrs):
        id = attrs[(None, 'Id')]
        name = attrs.get((None, 'Name'))
        app = self.ApplicationFactory()
        app.id = id
        if name:
            app.__name__ = name
        return app
    start_handlers[(xpdlns10, 'Application')] = Application
    start_handlers[(xpdlns21, 'Application')] = Application

    def application(self, app):
        self.stack[-1].defineApplications(**{str(app.id): app})
    end_handlers[(xpdlns10, 'Application')] = application
    end_handlers[(xpdlns21, 'Application')] = application

    def description(self, ignored):
        if self.stack[-1] is not None:
            self.stack[-1].description = self.text
    end_handlers[(xpdlns10, 'Description')] = description
    end_handlers[(xpdlns21, 'Description')] = description

    ######################################################################
    # Activity definitions

    def ActivitySet(self, attrs):
        raise NotImplementedError("ActivitySet")
    end_handlers[(xpdlns10, 'ActivitySet')] = ActivitySet
    end_handlers[(xpdlns21, 'ActivitySet')] = ActivitySet

    def Activity(self, attrs):
        id = attrs[(None, 'Id')]
        name = attrs.get((None, 'Name'))
        activity = self.ActivityDefinitionFactory(name)
        activity.id = id
        self.stack[-1].defineActivities(**{str(id): activity})
        return activity
    start_handlers[(xpdlns10, 'Activity')] = Activity
    start_handlers[(xpdlns21, 'Activity')] = Activity

    def startTool(self, attrs):
        return Tool(attrs[(None, 'Id')])
    start_handlers[(xpdlns10, 'Tool')] = startTool
    start_handlers[(xpdlns21, 'TaskApplication')] = startTool

    def endTool(self, tool):
        self.stack[-1].addApplication(tool.id, tool.parameters)
    end_handlers[(xpdlns10, 'Tool')] = endTool
    end_handlers[(xpdlns21, 'TaskApplication')] = endTool

    def SubFlow(self, attrs):
        return SubFlow(attrs[(None, 'Id')], attrs.get((None, 'Execution')))
    start_handlers[(xpdlns10, 'SubFlow')] = SubFlow
    start_handlers[(xpdlns21, 'SubFlow')] = SubFlow

    def subflow(self, subflow):
        self.stack[-1].addSubflow(subflow.id, subflow.execution,
                                  subflow.parameters)
    end_handlers[(xpdlns10, 'SubFlow')] = subflow
    end_handlers[(xpdlns21, 'SubFlow')] = subflow

    def script(self, script):
        self.stack[-1].addScript(self.text)
    end_handlers[(xpdlns10, 'Script')] = script
    end_handlers[(xpdlns21, 'Script')] = script

    def StartEvent(self, attrs):
        ad = self.stack[-1]
        assert isinstance(ad, shoobx.wfmc.process.ActivityDefinition)
        ad.event = interfaces.START_EVENT
    start_handlers[(xpdlns10, 'StartEvent')] = StartEvent
    start_handlers[(xpdlns21, 'StartEvent')] = StartEvent

    def EndEvent(self, attrs):
        ad = self.stack[-1]
        assert isinstance(ad, shoobx.wfmc.process.ActivityDefinition)
        ad.event = interfaces.END_EVENT
    start_handlers[(xpdlns10, 'EndEvent')] = EndEvent
    start_handlers[(xpdlns21, 'EndEvent')] = EndEvent

    def actualparameter(self, ignored):
        self.stack[-1].parameters += (self.text,)
    end_handlers[(xpdlns10, 'ActualParameter')] = actualparameter
    end_handlers[(xpdlns21, 'ActualParameter')] = actualparameter

    def performer(self, ignored):
        activity_or_lane = self.stack[-1]
        activity_or_lane.definePerformer(self.text.strip())
    end_handlers[(xpdlns10, 'Performer')] = performer
    end_handlers[(xpdlns21, 'Performer')] = performer

    def startDeadline(self, attrs):
        execution = attrs.get((None, 'Execution')) or 'SYNCHR'
        actdef = self.stack[-1]
        self.DeadlineDefinitionFactory(
            None, actdef, exceptionName=None, execution=execution)
    start_handlers[(xpdlns10, 'Deadline')] = startDeadline
    start_handlers[(xpdlns21, 'Deadline')] = startDeadline

    def deadlineDuration(self, actdef):
        duration = self.text.strip()
        actdef.deadlines[-1].duration = duration
    end_handlers[(xpdlns10, 'DeadlineDuration')] = deadlineDuration
    end_handlers[(xpdlns21, 'DeadlineDuration')] = deadlineDuration

    def exceptionName(self, actdef):
        exceptionName = self.text.strip()
        actdef.deadlines[-1].exceptionName = exceptionName
    end_handlers[(xpdlns10, 'ExceptionName')] = exceptionName
    end_handlers[(xpdlns21, 'ExceptionName')] = exceptionName

    def Join(self, attrs):
        Type = attrs.get((None, 'Type'))
        if Type in ('AND', 'Parallel'):
            self.stack[-1].andJoin(True)
    start_handlers[(xpdlns10, 'Join')] = Join
    start_handlers[(xpdlns21, 'Join')] = Join

    def Split(self, attrs):
        Type = attrs.get((None, 'Type'))
        if Type in ('AND', 'Parallel'):
            self.stack[-1].andSplit(True)
    start_handlers[(xpdlns10, 'Split')] = Split
    start_handlers[(xpdlns21, 'Split')] = Split

    def TransitionRef(self, attrs):
        Id = attrs.get((None, 'Id'))
        self.stack[-1].addOutgoing(Id)
    start_handlers[(xpdlns10, 'TransitionRef')] = TransitionRef
    start_handlers[(xpdlns21, 'TransitionRef')] = TransitionRef

    # Activity definitions
    ######################################################################
    def Transition(self, attrs):
        id = attrs[(None, 'Id')]
        name = attrs.get((None, 'Name'))
        from_ = attrs.get((None, 'From'))
        to = attrs.get((None, 'To'))
        transition = self.TransitionDefinitionFactory(from_, to)
        transition.id = id
        transition.__name__ = name
        return transition
    start_handlers[(xpdlns10, 'Transition')] = Transition
    start_handlers[(xpdlns21, 'Transition')] = Transition

    def transition(self, transition):
        self.stack[-1].defineTransitions(transition)
    end_handlers[(xpdlns10, 'Transition')] = transition
    end_handlers[(xpdlns21, 'Transition')] = transition

    def startCondition(self, attrs):
        tp = attrs.get((None, 'Type'), 'CONDITION')
        transdef = self.stack[-1]
        assert isinstance(transdef, self.TransitionDefinitionFactory)

        transdef.type = tp
        condition = self.TextCondition(tp)
        return condition
    start_handlers[(xpdlns10, 'Condition')] = startCondition
    start_handlers[(xpdlns21, 'Condition')] = startCondition

    def endCondition(self, condition):
        assert isinstance(self.stack[-1],
                          self.TransitionDefinitionFactory)

        text = self.text
        condition.set_source('(%s)' % text)
        self.stack[-1].condition = condition
    end_handlers[(xpdlns10, 'Condition')] = endCondition
    end_handlers[(xpdlns21, 'Condition')] = endCondition

    def ExtendedAttributes(self, attrs):
        parent = self.stack[-1]
        if interfaces.IExtendedAttributesContainer.providedBy(parent):
            return parent.attributes

        return {}  # dummy dict that will be discarded
    start_handlers[(xpdlns10, 'ExtendedAttributes')] = ExtendedAttributes
    start_handlers[(xpdlns21, 'ExtendedAttributes')] = ExtendedAttributes

    def ExtendedAttribute(self, attrs):
        container = self.stack[-1]
        name = attrs[(None, 'Name')]
        value = attrs.get((None, 'Value'))
        if name in container:
            msg = ("The Name '{}' is already used, uniqueness violated, "
                   "value: '{}' see: {}.".format(
                    name, value, self.package.id))
            if RAISE_ON_DUPLICATE_ERROR:
                raise KeyError(msg)
            self.package.parseErrors.append(msg)

        container[name] = value
        return container, name
    start_handlers[(xpdlns10, 'ExtendedAttribute')] = ExtendedAttribute
    start_handlers[(xpdlns21, 'ExtendedAttribute')] = ExtendedAttribute

    def extendedAttribute(self, info):
        container, name = info
        if container[name] is None:
            container[name] = self.text.strip()
    end_handlers[(xpdlns10, 'ExtendedAttribute')] = extendedAttribute
    end_handlers[(xpdlns21, 'ExtendedAttribute')] = extendedAttribute


class Tool(object):
    parameters = ()

    def __init__(self, id):
        self.id = id


class SubFlow(object):
    parameters = ()
    execution = interfaces.SYNCHRONOUS

    def __init__(self, id, execution=None):
        self.id = id
        if execution is not None:
            self.execution = execution


def read(file):
    src = xml.sax.xmlreader.InputSource(getattr(file, 'name', '<string>'))
    src.setByteStream(file)
    parser = xml.sax.make_parser()
    package = Package()
    parser.setContentHandler(XPDLHandler(package))
    parser.setFeature(xml.sax.handler.feature_namespaces, True)
    parser.parse(src)
    return package