moleculer-go/moleculer

View on GitHub
transit/pubsub/pubsub.go

Summary

Maintainability
C
1 day
Test Coverage
package pubsub

import (
    "errors"
    "fmt"
    "math"
    "os"
    "strings"
    "sync"
    "time"

    "github.com/moleculer-go/moleculer/version"

    "github.com/moleculer-go/moleculer/payload"

    "github.com/moleculer-go/moleculer/context"
    "github.com/moleculer-go/moleculer/transit"
    "github.com/moleculer-go/moleculer/transit/kafka"
    "github.com/moleculer-go/moleculer/transit/memory"
    "github.com/moleculer-go/moleculer/transit/nats"
    "github.com/moleculer-go/moleculer/transit/tcp"
    "github.com/moleculer-go/moleculer/util"

    "github.com/moleculer-go/moleculer"
    "github.com/moleculer-go/moleculer/serializer"
    log "github.com/sirupsen/logrus"
)

// PubSub is a transit implementation.
type PubSub struct {
    logger               *log.Entry
    transport            transit.Transport
    broker               *moleculer.BrokerDelegates
    isConnected          bool
    pendingRequests      map[string]pendingRequest
    pendingRequestsMutex *sync.Mutex
    serializer           serializer.Serializer

    knownNeighbours   map[string]int64
    neighboursTimeout time.Duration
    neighboursMutex   *sync.Mutex
    brokerStarted     bool
}

const DATATYPE_UNDEFINED = 0
const DATATYPE_NULL = 1
const DATATYPE_JSON = 2
const DATATYPE_BUFFER = 3

func (pubsub *PubSub) onServiceAdded(values ...interface{}) {
    localNodeID := pubsub.broker.LocalNode().GetID()
    // Checking that was added local service
    isLocalServiceAdded := false
    for _, value := range values {
        if value.(map[string]string)["nodeID"] == localNodeID {
            isLocalServiceAdded = true
            break
        }
    }
    if isLocalServiceAdded {
        pubsub.broker.LocalNode().IncreaseSequence()
    }
    if isLocalServiceAdded && pubsub.isConnected && pubsub.brokerStarted {
        pubsub.broadcastNodeInfo("")
    }
}

func (pubsub *PubSub) onBrokerStarted(values ...interface{}) {
    if pubsub.isConnected {
        pubsub.broadcastNodeInfo("")
        pubsub.brokerStarted = true
    }
}

func Create(broker *moleculer.BrokerDelegates) transit.Transit {
    pendingRequests := make(map[string]pendingRequest)
    knownNeighbours := make(map[string]int64)
    transitImpl := PubSub{
        broker:               broker,
        isConnected:          false,
        pendingRequests:      pendingRequests,
        logger:               broker.Logger("Transit", ""),
        serializer:           serializer.New(broker),
        neighboursTimeout:    broker.Config.NeighboursCheckTimeout,
        knownNeighbours:      knownNeighbours,
        neighboursMutex:      &sync.Mutex{},
        pendingRequestsMutex: &sync.Mutex{},
    }

    broker.Bus().On("$node.disconnected", transitImpl.onNodeDisconnected)
    broker.Bus().On("$node.connected", transitImpl.onNodeConnected)
    broker.Bus().On("$broker.started", transitImpl.onBrokerStarted)
    broker.Bus().On("$registry.service.added", transitImpl.onServiceAdded)

    return &transitImpl
}

func (pubsub *PubSub) pendingRequestsByNode(nodeId string) []pendingRequest {
    list := []pendingRequest{}
    for _, p := range pubsub.pendingRequests {
        if p.context.TargetNodeID() == nodeId {
            list = append(list, p)
        }
    }
    return list
}

func (pubsub *PubSub) requestTimedOut(resultChan *chan moleculer.Payload, context moleculer.BrokerContext) func() {
    pError := payload.New(errors.New("request timeout"))
    return func() {
        pubsub.logger.Debug("requestTimedOut() nodeID: ", context.TargetNodeID())
        pubsub.pendingRequestsMutex.Lock()
        defer pubsub.pendingRequestsMutex.Unlock()

        p, exists := pubsub.pendingRequests[context.ID()]
        if exists {
            (*p.resultChan) <- pError
            p.timer.Stop()
            delete(pubsub.pendingRequests, p.context.ID())
        }
    }
}

