bus/session/session.go

Summary

Maintainability
A
2 hrs
Test Coverage
D
63%
package session

import (
    "fmt"
    "log"
    "sync"

    "github.com/lugu/qiloop/bus"
    "github.com/lugu/qiloop/bus/net"
    "github.com/lugu/qiloop/bus/services"
    "github.com/lugu/qiloop/type/object"
)

// Session implements the Session interface. It is an
// implementation of Session. It does not update the list of services
// and returns clients.
type Session struct {
    serviceList      []services.ServiceInfo
    serviceListMutex sync.Mutex
    Directory        services.ServiceDirectoryProxy
    cancel           func()
    cancelMutex      sync.Mutex
    added            chan services.ServiceAdded
    removed          chan services.ServiceRemoved
    userName         string
    userToken        string
    poll             map[string]bus.Client
    pollMutex        sync.RWMutex
}

func (s *Session) newObject(info services.ServiceInfo, ref object.ObjectReference) (bus.ObjectProxy, error) {
    c, err := s.client(info)
    if err != nil {
        return nil, fmt.Errorf("object connection error (%s): %s",
            info.Name, err)
    }
    proxy := bus.NewProxy(c, ref.MetaObject, ref.ServiceID, ref.ObjectID)
    return bus.MakeObject(proxy), nil
}

func (s *Session) newService(info services.ServiceInfo, objectID uint32) (p bus.Proxy, err error) {
    c, err := s.client(info)
    if err != nil {
        return nil, fmt.Errorf("service connection error (%s): %s", info.Name, err)
    }
    proxy, err := metaProxy(c, info.ServiceId, objectID)
    if err != nil {
        return nil, fmt.Errorf("get service meta object (%s): %s", info.Name, err)
    }
    return proxy, nil
}

// endpoint returns an net.EndPoint matching the info description. If
// an existing connection exists, it reuse the connection, otherwise
// it establish a new connection.
func (s *Session) client(info services.ServiceInfo) (bus.Client, error) {
    if len(info.Endpoints) == 0 {
        return nil, fmt.Errorf("empty address list")
    }
    s.pollMutex.RLock()
    for _, addr := range info.Endpoints {
        c, ok := s.poll[addr]
        if ok {
            s.pollMutex.RUnlock()
            return c, nil
        }
    }
    s.pollMutex.RUnlock()
    addr, channel, err := bus.SelectEndPoint(info.Endpoints, s.userName, s.userToken)
    if err != nil {
        return nil, fmt.Errorf("service connection error (%s): %s", info.Name, err)
    }
    filter := func(hdr *net.Header) (matched bool, keep bool) { return false, true }
    consumer := func(msg *net.Message) error { panic("unexpected") }
    closer := func(err error) {
        s.pollMutex.Lock()
        delete(s.poll, addr)
        s.pollMutex.Unlock()
    }
    endpoint := channel.EndPoint()
    s.pollMutex.Lock()
    c, ok := s.poll[addr]
    if ok {
        s.pollMutex.RUnlock()
        endpoint.Close()
        return c, nil
    }
    c = bus.NewClient(channel)
    s.poll[addr] = c
    s.pollMutex.Unlock()
    endpoint.AddHandler(filter, consumer, closer)
    return c, nil
}

func (s *Session) findServiceName(name string) (i services.ServiceInfo, err error) {
    s.serviceListMutex.Lock()
    defer s.serviceListMutex.Unlock()
    for _, service := range s.serviceList {
        if service.Name == name {
            return service, nil
        }
    }
    return i, fmt.Errorf("Service not found: %s", name)
}

func (s *Session) findServiceID(uid uint32) (i services.ServiceInfo, err error) {
    s.serviceListMutex.Lock()
    defer s.serviceListMutex.Unlock()
    for _, service := range s.serviceList {
        if service.ServiceId == uid {
            return service, nil
        }
    }
    return i, fmt.Errorf("Service ID not found: %d", uid)
}

// Proxy resolve the service name and returns a proxy to it.
func (s *Session) Proxy(name string, objectID uint32) (p bus.Proxy, err error) {
    info, err := s.findServiceName(name)
    if err != nil {
        return p, err
    }
    return s.newService(info, objectID)
}

// Object returns a reference to ref.
// TODO: cache the returned objects in order to benefit from the
// signal registration caching.
func (s *Session) Object(ref object.ObjectReference) (o bus.Proxy, err error) {
    info, err := s.findServiceID(ref.ServiceID)
    if err != nil {
        return o, err
    }
    obj, err := s.newObject(info, ref)
    if err != nil {
        return nil, err
    }
    return obj.Proxy(), nil
}

// metaProxy is to create proxies to the directory and server
// services needed for a session.
func metaProxy(c bus.Client, serviceID, objectID uint32) (p bus.Proxy, err error) {
    meta, err := bus.GetMetaObject(c, serviceID, objectID)
    if err != nil {
        return p, fmt.Errorf("Can not reach metaObject: %s", err)
    }
    return bus.NewProxy(c, meta, serviceID, objectID), nil
}

// NewAuthSession connects an address and return a new session.
func NewAuthSession(addr, user, token string) (bus.Session, error) {

    s := new(Session)
    s.userName = user
    s.userToken = token
    s.poll = map[string]bus.Client{}
    // Manually create a serviceList with just the ServiceInfo
    // needed to contact ServiceDirectory.
    s.serviceList = []services.ServiceInfo{
        services.ServiceInfo{
            Name:      "ServiceDirectory",
            ServiceId: 1,
            Endpoints: []string{
                addr,
            },
        },
    }
    var err error
    s.Directory, err = services.ServiceDirectory(s)
    if err != nil {
        return nil, fmt.Errorf("contact server: %s", err)
    }

    s.serviceList, err = s.Directory.Services()
    if err != nil {
        return nil, fmt.Errorf("list services: %s", err)
    }
    var cancelRemoved, cancelAdded func()
    cancelRemoved, s.removed, err = s.Directory.SubscribeServiceRemoved()
    if err != nil {
        return nil, fmt.Errorf("subscribe remove signal: %s", err)
    }
    cancelAdded, s.added, err = s.Directory.SubscribeServiceAdded()
    if err != nil {
        return nil, fmt.Errorf("subscribe added signal: %s", err)
    }
    s.cancel = func() {
        cancelRemoved()
        cancelAdded()
    }
    go s.updateLoop()
    return s, nil
}

// NewSession connects an address and return a new session.
func NewSession(addr string) (bus.Session, error) {
    return NewAuthSession(addr, "", "")
}

func (s *Session) updateServiceList() {
    services, err := s.Directory.Services()
    if err != nil {
        log.Printf("error: failed to update service directory list: %s", err)
        log.Printf("error: closing session.")
        if err := s.Terminate(); err != nil {
            log.Printf("error: session destruction: %s", err)
        }
    }
    s.serviceListMutex.Lock()
    s.serviceList = services
    s.serviceListMutex.Unlock()
}

// Terminate close the session.
func (s *Session) Terminate() error {
    // prevent Terminate being called twice
    s.cancelMutex.Lock()
    if s.cancel != nil {
        s.cancel()
        s.cancel = nil
    }
    s.cancelMutex.Unlock()
    s.pollMutex.Lock()
    for _, client := range s.poll {
        client.Channel().EndPoint().Close()
    }
    s.pollMutex.Unlock()
    return nil
}

func (s *Session) updateLoop() {
    for {
        select {
        case _, ok := <-s.removed:
            if !ok {
                return
            }
            s.updateServiceList()
        case _, ok := <-s.added:
            if !ok {
                return
            }
            s.updateServiceList()
        }
    }
}