nulogy/Gorgon

View on GitHub
lib/gorgon_bunny/lib/gorgon_bunny/transport.rb

Summary

Maintainability
C
1 day
Test Coverage
require "socket"
require "thread"
require "monitor"

begin
  require "openssl"
rescue LoadError => le
  $stderr.puts "Could not load OpenSSL"
end

require "gorgon_bunny/exceptions"
require "gorgon_bunny/socket"

module GorgonBunny
  # @private
  class Transport

    #
    # API
    #

    # Default TCP connection timeout. Returned by #timeout_from when none of
    # the :connect_timeout, :connection_timeout or :timeout options is given.
    DEFAULT_CONNECTION_TIMEOUT = 5.0
    # Default TLS protocol version to use. Passed to the TLS context as
    # :ssl_version unless the :tls_protocol option overrides it.
    # Currently SSLv3, same as in RabbitMQ Java client
    # NOTE(review): SSLv3 is deprecated (RFC 7568) and is presumably rejected
    # by current OpenSSL builds -- confirm and consider a newer default.
    DEFAULT_TLS_PROTOCOL       = "SSLv3"


    # Read-only accessors for the transport's state.
    attr_reader :session, :host, :port, :socket, :connect_timeout, :read_write_timeout, :disconnect_timeout
    # TLS context built by #prepare_tls_context; stays nil when TLS is not enabled.
    attr_reader :tls_context

    # Builds a transport for the given session and opens the underlying
    # socket right away. When TLS is enabled the TLS context is prepared too.
    #
    # @param session owning session; supplies the logger and the mutex class
    # @param host host to connect to
    # @param port port to connect to
    # @param opts [Hash] transport options (:session_thread, :socket_timeout,
    #   the connection timeout keys and the TLS related keys)
    def initialize(session, host, port, opts)
      @session = session
      @host = host
      @port = port
      @opts = opts
      @session_thread = opts[:session_thread]

      @logger = session.logger
      @tls_enabled = tls_enabled?(opts)

      # A timeout of 0 means "no timeout" and is stored as nil.
      rw_timeout = opts[:socket_timeout] || 3
      @read_write_timeout = rw_timeout == 0 ? nil : rw_timeout
      conn_timeout = timeout_from(opts)
      @connect_timeout = conn_timeout == 0 ? nil : conn_timeout
      @disconnect_timeout = @read_write_timeout || @connect_timeout

      @writes_mutex = @session.mutex_impl.new

      maybe_initialize_socket
      prepare_tls_context(opts) if @tls_enabled
    end

    # Returns the host this transport was created with (same value as #host).
    def hostname
      @host
    end

    # Tells whether TLS was requested for this transport.
    # #tls?, #uses_ssl? and #ssl? are synonyms kept for callers that use
    # the TLS or the SSL spelling.
    #
    # @return the value computed by #tls_enabled? at construction time
    def uses_tls?
      @tls_enabled
    end
    alias tls? uses_tls?
    alias uses_ssl? uses_tls?
    alias ssl? uses_tls?


    # Finishes connection setup for TLS transports: calls #connect on the
    # socket (presumably the TLS handshake), optionally runs the post
    # connection check against the host, and marks the transport as connected.
    #
    # @return the socket for TLS transports, nil otherwise
    def connect
      # Plain TCP transports need no extra step.
      return unless uses_ssl?

      @socket.connect
      if uses_tls? && @verify_peer
        @socket.post_connection_check(host)
      end
      @status = :connected

      @socket
    end

    # Tells whether the transport has been marked as connected and its socket
    # is still open.
    #
    # The status is compared against :connected; the previous comparison
    # against :not_connected inverted the answer. Note that in this class only
    # the TLS branch of #connect sets the status to :connected.
    #
    # @return [Boolean, nil] falsy unless connected and open
    def connected?
      @status == :connected && open?
    end

    # Hands the current socket to the given block so callers can tweak it.
    # Does nothing when there is no socket.
    #
    # @return the block's result, or nil when there is no socket
    def configure_socket(&block)
      return unless @socket

      block.call(@socket)
    end

    # Hands the TLS context to the given block so callers can tweak it.
    # Does nothing when no TLS context has been prepared.
    #
    # @return the block's result, or nil when there is no TLS context
    def configure_tls_context(&block)
      return unless @tls_context

      block.call(@tls_context)
    end

    # Writes data to the socket and flushes it; nothing is written when the
    # socket is not open. When a read/write timeout is configured the write
    # is bounded by it.
    #
    # Timeouts and I/O failures do not propagate from here: the error is
    # logged, the socket is closed, the status becomes :not_connected, and the
    # failure is handed to the session (automatic recovery) or raised in the
    # session thread as GorgonBunny::NetworkFailure.
    #
    # @param data payload to write
    def write(data)
      # Same socket operations for the timed and the untimed path.
      push = proc do
        if open?
          @writes_mutex.synchronize { @socket.write(data) }
          @socket.flush
        end
      end

      limit = @read_write_timeout
      if limit
        GorgonBunny::Timeout.timeout(limit, GorgonBunny::ClientTimeout) { push.call }
      else
        push.call
      end
    rescue SystemCallError, GorgonBunny::ClientTimeout, GorgonBunny::ConnectionError, IOError => failure
      @logger.error "Got an exception when sending data: #{failure.message} (#{failure.class.name})"
      close
      @status = :not_connected

      return @session.handle_network_failure(failure) if @session.automatically_recover?

      @session_thread.raise(GorgonBunny::NetworkFailure.new("detected a network failure: #{failure.message}", failure))
    end

    # Writes data to the socket and flushes it, without any timeout control
    # and without checking that the socket is open first.
    #
    # On an I/O failure the socket is closed and the failure is handed to the
    # session (automatic recovery) or raised in the session thread as
    # GorgonBunny::NetworkFailure.
    #
    # @param data payload to write
    def write_without_timeout(data)
      @writes_mutex.synchronize { @socket.write(data) }
      @socket.flush
    rescue SystemCallError, GorgonBunny::ConnectionError, IOError => failure
      close

      return @session.handle_network_failure(failure) if @session.automatically_recover?

      @session_thread.raise(GorgonBunny::NetworkFailure.new("detected a network failure: #{failure.message}", failure))
    end

    # Encodes the frame and writes it to the peer via #write.
    # When the transport is already closed, a ConnectionClosedError built
    # from the frame is reported to the session instead of being raised here.
    #
    # @param frame object responding to #encode
    # @private
    def send_frame(frame)
      return write(frame.encode) unless closed?

      @session.handle_network_failure(ConnectionClosedError.new(frame))
    end

    # Encodes the frame and writes it to the peer via #write_without_timeout.
    # When the transport is already closed, a ConnectionClosedError built
    # from the frame is reported to the session instead of being raised here.
    #
    # @param frame object responding to #encode
    # @private
    def send_frame_without_timeout(frame)
      return write_without_timeout(frame.encode) unless closed?

      @session.handle_network_failure(ConnectionClosedError.new(frame))
    end


    # Closes the socket if it is currently open; otherwise does nothing.
    #
    # @param reason accepted for API compatibility; not used by this method
    def close(reason = nil)
      return unless open?

      @socket.close
    end

    # Tells whether there is a socket and it has not been closed.
    #
    # @return [Boolean, nil] nil when there is no socket at all
    def open?
      sock = @socket
      sock && !sock.closed?
    end

    # Inverse of #open?.
    #
    # @return [Boolean] true when there is no socket or it has been closed
    def closed?
      open? ? false : true
    end

    # Flushes the socket when there is one; otherwise does nothing.
    def flush
      return unless @socket

      @socket.flush
    end

    # Delegates to the socket's #read_fully, forwarding every argument as is.
    def read_fully(*args)
      sock = @socket
      sock.read_fully(*args)
    end

    # Waits with IO.select until the socket is readable or the timeout runs out.
    #
    # @param timeout passed to IO.select as is; nil means no timeout
    # @return [Boolean, nil] nil when IO.select returned nothing
    def read_ready?(timeout = nil)
      watched = [@socket].compact
      ready = IO.select(watched, nil, nil, timeout)
      ready && ready.first.include?(@socket)
    end


    # Reads one frame from the socket: a 7 byte header, the payload whose
    # size the header announces, and one trailing byte.
    # Exposed primarily for GorgonBunny::Channel
    #
    # @param opts accepted but not used by this method
    # @raise [BadLengthError] when fewer or more payload bytes than announced were read
    # @raise [NoFinalOctetError] when the trailing byte is not FINAL_OCTET
    # @private
    def read_next_frame(opts = {})
      frame_header = @socket.read_fully(7)
      # TODO: network issues here will sometimes cause
      #       the socket method return an empty string. We need to log
      #       and handle this better (e.g. by rescuing
      #       GorgonAMQ::Protocol::EmptyResponseError around decode_header
      #       and logging the header).
      type, channel, size = GorgonAMQ::Protocol::Frame.decode_header(frame_header)
      body = @socket.read_fully(size)
      terminator = @socket.read_fully(1)

      # 1) the size is miscalculated
      raise BadLengthError.new(size, body.bytesize) unless body.bytesize == size

      # 2) the size is OK, but the string doesn't end with FINAL_OCTET
      unless terminator == GorgonAMQ::Protocol::Frame::FINAL_OCTET
        raise NoFinalOctetError.new
      end

      GorgonAMQ::Protocol::Frame.new(type, body, channel)
    end


    # Probes whether a TCP connection to host:port can be opened. The probe
    # socket is always closed again.
    #
    # System-level connection errors (SystemCallError, e.g. connection
    # refused or host unreachable) count as "not reachable" as well, so this
    # predicate answers false instead of raising for them.
    #
    # @param host host to probe
    # @param port port to probe
    # @param timeout passed to the socket as :socket_timeout
    # @return [Boolean]
    def self.reacheable?(host, port, timeout)
      probe = GorgonBunny::Socket.open(host, port,
        :socket_timeout => timeout)

      true
    rescue SocketError, SystemCallError, Timeout::Error
      false
    ensure
      probe.close if probe
    end

    # Raises unless host:port passes the .reacheable? probe.
    #
    # @raise [ConnectionTimeout] when the probe reports the peer as unreachable
    def self.ping!(host, port, timeout)
      return if reacheable?(host, port, timeout)

      raise ConnectionTimeout.new("#{host}:#{port} is unreachable")
    end

    # Opens a new socket to the configured host and port, bounded by the
    # connect timeout, and stores it.
    #
    # @return the new socket
    # @raise [GorgonBunny::TCPConnectionFailed] wrapping any StandardError or
    #   ClientTimeout raised while opening; the status becomes :not_connected
    def initialize_socket
      @socket = GorgonBunny::Timeout.timeout(@connect_timeout, ClientTimeout) do
        GorgonBunny::Socket.open(@host, @port,
          :keepalive      => @opts[:keepalive],
          :socket_timeout => @connect_timeout)
      end
    rescue StandardError, ClientTimeout => cause
      @status = :not_connected
      raise GorgonBunny::TCPConnectionFailed.new(cause, hostname, port)
    end

    # Opens a socket unless there already is one that is not closed.
    #
    # @return the result of #initialize_socket, or nil when nothing was done
    def maybe_initialize_socket
      return if @socket && !closed?

      initialize_socket
    end

    # Replaces the socket with its TLS wrapped counterpart when TLS is in
    # use; leaves it untouched otherwise.
    #
    # @return the socket now stored on the transport
    def post_initialize_socket
      @socket = wrap_in_tls_socket(@socket) if uses_tls?
      @socket
    end

    protected

    # Decides whether TLS should be used: either :tls or :ssl is set, or the
    # :port option equals the protocol's TLS port.
    #
    # @param opts [Hash] transport options
    # @return the truthy :tls / :ssl value, true for the TLS port, false otherwise
    def tls_enabled?(opts)
      requested = opts[:tls] || opts[:ssl]
      return requested if requested

      (opts[:port] == GorgonAMQ::Protocol::TLS_PORT) || false
    end

    # Picks the client certificate path from the options, trying the
    # supported option names in order of precedence.
    #
    # @param opts [Hash] transport options
    # @return the first truthy value found, otherwise the last one looked up
    def tls_certificate_path_from(opts)
      path = nil
      [:tls_cert, :ssl_cert, :tls_cert_path, :ssl_cert_path, :tls_certificate_path, :ssl_certificate_path].each do |key|
        path = opts[key]
        break if path
      end
      path
    end

    # Picks the client private key path from the options, trying the
    # supported option names in order of precedence.
    #
    # @param opts [Hash] transport options
    # @return the first truthy value found, otherwise the last one looked up
    def tls_key_path_from(opts)
      path = nil
      [:tls_key, :ssl_key, :tls_key_path, :ssl_key_path].each do |key|
        path = opts[key]
        break if path
      end
      path
    end

    # Resolves the client certificate: the file at the configured path wins,
    # otherwise the certificate given inline in the options is used.
    #
    # The inline fallback applies both when the file cannot be read and when
    # no path was configured at all; previously the latter case returned nil
    # and silently ignored an inline certificate.
    #
    # @param opts [Hash] transport options
    # @return the certificate contents, or nil when none is available
    def tls_certificate_from(opts)
      read_client_certificate! || inline_client_certificate_from(opts)
    rescue MissingTLSCertificateFile
      inline_client_certificate_from(opts)
    end

    # Resolves the client private key: the file at the configured path wins,
    # otherwise the key given inline in the options is used.
    #
    # The inline fallback applies both when the file cannot be read and when
    # no path was configured at all; previously the latter case returned nil
    # and silently ignored an inline key.
    #
    # @param opts [Hash] transport options
    # @return the key contents, or nil when none is available
    def tls_key_from(opts)
      read_client_key! || inline_client_key_from(opts)
    rescue MissingTLSKeyFile
      inline_client_key_from(opts)
    end


    # Returns the client certificate passed inline in the options:
    # :tls_certificate first, then :ssl_cert_string.
    #
    # @param opts [Hash] transport options
    def inline_client_certificate_from(opts)
      certificate = opts[:tls_certificate]
      certificate || opts[:ssl_cert_string]
    end

    # Returns the client private key passed inline in the options:
    # :tls_key first, then :ssl_key_string.
    #
    # @param opts [Hash] transport options
    def inline_client_key_from(opts)
      key = opts[:tls_key]
      key || opts[:ssl_key_string]
    end

    # Collects every TLS related setting from the options and builds the TLS
    # context from them.
    #
    # @param opts [Hash] transport options
    # @return the value of #initialize_tls_context, also stored as the TLS context
    def prepare_tls_context(opts)
      # The paths are resolved first because reading the certificate and the
      # key relies on them.
      @tls_certificate_path = tls_certificate_path_from(opts)
      @tls_key_path = tls_key_path_from(opts)
      @tls_certificate = tls_certificate_from(opts)
      @tls_key = tls_key_from(opts)
      @tls_certificate_store = opts[:tls_certificate_store]

      ca_file = ENV[OpenSSL::X509::DEFAULT_CERT_FILE_ENV] || OpenSSL::X509::DEFAULT_CERT_FILE
      ca_dir = ENV[OpenSSL::X509::DEFAULT_CERT_DIR_ENV] || OpenSSL::X509::DEFAULT_CERT_DIR
      # Bundle names used by Ubuntu/Debian, Amazon Linux & Fedora/RHEL, and OpenSUSE.
      bundle_names = ['ca-certificates.crt', 'ca-bundle.crt', 'ca-bundle.pem']
      default_bundles = [ca_file] + bundle_names.map { |name| File.join(ca_dir, name) }
      @tls_ca_certificates = opts.fetch(:tls_ca_certificates, default_bundles)
      @verify_peer = opts[:verify_ssl] || opts[:verify_peer]

      @tls_context = initialize_tls_context(OpenSSL::SSL::SSLContext.new)
    end

    # Wraps a plain socket into a GorgonBunny::SSLSocket bound to the TLS
    # context, with sync_close enabled on the wrapper.
    #
    # @param socket the socket to wrap; must not be nil
    # @return [GorgonBunny::SSLSocket]
    # @raise [ArgumentError] when socket is nil
    # @raise [RuntimeError] when no TLS context has been prepared
    def wrap_in_tls_socket(socket)
      # The nil-socket message used to blame @tls_context, which is a
      # different failure (reported by the next check).
      raise ArgumentError, "cannot wrap nil into TLS socket, socket is nil. This is a GorgonBunny bug." unless socket
      raise "cannot wrap a socket into TLS socket, @tls_context is nil. This is a GorgonBunny bug." unless @tls_context

      tls_socket = GorgonBunny::SSLSocket.new(socket, @tls_context)
      tls_socket.sync_close = true
      tls_socket
    end

    # Ensures the given path points to a regular, readable file.
    #
    # @param s path of the client TLS certificate
    # @raise [MissingTLSCertificateFile] when it does not
    def check_local_certificate_path!(s)
      return if File.file?(s) && File.readable?(s)

      raise MissingTLSCertificateFile, "cannot read client TLS certificate from #{s}"
    end

    # Ensures the given path points to a regular, readable file.
    #
    # @param s path of the client TLS private key
    # @raise [MissingTLSKeyFile] when it does not
    def check_local_key_path!(s)
      return if File.file?(s) && File.readable?(s)

      raise MissingTLSKeyFile, "cannot read client TLS private key from #{s}"
    end

    # Reads the client certificate from the configured path, if any, and
    # stores it.
    #
    # @return [String, nil] the file contents, or nil when no path is configured
    # @raise [MissingTLSCertificateFile] via #check_local_certificate_path!
    def read_client_certificate!
      return unless @tls_certificate_path

      check_local_certificate_path!(@tls_certificate_path)
      @tls_certificate = File.read(@tls_certificate_path)
    end

    # Reads the client private key from the configured path, if any, and
    # stores it.
    #
    # @return [String, nil] the file contents, or nil when no path is configured
    # @raise [MissingTLSKeyFile] via #check_local_key_path!
    def read_client_key!
      return unless @tls_key_path

      check_local_key_path!(@tls_key_path)
      @tls_key = File.read(@tls_key_path)
    end

    # Applies the collected TLS settings (client certificate, private key,
    # certificate store, protocol version and peer verification) to the
    # given context.
    #
    # The private key is parsed with OpenSSL::PKey.read, which detects the
    # key algorithm, so EC and DSA keys are accepted in addition to RSA.
    #
    # @param ctx the TLS context to configure
    # @return ctx
    def initialize_tls_context(ctx)
      ctx.cert       = OpenSSL::X509::Certificate.new(@tls_certificate) if @tls_certificate
      ctx.key        = OpenSSL::PKey.read(@tls_key) if @tls_key
      # An explicitly supplied store wins over one built from the CA file list.
      ctx.cert_store = @tls_certificate_store || initialize_tls_certificate_store(@tls_ca_certificates)

      if !@tls_certificate
        @logger.warn <<-MSG
        Using TLS but no client certificate is provided! If RabbitMQ is configured to verify peer
        certificate, connection upgrade will fail!
        MSG
      end
      if @tls_certificate && !@tls_key
        @logger.warn "Using TLS but no client private key is provided!"
      end

      # setting TLS/SSL version only works correctly when done
      # via set_params. MK.
      ctx.set_params(:ssl_version => @opts.fetch(:tls_protocol, DEFAULT_TLS_PROTOCOL))
      ctx.set_params(:verify_mode => OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT) if @verify_peer

      ctx
    end

    # Builds a certificate store from those of the given CA files that are
    # readable. Logs which files are used and logs an error when none are.
    #
    # @param certs [Array<String>] candidate CA certificate file paths
    # @return [OpenSSL::X509::Store]
    def initialize_tls_certificate_store(certs)
      readable = certs.select { |candidate| File.readable?(candidate) }
      @logger.debug "Using CA certificates at #{readable.join(', ')}"
      @logger.error "No CA certificates found, add one with :tls_ca_certificates" if readable.empty?

      store = OpenSSL::X509::Store.new
      readable.each { |ca_path| store.add_file(ca_path) }
      store
    end

    # Picks the connection timeout from the options: :connect_timeout, then
    # :connection_timeout, then :timeout, falling back to
    # DEFAULT_CONNECTION_TIMEOUT.
    #
    # @param options [Hash] transport options
    def timeout_from(options)
      configured = options[:connect_timeout] || options[:connection_timeout] || options[:timeout]
      configured || DEFAULT_CONNECTION_TIMEOUT
    end
  end
end