func (pubsub *PubSub) onNodeDisconnected(values ...interface{}) {
    pubsub.pendingRequestsMutex.Lock()

    var nodeID string = values[0].(string)
    pending := pubsub.pendingRequestsByNode(nodeID)
    pubsub.logger.Debug("onNodeDisconnected() nodeID: ", nodeID, " pending: ", len(pending))
    if len(pending) > 0 {
        pError := payload.New(fmt.Errorf("Node %s disconnected. The request was canceled.", nodeID))
        for _, p := range pending {
            (*p.resultChan) <- pError
            p.timer.Stop()
            delete(pubsub.pendingRequests, p.context.ID())
        }
    }
    pubsub.pendingRequestsMutex.Unlock()

    pubsub.neighboursMutex.Lock()
    delete(pubsub.knownNeighbours, nodeID)
    pubsub.neighboursMutex.Unlock()
}

func (pubsub *PubSub) onNodeConnected(values ...interface{}) {
    nodeID := values[0].(string)
    neighbours := values[1].(int64)
    pubsub.logger.Debug("onNodeConnected() nodeID: ", nodeID, " neighbours: ", neighbours)
    pubsub.neighboursMutex.Lock()
    pubsub.knownNeighbours[nodeID] = neighbours
    pubsub.neighboursMutex.Unlock()
}

func isNats(v string) bool {
    return strings.Index(v, "nats://") > -1
}

func isKafka(v string) bool {
    return strings.Contains(v, "kafka://")
}

// CreateTransport : based on config it will load the transporter
// for now is hard coded for NATS Streaming localhost
func (pubsub *PubSub) createTransport() transit.Transport {
    pubsub.logger.Debug("createTransport() Config.Transporter: " + pubsub.broker.Config.Transporter)
    var transport transit.Transport
    if pubsub.broker.Config.TransporterFactory != nil {
        pubsub.logger.Info("Transporter: Custom factory")
        transport = pubsub.broker.Config.TransporterFactory().(transit.Transport)
    } else if pubsub.broker.Config.Transporter == "STAN" {
        pubsub.logger.Info("Transporter: NatsStreamingTransporter")
        transport = pubsub.createStanTransporter()
    } else if pubsub.broker.Config.Transporter == "TCP" {
        pubsub.logger.Info("Transporter: TCP")
        transport = pubsub.createTCPTransporter()
    } else if isNats(pubsub.broker.Config.Transporter) {
        pubsub.logger.Info("Transporter: NatsTransporter")
        transport = pubsub.createNatsTransporter()
    } else if isKafka(pubsub.broker.Config.Transporter) {
        pubsub.logger.Info("Transporter: KafkaTransporter")
        transport = pubsub.createKafkaTransporter()
    } else {
        pubsub.logger.Info("Transporter: Memory")
        transport = pubsub.createMemoryTransporter()
    }
    transport.SetPrefix(resolveNamespace(pubsub.broker.Config.Namespace))
    transport.SetNodeID(pubsub.broker.LocalNode().GetID())
    transport.SetSerializer(pubsub.serializer)
    return transport
}

func resolveNamespace(namespace string) string {
    if namespace != "" {
        return "MOL-" + namespace
    }
    return "MOL"
}

func (pubsub *PubSub) createMemoryTransporter() transit.Transport {
    pubsub.logger.Debug("createMemoryTransporter() ... ")
    logger := pubsub.logger.WithField("transport", "memory")
    mem := memory.Create(logger, &memory.SharedMemory{})
    return &mem
}

func (pubsub *PubSub) createKafkaTransporter() transit.Transport {
    pubsub.logger.Debug("createKafkaTransporter()")

    return kafka.CreateKafkaTransporter(kafka.KafkaOptions{
        Url:        pubsub.broker.Config.Transporter,
        Name:       pubsub.broker.LocalNode().GetID(),
        Logger:     pubsub.logger.WithField("transport", "kafka"),
        Serializer: pubsub.serializer,
    })
}

