mre/kafka-influxdb

View on GitHub
docker/logstash/config.conf

Summary

Maintainability
Test Coverage
# Event source: consume raw metric lines from the "metrics" topic on the
# Kafka broker reachable at kafka:9092.
input {
  kafka {
    topics => ["metrics"]
    bootstrap_servers => "kafka:9092"
  }
}
# Parse each metric line in [message] into fields and meter the event rate.
filter {
  grok {
    # Expected line layout:
    #   <datacenter>.<service>.<host>[.statsd.<kind>-<application>].<measurement> <value> <time>
    # where <kind> is gauge, latency or derive. Each statsd variant stores its
    # number in its own field (value-gauge / value-latency / value-derive); the
    # last, generic pattern stores it in "value".
    #
    # All patterns are listed in one array under a single "message" key, which
    # is the documented way to try several patterns against one field
    # (repeating `match` with the same key depends on how the config compiler
    # merges duplicate keys). Patterns are tried in order and grok stops at the
    # first hit by default (break_on_match), so the generic pattern must stay
    # last or it would swallow the statsd-specific lines.
    match => {
      "message" => [
        "(?<datacenter>[^\.]+?)\.(?<service>[^\.]+?)\.(?<host>[^\.]+?)\.statsd\.gauge-(?<application>[^\.]+?)\.(?<measurement>[^\s]+?)\s%{NUMBER:value-gauge:float}\s%{INT:time}",
        "(?<datacenter>[^\.]+?)\.(?<service>[^\.]+?)\.(?<host>[^\.]+?)\.statsd\.latency-(?<application>[^\.]+?)\.(?<measurement>[^\s]+?)\s%{NUMBER:value-latency:float}\s%{INT:time}",
        "(?<datacenter>[^\.]+?)\.(?<service>[^\.]+?)\.(?<host>[^\.]+?)\.statsd\.derive-(?<application>[^\.]+?)\.(?<measurement>[^\s]+?)\s%{NUMBER:value-derive:float}\s%{INT:time}",
        "(?<datacenter>[^\.]+?)\.(?<service>[^\.]+?)\.(?<host>[^\.]+?)\.(?<measurement>[^\s]+?)\s%{NUMBER:value:float}\s%{INT:time}"
      ]
    }
  }

  # Meter every event that passes through the filter stage (there is no
  # conditional around it, so grok failures are counted too). The generated
  # meter events carry the "metric" tag, which the output section uses to
  # route them to the rate printout.
  metrics {
    meter => "events"
    add_tag => "metric"
  }
}
# Route events by tag:
#   - "metric"            -> print the one-minute event rate to stdout
#   - "_grokparsefailure" -> dropped (no output configured in that branch)
#   - everything else     -> written to InfluxDB
output {
  if "metric" in [tags] {
    # Meter events produced by the metrics filter: one line per event.
    stdout {
      codec => line { format => "rate: %{[events][rate_1m]}" }
    }
  } else if "_grokparsefailure" in [tags] {
    # Lines that matched none of the grok patterns end here and are not
    # forwarded anywhere.
    # Uncomment for optional debugging output
    #stdout { codec => rubydebug }
  } else {
    influxdb {
      # Target database and write behaviour.
      host => "influxdb"
      db => "metrics"
      retention_policy => "autogen"
      flush_size => 5000
      workers => 1

      # The measurement name comes from the field extracted by grok.
      measurement => "%{measurement}"

      # Use the "time" data point as the point timestamp, in seconds.
      allow_time_override => true
      time_precision => "s"

      # NOTE(review): use_event_fields_for_data_points is enabled below, so
      # the plugin presumably builds points from the event's own fields and
      # this explicit mapping is kept only because the setting is mandatory —
      # confirm against the plugin version in use.
      data_points => {
        "time" => "%{time}"
        "value" => "%{value}"
        "value-derive" => "%{value-derive}"
        "value-gauge" => "%{value-gauge}"
        "value-latency" => "%{value-latency}"
        "datacenter" => "%{datacenter}"
        "service" => "%{service}"
        "host" => "%{host}"
        "application" => "%{application}"
      }
      use_event_fields_for_data_points => true
      exclude_fields => ["@timestamp", "@version", "sequence", "message", "type", "measurement"]

      # These fields are sent as InfluxDB tags rather than field values.
      send_as_tags => ["datacenter" , "service", "host","application"]

      # Force the numeric fields to floats.
      coerce_values => {
        "value" => "float"
        "value-gauge" => "float"
        "value-latency" => "float"
        "value-derive" => "float"
      }
    }
    # Uncomment for optional debugging output
    # stdout { codec => rubydebug }
  }
}