===============
Subflow support
===============

Here we demonstrate and test subflow support in shoobx.wfmc. Subflows referenced
in XPDL are executed in the context of the main (calling) process and share its
context, including workflow variables and parameters.
Subflows are executed by a special type of activity: "Sub-flow".

The directory containing this file includes a sample process that uses subflows::

    >>> import os
    >>> from shoobx.wfmc import xpdl
    >>> sample_process = os.path.join(this_directory, "subflow.xpdl")
    >>> with open(sample_process) as f:
    ...     package = xpdl.read(f)

First, create an environment for executing a process::

    >>> import zope.interface
    >>> from shoobx.wfmc import interfaces

Define the work items used by the process::

    >>> @zope.interface.implementer(interfaces.IWorkItem)
    ... class OutputWorkItem(object):
    ...     id = None
    ... 
    ...     def __init__(self, process, activity):
    ...         self.process = process
    ...         self.activity = activity
    ... 
    ...     def start(self, args):
    ...         print("OUTPUT: " + args['message'])
    ...         self.activity.workItemFinished(self)

    >>> @zope.interface.implementer(interfaces.IWorkItem)
    ... class InputWorkItem(object):
    ...     id = None
    ... 
    ...     def __init__(self, process, activity):
    ...         self.process = process
    ...         self.activity = activity
    ... 
    ...     def start(self, args):
    ...         print("INPUT: " + args['message'])

Define the integration object::

    >>> from shoobx.wfmc.attributeintegration import AttributeIntegration

    >>> class Integration(AttributeIntegration):
    ...     def administratorParticipant(self, activity, process):
    ...         return "admin"
    ...
    ...     def inputWorkItem(self, participant, process, activity):
    ...         return InputWorkItem(process, activity)
    ...
    ...     def outputWorkItem(self, participant, process, activity):
    ...         return OutputWorkItem(process, activity)

Set the integration on all process definitions::

    >>> integration = Integration()
    >>> for pd in package.values():
    ...     pd.integration = integration

Register all process definitions in a static factory::

    >>> import zope.component
    >>> from shoobx.wfmc.process import StaticProcessDefinitionFactory
    >>> pdfactory = StaticProcessDefinitionFactory()
    >>> for pd in package.values():
    ...     pdfactory.register(pd)

    >>> zope.component.provideUtility(pdfactory)
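
As a rough illustration of why the definitions are registered, here is a toy
registry in plain Python. It is not the shoobx.wfmc implementation, and
``ToyFactory`` and ``ToyDefinition`` are hypothetical names. It assumes that a
sub-flow activity knows only the id of the flow it should start and resolves
the definition through the registry:

```python
# Toy registry, NOT the shoobx.wfmc implementation; all names are hypothetical.

class ToyDefinition:
    """Stand-in for a process definition; only carries an id."""

    def __init__(self, id):
        self.id = id


class ToyFactory:
    """Maps process definition ids to definition objects."""

    def __init__(self):
        self._definitions = {}

    def register(self, definition):
        self._definitions[definition.id] = definition

    def get(self, definition_id):
        # Return None for unknown ids instead of raising.
        return self._definitions.get(definition_id)


factory = ToyFactory()
for name in ("mainflow", "subflow", "innerflow"):
    factory.register(ToyDefinition(name))

# Assumption: a sub-flow activity holds only the id of the flow to start,
# so it asks the registry for the matching definition.
resolved = factory.get("innerflow")
```
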

Now we can create and execute our process::

    >>> context = None
    >>> mainproc_def = package['mainflow']
    >>> proc = mainproc_def(context)
    >>> proc.start()
    OUTPUT: First mainflow activity
    OUTPUT: First subflow activity
    INPUT: Inner subflow activity

    >>> lastactivity = proc.activities[max(proc.activities.keys())]
    >>> lastactivity.activity_definition_identifier
    'innerflow_first'

    >>> wi, _, _, _ = list(lastactivity.workitems.values())[0]
    >>> lastactivity.workItemFinished(wi, {'result': "from-the-deep"})
    INPUT: Hello Parameter

    >>> lastactivity = proc.activities[max(proc.activities.keys())]
    >>> lastactivity.activity_definition_identifier
    'subflow_second'

    >>> wi, _, _, _ = list(lastactivity.workitems.values())[0]
    >>> lastactivity.workItemFinished(wi, {'result': "completed"})
    INPUT: Inner subflow activity

    >>> lastactivity = proc.activities[max(proc.activities.keys())]
    >>> wi, _, _, _ = list(lastactivity.workitems.values())[0]
    >>> lastactivity.workItemFinished(wi, {'result': "from-the-deep-2"})
    OUTPUT: Second mainflow activity

Make sure the process is now finished::

    >>> proc.isFinished
    True
    >>> proc.activities.getActive()
    []

All subflows are registered in the main flow::

    >>> sorted(proc.subflows, key=lambda p: p.definition.id)
    [Process('innerflow'), Process('innerflow'), Process('subflow')]

A variable from the inner subflow should not be visible in the main flow namespace::

    >>> hasattr(proc.workflowRelevantData, "value")
    False

The subflow output variable, however, is written to the main flow context::

    >>> proc.workflowRelevantData.subflow_result
    'completed'
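
The last two checks can be pictured with a small toy model in plain Python. It
is only a sketch of the observed behaviour, not how shoobx.wfmc implements it;
``run_subflow``, ``inner_body`` and ``output_params`` are hypothetical names.
It assumes the subflow works on its own copy of the caller's data and that
only declared output parameters are copied back:

```python
# Toy model of the scoping checked above; NOT the shoobx.wfmc implementation.
# All names here (run_subflow, inner_body, output_params) are hypothetical.

def run_subflow(caller_data, body, output_params):
    """Run ``body`` on a private copy of the caller's data.

    Only the names listed in ``output_params`` are copied back to the caller.
    """
    local_data = dict(caller_data)   # the subflow sees the caller's variables
    body(local_data)                 # the subflow may create private variables
    for name in output_params:
        caller_data[name] = local_data[name]   # publish declared outputs only


def inner_body(data):
    data["value"] = "from-the-deep"        # stays private to the subflow
    data["subflow_result"] = "completed"   # declared as an output below


main_data = {"message": "Hello Parameter"}
run_subflow(main_data, inner_body, output_params=["subflow_result"])

print("value" in main_data)          # False
print(main_data["subflow_result"])   # completed
```
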