func (pubsub *PubSub) createNatsTransporter() transit.Transport {
    pubsub.logger.Debug("createNatsTransporter()")
    return nats.CreateNatsTransporter(nats.NATSOptions{
        URL:            pubsub.broker.Config.Transporter,
        Name:           pubsub.broker.LocalNode().GetID(),
        Logger:         pubsub.logger.WithField("transport", "nats"),
        Serializer:     pubsub.serializer,
        AllowReconnect: true,
        ReconnectWait:  time.Second * 2,
        MaxReconnect:   -1,
    })
}

func (pubsub *PubSub) createTCPTransporter() transit.Transport {
    pubsub.logger.Debug("createTCPTransporter()")
    tcpTransporter := tcp.CreateTCPTransporter(tcp.TCPOptions{
        // Enable UDP discovery
        UdpDiscovery: true,
        // Reusing UDP server socket
        UdpReuseAddr: true,

        // UDP port
        UdpPort: 4445,
        // UDP bind address (if empty + UdpMulticast is specified, bind on all interfaces)
        UdpBindAddress: "",
        // UDP sending period (seconds)
        UdpPeriod: 30,

        // Multicast address.
        UdpMulticast: "239.0.0.0",
        // Multicast TTL setting
        UdpMulticastTTL: 1,

        // Send broadcast (Boolean, String, Array<String>)
        UdpBroadcast: []string{},

        // TCP server port.  0 means random port
        Port: 0,
        // Static remote nodes address list (when UDP discovery is not available)
        Urls: []string{},
        // Use hostname as preffered connection address
        UseHostname: true,

        // Gossip sending period in seconds
        GossipPeriod: 2,
        // Maximum enabled outgoing connections. If reach, close the old connections
        MaxConnections: 32,
        // Maximum TCP packet size
        MaxPacketSize: 1 * 1024 * 1024,

        Namespace:  pubsub.broker.Config.Namespace,
        NodeId:     pubsub.broker.LocalNode().GetID(),
        Logger:     pubsub.logger.WithField("transport", "tcp"),
        Serializer: pubsub.serializer,
    })
    var transport transit.Transport = &tcpTransporter
    return transport
}

func (pubsub *PubSub) createStanTransporter() transit.Transport {
    broker := pubsub.broker
    logger := broker.Logger("transport", "stan")

    url := "stan://" + os.Getenv("STAN_HOST") + ":4222"
    clusterID := "test-cluster"
    localNodeID := broker.LocalNode().GetID()
    clientID := strings.ReplaceAll(localNodeID, ".", "_")

    options := nats.StanOptions{
        url,
        clusterID,
        clientID,
        logger,
        pubsub.serializer,
        func(message moleculer.Payload) bool {
            sender := message.Get("sender").String()
            return sender != localNodeID
        },
    }

    stanTransporter := nats.CreateStanTransporter(options)
    var transport transit.Transport = &stanTransporter
    return transport
}

type pendingRequest struct {
    context    moleculer.BrokerContext
    resultChan *chan moleculer.Payload
    timer      *time.Timer
}

func (pubsub *PubSub) checkMaxQueueSize() {
    //TODO: check transit.js line 524
}

// waitForNeighbours this function will wait for neighbour nodes or timeout if the expected number is not received after a time out.
func (pubsub *PubSub) waitForNeighbours() bool {
    if pubsub.broker.Config.DontWaitForNeighbours {
        return true
    }
    start := time.Now()
    for {
        expected := pubsub.expectedNeighbours()
        neighbours := pubsub.neighbours()
        if expected <= neighbours && (expected > 0 || neighbours > 0) {
            pubsub.logger.Debug("waitForNeighbours() - received info from all expected neighbours :) -> expected: ", expected)
            return true
        }
        if time.Since(start) > pubsub.neighboursTimeout {
            pubsub.logger.Warn("waitForNeighbours() - Time out ! did not receive info from all expected neighbours: ", expected, "  INFOs received: ", neighbours)
            return false
        }
        if !pubsub.isConnected {
            return false
        }
        time.Sleep(pubsub.broker.Config.WaitForNeighboursInterval)
    }
}

// DiscoverNodes will check if there are neighbours and return true if any are found ;).
func (pubsub *PubSub) DiscoverNodes() chan bool {
    result := make(chan bool)
    go func() {
        pubsub.DiscoverNode("")
        result <- pubsub.waitForNeighbours()
    }()
    return result
}

