uken/fluent-plugin-elasticsearch

View on GitHub
lib/fluent/plugin/out_elasticsearch_dynamic.rb

Summary

Maintainability
D
2 days
Test Coverage
# encoding: UTF-8
require_relative 'out_elasticsearch'

module Fluent::Plugin
  class ElasticsearchOutputDynamic < ElasticsearchOutput

    Fluent::Plugin.register_output('elasticsearch_dynamic', self)

    helpers :event_emitter

    # Separator that expand_param uses to split the event tag into `tag_parts`.
    config_param :delimiter, :string, :default => "."

    # Names of the settings that configure and write pass through expand_param.
    DYNAMIC_PARAM_NAMES = %W[hosts host port include_timestamp logstash_format logstash_prefix logstash_dateformat time_key utc_index index_name tag_key type_name id_key parent_key routing_key write_operation]
    # Instance-variable symbols (:@hosts, :@host, ...) in the same order as DYNAMIC_PARAM_NAMES.
    DYNAMIC_PARAM_SYMBOLS = DYNAMIC_PARAM_NAMES.map { |n| "@#{n}".to_sym }

    # Key under which write groups bulk payloads: target host plus an index
    # (write leaves the index nil unless @include_index_in_url is set).
    RequestInfo = Struct.new(:host, :index)

    # Hash of dynamic parameter name => value (as String) built once by configure.
    attr_reader :dynamic_config

    # Runs the parent configuration, then pre-computes @dynamic_config.
    #
    # Every dynamic parameter is passed through expand_param with no tag, time
    # or record and the result is stored as a String under the parameter name.
    # A deprecation warning is logged afterwards.
    def configure(conf)
      super

      @dynamic_config = {}
      DYNAMIC_PARAM_NAMES.zip(DYNAMIC_PARAM_SYMBOLS).each do |name, ivar|
        raw_setting = instance_variable_get(ivar)
        @dynamic_config[name] = expand_param(raw_setting, nil, nil, nil).to_s
      end

      log.warn "Elasticsearch dynamic plugin will be deprecated and removed in the future. Please consider to use normal Elasticsearch plugin"
    end

    # Maps dynamic parameter names to the bulk metadata field each one fills.
    # The routing field name is taken from @routing_key_name.
    def create_meta_config_map
      meta_fields = {}
      meta_fields['id_key'] = '_id'
      meta_fields['parent_key'] = '_parent'
      meta_fields['routing_key'] = @routing_key_name
      meta_fields
    end


    # Returns an Elasticsearch client for +host+, reusing the cached one when possible.
    #
    # The cached client (@_es) is dropped when is_existing_connection rejects the
    # freshly computed host list, or when +compress_connection+ differs from the
    # setting the cached client was built with. A new client is then built on a
    # Faraday transport configured from the plugin's instance variables.
    def client(host = nil, compress_connection = false)
      connection_options = get_connection_options(host)

      same_hosts = is_existing_connection(connection_options[:hosts])
      same_compression = (@compressable_connection == compress_connection)
      @_es = nil unless same_hosts && same_compression
      return @_es if @_es

      # Remember what this client is being built for so the next call can compare.
      @compressable_connection = compress_connection
      @current_config = connection_options[:hosts].clone

      adapter_conf = ->(faraday) { faraday.adapter(@http_backend, @backend_options) }

      headers = { 'Content-Type' => @content_type.to_s }
      headers['Content-Encoding'] = 'gzip' if compress_connection

      ssl_options = { verify: @ssl_verify, ca_file: @ca_file }.merge(@ssl_version_options)

      transport_settings = {
        reload_connections: @reload_connections,
        reload_on_failure: @reload_on_failure,
        resurrect_after: @resurrect_after,
        logger: @transport_logger,
        transport_options: {
          headers: headers,
          request: { timeout: @request_timeout },
          ssl: ssl_options,
        },
        http: {
          user: @user,
          password: @password,
          scheme: @scheme
        },
        compression: compress_connection,
      }

      transport = TRANSPORT_CLASS::Transport::HTTP::Faraday.new(
        connection_options.merge(options: transport_settings), &adapter_conf
      )
      @_es = Elasticsearch::Client.new transport: transport
    end

    # Builds the `{hosts: [...]}` option hash describing the cluster endpoints.
    #
    # +con_host+ (or @hosts when it is nil) is a comma-separated list whose items
    # are either legacy `host[:port]` entries or full URLs. When neither is set,
    # a single entry is built from @host/@port/@scheme. Entries lacking a user or
    # path inherit @user/@password and @path.
    #
    # Raises RuntimeError when @user is set without @password.
    def get_connection_options(con_host)
      raise "`password` must be present if `user` is present" if @user && !@password

      host_list = con_host || @hosts
      hosts =
        if host_list
          host_list.split(',').map do |host_str|
            if host_str.match(%r{^[^:]+(\:\d+)?$})
              # Legacy format: host:port,host:port,host:port...
              name, port = host_str.split(':')
              { host: name, port: (port || @port).to_i, scheme: @scheme.to_s }
            else
              # URL format, e.g. http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
              uri = URI(get_escaped_userinfo(host_str))
              entry = { host: uri.host, port: uri.port, scheme: uri.scheme }
              %w(user password path).each do |key|
                component = uri.public_send(key)
                entry[key.to_sym] = component unless component.nil? || component == ''
              end
              entry
            end
          end
        else
          [{ host: @host, port: @port.to_i, scheme: @scheme.to_s }]
        end

      # Fill in plugin-wide credentials and path where an entry has none of its own.
      hosts.each do |entry|
        if !entry[:user] && @user
          entry[:user] = @user
          entry[:password] = @password
        end
        entry[:path] = @path if !entry[:path] && @path
      end

      { hosts: hosts }
    end

    # Returns a printable, comma-separated description of the connection
    # entries for +host+, with any password replaced by 'obfuscated'.
    def connection_options_description(host)
      descriptions = get_connection_options(host)[:hosts].map do |host_info|
        printable = host_info.key?(:password) ? host_info.merge(password: 'obfuscated') : host_info
        printable.inspect
      end
      descriptions.join(', ')
    end

    # Always returns true.
    # NOTE(review): presumably this is the Fluentd hook that declares the plugin
    # usable with multiple workers — confirm against the Fluentd plugin API.
    def multi_workers_ready?
      true
    end

    # Turns every record in +chunk+ into Elasticsearch bulk payload and sends it.
    #
    # Dynamic parameters are re-evaluated for each record through expand_param.
    # A record whose evaluation raises is emitted via router.emit_error_event and
    # skipped. Payloads are grouped by RequestInfo (host, plus the index when
    # @include_index_in_url is set) and each non-empty group is handed to
    # send_bulk once the whole chunk has been walked.
    def write(chunk)
      # One payload string per RequestInfo key; created empty on first access.
      bulk_message = Hash.new { |h,k| h[k] = '' }
      # Working copy: entries are reassigned per record, @dynamic_config is left alone.
      dynamic_conf = @dynamic_config.clone

      # One header hash per write operation, passed to append_record_to_messages.
      headers = {
        UPDATE_OP => {},
        UPSERT_OP => {},
        CREATE_OP => {},
        INDEX_OP => {}
      }

      tag = chunk.metadata.tag

      chunk.msgpack_each do |time, record|
        # Non-Hash records are skipped.
        next unless record.is_a? Hash

        if @flatten_hashes
          record = flatten_record(record)
        end

        begin
          # evaluate all configurations here
          DYNAMIC_PARAM_SYMBOLS.each_with_index { |var, i|
            k = DYNAMIC_PARAM_NAMES[i]
            v = self.instance_variable_get(var)
            # Re-expand only when the cached value differs from the raw setting.
            # Note the cache holds the previous record's expansion, so a setting
            # with placeholders is expanded again for every record.
            if dynamic_conf[k] != v
              value = expand_param(v, tag, time, record)
              dynamic_conf[k] = value
            end
          }
        # end eval all configs
        rescue => e
          # handle dynamic parameters misconfigurations
          router.emit_error_event(tag, time, record, e)
          next
        end

        # Timestamp handling: an existing "@timestamp" wins, then the configured
        # time_key; otherwise the event time is written into the record.
        if eval_or_val(dynamic_conf['logstash_format']) || eval_or_val(dynamic_conf['include_timestamp'])
          if record.has_key?("@timestamp")
            time = Time.parse record["@timestamp"]
          elsif record.has_key?(dynamic_conf['time_key'])
            time = Time.parse record[dynamic_conf['time_key']]
            record['@timestamp'] = record[dynamic_conf['time_key']] unless time_key_exclude_timestamp
          else
            record.merge!({"@timestamp" => Time.at(time).iso8601(@time_precision)})
          end
        end

        # Index name: prefix + separator + formatted date in logstash mode
        # (UTC when utc_index evaluates truthy), else the configured index_name.
        if eval_or_val(dynamic_conf['logstash_format'])
          if eval_or_val(dynamic_conf['utc_index'])
            target_index = "#{dynamic_conf['logstash_prefix']}#{@logstash_prefix_separator}#{Time.at(time).getutc.strftime("#{dynamic_conf['logstash_dateformat']}")}"
          else
            target_index = "#{dynamic_conf['logstash_prefix']}#{@logstash_prefix_separator}#{Time.at(time).strftime("#{dynamic_conf['logstash_dateformat']}")}"
          end
        else
          target_index = dynamic_conf['index_name']
        end

        # Change target_index to lower-case since Elasticsearch doesn't
        # allow upper-case characters in index names.
        target_index = target_index.downcase

        if @include_tag_key
          record.merge!(dynamic_conf['tag_key'] => tag)
        end

        # Prefer the expanded `hosts` list; fall back to "host:port".
        if dynamic_conf['hosts']
          host = dynamic_conf['hosts']
        else
          host = "#{dynamic_conf['host']}:#{dynamic_conf['port']}"
        end

        # With @include_index_in_url the index is part of the grouping key and is
        # passed to send_bulk; otherwise it goes into each record's metadata.
        if @include_index_in_url
          key = RequestInfo.new(host, target_index)
          meta = {"_type" => dynamic_conf['type_name']}
        else
          key = RequestInfo.new(host, nil)
          meta = {"_index" => target_index, "_type" => dynamic_conf['type_name']}
        end

        # Copy configured record fields into the bulk metadata; falsy values are skipped.
        @meta_config_map.each_pair do |config_name, meta_key|
          if dynamic_conf[config_name] && accessor = record_accessor_create(dynamic_conf[config_name])
            if raw_value = accessor.call(record)
              meta[meta_key] = raw_value
            end
          end
        end

        if @remove_keys
          # The block parameter `key` is block-local; it does not overwrite the
          # RequestInfo `key` assigned above.
          @remove_keys.each { |key| record.delete(key) }
        end

        write_op = dynamic_conf["write_operation"]
        append_record_to_messages(write_op, meta, headers[write_op], record, bulk_message[key])
      end

      # Flush each group, then empty its buffer string.
      bulk_message.each do |info, msgs|
        send_bulk(msgs, info.host, info.index) unless msgs.empty?
        msgs.clear
      end
    end

    # Sends one bulk payload to +host+, gzipping it first when compression is on.
    #
    # A response flagged with 'errors' is only logged. Any exception is re-raised
    # as RecoverableRequestFailure, after dropping the cached client when
    # @reconnect_on_error is set.
    def send_bulk(data, host, index)
      prepared_data = compression ? gzip(data) : data
      response = client(host, compression).bulk body: prepared_data, index: index
      log.error "Could not push log to Elasticsearch: #{response}" if response['errors']
    rescue => e
      @_es = nil if @reconnect_on_error
      # FIXME: identify unrecoverable errors and raise UnrecoverableRequestFailure instead
      raise RecoverableRequestFailure, "could not push logs to Elasticsearch cluster (#{connection_options_description(host)}): #{e.message}"
    end

    # Returns +var+ unchanged unless it is a String, in which case the String is
    # evaluated as Ruby code and the result returned (e.g. "true" => true).
    #
    # NOTE(review): this `eval`s its argument; callers must only pass trusted values.
    def eval_or_val(var)
      var.is_a?(String) ? eval(var) : var
    end

    # Expands `${ ... }` placeholders in +param+ by evaluating each one as Ruby.
    #
    # The expressions can refer to `tag`, `time`, `record` and `tag_parts` (the
    # tag split on @delimiter, available when +param+ mentions `tag_parts[...]`
    # and +tag+ is not nil).
    #
    # param  - the raw setting; returned untouched when it has no placeholder.
    # tag    - event tag, or nil when not yet known.
    # time   - event time, or nil when not yet known.
    # record - event record, or nil when not yet known.
    #
    # Returns the expanded String. When a placeholder needs `record`, `tag_parts`
    # or `time` and that value is nil, the bare expression text of that
    # placeholder is returned instead, so callers can see the value is unresolved.
    #
    # Raises TypeError when a placeholder evaluates to nil. Other non-String
    # results are converted with to_s.
    #
    # SECURITY: placeholder expressions are run through `eval`, so settings must
    # come from trusted configuration only. Substituted values are never scanned
    # again, which keeps `${...}` text inside event data from being evaluated.
    def expand_param(param, tag, time, record)
      # check for '${ ... }'
      #   yes => `eval`
      #   no  => return param
      return param if (param.to_s =~ /\$\{.+\}/).nil?

      # check for 'tag_parts[]'
      # separated by a delimiter (default '.')
      tag_parts = tag.split(@delimiter) unless (param =~ /tag_parts\[.+\]/).nil? || tag.nil?

      # Single pass over the original text. The block form inserts each value
      # verbatim (no backslash/backreference interpretation).
      param.gsub(/\$\{(.+?)\}/) do
        # Capture before eval: the evaluated expression may run its own regexes.
        to_eval = Regexp.last_match(1)

        return to_eval if to_eval =~ /record\[.+\]/ && record.nil?
        return to_eval if to_eval =~ /tag_parts\[.+\]/ && tag_parts.nil?
        return to_eval if to_eval =~ /time/ && time.nil?

        value = eval(to_eval)
        raise TypeError, "dynamic parameter expression `#{to_eval}` evaluated to nil" if value.nil?

        value.to_s
      end
    end

    # Tells whether the instance variable named by +param+ (a symbol such as
    # :@index_name) currently holds a String. :@buffer_type is always rejected.
    def is_valid_expand_param_type(param)
      excluded = [:@buffer_type]
      if excluded.include?(param)
        false
      else
        instance_variable_get(param).is_a?(String)
      end
    end
  end
end