rapid7/metasploit-framework

View on GitHub
lib/rex/proto/amqp/version_0_9_1/client.rb

Summary

Maintainability
A
2 hrs
Test Coverage
class Rex::Proto::Amqp::Version091::Client

  require 'rex/stopwatch'
  require 'rex/proto/amqp/error'
  require 'rex/proto/amqp/version_0_9_1/frames'
  require 'rex/proto/amqp/version_0_9_1/client/channel'

  include Rex::Proto::Amqp

  # @return [String] The AMQP server host.
  attr_reader :host

  # @return [Integer] The AMQP server port.
  attr_reader :port

  # @return [Boolean] Whether or not SSL is used for the connection.
  attr_reader :ssl

  # @return [Rex::Socket::Comm] An optional, explicit object to use for creating the connection.
  attr_reader :comm

  # @return [Hash] A hash containing server information.
  attr_reader :server_info

  # @!attribute timeout
  #   @return [Integer] The communication timeout in seconds.
  attr_accessor :timeout

  # @param [String] host The AMQP server host.
  # @param [Integer,NilClass] port The AMQP server port or nil for automatic based on ssl.
  # @param [Boolean] ssl Whether or not SSL is used for the connection.
  # @param [String] ssl_version The SSL version to use.
  # @param [Rex::Socket::Comm] comm An optional, explicit object to use for creating the connection.
  # @param [Integer] timeout The communication timeout in seconds.
  def initialize(host, port: nil, context: {}, ssl: true, ssl_version: nil, comm: nil, timeout: 10)
    if port.nil?
      port = ssl ? 5671 : 5672
    end

    @host = host
    @port = port
    @context = context
    @ssl = ssl
    @ssl_version = ssl_version
    @comm = comm
    @server_info = {}
    @channels = {}
    @frame_queue = []
    @next_channel_id = 1
    @timeout = timeout
  end

  # Establish the connection to the remote server.
  #
  # @param [Integer] t An explicit timeout to use for the connection otherwise the default will be used.
  # @return [NilClass]
  def connect(t = -1)
    timeout = (t.nil? or t == -1) ? @timeout : t

    @conn = Rex::Socket::Tcp.create(
      'PeerHost'   => @host,
      'PeerPort'   => @port.to_i,
      'Context'    => @context,
      'SSL'        => @ssl,
      'SSLVersion' => @ssl_version,
      'Timeout'    => timeout,
      'Comm'       => @comm
    )

    nil
  end

  # Close the connection to the remote server.
  #
  # @return [NilClass]
  def close
    if @conn && !@conn.closed?
      @conn.shutdown
      @conn.close
    end

    @conn = nil
  end

  # Login to the remote server. The connection will be started automatically if it has not already been established.
  #
  # @param [String] username The username to authenticate with.
  # @param [String] password The password to authenticate with.
  # @param [String] vhost The virtual host to connect to.
  # @return [Boolean] Whether or not authentication was successful.
  def login(username, password, vhost: '/')
    connect if @conn.nil?

    send_protocol_header
    connection_start(username, password)

    resp = recv_frame
    if is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionClose)
      close
      return false
    elsif !is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionTune)
      raise Error::UnexpectedReplyError.new(resp)
    end

    @server_info[:tuning] = resp.arguments.snapshot
    connection_tune_ok = Version091::Frames::AmqpVersion091MethodFrame.new
    connection_tune_ok.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionTuneOk.new(
      resp.arguments.snapshot
    )
    send_frame(connection_tune_ok)

    connection_open(vhost)

    true
  end

  # Send a frame to the connected peer.
  #
  # @param [#to_binary_s] frame The frame to send.
  # @return [Integer] The number of bytes written.
  def send_frame(frame)
    @conn.put(frame.to_binary_s)
  end

  # Receive a frame from the connected peer with a timeout.
  #
  # @return [BinData::Record] The frame that was received.
  def recv_frame
    remaining = @timeout
    header_raw, elapsed_time = Rex::Stopwatch.elapsed_time do
      num_bytes = Version091::Frames::AmqpVersion091FrameHeader.new.num_bytes
      @conn.get_once(num_bytes, remaining)
    end
    remaining -= elapsed_time

    header = Version091::Frames::AmqpVersion091FrameHeader.read(header_raw)
    body = ''
    while (body.size < (header.frame_size + 1)) && remaining > 0
      chunk, elapsed_time = Rex::Stopwatch.elapsed_time do
        @conn.read((header.frame_size + 1) - body.size, remaining)
      end
      remaining -= elapsed_time
      body << chunk
    end

    unless body.size == (header.frame_size + 1)
      if remaining <= 0
        raise Rex::TimeoutError, 'Failed to read the response data due to timeout.'
      end

      Error::InvalidFrameError.new
    end

    case header.frame_type
    when 1
      frame = Version091::Frames::AmqpVersion091MethodFrame.read(header.to_binary_s + body)
    when 2
      frame = Version091::Frames::AmqpVersion091ContentHeaderFrame.read(header.to_binary_s + body)
    when 3
      frame = Version091::Frames::AmqpVersion091ContentBodyFrame.read(header.to_binary_s + body)
    end

    frame
  end

  # Open a new channel.
  #
  # @return [Channel] The newly opened channel.
  def channel_open
    ch_open = Version091::Frames::AmqpVersion091MethodFrame.new
    ch_open.header.frame_channel = cid = @next_channel_id
    ch_open.arguments = Version091::Frames::MethodArguments::AmqpVersion091ChannelOpen.new
    send_frame(ch_open)
    resp = recv_frame

    unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ChannelOpenOk)
      raise Error::UnexpectedReplyError.new(resp)
    end

    @next_channel_id += 1
    @channels[cid] = Channel.new(self, cid)
  end

  # Close an established channel.
  #
  # @param [Channel] channel The channel object to close.
  # @return [NilClass]
  def channel_close(channel)
    ch_close = Version091::Frames::AmqpVersion091MethodFrame.new
    ch_close.header.frame_channel = channel.id
    ch_close.arguments = Version091::Frames::MethodArguments::AmqpVersion091ChannelClose.new
    send_frame(ch_close)
    resp = recv_frame

    unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ChannelCloseOk)
      raise Error::UnexpectedReplyError.new(resp)
    end

    @channels.delete(channel.id)
    nil
  end

  # Close the established connection by performing the necessary handshake.
  #
  # @return [NilClass]
  def connection_close
    send_connection_close
    recv_connection_close_ok

    nil
  end

  # Open a connection by performing the necessary handshake.
  #
  # @param [String] vhost The virtual host to connect to.
  # @return [NilClass]
  def connection_open(vhost)
    send_connection_open(virtual_host: vhost)
    recv_connection_open_ok

    nil
  end

  # Start a connection by performing the necessary handshake. The caller needs to validate the response to ensure
  # authentication succeeded.
  #
  # @param [String] username The username to authenticate with.
  # @param [String] password The password to authenticate with.
  # @return [NilClass]
  def connection_start(username, password)
    recv_connection_start

    unless @server_info[:security_mechanisms].include?('PLAIN')
      # PLAIN is supported by default, others can be added via plugins
      raise Error::NegotiationError.new('There are no mutually supported authentication mechanisms.')
    end

    # prefer en_US if it's available, otherwise select one at random
    if @server_info[:locales].include?('en_US')
      locale = 'en_US'
    else
      locale = @server_info[:locales].sample
    end

    send_connection_start_ok({
      # Per the spec, these properties "should" contain: product, version, platform, copyright, and information
      client_properties: [
        { name: 'capabilities', data: { data_type: 'F'.ord, data: [
          { name: 'authentication_failure_close', data: { data_type: 't'.ord, data: true } },
          { name: 'basic.nack', data: { data_type: 't'.ord, data: true } },
          { name: 'connection.blocked', data: { data_type: 't'.ord, data: true } },
          { name: 'consumer_cancel_notify', data: { data_type: 't'.ord, data: true } },
          { name: 'publisher_confirms', data: { data_type: 't'.ord, data: true } }
        ] } }
      ],
      # https://www.rabbitmq.com/access-control.html#mechanisms
      mechanism: 'PLAIN',
      response: build_sasl_response_plain(username, password),
      locale: locale
    })
  end

  def recv_connection_close_ok
    resp = recv_frame
    unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionCloseOk)
      raise Error::UnexpectedReplyError.new(resp)
    end

    resp
  end

  def recv_connection_open_ok
    resp = recv_frame
    unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionOpenOk)
      raise Error::UnexpectedReplyError.new(resp)
    end

    resp
  end

  def recv_connection_start
    resp = recv_frame
    unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionStart)
      raise Error::UnexpectedReplyError.new(resp)
    end

    @server_info = {
      properties: resp.arguments.server_properties.coerce,
      security_mechanisms: resp.arguments.mechanisms.split(' '),
      locales: resp.arguments.locales.split(' ')
    }

    resp
  end

  def send_connection_close(arguments={})
    conn_close = Version091::Frames::AmqpVersion091MethodFrame.new
    conn_close.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionClose.new(arguments)
    send_frame(conn_close)

    nil
  end

  def send_connection_open(arguments={})
    connection_open = Version091::Frames::AmqpVersion091MethodFrame.new
    connection_open.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionOpen.new(arguments)
    send_frame(connection_open)

    nil
  end

  def send_connection_start_ok(arguments={})
    connection_start_ok = Version091::Frames::AmqpVersion091MethodFrame.new
    connection_start_ok.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionStartOk.new(arguments)
    send_frame(connection_start_ok)

    nil
  end

  def send_protocol_header
    send_frame(Version091::Frames::AmqpVersion091ProtocolHeader.new)

    nil
  end

  private

  # Build a SASL authentication response for a username and password.
  #
  # @param [String] username
  # @param [String] password
  # @return [String]
  def build_sasl_response_plain(username, password)
    # per the SASL spec, username and password must be UTF-8 encoded
    # see: https://www.rfc-editor.org/rfc/rfc4616#section-2
    "\x00".b + username.encode('UTF-8') + "\x00".b + password.encode('UTF-8')
  end

  # Check if a frame is a method frame, and (optionally) if it's of the specified type.
  #
  # @param [BinData::Record] resp The object to verify is a method frame.
  # @param [BinData::Record] klass The method argument class to check.
  # @return [Boolean]
  def is_method_frame?(resp, klass=nil)
    return false unless resp.is_a?(Version091::Frames::AmqpVersion091MethodFrame)

    if klass
      return false unless resp.class_id == klass::CLASS_ID
      return false unless resp.method_id == klass::METHOD_ID
    end

    true
  end
end