func (pubsub *PubSub) SendHeartbeat() {
    node := pubsub.broker.LocalNode().ExportAsMap()
    payload := map[string]interface{}{
        "sender": node["id"],
        "cpu":    node["cpu"],
        "cpuSeq": node["cpuSeq"],
        "ver":    version.MoleculerProtocol(),
    }
    message, err := pubsub.serializer.MapToPayload(&payload)
    if err != nil {
        pubsub.logger.Error("SendHeartbeat() Error serializing the payload: ", payload, " error: ", err)
        return
    }
    pubsub.transport.Publish("HEARTBEAT", "", message)
}

func (pubsub *PubSub) DiscoverNode(nodeID string) {
    payload := map[string]interface{}{
        "sender": pubsub.broker.LocalNode().GetID(),
        "ver":    version.MoleculerProtocol(),
    }
    message, err := pubsub.serializer.MapToPayload(&payload)
    if err != nil {
        pubsub.logger.Error("DiscoverNode() Error serializing the payload: ", payload, " error: ", err)
        return
    }
    pubsub.transport.Publish("DISCOVER", nodeID, message)

}

// Emit emit an event to all services that listens to this event.
func (pubsub *PubSub) Emit(context moleculer.BrokerContext) {
    targetNodeID := context.TargetNodeID()
    payload := context.AsMap()
    payload["sender"] = pubsub.broker.LocalNode().GetID()
    payload["ver"] = version.MoleculerProtocol()
    if context.Payload().Exists() {
        payload["dataType"] = DATATYPE_JSON
    } else {
        payload["dataType"] = DATATYPE_NULL
    }

    pubsub.logger.Trace("Emit() targetNodeID: ", targetNodeID, " payload: ", payload)

    message, err := pubsub.serializer.MapToPayload(&payload)
    if err != nil {
        pubsub.logger.Error("Emit() Error serializing the payload: ", payload, " error: ", err)
        panic(fmt.Errorf("Error trying to serialize the payload. Likely issues with the action params. Error: %s", err))
    }
    pubsub.transport.Publish("EVENT", targetNodeID, message)
}

func (pubsub *PubSub) Request(context moleculer.BrokerContext) chan moleculer.Payload {
    pubsub.checkMaxQueueSize()

    resultChan := make(chan moleculer.Payload)

    targetNodeID := context.TargetNodeID()
    payload := context.AsMap()
    payload["sender"] = pubsub.broker.LocalNode().GetID()
    payload["ver"] = version.MoleculerProtocol()
    if context.Payload().Exists() {
        payload["paramsType"] = DATATYPE_JSON
    } else {
        payload["paramsType"] = DATATYPE_NULL
    }

    pubsub.logger.Trace("Request() targetNodeID: ", targetNodeID, " payload: ", payload)

    message, err := pubsub.serializer.MapToPayload(&payload)
    if err != nil {
        pubsub.logger.Error("Request() Error serializing the payload: ", payload, " error: ", err)
        panic(fmt.Errorf("Error trying to serialize the payload. Likely issues with the action params. Error: %s", err))
    }

    pubsub.pendingRequestsMutex.Lock()
    pubsub.logger.Debug("Request() pending request id: ", context.ID(), " targetNodeId: ", context.TargetNodeID())
    pubsub.pendingRequests[context.ID()] = pendingRequest{
        context,
        &resultChan,

        time.AfterFunc(
            pubsub.broker.Config.RequestTimeout,
            pubsub.requestTimedOut(&resultChan, context)),
    }
    pubsub.pendingRequestsMutex.Unlock()

    pubsub.transport.Publish("REQ", targetNodeID, message)
    return resultChan
}

// validateVersion check that version of the message is correct.
func (pubsub *PubSub) validate(handler func(message moleculer.Payload)) transit.TransportHandler {
    return func(msg moleculer.Payload) {
        valid := pubsub.validateVersion(msg) && !pubsub.sameHost(msg)
        if valid {
            handler(msg)
        } else {
            pubsub.logger.Trace("Discarding invalid msg -> ", msg.Value())
        }
    }
}

func (pubsub *PubSub) sameHost(msg moleculer.Payload) bool {
    sender := msg.Get("sender").String()
    localNodeID := pubsub.broker.LocalNode().GetID()
    return sender == localNodeID
}

