uken/fluent-plugin-elasticsearch

View on GitHub
lib/fluent/plugin/out_elasticsearch_data_stream.rb

Summary

Maintainability
D
1 day
Test Coverage

require_relative 'out_elasticsearch'

module Fluent::Plugin
  class ElasticsearchOutputDataStream < ElasticsearchOutput

    # Register this class as the `elasticsearch_data_stream` output plugin.
    Fluent::Plugin.register_output('elasticsearch_data_stream', self)

    # `write` reports records it cannot serialize through `router.emit_error_event`.
    helpers :event_emitter

    # Target data stream name (required). May contain placeholders, in which
    # case validation and provisioning are deferred (see `configure` / `write`).
    config_param :data_stream_name, :string
    # ILM policy name; `configure` falls back to "<data_stream_name>_policy" when unset.
    config_param :data_stream_ilm_name, :string, :default => nil
    # Index template name; `configure` falls back to "<data_stream_name>_template" when unset.
    config_param :data_stream_template_name, :string, :default => nil
    # ILM policy body; `configure` falls back to the bundled default-ilm-policy.json when unset.
    config_param :data_stream_ilm_policy, :string, :default => nil
    # When true, `create_ilm_policy` uploads the policy without checking for existing resources first.
    config_param :data_stream_ilm_policy_overwrite, :bool, :default => false
    # When true, the generated index pattern is "<data_stream_name>*" instead of the bare name.
    config_param :data_stream_template_use_index_patterns_wildcard, :bool, :default => true

    # Elasticsearch 7.9 and later always support the new style of index template.
    config_set_default :use_legacy_template, false

    # Prefixes that a data stream, template or ILM policy name must not start with.
    # NOTE: the misspelled constant name is kept because the rest of the class refers to it.
    INVALID_START_CHRACTERS = ["-", "_", "+", "."].freeze
    # Characters that must not appear anywhere in those names.
    INVALID_CHARACTERS = ["\\", "/", "*", "?", "\"", "<", ">", "|", " ", ",", "#", ":"].freeze

    # Validates the plugin configuration and, when the data stream name is
    # static, provisions the ILM policy, index template and data stream.
    #
    # conf - the configuration element; handed to the parent class via `super`.
    #
    # Raises Fluent::ConfigError when the Elasticsearch client libraries cannot
    # be loaded, when a name parameter is invalid, or when provisioning fails.
    def configure(conf)
      super

      # Clients older than 8.0.0 additionally need 'elasticsearch/xpack'.
      if Gem::Version.new(TRANSPORT_CLASS::VERSION) < Gem::Version.new("8.0.0")
        begin
          require 'elasticsearch/api'
          require 'elasticsearch/xpack'
        rescue LoadError
          raise Fluent::ConfigError, "'elasticsearch/api', 'elasticsearch/xpack' are required for <@elasticsearch_data_stream>."
        end
      else
        begin
          require 'elasticsearch/api'
        rescue LoadError
          raise Fluent::ConfigError, "'elasticsearch/api' is required for <@elasticsearch_data_stream>."
        end
      end

      # Derive the names that were not configured from the data stream name,
      # and fall back to the policy file shipped next to this source file.
      @data_stream_ilm_name = "#{@data_stream_name}_policy" if @data_stream_ilm_name.nil?
      @data_stream_template_name = "#{@data_stream_name}_template" if @data_stream_template_name.nil?
      @data_stream_ilm_policy = File.read(File.join(File.dirname(__FILE__), "default-ilm-policy.json")) if @data_stream_ilm_policy.nil?

      # ref. https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-create-data-stream.html
      if placeholder?(:data_stream_name_placeholder, @data_stream_name)
        # Names are only known per chunk; `write` resolves and provisions them lazily.
        @use_placeholder = true
        @data_stream_names = []
      else
        validate_data_stream_parameters
      end

      unless @use_placeholder
        begin
          @data_stream_names = [@data_stream_name]
          retry_operate(@max_retry_putting_template,
                        @fail_on_putting_template_retry_exceed,
                        @catch_transport_exception_on_retry) do
            create_ilm_policy(@data_stream_name, @data_stream_template_name, @data_stream_ilm_name)
            create_index_template(@data_stream_name, @data_stream_template_name, @data_stream_ilm_name)
            create_data_stream(@data_stream_name)
          end
        rescue => e
          raise Fluent::ConfigError, "Failed to create data stream: <#{@data_stream_name}> #{e.message}"
        end
      end
    end

    # Checks the data stream, template and ILM policy names against the naming
    # rules and raises on the first violation found for a name.
    #
    # Raises Fluent::ConfigError naming the offending parameter and its value.
    def validate_data_stream_parameters
      named_values = {
        "data_stream_name" => @data_stream_name,
        "data_stream_template_name" => @data_stream_template_name,
        "data_stream_ilm_name" => @data_stream_ilm_name
      }

      named_values.each do |parameter, value|
        next if valid_data_stream_parameters?(value)

        # "." and ".." also fail the start-character rule, so they get their own message.
        unless start_with_valid_characters?(value)
          raise Fluent::ConfigError, "'#{parameter}' must not be . or ..: <#{value}>" unless not_dots?(value)

          raise Fluent::ConfigError, "'#{parameter}' must not start with #{INVALID_START_CHRACTERS.join(",")}: <#{value}>"
        end

        raise Fluent::ConfigError, "'#{parameter}' must not contain invalid characters #{INVALID_CHARACTERS.join(",")}: <#{value}>" unless valid_characters?(value)
        raise Fluent::ConfigError, "'#{parameter}' must be lowercase only: <#{value}>" unless lowercase_only?(value)
        raise Fluent::ConfigError, "'#{parameter}' must not be longer than 255 bytes: <#{value}>" if value.bytes.size > 255
      end
    end

    # Uploads the configured ILM policy body under the name `ilm_name`.
    #
    # Unless `data_stream_ilm_policy_overwrite` is enabled, nothing is sent when
    # the data stream, the index template or the policy already exists.
    #
    # datastream_name - data stream whose existence is checked.
    # template_name   - index template whose existence is checked.
    # ilm_name        - name to store the policy under.
    # host            - optional host passed on to `client`.
    def create_ilm_policy(datastream_name, template_name, ilm_name, host = nil)
      unless @data_stream_ilm_policy_overwrite
        already_provisioned = data_stream_exist?(datastream_name, host) ||
                              template_exists?(template_name, host) ||
                              ilm_policy_exists?(ilm_name, host)
        return if already_provisioned
      end

      request = { body: @data_stream_ilm_policy }
      retry_operate(@max_retry_putting_template,
                    @fail_on_putting_template_retry_exceed,
                    @catch_transport_exception_on_retry) do
        # The ILM endpoint and its argument name differ between client generations.
        if Gem::Version.new(Elasticsearch::VERSION) < Gem::Version.new("8.0.0")
          client(host).xpack.ilm.put_policy(request.merge(policy_id: ilm_name))
        else
          client(host).ilm.put_lifecycle(request.merge(policy: ilm_name))
        end
      end
    end

    # Creates the index template that backs a data stream and ties it to an ILM policy.
    #
    # Nothing is sent when the data stream or the template already exists.
    #
    # datastream_name - data stream name; also the base of the index pattern.
    # template_name   - name of the index template to create.
    # ilm_name        - ILM policy referenced from the template settings.
    # host            - optional host passed on to `client`.
    def create_index_template(datastream_name, template_name, ilm_name, host = nil)
      return if data_stream_exist?(datastream_name, host) || template_exists?(template_name, host)

      pattern_suffix = @data_stream_template_use_index_patterns_wildcard ? '*' : ''
      template_body = {
        "index_patterns" => ["#{datastream_name}#{pattern_suffix}"],
        "data_stream" => {},
        "template" => {
          "settings" => { "index.lifecycle.name" => "#{ilm_name}" }
        }
      }

      retry_operate(@max_retry_putting_template,
                    @fail_on_putting_template_retry_exceed,
                    @catch_transport_exception_on_retry) do
        client(host).indices.put_index_template(name: template_name, body: template_body)
      end
    end

    # Tells whether the named data stream can be fetched from Elasticsearch.
    #
    # datastream_name - name of the data stream to look up.
    # host            - optional host passed on to `client`.
    #
    # Returns false (after logging at info level) when the lookup raises
    # NotFound or when the response itself is a NotFound object; true otherwise.
    def data_stream_exist?(datastream_name, host = nil)
      response = client(host).indices.get_data_stream(name: datastream_name)
      !response.is_a?(TRANSPORT_CLASS::Transport::Errors::NotFound)
    rescue TRANSPORT_CLASS::Transport::Errors::NotFound
      log.info "Specified data stream does not exist. Will be created: <#{datastream_name}>"
      false
    end

    # Creates the named data stream unless it already exists.
    #
    # datastream_name - name of the data stream to create.
    # host            - optional host passed on to `client`.
    def create_data_stream(datastream_name, host = nil)
      return if data_stream_exist?(datastream_name, host)

      retry_operate(@max_retry_putting_template,
                    @fail_on_putting_template_retry_exceed,
                    @catch_transport_exception_on_retry) do
        client(host).indices.create_data_stream(name: datastream_name)
      end
    end

    # Tells whether the ILM policy `policy_id` can be fetched.
    #
    # policy_id - name of the ILM policy to look up.
    # host      - optional host passed on to `client`.
    #
    # Returns true when the lookup call completes, false when it raises any
    # StandardError (the error itself is discarded).
    def ilm_policy_exists?(policy_id, host = nil)
      # The lookup method and its argument name depend on the client library version.
      es8_or_later = Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0")
      ilm_api = client(host).ilm
      es8_or_later ? ilm_api.get_lifecycle(policy: policy_id) : ilm_api.get_policy(policy_id: policy_id)
      true
    rescue
      false
    end

    # Tells whether an index template called `name` exists.
    #
    # Looks the template up through the legacy template API when
    # `@use_legacy_template` is set, otherwise through the index template API.
    #
    # name - template name to look up.
    # host - optional host passed on to `client`.
    #
    # Returns true when the lookup completes, false when it raises NotFound.
    def template_exists?(name, host = nil)
      lookup = @use_legacy_template ? :get_template : :get_index_template
      client(host).indices.public_send(lookup, :name => name)
      true
    rescue TRANSPORT_CLASS::Transport::Errors::NotFound
      false
    end

    # Returns true when the name passes every naming rule: lowercase only, no
    # invalid characters, a valid first character, not "." or "..", and at most
    # 255 bytes long.
    def valid_data_stream_parameters?(data_stream_parameter)
      lowercase_only?(data_stream_parameter) &&
        valid_characters?(data_stream_parameter) &&
        start_with_valid_characters?(data_stream_parameter) &&
        not_dots?(data_stream_parameter) &&
        data_stream_parameter.bytes.size <= 255
    end

    # Returns true when downcasing the name leaves it unchanged.
    def lowercase_only?(data_stream_parameter)
      lowered = data_stream_parameter.downcase
      lowered == data_stream_parameter
    end

    # Returns true when the name contains none of INVALID_CHARACTERS.
    def valid_characters?(data_stream_parameter)
      INVALID_CHARACTERS.none? { |forbidden| data_stream_parameter.include?(forbidden) }
    end

    # Returns true when the name does not begin with any of INVALID_START_CHRACTERS.
    def start_with_valid_characters?(data_stream_parameter)
      INVALID_START_CHRACTERS.none? { |prefix| data_stream_parameter.start_with?(prefix) }
    end

    # Returns true unless the name is exactly "." or "..".
    def not_dots?(data_stream_parameter)
      data_stream_parameter != "." && data_stream_parameter != ".."
    end

    # Returns the value of Elasticsearch::VERSION for the loaded client library.
    def client_library_version
      Elasticsearch::VERSION
    end

    # Always returns true: this plugin declares itself usable with multiple workers.
    def multi_workers_ready?
      true
    end

    # Sends one buffer chunk to the target data stream with a single bulk request.
    #
    # When the data stream name uses placeholders, the stream, template and
    # policy names and the host are resolved per chunk. The ILM policy, index
    # template and data stream are then created the first time each resolved
    # stream name is seen.
    #
    # chunk - buffer chunk; must respond to `metadata.tag` and `msgpack_each`.
    #
    # Raises Fluent::ConfigError when on-demand provisioning fails and
    # RecoverableRequestFailure when the bulk call raises.
    def write(chunk)
      data_stream_name = @data_stream_name
      data_stream_template_name = @data_stream_template_name
      data_stream_ilm_name = @data_stream_ilm_name
      host = nil
      if @use_placeholder
        host = extract_placeholders(@hosts || @host, chunk)
        data_stream_name = extract_placeholders(@data_stream_name, chunk)
        data_stream_template_name = extract_placeholders(@data_stream_template_name, chunk)
        data_stream_ilm_name = extract_placeholders(@data_stream_ilm_name, chunk)
        unless @data_stream_names.include?(data_stream_name)
          begin
            create_ilm_policy(data_stream_name, data_stream_template_name, data_stream_ilm_name, host)
            create_index_template(data_stream_name, data_stream_template_name, data_stream_ilm_name, host)
            # Create the stream on the same resolved host as its policy and template.
            create_data_stream(data_stream_name, host)
            @data_stream_names << data_stream_name
          rescue => e
            raise Fluent::ConfigError, "Failed to create data stream: <#{data_stream_name}> #{e.message}"
          end
        end
      end

      bulk_message = ""
      headers = {
        CREATE_OP => {}
      }
      tag = chunk.metadata.tag
      chunk.msgpack_each do |time, record|
        # Only Hash records can be serialized as bulk documents.
        next unless record.is_a? Hash

        record[@tag_key] = tag if @include_tag_key

        begin
          # Fall back to the event time when the record carries no "@timestamp" of its own.
          unless record.has_key?("@timestamp")
            record["@timestamp"] = Time.at(time).iso8601(@time_precision)
          end
          bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message)
        rescue => e
          # A record that cannot be serialized is routed to the error stream; the rest of the chunk continues.
          router.emit_error_event(tag, time, record, e)
        end
      end

      # Nothing was serialized (no Hash records, or every record failed):
      # skip the request instead of posting an empty bulk body.
      return if bulk_message.empty?

      prepared_data = compression ? gzip(bulk_message) : bulk_message

      params = {
        index: data_stream_name,
        body: prepared_data
      }
      begin
        response = client(host, compression).bulk(params)
        if response['errors']
          log.error "Could not bulk insert to Data Stream: #{data_stream_name} #{response}"
          @num_errors_metrics.inc
        end
      rescue => e
        raise RecoverableRequestFailure, "could not push logs to Elasticsearch cluster (#{data_stream_name}): #{e.message}"
      end
    end

    # Appends one bulk entry (action line, then document line) to `msgs`.
    #
    # op     - accepted for signature compatibility; the entry is always stored under CREATE_OP.
    # meta   - metadata stored under CREATE_OP in `header` (which is modified in place).
    # header - action hash serialized as the first line.
    # record - document serialized as the second line.
    # msgs   - string buffer that is appended to in place.
    #
    # Returns `msgs`.
    def append_record_to_messages(op, meta, header, record, msgs)
      header[CREATE_OP] = meta
      [header, record].each do |line|
        msgs << @dump_proc.call(line) << BODY_DELIMITER
      end
      msgs
    end

    # Returns whatever the buffer reports from `storable?`.
    def retry_stream_retryable?
      @buffer.storable?
    end
  end
end