src/shoobx/wfmc/process.py
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Processes
"""
import logging
import threading
import persistent
import datetime
from datetime import timedelta
import zope.cachedescriptors.property
import zope.event
from collections import OrderedDict
from zope import component, interface
from shoobx.wfmc import interfaces
log = logging.getLogger(__name__)
WFRD_PREFIX = "WFRD_REVERT_"
DEL_MARKER = "WFRD_DEL_MARK_"
EVAL_EXCEPTIONS = (NameError, KeyError, ValueError, TypeError)
def always_true(data):
return True
def defaultDeadlineTimer(process, deadline):
timestamp = deadline.deadline_time
timer = threading.Timer(
(timestamp - datetime.datetime.now()).seconds,
process.deadlinePassedHandler,
args=[deadline]
)
deadline.deadlineTimer = timer
timer.start()
def defaultDeadlineCanceller(process, deadline):
if deadline.deadlineTimer:
deadline.deadlineTimer.cancel()
def getInitialDataFieldsValues(datafields, evaluator, strict=True):
"""Form dictionary of initial datafields values."""
vals = {}
for _id, datafield in datafields.items():
val = None
if datafield.initialValue:
try:
val = evaluator.evaluate(datafield.initialValue)
except interfaces.EvaluateException:
if strict:
raise
continue
vals[_id] = val
return vals
@interface.implementer(interfaces.IProcessDefinitionFactory)
class StaticProcessDefinitionFactory(object):
def __init__(self):
self.definitions = {}
def get(self, name):
"""See IProcessDefinitionFactory.get()
"""
return self.definitions.get(name)
def register(self, pd):
self.definitions[pd.id] = pd
@interface.implementer(interfaces.ITransitionDefinition)
class TransitionDefinition(object):
def __init__(self, from_, to, condition=always_true, id=None,
__name__=None, otherwise=False):
self.id = id
self.from_ = from_
self.to = to
self.condition = condition
self.__name__ = __name__
self.description = None
self.type = 'OTHERWISE' if otherwise else 'CONDITION'
@property
def otherwise(self):
return self.type in ('OTHERWISE', )
def __repr__(self):
return "TransitionDefinition(from=%r, to=%r)" % (self.from_, self.to)
@interface.implementer(interfaces.IProcessDefinition)
class ProcessDefinition(object):
TransitionDefinitionFactory = TransitionDefinition
def __init__(self, id, integration=None):
self.id = id
self.name = id
self.integration = integration
self.activities = {}
self.transitions = []
self.applications = {}
self.participants = {}
self.datafields = {}
self.parameters = ()
self.attributes = OrderedDict()
self.description = None
def getAllActivities(self):
"""
Gets all activities, including subflows
"""
result = self.activities.copy()
for idx, act in self.activities.items():
if act.subflows:
sf = self.obtainSubflow(act)
result.update(sf.definition.getAllActivities())
return result
def obtainSubflow(self, activityDefinition):
""" Make a subflow stub from the subflows of `activityDefinition`
"""
if activityDefinition.subflows:
subflow_name, execution, actual = activityDefinition.subflows[0]
subflow_pd = getProcessDefinition(subflow_name)
return subflow_pd(factory=Process)
def __repr__(self):
return "ProcessDefinition(%r)" % self.id
def defineActivities(self, **activities):
self._dirty()
for id, activity in activities.items():
activity.id = id
if activity.__name__ is None:
activity.__name__ = self.id + '.' + id
activity.process = self
self.activities[id] = activity
def defineTransitions(self, *transitions):
self._dirty()
self.transitions.extend(transitions)
# Compute activity transitions based on transition data:
activities = self.activities
for transition in transitions:
activities[transition.from_].transitionOutgoing(transition)
activities[transition.to].incoming += (transition, )
def defineApplications(self, **applications):
for id, application in applications.items():
application.id = id
self.applications[id] = application
def defineParticipants(self, **participants):
for id, participant in participants.items():
participant.id = id
self.participants[id] = participant
def defineDataFields(self, **datafields):
for id, datafield in datafields.items():
datafield.id = id
self.datafields[id] = datafield
def defineParameters(self, *parameters):
self.parameters += parameters
def _start(self):
# Return an initial transition
activities = self.activities
# Find the start, making sure that there is one and that there
# aren't any activities with no transitions:
start = ()
for aid, activity in activities.items():
if not activity.incoming:
start += ((aid, activity), )
if not activity.outgoing:
raise interfaces.InvalidProcessDefinition(
"Activity %s has no transitions" % aid)
if len(start) != 1:
if start:
raise interfaces.InvalidProcessDefinition(
"Multiple start activities",
[id for (id, a) in start]
)
else:
raise interfaces.InvalidProcessDefinition(
"No start activities")
return self.TransitionDefinitionFactory(None, start[0][0])
_start = zope.cachedescriptors.property.Lazy(_start)
def __call__(self, context=None, factory=None):
if factory is None:
factory = Process
return factory(self, self._start, context)
def _dirty(self):
try:
del self._start
except AttributeError:
pass
@interface.implementer(interfaces.IActivityDefinition,
interfaces.IExtendedAttributesContainer)
class ActivityDefinition(object):
performer = ''
process = None
def __init__(self, __name__=None):
self.__name__ = __name__
self.incoming = self.outgoing = ()
self.transition_outgoing = self.explicit_outgoing = ()
self.applications = ()
self.subflows = ()
self.scripts = ()
self.andJoinSetting = self.andSplitSetting = False
self.description = None
self.attributes = OrderedDict()
self.event = None
self.deadlines = []
def andSplit(self, setting):
self.andSplitSetting = setting
def andJoin(self, setting):
self.andJoinSetting = setting
def addApplication(self, application, actual=()):
app = self.process.applications[application]
formal = app.parameters
if len(formal) != len(actual):
raise TypeError("Wrong number of parameters => "
"Actual=%s, Formal=%s "
"for Application %s with id=%s"
% (actual, formal, app, app.id))
self.applications += ((application, formal, tuple(actual)), )
def addSubflow(self, subflow, execution, parameters):
# Lookup of formal parameters must be delayed, since the subflow might
# not yet be loaded.
self.subflows += ((subflow, execution, parameters),)
def addScript(self, code):
self.scripts += (code,)
def definePerformer(self, performer):
self.performer = performer
def addOutgoing(self, transition_id):
self.explicit_outgoing += (transition_id,)
self.computeOutgoing()
def transitionOutgoing(self, transition):
self.transition_outgoing += (transition,)
self.computeOutgoing()
def computeOutgoing(self):
if self.explicit_outgoing:
transitions = dict([(t.id, t) for t in self.transition_outgoing])
self.outgoing = ()
for tid in self.explicit_outgoing:
transition = transitions.get(tid)
if transition is not None:
self.outgoing += (transition,)
else:
self.outgoing = self.transition_outgoing
def __repr__(self):
return "<ActivityDefinition %r>" % self.__name__
class Deadline(object):
def __init__(self, activity, deadline_time, deadlineDef):
self.activity = activity
self.deadline_time = deadline_time
self.definition = deadlineDef
if deadlineDef.execution != 'SYNCHR':
raise NotImplementedError('Only Synchronous (SYNCHR) deadlines '
'are supported at this point.')
@interface.implementer(interfaces.IActivity)
class Activity(persistent.Persistent):
DeadlineFactory = Deadline
incoming = ()
deadlineTimer = None
now = datetime.datetime.now
def __init__(self, process, definition):
self.process = process
self.activity_definition_identifier = definition.id
self.workitems = {}
self.finishedWorkitems = {}
self.workitemIdSequence = Sequence()
self.active = True
# Didn't want to change the getter, but do want it set from the
# constructor
self._definition = definition
self.id = self.process.activityIdSequence.next()
self.deadlines = []
for deadlineDef in self.definition.deadlines:
deadline = self.digestDeadlineDefinition(process, definition,
deadlineDef)
if deadline is not None:
self.deadlines.append(deadline)
self.process.deadlineTimer(deadline)
self.activity_definition_identifier_path = \
calculateActivityStackPath(self)
self.createWorkItems()
def digestDeadlineDefinition(self, process, definition,
deadlineDef):
evaluator = interfaces.IPythonExpressionEvaluator(self.process)
if not deadlineDef.duration:
log.warning(
'There is an empty deadline time in '
'{} for activity {}.'.format(process, definition.id))
return
try:
evaled = evaluator.evaluate(deadlineDef.duration,
{'timedelta': timedelta,
'datetime': datetime})
except Exception as e:
raise RuntimeError(
'Evaluating the deadline duration failed '
'for activity {}. Error: {}'.format(definition.id, e))
if evaled is None:
log.warning(
'There is an empty deadline time in '
'{} for activity {}.'.format(process, definition.id))
return
if isinstance(evaled, timedelta):
deadline_time = self.now() + evaled
elif isinstance(evaled, datetime.datetime):
deadline_time = evaled
elif isinstance(evaled, int):
deadline_time = self.now() + \
timedelta(seconds=evaled)
else:
raise ValueError(
'Deadline time was not a timedelta, datetime, or integer '
'number of seconds.\n{}'.format(evaled)
)
deadline = self.DeadlineFactory(self, deadline_time, deadlineDef)
return deadline
def getExecutionStack(self):
"""Return list of subflow activities that eventually started the
process of current activity.
The first activity returned by this function will belong to the main
process.
"""
act = self
stack = []
while act.process.starterActivityId:
act = self.process.activities[act.process.starterActivityId]
stack.append(act)
stack.reverse()
return stack
def createWorkItems(self):
workitems = {}
if self.definition.applications:
workitems = self.createApplicationWorkItems()
elif self.definition.subflows:
workitems = self.createSubflowWorkItems()
elif self.definition.scripts:
workitems = self.createScriptWorkItems()
for workitem, application, formal, actual in workitems:
self.addWorkItem(workitem, application, formal, actual)
def addWorkItem(self, workitem, application, formal, actual):
nextid = self.workitemIdSequence.next()
workitem.id = nextid
self.workitems[nextid] = (workitem, application, formal, actual)
def createApplicationWorkItems(self):
integration = self.process.definition.integration
participant = integration.createParticipant(
self, self.process, self.definition.performer)
# Instantiate Applications
for application, formal, actual in self.definition.applications:
workitem = integration.createWorkItem(
participant, self.process, self, application)
yield workitem, application, formal, actual
def definition(self):
try:
return self.process.definition.activities[
self.activity_definition_identifier]
except KeyError:
return self._definition
definition = property(definition)
def createScriptWorkItems(self):
integration = self.process.definition.integration
for code in self.definition.scripts:
workitem = integration.createScriptWorkItem(
self.process, self, code)
yield workitem, "__script__", (), ()
def createSubflowWorkItems(self):
integration = self.process.definition.integration
# Instantiate Subflows
for subflow, execution, actual in self.definition.subflows:
# Figre out formal parameters. At this point, process definition
# has to be available.
subflow_pd = self.process.getSubflowProcessDefinition(subflow)
formal = subflow_pd.parameters
workitem = integration.createSubflowWorkItem(
self.process, self, subflow, execution)
yield workitem, subflow, formal, actual
def start(self, transition):
# Start the activity, if we've had enough incoming transitions
definition = self.definition
if definition.andJoinSetting:
if transition in self.incoming:
raise interfaces.ProcessError(
"Repeated incoming %s with id='%s' "
"while waiting for and completion"
% (transition, transition.id))
self.incoming += (transition, )
if len(self.incoming) < len(definition.incoming):
# Tells us whether or not we need to wait
# for enough transitions at an add-joint, specifically for
# the case where we revert back through the joint and want to
# move forward through it again, and don't expect the other
# transition to happen again
return # not enough incoming yet
zope.event.notify(ActivityStarted(self))
if self.workitems:
evaluator = getEvaluator(self.process)
# We need the list() here to make a copy to
# loop over, as we modify self.workitems in the loop.
for workitem, app, formal, actual in list(self.workitems.values()):
__traceback_info__ = (
workitem, self.activity_definition_identifier)
inputs = evaluateInputs(self.process, formal, actual, evaluator)
args = {n: a for n, a in inputs}
__traceback_info__ = (self.activity_definition_identifier,
workitem, args)
zope.event.notify(WorkItemStarting(workitem, app, actual))
workitem.start(args)
zope.event.notify(WorkItemStarted(workitem, app, actual))
else:
# Since we don't have any work items, we're done
self.finish()
def workItemDiscarded(self, work_item):
unused, app, formal, actual = self.workitems.pop(work_item.id)
self._p_changed = True
zope.event.notify(WorkItemDiscarded(work_item, app, actual))
if not self.workitems:
self.finish()
def workItemFinished(self, work_item, results=None):
try:
unused, app, formal, actual = entry = \
self.workitems.pop(work_item.id)
except KeyError:
raise KeyError(
'Tried to pop workitem id:{} from the workitems dict of {}. '
'Maybe it is already finished: {}'.format(
work_item.id, self, self.finishedWorkitems))
self.finishedWorkitems[work_item.id] = entry
self._p_changed = True
args = results
if not results:
args = {}
res = []
for parameter, name in zip(formal, actual):
if parameter.output:
__traceback_info__ = args, parameter
v = args.get(parameter.__name__)
res.append(v)
if not name:
log.warning("Output parameter {param} of {activity} "
"is not bound to workflow variables".format(
param=parameter, activity=self))
continue
# Remember the old value to restore in case of revert
try:
old_val = getattr(self.process.workflowRelevantData, name)
except AttributeError:
old_val = DEL_MARKER
if v != old_val:
setattr(self.process.applicationRelevantData,
WFRD_PREFIX+str(self.id)+"_"+name,
old_val)
setattr(self.process.workflowRelevantData, name, v)
zope.event.notify(WorkItemFinished(
work_item, app, actual, res))
if not self.workitems:
self.finish()
def finish(self):
self.active = False
zope.event.notify(ActivityFinished(self))
transitions = getValidOutgoingTransitions(self.process, self.definition)
self.process.transition(self, transitions)
self.cancelDeadlines()
def abort(self, cancelDeadlineTimer=True):
if cancelDeadlineTimer:
self.cancelDeadlines()
self.active = False
# Abort all workitems. We need the list() here to make a copy to
# loop over, as we modify self.workitems in the loop.
for workitem, app, formal, actual in list(self.workitems.values()):
if interfaces.IAbortWorkItem.providedBy(workitem):
workitem.abort()
zope.event.notify(WorkItemAborted(workitem, app, actual))
else:
# Just discard the workitem (we cannot abort it)
zope.event.notify(WorkItemDiscarded(workitem, app, actual))
del self.workitems[workitem.id]
zope.event.notify(ActivityAborted(self))
def restoreWFRD(self):
wf_revert_names = [name for name in dir(self.process.applicationRelevantData)
if name.startswith(WFRD_PREFIX+str(self.id)+"_")]
for name in wf_revert_names:
old_val = getattr(self.process.applicationRelevantData, name)
wfname = name.lstrip(WFRD_PREFIX+str(self.id)+"_")
if old_val == DEL_MARKER:
delattr(self.process.workflowRelevantData, wfname)
else:
setattr(self.process.workflowRelevantData, wfname, old_val)
def cancelDeadlines(self):
for deadline in self.deadlines:
self.process.deadlineCanceller(deadline)
def revert(self, cancelDeadlineTimer=True):
reverted_workitems = []
# Revert all finished workitems.
for workitem, app, formal, actual in self.finishedWorkitems.values():
if interfaces.IRevertableWorkItem.providedBy(workitem):
workitem.revert()
reverted_workitems.append(workitem)
# Restore workflowRelevantData
self.restoreWFRD()
if cancelDeadlineTimer:
self.cancelDeadlines()
zope.event.notify(ActivityReverted(self))
return reverted_workitems
def __repr__(self):
return "Activity(%r)" % (
self.process.process_definition_identifier + '.' +
self.activity_definition_identifier
)
class WorkflowData(persistent.Persistent):
"""Container for workflow-relevant and application-relevant data
"""
class Sequence(object):
counter = 0
def __init__(self, counter=0):
self.counter = counter
def next(self):
self.counter += 1
return self.counter
def current(self):
return self.counter
@interface.implementer(interfaces.IActivityContainer)
class ActivityContainer(dict):
def getActive(self):
return [a for a in self.values() if a.active]
def getFinished(self):
return [a for a in self.values() if not a.active]
@interface.implementer(interfaces.IProcess)
class Process(persistent.Persistent):
ActivityFactory = Activity
WorkflowDataFactory = WorkflowData
isStarted = False
isFinished = False
isAborted = False
starterActivityId = None
starterWorkitemId = None
execution = interfaces.SYNCHRONOUS
asyncflowId = None
deadlineTimer = defaultDeadlineTimer
deadlineCanceller = defaultDeadlineCanceller
def __init__(self, definition, start, context=None):
self.process_definition_identifier = definition.id
self.context = context
self._definition = definition
self.activities = ActivityContainer()
self.activityIdSequence = Sequence()
self.asyncflowIdSequence = Sequence()
self.workflowRelevantData = self.WorkflowDataFactory()
self.applicationRelevantData = self.WorkflowDataFactory()
self.subflows = []
@property
def startTransition(self):
return self.definition._start
@property
def definition(self):
try:
return getProcessDefinition(self.process_definition_identifier)
except zope.interface.interfaces.ComponentLookupError:
return self._definition
def start(self, *arguments):
if self.isStarted:
raise TypeError("Already started")
definition = self.definition
data = self.workflowRelevantData
evaluator = interfaces.IPythonExpressionEvaluator(self)
# Assign data defaults.
for _id, val in getInitialDataFieldsValues(
definition.datafields, evaluator).items():
setattr(data, _id, val)
# Now apply input parameters on top of the defaults.
args = arguments
inputparams = [p for p in definition.parameters if p.input]
for parameter in inputparams:
if args:
arg, args = args[0], args[1:]
elif parameter.initialValue is not None:
arg = evaluator.evaluate(parameter.initialValue)
else:
__traceback_info__ = (self, args, definition.parameters)
raise ValueError(
'Insufficient arguments passed to process.')
setattr(data, parameter.__name__, arg)
if args:
raise TypeError("Too many arguments. Expected %s. got %s" %
(len(inputparams), len(arguments)))
if self.execution == interfaces.ASYNCHRONOUS:
# Asynchronous processes gets their own asyncflowId
self.asyncflowId = self.asyncflowIdSequence.next()
self.isStarted = True
zope.event.notify(ProcessStarted(self))
self.transition(None, (self.startTransition, ))
def outputs(self):
outputs = {}
evaluator = interfaces.IPythonExpressionEvaluator(self)
for parameter in self.definition.parameters:
if parameter.output:
if hasattr(self.workflowRelevantData, parameter.__name__):
value = getattr(self.workflowRelevantData,
parameter.__name__)
elif parameter.initialValue is not None:
value = evaluator.evaluate(parameter.initialValue)
else:
__traceback_info__ = (self, parameter)
raise ValueError('Process finished, and there is an output '
'parameter with no value in workflow vars '
'and no initial value.')
outputs[parameter.__name__] = value
return outputs
def _finish(self):
self.isFinished = True
if self.starterActivityId:
if self.execution == interfaces.ASYNCHRONOUS:
# Starter activity was already finished, since this process is
# asynchronous
return
# Subflow finished, continue with main flow
starter = self.activities[self.starterActivityId]
wi, _, _, _ = starter.workitems[self.starterWorkitemId]
starter.workItemFinished(wi, self.outputs())
else:
self._terminateAsyncflows()
zope.event.notify(ProcessFinished(self))
def _terminateAsyncflows(self):
for act in self.activities.getActive():
if act.process.asyncflowId is None:
# This is activity from main process, don't touch it
continue
self.throwException(act)
def abort(self):
allActivities = self.activities.values()
for activity in sorted(allActivities,
key=Process.chronological_key,
reverse=True):
if activity.active:
activity.abort()
else:
activity.revert()
del self.activities[activity.id]
self.isAborted = True
zope.event.notify(ProcessAborted(self))
def transition(self, activity, transitions):
if transitions:
definition = self.definition
for transition in transitions:
activity_definition = definition.activities[transition.to]
next = None
if activity_definition.andJoinSetting:
# If it's an and-join, we want only one.
for a in self.activities.getActive():
if a.process is not activity.process:
continue
if a.activity_definition_identifier == transition.to:
# we already have the activity -- use it
next = a
break
if next is None:
next = self.ActivityFactory(self, activity_definition)
zope.event.notify(Transition(activity, next))
self.activities[next.id] = next
next.start(transition)
else:
self._finish()
self._p_changed = True
def __repr__(self):
return "Process(%r)" % self.process_definition_identifier
def getSubflowProcessDefinition(self, subflow_pd_name):
return getProcessDefinition(subflow_pd_name)
def initSubflow(self, subflow_pd_name, starter_activity_id,
starter_workitem_id,
execution=interfaces.SYNCHRONOUS,
proc_factory=None):
subflow_pd = self.getSubflowProcessDefinition(subflow_pd_name)
subflow = subflow_pd(self.context, factory=proc_factory)
subflow.activities = self.activities
subflow.activityIdSequence = self.activityIdSequence
subflow.subflows = self.subflows
subflow.starterActivityId = starter_activity_id
subflow.starterWorkitemId = starter_workitem_id
subflow.execution = execution
subflow.asyncflowIdSequence = self.asyncflowIdSequence
subflow.asyncflowId = self.asyncflowId
self.subflows.append(subflow)
return subflow
@staticmethod
def chronological_key(activity):
"""
Returns the key for sorting activities chronologically in a list
"""
return activity.id
def throwException(self, activity):
"""Throw process exception
We abort the activity and follow exception route
"""
activity.abort()
transitions = getValidOutgoingTransitions(
self, activity.definition,
exception=True
)
activity.process.transition(activity, transitions)
def deadlinePassedHandler(self, deadline):
# TODO: Is this threadsafe?
if deadline.definition.execution != 'SYNCHR':
raise NotImplementedError('Only Synchronous (SYNCHR) deadlines are'
' supported at this point.')
activity = deadline.activity
activity.abort(cancelDeadlineTimer=False)
transitions = getValidOutgoingTransitions(
self, activity.definition,
exception=True
)
# activity.process, since self could be parent flow and we need local
# flow in case of subflow
activity.process.transition(activity, transitions)
@interface.implementer(interfaces.IProcessStarted)
class ProcessStarted:
def __init__(self, process):
self.process = process
def __repr__(self):
return "ProcessStarted(%r)" % self.process
@interface.implementer(interfaces.IProcessFinished)
class ProcessFinished:
def __init__(self, process):
self.process = process
def __repr__(self):
return "ProcessFinished(%r)" % self.process
@interface.implementer(interfaces.IProcessAborted)
class ProcessAborted:
def __init__(self, process):
self.process = process
def __repr__(self):
return "ProcessAborted(%r)" % self.process
def getEvaluator(process):
"""Return expression evaluator object for given proceess"""
return interfaces.IPythonExpressionEvaluator(process)
def evaluateInputs(
process,
formal,
actual,
evaluator,
strict=True,
):
"""Evaluate input parameters for the process or activity.
Return list of pairs: (name, value) for each input parameter.
"""
args = []
for parameter, expr in zip(formal, actual):
if parameter.input:
if expr == '':
expr = getattr(parameter, 'initialValue', '')
__traceback_info__ = (parameter, expr)
try:
value = evaluator.evaluate(expr)
except interfaces.EvaluateException:
if strict:
raise
continue
args.append((parameter.__name__, value))
return args
def getValidOutgoingTransitions(process, activity_definition, exception=False,
checker=None):
"""Return list of valid outgoing transitions from given activity_definition
in given process.
Valid outgoing transitions are transitions having conditions that are
evaluating to True (or "otherwise" transitions if all conditions are
False).
If any exception is raised during evaluation of condition expressions,
it will be rerised only if ``strict`` parameter is True. Otherwise, the
condition will be treated as False.
A custom checker can be passed in if the caller wants more control over
what gets raised during evaluation of the condition. The checker should
take a transition and return a boolean if the transition is available.
"""
def defaultChecker(transition):
return transition.condition(process)
if checker is None:
checker = defaultChecker
transitions = []
otherwises = []
if exception:
# TODO: Need support for exception Name specification
for transition in activity_definition.outgoing:
if transition.type == 'DEFAULTEXCEPTION':
return [transition]
else:
raise interfaces.ProcessError(
'The activity_definition {} exited with an exception, but no '
'exception transition was found.'.format(activity_definition)
)
for transition in activity_definition.outgoing:
if transition.otherwise:
# do not consider 'otherwise' transitions just yet
otherwises.append(transition)
continue
if checker(transition):
transitions.append(transition)
if not activity_definition.andSplitSetting:
break # xor split, want first one
if not transitions:
# no condition was met, choose 'otherwise' transitions
transitions = otherwises
return transitions
@zope.interface.implementer(interfaces.IWorkItem)
class ScriptWorkItem(object):
"""Executes the script and stores all changed workflow-relevant
attributes."""
def __init__(self, process, activity, code):
self.process = process
self.activity = activity
self.code = code
def execute(self):
evaluator = interfaces.IPythonExpressionEvaluator(self.process)
evaluator.execute(self.code)
def start(self, args):
self.execute()
self.finish()
def finish(self):
self.activity.workItemFinished(self)
@zope.interface.implementer(interfaces.IWorkItem)
class SubflowWorkItem(object):
processFactory = None
def __init__(self, process, activity, subflow, execution):
self.process = process
self.activity = activity
self.subflow = subflow
self.execution = execution
def start(self, args):
subproc = self.process.initSubflow(self.subflow,
self.activity.id, self.id,
proc_factory=self.processFactory,
execution=self.execution)
pd = subproc.definition
tupArgs = [args.get(p.__name__, None) for p in pd.parameters if p.input]
subproc.start(*tupArgs)
if self.execution == interfaces.ASYNCHRONOUS:
self.activity.workItemFinished(self)
class WorkItemFinished(object):
def __init__(self, workitem, application, parameters, results):
self.workitem = workitem
self.application = application
self.parameters = parameters
self.results = results
def __repr__(self):
return "WorkItemFinished(%r)" % self.application
class WorkItemStarting(object):
"""Event emitted just before starting workitem
"""
def __init__(self, workitem, application, parameters):
self.workitem = workitem
self.application = application
self.parameters = parameters
def __repr__(self):
return "WorkItemStarting(%r)" % self.application
class WorkItemStarted(object):
"""Event emitted just after starting a workitem"""
def __init__(self, workitem, application, parameters):
self.workitem = workitem
self.application = application
self.parameters = parameters
def __repr__(self):
return "WorkItemStarted(%r)" % self.application
class WorkItemAborted:
def __init__(self, workitem, application, parameters):
self.workitem = workitem
self.application = application
self.parameters = parameters
def __repr__(self):
return "WorkItemAborted(%r)" % self.application
class WorkItemDiscarded(object):
def __init__(self, workitem, application, parameters):
self.workitem = workitem
self.application = application
self.parameters = parameters
def __repr__(self):
return "WorkItemDiscarded(%r)" % self.application
class Transition:
def __init__(self, from_, to):
self.from_ = from_
self.to = to
def __repr__(self):
return "Transition(%r, %r)" % (self.from_, self.to)
class TextCondition:
def __init__(self, type='CONDITION', source=''):
self.type = type
self.otherwise = type in ('OTHERWISE', )
if source:
self.set_source(source)
def set_source(self, source):
self.source = source
# make sure that we can compile the source
compile(source, '<string>', 'eval')
def __getstate__(self):
return {'source': self.source,
'type': self.type}
def __call__(self, process, data={}):
evaluator = interfaces.IPythonExpressionEvaluator(process)
return evaluator.evaluate(self.source, data)
class ActivityFinished:
def __init__(self, activity):
self.activity = activity
def __repr__(self):
return "ActivityFinished(%r)" % self.activity
class ActivityReverted:
def __init__(self, activity):
self.activity = activity
def __repr__(self):
return "ActivityReverted(%r)" % self.activity
class ActivityAborted:
def __init__(self, activity):
self.activity = activity
def __repr__(self):
return "ActivityAborted(%r)" % self.activity
class ActivityStarted:
def __init__(self, activity):
self.activity = activity
def __repr__(self):
return "ActivityStarted(%r)" % self.activity
@interface.implementer(interfaces.IParameterDefinition)
class Parameter(object):
input = output = False
initialValue = None
def __init__(self, name):
self.__name__ = name
def __repr__(self):
return "%s(%r)" % (self.__class__.__name__, self.__name__)
class OutputParameter(Parameter):
output = True
class InputParameter(Parameter):
input = True
class InputOutputParameter(InputParameter, OutputParameter):
pass
@interface.implementer(interfaces.IApplicationDefinition,
interfaces.IExtendedAttributesContainer)
class Application:
def __init__(self, *parameters):
self.parameters = parameters
self.attributes = OrderedDict()
def defineParameters(self, *parameters):
self.parameters += parameters
def __repr__(self):
input = ', '.join([param.__name__ for param in self.parameters
if param.input == True])
output = ', '.join([param.__name__ for param in self.parameters
if param.output == True])
return "<Application %r: (%s) --> (%s)>" % (self.id, input, output)
@interface.implementer(interfaces.IParticipantDefinition,
interfaces.IExtendedAttributesContainer)
class Participant:
def __init__(self, name=None, type=None):
self.__name__ = name
self.type = type
self.description = None
self.attributes = OrderedDict()
def __repr__(self):
return "Participant(%r, %r)" % (self.__name__, self.type)
@interface.implementer(interfaces.IDataFieldDefinition)
class DataField:
def __init__(self, name=None, title=None, initialValue=None):
self.__name__ = name
self.title = title
self.initialValue = initialValue
def __repr__(self):
return "DataField(%r, %r, %r)" % (
self.__name__, self.title, self.initialValue)
ALLOWED_BUILTIN_NAMES = ['True', 'False', 'None']
ALLOWED_BUILTINS = {k: v for k, v in __builtins__.items()
if k in ALLOWED_BUILTIN_NAMES}
@interface.implementer(interfaces.IPythonExpressionEvaluator)
class PythonExpressionEvaluator(object):
"""Simple Python Expression Evaluator.
This evaluator only produces a limited namespace and does not use a safe
Python engine.
"""
component.adapts(interfaces.IProcess)
def __init__(self, process):
self.process = process
def evaluate(self, expr, locals={}):
__traceback_info__ = (expr, locals)
ns = {'context': self.process.context}
ns.update(vars(self.process.workflowRelevantData))
ns.update(vars(self.process.applicationRelevantData))
ns.update(locals)
try:
return eval(expr, ALLOWED_BUILTINS, ns)
except EVAL_EXCEPTIONS as e:
raise interfaces.EvaluateException(expr) from e
def execute(self, code, locals={}):
__traceback_info__ = (code, locals)
ns = {'context': self.process.context}
ns.update(vars(self.process.workflowRelevantData))
ns.update(vars(self.process.applicationRelevantData))
ns.update(locals)
ns.update(ALLOWED_BUILTINS)
result = {}
exec(code, ALLOWED_BUILTINS, result)
for name, value in result:
pass
def getProcessDefinition(name):
"""Return process definition with given name"""
factory = component.getUtility(interfaces.IProcessDefinitionFactory)
pd = factory.get(name)
if pd is None:
raise RuntimeError("Process with name %s is not found" % name)
return pd
def getActivityStackPath(activity_defs):
"""Return path of activity definition stack (as a string)
"""
return "/".join("%s@%s" % (ad.process.id, ad.id) for ad in activity_defs)
def calculateActivityStackPath(activity):
if activity.definition is None:
return None
stack = activity.getExecutionStack()
ads = [a.definition for a in stack if a.definition]
ads.append(activity.definition)
return getActivityStackPath(ads)