// validateVersion check that version of the message is correct.
func (pubsub *PubSub) validateVersion(msg moleculer.Payload) bool {
    msgVersion := msg.Get("ver").String()
    if msgVersion == version.MoleculerProtocol() {
        return true
    } else {
        pubsub.logger.Error("Discarding msg - wronging version: ", msgVersion, " expected: ", version.MoleculerProtocol(), " msg: ", msg)
        return false
    }
}

// reponseHandler responsible for whem a reponse arrives form a remote node.
func (pubsub *PubSub) reponseHandler() transit.TransportHandler {
    return func(message moleculer.Payload) {
        pubsub.pendingRequestsMutex.Lock()
        defer pubsub.pendingRequestsMutex.Unlock()

        id := message.Get("id").String()
        sender := message.Get("sender").String()
        pubsub.logger.Debug("reponseHandler() - response arrived from nodeID: ", sender, " context id: ", id)

        request, exists := pubsub.pendingRequests[id]

        if !exists {
            pubsub.logger.Debug("reponseHandler() - discarding response -> request does not exist for id: ", id, " - message: ", message.Value())
            return
        }
        if request.resultChan == nil {
            pubsub.logger.Debug("reponseHandler() - discarding response -> request.resultChan is nil! - message: ", message.Value(), " pending context: ", request.context)
            return
        }

        request.timer.Stop()
        defer delete(pubsub.pendingRequests, id)
        var result moleculer.Payload
        if message.Get("success").Bool() {
            result = message.Get("data")
        } else {
            result = pubsub.parseError(message)
        }

        pubsub.logger.Trace("reponseHandler() id: ", id, " result: ", result)
        (*request.resultChan) <- result
    }
}

func (pubsub *PubSub) parseError(message moleculer.Payload) moleculer.Payload {
    if pubsub.isMoleculerJSError(message) {
        return payload.New(pubsub.moleculerJSError(message))
    }
    return payload.New(errors.New(message.Get("error").String()))
}

func (pubsub *PubSub) isMoleculerJSError(message moleculer.Payload) bool {
    return message.Get("error").Get("message").Exists()
}

func (pubsub *PubSub) moleculerJSError(message moleculer.Payload) error {
    msg := message.Get("error").Get("message").String()
    if message.Get("error").Get("stack").Exists() {
        pubsub.logger.Error(message.Get("error").Get("stack").Value())
    }
    return errors.New(msg)
}

func (pubsub *PubSub) sendResponse(context moleculer.BrokerContext, response moleculer.Payload) {
    targetNodeID := context.TargetNodeID()

    if targetNodeID == "" {
        panic(errors.New("sendResponse() targetNodeID is required !"))
    }

    pubsub.logger.Tracef("sendResponse() reponse type: %T ", response)

    values := make(map[string]interface{})
    values["sender"] = pubsub.broker.LocalNode().GetID()
    values["ver"] = version.MoleculerProtocol()
    values["id"] = context.ID()
    values["meta"] = context.Meta()

    if response.Exists() {
        values["dataType"] = DATATYPE_JSON
    } else {
        values["dataType"] = DATATYPE_NULL
    }

    if response.IsError() {
        var errMap map[string]string
        actionError, isActionError := response.Value().(ActionError)
        if isActionError {
            errMap = map[string]string{
                "message": actionError.Error(),
                "stack":   actionError.Stack(),
                "name":    "Error",
            }
        } else {
            errMap = map[string]string{
                "message": response.String(),
                "name":    "Error",
            }
        }
        values["success"] = false
        values["error"] = errMap
    } else {
        values["success"] = true
        values["data"] = response.Value()
    }

    message, err := pubsub.serializer.MapToPayload(&values)
    if err != nil {
        pubsub.logger.Error("sendResponse() Erro serializing the values: ", values, " error: ", err)
        panic(err)
    }

    pubsub.logger.Trace("sendResponse() targetNodeID: ", targetNodeID, " values: ", values, " message: ", message)

    pubsub.transport.Publish("RES", targetNodeID, message)
}

type ActionError interface {
    Error() string
    Stack() string
}

func parseParamsType(value moleculer.Payload) string {
    if !value.Exists() {
        return "1" //default : null
    }
    return value.String()
}

