ashrithr/awscli

View on GitHub
lib/awscli/emr.rb

Summary

Maintainability
F
3 days
Test Coverage
module Awscli
  module Emr
    class EMR
      # Builds the EMR command wrapper.
      #
      # @param connection [Object] client object that is stored and later used by
      #   the other methods of this class for calls such as +describe_job_flows+
      #   and +run_job_flow+.
      def initialize(connection)
        @conn = connection
      end

      # Prints the job flows returned by the connection, either as a condensed
      # table or as a full YAML dump.
      #
      # @param options [Hash] read with symbol keys:
      #   :job_flow_ids    - ids to restrict the listing to (validated first)
      #   :job_flow_status - states to restrict the listing to
      #   :table           - when truthy, print a summary table instead of YAML
      #   Every remaining entry is forwarded to +describe_job_flows+ as is.
      # @return [nil]
      def list(options)
        validate_job_ids options[:job_flow_ids] if options[:job_flow_ids]
        # Deep copy so the caller's hash is never mutated.
        opts = Marshal.load(Marshal.dump(options))
        # :table is a display switch, not a request parameter. Remove it under
        # both key spellings so it cannot leak into the API call.
        opts.delete(:table)
        opts.delete('table')
        # Translate CLI option names into the request keys expected by the API.
        { :job_flow_ids => 'JobFlowIds', :job_flow_status => 'JobFlowStates' }.each do |option_key, api_key|
          value = opts.delete(option_key)
          opts[api_key] = value if value
        end
        if options[:table]
          puts 'For detailed information, dont pass --table option'
          job_flows = @conn.describe_job_flows(opts).body['JobFlows']
          table_data = job_flows.map do |job_flow|
            instances = job_flow['Instances']
            {
              :job_flow_id => job_flow['JobFlowId'],
              :name => job_flow['Name'],
              :instance_count => instances['InstanceCount'],
              :master_dns => instances['MasterPublicDnsName'],
              :ec2_key_name => instances['Ec2KeyName'],
              :state => job_flow['ExecutionStatusDetail']['State']
            }
          end
          Formatador.display_table(table_data, [:job_flow_id, :name, :state, :instance_count, :master_dns, :ec2_key_name])
        else
          puts 'For less information, pass --table option'
          puts @conn.describe_job_flows(opts).body['JobFlows'].to_yaml
        end
      end

      # Assembles a job flow request from CLI-style options and submits it
      # through +@conn.run_job_flow+.
      #
      # Steps are queued in this order: custom jar steps, Hive install
      # (interactive), Pig install (interactive), Hive steps, Pig steps,
      # streaming steps, then the HBase start step followed by an optional
      # HBase backup or restore step.
      #
      # Aborts the process when HBase is requested with an unsupported Hadoop
      # version or with an instance type rejected by +is_valid_instance_type?+.
      #
      # @param options [Hash] job flow settings, read with symbol keys
      #   (:name, :hadoop_version, :alive, :log_uri, the *_steps lists, ...)
      # @return [nil]
      def create_job_flow(options)
        # => BOOTSTRAP ACTIONS
        boot_strap_actions = []
        if options[:bootstrap_actions]
          boot_strap_actions.concat(options[:bootstrap_actions].map { |action| parse_boot_strap_actions(action) })
        end

        # => STEPS
        steps = []
        if options[:custom_jar_steps]
          steps.concat(options[:custom_jar_steps].map { |jar_step| parse_custom_jar(jar_step) })
        end
        steps << hive_install(options[:hadoop_version]) if options[:hive_interactive]
        steps << pig_install if options[:pig_interactive]
        if options[:hive_steps]
          # Hive must be installed before its scripts run; skip if already queued above.
          steps << hive_install(options[:hadoop_version]) unless options[:hive_interactive]
          steps.concat(options[:hive_steps].map { |hive_step| parse_hive_steps(hive_step) })
        end
        if options[:pig_steps]
          # Same rule for Pig.
          steps << pig_install unless options[:pig_interactive]
          steps.concat(options[:pig_steps].map { |pig_step| parse_pig_steps(pig_step) })
        end
        if options[:streaming_steps]
          steps.concat(options[:streaming_steps].map { |streaming_step| parse_streaming_steps(streaming_step) })
        end
        if options[:hbase_install]
          boot_strap_actions << hbase_install_boot_strap
          steps << hbase_install_steps
          # Validate the hadoop version and every requested instance size.
          supported_versions = Awscli::EMR::HBASE_SUPPORTED_HADOOP
          unless supported_versions.include?(options[:hadoop_version])
            abort "Invalid hadoop version #{options[:hadoop_version]}, supported Hadoop Versions for HBase are: #{supported_versions.join(',')}"
          end
          requested_types = []
          if options[:instance_groups]
            requested_types.concat(parse_instance_groups(options[:instance_groups]).map { |group| group['InstanceType'] })
          end
          requested_types << options[:master_instance_type] if options[:master_instance_type]
          requested_types << options[:slave_instance_type] if options[:slave_instance_type]
          requested_types.each do |requested_type|
            next if is_valid_instance_type?(requested_type)
            abort "Instance type #{requested_type} is not compatible with HBase, instance size should be equal or greater than m1.large"
          end
          # => HBase backups (a schedule takes precedence over a restore)
          if options[:hbase_backup_schedule]
            steps << if options[:hbase_consistent_backup]
                       parse_hbase_backup(options[:hbase_backup_schedule], true)
                     else
                       parse_hbase_backup(options[:hbase_backup_schedule])
                     end
          elsif options[:hbase_backup_restore]
            steps << parse_hbase_restore(options[:hbase_backup_restore])
          end
        end

        # => INSTANCES
        # Interactive Hive/Pig sessions and HBase clusters must outlive their steps.
        long_running = options[:hive_interactive] || options[:pig_interactive] || options[:hbase_install]
        instances = {
          'HadoopVersion' => options[:hadoop_version],
          'KeepJobFlowAliveWhenNoSteps' => long_running ? true : options[:alive]
        }
        optional_settings = [
          ['Ec2KeyName', :instance_ec2_key_name],
          ['InstanceCount', :instance_count],
          ['MasterInstanceType', :master_instance_type],
          ['SlaveInstanceType', :slave_instance_type],
          ['TerminationProtected', :termination_protection]
        ]
        optional_settings.each do |api_key, option_key|
          instances[api_key] = options[option_key] if options[option_key]
        end
        # => Instance Groups
        instances['InstanceGroups'] = parse_instance_groups(options[:instance_groups]) if options[:instance_groups]

        # => Build final request
        job_flow = { 'AmiVersion' => Awscli::EMR::HADOOP_AMI_MAPPING[options[:hadoop_version]] }
        job_flow['LogUri'] = options[:log_uri] if options[:log_uri]
        job_flow['BootstrapActions'] = boot_strap_actions if options[:bootstrap_actions] || options[:hbase_install]
        job_flow['Instances'] = instances
        job_flow['Steps'] = steps

        # Flag job flows that will not shut themselves down in their name.
        flow_name = if options[:alive] || long_running
                      "#{options[:name]} (requires manual termination)"
                    else
                      options[:name]
                    end
        @conn.run_job_flow(flow_name, job_flow)
        puts "Create JobFlow '#{options[:name]}' Successfully!"
      end

      # Adds one instance group to an existing job flow.
      #
      # Aborts the process when the job flow id is not among the ids returned by
      # +describe_job_flows+ or when the instance type is not listed in
      # Awscli::Instances::INSTANCE_SIZES.
      #
      # @param options [Hash] read with symbol keys: :job_flow_id, :instance_type,
      #   :instance_count, :instance_role, :name, :bid_price (optional; its
      #   presence selects the SPOT market) and :region (ignored here). Any
      #   other entry is passed through in the instance group definition.
      # @return [nil]
      def add_instance_group(options)
        # Deep copy so the caller's hash is never mutated.
        opts = Marshal.load(Marshal.dump(options))
        # These two describe the target, not the group; drop them under both
        # key spellings so they never end up inside the group definition.
        [:job_flow_id, :region].each do |key|
          opts.delete(key)
          opts.delete(key.to_s)
        end

        known_ids = @conn.describe_job_flows.body['JobFlows'].map { |job| job['JobFlowId'] }
        abort 'invalid job id' unless known_ids.include?(options[:job_flow_id])
        abort 'invalid instance type' unless Awscli::Instances::INSTANCE_SIZES.include?(options[:instance_type])

        # Rename CLI options to the request keys used for an instance group.
        renames = {
          :instance_count => 'InstanceCount',
          :instance_type => 'InstanceType',
          :instance_role => 'InstanceRole',
          :name => 'Name'
        }
        renames.each do |option_key, api_key|
          value = opts.delete(option_key)
          opts[api_key] = value if value
        end

        bid_price = opts.delete(:bid_price)
        if bid_price
          opts['BidPrice'] = bid_price
          opts['MarketType'] = 'SPOT'
        else
          opts['MarketType'] = 'ON_DEMAND'
        end

        @conn.add_instance_groups(options[:job_flow_id], 'InstanceGroups' => [opts])
        puts "Added instance group to job flow(with id): #{options[:job_flow_id]}"
      end

      # Appends custom JAR steps to an existing job flow.
      #
      # @param job_flow_id [String] id of the job flow to extend
      # @param job_steps [String, Array<String>] one step definition, or a list of
      #   them, each in the comma-separated format parsed by +parse_custom_jar+
      # @return [nil]
      def add_steps(job_flow_id, job_steps)
        # validate_job_ids works on a list of ids; wrap the single id.
        validate_job_ids Array(job_flow_id)
        # 'Steps' must be a list of step definitions, one per supplied string.
        parsed_steps = Array(job_steps).map { |step| parse_custom_jar(step) }
        @conn.add_job_flow_steps(job_flow_id, 'Steps' => parsed_steps)
        puts "Added step to job flow id: #{job_flow_id}"
      end

      # Resizes an instance group to the requested instance count.
      #
      # Aborts the process when the group id is not accepted by
      # +validate_instance_group_id?+. An Excon::Errors::BadRequest raised by the
      # request is printed instead of being propagated.
      #
      # @param options [Hash] read with symbol keys :instance_group_id and
      #   :instance_count
      # @return [nil]
      def modify_instance_group(options)
        group_id = options[:instance_group_id]
        unless validate_instance_group_id?(group_id)
          abort "Invalid instance group id: #{group_id}"
        end
        resize_request = {
          'InstanceCount' => options[:instance_count],
          'InstanceGroupId' => group_id
        }
        @conn.modify_instance_groups('InstanceGroups' => [resize_request])
      rescue Excon::Errors::BadRequest => bad_request
        puts "[Error]: #{bad_request}"
      else
        # Only reached when the request went through without raising.
        puts "Modified instance group #{options[:instance_group_id]} size to #{options[:instance_count]}"
      end

      # Switches the termination protection flag on or off for job flows.
      #
      # @param job_flow_ids [Array<String>] ids of the job flows to update; they
      #   are checked with +validate_job_ids+ first
      # @param terminate_protection [Boolean] truthy to add the flag, falsy to
      #   remove it
      # @return [nil]
      def set_termination_protection(job_flow_ids, terminate_protection)
        validate_job_ids job_flow_ids
        target = { 'JobFlowIds' => job_flow_ids }
        @conn.set_termination_protection(terminate_protection, target)
        if terminate_protection
          puts("Termination protection flag added to job_flows: #{job_flow_ids.join(',')}")
        else
          puts("Termination protection flag removed from job_flows: #{job_flow_ids.join(',')}")
        end
      end

      # Adds several instance groups to an existing job flow.
      #
      # @param job_flow_id [String] id of the job flow to extend
      # @param groups [Array<String>] group definitions in the comma-separated
      #   format parsed by +parse_instance_groups+
      # @return [Object] whatever +@conn.add_instance_groups+ returns
      def add_instance_groups(job_flow_id, groups)
        # validate_job_ids works on a list of ids; a bare String id would never
        # match, so wrap the single id.
        validate_job_ids Array(job_flow_id)
        instance_groups = parse_instance_groups(groups)
        @conn.add_instance_groups(job_flow_id, 'InstanceGroups' => instance_groups)
      end

      # Terminates the given job flows after checking the ids with
      # +validate_job_ids+.
      #
      # @param job_ids [Array<String>] ids of the job flows to terminate
      # @return [nil]
      def delete(job_ids)
        validate_job_ids(job_ids)
        termination_request = { 'JobFlowIds' => job_ids }
        @conn.terminate_job_flows(termination_request)
        terminated = job_ids.join(',')
        puts "Terminated Job Flows: #{terminated}"
      end

      private

      # Aborts the process unless every given id belongs to a job flow returned
      # by +describe_job_flows+.
      #
      # Ids may be given in any order and need not be adjacent in the listing.
      # An empty list is accepted because it contains no unknown id.
      #
      # @param job_ids [Array<String>, String] ids to check; a single id is
      #   treated as a one-element list
      # @return [nil]
      def validate_job_ids(job_ids)
        available_job_ids = @conn.describe_job_flows.body['JobFlows'].map { |job| job['JobFlowId'] }
        unknown_job_ids = Array(job_ids) - available_job_ids
        abort 'invalid job id\'s' unless unknown_job_ids.empty?
      end

      # Tells whether an instance group id belongs to any job flow returned by
      # +describe_job_flows+.
      #
      # @param group_id [String] instance group id to look up
      # @return [Boolean]
      def validate_instance_group_id?(group_id)
        job_flows = @conn.describe_job_flows.body['JobFlows']
        known_group_ids = job_flows.flat_map do |flow|
          flow['Instances']['InstanceGroups'].map { |group| group['InstanceGroupId'] }
        end
        known_group_ids.include?(group_id)
      end

      # Tells whether an instance type may be used with HBase, i.e. it is not a
      # member of Awscli::EMR::HBASE_INVALID_INSTANCES.
      #
      # @param instance_type [String] instance type name to check
      # @return [Boolean]
      def is_valid_instance_type?(instance_type)
        rejected_types = Awscli::EMR::HBASE_INVALID_INSTANCES
        !rejected_types.member?(instance_type)
      end

      # Converts instance group definitions into request hashes.
      #
      # Each definition is a comma-separated string:
      #   instance_count*,instance_role*(MASTER | CORE | TASK),instance_type*,name,bid_price
      # Fields marked * are required. Supplying a bid price selects the SPOT
      # market, otherwise ON_DEMAND is used. The role is matched
      # case-insensitively and emitted in upper case.
      #
      # Aborts the process on a missing required field, an unknown role, or an
      # instance type not listed in Awscli::Instances::INSTANCE_SIZES.
      #
      # @param groups [Array<String>] instance group definitions
      # @return [Array<Hash>] one hash per definition
      def parse_instance_groups(groups)
        groups.map do |group|
          instance_count, instance_role, instance_size, name, bid_price = group.split(',')
          # split leaves omitted fields as nil, so check for nil as well as ''.
          if [instance_count, instance_role, instance_size].any? { |field| field.nil? || field.empty? }
            abort 'instance_count, instance_role and instance_size are required'
          end
          abort "Invalid instance role: #{instance_role}" unless %w(MASTER CORE TASK).include?(instance_role.upcase)
          abort "Invalid instance type: #{instance_size}" unless Awscli::Instances::INSTANCE_SIZES.include?(instance_size)

          # Emit the role in the upper-case form it was validated against.
          instance_role = instance_role.upcase
          instance_group = {
            'InstanceCount' => instance_count.to_i,
            'InstanceRole' => instance_role,
            'InstanceType' => instance_size,
            'MarketType' => 'ON_DEMAND',
            'Name' => (name.nil? || name.empty?) ? "awscli-emr-#{instance_role}-group" : name
          }
          unless bid_price.nil? || bid_price.empty?
            instance_group['BidPrice'] = bid_price
            instance_group['MarketType'] = 'SPOT'
          end
          instance_group
        end
      end

      # Converts a bootstrap action definition into a request hash.
      #
      # The definition is a comma-separated string:
      #   name*,bootstrap_action_path*,arg1,arg2,...
      # Aborts the process when the name or the path is missing.
      #
      # @param step [String] bootstrap action definition
      # @return [Hash] hash with 'Name' and 'ScriptBootstrapAction' keys
      def parse_boot_strap_actions(step)
        name, path, *args = step.split(',')
        # split leaves omitted fields as nil, so check for nil as well as ''.
        if [name, path].any? { |field| field.nil? || field.empty? }
          abort 'name and path are required'
        end
        {
          'Name' => name,
          'ScriptBootstrapAction' => {
            'Args' => args,
            'Path' => path
          }
        }
      end

      # Converts a custom JAR step definition into a request hash.
      #
      # The definition is a comma-separated string:
      #   jar_path(s3)*,name_of_step*,main_class,action_on_failure(TERMINATE_JOB_FLOW | CANCEL_AND_WAIT | CONTINUE),arg1=arg2=arg3,prop_k1=prop_v1,prop_k2=prop_v2,...
      # Only the jar path and the step name are required. Optional fields may
      # be left empty or omitted from the end. Arguments are separated by '=',
      # and each property is a key=value pair.
      #
      # Aborts the process when the definition has fewer than two fields or the
      # jar path or step name is empty.
      #
      # @param step [String] step definition
      # @return [Hash] hash with 'ActionOnFailure', 'Name' and 'HadoopJarStep' keys
      def parse_custom_jar(step)
        # A limit of -1 keeps trailing empty fields, so "jar,name,,,," still
        # yields '' (not nil) for the optional positions.
        fields = step.to_s.split(',', -1)
        if fields.size < 2
          abort "invalid step pattern, expecting 'jar_path(s3)*,name_of_step*,main_class,action_on_failure,arg1=agr2=arg3,prop_k1=prop_v1,prop_k2=prop_v2)'"
        end
        jar, name, main_class, action_on_failure, extra_args, *job_conf = fields
        if jar.empty? || name.empty?
          abort 'jar and name are required for a step'
        end
        # Fields omitted from the end come back as nil; treat them as empty.
        main_class = main_class.to_s
        action_on_failure = action_on_failure.to_s
        extra_args = extra_args.to_s

        properties = job_conf.reject(&:empty?).map do |kv_pair|
          # Split on the first '=' only so values may themselves contain '='.
          key, value = kv_pair.split('=', 2)
          { 'Key' => key, 'Value' => value }
        end

        step_to_run = {
          'ActionOnFailure' => action_on_failure.empty? ? 'TERMINATE_JOB_FLOW' : action_on_failure,
          'Name' => name,
          'HadoopJarStep' => {
            'Jar' => jar,
            'Args' => extra_args.empty? ? [] : extra_args.split('='),
            'Properties' => properties
          }
        }
        step_to_run['HadoopJarStep']['MainClass'] = main_class unless main_class.empty?
        step_to_run
      end

      # Converts a Hive step definition into a request hash that runs the script
      # through the script-runner jar.
      #
      # The definition is a comma-separated string:
      #   script_path(s3)*,input_path(s3),output_path(s3),-d,arg1,-d,arg2,...
      # Only the script path is required. Input and output paths, when given,
      # are passed to the script as "-d INPUT=..." and "-d OUTPUT=...".
      #
      # NOTE(review): the helper scripts are taken from the us-west-1 bucket
      # here while hive_install uses us-east-1 - confirm this is intended.
      #
      # @param step [String] Hive step definition
      # @return [Hash] hash with 'ActionOnFailure', 'Name' and 'HadoopJarStep' keys
      def parse_hive_steps(step)
        path, input_path, output_path, *args = step.split(',')
        # split leaves omitted fields as nil, so check for nil as well as ''.
        abort 'path to the hive script is required' if path.nil? || path.empty?
        script_args = [
          's3://us-west-1.elasticmapreduce/libs/hive/hive-script',
          '--base-path',
          's3://us-west-1.elasticmapreduce/libs/hive/',
          '--run-hive-script',
          '--args',
          '-f',
          path
        ]
        script_args << '-d' << "INPUT=#{input_path}" unless input_path.nil? || input_path.empty?
        script_args << '-d' << "OUTPUT=#{output_path}" unless output_path.nil? || output_path.empty?
        script_args.concat(args)
        {
          'ActionOnFailure' => 'TERMINATE_JOB_FLOW',
          'Name' => 'awscli-emr-hive-step',
          'HadoopJarStep' => {
            'Jar' => 's3://us-west-1.elasticmapreduce/libs/script-runner/script-runner.jar',
            'Args' => script_args
          }
        }
      end

      # Converts a Pig step definition into a request hash that runs the script
      # through the script-runner jar.
      #
      # The definition is a comma-separated string:
      #   script_path(s3)*,input_path(s3),output_path(s3),-p,arg1,-p,arg2,...
      # Only the script path is required. Input and output paths, when given,
      # are passed as "-p INPUT=..." and "-p OUTPUT=..."; the script path is
      # always the last argument.
      #
      # NOTE(review): the helper scripts are taken from the us-west-1 bucket
      # here while pig_install uses us-east-1 - confirm this is intended.
      #
      # @param step [String] Pig step definition
      # @return [Hash] hash with 'ActionOnFailure', 'Name' and 'HadoopJarStep' keys
      def parse_pig_steps(step)
        path, input_path, output_path, *args = step.split(',')
        # split leaves omitted fields as nil, so check for nil as well as ''.
        abort 'path to the pig script is required' if path.nil? || path.empty?
        script_args = %w(s3://us-west-1.elasticmapreduce/libs/pig/pig-script --base-path s3://us-west-1.elasticmapreduce/libs/pig/ --run-pig-script --pig-versions latest --args)
        script_args << '-p' << "INPUT=#{input_path}" unless input_path.nil? || input_path.empty?
        script_args << '-p' << "OUTPUT=#{output_path}" unless output_path.nil? || output_path.empty?
        script_args.concat(args)
        script_args << path
        {
          'ActionOnFailure' => 'TERMINATE_JOB_FLOW',
          'Name' => 'awscli-emr-pig-step',
          'HadoopJarStep' => {
            'Jar' => 's3://us-west-1.elasticmapreduce/libs/script-runner/script-runner.jar',
            'Args' => script_args
          }
        }
      end

      # Converts a streaming step definition into a request hash that runs the
      # Hadoop streaming jar.
      #
      # The definition is a comma-separated string:
      #   input*,output*,mapper*,reducer*,extra_arg1,extra_arg2,...
      # The first four fields are required; any further fields are appended to
      # the jar arguments unchanged.
      #
      # Aborts the process when one of the required fields is missing.
      #
      # @param step [String] streaming step definition
      # @return [Hash] hash with 'ActionOnFailure', 'Name' and 'HadoopJarStep' keys
      def parse_streaming_steps(step)
        input, output, mapper, reducer, *args = step.split(',')
        # split leaves omitted fields as nil, so check for nil as well as ''.
        if [input, output, mapper, reducer].any? { |field| field.nil? || field.empty? }
          abort 'input, output, mapper and reducer are required'
        end
        streaming_args = [
          '-input', input,
          '-output', output,
          '-mapper', mapper,
          '-reducer', reducer
        ]
        # concat mutates the list; a bare `+` would build a new array and drop it.
        streaming_args.concat(args)
        #TODO: Add -jobconf params as k=v,k=v,k=v
        {
          'ActionOnFailure' => 'TERMINATE_JOB_FLOW',
          'Name' => 'awscli-emr-streaming-step',
          'HadoopJarStep' => {
            'Jar' => '/home/hadoop/contrib/streaming/hadoop-streaming.jar',
            'Args' => streaming_args
          }
        }
      end

      # Builds the step that installs Hive through the script-runner jar.
      #
      # @param hadoop_version [String] key used to look up the Hive version in
      #   Awscli::EMR::HADOOP_HIVE_COMPATIBILITY
      # @return [Hash] hash with 'ActionOnFailure', 'Name' and 'HadoopJarStep' keys
      def hive_install(hadoop_version)
        hive_version = Awscli::EMR::HADOOP_HIVE_COMPATIBILITY[hadoop_version]
        installer_args = %w(s3://us-east-1.elasticmapreduce/libs/hive/hive-script --base-path s3://us-east-1.elasticmapreduce/libs/hive/ --install-hive --hive-versions)
        installer_args << hive_version
        runner = {
          'Args' => installer_args,
          'Jar' => 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
        }
        { 'ActionOnFailure' => 'TERMINATE_JOB_FLOW', 'Name' => 'awscli-emr-hive-setup', 'HadoopJarStep' => runner }
      end

      # Builds the step that installs the latest Pig version through the
      # script-runner jar.
      #
      # @return [Hash] hash with 'ActionOnFailure', 'Name' and 'HadoopJarStep' keys
      def pig_install
        installer_args = [
          's3://us-east-1.elasticmapreduce/libs/pig/pig-script',
          '--base-path',
          's3://us-east-1.elasticmapreduce/libs/pig/',
          '--install-pig',
          '--pig-versions',
          'latest'
        ]
        runner = {
          'Args' => installer_args,
          'Jar' => 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
        }
        { 'ActionOnFailure' => 'TERMINATE_JOB_FLOW', 'Name' => 'awscli-emr-pig-setup', 'HadoopJarStep' => runner }
      end

      # Builds the bootstrap action that runs the setup-hbase script.
      #
      # @return [Hash] hash with 'Name' and 'ScriptBootstrapAction' keys
      def hbase_install_boot_strap
        setup_script = {
          'Args' => [],
          'Path' => 's3://us-west-1.elasticmapreduce/bootstrap-actions/setup-hbase'
        }
        { 'Name' => 'awscli-emr-install-hbase', 'ScriptBootstrapAction' => setup_script }
      end

      # Builds the step that starts the HBase master from the HBase jar.
      #
      # @return [Hash] hash with 'ActionOnFailure', 'Name' and 'HadoopJarStep' keys
      def hbase_install_steps
        start_master = {
          'Jar' => '/home/hadoop/lib/hbase-0.92.0.jar',
          'Args' => ['emr.hbase.backup.Main', '--start-master']
        }
        { 'ActionOnFailure' => 'CANCEL_AND_WAIT', 'Name' => 'awscli-emr-start-hbase', 'HadoopJarStep' => start_master }
      end

      # Converts a scheduled HBase backup definition into a request hash.
      #
      # The definition is a comma-separated string with four required fields:
      #   frequency*,frequency_unit*(days|hrs|mins),path(s3)*,start_time*(now|iso-format)
      #
      # Aborts the process when the definition has fewer than four fields, when
      # any field is empty, or when the unit is not one of days, hrs, mins.
      #
      # NOTE(review): the frequency is sent as --full-backup-time-interval while
      # its unit is sent as --incremental-backup-time-unit; confirm against the
      # HBase backup tool whether --full-backup-time-unit was intended.
      #
      # @param backup_step [String] backup definition
      # @param consistent [Boolean] when truthy, '--consistent' is appended
      # @return [Hash] hash with 'Name', 'ActionOnFailure' and 'HadoopJarStep' keys
      def parse_hbase_backup(backup_step, consistent=false)
        unless backup_step =~ /(.*),(.*),(.*),(.*)/
          abort 'Invalid backup step pattern, expecting frequency,frequency_unit(days|hrs|mins),path(s3),start_time(now|iso-format)'
        end
        # A limit of -1 keeps trailing empty fields, so "7,days,s3://x," yields
        # '' (not nil) for start_time and is rejected by the check below.
        frequency, frequency_unit, path, start_time = backup_step.split(',', -1)
        if [frequency, frequency_unit, path, start_time].any?(&:empty?)
          abort 'frequency, frequency_unit, path, start_time are required to perform a backup'
        end
        abort "Invalid frequency unit : #{frequency_unit}" unless %w(days hrs mins).include?(frequency_unit)
        hbase_backup_step = {
          'Name' => 'awscli-emr-schedule-hbase-backup',
          'ActionOnFailure' => 'CANCEL_AND_WAIT',
          'HadoopJarStep' => {
            'Jar' => '/home/hadoop/lib/hbase-0.92.0.jar',
            'Args' => ['emr.hbase.backup.Main', '--backup-dir', path, '--set-scheduled-backup', true, '--full-backup-time-interval',
                       frequency, '--incremental-backup-time-unit', frequency_unit, '--start-time', start_time]
          }
        }
        hbase_backup_step['HadoopJarStep']['Args'] << '--consistent' if consistent
        hbase_backup_step
      end

      # Converts an HBase restore definition into a request hash.
      #
      # The definition is a comma-separated string:
      #   path(s3)*,version
      # The backup path is required. When a version is given it is passed as
      # '--backup-version'.
      #
      # Aborts the process when the path is missing.
      #
      # @param restore_step [String] restore definition
      # @return [Hash] hash with 'Name', 'ActionOnFailure' and 'HadoopJarStep' keys
      def parse_hbase_restore(restore_step)
        path, version = restore_step.split(',')
        # split leaves omitted fields as nil, so check for nil as well as ''.
        abort 'path is required' if path.nil? || path.empty?
        restore_args = ['emr.hbase.backup.Main', '--restore', '--backup-dir', path]
        # `version` is a local and therefore always "defined"; test its value.
        restore_args << '--backup-version' << version unless version.nil? || version.empty?
        {
          'Name' => 'awscli-emr-restore-hbase-backup',
          'ActionOnFailure' => 'CANCEL_AND_WAIT',
          'HadoopJarStep' => {
            'Jar' => '/home/hadoop/lib/hbase-0.92.0.jar',
            'Args' => restore_args
          }
        }
      end
    end
  end
end