Bnei-Baruch/mdb

View on GitHub
events/setup.go

Summary

Maintainability
A
0 mins
Test Coverage
package events

import (
    "context"

    log "github.com/Sirupsen/logrus"
    "github.com/nats-io/go-nats-streaming"
    "github.com/spf13/viper"
)

var eventHandlers []EventHandler

func InitEmitter() (*BufferedEmitter, error) {
    // Setup events handlers
    eventHandlers = make([]EventHandler, 0)
    hNames := viper.GetStringSlice("events.handlers")
    if len(hNames) > 0 {
        for i := range hNames {
            switch hNames[i] {
            case "logger":
                eventHandlers = append(eventHandlers, new(LoggerEventHandler))
            case "nats":
                log.Info("Initializing nats streaming event handler")
                h, err := NewNatsStreamingEventHandler(
                    viper.GetString("nats.subject"),
                    viper.GetString("nats.cluster-id"),
                    viper.GetString("nats.client-id"),
                    stan.NatsURL(viper.GetString("nats.url")),
                    stan.PubAckWait(viper.GetDuration("nats.pub-ack-wait")),
                )
                if err != nil {
                    log.Errorf("Error connecting to nats streaming server: %s", err)
                } else {
                    eventHandlers = append(eventHandlers, h)
                }
            default:
                log.Warnf("Unknown event handler: %s", hNames[i])
            }
        }
    }

    return NewBufferedEmitter(viper.GetInt("events.emitter-size"), eventHandlers...)
}

func CloseEmitter(ctx context.Context) {
    log.Infof("Closing event handlers")
    for i := range eventHandlers {
        if err := eventHandlers[i].Close(ctx); err != nil {
            log.Error("Close event handler:", err)
        }
    }
}