// requestHandler : handles when a request arrives on this node.
// 1: create a moleculer.Context from the message, the moleculer.Context contains the target action
// 2: invoke the action
// 3: send a response
func (pubsub *PubSub) requestHandler() transit.TransportHandler {
    return func(message moleculer.Payload) {
        paramsType := parseParamsType(message.Get("paramsType"))
        if paramsType != "1" && paramsType != "2" {
            errMsg := "Expecting paramsType == 2 (JSON) or 1 (Null) - received: " + paramsType
            pubsub.logger.Error(errMsg)
            //currently there is only one serializer implementation.
            //once more serializers are added, pubsub.serializer must change and be dinamic based on paramsType
            pubsub.sendResponse(context.ActionContext(pubsub.broker, nil), payload.Error(errMsg))
            return
        }
        values := pubsub.serializer.PayloadToContextMap(message)
        context := context.ActionContext(pubsub.broker, values)
        result := <-pubsub.broker.ActionDelegate(context)
        pubsub.sendResponse(context, result)
    }
}

// eventHandler handles when a event msg is sent to this broker
func (pubsub *PubSub) eventHandler() transit.TransportHandler {
    return func(message moleculer.Payload) {
        values := pubsub.serializer.PayloadToContextMap(message)
        context := context.EventContext(pubsub.broker, values)
        pubsub.broker.HandleRemoteEvent(context)
    }
}

// expectedNeighbours calculate the expected number of neighbours
func (pubsub *PubSub) expectedNeighbours() int64 {
    neighbours := pubsub.neighbours()
    if neighbours == 0 {
        return 0
    }

    var total int64
    pubsub.neighboursMutex.Lock()
    for _, value := range pubsub.knownNeighbours {
        total = total + value
    }
    pubsub.neighboursMutex.Unlock()
    return total / neighbours
}

// neighbours return the total number of known neighbours.
func (pubsub *PubSub) neighbours() int64 {
    return int64(len(pubsub.knownNeighbours))
}

func configToMap(config moleculer.Config) map[string]string {
    m := make(map[string]string)
    m["logLevel"] = config.LogLevel
    m["transporter"] = config.Transporter
    m["namespace"] = config.Namespace
    m["requestTimeout"] = config.RequestTimeout.String()
    return m
}

// broadcastNodeInfo send the local node info to the target node, if empty to all nodes.
func (pubsub *PubSub) broadcastNodeInfo(targetNodeID string) {
    payload := pubsub.broker.LocalNode().ExportAsMap()
    payload["sender"] = payload["id"]
    payload["neighbours"] = pubsub.neighbours()
    payload["ver"] = version.MoleculerProtocol()
    payload["config"] = configToMap(pubsub.broker.Config)
    payload["instanceID"] = pubsub.broker.InstanceID()

    message, _ := pubsub.serializer.MapToPayload(&payload)
    pubsub.transport.Publish("INFO", targetNodeID, message)
}

func (pubsub *PubSub) discoverHandler() transit.TransportHandler {
    return func(message moleculer.Payload) {
        sender := message.Get("sender").String()
        if pubsub.brokerStarted {
            pubsub.broadcastNodeInfo(sender)
        } else {
            pubsub.broker.Bus().Once("$broker.started", func(...interface{}) {
                pubsub.broadcastNodeInfo(sender)
            })
        }
    }
}

func (pubsub *PubSub) emitRegistryEvent(command string) transit.TransportHandler {
    return func(message moleculer.Payload) {
        pubsub.logger.Trace("emitRegistryEvent() command: ", command, " message: ", message)
        pubsub.broker.Bus().EmitAsync("$registry.transit.message", []interface{}{command, message})
    }
}

func (pubsub *PubSub) SendPing() {
    ping := make(map[string]interface{})
    sender := pubsub.broker.LocalNode().GetID()
    ping["sender"] = sender
    ping["ver"] = version.MoleculerProtocol()
    ping["time"] = time.Now().Unix()
    ping["id"] = util.RandomString(12)
    pingMessage, _ := pubsub.serializer.MapToPayload(&ping)
    pubsub.transport.Publish("PING", sender, pingMessage)

}

