
View on GitHub


1 day
Test Coverage
from warnings import warn
import copy
from typing import Dict, Optional, Union, Type, TYPE_CHECKING, List
import gymnasium as gym
from yaml import dump
import numpy as np

import eagerx
from eagerx.core.view import SpecView
from eagerx.utils.utils import (
from eagerx.utils.utils_sub import substitute_args

    from eagerx.core.entities import Engine
    from eagerx import EngineGraph

class EntitySpec(object):
    def __init__(self, params):
        super(EntitySpec, self).__setattr__("_params", params)

    def __setattr__(self, name, value):
        raise AttributeError("You cannot set the new attributes to EntitySpec.")

    def __str__(self):
        return dump(self._params)

    def params(self):
        return self._params

class BackendSpec(EntitySpec):
    """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the selected backend."""

    def initialize(self, spec_cls):

    def config(self) -> SpecView:
        """Provides an API to get/set the parameters to initialize.

        :return: (mutable) API to get/set parameters.
        return SpecView(self, depth=["config"], unlocked=True)

class ProcessorSpec(EntitySpec):
    """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the processor."""

    def initialize(self, spec_cls):

    def config(self) -> SpecView:
        """Provides an API to get/set the parameters to initialize.

        :return: (mutable) API to get/set parameters.
        return SpecView(self, depth=[], unlocked=True)

class EngineStateSpec(EntitySpec):
    """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the engine state."""

    def initialize(self, spec_cls):

    def config(self) -> SpecView:
        """Provides an API to get/set the parameters to initialize.

        :return: API to get/set parameters.
        return SpecView(self, depth=[], unlocked=True)

class BaseNodeSpec(EntitySpec):
    def __init__(self, params):

    def _lookup(self, depth, unlocked=False):
        name = self._params["config"]["name"]
        return SpecView(self, depth=[depth], name=name, unlocked=unlocked)

    def config(self) -> SpecView:
        """Provides an API to set/get the parameters to initialize.

        The default parameters are:

        - .. py:attribute:: Spec.config.name: str

            User specified unique node name.

        - .. py:attribute:: Spec.config.rate: float

            Rate (Hz) at which the :func:`~eagerx.core.entities.Node.callback` is called.

        - .. py:attribute:: Spec.config.process: int = 0

            Process in which the node is launched. See :class:`~eagerx.core.constants.process` for all options.

        - .. py:attribute:: Spec.config.color: str = grey

            Specifies the color of logged messages & node color in the GUI.
            Check-out the termcolor documentation for the supported colors.

        - .. py:attribute:: Spec.config.print_mode: int = 1

            Specifies the different modes for printing: `{1: TERMCOLOR, 2: ROS}`.

        - .. py:attribute:: Spec.config.log_level: int = 30

            Specifies the log level for the engine: `{0: SILENT, 10: DEBUG, 20: INFO, 30: WARN, 40: ERROR, 50: FATAL}`

        :return: API to get/set parameters.
        return self._lookup("config", unlocked=True)

    def inputs(self) -> SpecView:
        """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.inputs`.

        The mutable parameters are:

        - .. py:attribute:: Spec.inputs.<name>.window: int = 1

           A non-negative number that specifies the number of messages to pass to the node's :func:`~eagerx.core.entities.Node.callback`.

           - *window* = 1: Only the last received input message.

           - *window* = *x* > 1: The trailing last *x* received input messages.

           - *window* = 0: All input messages received since the last call to the node's :func:`~eagerx.core.entities.Node.callback`.

           .. note:: With *window* = 0, the number of input messages may vary and can even be zero.

        - .. py:attribute:: Spec.inputs.<name>.processor: ProcessorSpec = None

            A processor that preprocesses the received input message before passing it
            to the node's :func:`~eagerx.core.entities.Node.callback`.

        - .. py:attribute:: Spec.inputs.<name>.space: dict = None

            This space defines the format of valid messages.

        - .. py:attribute:: Spec.inputs.<name>.delay: float = 0.0

            A non-negative simulated delay (seconds). This delay is ignored if
            :attr:`~eagerx.core.entities.Engine.simulate_delays` = True
            in the engine's :func:`~eagerx.core.entities.Engine.spec`.

        - .. py:attribute:: Spec.inputs.<name>.skip: bool = False

            Skip the dependency on this input during the first call to the node's :func:`~eagerx.core.entities.Node.callback`.
            May be necessary to ensure that the connected graph is directed and acyclic.

        :return: API to get/set parameters.
        return self._lookup("inputs")

    def outputs(self) -> SpecView:
        """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.outputs`.

        The mutable parameters are:

        - .. py:attribute:: Spec.outputs.<name>.processor: ProcessorSpec = None

            A processor that preprocesses the output message, returned by :func:`~eagerx.core.entities.Node.callback`,
            before publishing it.

        - .. py:attribute:: Spec.outputs.<name>.space: dict = None

            This space defines the format of valid messages.

        :return: API to get/set parameters.
        return self._lookup("outputs")

    def states(self) -> SpecView:
        """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.states`.

        The mutable parameters are:

        - .. py:attribute:: Spec.states.<name>.space: dict = None

            This space defines the format of valid messages.

        :return: API to get/set parameters.
        return self._lookup("states")

    def initialize(self, spec_cls):
        import eagerx.core.register as register

            params = register.LOOKUP_TYPES[spec_cls.callback]
        except KeyError:
            if spec_cls.__name__ == "EnvNode":
                params = dict()

        if "targets" in params:
            from eagerx.core.entities import ResetNode

            assert issubclass(
                spec_cls, ResetNode
            ), "You can only have targets registered for nodes that inherit from the ResetNode baseclass."
            add_ft = True
            add_ft = False

        # Set default components
        for component, cnames in params.items():
            for cname, space in cnames.items():
                if component == "outputs":
                    if cname not in self.config.outputs:
                    mapping = dict(
                        rate="$(config rate)",
                    # Add feedthrough entries for each output if node is a reset node (i.e. when it has a target)
                    if add_ft:
                        mapping_ft = dict(
                        with self.feedthroughs as d:
                            d[cname] = mapping_ft
                elif component == "inputs":
                    if cname not in self.config.inputs:
                    space = eagerx.Space(shape=(), dtype="int64") if cname == "tick" else space
                    mapping = dict(
                elif component == "targets":
                    if cname not in self.config.targets:
                    mapping = dict(
                    if cname not in self.config.states:
                    component = "states"
                    mapping = dict(
                with getattr(self, component) as d:
                    d[cname] = mapping

    def add_input(
        cname: str,
        window: int = 1,
        delay: float = 0.0,
        skip: bool = False,
        address: str = None,
        processor: Optional[ProcessorSpec] = None,
        space: Optional[gym.spaces.Space] = None,
        mapping = dict(
            processor=processor.params if processor else None,
        with self.inputs as d:
            d[cname] = mapping

    def add_output(
        cname: str,
        processor: Optional[ProcessorSpec] = None,
        space: Optional[gym.spaces.Space] = None,
        mapping = dict(
            rate="$(config rate)",
            processor=processor.params if processor else None,
        with self.outputs as d:
            d[cname] = mapping

    def build(self, ns: str):
        params = self.params  # Creates a deepcopy
        name = self.config.name
        entity_id = self.config.entity_id

        # Replace args in .yaml
        context = {
            "ns": {"env_name": ns, "node_name": name},
            "config": params["config"],
        substitute_args(params, context, only=["config", "ns"])

        # Process inputs
        inputs = []
        for cname in self.config.inputs:
            assert (
                cname in params["inputs"]
            ), f'Received unknown {"input"} "{cname}". Check the spec of "{name}" with entity_id "{entity_id}".'
            assert (
                "targets" not in params or cname not in params["targets"]
            ), f'Input "{cname}" cannot have the same cname as a target. Change either the input or target cname. Check the spec of "{name}" with entity_id "{entity_id}".'
            n = RxInput(name=cname, **params["inputs"][cname])

        # Process outputs
        outputs = []
        for cname in self.config.outputs:
            msg = f"The rate ({params['outputs'][cname]['rate']} Hz) set for action '{cname}' does not equal the environment rate ({self.config.rate} Hz)."
            assert params["outputs"][cname]["rate"] == self.config.rate, msg
            assert (
                cname in params["outputs"]
            ), f'Received unknown {"output"} "{cname}". Check the spec of "{name}" with entity_id "{entity_id}".'
            if "address" in params["outputs"][cname]:
                address = params["outputs"][cname].pop("address")
                address = "%s/outputs/%s" % (name, cname)
            n = RxOutput(name=cname, address=address, **params["outputs"][cname])

        states = []
        for cname in self.config.states:
            assert (
                cname in params["states"]
            ), f'Received unknown {"state"} "{cname}". Check the spec of "{name}" with entity_id "{entity_id}".'
            if "address" in params["states"][cname]:  # if 'env/supervisor', the state address is pre-defined (like an input)
                n = RxState(name=cname, **params["states"][cname])
                address = "%s/states/%s" % (name, cname)
                n = RxState(name=cname, address=address, **params["states"][cname])

        targets = []
        if "targets" in self.config:
            for cname in self.config.targets:
                assert (
                    cname in params["targets"]
                ), f'Received unknown {"target"} "{cname}". Check the spec of "{name}" with entity_id "{entity_id}".'
                n = RxState(name=cname, **params["targets"][cname])

        feedthroughs = []
        if "feedthroughs" in params:
            assert "targets" in self.config, f'No targets defined for ResetNode "{name}".'
            assert len(self.config.targets) > 0, f'No targets selected for ResetNode "{name}".'
            for cname in self.config.outputs:
                # Add output details  to feedthroughs
                assert (
                    cname in params["feedthroughs"]
                ), f'Feedthrough "{cname}" must directly correspond to a selected output. Check the spec of "{name}" with entity_id "{entity_id}".'
                params["feedthroughs"][cname]["space"] = params["outputs"][cname]["space"]
                n = RxFeedthrough(feedthrough_to=cname, **params["feedthroughs"][cname])

        params["outputs"] = [i.build(ns=ns) for i in outputs]
        params["inputs"] = [i.build(ns=ns) for i in inputs]
        params["states"] = [i.build(ns=ns) for i in states]
        params["targets"] = [i.build(ns=ns) for i in targets]
        params["feedthroughs"] = [i.build(ns=ns) for i in feedthroughs]

        # Create rate dictionary with outputs
        chars_ns = len(ns) + 1
        rate_dict = dict()
        for i in params["outputs"]:
            assert i["rate"] is not None and isinstance(i["rate"], (int, float)) and i["rate"] > 0, (
                f'The rate of node "{name}" (and output cname "{i["name"]}") is misspecified: rate="{i["rate"]}". '
                'Make sure that it is of type(rate)=("int", "float",) and rate > 0.'
            address = i["address"][chars_ns:]
            rate_dict[address] = i["rate"]  # {'rate': i['rate']}

        # Put parameters in node namespace (watch out, order of dict keys probably matters...)
        node_params = {name: params, "rate": rate_dict}
        return replace_None(node_params)

class NodeSpec(BaseNodeSpec):
    """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the node."""


class ResetNodeSpec(BaseNodeSpec):
    """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the node."""

    def targets(self) -> SpecView:
        """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.targets`.

        The mutable parameters are:

        - .. py:attribute:: Spec.targets.<name>.processor: ProcessorSpec = None

            A processor that preprocesses the received state message before passing it
            to the node's :func:`~eagerx.core.entities.ResetNode.callback`.

        :return: API to get/set parameters.
        return self._lookup("targets")

    def feedthroughs(self) -> SpecView:
        """Provides an API to set/get the parameters of a feedthrough corresponding to registered :func:`eagerx.core.register.outputs`.

        The mutable parameters are:

        - .. py:attribute:: Spec.feedthroughs.<name>.processor: ProcessorSpec = None

            A processor that preprocesses the received input message before passing it
            to the node's :func:`~eagerx.core.entities.Node.callback`.

        - .. py:attribute:: Spec.feedthroughs.<name>.space: dict = None

            This space defines the format of valid messages.

        - .. py:attribute:: Spec.feedthroughs.<name>.delay: float = 0.0

            A non-negative simulated delay (seconds). This delay is ignored if
            :attr:`~eagerx.core.entities.Engine.simulate_delays` = True
            in the engine's :func:`~eagerx.core.entities.Engine.spec`.

        :return: API to get/set parameters.
        return self._lookup("feedthroughs")

class ObjectSpec(EntitySpec):
    """A parameter specification of an object."""

    def __init__(self, params):

    def _lookup(self, depth, unlocked=False):
        name = self._params["config"]["name"]
        return SpecView(self, depth=[depth], name=name, unlocked=unlocked)

    def gui(
        engine_cls: Type["Engine"],
        interactive: Optional[bool] = True,
        resolution: Optional[List[int]] = None,
        filename: Optional[str] = None,
    ) -> Union[None, np.ndarray]:
        """Opens a graphical user interface of the object's engine implementation.

        .. note:: Requires `eagerx-gui`:

        .. highlight:: python
        .. code-block:: python

            pip3 install eagerx-gui

        :param engine_cls: The class engine (not instance!) that was used to register the engine implementation (e.g. "PybulletEngine").
        :param interactive: If `True`, an interactive application is launched.
                            Otherwise, an RGB render of the GUI is returned.
                            This could be useful when using a headless machine.
        :param resolution: Specifies the resolution of the returned render when `interactive` is `False`.
                           If `interactive` is `True`, this argument is ignored.
        :param filename: If provided, the GUI is rendered to an svg file with this name.
                         If `interactive` is `True`, this argument is ignored.
        :return: RGB render of the GUI if `interactive` is `False`.
        import eagerx.core.register as register

        spec_copy = ObjectSpec(self.params)
        spec_copy._params["engine"] = {}
        engine = engine_cls.get_specification()
        graph = register.add_engine(spec_copy, engine)
        return graph.gui(interactive=interactive, resolution=resolution, filename=filename)

    def engine(self) -> Union[SpecView]:
        """Provides an API to set/get the parameters of an engine-specific implementation.

        The mutable parameters are:

        - Arguments (excluding spec) of the selected engine's :func:`~eagerx.core.entities.Engine.add_object` method.

        - .. py:attribute:: Spec.engine.states.<name>: EngineState

            Link an :class:`~eagerx.core.specs.EngineState` to a registered state with :func:`eagerx.core.register.states`.

        :return: API to get/set parameters.
        return SpecView(self, depth=["engine"], name=self._params["config"]["name"])

    def sensors(self) -> SpecView:
        """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.sensors`.

        The mutable parameters are:

        - .. py:attribute:: Spec.sensors.<name>.rate: float = 1.0

            Rate (Hz) at which the sensor's :func:`~eagerx.core.entities.EngineNode.callback` is called.

        - .. py:attribute:: Spec.sensors.<name>.space: dict = None

            This space defines the format of valid messages.

        :return: API to get/set parameters.
        return self._lookup("sensors")

    def actuators(self) -> SpecView:
        """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.actuators`.

        The mutable parameters are:

        - .. py:attribute:: Spec.actuators.<name>.rate: float = 1.0

            Rate (Hz) at which the actuator's :func:`~eagerx.core.entities.EngineNode.callback` is called.

        - .. py:attribute:: Spec.actuators.<name>.window: int = 1

           A non-negative number that specifies the number of messages to pass to the node's

           - *window* = 1: Only the last received input message.

           - *window* = *x* > 1: The trailing last *x* received input messages.

           - *window* = 0: All input messages received since the last call to the node's

           .. note:: With *window* = 0, the number of input messages may vary and can even be zero.

        - .. py:attribute:: Spec.actuators.<name>.space: dict = None

            This space defines the format of valid messages.

        - .. py:attribute:: Spec.actuators.<name>.delay: float = 0.0

            A non-negative simulated delay (seconds). This delay is ignored if
            :attr:`~eagerx.core.entities.Engine.simulate_delays` = True
            in the engine's :func:`~eagerx.core.entities.Engine.spec`.

        - .. py:attribute:: Spec.actuators.<name>.skip: bool = False

            Skip the dependency on this input during the first call to the node's :func:`~eagerx.core.entities.EngineNode.callback`.
            May be necessary to ensure that the connected graph is directed and acyclic.

        :return: API to get/set parameters.
        return self._lookup("actuators")

    def states(self) -> SpecView:
        """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.engine_states`.

        The mutable parameters are:

        - .. py:attribute:: Spec.states.<name>.space: dict = None

            This space defines the format of valid messages.

        :return: API to get/set parameters.
        return self._lookup("states")

    def config(self) -> SpecView:
        """Provides an API to set/get the parameters to initialize.

        The default parameters are:

        - Additional parameters registered with the :func:`eagerx.core.register.config` decorator.

        - .. py:attribute:: Spec.config.name: str

            User specified unique object name.

        - .. py:attribute:: Spec.config.actuators: list

            List with selected actuators. Must be a subset of the registered :func:`eagerx.core.register.actuators`.

        - .. py:attribute:: Spec.config.sensors: list

            List with selected sensors. Must be a subset of the registered :func:`eagerx.core.register.sensors`.

        - .. py:attribute:: Spec.config.states: list

            List with selected engine_states. Must be a subset of the registered :func:`eagerx.core.register.engine_states`.

        :return: API to get/set parameters.
        return self._lookup("config", unlocked=True)

    def initialize(self, spec_cls):
        import eagerx.core.register as register

        agnostic = register.LOOKUP_TYPES[spec_cls.make]

        # Set default components
        for component, cnames in agnostic.items():
            for cname, space in cnames.items():
                if component == "sensors":
                    mapping = dict(
                elif component == "actuators":
                    mapping = dict(
                    component = "states"
                    mapping = dict(
                with getattr(self, component) as d:
                    d[cname] = mapping
                # Select component per default
                if cname not in getattr(self.config, component):
                    getattr(self.config, component).append(cname)

    def _initialize_object_graph(self):
        mapping = dict()
        for component in ["sensors", "actuators"]:
                mapping[component] = getattr(self, component)
            except AttributeError:

        from eagerx.core.graph_engine import EngineGraph

        graph = EngineGraph.create(**mapping)
        return graph

class EngineSpec(BaseNodeSpec):
    """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the engine."""

    def config(self) -> SpecView:
        """Provides an API to set/get the parameters to initialize.

        The default parameters are:

        - .. py:attribute:: Spec.config.rate: float

            Rate (Hz) at which the :func:`~eagerx.core.entities.Engine.callback` is called.

        - .. py:attribute:: Spec.config.process: int = 0

            Process in which the engine is launched. See :class:`~eagerx.core.constants.process` for all options.

        - .. py:attribute:: Spec.config.sync: bool = True

            Flag that specifies whether we run reactive or asynchronous.

        - .. py:attribute:: Spec.config.real_time_factor: float = 0

            A specified upper bound on the real-time factor. `Wall-clock-rate`=`real_time_factor`*`rate`.
            If `real_time_factor` < 1 the simulation is slower than real time.

        - .. py:attribute:: Spec.config.simulate_delays: bool = True

            Flag that specifies whether input delays are simulated.
            You probably want to set this to `False` when running in the real-world.

        - .. py:attribute:: Spec.config.color: str = grey

            Specifies the color of logged messages. Check-out the termcolor documentation for the supported colors.

        - .. py:attribute:: Spec.config.print_mode: int = 1

            Specifies the different modes for printing: `{1: TERMCOLOR, 2: ROS}`.

        - .. py:attribute:: Spec.config.log_level: int = 30

            Specifies the log level for the engine: `{0: SILENT, 10: DEBUG, 20: INFO, 30: WARN, 40: ERROR, 50: FATAL}`.

        :return: API to get/set parameters.
        return self._lookup("config", unlocked=True)

    def objects(self) -> SpecView:
        """Provides an API to set/get the parameters to add an object to the engine.

        To add a new object, please use :func:`~eagerx.core.specs.Enginespec.add_object`.

        Arguments correspond to the signature of :func:`~eagerx.core.entities.Engine.add_object`.

        :return: API to get/set parameters.
        return self._lookup("objects", unlocked=True)

    def add_object(
        name: str,
        **kwargs: Union[bool, int, float, str, List, Dict],
    ) -> None:
        """Adds an object to the simulator that is interfaced by the engine.

        :param kwargs: Other arguments of :func:`~eagerx.core.entities.Engine.add_object`.
        # todo: check arguments of engine_spec.add_object(...) are arguments of engine.add_object(...).
        with self.objects as d:
            assert name not in d, f"There is already an object called `{name}` added. Names must be unique."
            d[name] = dict(engine_states=dict(), nodes=dict(), add_object=dict())
            with d[name].add_object as s:
                s["name"] = name

    def build(self, ns: str):
        params = super().build(ns)

        # Convert engine states
        for name, obj in params[self.config.name]["objects"].items():
            for cname, view in obj["engine_states"].items():
                address = f"{name}/states/{cname}"
                processor = view["processor"] if view["processor"] != "null" else None
                spec = RxEngineState(cname, address, view["state"], processor, view["space"])
                obj["engine_states"][cname] = spec.build(ns=ns)
        return replace_None(params)

    def _add_engine_states(self, name: str, spec: ObjectSpec):
        # Pop states that were not implemented.
        assert name in self.objects, f"There is no Object called `{name}' in engine.objects. First add the Object."
        states = spec.engine.states
        for cname in list(states.keys()):
            if cname in spec.config.states:
                if states[cname] is not None:
                        name, cname, states[cname], spec.states[cname]["space"], spec.states[cname]["processor"]
                        f"Engine state `{cname}` for object `{name}` will be ignored. "
                        f"There is no implementation provided for it in Engine `{self.config.entity_id}`."

    def _add_engine_state(self, name, cname, engine_state, space, processor=None):
        with self.objects[name].engine_states as s:
            s[cname] = dict(state=None, space=None, processor=None)
            s[cname]["state"] = engine_state
            s[cname]["space"] = space
            s[cname]["processor"] = processor

    def _initialize_engine_config(self, spec, engine_config):
        # Add default config
        with spec.engine as d:
            d["name"] = spec.config.name
            d["states"] = {}
            # Add all states to engine-specific params
            with d.states as s:
                for cname in spec.states.keys():
                    s[cname] = None

    def _register_object(self, spec: ObjectSpec) -> "EngineGraph":
        spec = copy.deepcopy(spec)

        # Construct context & replace placeholders
        context = {"config": spec.config.to_dict()}
        substitute_args(spec._params["config"], context, only=["config"])  # First resolve args within the context
        substitute_args(spec._params, context, only=["config"])  # Resolve rest of params

        # Add engine entry
        import eagerx.core.register as register

        spec._params["engine"] = {}
        return register.add_engine(spec, self)


class Component(object):
    def __init__(self, **kwargs):
        # Iterates over provided arguments and sets the provided arguments as class properties
        for key, value in kwargs.items():
            if key == "__class__":
                continue  # Skip if __class__ type
            setattr(self, key, value)

class RxInput(Component):
    def __init__(
        name: str,
        address: str,
        window: int = 0,
        processor: Dict = None,
        space: Dict = None,
        delay: float = 0.0,
        skip: bool = False,
        dtype: str = None,
        # Store parameters as properties in baseclass
        # IMPORTANT! Do not define variables locally you do **not** want to store
        # on the parameter server anywhere before calling the baseclass' constructor.
        kwargs = locals().copy()
        super(RxInput, self).__init__(**kwargs)

        # Calculate other parameters based on previously defined attributes.

        # Error check the parameters here.

    def build(self, ns=""):
        params = self.__dict__.copy()
        params["address"] = "/".join(filter(None, [ns, params["address"]]))
        # Set dtype if not already set by source
        if params["dtype"] is None:
            params["dtype"] = params["space"]["dtype"]
        return params

class RxOutput(Component):
    def __init__(
        name: str,
        address: str,
        rate: float,
        processor: Dict = None,
        space: Dict = None,
        # Store parameters as properties in baseclass
        # IMPORTANT! Do not define variables locally you do **not** want to store
        # on the parameter server anywhere before calling the baseclass' constructor.
        kwargs = locals().copy()
        super(RxOutput, self).__init__(**kwargs)

        # Calculate other parameters based on previously defined attributes.

        # Error check the parameters here.

    def build(self, ns=""):
        params = self.__dict__.copy()
        params["address"] = "/".join(filter(None, [ns, params["address"]]))
        params["dtype"] = params["space"]["dtype"]
        return params

class RxFeedthrough(Component):
    def __init__(
        address: str,
        feedthrough_to: str,
        window: int = 1,
        processor: Dict = None,
        space: Dict = None,
        delay: float = 0.0,
        skip: bool = False,
        dtype: str = None,
        # Store parameters as properties in baseclass
        # IMPORTANT! Do not define variables locally you do **not** want to store
        # on the parameter server anywhere before calling the baseclass' constructor.
        kwargs = locals().copy()
        super(RxFeedthrough, self).__init__(**kwargs)

        # Calculate other parameters based on previously defined attributes.

        # Error check the parameters here.

    def build(self, ns=""):
        params = self.__dict__.copy()
        params["address"] = "/".join(filter(None, [ns, params["address"]]))
        # Set dtype if not already set by source
        if params["dtype"] is None:
            params["dtype"] = params["space"]["dtype"]
        return params

class RxState(Component):
    def __init__(
        name: str,
        address: str,
        space: Dict,
        processor: Dict = None,
        dtype: str = None,
        # Store parameters as properties in baseclass
        # IMPORTANT! Do not define variables locally you do **not** want to store
        # on the parameter server anywhere before calling the baseclass' constructor.
        kwargs = locals().copy()
        super(RxState, self).__init__(**kwargs)

        # Calculate other parameters based on previously defined attributes.

        # Error check the parameters here.

    def build(self, ns=""):
        params = self.__dict__.copy()
        params["address"] = "/".join(filter(None, [ns, params["address"]]))
        # Set dtype if not already set by source
        if params["dtype"] is None:
            params["dtype"] = params["space"]["dtype"]
        return params

class RxEngineState(Component):
    def __init__(
        name: str,
        address: str,
        state: Dict,
        processor: Dict = None,
        space: Dict = None,
        # Store parameters as properties in baseclass
        # IMPORTANT! Do not define variables locally you do **not** want to store
        # on the parameter server anywhere before calling the baseclass' constructor.
        kwargs = locals().copy()
        super(RxEngineState, self).__init__(**kwargs)

        # Calculate other parameters based on previously defined attributes.

        # Error check the parameters here.

    def build(self, ns=""):
        params = self.__dict__.copy()
        params["address"] = "/".join(filter(None, [ns, params["address"]]))
        params["dtype"] = params["space"]["dtype"]
        return params