joker1007/rukawa

View on GitHub
lib/rukawa/cli.rb

Summary

Maintainability
A
25 mins
Test Coverage
require 'thor'
require 'rukawa/runner'
require 'rukawa/overview'

module Rukawa
  class Cli < Thor

    def self.base_options
      method_option :config, type: :string, default: nil, desc: "If this options is not set, try to load ./rukawa.rb"
      method_option :job_dirs, type: :array, default: [], desc: "Load job directories", banner: "JOB_DIR1 JOB_DIR2"
    end

    def self.run_options
      method_option :concurrency, aliases: "-c", type: :numeric, default: nil, desc: "Default: cpu count"
      method_option :variables, aliases: "--var", type: :hash, default: {}, banner: "KEY:VALUE KEY:VALUE"
      method_option :varfile, type: :string, default: nil, desc: "variable definition file. ex (variables.yml, variables.json)"
      method_option :batch, aliases: "-b", type: :boolean, default: false, desc: "If batch mode, not display running status"
      method_option :log, aliases: "-l", type: :string, desc: "Default: ./rukawa.log"
      method_option :stdout, type: :boolean, default: false, desc: "Output log to stdout"
      method_option :syslog, type: :boolean, default: false, desc: "Output log to syslog"
      method_option :dot, aliases: "-d", type: :string, default: nil, desc: "Output job status by dot format"
      method_option :format, aliases: "-f", type: :string, desc: "Output job status format: png, svg, pdf, ... etc"
      method_option :refresh_interval, aliases: "-r", type: :numeric, default: 3, desc: "Refresh interval for running status information"
    end

    desc "run JOB_NET_NAME [JOB_NAME] [JOB_NAME] ...", "Run jobnet. If JOB_NET is set, resume from JOB_NAME"
    map "run" => "_run"
    base_options
    run_options
    def _run(job_net_name, *job_name)
      load_config
      set_logger
      set_concurrency
      load_job_definitions

      job_net_class = get_class(job_net_name)
      job_classes = job_name.map { |name| get_class(name) }
      job_net = job_net_class.new(variables: variables, resume_job_classes: job_classes)
      result = Runner.run(job_net, options[:batch], options[:refresh_interval])

      if options[:dot]
        job_net.output_dot(options[:dot], format: options[:format])
      end

      unless result
        puts "\nIf you want to retry, run following command."
        failed_jobs = job_net.dag.jobs.each_with_object([]) { |j, arr| arr << j.class.to_s if j.state == Rukawa::State::Error }
        puts "  rukawa run #{job_net_name} #{failed_jobs.join(" ")}"
        exit 1
      end
    end

    desc "graph JOB_NET_NAME [JOB_NAME] [JOB_NAME] ...", "Output jobnet graph. If JOB_NET is set, simulate resumed job sequence"
    base_options
    method_option :output, aliases: "-o", type: :string, required: true
    method_option :format, aliases: "-f", type: :string
    def graph(job_net_name, *job_name)
      load_config
      load_job_definitions

      job_net_class = get_class(job_net_name)
      job_classes = job_name.map { |name| get_class(name) }
      job_net = job_net_class.new(resume_job_classes: job_classes)
      job_net.output_dot(options[:output], format: options[:format])
    end

    desc "run_job JOB_NAME [JOB_NAME] ...", "Run specific jobs."
    base_options
    run_options
    def run_job(*job_name)
      load_config
      set_logger
      set_concurrency
      load_job_definitions

      job_classes = job_name.map { |name| get_class(name) }
      job_net_class = anonymous_job_net_class(*job_classes)
      job_net = job_net_class.new
      result = Runner.run(job_net, options[:batch], options[:refresh_interval])

      if options[:dot]
        job_net.output_dot(options[:dot])
      end

      exit 1 unless result
    end

    desc "list", "List JobNet"
    base_options
    method_option :jobs, aliases: "-j", type: :boolean, desc: "Show jobs", default: false
    def list
      load_config
      load_job_definitions
      Rukawa::Overview.list_job_net(with_jobs: options[:jobs])
    end

    desc "list_job", "List Job"
    base_options
    def list_job
      load_config
      load_job_definitions
      Rukawa::Overview.list_job
    end

    map %w[version -v] => "__print_version"
    desc "version(-v)", "Print the version"
    def __print_version
      puts "rukawa #{Rukawa::VERSION}"
    end

    private

    def load_config
      if options[:config]
        load File.expand_path(options[:config], Dir.pwd)
      else
        load default_config_file if File.exist?(default_config_file)
      end

      Rukawa.configure do |c|
        c.job_dirs.concat(options[:job_dirs]) unless options[:job_dirs].empty?
      end
    end

    def set_logger
      Rukawa.configure do |c|
        if options[:stdout]
          c.logger = Logger.new($stdout)
        elsif options[:syslog]
          require 'syslog/logger'
          c.logger = Syslog::Logger.new('rukawa')
        elsif options[:log]
          c.logger = Logger.new(options[:log])
        end
      end
    end

    def set_concurrency
      Rukawa.configure do |c|
        c.concurrency = options[:concurrency] if options[:concurrency]
      end
    end

    def default_config_file
      "./rukawa.rb"
    end

    def load_job_definitions
      Rukawa.load_jobs
    end

    def get_class(name)
      Object.const_get(name)
    rescue NameError
      $stderr.puts("`#{name}` class is not found")
      exit 1
    end

    def anonymous_job_net_class(*job_classes)
      Class.new(JobNet) do
        self.singleton_class.send(:define_method, :dependencies) do
          job_classes.map { |klass| [klass, []] }.to_h
        end

        define_method(:name) { "AnonymousJobNet" }
      end
    end

    def variables
      if options[:varfile]
        read_varfile.freeze
      else
        options[:variables].freeze
      end
    end

    def read_varfile
      unless File.exist?(options[:varfile])
        $stderr.puts("`#{options[:varfile]}` is not found")
        exit 1
      end

      extname = File.extname(options[:varfile])
      if %w(.yml .yaml).include?(extname)
        require 'yaml'
        deserializer = ->(data) { YAML.load(data) }
      elsif %w(.js .json).include?(extname)
        require 'json'
        deserializer = ->(data) { JSON.load(data) }
      end

      deserializer.call(File.read(options[:varfile]))
    end
  end
end