// Source: bus/directory/directory.go
//
// Code quality summary: Maintainability A (40 mins), Test Coverage B (85%).

package directory

import (
    "fmt"
    "io"
    "sort"

    "github.com/lugu/qiloop/bus"
    "github.com/lugu/qiloop/bus/util"
    "github.com/lugu/qiloop/type/object"
)

// serviceDirectory implements ServiceDirectoryImplementor.
//
// A service is first recorded in staging by RegisterService and is
// moved to services when ServiceReady is called with its id; Service,
// Services and info only look at services. Both maps are keyed by
// service id. lastID is the id handed out by the most recent successful
// RegisterService. signal is set by Activate and, when non-nil, is
// notified when a service enters or leaves services.
//
// NOTE(review): no locking is visible in this file; confirm that
// callers serialize access to a serviceDirectory.
type serviceDirectory struct {
    staging  map[uint32]ServiceInfo
    services map[uint32]ServiceInfo
    lastID   uint32
    signal   ServiceDirectorySignalHelper
}

// serviceDirectoryImpl allocates an empty serviceDirectory: both the
// staging and the services maps are initialized and empty, and the id
// counter starts at zero. The signal helper stays nil until Activate
// is called.
func serviceDirectoryImpl() *serviceDirectory {
    dir := new(serviceDirectory)
    dir.staging = make(map[uint32]ServiceInfo)
    dir.services = make(map[uint32]ServiceInfo)
    return dir
}

// Activate stores helper so that later registrations and removals can
// be signalled through it. The activation argument is not used. It
// always returns nil.
func (s *serviceDirectory) Activate(
    activation bus.Activation,
    helper ServiceDirectorySignalHelper,
) error {
    s.signal = helper
    return nil
}

// OnTerminate does nothing.
func (s *serviceDirectory) OnTerminate() {
}

// checkServiceInfo validates the fields a service description must
// provide. The checks run in a fixed order and the first failure is
// reported: name, machine id, process id, then the endpoint list,
// which must be non-empty and must not contain an empty string.
// It returns nil when every check passes.
func checkServiceInfo(i ServiceInfo) error {
    switch {
    case i.Name == "":
        return fmt.Errorf("empty name not allowed")
    case i.MachineId == "":
        return fmt.Errorf("empty machine id not allowed")
    case i.ProcessId == 0:
        return fmt.Errorf("process id zero not allowed")
    case len(i.Endpoints) == 0:
        return fmt.Errorf("missing end point (%s)", i.Name)
    }
    for idx := range i.Endpoints {
        if i.Endpoints[idx] == "" {
            return fmt.Errorf("empty endpoint not allowed")
        }
    }
    return nil
}

// info looks up a ready service by id. Only the services map is
// consulted, so a service that is still staging is reported as not
// found. On a miss it returns the zero ServiceInfo and an error.
func (s *serviceDirectory) info(serviceID uint32) (ServiceInfo, error) {
    if found, ok := s.services[serviceID]; ok {
        return found, nil
    }
    return ServiceInfo{}, fmt.Errorf("service %d not found", serviceID)
}

// Service returns the description of the ready service whose name is
// service. Only the services map is searched, so a service that is
// still staging is not found.
//
// When no ready service has that name, the returned ServiceInfo is the
// zero value and err is non-nil.
func (s *serviceDirectory) Service(service string) (info ServiceInfo, err error) {
    // Iterate with a loop-local variable: ranging directly into the
    // named result would leave the last visited entry in it, so a miss
    // would hand back an unrelated service alongside the error.
    for _, candidate := range s.services {
        if candidate.Name == service {
            return candidate, nil
        }
    }
    return ServiceInfo{}, fmt.Errorf("Service not found: %s", service)
}

// serviceList adapts a slice of ServiceInfo to sort.Interface,
// ordering entries by increasing ServiceId.
type serviceList []ServiceInfo

// Len reports the number of entries.
func (a serviceList) Len() int {
    return len(a)
}

// Swap exchanges the entries at positions i and j.
func (a serviceList) Swap(i, j int) {
    a[j], a[i] = a[i], a[j]
}

// Less reports whether entry i has a smaller ServiceId than entry j.
func (a serviceList) Less(i, j int) bool {
    return a[j].ServiceId > a[i].ServiceId
}

// Services returns a copy of every ready service, sorted by increasing
// service id. Staged services are not included. The error is always
// nil.
func (s *serviceDirectory) Services() ([]ServiceInfo, error) {
    ready := make(serviceList, 0, len(s.services))
    for id := range s.services {
        ready = append(ready, s.services[id])
    }
    // Map iteration order is unspecified, so sort for a stable result.
    sort.Sort(ready)
    return []ServiceInfo(ready), nil
}

