moleculer-go/moleculer

View on GitHub
broker/broker.go

Summary

Maintainability
F
3 days
Test Coverage
package broker

import (
    "errors"
    "strings"
    "time"

    "github.com/hashicorp/go-uuid"
    bus "github.com/moleculer-go/goemitter"
    "github.com/moleculer-go/moleculer"
    "github.com/moleculer-go/moleculer/cache"
    "github.com/moleculer-go/moleculer/context"
    "github.com/moleculer-go/moleculer/metrics"
    "github.com/moleculer-go/moleculer/middleware"
    "github.com/moleculer-go/moleculer/payload"
    "github.com/moleculer-go/moleculer/registry"
    "github.com/moleculer-go/moleculer/serializer"
    "github.com/moleculer-go/moleculer/service"

    log "github.com/sirupsen/logrus"
)

func mergeMaps(base, new map[string]interface{}) map[string]interface{} {
    if base == nil {
        base = map[string]interface{}{}
    }
    for key, value := range new {
        base[key] = value
    }
    return base
}

func mergeConfigs(baseConfig moleculer.Config, userConfig []*moleculer.Config) moleculer.Config {
    if len(userConfig) > 0 {
        for _, config := range userConfig {
            if config.Services != nil {
                baseConfig.Services = mergeMaps(baseConfig.Services, config.Services)
            }

            if config.LogLevel != "" {
                baseConfig.LogLevel = config.LogLevel
            }
            if config.LogFormat != "" {
                baseConfig.LogFormat = config.LogFormat
            }
            if config.DiscoverNodeID != nil {
                baseConfig.DiscoverNodeID = config.DiscoverNodeID
            }
            if config.Transporter != "" {
                baseConfig.Transporter = config.Transporter
            }
            if config.TransporterFactory != nil {
                baseConfig.TransporterFactory = config.TransporterFactory
            }
            if config.StrategyFactory != nil {
                baseConfig.StrategyFactory = config.StrategyFactory
            }
            if config.UpdateNodeMetricsFrequency != 0 {
                baseConfig.UpdateNodeMetricsFrequency = config.UpdateNodeMetricsFrequency
            }
            if config.HeartbeatFrequency != 0 {
                baseConfig.HeartbeatFrequency = config.HeartbeatFrequency
            }
            if config.HeartbeatTimeout != 0 {
                baseConfig.HeartbeatTimeout = config.HeartbeatTimeout
            }
            if config.OfflineCheckFrequency != 0 {
                baseConfig.OfflineCheckFrequency = config.OfflineCheckFrequency
            }
            if config.OfflineTimeout != 0 {
                baseConfig.OfflineTimeout = config.OfflineTimeout
            }
            if config.NeighboursCheckTimeout != 0 {
                baseConfig.NeighboursCheckTimeout = config.NeighboursCheckTimeout
            }
            if config.WaitForDependenciesTimeout != 0 {
                baseConfig.WaitForDependenciesTimeout = config.WaitForDependenciesTimeout
            }
            if config.Middlewares != nil {
                baseConfig.Middlewares = config.Middlewares
            }
            if config.Namespace != "" {
                baseConfig.Namespace = config.Namespace
            }
            if config.RequestTimeout != 0 {
                baseConfig.RequestTimeout = config.RequestTimeout
            }
            if config.MCallTimeout != 0 {
                baseConfig.MCallTimeout = config.MCallTimeout
            }
            if config.RetryPolicy != nil {
                baseConfig.RetryPolicy = config.RetryPolicy
            }
            if config.MaxCallLevel != 0 {
                baseConfig.MaxCallLevel = config.MaxCallLevel
            }
            if config.Metrics {
                baseConfig.Metrics = config.Metrics
            }
            if config.MetricsRate > 0 {
                baseConfig.MetricsRate = config.MetricsRate
            }
            if config.DisableInternalServices {
                baseConfig.DisableInternalServices = config.DisableInternalServices
            }
            if config.DisableInternalMiddlewares {
                baseConfig.DisableInternalMiddlewares = config.DisableInternalMiddlewares
            }
            if config.DontWaitForNeighbours {
                baseConfig.DontWaitForNeighbours = config.DontWaitForNeighbours
            }
            if config.WaitForNeighboursInterval != 0 {
                baseConfig.WaitForNeighboursInterval = config.WaitForNeighboursInterval
            }
            if config.Created != nil {
                baseConfig.Created = config.Created
            }
            if config.Started != nil {
                baseConfig.Started = config.Started
            }
            if config.Stopped != nil {
                baseConfig.Stopped = config.Stopped
            }
        }
    }
    return baseConfig
}

