registry/registry.go
package registry
import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/moleculer-go/moleculer/middleware"
"github.com/moleculer-go/moleculer/payload"
"github.com/moleculer-go/moleculer"
"github.com/moleculer-go/moleculer/service"
"github.com/moleculer-go/moleculer/strategy"
"github.com/moleculer-go/moleculer/transit"
"github.com/moleculer-go/moleculer/transit/pubsub"
log "github.com/sirupsen/logrus"
)
// messageHandlerFunc is the signature of the functions that process the
// payload of a transit message; setupMessageHandlers keys them by command.
type messageHandlerFunc func(message moleculer.Payload)
// ServiceRegistry keeps the catalogs of nodes, services, actions and events
// known to the local broker and routes calls and events to them, either
// locally or through the transit.
type ServiceRegistry struct {
// logger is the registry logger obtained from the broker.
logger *log.Entry
// transit is used to connect/disconnect, discover nodes, send heartbeats,
// emit remote events and request remote actions.
transit transit.Transit
// localNode represents this broker's own node.
localNode moleculer.Node
// Catalogs of everything the registry knows about.
nodes *NodeCatalog
services *ServiceCatalog
actions *ActionCatalog
events *EventCatalog
// broker gives access to the broker facilities (config, bus, middlewares...).
broker *moleculer.BrokerDelegates
// strategy is handed to the action and event catalogs when an endpoint has
// to be selected.
strategy strategy.Strategy
// stopping is set by Stop() and cleared by Start(); background loops and
// message handlers read it to stop working.
// NOTE(review): it is read and written from several goroutines without
// synchronization — confirm whether this needs an atomic/mutex.
stopping bool
// updateNodeMetricsFrequency is copied from the broker config; it is not
// read anywhere in this file.
updateNodeMetricsFrequency time.Duration
// heartbeatFrequency is the interval between heartbeats sent by Start()'s loop.
heartbeatFrequency time.Duration
// heartbeatTimeout is both the interval of the expired-node check and the
// threshold passed to the node catalog by checkExpiredRemoteNodes.
heartbeatTimeout time.Duration
// offlineCheckFrequency is the interval of the checkOfflineNodes loop.
offlineCheckFrequency time.Duration
// offlineTimeout is the threshold passed to the node catalog by checkOfflineNodes.
offlineTimeout time.Duration
// nodeReceivedMutex serializes RemoteNodeInfoReceived.
nodeReceivedMutex *sync.Mutex
// namespace is copied from the broker config and added to log/error messages.
namespace string
}
// createTransit builds the transit instance used by the registry.
// The pubsub transit is the only implementation created here.
func createTransit(broker *moleculer.BrokerDelegates) transit.Transit {
	return pubsub.Create(broker)
}
// createStrategy returns the load balancing strategy for the registry.
// When the broker config provides a StrategyFactory its result is used (it
// must implement strategy.Strategy, otherwise the type assertion panics);
// without a factory the random strategy is returned.
func createStrategy(broker *moleculer.BrokerDelegates) strategy.Strategy {
	factory := broker.Config.StrategyFactory
	if factory == nil {
		return strategy.RandomStrategy{}
	}
	return factory().(strategy.Strategy)
}
// CreateRegistry builds the service registry for the broker identified by nodeID.
//
// The local node is created and flagged as unavailable; it only becomes
// available once the "$broker.started" event is received on the broker bus.
// The transit message handlers are registered before the registry is returned.
func CreateRegistry(nodeID string, broker *moleculer.BrokerDelegates) *ServiceRegistry {
	config := broker.Config
	brokerTransit := createTransit(broker)
	balancer := createStrategy(broker)
	logger := broker.Logger("registry", nodeID)

	localNode := CreateNode(nodeID, true, logger.WithField("Node", nodeID))
	localNode.Unavailable()

	// Catalogs are created in the same order the struct fields were
	// originally initialized.
	actionCatalog := CreateActionCatalog(logger.WithField("catalog", "Actions"))
	eventCatalog := CreateEventCatalog(logger.WithField("catalog", "Events"))
	serviceCatalog := CreateServiceCatalog(logger.WithField("catalog", "Services"))
	nodeCatalog := CreateNodesCatalog(logger.WithField("catalog", "Nodes"))

	registry := &ServiceRegistry{
		logger:    logger,
		broker:    broker,
		transit:   brokerTransit,
		strategy:  balancer,
		localNode: localNode,
		namespace: config.Namespace,

		nodes:    nodeCatalog,
		services: serviceCatalog,
		actions:  actionCatalog,
		events:   eventCatalog,

		updateNodeMetricsFrequency: config.UpdateNodeMetricsFrequency,
		heartbeatFrequency:         config.HeartbeatFrequency,
		heartbeatTimeout:           config.HeartbeatTimeout,
		offlineCheckFrequency:      config.OfflineCheckFrequency,
		offlineTimeout:             config.OfflineTimeout,

		stopping:          false,
		nodeReceivedMutex: &sync.Mutex{},
	}
	registry.logger.Debug("Service Registry created for broker: ", nodeID)

	onBrokerStarted := func(args ...interface{}) {
		registry.logger.Debug("Registry -> $broker.started event")
		registry.localNode.Available()
	}
	broker.Bus().On("$broker.started", onBrokerStarted)

	registry.setupMessageHandlers()
	return registry
}
// KnowService reports whether the service catalog finds a service with the
// given name.
func (registry *ServiceRegistry) KnowService(name string) bool {
	known := registry.services.FindByName(name)
	return known
}
// KnowAction reports whether the action catalog returns a non-nil result for
// the given action name.
func (registry *ServiceRegistry) KnowAction(name string) bool {
	entries := registry.actions.Find(name)
	return entries != nil
}
// KnowNode reports whether a node with the given ID is present in the node
// catalog.
func (registry *ServiceRegistry) KnowNode(nodeID string) bool {
	_, known := registry.nodes.findNode(nodeID)
	return known
}
// LocalNode returns the node that represents the local broker.
func (registry *ServiceRegistry) LocalNode() moleculer.Node {
	local := registry.localNode
	return local
}
// AddOfflineNode registers a non-local node for which only the hostname, IP
// address and port are known, and returns it. The node is created, updated
// with that partial info and added to the node catalog.
func (registry *ServiceRegistry) AddOfflineNode(nodeID string, hostname, ipAddress string, port int) moleculer.Node {
	nodeLogger := registry.logger.WithField("Node", nodeID)
	offlineNode := CreateNode(nodeID, false, nodeLogger)

	knownInfo := map[string]interface{}{
		"hostname": hostname,
		"port":     port,
		"ipList":   []string{ipAddress},
	}
	offlineNode.UpdateInfo(knownInfo)

	registry.nodes.Add(offlineNode)
	return offlineNode
}
// setupMessageHandlers subscribes to the "$registry.transit.message" bus event
// and dispatches each message to the handler registered for its command
// (HEARTBEAT, DISCONNECT or INFO). Every handler is wrapped by filterMessages.
// An unknown command makes the listener panic.
func (registry *ServiceRegistry) setupMessageHandlers() {
	handlers := map[string]messageHandlerFunc{
		"HEARTBEAT":  registry.filterMessages(registry.heartbeatMessageReceived),
		"DISCONNECT": registry.filterMessages(registry.disconnectMessageReceived),
		"INFO":       registry.filterMessages(registry.RemoteNodeInfoReceived),
	}

	onTransitMessage := func(args ...interface{}) {
		registry.logger.Trace("Registry -> $registry.transit.message event - args: ", args)
		// args[0] carries the command name and args[1] the message payload.
		command := args[0].(string)
		message := args[1].(moleculer.Payload)

		handle, found := handlers[command]
		if !found {
			panic(errors.New("Registry - $registry.transit.message event - invalid command:" + command))
		}
		handle(message)
	}
	registry.broker.Bus().On("$registry.transit.message", onTransitMessage)
}
// Stop flags the registry as stopping, waits for the transit to disconnect and
// marks the local node as unavailable. A disconnect error is only logged.
func (registry *ServiceRegistry) Stop() {
	registry.logger.Debug("Registry Stopping...")
	registry.stopping = true

	disconnectErr := <-registry.transit.Disconnect()
	registry.localNode.Unavailable()

	if disconnectErr == nil {
		registry.logger.Debug("Transit Disconnected -> Registry Full Stop!")
		return
	}
	registry.logger.Debug("Error trying to disconnect transit - error: ", disconnectErr)
}
// LocalServices returns the services provided by the registry itself: a
// single service built by createNodeService.
func (registry *ServiceRegistry) LocalServices() []*service.Service {
	nodeService := createNodeService(registry)
	return []*service.Service{nodeService}
}
// GetNodeByID returns the node the catalog holds for nodeID. The "found" flag
// of the lookup is discarded, so callers get whatever value the catalog
// yields for an unknown ID.
func (registry *ServiceRegistry) GetNodeByID(nodeID string) moleculer.Node {
	found, _ := registry.nodes.findNode(nodeID)
	return found
}
// heartbeat is the periodic heartbeat task: it refreshes the local node
// metrics and then sends a heartbeat through the transit.
func (registry *ServiceRegistry) heartbeat() {
	local := registry.localNode
	local.UpdateMetrics()

	registry.transit.SendHeartbeat()
}
// Start connects the transit, waits for node discovery, adds the local node
// to the node catalog and launches the periodic background tasks.
// It panics when the transit connection fails.
func (registry *ServiceRegistry) Start() {
	registry.logger.Debug("Registry Start() ")
	registry.stopping = false

	if err := <-registry.transit.Connect(registry); err != nil {
		panic(errors.New(fmt.Sprint("Could not connect to the transit. err: ", err)))
	}
	<-registry.transit.DiscoverNodes()
	registry.nodes.Add(registry.localNode)

	// Each task with a positive interval runs in its own goroutine until the
	// registry is flagged as stopping. Note that the expired-node check uses
	// heartbeatTimeout as its interval.
	background := []struct {
		interval time.Duration
		task     func()
	}{
		{registry.heartbeatFrequency, registry.heartbeat},
		{registry.heartbeatTimeout, registry.checkExpiredRemoteNodes},
		{registry.offlineCheckFrequency, registry.checkOfflineNodes},
	}
	for _, job := range background {
		if job.interval > 0 {
			go registry.loopWhileAlive(job.interval, job.task)
		}
	}
}
// ServiceForAction returns the service of every action entry the catalog
// finds for the given action name, or nil when the catalog returns nil.
func (registry *ServiceRegistry) ServiceForAction(name string) []*service.Service {
	entries := registry.actions.Find(name)
	if entries == nil {
		return nil
	}
	services := make([]*service.Service, 0, len(entries))
	for _, entry := range entries {
		services = append(services, entry.Service())
	}
	return services
}
// HandleRemoteEvent delivers an event received from a remote node to the
// matching entries of the event catalog, each one in its own goroutine.
// The event is discarded (and logged) when the registry is stopping.
func (registry *ServiceRegistry) HandleRemoteEvent(context moleculer.BrokerContext) {
	name, groups := context.EventName(), context.Groups()
	if registry.stopping {
		registry.logger.Error("HandleRemoteEvent() - registry is stopping. Discarding event -> name: ", name, " groups: ", groups)
		return
	}

	broadcast := context.IsBroadcast()
	registry.logger.Debug("HandleRemoteEvent() - name: ", name, " groups: ", groups)

	// No strategy is handed to the catalog for broadcast events; any other
	// event uses the registry strategy.
	var balancer strategy.Strategy
	if !broadcast {
		balancer = registry.strategy
	}

	for _, listener := range registry.events.Find(name, groups, true, true, balancer) {
		go listener.emitLocalEvent(context)
	}
}
// LoadBalanceEvent emits the event to the entries selected by the event
// catalog using the registry strategy. Local entries are invoked directly,
// remote ones go through the transit. It returns the selected entries, or nil
// (after logging a warning) when the catalog returns nil.
func (registry *ServiceRegistry) LoadBalanceEvent(context moleculer.BrokerContext) []*EventEntry {
	name := context.EventName()
	params := context.Payload()
	groups := context.Groups()
	signature := fmt.Sprint("name: ", name, " groups: ", groups)
	registry.logger.Trace("LoadBalanceEvent() - ", signature, " params: ", params)

	targets := registry.events.Find(name, groups, true, false, registry.strategy)
	if targets == nil {
		registry.logger.Warn(fmt.Sprint("Broker - no endpoints found for event: ", name, " it was discarded!"))
		return nil
	}

	for _, target := range targets {
		if !target.isLocal {
			registry.emitRemoteEvent(context, target)
			continue
		}
		target.emitLocalEvent(context)
	}
	registry.logger.Trace("LoadBalanceEvent() - ", signature, " End.")
	return targets
}
// BroadcastEvent emits the event to the entries the event catalog returns
// when queried without a strategy. Local entries are invoked directly, remote
// ones go through the transit. It returns those entries, or nil (after logging
// a warning) when the catalog returns nil.
func (registry *ServiceRegistry) BroadcastEvent(context moleculer.BrokerContext) []*EventEntry {
	name := context.EventName()
	groups := context.Groups()
	signature := fmt.Sprint("name: ", name, " groups: ", groups)
	registry.logger.Trace("BroadcastEvent() - ", signature, " payload: ", context.Payload())

	targets := registry.events.Find(name, groups, false, false, nil)
	if targets == nil {
		registry.logger.Warn(fmt.Sprint("Broker - no endpoints found for event: ", name, " it was discarded!"))
		return nil
	}

	for _, target := range targets {
		if !target.isLocal {
			registry.emitRemoteEvent(context, target)
			continue
		}
		target.emitLocalEvent(context)
	}
	registry.logger.Trace("BroadcastEvent() - ", signature, " End.")
	return targets
}
// LoadBalanceCall invokes a service action, locally or on a remote node, and
// returns a channel that delivers the result.
//
// The target is picked by nextAction (honouring opts[0].NodeID when set).
// The call blocks until the action result has been received and the "after"
// middleware has run; the returned channel is buffered and already holds the
// final payload. When no endpoint is found the channel holds an error payload.
func (registry *ServiceRegistry) LoadBalanceCall(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload {
	// resolved wraps an already available payload in a ready-to-read channel.
	resolved := func(value moleculer.Payload) chan moleculer.Payload {
		out := make(chan moleculer.Payload, 1)
		out <- value
		return out
	}

	actionName := context.ActionName()
	params := context.Payload()
	registry.logger.Trace("LoadBalanceCall() - actionName: ", actionName, " params: ", params, " namespace: ", registry.namespace, " opts: ", opts)

	target := registry.nextAction(actionName, registry.strategy, opts...)
	if target == nil {
		msg := "Registry - endpoint not found for actionName: " + actionName
		if registry.namespace != "" {
			msg = msg + " namespace: " + registry.namespace
		}
		registry.logger.Error(msg)
		return resolved(payload.Error(msg))
	}
	registry.logger.Debug("LoadBalanceCall() - actionName: ", actionName, " target nodeID: ", target.TargetNodeID())

	// Run the "before" middleware and the action itself; only the hook names
	// and the way the action is invoked differ between local and remote.
	var (
		result    moleculer.Payload
		afterHook string
	)
	if target.isLocal {
		registry.broker.MiddlewareHandler("beforeLocalAction", context)
		result = <-target.invokeLocalAction(context)
		afterHook = "afterLocalAction"
	} else {
		registry.broker.MiddlewareHandler("beforeRemoteAction", context)
		result = <-registry.invokeRemoteAction(context, target)
		afterHook = "afterRemoteAction"
	}

	// The "after" middleware may replace the result.
	handled := registry.broker.MiddlewareHandler(afterHook, middleware.AfterActionParams{context, result})
	return resolved(handled.(middleware.AfterActionParams).Result)
}
// emitRemoteEvent targets the context at the node of the given event entry
// and emits the event through the transit.
func (registry *ServiceRegistry) emitRemoteEvent(context moleculer.BrokerContext, eventEntry *EventEntry) {
	targetNodeID := eventEntry.TargetNodeID()
	context.SetTargetNodeID(targetNodeID)

	registry.logger.Trace("Before invoking remote event: ", context.EventName(), " context.TargetNodeID: ", context.TargetNodeID(), " context.Payload(): ", context.Payload())
	registry.transit.Emit(context)
}
// invokeRemoteAction targets the context at the node of the given action
// entry and requests the action through the transit in a goroutine.
// The returned buffered channel receives the remote result, or an error
// payload when the registry is stopping by the time the result arrives.
func (registry *ServiceRegistry) invokeRemoteAction(context moleculer.BrokerContext, actionEntry *ActionEntry) chan moleculer.Payload {
	out := make(chan moleculer.Payload, 1)
	context.SetTargetNodeID(actionEntry.TargetNodeID())
	registry.logger.Trace("Before invoking remote action: ", context.ActionName(), " context.TargetNodeID: ", context.TargetNodeID(), " context.Payload(): ", context.Payload())

	go func() {
		response := <-registry.transit.Request(context)
		registry.logger.Trace("remote request done! action: ", context.ActionName(), " results: ", response)
		if !registry.stopping {
			out <- response
			return
		}
		registry.logger.Error("invokeRemoteAction() - registry is stopping. Discarding action result -> name: ", context.ActionName())
		out <- payload.New(errors.New("can't complete request! registry stopping..."))
	}()
	return out
}
// removeServicesByNodeID removes every service, action and event registered
// for the given node and emits "$registry.service.removed" (asynchronously)
// once per removed service.
func (registry *ServiceRegistry) removeServicesByNodeID(nodeID string) {
	// Ranging over an empty result is a no-op, so no length check is needed.
	for _, removed := range registry.services.RemoveByNode(nodeID) {
		summary := []interface{}{removed.Summary()}
		registry.broker.Bus().EmitAsync("$registry.service.removed", summary)
	}
	registry.actions.RemoveByNode(nodeID)
	registry.events.RemoveByNode(nodeID)
}
// DisconnectNode removes the services, actions and events of the given node
// from the local registry, marks the node as unavailable and emits
// "$node.disconnected". Unknown node IDs are ignored.
func (registry *ServiceRegistry) DisconnectNode(nodeID string) {
	node, known := registry.nodes.findNode(nodeID)
	if !known {
		return
	}

	registry.removeServicesByNodeID(nodeID)
	node.Unavailable()

	bus := registry.broker.Bus()
	bus.EmitAsync("$node.disconnected", []interface{}{nodeID})
	registry.logger.Warnf("Node %s disconnected ", nodeID)
}
// checkExpiredRemoteNodes disconnects every node the catalog reports as
// expired for the heartbeat timeout.
func (registry *ServiceRegistry) checkExpiredRemoteNodes() {
	timeout := registry.heartbeatTimeout
	for _, expired := range registry.nodes.expiredNodes(timeout) {
		registry.DisconnectNode(expired.GetID())
	}
}
// checkOfflineNodes removes from the node catalog every node the catalog
// reports as expired for the offline timeout, logging a warning per node.
func (registry *ServiceRegistry) checkOfflineNodes() {
	expiredNodes := registry.nodes.expiredNodes(registry.offlineTimeout)
	for _, node := range expiredNodes {
		nodeID := node.GetID()
		registry.nodes.removeNode(nodeID)
		// offlineTimeout is a time.Duration: %s prints it with its unit
		// (e.g. "10m0s"). The previous "%d seconds" printed the raw
		// nanosecond count labelled as seconds.
		registry.logger.Warnf("Removed offline Node: %s from the registry because it hasn't submitted heartbeat in %s.", nodeID, registry.offlineTimeout)
	}
}
// loopWhileAlive runs delegate and then sleeps for frequency, over and over,
// until the registry is flagged as stopping. The flag is only checked before
// each run, so the loop ends at most one sleep after Stop() is called.
func (registry *ServiceRegistry) loopWhileAlive(frequency time.Duration, delegate func()) {
	for !registry.stopping {
		delegate()
		time.Sleep(frequency)
	}
}
// filterMessages wraps a message handler so that messages are discarded when
// the registry is stopping or when their "sender" is the local node; every
// other message is passed on to handler.
func (registry *ServiceRegistry) filterMessages(handler func(message moleculer.Payload)) func(message moleculer.Payload) {
	filtered := func(message moleculer.Payload) {
		if registry.stopping {
			registry.logger.Warn("filterMessages() - registry is stopping. Discarding message: ", message)
			return
		}

		sender := message.Get("sender")
		fromLocalNode := sender.Exists() && sender.String() == registry.localNode.GetID()
		if fromLocalNode {
			registry.logger.Debug("filterMessages() - Same host message (sender == localNodeID). discarding... ", message)
			return
		}

		handler(message)
	}
	return filtered
}
// heartbeatMessageReceived handles a HEARTBEAT message from a remote node.
// The heartbeat is handed to the node catalog; when the catalog reports it as
// unsuccessful the sender is (re)discovered through the transit.
//
// The message comes from the network, so the sender is validated first: a
// heartbeat with a missing or non-string "sender" is discarded with a warning
// instead of panicking on the type assertion.
func (registry *ServiceRegistry) heartbeatMessageReceived(message moleculer.Payload) {
	heartbeat := message.RawMap()
	sender, ok := heartbeat["sender"].(string)
	if !ok {
		registry.logger.Warn("heartbeatMessageReceived() - heartbeat without a valid sender. Discarding message: ", message)
		return
	}
	if successful := registry.nodes.HeartBeat(heartbeat); !successful {
		registry.transit.DiscoverNode(sender)
	}
}
// disconnectMessageReceived handles a DISCONNECT message: when the sender is
// a known node it is disconnected, which removes its services, actions and
// events from the local registry.
func (registry *ServiceRegistry) disconnectMessageReceived(message moleculer.Payload) {
	sender := message.Get("sender").String()
	node, known := registry.nodes.findNode(sender)
	registry.logger.Debug("disconnectMessageReceived() sender: ", sender, " exists: ", known)
	if !known {
		return
	}
	registry.DisconnectNode(node.GetID())
}
// compatibility makes sure the service info map has a "version" key, setting
// it to the empty string when absent. The map is modified in place and
// returned.
func compatibility(info map[string]interface{}) map[string]interface{} {
	if _, hasVersion := info["version"]; hasVersion {
		return info
	}
	info["version"] = ""
	return info
}
// ForEachNode hands forEAchFunc to the node catalog, which applies it to its
// nodes.
func (registry *ServiceRegistry) ForEachNode(forEAchFunc moleculer.ForEachNodeFunc) {
	catalog := registry.nodes
	catalog.ForEachNode(forEAchFunc)
}
// GetLocalNode returns the node that represents the local broker; it returns
// the same value as LocalNode.
func (registry *ServiceRegistry) GetLocalNode() moleculer.Node {
	local := registry.localNode
	return local
}
// RemoteNodeInfoReceived processes the INFO message of a remote node and
// brings the local registry in line with it.
//
// The node catalog is updated first; then, for every service in the message,
// the service catalog is updated and the resulting new/updated/deleted actions
// and events are applied to the action and event catalogs. Services the node
// no longer exposes are removed from the service catalog. Finally one of
// "$node.connected", "$node.updated" or "$node.reconnected" is emitted with
// the node ID and its "neighbours" count (0 when absent).
//
// Calls are serialized by nodeReceivedMutex.
func (registry *ServiceRegistry) RemoteNodeInfoReceived(message moleculer.Payload) {
	registry.nodeReceivedMutex.Lock()
	defer registry.nodeReceivedMutex.Unlock()

	msgMap := message.RawMap()
	nodeID := message.Get("sender").String()
	services := message.Get("services").MapArray()
	exists, reconnected, removedServices := registry.nodes.Info(msgMap)

	for _, serviceInfo := range services {
		serviceInfo = compatibility(serviceInfo)
		svc, newService, updatedActions, newActions, deletedActions, updatedEvents, newEvents, deletedEvents := registry.services.updateRemote(nodeID, serviceInfo)

		for _, newAction := range newActions {
			// Remote actions have no local handler.
			serviceAction := service.CreateServiceAction(
				serviceInfo["name"].(string),
				newAction.Name(),
				nil,
				moleculer.ObjectSchema{nil})
			registry.actions.Add(serviceAction, svc, false)
		}
		for _, updates := range updatedActions {
			fullname := updates["name"].(string)
			registry.actions.Update(nodeID, fullname, updates)
		}
		for _, deleted := range deletedActions {
			fullname := deleted.FullName()
			registry.actions.Remove(nodeID, fullname)
		}

		for _, newEvent := range newEvents {
			serviceEvent := service.CreateServiceEvent(
				newEvent.Name(),
				serviceInfo["name"].(string),
				newEvent.Group(),
				newEvent.Handler())
			registry.events.Add(serviceEvent, svc, false)
		}
		for _, updates := range updatedEvents {
			name := updates["name"].(string)
			registry.events.Update(nodeID, name, updates)
		}
		for _, deleted := range deletedEvents {
			name := deleted.Name()
			registry.events.Remove(nodeID, name)
		}

		if newService {
			registry.logger.Infof("Registry - remote %s service is registered.", svc.FullName())
			registry.broker.Bus().EmitAsync(
				"$registry.service.added",
				[]interface{}{svc.Summary()})
		}
	}

	// Purge the services the node no longer exposes. This must not live inside
	// the loop above: a node that dropped all of its services sends an empty
	// service list, the loop body never runs and the stale services would
	// stay registered (and with N services the purge would run N times).
	for _, removedService := range removedServices {
		name := removedService["name"].(string)
		registry.services.Remove(nodeID, name)
	}

	var neighbours int64
	if message.Get("neighbours").Exists() {
		neighbours = message.Get("neighbours").Int64()
	}

	eventParam := []interface{}{nodeID, neighbours}
	eventName := "$node.connected"
	switch {
	case exists:
		eventName = "$node.updated"
	case reconnected:
		eventName = "$node.reconnected"
	}
	registry.broker.Bus().EmitAsync(eventName, eventParam)
}
// subscribeInternalEvent registers a listener on the broker bus for an
// internal event (e.g. $node.disconnected). When the event fires, its first
// argument (or nil when there is none) is wrapped in a payload and the event
// handler is called with a child event context.
func (registry *ServiceRegistry) subscribeInternalEvent(event service.Event) {
	listener := func(data ...interface{}) {
		var source interface{}
		if len(data) > 0 {
			source = data[0]
		}
		params := payload.New(source)

		parent := registry.broker.BrokerContext()
		eventContext := parent.ChildEventContext(event.Name(), params, nil, false)
		handle := event.Handler()
		handle(eventContext.(moleculer.Context), params)
	}
	registry.broker.Bus().On(event.Name(), listener)
}
// AddLocalService adds a local service to the registry, unless the same
// name/version is already registered for the local node.
//
// All service actions are added to the action catalog. Events whose name
// starts with "$" are internal and subscribed on the broker bus; the others
// are added to the event catalog. The service is then published on the local
// node and "$registry.service.added" is notified.
func (registry *ServiceRegistry) AddLocalService(service *service.Service) {
	localID := registry.localNode.GetID()
	if registry.services.Find(service.Name(), service.Version(), localID) {
		registry.logger.Trace("registry - AddLocalService() - Service already registered, will ignore.. service fullName: ", service.FullName())
		return
	}
	registry.services.Add(service)

	actions := service.Actions()
	events := service.Events()
	for _, action := range actions {
		registry.actions.Add(action, service, true)
	}
	for _, event := range events {
		if internal := strings.HasPrefix(event.Name(), "$"); internal {
			registry.subscribeInternalEvent(event)
			continue
		}
		registry.events.Add(event, service, true)
	}

	registry.localNode.Publish(service.AsMap())
	registry.logger.Debug("Registry published local service: ", service.FullName(), " # actions: ", len(actions), " # events: ", len(events), " nodeID: ", service.NodeID())
	registry.notifyServiceAdded(service.Summary())
}
// notifyServiceAdded emits "$registry.service.added" with the service summary.
// When the broker has not started yet the emission is deferred until the
// "$broker.started" event fires.
func (registry *ServiceRegistry) notifyServiceAdded(svc map[string]string) {
	emit := func() {
		registry.broker.Bus().EmitAsync(
			"$registry.service.added",
			[]interface{}{svc})
	}

	if registry.broker.IsStarted() {
		emit()
		return
	}
	registry.broker.Bus().Once("$broker.started", func(...interface{}) {
		emit()
	})
}
// nextAction returns the action entry to invoke for actionName.
// When the first option names a NodeID the entry is taken from that node;
// otherwise the action catalog picks one using the given strategy.
func (registry *ServiceRegistry) nextAction(actionName string, strategy strategy.Strategy, opts ...moleculer.Options) *ActionEntry {
	pinnedNodeID := ""
	if len(opts) > 0 {
		pinnedNodeID = opts[0].NodeID
	}
	if pinnedNodeID == "" {
		return registry.actions.Next(actionName, strategy)
	}
	return registry.actions.NextFromNode(actionName, pinnedNodeID)
}
// GetNodeByAddress returns the node the node catalog holds for the given
// address.
func (registry *ServiceRegistry) GetNodeByAddress(address string) moleculer.Node {
	node := registry.nodes.GetNodeByAddress(address)
	return node
}
// KnownEventListeners returns the sorted list of event listeners in the event
// catalog, formatted as "service.event", or "node.service.event" when addNode
// is true.
func (registry *ServiceRegistry) KnownEventListeners(addNode bool) []string {
	entries := registry.events.list()
	names := make([]string, 0, len(entries))
	for _, entry := range entries {
		listener := fmt.Sprint(entry.event.ServiceName(), ".", entry.event.Name())
		if addNode {
			listener = fmt.Sprint(entry.targetNodeID, ".", entry.event.ServiceName(), ".", entry.event.Name())
		}
		names = append(names, listener)
	}
	sort.Strings(names)
	return names
}
// KnownNodes returns the sorted IDs of all nodes in the node catalog.
func (registry *ServiceRegistry) KnownNodes() []string {
	known := registry.nodes.list()
	ids := make([]string, 0, len(known))
	for _, node := range known {
		ids = append(ids, node.GetID())
	}
	sort.Strings(ids)
	return ids
}