neka-nat/ez_interactive_marker

View on GitHub
nodes/ez_interactive_marker

Summary

Maintainability
Test Coverage
#!/usr/bin/env python
import argparse
import functools
import roslib; roslib.load_manifest("ez_interactive_marker")
import rospy
import std_msgs.msg as sm
from interactive_markers.interactive_marker_server import *
from interactive_markers.menu_handler import *
from ez_interactive_marker.config_parser import load_from_string, load
from ez_interactive_marker.commands import TopicPub, ServiceCall, PyFunction
from ez_interactive_marker.controllers import GroupedCheckStateController
from ez_interactive_marker.subscriber_generator import SubscriberGenerator

def process_feedback(feedback=None):
    pass

class EzInteractiveMarker:
    def __init__(self, server, config):
        self._server = server
        self._menu_handler = MenuHandler()
        self._gp_csc = GroupedCheckStateController(self._server, self._menu_handler)
        self._sub_gen = SubscriberGenerator(self._server)
        self._tp_pub = TopicPub()
        self._sv_call = ServiceCall()
        self._py_func = PyFunction()
        node_name = rospy.get_name()
        rospy.Subscriber(node_name + '/update_config',
                         sm.String,
                         self._update_config_from_string)
        self._update_config(config)

    def _update_config(self, config):
        for k, v in config.items():
            self._server.insert(v['interactive_marker'], process_feedback)
            self._sub_gen.generate(v['interactive_marker'].name)
            rospy.loginfo('Insert: "%s"' % v['interactive_marker'].name)
            if 'menu' in v:
                self._make_menu(v['menu'])
                self._menu_handler.apply(self._server, v['interactive_marker'].name)
        self._server.applyChanges()

    def _update_config_from_string(self, config):
        self._update_config(load_from_string(config.data))

    def _make_menu(self, menu_config, parent=None):
        for c in menu_config:
            callback = functools.partial(self._execute_func, process_feedback)
            if 'command' in c:
                if c['command']['type'] == 'topic_pub':
                    callback = functools.partial(self._execute_func,
                                                 functools.partial(self._tp_pub, c['command']['args']))
                elif c['command']['type'] == 'service_call':
                    callback = functools.partial(self._execute_func,
                                                 functools.partial(self._sv_call, c['command']['args']))
                elif c['command']['type'] == 'py_function':
                    callback = functools.partial(self._execute_func,
                                                 functools.partial(self._py_func, c['command']['args']))
                else:
                    raise ValueError('Unsupported command type %s' % c['command']['type'])
            entry = self._menu_handler.insert(c['title'], parent, callback=callback)
            if 'group' in c:
                self._gp_csc.add_group(c['group'], entry)
            if 'children' in c:
                self._make_menu(c['children'], entry)

    def _execute_func(self, main_func, feedback=None):
        main_func(feedback)
        if not feedback is None:
            self._gp_csc.update_check_state(feedback.menu_entry_id)

def main():
    parser = argparse.ArgumentParser(description='Ez Interactive marker')
    parser.add_argument('-c', '--config', type=str, default='', help='Config file name.')
    args = parser.parse_args(rospy.myargv()[1:])
    rospy.init_node("ez_interactive_marker")
    server_name = rospy.get_param('~server_name',
                                  'ez_interactive_marker')
    file_name = rospy.get_param('~config_file', args.config)
    ims = {}
    if file_name != '':
        ims = load(file_name)
    server = InteractiveMarkerServer(server_name)
    eim = EzInteractiveMarker(server, ims)
    rospy.spin()

if __name__ == "__main__":
    main()