corvus-ch/rabbitmq-cli-consumer

View on GitHub
main.go

Summary

Maintainability
A
35 mins
Test Coverage
F
40%
package main

import (
    "context"
    "fmt"
    stdlog "log"
    "net/http"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    "github.com/bketelsen/logr"
    "github.com/codegangsta/cli"
    "github.com/corvus-ch/rabbitmq-cli-consumer/acknowledger"
    "github.com/corvus-ch/rabbitmq-cli-consumer/collector"
    "github.com/corvus-ch/rabbitmq-cli-consumer/command"
    "github.com/corvus-ch/rabbitmq-cli-consumer/config"
    "github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
    "github.com/corvus-ch/rabbitmq-cli-consumer/log"
    "github.com/corvus-ch/rabbitmq-cli-consumer/processor"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/streadway/amqp"
)

var (
    version = "dev"
    commit  = "none"
    date    = "unknown"
)

// flags is the list of global flags known to the application.
var flags []cli.Flag = []cli.Flag{
    cli.StringFlag{
        Name:   "url, u",
        Usage:  "Connect with RabbitMQ using `URL`",
        EnvVar: "AMQP_URL",
    },
    cli.StringFlag{
        Name:  "executable, e",
        Usage: "Location of executable",
    },
    cli.StringFlag{
        Name:  "configuration, c",
        Usage: "Location of configuration file",
    },
    cli.BoolFlag{
        Name:  "output, o",
        Usage: "Enable logging of output from executable",
    },
    cli.BoolFlag{
        Name:  "verbose, V",
        Usage: "Enable verbose mode (logs to stdout and stderr)",
    },
    cli.BoolFlag{
        Name:  "pipe, p",
        Usage: "Pipe the message via STDIN instead of passing it as an argument. The message metadata will be passed as JSON via fd3.",
    },
    cli.BoolFlag{
        Name:  "include, i",
        Usage: "Include metadata. Passes message as JSON data including headers, properties and message body. This flag will be ignored when `-pipe` is used.",
    },
    cli.BoolFlag{
        Name:  "strict-exit-code",
        Usage: "Strict exit code processing will rise a fatal error if exit code is different from allowed onces.",
    },
    cli.StringFlag{
        Name:  "queue-name, q",
        Usage: "Optional queue name to which can be passed in, without needing to define it in config, if set will override config queue name",
    },
    cli.BoolFlag{
        Name:  "no-datetime",
        Usage: "prevents the output of date and time in the logs.",
    },
    cli.BoolFlag{
        Name:  "no-declare",
        Usage: "prevents the queue from being declared.",
    },
    cli.BoolFlag{
        Name:  "metrics, m",
        Usage: "enables metric to be exposed.",
    },
    cli.StringFlag{
        Name:  "web.listen-address",
        Usage: "Address on which to expose metrics and web interface.",
        Value: ":9566",
    },
    cli.StringFlag{
        Name:  "web.telemetry-path",
        Usage: "Path under which to expose metrics.",
        Value: "/metrics",
    },
}

var ll logr.Logger

func main() {
    NewApp().Run(os.Args)
}

// NewApp creates a new application instance with just one single action.
func NewApp() *cli.App {
    app := cli.NewApp()
    app.Name = "rabbitmq-cli-consumer"
    app.Usage = "Consume RabbitMQ easily to any cli program"
    app.Authors = []cli.Author{
        {"Richard van den Brand", "richard@vandenbrand.org"},
        {"Christian Häusler", "haeusler.christian@mac.com"},
    }
    app.Version = fmt.Sprintf("%v, commit %v, built at %v", version, commit, date)
    app.Flags = flags
    app.Action = Action
    app.ExitErrHandler = ExitErrHandler

    return app
}

// Action is the function being run when the application gets executed.
func Action(c *cli.Context) error {
    cfg, err := LoadConfiguration(c)
    if err != nil {
        return err
    }

    l, infW, errW, err := log.NewFromConfig(cfg)
    if err != nil {
        return err
    }
    ll = l

    b := CreateBuilder(c.Bool("pipe"), cfg.RabbitMq.Compression, c.Bool("include"))
    builder, err := command.NewBuilder(b, c.String("executable"), c.Bool("output"), l, infW, errW)
    if err != nil {
        return fmt.Errorf("failed to create command builder: %v", err)
    }

    ack := acknowledger.NewFromConfig(cfg)
    p := processor.New(builder, ack, l)

    client, err := consumer.NewFromConfig(cfg, p, l)
    if err != nil {
        return err
    }
    defer client.Close()

    errs := make(chan error)

    if c.Bool("metrics") {
        ll.Infof("Registering metrics server at %v", c.String("web.listen-address"))
        go func() {
            errs <- setupAndServeMetrics(c.String("web.listen-address"), c.String("web.telemetry-path"))
        }()
    } else {
        ll.Infof("Metrics disabled.")
    }

    go func() {
        errs <- consume(client, l)
    }()

    return <-errs
}

