
View on GitHub


6 hrs
Test Coverage
from .parse import process_expression_string, serialize
from .dependency import DependencyCollector
from ..utils.collections import subdict
from ..schema.actions import RollbackAction, TerminateAction, PrimaryAction, CompositeAction, SimulatorAction, action_builtins, CollectReferences
from .builtins import ordered_builtins, global_builtins
from .exprgraph import dfs_make
from collections import ChainMap, defaultdict
from sortedcontainers import SortedSet
from frozendict import frozendict
from collections.abc import Sequence

import inspect

def register_builtin(name,fn,ordered_arguments=True):
    """Register `fn` under the key `name` in the module-level `global_builtins`.

    Args:
        name: key under which `fn` is stored in `global_builtins`.
        fn: the object to store.
        ordered_arguments: when truthy, an additional step is taken (see the
            review note below); defaults to True.
    """
    global ordered_builtins
    global global_builtins

    global_builtins[name] = fn
    if ordered_arguments:
        # NOTE(review): the body of this branch is absent here, so the function
        # does not parse as written. It presumably records `name` in
        # `ordered_builtins` -- confirm against the upstream file and restore it.

class ExecutableExpression:
    """An expression string compiled into a callable lambda.

    `initialize` parses a string with `process_expression_string` (using the
    class-level `start` rule), collects its dependencies, serializes it back
    to code and evaluates ``lambda <variables>: <code>`` against the subset of
    `builtins` that the expression references.

    Class attributes:
        start: start rule handed to `process_expression_string`.
        builtins: mapping from which the lambda's globals are selected.
        allowed_forms: human-readable forms, used in error messages.
        allowed_returns: tuple of accepted return types, or None to skip the
            return-type check in `exec`.
    """
    # is a fancy lambda function that can be executed
    start = None
    builtins = global_builtins
    # Defaults so that the base class itself can be used with `exec` and
    # `initialize_from_strings`; subclasses override both.
    allowed_forms = []
    allowed_returns = None

    def __init__(self, keywords, builtins, fn, code, deps):
        """Store the compiled pieces; normally called through `initialize`."""
        self.keywords = keywords
        self.builtins = builtins
        self.fn = fn
        self.code = code
        self.deps = deps

    def to_string(self):
        """Return a pretty-printed dump of keywords, builtins, code and fn."""
        # Imported locally because `pformat` is not imported at module level.
        from pprint import pformat
        return pformat(dict(
            keywords=self.keywords,
            builtins=self.builtins,
            code=self.code,
            fn=self.fn,
        ))

    @classmethod
    def initialize(cls, s):
        """Try to build an instance of `cls` from the expression string `s`.

        Returns:
            The new instance, or None when parsing, dependency collection or
            compilation of the lambda fails.
        """
        try:
            tree, depdict = process_expression_string(s, start=cls.start)
            deps, code = DependencyCollector(depdict), serialize(tree)
            # TODO: a check rejecting expressions with `deps.has_subvariables`
            # was disabled here; re-enable once its assertion is fixed.
            keywords = list(deps.variables)

            # Expose to the lambda only the builtins the expression references.
            builtins = subdict(cls.builtins, ['__builtins__'] + list(deps.builtins))
            code2 = 'lambda {vars}: {code}'.format(vars=','.join(keywords), code=code)
            # SECURITY: `eval` runs code generated from `s`; only feed this
            # method expression strings from trusted model definitions.
            fn = eval(code2, builtins)
            x = cls(keywords=keywords, builtins=builtins, fn=fn, code=code, deps=deps)
        except Exception:
            # Any failure means "`s` is not a valid instance of this class".
            x = None
        # Note checking of valid object is OUTSIDE the control of this factory method
        return x

    @classmethod
    def initialize_from_strings(cls, strings, classes, cmax=0):
        """Compile each string with the first class in `classes` that accepts it.

        Args:
            strings: expression strings to compile.
            classes: candidate classes, tried in order through `initialize`.
            cmax: first index used for the generated ``_<n>`` keys.

        Returns:
            dict mapping each expression's declared variable (or a generated
            ``_<n>`` key when it declares none) to the compiled expression.

        Raises:
            ValueError: if a string is not accepted by any of `classes`.
        """
        d = dict()
        allowed_forms = '\n'.join([x for c in classes for x in c.allowed_forms])
        for s in strings:
            x = None  # reset per string so an empty `classes` fails cleanly
            for c in classes:
                x = c.initialize(s)
                if x is not None:
                    break
            if x is None:
                err = "`{s}` does not create a valid instance of {c}. It must have one of the forms:\n {f}"
                raise ValueError(err.format(s=s, c=classes, f=allowed_forms))

            if x.deps.declared_variable is not None:
                d[x.deps.declared_variable] = x
            else:
                # NOTE(review): `else` restored; anonymous expressions get a
                # running `_<n>` key -- confirm against upstream.
                d['_{0}'.format(cmax)] = x
                cmax += 1
        return d

    def exec(self, *dicts):
        """Evaluate the lambda, looking up each keyword in `dicts` (first hit wins).

        Raises:
            KeyError: if a keyword is found in none of `dicts`.
            AssertionError: if `allowed_returns` is set and the value is not
                an instance of one of those types.
        """
        d = ChainMap(*dicts)
        kwargs = {x: d[x] for x in self.keywords}
        v = self.fn(**kwargs)
        types = self.__class__.allowed_returns
        if types is not None:
            err = 'Value {v} returned by {cls} `{code}` is not one of {types}.'
            assert isinstance(v, types), err.format(v=v, code=self.code, cls=self.__class__.__name__, types=types)
        return v

    def build_exprgraph(self):
        """Re-parse `self.code` and return the result of `dfs_make` on the tree."""
        tree, _ = process_expression_string(self.code, start=self.__class__.start)
        return dfs_make(tree)

    def full_string(self):
        """Return the source text of the expression."""
        return self.code