// RegisterService records newInfo as a staged service and returns the
// id allocated for it. The ServiceId carried by newInfo is ignored and
// replaced by the allocated id, which is the previous id plus one.
//
// It fails, returning id 0, when newInfo does not pass
// checkServiceInfo or when a staged or ready service already uses the
// same name. The service stays in staging until ServiceReady is called.
func (s *serviceDirectory) RegisterService(newInfo ServiceInfo) (uint32, error) {
    err := checkServiceInfo(newInfo)
    if err != nil {
        return 0, err
    }
    name := newInfo.Name
    // Names must be unique across both staged and ready services.
    for id := range s.staging {
        if s.staging[id].Name == name {
            return 0, fmt.Errorf("Service name already staging: %s", name)
        }
    }
    for id := range s.services {
        if s.services[id].Name == name {
            return 0, fmt.Errorf("Service name already ready: %s", name)
        }
    }
    id := s.lastID + 1
    s.lastID = id
    newInfo.ServiceId = id
    s.staging[id] = newInfo
    return id, nil
}

// UnregisterService removes the service id from the directory.
//
// A ready service is deleted and, when a signal helper is set, its
// removal is signalled with the service id and name. A service that is
// only staged is deleted without any signal. An unknown id yields an
// error.
func (s *serviceDirectory) UnregisterService(id uint32) error {
    if ready, found := s.services[id]; found {
        delete(s.services, id)
        if notifier := s.signal; notifier != nil {
            notifier.SignalServiceRemoved(id, ready.Name)
        }
        return nil
    }
    if _, found := s.staging[id]; found {
        delete(s.staging, id)
        return nil
    }
    return fmt.Errorf("Service not found: %d", id)
}

// ServiceReady promotes the staged service id to the set of ready
// services and, when a signal helper is set, signals the addition with
// the service id and name. It returns an error when id is not staged.
func (s *serviceDirectory) ServiceReady(id uint32) error {
    staged, found := s.staging[id]
    if !found {
        return fmt.Errorf("Service id not found: %d", id)
    }
    delete(s.staging, id)
    s.services[id] = staged
    if notifier := s.signal; notifier != nil {
        notifier.SignalServiceAdded(id, staged.Name)
    }
    return nil
}

// UpdateServiceInfo replaces the stored description of the ready
// service i.ServiceId with i.
//
// It returns an error, leaving the directory untouched, when i does
// not pass checkServiceInfo, when no ready service has that id, or
// when i.Name differs from the name currently stored for that id.
func (s *serviceDirectory) UpdateServiceInfo(i ServiceInfo) error {
    err := checkServiceInfo(i)
    if err != nil {
        return err
    }
    current, found := s.services[i.ServiceId]
    switch {
    case !found:
        return fmt.Errorf("Service not found: %d (%s)", i.ServiceId, i.Name)
    case current.Name != i.Name:
        return fmt.Errorf("Invalid name: %s (expected: %s)", i.Name,
            current.Name)
    }
    s.services[i.ServiceId] = i
    return nil
}

// MachineId returns the machine identifier reported by util.MachineID.
// The returned error is always nil.
func (s *serviceDirectory) MachineId() (string, error) {
    return util.MachineID(), nil
}

// _socketOfService is not implemented: it ignores P0 and always
// returns the zero object.ObjectReference together with an error.
func (s *serviceDirectory) _socketOfService(P0 uint32) (
    o object.ObjectReference, err error) {
    return o, fmt.Errorf("_socketOfService not yet implemented")
}

// Namespace returns a bus.Namespace backed by this directory. addr is
// recorded as the single address of the namespace; Reserve uses it as
// the endpoint list of the services it registers.
func (s *serviceDirectory) Namespace(addr string) bus.Namespace {
    ns := new(directoryNamespace)
    ns.directory = s
    ns.addrs = []string{addr}
    return ns
}

// directoryNamespace is the bus.Namespace returned by
// serviceDirectory.Namespace. directory is the directory every
// operation is forwarded to. addrs is used as the endpoint list of
// services registered by Reserve and is compared against a service's
// endpoints by directorySession.client.
type directoryNamespace struct {
    directory *serviceDirectory
    addrs     []string
}

// Reserve registers a service called name with the directory and
// returns the id allocated by RegisterService. The service is
// described with the machine and process ids reported by util and
// with the namespace addresses as endpoints; its service id and
// session id are left at their zero values.
func (ns *directoryNamespace) Reserve(name string) (uint32, error) {
    var info ServiceInfo
    info.Name = name
    info.MachineId = util.MachineID()
    info.ProcessId = util.ProcessID()
    info.Endpoints = ns.addrs
    return ns.directory.RegisterService(info)
}

