# docker/logstash/config.conf
# Read raw metric lines from the "metrics" topic of the Kafka broker
# reachable at kafka:9092.
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["metrics"]
  }
}
# Split each metric line of the form
#   <datacenter>.<service>.<host>.<rest-of-path> <number> <integer time>
# into named fields, and keep a throughput meter for the pipeline.
filter {
  grok {
    # The patterns are listed from most to least specific. The first three
    # recognise statsd paths of the form
    #   ...statsd.<gauge|latency|derive>-<application>.<measurement>
    # and store the number in value-gauge / value-latency / value-derive.
    # The last pattern is the generic fallback that stores the number in
    # "value"; it has to stay last because it also matches the statsd lines.
    match => {
      "message" => [
        "(?<datacenter>[^\.]+?)\.(?<service>[^\.]+?)\.(?<host>[^\.]+?)\.statsd\.gauge-(?<application>[^\.]+?)\.(?<measurement>[^\s]+?)\s%{NUMBER:value-gauge:float}\s%{INT:time}",
        "(?<datacenter>[^\.]+?)\.(?<service>[^\.]+?)\.(?<host>[^\.]+?)\.statsd\.latency-(?<application>[^\.]+?)\.(?<measurement>[^\s]+?)\s%{NUMBER:value-latency:float}\s%{INT:time}",
        "(?<datacenter>[^\.]+?)\.(?<service>[^\.]+?)\.(?<host>[^\.]+?)\.statsd\.derive-(?<application>[^\.]+?)\.(?<measurement>[^\s]+?)\s%{NUMBER:value-derive:float}\s%{INT:time}",
        "(?<datacenter>[^\.]+?)\.(?<service>[^\.]+?)\.(?<host>[^\.]+?)\.(?<measurement>[^\s]+?)\s%{NUMBER:value:float}\s%{INT:time}"
      ]
    }
  }

  # Meter named "events"; the "metric" tag is what the output section uses
  # to route the meter's own events away from InfluxDB.
  metrics {
    meter => "events"
    add_tag => "metric"
  }
}
# Route events by tag:
#   - "metric"            -> print the 1-minute event rate on stdout
#   - "_grokparsefailure" -> dropped (optionally dumped for debugging)
#   - everything else     -> written to InfluxDB
output {
  if "metric" in [tags] {
    stdout {
      codec => line {
        format => "rate: %{[events][rate_1m]}"
      }
    }
  } else if "_grokparsefailure" in [tags] {
    # Uncomment for optional debugging output
    #stdout { codec => rubydebug }
  } else {
    influxdb {
      host => "influxdb"
      db => "metrics"

      # The InfluxDB measurement name comes from the parsed "measurement" field.
      measurement => "%{measurement}"

      # NOTE(review): use_event_fields_for_data_points is enabled further
      # down, so the points are presumably built from the event's own fields
      # and this mapping only mirrors them - confirm against the plugin docs.
      data_points => {
        "time" => "%{time}"
        "value" => "%{value}"
        "value-derive" => "%{value-derive}"
        "value-gauge" => "%{value-gauge}"
        "value-latency" => "%{value-latency}"
        "datacenter" => "%{datacenter}"
        "service" => "%{service}"
        "host" => "%{host}"
        "application" => "%{application}"
      }

      # These fields are sent as tags rather than as field values.
      send_as_tags => ["datacenter" , "service", "host","application"]

      # Allow the "time" data point to override the point's timestamp,
      # with second precision.
      allow_time_override => true
      time_precision => "s"

      # Force every value field to be written as a float.
      coerce_values => {
        "value" => "float"
        "value-gauge" => "float"
        "value-latency" => "float"
        "value-derive" => "float"
      }

      use_event_fields_for_data_points => true
      # Fields that must not be written as data points; "measurement" is
      # already used as the measurement name above.
      exclude_fields => ["@timestamp", "@version", "sequence", "message", "type", "measurement"]

      flush_size => 5000
      workers => 1
      retention_policy => "autogen"
    }
    # Uncomment for optional debugging output
    # stdout { codec => rubydebug }
  }
}