type ServiceBroker struct {
    namespace string

    logger *log.Entry

    localBus *bus.Emitter

    registry *registry.ServiceRegistry

    middlewares *middleware.Dispatch

    cache cache.Cache

    serializer *serializer.Serializer

    services []*service.Service

    started  bool
    starting bool

    rootContext moleculer.BrokerContext

    config moleculer.Config

    delegates *moleculer.BrokerDelegates

    id string

    instanceID string

    localNode moleculer.Node
}

// GetLocalBus : return the service broker local bus (Event Emitter)
func (broker *ServiceBroker) LocalBus() *bus.Emitter {
    return broker.localBus
}

// stopService stop the service.
func (broker *ServiceBroker) stopService(svc *service.Service) {
    broker.middlewares.CallHandlers("serviceStopping", svc)
    svc.Stop(broker.rootContext.ChildActionContext("service.stop", payload.Empty()))
    broker.middlewares.CallHandlers("serviceStopped", svc)
}

// applyServiceConfig apply broker config to the service configuration
// settings is an import config copy from broker to the service.
func (broker *ServiceBroker) applyServiceConfig(svc *service.Service) {
    if bkrConfig, exists := broker.config.Services[svc.Name()]; exists {
        svcConfig, ok := bkrConfig.(map[string]interface{})
        if ok {
            _, ok := svcConfig["settings"]
            if ok {
                settings, ok := svcConfig["settings"].(map[string]interface{})
                if ok {
                    svc.AddSettings(settings)
                } else {
                    broker.logger.Error("Could not add service settings - Error converting the input settings to map[string]interface{} - Invalid format! Service Config : ", svcConfig)
                }
            }

            _, ok = svcConfig["metadata"]
            if ok {
                metadata, ok := svcConfig["metadata"].(map[string]interface{})
                if ok {
                    svc.AddSettings(metadata)
                } else {
                    broker.logger.Error("Could not add service metadata - Error converting the input metadata to map[string]interface{} - Invalid format! Service Config : ", svcConfig)
                }
            }
        } else {
            broker.logger.Error("Could not apply service configuration - Error converting the service config to map[string]interface{} - Invalid format! Broker Config : ", bkrConfig)
        }

    }
}

// startService start a service.
func (broker *ServiceBroker) startService(svc *service.Service) {

    broker.logger.Debug("Broker start service: ", svc.FullName())

    broker.applyServiceConfig(svc)

    broker.middlewares.CallHandlers("serviceStarting", svc)

    broker.waitForDependencies(svc)

    broker.registry.AddLocalService(svc)

    broker.middlewares.CallHandlers("serviceStarted", svc)

    svc.Start(broker.rootContext.ChildActionContext("service.start", payload.Empty()))
}

// waitForDependencies wait for all services listed in the service dependencies to be discovered.
func (broker *ServiceBroker) waitForDependencies(service *service.Service) {
    if len(service.Dependencies()) == 0 {
        return
    }
    start := time.Now()
    for {
        if !broker.started {
            break
        }
        found := true
        for _, dependency := range service.Dependencies() {
            known := broker.registry.KnowService(dependency)
            if !known {
                found = false
                break
            }
        }
        if found {
            broker.logger.Debug("waitForDependencies() - All dependencies were found :) -> service: ", service.Name(), " wait For Dependencies: ", service.Dependencies())
            break
        }
        if time.Since(start) > broker.config.WaitForDependenciesTimeout {
            broker.logger.Warn("waitForDependencies() - Time out ! service: ", service.Name(), " wait For Dependencies: ", service.Dependencies())
            break
        }
        time.Sleep(time.Microsecond)
    }
}

func (broker *ServiceBroker) broadcastLocal(eventName string, params ...interface{}) {
    broker.LocalBus().EmitAsync(eventName, params)
}

