fabric8io/fluent-plugin-kubernetes_metadata_filter

View on GitHub
lib/fluent/plugin/filter_kubernetes_metadata.rb

Summary

Maintainability
D
2 days
Test Coverage
# frozen_string_literal: true

#
# Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with
# Kubernetes metadata
#
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require_relative 'kubernetes_metadata_cache_strategy'
require_relative 'kubernetes_metadata_common'
require_relative 'kubernetes_metadata_stats'
require_relative 'kubernetes_metadata_util'
require_relative 'kubernetes_metadata_watch_namespaces'
require_relative 'kubernetes_metadata_watch_pods'

require 'fluent/plugin/filter'
require 'resolv'

module Fluent::Plugin
  class KubernetesMetadataFilter < Fluent::Plugin::Filter
    K8_POD_CA_CERT = 'ca.crt'
    K8_POD_TOKEN = 'token'

    include KubernetesMetadata::CacheStrategy
    include KubernetesMetadata::Common
    include KubernetesMetadata::WatchNamespaces
    include KubernetesMetadata::WatchPods

    Fluent::Plugin.register_filter('kubernetes_metadata', self)

    config_param :kubernetes_url, :string, default: nil
    config_param :cache_size, :integer, default: 1000
    config_param :cache_ttl, :integer, default: 60 * 60
    config_param :watch, :bool, default: true
    config_param :apiVersion, :string, default: 'v1'
    config_param :client_cert, :string, default: nil
    config_param :client_key, :string, default: nil
    config_param :ca_file, :string, default: nil
    config_param :verify_ssl, :bool, default: true
    config_param :open_timeout, :integer, default: 3
    config_param :read_timeout, :integer, default: 10

    REGEX_VAR_LOG_PODS = '(var\.log\.pods)\.(?<namespace>[^_]+)_(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<pod_uuid>[a-z0-9-]*)\.(?<container_name>.+)\..*\.log$'
    REGEX_VAR_LOG_CONTAINERS = '(var\.log\.containers)\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$'

    #tag_to_kubernetes_name_regexp which must include named capture groups:
    #  namespace            - The namespace in which the pod is deployed
    #  pod_name             - The pod name
    #  container_name       - The name of the container
    #  pod_uuid (/var/log/pods) | docker_id (/var/log/containers) - Unique identifier used in caching of either pod_uuid or the container hash
    config_param :tag_to_kubernetes_name_regexp, :string, default: "(#{REGEX_VAR_LOG_PODS}|#{REGEX_VAR_LOG_CONTAINERS})"

    config_param :bearer_token_file, :string, default: nil
    config_param :secret_dir, :string, default: '/var/run/secrets/kubernetes.io/serviceaccount'

    config_param :annotation_match, :array, default: []
    config_param :stats_interval, :integer, default: 30
    config_param :allow_orphans, :bool, default: true
    config_param :orphaned_namespace_name, :string, default: '.orphaned'
    config_param :orphaned_namespace_id, :string, default: 'orphaned'
    config_param :lookup_from_k8s_field, :bool, default: true
    # if `ca_file` is for an intermediate CA, or otherwise we do not have the root CA and want
    # to trust the intermediate CA certs we do have, set this to `true` - this corresponds to
    # the openssl s_client -partial_chain flag and X509_V_FLAG_PARTIAL_CHAIN
    config_param :ssl_partial_chain, :bool, default: false
    config_param :skip_labels, :bool, default: false
    config_param :skip_pod_labels, :bool, default: false
    config_param :skip_namespace_labels, :bool, default: false
    config_param :skip_container_metadata, :bool, default: false
    config_param :skip_master_url, :bool, default: false
    config_param :skip_namespace_metadata, :bool, default: false
    config_param :include_ownerrefs_metadata, :bool, default: false

    # A classname in the form of Test::APIAdapter which will try
    # to be resolved from a relative named file 'test_api_adapter'
    config_param :test_api_adapter, :string, default: nil

    # The time interval in seconds for retry backoffs when watch connections fail.
    config_param :watch_retry_interval, :integer, default: 1
    # The base number of exponential backoff for retries.
    config_param :watch_retry_exponential_backoff_base, :integer, default: 2
    # The maximum number of times to retry pod and namespace watches.
    config_param :watch_retry_max_times, :integer, default: 10

    def fetch_pod_metadata(namespace_name, pod_name)
      log.trace("fetching pod metadata: #{namespace_name}/#{pod_name}")
      options = {
        resource_version: '0' # Fetch from API server cache instead of etcd quorum read
      }
      pod_object = @client.get_pod(pod_name, namespace_name, options)
      log.trace("raw metadata for #{namespace_name}/#{pod_name}: #{pod_object}")
      metadata = parse_pod_metadata(pod_object)
      @stats.bump(:pod_cache_api_updates)
      log.trace("parsed metadata for #{namespace_name}/#{pod_name}: #{metadata}")
      @cache[metadata['pod_id']] = metadata
      rescue KubeException => e
        if e.error_code == 401
          # recreate client to refresh token
          log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
          create_client()
        elsif e.error_code == 404
          log.debug "Encountered '404 Not Found' exception, pod not found"
          @stats.bump(:pod_cache_api_nil_error)
        else
          log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
          @stats.bump(:pod_cache_api_nil_error)
        end
        {}
      rescue StandardError => e
        @stats.bump(:pod_cache_api_nil_error)
        log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
        {}
      end

    def dump_stats
      @curr_time = Time.now
      return if @curr_time.to_i - @prev_time.to_i < @stats_interval

      @prev_time = @curr_time
      @stats.set(:pod_cache_size, @cache.count)
      @stats.set(:namespace_cache_size, @namespace_cache.count) if @namespace_cache
      log.info(@stats)
      if log.level == Fluent::Log::LEVEL_TRACE
        log.trace("       id cache: #{@id_cache.to_a}")
        log.trace("      pod cache: #{@cache.to_a}")
        log.trace("namespace cache: #{@namespace_cache.to_a}")
      end
    end

    def fetch_namespace_metadata(namespace_name)
      log.trace("fetching namespace metadata: #{namespace_name}")
      options = {
        resource_version: '0' # Fetch from API server cache instead of etcd quorum read
      }
      namespace_object = @client.get_namespace(namespace_name, nil, options)
      log.trace("raw metadata for #{namespace_name}: #{namespace_object}")
      metadata = parse_namespace_metadata(namespace_object)
      @stats.bump(:namespace_cache_api_updates)
      log.trace("parsed metadata for #{namespace_name}: #{metadata}")
      @namespace_cache[metadata['namespace_id']] = metadata
      rescue KubeException => e
        if e.error_code == 401
          # recreate client to refresh token
          log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
          create_client()
        else
          log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
          @stats.bump(:namespace_cache_api_nil_error)
        end
        {}
      rescue StandardError => e
        @stats.bump(:namespace_cache_api_nil_error)
        log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
        {}
    end

    def initialize
      super
      @prev_time = Time.now
      @ssl_options = {}
      @auth_options = {}
    end

    def configure(conf)
      super

      require 'kubeclient'
      require 'lru_redux'

      @stats = KubernetesMetadata::Stats.new
      if @stats_interval <= 0
        @stats = KubernetesMetadata::NoOpStats.new
        self.define_singleton_method(:dump_stats) {}
      end

      if @cache_ttl < 0
        log.info 'Setting the cache TTL to :none because it was <= 0'
        @cache_ttl = :none
      end

      # Caches pod/namespace UID tuples for a given container UID.
      @id_cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)

      # Use the container UID as the key to fetch a hash containing pod metadata
      @cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)

      # Use the namespace UID as the key to fetch a hash containing namespace metadata
      @namespace_cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)

      @tag_to_kubernetes_name_regexp_compiled = Regexp.compile(@tag_to_kubernetes_name_regexp)

      # Use Kubernetes default service account if we're in a pod.
      if @kubernetes_url.nil?
        log.debug 'Kubernetes URL is not set - inspecting environ'

        env_host = ENV['KUBERNETES_SERVICE_HOST']
        env_port = ENV['KUBERNETES_SERVICE_PORT']
        if present?(env_host) && present?(env_port)
          if env_host =~ Resolv::IPv6::Regex
            # Brackets are needed around IPv6 addresses
            env_host = "[#{env_host}]"
          end
          @kubernetes_url = "https://#{env_host}:#{env_port}/api"
          log.debug "Kubernetes URL is now '#{@kubernetes_url}'"
        else
          log.debug 'No Kubernetes URL could be found in config or environ'
        end
      end

      # Use SSL certificate and bearer token from Kubernetes service account.
      if Dir.exist?(@secret_dir)
        log.debug "Found directory with secrets: #{@secret_dir}"
        ca_cert = File.join(@secret_dir, K8_POD_CA_CERT)
        pod_token = File.join(@secret_dir, K8_POD_TOKEN)

        if !present?(@ca_file) && File.exist?(ca_cert)
          log.debug "Found CA certificate: #{ca_cert}"
          @ca_file = ca_cert
        end

        if !present?(@bearer_token_file) && File.exist?(pod_token)
          log.debug "Found pod token: #{pod_token}"
          @bearer_token_file = pod_token
        end
      end

      if present?(@kubernetes_url)
        @ssl_options = {
          client_cert: present?(@client_cert) ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil,
          client_key: present?(@client_key) ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil,
          ca_file: @ca_file,
          verify_ssl: @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
        }

        if @ssl_partial_chain
          # taken from the ssl.rb OpenSSL::SSL::SSLContext code for DEFAULT_CERT_STORE
          require 'openssl'
          ssl_store = OpenSSL::X509::Store.new
          ssl_store.set_default_paths
          flagval = if defined? OpenSSL::X509::V_FLAG_PARTIAL_CHAIN
                      OpenSSL::X509::V_FLAG_PARTIAL_CHAIN
                    else
                      # this version of ruby does not define OpenSSL::X509::V_FLAG_PARTIAL_CHAIN
                      0x80000
                    end
          ssl_store.flags = OpenSSL::X509::V_FLAG_CRL_CHECK_ALL | flagval
          @ssl_options[:cert_store] = ssl_store
        end

        if present?(@bearer_token_file)
          @auth_options[:bearer_token_file] = @bearer_token_file
        end

        create_client()

        if @test_api_adapter
          log.info "Extending client with test api adaper #{@test_api_adapter}"
          require_relative @test_api_adapter.underscore
          @client.extend(eval(@test_api_adapter))
        end

        begin
          @client.api_valid?
        rescue KubeException => e
          raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{e.message}"
        end

        if @watch
          if ENV['K8S_NODE_NAME'].nil? || ENV['K8S_NODE_NAME'].strip.empty?
            log.warn("!! The environment variable 'K8S_NODE_NAME' is not set to the node name which can affect the API server and watch efficiency !!")
          end

          pod_thread = Thread.new(self, &:set_up_pod_thread)
          pod_thread.abort_on_exception = true

          namespace_thread = Thread.new(self, &:set_up_namespace_thread)
          namespace_thread.abort_on_exception = true
        end
      end

      @annotations_regexps = []
      @annotation_match.each do |regexp|
        @annotations_regexps << Regexp.compile(regexp)
      rescue RegexpError => e
        log.error "Error: invalid regular expression in annotation_match: #{e}"
      end
    end

    def create_client()
      log.debug 'Creating K8S client'
      @client = nil
      @client = Kubeclient::Client.new(
        @kubernetes_url,
        @apiVersion,
        ssl_options: @ssl_options,
        auth_options: @auth_options,
        timeouts: {
          open: @open_timeout,
          read: @read_timeout
        },
        as: :parsed_symbolized
      )
    end

    def get_metadata_for_record(namespace_name, pod_name, container_name, cache_key, create_time, batch_miss_cache, docker_id)
      metadata = {
        'docker' => { 'container_id' => "" },
        'kubernetes' => {
          'container_name' => container_name,
          'namespace_name' => namespace_name,
          'pod_name' => pod_name
        }
      }
      metadata['docker']['container_id'] = docker_id unless docker_id.nil?
      container_cache_key = container_name
      if present?(@kubernetes_url)
        pod_metadata = get_pod_metadata(cache_key, namespace_name, pod_name, create_time, batch_miss_cache)
        if (pod_metadata.include? 'containers') && (pod_metadata['containers'].include? container_cache_key) && !@skip_container_metadata
          metadata['kubernetes']['container_image'] = pod_metadata['containers'][container_cache_key]['image']
          metadata['kubernetes']['container_image_id'] = pod_metadata['containers'][container_cache_key]['image_id'] unless pod_metadata['containers'][container_cache_key]['image_id'].empty?
          metadata['docker']['container_id'] = pod_metadata['containers'][container_cache_key]['containerID'] unless pod_metadata['containers'][container_cache_key]['containerID'].empty?
        end

        metadata['kubernetes'].merge!(pod_metadata) if pod_metadata
        metadata['kubernetes'].delete('containers')
      end
      metadata['kubernetes'].tap do |kube|
        kube.each_pair do |k,v|
          kube[k.dup] = v.dup
        end
      end
      metadata.delete('docker') if metadata['docker'] && (metadata['docker']['container_id'].nil? || metadata['docker']['container_id'].empty?)
      metadata
    end

    def filter(tag, time, record)
      tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled)
      batch_miss_cache = {}
      if tag_match_data
        cache_key =  if tag_match_data.names.include?('pod_uuid') && !tag_match_data['pod_uuid'].nil?
          tag_match_data['pod_uuid']
        else
          tag_match_data['docker_id']
        end
        docker_id = tag_match_data.names.include?('docker_id') ? tag_match_data['docker_id'] : nil
        metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
                                                cache_key, time, batch_miss_cache, docker_id)
      end
      if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
          record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
          record['kubernetes'].key?('namespace_name') &&
          record['kubernetes'].key?('pod_name') &&
          record['kubernetes'].key?('container_name') &&
          record['docker'].key?('container_id') &&
          (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
                                                record['kubernetes']['container_name'], record['docker']['container_id'],
                                                time, batch_miss_cache, record['docker']['container_id']))
        metadata = k_metadata
      end
      dump_stats
      metadata ? record.merge(metadata) : record
    end

    # copied from activesupport
    def present?(object)
      object.respond_to?(:empty?) ? !object.empty? : !!object
    end
  end
end