bus/server.go

Summary

Maintainability
A
35 mins
Test Coverage
B
87%
package bus

import (
    "errors"
    "log"
    gonet "net"
    "sync"

    "github.com/lugu/qiloop/bus/net"
)

// ErrServiceNotFound is returned with a message refers to an unknown
// service.
var ErrServiceNotFound = errors.New("Service not found")

// ErrObjectNotFound is returned with a message refers to an unknown
// object.
var ErrObjectNotFound = errors.New("Object not found")

// ErrActionNotFound is returned with a message refers to an unknown
// action.
var ErrActionNotFound = errors.New("Action not found")

// ErrNotAuthenticated is returned with a message tries to contact a
// service without prior authentication.
var ErrNotAuthenticated = errors.New("Not authenticated")

// ErrTerminate is returned with an object lifetime ends while
// clients subscribes to its signals.
var ErrTerminate = errors.New("Object terminated")

// Activation is sent during activation: it informs the object of the
// context in which the object is being used.
type Activation struct {
    ServiceID uint32
    ObjectID  uint32
    Session   Session
    Terminate func()
    Service   Service
}

// Receiver handles incomming messages.
type Receiver interface {
    Receive(m *net.Message, from Channel) error
}

// Actor interface used by Server to manipulate services.
type Actor interface {
    Receiver
    Activate(activation Activation) error
    OnTerminate()
}

// firewall ensures an endpoint talks only to autorized services.
// Especially, it ensure authentication is passed.
func firewall(m *net.Message, from Channel) error {
    if from.Authenticated() == false && m.Header.Service != 0 {
        return ErrNotAuthenticated
    }
    return nil
}

// server listen from incomming connections, set-up the end points and
// forward the EndPoint to the dispatcher.
type server struct {
    listen        net.Listener
    addrs         []string
    namespace     Namespace
    Router        *Router
    contexts      map[Channel]bool
    contextsMutex sync.Mutex
    closeChan     chan int
    waitChan      chan error
}

// NewServer creates a new server which respond to incomming
// connection requests.
func NewServer(listener net.Listener, auth Authenticator,
    namespace Namespace, service1 Actor) (Server, error) {

    service0 := ServiceAuthenticate(auth)

    s := &server{
        listen:        listener,
        namespace:     namespace,
        contexts:      make(map[Channel]bool),
        contextsMutex: sync.Mutex{},
        closeChan:     make(chan int, 1),
        waitChan:      make(chan error, 1),
    }
    s.Router = NewRouter(service0, namespace, s.Session())
    _, err := s.NewService("ServiceDirectory", service1)
    if err != nil {
        s.Terminate()
        return nil, err
    }

    go s.run()
    return s, nil
}

// StandAloneServer starts a new server
func StandAloneServer(listener net.Listener, auth Authenticator,
    namespace Namespace) (Server, error) {

    service0 := ServiceAuthenticate(auth)

    s := &server{
        listen:        listener,
        namespace:     namespace,
        contexts:      make(map[Channel]bool),
        contextsMutex: sync.Mutex{},
        closeChan:     make(chan int, 1),
        waitChan:      make(chan error, 1),
    }
    s.Router = NewRouter(service0, namespace, s.Session())
    go s.run()
    return s, nil
}

// Service represents a running service.
type Service interface {
    ServiceID() uint32
    Add(o Actor) (uint32, error)
    Remove(objectID uint32) error
    Terminate() error
}

// ServiceReceiver represents the service used by the Server.
type ServiceReceiver interface {
    Service
    Receiver
}

// NewService returns a new service. The service is activated as part
// of the creation.
// This brings the new service online. The steps involves are:
// 1. request the name to the namespace (service directory)
// 2. activate the service
// 3. add the service to the router dispatcher
// 4. advertize the service to the namespace (service directory)
func (s *server) NewService(name string, object Actor) (Service, error) {

    s.Router.RLock()
    session := s.Router.session
    s.Router.RUnlock()

    // 1. reserve the name
    serviceID, err := s.namespace.Reserve(name)
    if err != nil {
        return nil, err
    }

    // 2. activate the service
    activation := serviceActivation(s.Router, session, serviceID)
    service, err := NewService(object, activation)
    if err != nil {
        return nil, err
    }

    // 3. make it available
    err = s.Router.Add(serviceID, service)
    if err != nil {
        return nil, err
    }

    // 4. advertize it
    err = s.namespace.Enable(serviceID)
    if err != nil {
        s.Router.Remove(serviceID)
        return nil, err
    }
    return service, nil
}

func (s *server) handle(stream net.Stream, authenticated bool) {

    context := &channel{
        capability: DefaultCap(),
    }
    if authenticated {
        context.SetAuthenticated()
    }
    filter := func(hdr *net.Header) (matched bool, keep bool) {
        if hdr.Type == net.Reply || hdr.Type == net.Error ||
            hdr.Type == net.Event || hdr.Type == net.Cancelled {
            return false, true
        }
        return true, true
    }
    consumer := make(chan *net.Message, 10)
    go func() {
        for msg := range consumer {
            err := firewall(msg, context)
            if err != nil {
                log.Printf("missing authentication from %s: %#v",
                    context.EndPoint().String(), msg.Header)
                context.SendError(msg, err)
                stream.Close()
                return
            }
            err = s.Router.Receive(msg, context)
            if err != nil {
                log.Printf("error %v: %s", msg.Header, err)
            }
        }
    }()
    closer := func(err error) {
        s.contextsMutex.Lock()
        defer s.contextsMutex.Unlock()
        if _, ok := s.contexts[context]; ok {
            delete(s.contexts, context)
        }
    }
    finalize := func(e net.EndPoint) {
        context.endpoint = e
        e.MakeHandler(filter, consumer, closer)
        s.contextsMutex.Lock()
        s.contexts[context] = true
        s.contextsMutex.Unlock()
    }
    net.EndPointFinalizer(stream, finalize)
}

func (s *server) run() {
    for {
        stream, err := s.listen.Accept()
        if err != nil {
            select {
            case <-s.closeChan:
            default:
                s.listen.Close()
                s.stoppedWith(err)
            }
            break
        }
        s.handle(stream, false)
    }
}

func (s *server) stoppedWith(err error) {
    // 1. informs all services
    s.Router.Terminate()
    // 2. close all connections
    s.closeAll()
    // 3. inform server's user
    s.waitChan <- err
    close(s.waitChan)
}

// CloseAll close the connecction. Return the first error if any.
func (s *server) closeAll() error {
    var ret error
    s.contextsMutex.Lock()
    defer s.contextsMutex.Unlock()
    for context := range s.contexts {
        err := context.EndPoint().Close()
        if err != nil && ret == nil {
            ret = err
        }
    }
    return ret
}

// WaitTerminate blocks until the server has terminated.
func (s *server) WaitTerminate() chan error {
    return s.waitChan
}

// Terminate stops a server.
func (s *server) Terminate() error {
    close(s.closeChan)
    err := s.listen.Close()
    s.stoppedWith(err)
    return err
}

// Client returns a local client able to contact services without
// creating a new connection.
func (s *server) Client() Client {
    ctl, srv := gonet.Pipe()
    s.handle(net.ConnStream(srv), true)
    return NewClient(NewChannel(net.ConnEndPoint(ctl), DefaultCap()))
}

// Session returns a local session able to contact local services
// without creating a new connection to the server.
func (s *server) Session() Session {
    return s.namespace.Session(s)
}