func (broker *ServiceBroker) createBrokerLogger() *log.Entry {
    if strings.ToUpper(broker.config.LogFormat) == "JSON" {
        log.SetFormatter(&log.JSONFormatter{})
    } else {
        log.SetFormatter(&log.TextFormatter{
            DisableColors:             false,
            ForceColors:               true,
            EnvironmentOverrideColors: true,
        })
    }

    if strings.ToUpper(broker.config.LogLevel) == "WARN" {
        log.SetLevel(log.WarnLevel)
    } else if strings.ToUpper(broker.config.LogLevel) == "DEBUG" {
        log.SetLevel(log.DebugLevel)
    } else if strings.ToUpper(broker.config.LogLevel) == "TRACE" {
        log.SetLevel(log.TraceLevel)
    } else if strings.ToUpper(broker.config.LogLevel) == "ERROR" {
        log.SetLevel(log.ErrorLevel)
    } else if strings.ToUpper(broker.config.LogLevel) == "FATAL" {
        log.SetLevel(log.FatalLevel)
    } else {
        log.SetLevel(log.InfoLevel)
    }

    brokerLogger := log.WithFields(log.Fields{
        "broker": broker.id,
    })
    //broker.logger.Debug("Broker Log Setup -> Level", log.GetLevel(), " nodeID: ", nodeID)
    return brokerLogger
}

// addService internal addService .. adds one service.Service instance to broker.services list.
func (broker *ServiceBroker) addService(svc *service.Service) {
    svc.SetNodeID(broker.localNode.GetID())
    broker.services = append(broker.services, svc)
    if broker.started || broker.starting {
        broker.startService(svc)
    }
    broker.logger.Debug("Broker - addService() - fullname: ", svc.FullName(), " # actions: ", len(svc.Actions()), " # events: ", len(svc.Events()))
}

// resolveSchema getting schema from interface
func (broker *ServiceBroker) resolveSchema(svc interface{}) (moleculer.ServiceSchema, bool) {
    schema, isSchema := svc.(moleculer.ServiceSchema)
    if !isSchema {
        s, ok := svc.(*moleculer.ServiceSchema)
        if ok {
            schema = *s
            isSchema = ok
        }
    }

    return schema, isSchema
}

// createService create a new service instance, from a struct or a schema :)
func (broker *ServiceBroker) createService(svc interface{}) (*service.Service, error) {

    schema, isSchema := broker.resolveSchema(svc)

    if !isSchema {
        svc, err := service.FromObject(svc, broker.delegates)
        if err != nil {
            return nil, err
        }
        return svc, nil
    }
    return service.FromSchema(schema, broker.delegates), nil
}

// WaitFor : wait for all services to be available
func (broker *ServiceBroker) WaitFor(services ...string) error {
    for _, svc := range services {
        if err := broker.waitForService(svc); err != nil {
            return err
        }
    }
    return nil
}

// WaitForNodes : wait for all nodes to be available
func (broker *ServiceBroker) WaitForNodes(nodes ...string) error {
    for _, nodeID := range nodes {
        if err := broker.waitForNode(nodeID); err != nil {
            return err
        }
    }
    return nil
}

func (broker *ServiceBroker) KnowAction(action string) bool {
    return broker.registry.KnowAction(action)
}

// WaitForActions : wait for all actions to be available
func (broker *ServiceBroker) WaitForActions(actions ...string) error {
    for _, action := range actions {
        if err := broker.waitAction(action); err != nil {
            return err
        }
    }
    return nil
}

// waitForService wait for a service to be available
func (broker *ServiceBroker) waitForService(service string) error {
    start := time.Now()
    for {
        if broker.registry.KnowService(service) {
            break
        }
        if time.Since(start) > broker.config.WaitForDependenciesTimeout {
            broker.logger.Debug("Time:", time.Since(start))
            broker.logger.Debug("WaitForDependenciesTimeout:", broker.config.WaitForDependenciesTimeout)

            err := errors.New("waitForService() - Timeout ! service: " + service)
            broker.logger.Error(err)
            return err
        }
        time.Sleep(time.Microsecond)
    }
    return nil
}

// waitAction wait for an action to be available
func (broker *ServiceBroker) waitAction(action string) error {
    start := time.Now()
    for {
        if broker.registry.KnowAction(action) {
            break
        }
        if time.Since(start) > broker.config.WaitForDependenciesTimeout {
            err := errors.New("waitAction() - Timeout ! action: " + action)
            broker.logger.Error(err)
            return err
        }
        time.Sleep(time.Microsecond)
    }
    return nil
}

// waitForNode wait for a node to be available
func (broker *ServiceBroker) waitForNode(nodeID string) error {
    start := time.Now()
    for {
        if broker.registry.KnowNode(nodeID) {
            break
        }
        if time.Since(start) > broker.config.WaitForDependenciesTimeout {
            err := errors.New("waitForNode() - Timeout ! nodeID: " + nodeID)
            broker.logger.Error(err)
            return err
        }
        time.Sleep(time.Microsecond)
    }
    return nil
}