func (pubsub *PubSub) pingHandler() transit.TransportHandler {
    return func(message moleculer.Payload) {
        pong := make(map[string]interface{})
        sender := message.Get("sender").String()
        pong["sender"] = sender
        pong["ver"] = version.MoleculerProtocol()
        pong["time"] = message.Get("time").Int()
        pong["arrived"] = time.Now().Unix()
        pong["id"] = util.RandomString(12)

        pongMessage, _ := pubsub.serializer.MapToPayload(&pong)
        pubsub.transport.Publish("PONG", sender, pongMessage)
    }
}

func (pubsub *PubSub) pongHandler() transit.TransportHandler {
    return func(message moleculer.Payload) {
        now := time.Now().Unix()
        elapsed := now - message.Get("time").Int64()
        arrived := message.Get("arrived").Int64()
        timeDiff := math.Round(
            float64(now) - float64(arrived) - float64(elapsed)/2)

        mapValue := make(map[string]interface{})
        mapValue["nodeID"] = message.Get("sender").String()
        mapValue["elapsedTime"] = elapsed
        mapValue["timeDiff"] = timeDiff
        mapValue["id"] = message.Get("id").String()

        pubsub.broker.Bus().EmitAsync("$node.pong", []interface{}{mapValue})
    }
}

func (pubsub *PubSub) subscribe() {
    nodeID := pubsub.broker.LocalNode().GetID()
    pubsub.transport.Subscribe("RES", nodeID, pubsub.validate(pubsub.reponseHandler()))

    pubsub.transport.Subscribe("REQ", nodeID, pubsub.validate(pubsub.requestHandler()))
    //pubsub.transport.Subscribe("REQB", nodeID, pubsub.requestHandler())
    pubsub.transport.Subscribe("EVENT", nodeID, pubsub.validate(pubsub.eventHandler()))

    pubsub.transport.Subscribe("HEARTBEAT", "", pubsub.validate(pubsub.emitRegistryEvent("HEARTBEAT")))
    pubsub.transport.Subscribe("DISCONNECT", "", pubsub.validate(pubsub.emitRegistryEvent("DISCONNECT")))
    pubsub.transport.Subscribe("INFO", "", pubsub.validate(pubsub.emitRegistryEvent("INFO")))
    pubsub.transport.Subscribe("INFO", nodeID, pubsub.validate(pubsub.emitRegistryEvent("INFO")))
    pubsub.transport.Subscribe("DISCOVER", nodeID, pubsub.validate(pubsub.discoverHandler()))
    pubsub.transport.Subscribe("DISCOVER", "", pubsub.validate(pubsub.discoverHandler()))
    pubsub.transport.Subscribe("PING", nodeID, pubsub.validate(pubsub.pingHandler()))
    pubsub.transport.Subscribe("PONG", nodeID, pubsub.validate(pubsub.pongHandler()))

}

// sendDisconnect broadcast a DISCONNECT pkt to all nodes informing this one is stopping.
func (pubsub *PubSub) sendDisconnect() {
    payload := make(map[string]interface{})
    payload["sender"] = pubsub.broker.LocalNode().GetID()
    payload["ver"] = version.MoleculerProtocol()
    msg, _ := pubsub.serializer.MapToPayload(&payload)
    pubsub.transport.Publish("DISCONNECT", "", msg)
}

// Disconnect : disconnect the transit's  transporter.
func (pubsub *PubSub) Disconnect() chan error {
    endChan := make(chan error)
    if !pubsub.isConnected {
        endChan <- nil
        return endChan
    }
    pubsub.logger.Info("PubSub - Disconnecting transport...")
    pubsub.sendDisconnect()
    pubsub.isConnected = false
    return pubsub.transport.Disconnect()
}

// Connect : connect the transit with the transporter, subscribe to all events and start publishing its node info
func (pubsub *PubSub) Connect(registry moleculer.Registry) chan error {
    endChan := make(chan error)
    if pubsub.isConnected {
        endChan <- nil
        return endChan
    }
    pubsub.logger.Debug("PubSub - Connecting transport...")
    pubsub.transport = pubsub.createTransport()
    go func() {
        err := <-pubsub.transport.Connect(registry)
        if err == nil {
            pubsub.isConnected = true
            pubsub.logger.Debug("PubSub - Transport Connected!")

            pubsub.subscribe()

        } else {
            pubsub.logger.Debug("PubSub - Error connecting transport - error: ", err)
        }
        endChan <- err
    }()
    return endChan
}

func (pubsub *PubSub) Ready() {

}