EGI-FCTF/oneacct_export

View on GitHub
lib/one_worker.rb

Summary

Maintainability
B
5 hrs
Test Coverage
file_dir_name = File.dirname(__FILE__)
$LOAD_PATH.unshift(file_dir_name) unless $LOAD_PATH.include?(file_dir_name)

require 'sidekiq'
require 'one_data_accessor'
require 'one_writer'
require 'sidekiq_conf'
require 'oneacct_exporter/log'
require 'oneacct_exporter/version'
require 'settings'
require 'data_validators/apel_data_validator'
require 'data_validators/pbs_data_validator'
require 'data_validators/logstash_data_validator'
require 'output_types'
require 'errors'
require 'ipaddr'

# Sidekiq worker class
class OneWorker
  include Sidekiq::Worker
  include OutputTypes
  include Errors

  sidekiq_options retry: 5, dead: false, \
                  queue: (Settings['sidekiq'] && Settings.sidekiq['queue']) ? Settings.sidekiq['queue'].to_sym : :default

  IGNORED_NETWORKS=["10.0.0.0/8","172.16.0.0/12","192.168.0.0/16"].map {|x| IPAddr.new x}

  # Prepare data that are common for all the output types
  def common_data
    data = {}
    data['oneacct_export_version'] = ::OneacctExporter::VERSION

    data
  end

  # Prepare data that are specific for output type and common for every virtual machine
  def output_type_specific_data
    data = {}
    if PBS_OT.include?(Settings.output['output_type']) && Settings.output['pbs']
      data['realm'] = Settings.output.pbs['realm']
      data['pbs_queue'] = Settings.output.pbs['queue']
      data['scratch_type'] = Settings.output.pbs['scratch_type']
      data['host'] = Settings.output.pbs['host_identifier']
    end

    if APEL_OT.include?(Settings.output['output_type'])
      data['endpoint'] = Settings.output.apel['endpoint'].chomp('/')
      data['site_name'] = Settings.output.apel['site_name']
      data['cloud_type'] = Settings.output.apel['cloud_type']
      data['cloud_compute_service'] = Settings.output.apel['cloud_compute_service']
    end

    if LOGSTASH_OT.include?(Settings.output['output_type'])
      data['host'] = Settings.output.logstash['host']
      data['port'] = Settings.output.logstash['port']
    end

    data
  end

  # Create mapping of user ID and specified element
  #
  # @return [Hash] created map
  def create_user_map(oda)
    logger.debug('Creating user map.')
    create_map(OpenNebula::UserPool, 'TEMPLATE/X509_DN', oda)
  end

  # Create mapping of image ID and specified element
  #
  # @return [Hash] created map
  def create_image_map(oda)
    logger.debug('Creating image map.')
    create_map(OpenNebula::ImagePool, 'TEMPLATE/VMCATCHER_EVENT_AD_MPURI', oda)
  end

  def create_cluster_map(oda)
    logger.debug('Creating cluster map.')
    create_map(OpenNebula::ClusterPool, 'TEMPLATE/APEL_SITE_NAME', oda)
  end

  # Generic method for mapping creation
  def create_map(pool_type, mapping, oda)
    oda.mapping(pool_type, mapping)
  rescue => e
    msg = "Couldn't create map: #{e.message}. "\
      'Stopping to avoid malformed records.'
    logger.error(msg)
    raise msg
  end

  # Load virtual machine with specified ID
  #
  # @return [OpenNebula::VirtualMachine] virtual machine
  def load_vm(vm_id, oda)
    oda.vm(vm_id)
  rescue => e
    logger.error("Couldn't retrieve data for vm with id: #{vm_id}. #{e.message}. Skipping.")
    return nil
  end

  # Obtain and parse required data from vm
  #
  # @return [Hash] required data from virtual machine
  def process_vm(vm, user_map, image_map, cluster_map, benchmark_map)
    data = common_data
    data.merge! output_type_specific_data

    data['vm_uuid'] = vm['ID']
    data['start_time'] = vm['STIME']
    data['end_time'] = vm['ETIME']
    data['machine_name'] = vm['DEPLOY_ID']
    data['user_id'] = vm['UID']
    data['group_id'] = vm['GID']
    data['user_dn'] = vm['USER_TEMPLATE/USER_X509_DN']
    data['user_dn'] ||= user_map[data['user_id']]
    data['user_name'] = vm['UNAME']
    data['group_name'] = vm['GNAME']
    data['status_code'] = vm['STATE']
    data['status'] = vm.state_str
    data['cpu_count'] = vm['TEMPLATE/VCPU']
    data['network_inbound'] = vm['MONITORING/NETTX']
    data['network_outbound'] = vm['MONITORING/NETRX']
    data['memory'] = vm['TEMPLATE/MEMORY']
    data['image_name'] = vm['TEMPLATE/DISK[1]/VMCATCHER_EVENT_AD_MPURI']
    data['image_name'] ||= image_map[vm['TEMPLATE/DISK[1]/IMAGE_ID']]
    data['image_name'] ||= mixin(vm)
    data['image_name'] ||= vm['TEMPLATE/DISK[1]/IMAGE_ID']
    data['history'] = history_records(vm)
    data['disks'] = disk_records(vm)
    data['number_of_public_ips'] = number_of_public_ips(vm)

    benchmark = benchmark_map[vm['HISTORY_RECORDS/HISTORY[last()]/HID']]
    data['benchmark_type'] = benchmark[:benchmark_type]
    data['benchmark_value'] = benchmark[:benchmark_value]

    site_name = cluster_map[vm['HISTORY_RECORDS/HISTORY[1]/CID']]
    data['site_name'] = site_name if site_name

    data
  end

  # Returns an array of history records from vm
  #
  # @param [OpenNebula::VirtualMachine] vm virtual machine
  #
  # @return [Array] array of hashes representing vm's history records
  def history_records(vm)
    history = []
    vm.each 'HISTORY_RECORDS/HISTORY' do |h|
      history_record = {}
      history_record['start_time'] = h['STIME']
      history_record['end_time'] = h['ETIME']
      history_record['rstart_time'] = h['RSTIME']
      history_record['rend_time'] = h['RETIME']
      history_record['seq'] = h['SEQ']
      history_record['hostname'] = h['HOSTNAME']

      history << history_record
    end

    history
  end

  # Returns an array of disk records from vm
  #
  # @param [OpenNebula::VirtualMachine] vm virtual machine
  #
  # @return [Array] array of hashes representing vm's disk records
  def disk_records(vm)
    disks = []
    vm.each 'TEMPLATE/DISK' do |d|
      disk = {}
      disk['size'] = d['SIZE']

      disks << disk
    end

    disks
  end

  # Returns number of unique public ip addresses of vm
  #
  # @param [OpenNebula::VirtualMachine] vm virtual machine
  #
  # @return [Integer] number of unique public ip addresses represented by integer
  def number_of_public_ips(vm)
    all_ips = []
    vm.each 'TEMPLATE/NIC' do |nic|
      nic.each 'IP' do |ip|
        all_ips << ip.text if ip_public?(ip)
      end
    end

    all_ips.uniq.length
  end

  # Look for 'os_tpl' OCCI mixin to better identify virtual machine's image
  #
  # @param [OpenNebula::VirtualMachine] vm virtual machine
  #
  # @return [NilClass, String] if found, mixin identifying string, nil otherwise
  def mixin(vm)
    mixin_locations = %w(USER_TEMPLATE/OCCI_COMPUTE_MIXINS USER_TEMPLATE/OCCI_MIXIN TEMPLATE/OCCI_MIXIN)

    mixin_locations.each do |mixin_location|
      vm.each mixin_location do |mixin|
        mixin = mixin.text.split
        mixin.select! { |line| line.include? '/occi/infrastructure/os_tpl#' }
        return mixin.first unless mixin.empty?
      end
    end

    nil # nothing found
  end

  # Sidekiq specific method, specifies the purpose of the worker
  #
  # @param [String] vms IDs of virtual machines to process in form of numbers separated by '|'
  # (easier for cooperation with redis)
  # @param [String] file_number number of the output file
  def perform(vms, file_number)
    OneacctExporter::Log.setup_log_level(logger)

    vms = vms.split('|')

    oda = OneDataAccessor.new(false, logger)
    user_map = create_user_map(oda)
    image_map = create_image_map(oda)
    cluster_map = create_cluster_map(oda)
    benchmark_map = oda.benchmark_map

    data = []

    vms.each do |vm_id|
      vm = load_vm(vm_id, oda)
      next unless vm

      begin
        logger.debug("Processing vm with id: #{vm_id}.")
        vm_data = process_vm(vm, user_map, image_map, cluster_map, benchmark_map)

        validator = DataValidators::ApelDataValidator.new(logger) if APEL_OT.include?(Settings.output['output_type'])
        validator = DataValidators::PbsDataValidator.new(logger) if PBS_OT.include?(Settings.output['output_type'])
        validator = DataValidators::LogstashDataValidator.new(logger) if LOGSTASH_OT.include?(Settings.output['output_type'])

        vm_data = validator.validate_data(vm_data) if validator
      rescue Errors::ValidationError => e
        logger.error("Error occured during processing of vm with id: #{vm_id}. #{e.message}")
        next
      end

      logger.debug("Adding vm with data: #{vm_data} for export.")
      data << vm_data
    end

    write_data(data, file_number) unless data.empty?
  end

  # Write processed data into output directory
  #
  # @param [Hash] data data to be written into file
  # @param [Fixnum] file_number sequence number of file data will be written to
  def write_data(data, file_number)
    logger.debug('Creating writer...')
    ow = OneWriter.new(data, file_number, logger)
    ow.write
  rescue => e
    msg = "Cannot write result: #{e.message}"
    logger.error(msg)
    raise msg
  end

  private

  # Check if IP is public
  #
  # @param [String] ip address
  #
  # @return [Bool] true or false
  def ip_public?(ip)
    ip_obj = IPAddr.new(ip.text)
    IGNORED_NETWORKS.each do |net|
      return false if net.include? ip_obj
    end
    true
  end
end