// Remove unregisters the service serviceID by delegating to the
// directory's UnregisterService and returns its error.
func (ns *directoryNamespace) Remove(serviceID uint32) error {
    return ns.directory.UnregisterService(serviceID)
}

// Enable marks the service serviceID as ready by delegating to the
// directory's ServiceReady and returns its error.
func (ns *directoryNamespace) Enable(serviceID uint32) error {
    return ns.directory.ServiceReady(serviceID)
}
// Resolve returns the id of the ready service called name. When the
// directory does not know the name, it returns 0 and the error
// reported by the directory's Service method.
func (ns *directoryNamespace) Resolve(name string) (uint32, error) {
    found, err := ns.directory.Service(name)
    if err == nil {
        return found.ServiceId, nil
    }
    return 0, err
}
// Session returns a bus.Session that resolves services through this
// namespace and keeps server for later use by the session.
func (ns *directoryNamespace) Session(server bus.Server) bus.Session {
    session := new(directorySession)
    session.server = server
    session.namespace = ns
    return session
}

// directorySession is the bus.Session returned by
// directoryNamespace.Session. server supplies the client used for
// services found to be hosted by this process (see client), and
// namespace gives access to the directory used to look services up.
type directorySession struct {
    server    bus.Server
    namespace *directoryNamespace
}

// Proxy returns a proxy to the object objectID of the ready service
// called name.
//
// The service is looked up in the directory, a client to it is
// obtained with s.client, the object's meta object is fetched with
// bus.GetMetaObject, and the proxy is built from both. An error is
// returned when the service is unknown, when no client can be obtained
// or when the meta object cannot be retrieved.
func (s *directorySession) Proxy(name string, objectID uint32) (bus.Proxy, error) {
    target, err := s.namespace.directory.Service(name)
    if err != nil {
        return nil, fmt.Errorf("service not found: %s", name)
    }
    conn, err := s.client(target)
    if err != nil {
        return nil, err
    }
    serviceID := target.ServiceId
    meta, err := bus.GetMetaObject(conn, serviceID, objectID)
    if err != nil {
        return nil, fmt.Errorf("call metaObject (service %d, object %d): %s",
            serviceID, objectID, err)
    }
    return bus.NewProxy(conn, meta, serviceID, objectID), nil
}
// client returns a bus.Client able to reach the service described by
// info.
//
// When info carries this machine's id and this process's id, and one
// of its endpoints equals one of the namespace addresses, the server's
// own client is returned so that no connection is opened. Otherwise an
// endpoint is selected with bus.SelectEndPoint and a new client is
// created on the resulting channel.
func (s *directorySession) client(info ServiceInfo) (bus.Client, error) {
    sameProcess := info.MachineId == util.MachineID() &&
        info.ProcessId == util.ProcessID()
    if sameProcess {
        // The service lives in this process: bypass the connection
        // if it listens on one of our own addresses.
        for _, remote := range info.Endpoints {
            for _, own := range s.namespace.addrs {
                if remote == own {
                    return s.server.Client(), nil
                }
            }
        }
    }
    _, channel, err := bus.SelectEndPoint(info.Endpoints, "", "")
    if err == nil {
        return bus.NewClient(channel), nil
    }
    return nil, fmt.Errorf("object connection error (%s): %s",
        info.Name, err)
}

// Object returns a proxy to the object designated by ref. The service
// ref.ServiceID must be ready in the directory; a client to it is
// obtained with s.client and the proxy is built from the meta object,
// service id and object id carried by ref. Lookup and client errors
// are returned unchanged.
func (s *directorySession) Object(ref object.ObjectReference) (bus.Proxy, error) {
    target, err := s.namespace.directory.info(ref.ServiceID)
    if err != nil {
        return nil, err
    }
    conn, err := s.client(target)
    if err != nil {
        return nil, err
    }
    proxy := bus.NewProxy(conn, ref.MetaObject, ref.ServiceID, ref.ObjectID)
    return proxy, nil
}

// Terminate does nothing and always returns nil.
func (s *directorySession) Terminate() error {
    return nil
}

// ReadServiceInfo unmarshals a ServiceInfo struct from r. It is the
// exported entry point to readServiceInfo and returns its results
// unchanged.
func ReadServiceInfo(r io.Reader) (s ServiceInfo, err error) {
    return readServiceInfo(r)
}

// WriteServiceInfo marshals the ServiceInfo struct s to w. It is the
// exported entry point to writeServiceInfo and returns its error
// unchanged.
func WriteServiceInfo(s ServiceInfo, w io.Writer) (err error) {
    return writeServiceInfo(s, w)
}