class Constraint(ExecutableExpression):
    """Expression of the form `<expr> <bool_op> <expr>`; must return a bool."""
    allowed_returns = (bool,)
    allowed_forms = ['<expr> <bool_op> <expr>']
    builtins = global_builtins
    start = 'boolean_expression'
class Computation(ExecutableExpression):
    """Assignment of the form `<var> = <expr>`; the return type is unchecked."""
    allowed_returns = None
    allowed_forms = ['<var> = <expr>']
    builtins = global_builtins
    start = 'assignment'

    def build_exprgraph(self):
        """Parse the full assignment text and return `dfs_make` of its tree."""
        target = self.deps.declared_variable
        assert target is not None
        source = '{0} = {1}'.format(target, self.code)
        tree, _ = process_expression_string(source, start=self.__class__.start)
        return dfs_make(tree)

    def full_string(self):
        """Return the assignment as text: declared variable, ` = `, then the code."""
        return '{0} = {1}'.format(self.deps.declared_variable, self.code)

class RateLaw(ExecutableExpression):
    """Plain `<expr>` whose value must be an int or a float."""
    allowed_returns = (int, float)
    allowed_forms = ['<expr>']
    builtins = global_builtins
    start = 'expression'

class ObservableExpression(ExecutableExpression):
    """Plain `<expr>` with no restriction on the returned type."""
    allowed_returns = None
    allowed_forms = ['<expr>']
    builtins = global_builtins
    start = 'expression'

######## Simulator methods #########
def rollback(expr):
    """Turn an evaluated rollback condition into simulator actions.

    Args:
        expr: the evaluated condition; must be a bool.

    Returns:
        A new `RollbackAction` when `expr` is True, otherwise an empty list.

    Raises:
        AssertionError: if `expr` is not a bool.
    """
    assert isinstance(expr,bool), "Rollback condition must evaluate to a boolean."
    # Build the action only when it is actually returned.
    return RollbackAction() if expr else []

def terminate(expr):
    """Turn an evaluated terminate condition into simulator actions.

    Args:
        expr: the evaluated condition; must be a bool.

    Returns:
        A new `TerminateAction` when `expr` is True, otherwise an empty list.

    Raises:
        AssertionError: if `expr` is not a bool.
    """
    assert isinstance(expr,bool), "Terminate condition must evaluate to a boolean."
    # Build the action only when it is actually returned.
    return TerminateAction() if expr else []

########### ActionCaller ##########
# an executable expression object that when called on a match
# is equivalent to an action method call
class ActionCaller(ExecutableExpression):
    """Function-call expression evaluated against a match.

    Its builtins layer the global builtins, the `rollback`/`terminate`
    simulator methods and `action_builtins`, looked up in that order.
    """
    allowed_returns = None
    allowed_forms = [
        '<actioncall> ( <boolexpr> )',
        '<pattern>.<var>.<actioncall> (<params>)',
        '<pattern>.<actioncall> (<params>)',
    ]
    builtins = ChainMap(
        global_builtins,
        {'rollback': rollback, 'terminate': terminate},
        action_builtins,
    )
    start = 'function_call'

    def exec(self, match, *dicts):
        """Evaluate with `match` as the first lookup dict, then `dicts`."""
        return super().exec(match, *dicts)

def initialize_from_string(string,classes):
    """Return the first successful `c.initialize(string)` for `c` in `classes`.

    Args:
        string: the expression string to compile.
        classes: candidate classes, tried in order.

    Returns:
        The first non-None result of `initialize`.

    Raises:
        AssertionError: if no class in `classes` accepts `string`.
    """
    for c in classes:
        x = c.initialize(string)
        if x is not None:
            return x
    err = 'Could not create a valid instance of {0} from {1}'
    # Raised explicitly: `assert False` is stripped under `python -O`, which
    # would make this function silently return None.
    raise AssertionError(err.format(classes,string))