// Publish : for each service schema it will validate and create
// a service instance in the broker.
func (broker *ServiceBroker) Publish(services ...interface{}) {
    for _, item := range services {
        svc, err := broker.createService(item)
        if err != nil {
            panic(errors.New("Could not publish service - error: " + err.Error()))
        }
        broker.addService(svc)
    }
}

func (broker *ServiceBroker) Start() {
    if broker.IsStarted() {
        broker.logger.Warn("broker.Start() called on a broker that already started!")
        return
    }
    broker.starting = true
    broker.logger.Info("Moleculer is starting...")
    broker.logger.Info("Node ID: ", broker.localNode.GetID())

    broker.middlewares.CallHandlers("brokerStarting", broker.delegates)

    broker.registry.Start()

    internalServices := broker.registry.LocalServices()
    for _, service := range internalServices {
        service.SetNodeID(broker.localNode.GetID())
        broker.startService(service)
    }

    for _, service := range broker.services {
        broker.startService(service)
    }

    for _, service := range internalServices {
        broker.addService(service)
    }

    broker.logger.Debug("Broker -> registry started!")

    defer broker.broadcastLocal("$broker.started")
    defer broker.middlewares.CallHandlers("brokerStarted", broker.delegates)

    broker.started = true
    broker.starting = false
    broker.logger.Info("Service Broker with ", len(broker.services), " service(s) started successfully.")
}

func (broker *ServiceBroker) Stop() {
    if !broker.started {
        broker.logger.Info("Broker is not started!")
        return
    }
    broker.logger.Info("Service Broker is stopping...")

    broker.middlewares.CallHandlers("brokerStopping", broker.delegates)

    for _, service := range broker.services {
        broker.stopService(service)
    }

    broker.registry.Stop()

    broker.started = false
    broker.broadcastLocal("$broker.stopped")

    broker.middlewares.CallHandlers("brokerStopped", broker.delegates)
}

type callPair struct {
    label  string
    result moleculer.Payload
}

func (broker *ServiceBroker) invokeMCalls(callMaps map[string]map[string]interface{}, result chan map[string]moleculer.Payload) {
    if len(callMaps) == 0 {
        result <- make(map[string]moleculer.Payload)
        return
    }

    resultChan := make(chan callPair)
    for label, content := range callMaps {
        go func(label, actionName string, params interface{}, results chan callPair) {
            result := <-broker.Call(actionName, params)
            results <- callPair{label, result}
        }(label, content["action"].(string), content["params"], resultChan)
    }

    timeoutChan := make(chan bool, 1)
    go func(timeout time.Duration) {
        time.Sleep(timeout)
        timeoutChan <- true
    }(broker.config.MCallTimeout)

    results := make(map[string]moleculer.Payload)
    for {
        select {
        case pair := <-resultChan:
            results[pair.label] = pair.result
            if len(results) == len(callMaps) {
                result <- results
                return
            }
        case <-timeoutChan:
            timeoutError := errors.New("MCall timeout error.")
            broker.logger.Error(timeoutError)
            for label, _ := range callMaps {
                if _, exists := results[label]; !exists {
                    results[label] = payload.New(timeoutError)
                }
            }
            result <- results
            return
        }
    }
}

// MCall perform multiple calls and return all results together in a nice map indexed by name.
func (broker *ServiceBroker) MCall(callMaps map[string]map[string]interface{}) chan map[string]moleculer.Payload {
    result := make(chan map[string]moleculer.Payload, 1)
    go broker.invokeMCalls(callMaps, result)
    return result
}

// Call :  invoke a service action and return a channel which will eventualy deliver the results ;)
func (broker *ServiceBroker) Call(actionName string, params interface{}, opts ...moleculer.Options) chan moleculer.Payload {
    broker.logger.Trace("Broker - Call() actionName: ", actionName, " params: ", params, " opts: ", opts)
    if !broker.IsStarted() {
        panic(errors.New("Broker must be started before making calls :("))
    }
    actionContext := broker.rootContext.ChildActionContext(actionName, payload.New(params), opts...)
    return broker.registry.LoadBalanceCall(actionContext, opts...)
}

func (broker *ServiceBroker) Emit(event string, params interface{}, groups ...string) {
    broker.logger.Trace("Broker - Emit() event: ", event, " params: ", params, " groups: ", groups)
    if !broker.IsStarted() {
        panic(errors.New("Broker must be started before emiting events :("))
    }
    newContext := broker.rootContext.ChildEventContext(event, payload.New(params), groups, false)
    broker.registry.LoadBalanceEvent(newContext)
}

