fabric8io/fluent-plugin-kubernetes

View on GitHub
lib/fluent/plugin/out_kubernetes.rb

Summary

Maintainability
A
2 hrs
Test Coverage
#
# Fluentd Kubernetes Output Plugin - Enrich Fluentd events with Kubernetes
# metadata
#
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class Fluent::KubernetesOutput < Fluent::Output
  Fluent::Plugin.register_output('kubernetes', self)

  config_param :container_id, :string
  config_param :tag, :string
  config_param :kubernetes_pod_regex, :string, default: '^[^_]+_([^\.]+)\.[^_]+_([^\.]+)\.([^\.]+)'

  def initialize
    super
  end

  def configure(conf)
    super

    require 'docker'
    require 'json'
  end

  def emit(tag, es, chain)
    es.each do |time,record|
      Fluent::Engine.emit('kubernetes',
                          time,
                          enrich_record(tag, record))
    end

    chain.next
  end

  private

  def interpolate(tag, str)
    tag_parts = tag.split('.')

    str.gsub(/\$\{tag_parts\[(\d+)\]\}/) { |m| tag_parts[$1.to_i] }
  end

  def enrich_record(tag, record)
    id = interpolate(tag, @container_id)
    if !id.empty?
      record['container_id'] = id
      record = enrich_container_data(id, record)
      record = merge_json_log(record)
    end
    record
  end

  def enrich_container_data(id, record)
    container = Docker::Container.get(id)
    if container
      container_name = container.json['Name']
      if container_name
        record["container_name"] = container_name[1..-1] if container_name[0] == '/'
        regex = Regexp.new(@kubernetes_pod_regex)
        match = container_name.match(regex)
        if match
          pod_container_name, pod_name, pod_namespace =
            match.captures
          record["pod_namespace"] = pod_namespace
          record["pod"] = pod_name
          record["pod_container"] = pod_container_name
        end
      end
    end
    record
  end

  def merge_json_log(record)
    if record.has_key?('log')
      log = record['log'].strip
      if log[0].eql?('{') && log[-1].eql?('}')
        begin
          parsed_log = JSON.parse(log)
          record = record.merge(parsed_log)
          unless parsed_log.has_key?('log')
            record.delete('log')
          end
        rescue JSON::ParserError
        end
      end
    end
    record
  end

end