fluent/fluentd

View on GitHub
lib/fluent/plugin/out_http.rb

Summary

Maintainability
D
1 day
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'net/http'
require 'uri'
require 'openssl'
require 'fluent/tls'
require 'fluent/plugin/output'
require 'fluent/plugin_helper/socket'

# Backport of the Net::HTTP `extra_chain_cert` SSL option (Ruby feature #9758,
# https://github.com/ruby/ruby/commit/31af0dafba6d3769d2a39617c0dddedb97883712).
# When the running net/http does not already list :@extra_chain_cert among its
# SSL instance variables, add the accessor and register it next to the other
# SSL options. Rubies that already ship the option are left untouched.
class Net::HTTP
  unless SSL_IVNAMES.include?(:@extra_chain_cert)
    SSL_IVNAMES << :@extra_chain_cert
    SSL_ATTRIBUTES << :extra_chain_cert
    attr_accessor :extra_chain_cert
  end
end

module Fluent::Plugin
  # Output plugin that sends each buffer chunk to an HTTP(S) endpoint as one
  # PUT or POST request whose body is the chunk content.
  #
  # Records are formatted by the <format> section (json by default). With
  # `json_array true`, the chunk is sent as a JSON array. The endpoint and
  # `headers_from_placeholders` values go through extract_placeholders per chunk.
  class HTTPOutput < Output
    Fluent::Plugin.register_output('http', self)

    # Raised by #send_request when the response status is listed in
    # `retryable_response_codes`. It is a plain StandardError, not
    # Fluent::UnrecoverableError, so the chunk is presumably retried.
    class RetryableResponse < StandardError; end

    # One connection-cache slot: the URI string the session was opened for and
    # the started Net::HTTP session (nil until first use).
    ConnectionCache = Struct.new(:uri, :conn)

    helpers :formatter

    desc 'The endpoint for HTTP request, e.g. http://example.com/api'
    config_param :endpoint, :string
    desc 'The method for HTTP request'
    config_param :http_method, :enum, list: [:put, :post], default: :post
    desc 'The proxy for HTTP request'
    config_param :proxy, :string, default: ENV['HTTP_PROXY'] || ENV['http_proxy']
    desc 'Content-Type for HTTP request'
    config_param :content_type, :string, default: nil
    desc 'JSON array data format for HTTP request body'
    config_param :json_array, :bool, default: false
    desc 'Additional headers for HTTP request'
    config_param :headers, :hash, default: nil
    desc 'Additional placeholder based headers for HTTP request'
    config_param :headers_from_placeholders, :hash, default: nil

    desc 'The connection open timeout in seconds'
    config_param :open_timeout, :integer, default: nil
    desc 'The read timeout in seconds'
    config_param :read_timeout, :integer, default: nil
    desc 'The TLS timeout in seconds'
    config_param :ssl_timeout, :integer, default: nil
    desc 'Try to reuse connections'
    config_param :reuse_connections, :bool, default: false

    desc 'The CA certificate path for TLS'
    config_param :tls_ca_cert_path, :string, default: nil
    desc 'The client certificate path for TLS'
    config_param :tls_client_cert_path, :string, default: nil
    desc 'The client private key path for TLS'
    config_param :tls_private_key_path, :string, default: nil
    desc 'The client private key passphrase for TLS'
    config_param :tls_private_key_passphrase, :string, default: nil, secret: true
    desc 'The verify mode of TLS'
    config_param :tls_verify_mode, :enum, list: [:none, :peer], default: :peer
    desc 'The default version of TLS'
    config_param :tls_version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: Fluent::TLS::DEFAULT_VERSION
    desc 'The cipher configuration of TLS'
    config_param :tls_ciphers, :string, default: Fluent::TLS::CIPHERS_DEFAULT

    desc 'Raise UnrecoverableError when the response is non success, 4xx/5xx'
    config_param :error_response_as_unrecoverable, :bool, default: true
    desc 'The list of retryable response code'
    config_param :retryable_response_codes, :array, value_type: :integer, default: nil

    config_section :format do
      config_set_default :@type, 'json'
    end

    config_section :auth, required: false, multi: false do
      desc 'The method for HTTP authentication'
      config_param :method, :enum, list: [:basic, :aws_sigv4], default: :basic
      desc 'The username for basic authentication'
      config_param :username, :string, default: nil
      desc 'The password for basic authentication'
      config_param :password, :string, default: nil, secret: true
      desc 'The AWS service to authenticate against'
      config_param :aws_service, :string, default: nil
      desc 'The AWS region to use when authenticating'
      config_param :aws_region, :string, default: nil
      desc 'The AWS role ARN to assume when authenticating'
      config_param :aws_role_arn, :string, default: nil
    end

    # Thread-local key holding the current thread's connection-cache slot
    # index. Prefixed with plugin_id so several instances don't collide.
    def connection_cache_id_thread_key
      "#{plugin_id}_connection_cache_id"
    end

    # Slot index assigned to the current thread, or nil if none yet.
    def connection_cache_id_for_thread
      Thread.current[connection_cache_id_thread_key]
    end

    # Stores the slot index for the current thread.
    def connection_cache_id_for_thread=(id)
      Thread.current[connection_cache_id_thread_key] = id
    end

    def initialize
      super

      @uri = nil
      @proxy_uri = nil
      @formatter = nil

      @connection_cache = []
      @connection_cache_id_mutex = Mutex.new
      @connection_cache_next_id = 0
    end

    # Finishes every cached connection that is still started. Slots may be
    # nil if they were never filled, so each access is nil-safe.
    def close
      super

      @connection_cache.each do |entry|
        conn = entry&.conn
        conn.finish if conn&.started?
      end
    end

    # Validates configuration and precomputes request state: HTTP/TLS options,
    # proxy URI, formatter, Content-Type and (for aws_sigv4) the request signer.
    #
    # @raise [Fluent::ConfigError] on invalid TLS paths, json_array with a
    #   non-json formatter, undeterminable Content-Type, or incomplete
    #   aws_sigv4 settings.
    def configure(conf)
      super

      if @reuse_connections
        # One slot per flush thread. The block form gives each slot its own
        # Struct instead of every index pointing at one shared instance.
        @connection_cache = Array.new(actual_flush_thread_count) { ConnectionCache.new("", nil) }
      end

      if @retryable_response_codes.nil?
        log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish')
        @retryable_response_codes = [503]
      end

      @http_opt = setup_http_option
      @proxy_uri = URI.parse(@proxy) if @proxy
      @formatter = formatter_create
      @content_type ||= setup_content_type

      if @json_array
        if @formatter_configs.first[:@type] != "json"
          raise Fluent::ConfigError, "json_array option could be used with json formatter only"
        end
        # Each record gets a trailing comma; create_request chops the last one
        # and wraps the chunk in brackets.
        define_singleton_method(:format, method(:format_json_array))
      end

      setup_aws_sigv4_signer if @auth && @auth.method == :aws_sigv4
    end

    # Declares that this plugin can run under multiple workers.
    def multi_workers_ready?
      true
    end

    # True when the configured formatter is msgpack.
    def formatted_to_msgpack_binary?
      @formatter_configs.first[:@type] == 'msgpack'
    end

    # Formats one event with the configured formatter.
    def format(tag, time, record)
      @formatter.format(tag, time, record)
    end

    # Replacement for #format when json_array is enabled: the formatted record
    # followed by a comma separator.
    def format_json_array(tag, time, record)
      @formatter.format(tag, time, record) << ","
    end

    # Sends one chunk: resolves the endpoint placeholders, builds the request
    # and delivers it.
    #
    # @raise [RetryableResponse] for statuses in retryable_response_codes
    # @raise [Fluent::UnrecoverableError] for other non-2xx statuses when
    #   error_response_as_unrecoverable is true
    def write(chunk)
      uri = parse_endpoint(chunk)
      req = create_request(chunk, uri)

      log.debug { "#{@http_method.capitalize} data to #{uri} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" }

      send_request(uri, req)
    end

    private

    # Loads the optional AWS gems, validates the aws_sigv4 settings and builds
    # @aws_signer. Uses the default credential chain unless aws_role_arn is set,
    # in which case that role is assumed through STS.
    def setup_aws_sigv4_signer
      begin
        require 'aws-sigv4'
        require 'aws-sdk-core'
      rescue LoadError
        raise Fluent::ConfigError, "The aws-sdk-core and aws-sigv4 gems are required for aws_sigv4 auth. Run: gem install aws-sdk-core -v '~> 3.191'"
      end

      raise Fluent::ConfigError, "aws_service is required for aws_sigv4 auth" if @auth.aws_service.nil?
      raise Fluent::ConfigError, "aws_region is required for aws_sigv4 auth" if @auth.aws_region.nil?

      aws_credentials =
        if @auth.aws_role_arn.nil?
          Aws::CredentialProviderChain.new.resolve
        else
          Aws::AssumeRoleCredentials.new(
            client: Aws::STS::Client.new(
              region: @auth.aws_region
            ),
            role_arn: @auth.aws_role_arn,
            role_session_name: "fluentd"
          )
        end

      @aws_signer = Aws::Sigv4::Signer.new(
        service: @auth.aws_service,
        region: @auth.aws_region,
        credentials_provider: aws_credentials
      )
    end

    # Picks a Content-Type from the formatter type when none was configured.
    #
    # @raise [Fluent::ConfigError] for formatter types not listed here
    def setup_content_type
      case @formatter_configs.first[:@type]
      when 'json'
        @json_array ? 'application/json' : 'application/x-ndjson'
      when 'csv'
        'text/csv'
      when 'tsv', 'ltsv'
        'text/tab-separated-values'
      when 'msgpack'
        'application/x-msgpack'
      when 'out_file', 'single_value', 'stdout', 'hash'
        'text/plain'
      else
        raise Fluent::ConfigError, "can't determine Content-Type from formatter type. Set content_type parameter explicitly"
      end
    end

    # Builds the option hash passed to Net::HTTP.start: timeouts, and for
    # https endpoints the TLS CA file, client cert/chain, private key, verify
    # mode, ciphers and version.
    #
    # @raise [Fluent::ConfigError] when a configured TLS path is not a file or
    #   the client certificate file holds no PEM certificate
    def setup_http_option
      use_ssl = @endpoint.start_with?('https')
      opt = {
        open_timeout: @open_timeout,
        read_timeout: @read_timeout,
        ssl_timeout: @ssl_timeout,
        use_ssl: use_ssl
      }
      return opt unless use_ssl

      if @tls_ca_cert_path
        raise Fluent::ConfigError, "tls_ca_cert_path is wrong: #{@tls_ca_cert_path}" unless File.file?(@tls_ca_cert_path)
        opt[:ca_file] = @tls_ca_cert_path
      end
      if @tls_client_cert_path
        raise Fluent::ConfigError, "tls_client_cert_path is wrong: #{@tls_client_cert_path}" unless File.file?(@tls_client_cert_path)

        # The file may be a bundle: the first PEM certificate is the client
        # certificate, any following ones are sent as the intermediate chain.
        bundle = File.read(@tls_client_cert_path)
        bundle_certs = bundle.scan(/-----BEGIN CERTIFICATE-----(?:.|\n)+?-----END CERTIFICATE-----/)
        if bundle_certs.empty?
          raise Fluent::ConfigError, "tls_client_cert_path contains no PEM certificate: #{@tls_client_cert_path}"
        end

        client_cert, *intermediate_certs = bundle_certs.map { |cert| OpenSSL::X509::Certificate.new(cert) }
        opt[:cert] = client_cert
        opt[:extra_chain_cert] = intermediate_certs unless intermediate_certs.empty?
      end
      if @tls_private_key_path
        raise Fluent::ConfigError, "tls_private_key_path is wrong: #{@tls_private_key_path}" unless File.file?(@tls_private_key_path)
        opt[:key] = OpenSSL::PKey.read(File.read(@tls_private_key_path), @tls_private_key_passphrase)
      end
      opt[:verify_mode] = case @tls_verify_mode
                          when :none
                            OpenSSL::SSL::VERIFY_NONE
                          when :peer
                            OpenSSL::SSL::VERIFY_PEER
                          end
      opt[:ciphers] = @tls_ciphers
      opt[:ssl_version] = @tls_version
      opt
    end

    # Resolves chunk placeholders in the endpoint and parses it into a URI.
    def parse_endpoint(chunk)
      endpoint = extract_placeholders(@endpoint, chunk)
      URI.parse(endpoint)
    end

    # Applies static headers, then placeholder-resolved headers, then the
    # Content-Type, which therefore overrides any Content-Type from `headers`.
    def set_headers(req, uri, chunk)
      @headers&.each do |k, v|
        req[k] = v
      end
      @headers_from_placeholders&.each do |k, v|
        req[k] = extract_placeholders(v, chunk)
      end
      req['Content-Type'] = @content_type
    end

    # Adds authentication per the <auth> section (no-op without one). It must
    # run after the body and Content-Type are set, because aws_sigv4 signs them.
    def set_auth(req, uri)
      return unless @auth

      case @auth.method
      when :basic
        req.basic_auth(@auth.username, @auth.password)
      when :aws_sigv4
        signature = @aws_signer.sign_request(
          http_method: req.method,
          url: uri.request_uri,
          headers: {
            'Content-Type' => @content_type,
            'Host' => uri.host
          },
          body: req.body
        )
        # The signer may omit some of these (notably x-amz-security-token when
        # the credentials carry no session token). Skip absent ones rather than
        # sending them with an empty value.
        %w[x-amz-date x-amz-security-token x-amz-content-sha256 authorization].each do |name|
          value = signature.headers[name]
          req.add_field(name, value) unless value.nil?
        end
      end
    end

    # Builds the PUT/POST request for a chunk. With json_array, the chunk's
    # trailing comma is chopped and the content wrapped in "[...]".
    def create_request(chunk, uri)
      req = case @http_method
            when :post
              Net::HTTP::Post.new(uri.request_uri)
            when :put
              Net::HTTP::Put.new(uri.request_uri)
            end
      set_headers(req, uri, chunk)
      req.body = @json_array ? "[#{chunk.read.chop}]" : chunk.read

      # At least one authentication method requires the body and other headers, so the order of this call matters
      set_auth(req, uri)
      req
    end

    # Opens a Net::HTTP session to uri's host/port, through @proxy_uri when
    # configured, with @http_opt. Follows Net::HTTP.start: with a block the
    # session is yielded and the block's value returned; without one the
    # started session is returned.
    def start_http(uri, &block)
      if @proxy_uri
        Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt, &block)
      else
        Net::HTTP.start(uri.host, uri.port, @http_opt, &block)
      end
    end

    # Sends req over this thread's cached session, (re)opening it when the
    # target URI string differs from the one the slot was opened for.
    def make_request_cached(uri, req)
      id = connection_cache_id_for_thread
      if id.nil?
        id = @connection_cache_id_mutex.synchronize do
          assigned = @connection_cache_next_id
          @connection_cache_next_id += 1
          assigned
        end
        self.connection_cache_id_for_thread = id
      end

      uri_str = uri.to_s
      # Allocate the slot on demand if more threads write than #configure
      # pre-sized the cache for, instead of failing on a nil slot.
      entry = (@connection_cache[id] ||= ConnectionCache.new("", nil))
      if entry.uri != uri_str
        entry.conn.finish if entry.conn&.started?
        # Reset first so a failing start_http doesn't leave a finished session
        # cached under the old URI.
        @connection_cache[id] = ConnectionCache.new("", nil)
        entry = ConnectionCache.new(uri_str, start_http(uri))
        @connection_cache[id] = entry
      end
      entry.conn.request(req)
    end

    # Opens a one-off session for uri and yields it to the block. `req` is not
    # used here; the caller's block sends it.
    def make_request(uri, req, &block)
      start_http(uri, &block)
    end

    # One-line "code message body" summary of a response. message and body may
    # be nil (no reason phrase / bodiless responses such as 204), hence to_s.
    def response_summary(res)
      "#{res.code} #{res.message.to_s.rstrip} #{res.body.to_s.lstrip}"
    end

    # Sends req and classifies the response: 2xx is logged at debug level;
    # otherwise raises RetryableResponse for retryable codes, raises
    # Fluent::UnrecoverableError when error_response_as_unrecoverable, or logs
    # an error.
    def send_request(uri, req)
      res = if @reuse_connections
              make_request_cached(uri, req)
            else
              make_request(uri, req) { |http| http.request(req) }
            end

      if res.is_a?(Net::HTTPSuccess)
        log.debug { response_summary(res) }
      else
        msg = response_summary(res)

        if @retryable_response_codes.include?(res.code.to_i)
          raise RetryableResponse, msg
        end

        if @error_response_as_unrecoverable
          raise Fluent::UnrecoverableError, msg
        else
          log.error "got error response from '#{@http_method.capitalize} #{uri}' : #{msg}"
        end
      end
    end
  end
end