fluent/fluentd

View on GitHub
lib/fluent/plugin_helper/socket.rb

Summary

Maintainability
D
1 day
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'socket'
require 'ipaddr'
require 'openssl'
if Fluent.windows?
  require 'certstore'
end

require 'fluent/tls'
require_relative 'socket_option'

module Fluent
  module PluginHelper
    module Socket
      # stop     : [-]
      # shutdown : [-]
      # close    : [-]
      # terminate: [-]

      include Fluent::PluginHelper::SocketOption

      attr_reader :_sockets # for tests

      # TODO: implement connection pool for specified host

      # Creates a client socket for the given protocol by delegating to the
      # protocol-specific factory method.
      #
      # proto  - :tcp, :udp or :tls (:unix is recognized but not implemented)
      # host, port, kwargs and the optional block are passed through unchanged.
      #
      # Raises RuntimeError for :unix and ArgumentError for any other protocol.
      def socket_create(proto, host, port, **kwargs, &block)
        return socket_create_tcp(host, port, **kwargs, &block) if :tcp == proto
        return socket_create_udp(host, port, **kwargs, &block) if :udp == proto
        return socket_create_tls(host, port, **kwargs, &block) if :tls == proto
        raise "not implemented yet" if :unix == proto

        raise ArgumentError, "invalid protocol: #{proto}"
      end

      # Opens a TCP connection to host:port and applies socket options to it.
      #
      # resolve_name    - forwarded to socket_option_set
      # connect_timeout - when given, the connection is established through
      #                   ::Socket.tcp with that timeout and the resulting file
      #                   descriptor is re-wrapped as a WrappedSocket::TCP
      # kwargs          - remaining options, forwarded to socket_option_set
      #
      # With a block, yields the socket, closes it afterwards and returns the
      # block's value; without a block, returns the open socket.
      # If applying the socket options raises, the socket is closed before the
      # exception propagates so the connection is not leaked.
      def socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block)
        sock = if connect_timeout
                 s = ::Socket.tcp(host, port, connect_timeout: connect_timeout)
                 s.autoclose = false # avoid GC triggered close
                 WrappedSocket::TCP.for_fd(s.fileno)
               else
                 WrappedSocket::TCP.new(host, port)
               end

        begin
          socket_option_set(sock, resolve_name: resolve_name, **kwargs)
        rescue
          # The caller never receives the socket on this path, so release it here.
          sock.close rescue nil
          raise
        end

        return sock unless block

        begin
          block.call(sock)
        ensure
          sock.close_write rescue nil
          sock.close rescue nil
        end
      end

      # Creates a UDP socket whose address family (IPv4 or IPv6) matches the
      # address that host resolves to.
      #
      # resolve_name - forwarded to socket_option_set
      # connect      - when true, the socket is connected to host:port
      # kwargs       - remaining options, forwarded to socket_option_set
      #
      # With a block, yields the socket, closes it afterwards and returns the
      # block's value; without a block, returns the open socket.
      # If applying options or connecting raises, the socket is closed before
      # the exception propagates so the descriptor is not leaked.
      def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block)
        family = IPAddr.new(IPSocket.getaddress(host)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6
        sock = WrappedSocket::UDP.new(family)

        begin
          socket_option_set(sock, resolve_name: resolve_name, **kwargs)
          sock.connect(host, port) if connect
        rescue
          # The caller never receives the socket on this path, so release it here.
          sock.close rescue nil
          raise
        end

        return sock unless block

        begin
          block.call(sock)
        ensure
          sock.close rescue nil
        end
      end

      # Connects to host:port over TCP (via socket_create_tcp) and performs a TLS
      # client handshake on that connection.
      #
      # Keyword options, as used below:
      #   version, min_version, max_version
      #     handed to Fluent::TLS.set_version_to_context together with the context.
      #   insecure
      #     true sets verify_mode to VERIFY_NONE and skips certificate store,
      #     cipher, private key and client certificate setup.
      #   ciphers
      #     assigned to the context in peer-verifying mode.
      #   verify_fqdn, fqdn
      #     fqdn defaults to host unless host parses as an IP address; when
      #     verify_fqdn is true the peer certificate is checked against fqdn
      #     after the handshake.
      #   enable_system_cert_store
      #     loads the default certificate paths and, on Windows when
      #     cert_logical_store_name is given, the logical store through Certstore.
      #   allow_self_signed_cert
      #     sets V_FLAG_CHECK_SS_SIGNATURE on the store when that flag is defined.
      #   cert_paths
      #     one CA file path, or a collection of them, added to the store.
      #   cert_path, private_key_path, private_key_passphrase
      #     client certificate file (first certificate is used as the client
      #     certificate, any following ones as the extra chain) and private key.
      #   cert_thumbprint, cert_logical_store_name, cert_use_enterprise_store
      #     parameters of the Windows certificate store lookup.
      #   connect_timeout
      #     forwarded to socket_create_tcp and also used as the TLS handshake timeout.
      #   any other keyword is forwarded to socket_create_tcp.
      #
      # With a block, yields the TLS socket, closes it afterwards and returns the
      # block's value; without a block, returns the connected socket.
      #
      # Raises Timeout::Error when the handshake exceeds connect_timeout,
      # re-raises OpenSSL::SSL::SSLError from the post-connection check, and
      # raises RuntimeError if verify_result is not V_OK after that check passed.
      def socket_create_tls(
          host, port,
          version: Fluent::TLS::DEFAULT_VERSION, min_version: nil, max_version: nil, ciphers: Fluent::TLS::CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil,
          enable_system_cert_store: true, allow_self_signed_cert: false, cert_paths: nil,
          cert_path: nil, private_key_path: nil, private_key_passphrase: nil,
          cert_thumbprint: nil, cert_logical_store_name: nil, cert_use_enterprise_store: true,
          connect_timeout: nil,
          **kwargs, &block)

        # An IP literal cannot serve as the name to verify, so only fall back to
        # host when it is not parseable as an address.
        host_is_ipaddress = IPAddr.new(host) rescue false
        fqdn ||= host unless host_is_ipaddress

        context = OpenSSL::SSL::SSLContext.new

        if insecure
          log.trace "setting TLS verify_mode NONE"
          context.verify_mode = OpenSSL::SSL::VERIFY_NONE
        else
          cert_store = OpenSSL::X509::Store.new
          if allow_self_signed_cert && OpenSSL::X509.const_defined?('V_FLAG_CHECK_SS_SIGNATURE')
            cert_store.flags = OpenSSL::X509::V_FLAG_CHECK_SS_SIGNATURE
          end
          begin
            if enable_system_cert_store
              if Fluent.windows? && cert_logical_store_name
                log.trace "loading Windows system certificate store"
                loader = Certstore::OpenSSL::Loader.new(log, cert_store, cert_logical_store_name,
                                                        enterprise: cert_use_enterprise_store)
                loader.load_cert_store
                cert_store = loader.cert_store
                context.cert = loader.get_certificate(cert_thumbprint) if cert_thumbprint
              end
              log.trace "loading system default certificate store"
              cert_store.set_default_paths
            end
          rescue OpenSSL::X509::StoreError => e
            # Failing to load the system store is not fatal; the exception must be
            # bound here so it can be attached to the warning.
            log.warn "failed to load system default certificate store", error: e
          end
          if cert_paths
            # Use distinct local names: cert_path is the client certificate keyword
            # and must not be overwritten by a CA file path.
            ca_paths = cert_paths.respond_to?(:each) ? cert_paths : [cert_paths]
            ca_paths.each do |ca_path|
              log.trace "adding CA cert", path: ca_path
              cert_store.add_file(ca_path)
            end
          end

          log.trace "setting TLS context", mode: "peer", ciphers: ciphers
          context.set_params({})
          context.ciphers = ciphers
          context.verify_mode = OpenSSL::SSL::VERIFY_PEER
          context.cert_store = cert_store
          context.verify_hostname = verify_fqdn && fqdn
          context.key = OpenSSL::PKey::read(File.read(private_key_path), private_key_passphrase) if private_key_path

          if cert_path
            certs = socket_certificates_from_file(cert_path)
            context.cert = certs.shift
            unless certs.empty?
              context.extra_chain_cert = certs
            end
          end
        end
        Fluent::TLS.set_version_to_context(context, version, min_version, max_version)

        tcpsock = socket_create_tcp(host, port, connect_timeout: connect_timeout, **kwargs)
        sock = WrappedSocket::TLS.new(tcpsock, context)
        # Closing the TLS socket also closes the underlying TCP socket.
        sock.sync_close = true
        sock.hostname = fqdn if verify_fqdn && fqdn && sock.respond_to?(:hostname=)

        log.trace "entering TLS handshake"
        if connect_timeout
          begin
            Timeout.timeout(connect_timeout) { sock.connect }
          rescue Timeout::Error
            log.warn "timeout while connecting tls session", host: host
            sock.close rescue nil
            raise
          end
        else
          sock.connect
        end

        begin
          if verify_fqdn
            log.trace "checking peer's certificate", subject: sock.peer_cert.subject
            sock.post_connection_check(fqdn)
            verify = sock.verify_result
            if verify != OpenSSL::X509::V_OK
              err_name = Socket.tls_verify_result_name(verify)
              log.warn "BUG: failed to verify certification while connecting (but not raised, why?)", host: host, fqdn: fqdn, error: err_name
              raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{host} as #{fqdn}"
            end
          end
        rescue OpenSSL::SSL::SSLError => e
          log.warn "failed to verify certification while connecting tls session", host: host, fqdn: fqdn, error: e
          raise
        end

        if block
          begin
            block.call(sock)
          ensure
            sock.close rescue nil
          end
        else
          sock
        end
      end

      # Reads the file at path and returns every PEM "CERTIFICATE" block found in
      # it as an OpenSSL::X509::Certificate, in file order.
      #
      # Raises Fluent::ConfigError when the file contains no such block.
      def socket_certificates_from_file(path)
        contents = File.read(path)
        pem_block = Regexp.compile('-+BEGIN CERTIFICATE-+\r?\n(?:[^-]*\r?\n)+-+END CERTIFICATE-+\r?\n?', Regexp::MULTILINE)
        certificates = contents.scan(pem_block).map { |pem| OpenSSL::X509::Certificate.new(pem) }
        raise Fluent::ConfigError, "cert_path does not contain a valid certificate" if certificates.empty?

        certificates
      end

      # Maps an OpenSSL X509 verification result code to the name of the matching
      # OpenSSL::X509 constant (for example 'V_ERR_CERT_HAS_EXPIRED').
      #
      # code - the integer returned by SSLSocket#verify_result
      #
      # Returns the constant name as a String, or nil when the code is not one of
      # the constants listed here.
      def self.tls_verify_result_name(code)
        # The name is derived from the same token used for the constant lookup, so
        # a label can never disagree with the constant it describes.
        names = %w[
          V_OK
          V_ERR_AKID_SKID_MISMATCH
          V_ERR_APPLICATION_VERIFICATION
          V_ERR_CERT_CHAIN_TOO_LONG
          V_ERR_CERT_HAS_EXPIRED
          V_ERR_CERT_NOT_YET_VALID
          V_ERR_CERT_REJECTED
          V_ERR_CERT_REVOKED
          V_ERR_CERT_SIGNATURE_FAILURE
          V_ERR_CERT_UNTRUSTED
          V_ERR_CRL_HAS_EXPIRED
          V_ERR_CRL_NOT_YET_VALID
          V_ERR_CRL_SIGNATURE_FAILURE
          V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
          V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD
          V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD
          V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD
          V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD
          V_ERR_INVALID_CA
          V_ERR_INVALID_PURPOSE
          V_ERR_KEYUSAGE_NO_CERTSIGN
          V_ERR_OUT_OF_MEM
          V_ERR_PATH_LENGTH_EXCEEDED
          V_ERR_SELF_SIGNED_CERT_IN_CHAIN
          V_ERR_SUBJECT_ISSUER_MISMATCH
          V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY
          V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE
          V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE
          V_ERR_UNABLE_TO_GET_CRL
          V_ERR_UNABLE_TO_GET_ISSUER_CERT
          V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY
          V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
        ]
        names.find { |name| OpenSSL::X509.const_get(name) == code }
      end

      # socket_create_socks ?

      # Socket subclasses that expose the connected peer's details by name.
      # Each accessor reads one element of the array returned by #peeraddr:
      # index 3 for remote_addr, index 2 for remote_host, index 1 for remote_port.
      module WrappedSocket
        # TCP client socket with named peer accessors.
        class TCP < ::TCPSocket
          def remote_addr
            peeraddr[3]
          end

          def remote_host
            peeraddr[2]
          end

          def remote_port
            peeraddr[1]
          end
        end

        # UDP socket with named peer accessors.
        class UDP < ::UDPSocket
          def remote_addr
            peeraddr[3]
          end

          def remote_host
            peeraddr[2]
          end

          def remote_port
            peeraddr[1]
          end
        end

        # TLS socket with named peer accessors.
        class TLS < OpenSSL::SSL::SSLSocket
          def remote_addr
            peeraddr[3]
          end

          def remote_host
            peeraddr[2]
          end

          def remote_port
            peeraddr[1]
          end
        end
      end

      # Passes construction on to the next initializer in the ancestor chain.
      # The helper sets no state of its own: the @_sockets assignment below is
      # commented out, so the _sockets reader returns nil.
      def initialize
        super
        # @_sockets = [] # for keepalived sockets / connection pool
      end

      # def close
      #   @_sockets.each do |sock|
      #     sock.close
      #   end
      #   super
      # end
    end
  end
end