class ExecutableExpressionManager:
    """Holds a list of compiled expressions plus a namespace and runs them on a match."""
    # Assumes top-level variables are variables of a pattern,
    # e.g., in a.x, a is a node on a pattern and x is an attribute;
    # similarly, in a.x(), x is a computation.

    def __init__(self,constraint_execs,namespace):
        """Store the expressions as `self.execs` and the variable namespace."""
        self.execs = constraint_execs
        self.namespace = namespace

    def pprint(self):
        """Return (not print) the `code` of every expression, one per line."""
        return '\n'.join([x.code for x in self.execs])

    def get_attribute_calls(self):
        """Yield `(key, item)` pairs, one for every item collected under each key of `attrcalls`.

        NOTE(review): the statements that fill `attrcalls` are absent from this
        copy, so the method does not parse as written -- see the notes below.
        """
        attrcalls = defaultdict(SortedSet)
        for c in self.execs:
            for k,v in c.deps.attribute_calls.items():
                # NOTE(review): loop body missing; presumably merges `v` into
                # `attrcalls[k]` -- confirm against upstream.
            for fnametuple in c.deps.function_calls:
                # Only two-part names (variable, function name) are considered.
                if len(fnametuple)==2:
                    var,fname = fnametuple
                    # Applies only when the namespace entry for `var` is a class.
                    if isinstance(self.namespace[var],type):
                        _class = self.namespace[var]
                        fn = getattr(_class,fname)
                        assert fn._is_computation, f'Could not find function {fnametuple}'
                        # Keywords of the computation that are also literal attributes of the class.
                        kws = sorted(set(fn._kws) & set(_class.get_literal_attrs()))
                        # NOTE(review): `kws` is computed but never used here; a
                        # following statement (presumably recording it in
                        # `attrcalls`) appears to be missing -- confirm.
        for k,v in attrcalls.items():
            for a in v:
                yield k,a

    def get_helper_calls(self):
        """Yield `(key, item)` pairs, one for every item collected under each key of `helpercalls`.

        NOTE(review): the statement that fills `helpercalls` is absent from this
        copy, so the method does not parse as written -- see the note below.
        """
        helpercalls = defaultdict(SortedSet)
        for c in self.execs:
            for fnametuple,kwargs in c.deps.function_calls.items():
                # Only two-part names (variable, function name) are considered.
                if len(fnametuple)==2:
                    var,fname = fnametuple
                    if self.namespace[var]=='Helper Pattern':
                        # NOTE(review): branch body missing; presumably records
                        # the call in `helpercalls` -- confirm against upstream.
        for k,v in helpercalls.items():
            for tup in v:
                yield k,tup

    def exec(self,match,*dicts):
        """Run every expression on `match`, in order.

        An expression with a declared variable has its result stored in `match`
        under that name (mutating `match` in place). Any other expression acts
        as a filter: a falsy result stops processing and returns None.

        Returns:
            `match` when every filter passed, otherwise None.
        """
        for c in self.execs:
            if c.deps.declared_variable is not None:
                match[c.deps.declared_variable] = c.exec(match,*dicts)
            elif not c.exec(match,*dicts):
                return None
        return match

class ActionManager:
    """Runs a list of action expressions on a match and streams the resulting actions."""

    def __init__(self, action_execs, factories):
        """Store the action expressions and validate their `add` calls.

        Args:
            action_execs: expressions, each exposing `deps` and `exec`.
            factories: container of the variable names that `add` may target.

        Raises:
            AssertionError: if an expression calling `add` does not depend on
                exactly one variable, or that variable is not in `factories`.
        """
        self.execs = action_execs

        for e in self.execs:
            for fnametuple in e.deps.function_calls:
                if fnametuple == ('add',):
                    assert len(e.deps.variables)==1
                    var = list(e.deps.variables)[0]
                    assert var in factories

    def exec(self, match, *dicts):
        """Evaluate each expression on `match` and yield the actions one by one.

        An expression may return a single action or a sequence of actions;
        sequences are flattened one level. If the expression has a
        `build_variable` attribute, its last action must be a
        `CollectReferences`, whose `variable` is set to that name.

        Yields:
            The individual actions, in expression order.
        """
        for c in self.execs:
            actions = c.exec(match, *dicts)
            if hasattr(c, 'build_variable'):
                assert isinstance(actions[-1], CollectReferences)
                actions[-1].variable = c.build_variable
            if isinstance(actions, Sequence):
                yield from actions
            else:
                # A lone action (e.g. what `rollback` returns) is passed through as is.
                yield actions