dotcloud/docker

View on GitHub
daemon/logger/gelf/gelf.go

Summary

Maintainability
B
4 hrs
Test Coverage
// Package gelf provides the log driver for forwarding server logs to
// endpoints that support the Graylog Extended Log Format.
package gelf // import "github.com/docker/docker/daemon/logger/gelf"

import (
    "compress/flate"
    "encoding/json"
    "fmt"
    "net"
    "net/url"
    "strconv"
    "time"

    "github.com/Graylog2/go-gelf/gelf"
    "github.com/docker/docker/daemon/logger"
    "github.com/docker/docker/daemon/logger/loggerutils"
)

const name = "gelf"

type gelfLogger struct {
    writer   gelf.Writer
    info     logger.Info
    hostname string
    rawExtra json.RawMessage
}

func init() {
    if err := logger.RegisterLogDriver(name, New); err != nil {
        panic(err)
    }
    if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
        panic(err)
    }
}

// New creates a gelf logger using the configuration passed in on the
// context. The supported context configuration variable is gelf-address.
func New(info logger.Info) (logger.Logger, error) {
    // parse gelf address
    address, err := parseAddress(info.Config["gelf-address"])
    if err != nil {
        return nil, err
    }

    // collect extra data for GELF message
    hostname, err := info.Hostname()
    if err != nil {
        return nil, fmt.Errorf("gelf: cannot access hostname to set source field")
    }

    // parse log tag
    tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
    if err != nil {
        return nil, err
    }

    extra := map[string]interface{}{
        "_container_id":   info.ContainerID,
        "_container_name": info.Name(),
        "_image_id":       info.ContainerImageID,
        "_image_name":     info.ContainerImageName,
        "_command":        info.Command(),
        "_tag":            tag,
        "_created":        info.ContainerCreated,
    }

    extraAttrs, err := info.ExtraAttributes(func(key string) string {
        if key[0] == '_' {
            return key
        }
        return "_" + key
    })
    if err != nil {
        return nil, err
    }

    for k, v := range extraAttrs {
        extra[k] = v
    }

    rawExtra, err := json.Marshal(extra)
    if err != nil {
        return nil, err
    }

    var gelfWriter gelf.Writer
    if address.Scheme == "udp" {
        gelfWriter, err = newGELFUDPWriter(address.Host, info)
        if err != nil {
            return nil, err
        }
    } else if address.Scheme == "tcp" {
        gelfWriter, err = newGELFTCPWriter(address.Host, info)
        if err != nil {
            return nil, err
        }
    }

    return &gelfLogger{
        writer:   gelfWriter,
        info:     info,
        hostname: hostname,
        rawExtra: rawExtra,
    }, nil
}

// create new TCP gelfWriter
func newGELFTCPWriter(address string, info logger.Info) (gelf.Writer, error) {
    gelfWriter, err := gelf.NewTCPWriter(address)
    if err != nil {
        return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
    }

    if v, ok := info.Config["gelf-tcp-max-reconnect"]; ok {
        i, err := strconv.Atoi(v)
        if err != nil || i < 0 {
            return nil, fmt.Errorf("gelf-tcp-max-reconnect must be a positive integer")
        }
        gelfWriter.MaxReconnect = i
    }

    if v, ok := info.Config["gelf-tcp-reconnect-delay"]; ok {
        i, err := strconv.Atoi(v)
        if err != nil || i < 0 {
            return nil, fmt.Errorf("gelf-tcp-reconnect-delay must be a positive integer")
        }
        gelfWriter.ReconnectDelay = time.Duration(i)
    }

    return gelfWriter, nil
}

// create new UDP gelfWriter
func newGELFUDPWriter(address string, info logger.Info) (gelf.Writer, error) {
    gelfWriter, err := gelf.NewUDPWriter(address)
    if err != nil {
        return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
    }

    if v, ok := info.Config["gelf-compression-type"]; ok {
        switch v {
        case "gzip":
            gelfWriter.CompressionType = gelf.CompressGzip
        case "zlib":
            gelfWriter.CompressionType = gelf.CompressZlib
        case "none":
            gelfWriter.CompressionType = gelf.CompressNone
        default:
            return nil, fmt.Errorf("gelf: invalid compression type %q", v)
        }
    }

    if v, ok := info.Config["gelf-compression-level"]; ok {
        val, err := strconv.Atoi(v)
        if err != nil {
            return nil, fmt.Errorf("gelf: invalid compression level %s, err %v", v, err)
        }
        gelfWriter.CompressionLevel = val
    }

    return gelfWriter, nil
}

func (s *gelfLogger) Log(msg *logger.Message) error {
    if len(msg.Line) == 0 {
        return nil
    }

    level := gelf.LOG_INFO
    if msg.Source == "stderr" {
        level = gelf.LOG_ERR
    }

    m := gelf.Message{
        Version:  "1.1",
        Host:     s.hostname,
        Short:    string(msg.Line),
        TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0,
        Level:    int32(level),
        RawExtra: s.rawExtra,
    }
    logger.PutMessage(msg)

    if err := s.writer.WriteMessage(&m); err != nil {
        return fmt.Errorf("gelf: cannot send GELF message: %v", err)
    }
    return nil
}

func (s *gelfLogger) Close() error {
    return s.writer.Close()
}

func (s *gelfLogger) Name() string {
    return name
}

// ValidateLogOpt looks for gelf specific log option gelf-address.
func ValidateLogOpt(cfg map[string]string) error {
    address, err := parseAddress(cfg["gelf-address"])
    if err != nil {
        return err
    }

    for key, val := range cfg {
        switch key {
        case "gelf-address":
        case "tag":
        case "labels":
        case "labels-regex":
        case "env":
        case "env-regex":
        case "gelf-compression-level":
            if address.Scheme != "udp" {
                return fmt.Errorf("compression is only supported on UDP")
            }
            i, err := strconv.Atoi(val)
            if err != nil || i < flate.DefaultCompression || i > flate.BestCompression {
                return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
            }
        case "gelf-compression-type":
            if address.Scheme != "udp" {
                return fmt.Errorf("compression is only supported on UDP")
            }
            switch val {
            case "gzip", "zlib", "none":
            default:
                return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
            }
        case "gelf-tcp-max-reconnect", "gelf-tcp-reconnect-delay":
            if address.Scheme != "tcp" {
                return fmt.Errorf("%q is only valid for TCP", key)
            }
            i, err := strconv.Atoi(val)
            if err != nil || i < 0 {
                return fmt.Errorf("%q must be a positive integer", key)
            }
        default:
            return fmt.Errorf("unknown log opt %q for gelf log driver", key)
        }
    }

    return nil
}

func parseAddress(address string) (*url.URL, error) {
    if address == "" {
        return nil, fmt.Errorf("gelf-address is a required parameter")
    }
    addr, err := url.Parse(address)
    if err != nil {
        return nil, err
    }

    if addr.Scheme != "udp" && addr.Scheme != "tcp" {
        return nil, fmt.Errorf("gelf: endpoint needs to be TCP or UDP")
    }

    if _, _, err = net.SplitHostPort(addr.Host); err != nil {
        return nil, fmt.Errorf("gelf: please provide gelf-address as proto://host:port")
    }

    return addr, nil
}