
View on GitHub


2 hrs
Test Coverage
 * Copyright 2019 Comcast Cable Communications Management, LLC
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.

package main

import (
    olog "log"
    _ "net/http/pprof"


    db "github.com/xmidt-org/codex-db"
    dbretry "github.com/xmidt-org/codex-db/retry"
    webhook "github.com/xmidt-org/wrp-listener"
    secretGetter "github.com/xmidt-org/wrp-listener/secret"

const (
    applicationName, apiBase = "svalinn", "/api/v1"

var (
    GitCommit = "undefined"
    Version   = "undefined"
    BuildTime = "undefined"

type SvalinnConfig struct {
    Endpoint          string
    Health            HealthConfig
    Webhook           WebhookConfig
    Secret            SecretConfig
    RequestParser     requestParser.Config
    BatchInserter     batchInserter.Config
    Db                cassandra.Config
    InsertRetries     backoff.ExponentialBackOff
    BlacklistInterval time.Duration

type WebhookConfig struct {
    RegistrationInterval time.Duration
    Timeout              time.Duration
    RegistrationURL      string
    HostToRegister       string
    Request              webhook.W
    JWT                  acquire.RemoteBearerTokenAcquirerOptions
    Basic                string

type SecretConfig struct {
    Header    string
    Delimiter string

type Svalinn struct {
    done          <-chan struct{}
    shutdown      chan struct{}
    waitGroup     *sync.WaitGroup
    requestParser *requestParser.RequestParser
    batchInserter *batchInserter.BatchInserter
    registerer    *webhookClient.PeriodicRegisterer

type database struct {
    dbClose            func() error
    blacklistStop      chan struct{}
    blacklistRefresher blacklist.List
    inserter           db.Inserter
    health             *health.Health

type HealthConfig struct {
    Port     string
    Endpoint string

func SetLogger(logger log.Logger) func(delegate http.Handler) http.Handler {
    return func(delegate http.Handler) http.Handler {
        return http.HandlerFunc(
            func(w http.ResponseWriter, r *http.Request) {
                ctx := r.WithContext(logging.WithLogger(r.Context(),
                    log.With(logger, "requestHeaders", r.Header, "requestURL", r.URL.EscapedPath(), "method", r.Method)))
                delegate.ServeHTTP(w, ctx)

func GetLogger(ctx context.Context) log.Logger {
    return log.With(logging.GetLogger(ctx), "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)

//nolint:funlen // this will be fixed with uber fx
func svalinn(arguments []string) {
    start := time.Now()

    var (
        f, v                                = pflag.NewFlagSet(applicationName, pflag.ContinueOnError), viper.New()
        logger, metricsRegistry, codex, err = server.Initialize(applicationName, arguments, f, v, cassandra.Metrics, dbretry.Metrics, requestParser.Metrics, batchInserter.Metrics, basculechecks.Metrics, webhookClient.Metrics, basculemetrics.Metrics, Metrics)

    if parseErr, done := printVersion(f, arguments); done {
        // if we're done, we're exiting no matter what
        exitIfError(logger, emperror.Wrap(parseErr, "failed to parse arguments"))

    exitIfError(logger, emperror.Wrap(err, "unable to initialize viper"))
    logging.Info(logger).Log(logging.MessageKey(), "Successfully loaded config file", "configurationFile", v.ConfigFileUsed())

    // set everything up
    config := new(SvalinnConfig)

    if config.Webhook.Request.Config.URL == "" {
        config.Webhook.Request.Config.URL = codex.Server
    config.Webhook.Request.Config.URL = config.Webhook.Request.Config.URL + apiBase + config.Endpoint

    cipherOptions, err := voynicrypto.FromViper(v)
    exitIfError(logger, emperror.Wrap(err, "failed to initialize cipher options"))
    encrypter, err := cipherOptions.GetEncrypter(logger)
    exitIfError(logger, emperror.Wrap(err, "failed to load cipher encrypter"))

    secretGetter := secretGetter.NewConstantSecret(config.Webhook.Request.Config.Secret)

    var m *basculemetrics.AuthValidationMeasures

    if metricsRegistry != nil {
        m = basculemetrics.NewAuthValidationMeasures(metricsRegistry)
    listener := basculemetrics.NewMetricListener(m)

    svalinnHandler := alice.New()

    if config.Secret.Header != "" && config.Webhook.Request.Config.Secret != "" {
        htf, err := hashTokenFactory.New("Sha1", sha1.New, secretGetter)
        exitIfError(logger, emperror.Wrap(err, "failed to create hashTokenFactory"))

        authConstructor := basculehttp.NewConstructor(
            basculehttp.WithTokenFactory("Sha1", htf),

        svalinnHandler = alice.New(SetLogger(logger), authConstructor, basculehttp.NewListenerDecorator(listener))
    router := mux.NewRouter()

    database, err := setupDb(config, logger, metricsRegistry)
    exitIfError(logger, emperror.Wrap(err, "failed to initialize database connection"))

    s := &Svalinn{}
    svalinnMeasures := NewMeasures(metricsRegistry)
    s.batchInserter, err = batchInserter.NewBatchInserter(config.BatchInserter, logger, metricsRegistry, database.inserter, svalinnMeasures)
    exitIfError(logger, emperror.Wrap(err, "failed to create batch inserter"))

    s.requestParser, err = requestParser.NewRequestParser(config.RequestParser, logger, metricsRegistry, s.batchInserter, database.blacklistRefresher, encrypter, svalinnMeasures)
    exitIfError(logger, emperror.Wrap(err, "failed to create request parser"))

    app := &App{
        logger: logger,
        parser: s.requestParser,

    // MARK: Actual server logic
    router.Handle(apiBase+config.Endpoint, svalinnHandler.ThenFunc(app.handleWebhook))
    startHealth(logger, database.health, config)
    // if the register interval is 0 and these values aren't set, don't register
    if config.Webhook.RegistrationInterval > 0 && config.Webhook.RegistrationURL != "" && config.Webhook.Request.Config.URL != "" && len(config.Webhook.Request.Events) > 0 {
        acquirer, err := determineTokenAcquirer(config.Webhook)
        if err != nil {
            logging.Error(logger, emperror.Context(err)...).Log(logging.MessageKey(), "Failed to determine token acquirer", logging.ErrorKey(), err.Error())
            // TODO: we shouldn't continue trying to set the webhook registerer up if we fail
        basicConfig := webhookClient.BasicConfig{
            Timeout:         config.Webhook.Timeout,
            RegistrationURL: config.Webhook.RegistrationURL,
            Request:         config.Webhook.Request,
        registerer, err := webhookClient.NewBasicRegisterer(acquirer, secretGetter, basicConfig)
        if err != nil {
            logging.Error(logger, emperror.Context(err)...).Log(logging.MessageKey(), "Failed to create basic registerer", logging.ErrorKey(), err.Error())
            // TODO: we shouldn't continue trying to set the webhook registerer up if we fail
        periodicRegisterer, err := webhookClient.NewPeriodicRegisterer(registerer, config.Webhook.RegistrationInterval, logger, webhookClient.NewMeasures(metricsRegistry))
        if err != nil {
            logging.Error(logger, emperror.Context(err)...).Log(logging.MessageKey(), "Failed to create periodic registerer", logging.ErrorKey(), err.Error())

        s.registerer = periodicRegisterer

    // MARK: Starting the server
    var runnable concurrent.Runnable
    _, runnable, s.done = codex.Prepare(logger, nil, metricsRegistry, router)
    s.waitGroup, s.shutdown, err = concurrent.Execute(runnable)
    exitIfError(logger, emperror.Wrap(err, "unable to start device manager"))

    logging.Info(logger).Log(logging.MessageKey(), fmt.Sprintf("%s is up and running!", applicationName), "elapsedTime", time.Since(start))

    waitUntilShutdown(logger, s, database)
    logging.Info(logger).Log(logging.MessageKey(), "Svalinn has shut down")

func printVersion(f *pflag.FlagSet, arguments []string) (error, bool) {
    printVer := f.BoolP("version", "v", false, "displays the version number")
    if err := f.Parse(arguments); err != nil {
        return err, true

    if *printVer {
        return nil, true
    return nil, false

func printVersionInfo(writer io.Writer) {
    fmt.Fprintf(writer, "%s:\n", applicationName)
    fmt.Fprintf(writer, "  version: \t%s\n", Version)
    fmt.Fprintf(writer, "  go version: \t%s\n", runtime.Version())
    fmt.Fprintf(writer, "  built time: \t%s\n", BuildTime)
    fmt.Fprintf(writer, "  git commit: \t%s\n", GitCommit)
    fmt.Fprintf(writer, "  os/arch: \t%s/%s\n", runtime.GOOS, runtime.GOARCH)

func exitIfError(logger log.Logger, err error) {
    if err != nil {
        if logger != nil {
            logging.Error(logger, emperror.Context(err)...).Log(logging.ErrorKey(), err.Error())
        fmt.Fprintf(os.Stderr, "Error: %#v\n", err.Error())

func setupDb(config *SvalinnConfig, logger log.Logger, metricsRegistry xmetrics.Registry) (database, error) {
    var (
        d database
    d.health = health.New()
    d.health.Logger = healthlogger.NewHealthLogger(logger)

    dbConn, err := cassandra.CreateDbConnection(config.Db, metricsRegistry, d.health)
    if err != nil {
        return database{}, err

    d.dbClose = dbConn.Close

    if config.InsertRetries.MaxElapsedTime >= 0 {
        d.inserter = dbretry.CreateRetryInsertService(
    } else {
        d.inserter = dbConn

    d.blacklistStop = make(chan struct{}, 1)
    blacklistConfig := blacklist.RefresherConfig{
        Logger:         logger,
        UpdateInterval: config.BlacklistInterval,
    d.blacklistRefresher = blacklist.NewListRefresher(blacklistConfig, dbConn, d.blacklistStop)
    return d, nil


func startHealth(logger log.Logger, health *health.Health, config *SvalinnConfig) {
    if config.Health.Endpoint != "" && config.Health.Port != "" {
        err := health.Start()
        if err != nil {
            logging.Error(logger).Log(logging.MessageKey(), "failed to start health", logging.ErrorKey(), err)
        // router.Handler(config.Health.Address, handlers)
        http.HandleFunc(config.Health.Endpoint, handlers.NewJSONHandlerFunc(health, nil))
        go func() {
            olog.Fatal(http.ListenAndServe(config.Health.Port, nil))

func waitUntilShutdown(logger log.Logger, s *Svalinn, database database) {
    signals := make(chan os.Signal, 10)
    signal.Notify(signals, os.Kill, os.Interrupt) //nolint:staticcheck // this will be fixed with uber fx
    for exit := false; !exit; {
        select {
        case s := <-signals:
            logging.Error(logger).Log(logging.MessageKey(), "exiting due to signal", "signal", s)
            exit = true
        case <-s.done:
            logging.Error(logger).Log(logging.MessageKey(), "one or more servers exited")
            exit = true

    err := database.health.Stop()
    if err != nil {
        logging.Error(logger, emperror.Context(err)...).Log(logging.MessageKey(), "stopping health endpoint failed",
            logging.ErrorKey(), err.Error())
    err = database.dbClose()
    if err != nil {
        logging.Error(logger, emperror.Context(err)...).Log(logging.MessageKey(), "closing database threads failed",
            logging.ErrorKey(), err.Error())

func main() {