func setupAndServeMetrics(addr string, path string) error {
    srv := &http.Server{
        Addr: addr,
        // Good practice to set timeouts to avoid Slowloris attacks.
        WriteTimeout: time.Second * 15,
        ReadTimeout:  time.Second * 15,
        IdleTimeout:  time.Second * 60,
    }

    prometheus.MustRegister(collector.ProcessCounter)
    prometheus.MustRegister(collector.ProcessDuration)
    prometheus.MustRegister(collector.MessageDuration)

    http.Handle(path, promhttp.Handler())
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte(`<html>
             <head><title>rabbitmq-cli-consumer</title></head>
             <body>
             <h1>rabbitmq-cli-consumer</h1>
             <p><a href='` + path + `'>Metrics</a></p>
             </body>
             </html>`))
    })

    if err := srv.ListenAndServe(); err != nil {
        return errors.Wrap(err, "failed to serve metrics")
    }

    return nil
}

func consume(client *consumer.Consumer, l logr.Logger) error {
    done := make(chan error)
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGTERM)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        done <- client.Consume(ctx)
    }()

    select {
    case <-sig:
        l.Info("Cancel consumption of messages.")
        cancel()
        return checkConsumeError(<-done)

    case err := <-done:
        return checkConsumeError(err)
    }
}

func checkConsumeError(err error) error {
    switch err.(type) {
    case *amqp.Error:
        if strings.Contains(err.Error(), "Exception (320) Reason:") {
            return cli.NewExitError(fmt.Sprintf("connection closed: %v", err.(*amqp.Error).Reason), 10)
        }
        return err

    case *processor.AcknowledgmentError:
        return cli.NewExitError(err, 11)

    default:
        return err
    }
}

// ExitErrHandler is a global error handler registered with the application.
func ExitErrHandler(_ *cli.Context, err error) {
    if err == nil {
        return
    }

    code := 1

    if err.Error() != "" {
        if ll != nil {
            ll.Error(err)
        } else {
            stdlog.Printf("%+v\n", err)
        }
    }

    if exitErr, ok := err.(cli.ExitCoder); ok {
        code = exitErr.ExitCode()
    }

    os.Exit(code)
}

// CreateBuilder creates a new empty instance of command.Builder.
// The result must be passed to command.NewBuilder before it is ready to be used.
// If pipe is set to true, compression and metadata are ignored.
func CreateBuilder(pipe, compression, metadata bool) command.Builder {
    if pipe {
        return &command.PipeBuilder{}
    }

    return &command.ArgumentBuilder{
        Compressed:   compression,
        WithMetadata: metadata,
    }
}

// LoadConfiguration checks the configuration flags, loads the config from file and updates the config according the flags.
func LoadConfiguration(c *cli.Context) (*config.Config, error) {
    file := c.String("configuration")
    url := c.String("url")
    queue := c.String("queue-name")

    if file == "" && url == "" && queue == "" && c.String("executable") == "" {
        cli.ShowAppHelp(c)
        return nil, cli.NewExitError("", 1)
    }

    cfg, err := configuration(file)
    if err != nil {
        return nil, fmt.Errorf("failed parsing configuration: %s", err)
    }

    if len(url) > 0 {
        cfg.RabbitMq.AmqpUrl = url
    }

    if queue != "" {
        cfg.RabbitMq.Queue = queue
    }

    if c.IsSet("no-datetime") {
        cfg.Logs.NoDateTime = c.Bool("no-datetime")
    }

    if c.IsSet("verbose") {
        cfg.Logs.Verbose = c.Bool("verbose")
    }

    if c.IsSet("strict-exit-code") {
        cfg.RabbitMq.Stricfailure = c.Bool("strict-exit-code")
    }

    if c.IsSet("no-declare") {
        cfg.QueueSettings.Nodeclare = c.Bool("no-declare")
    }

    return cfg, nil
}

func configuration(file string) (*config.Config, error) {
    if file == "" {
        return config.CreateFromString("")
    }

    return config.LoadAndParse(file)
}