func (broker *ServiceBroker) Broadcast(event string, params interface{}, groups ...string) {
    broker.logger.Trace("Broker - Broadcast() event: ", event, " params: ", params, " groups: ", groups)
    if !broker.IsStarted() {
        panic(errors.New("Broker must be started before broadcasting events :("))
    }
    newContext := broker.rootContext.ChildEventContext(event, payload.New(params), groups, true)
    broker.registry.BroadcastEvent(newContext)
}

func (broker *ServiceBroker) IsStarted() bool {
    return broker.started
}

func (broker *ServiceBroker) GetLogger(name string, value string) *log.Entry {
    return broker.logger.WithField(name, value)
}

func (broker *ServiceBroker) LocalNode() moleculer.Node {
    return broker.localNode
}

func (broker *ServiceBroker) newLogger(name string, value string) *log.Entry {
    return broker.logger.WithField(name, value)
}

func (broker *ServiceBroker) setupLocalBus() {
    broker.localBus = bus.Construct()

    broker.localBus.On("$registry.service.added", func(args ...interface{}) {
        //TODO check code from -> this.broker.servicesChanged(true)
    })
}

func (broker *ServiceBroker) registerMiddlewares() {
    broker.middlewares = middleware.Dispatcher(broker.logger.WithField("middleware", "dispatcher"))
    for _, mware := range broker.config.Middlewares {
        broker.middlewares.Add(mware)
    }
    if !broker.config.DisableInternalMiddlewares {
        broker.registerInternalMiddlewares()
    }
}

func (broker *ServiceBroker) registerInternalMiddlewares() {
    broker.middlewares.Add(metrics.Middlewares())
}

func (broker *ServiceBroker) init() {
    broker.id = broker.config.DiscoverNodeID()
    broker.logger = broker.createBrokerLogger()
    broker.setupLocalBus()

    broker.registerMiddlewares()

    broker.config = broker.middlewares.CallHandlers("Config", broker.config).(moleculer.Config)

    instanceID, err := uuid.GenerateUUID()
    if err != nil {
        broker.logger.Error("Could not create an instance id -  error ", err)
        instanceID = "error creating instance id"
    }
    broker.instanceID = instanceID

    broker.delegates = broker.createDelegates()
    broker.registry = registry.CreateRegistry(broker.id, broker.delegates)
    broker.localNode = broker.registry.LocalNode()
    broker.rootContext = context.BrokerContext(broker.delegates)

}

func (broker *ServiceBroker) createDelegates() *moleculer.BrokerDelegates {
    return &moleculer.BrokerDelegates{
        LocalNode: broker.LocalNode,
        Logger:    broker.newLogger,
        Bus:       broker.LocalBus,
        IsStarted: broker.IsStarted,
        Config:    broker.config,
        InstanceID: func() string {
            return broker.instanceID
        },
        ActionDelegate: func(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload {
            return broker.registry.LoadBalanceCall(context, opts...)
        },
        EmitEvent: func(context moleculer.BrokerContext) {
            broker.registry.LoadBalanceEvent(context)
        },
        BroadcastEvent: func(context moleculer.BrokerContext) {
            broker.registry.BroadcastEvent(context)
        },
        HandleRemoteEvent: func(context moleculer.BrokerContext) {
            broker.registry.HandleRemoteEvent(context)
        },
        ServiceForAction: func(name string) []*moleculer.ServiceSchema {
            svcs := broker.registry.ServiceForAction(name)
            if svcs != nil {
                result := make([]*moleculer.ServiceSchema, len(svcs))
                for i, svc := range svcs {
                    result[i] = svc.Schema()
                }
                return result
            }
            return nil
        },
        MultActionDelegate: func(callMaps map[string]map[string]interface{}) chan map[string]moleculer.Payload {
            return broker.MCall(callMaps)
        },
        BrokerContext: func() moleculer.BrokerContext {
            return broker.rootContext
        },
        MiddlewareHandler: broker.middlewares.CallHandlers,
        Publish:           broker.Publish,
        WaitFor:           broker.WaitFor,
    }
}

// New : returns a valid broker based on environment configuration
// this is usually called when creating a broker to starting the service(s)
func New(userConfig ...*moleculer.Config) *ServiceBroker {
    config := mergeConfigs(moleculer.DefaultConfig, userConfig)
    broker := ServiceBroker{config: config}
    broker.init()
    return &broker
}