joker1007/rukawa

View on GitHub
lib/rukawa/job_net.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'rukawa/abstract_job'

module Rukawa
  class JobNet < AbstractJob
    include Enumerable
    attr_reader :dag, :context, :variables

    class << self
      def dependencies
        raise NotImplementedError, "Please override"
      end
    end

    def initialize(variables: {}, context: Context.new, parent_job_net: nil, resume_job_classes: [])
      @parent_job_net = parent_job_net
      @variables = variables
      @context = context
      @dag = Dag.new
      @dag.build(self, variables, context, self.class.dependencies)
      @resume_job_classes = resume_job_classes

      unless resume_job_classes.empty?
        resume_targets = []
        @dag.tsort_each_node do |node|
          node.set_state(:bypassed)
          resume_targets << node if resume_job_classes.include?(node.class)
        end

        resume_targets.each do |node|
          @dag.each_strongly_connected_component_from(node) do |nodes|
            nodes.each { |connected| connected.set_state(:waiting) }
          end
        end
      end
    end

    def execute
      dataflows.each(&:execute)
    end

    def run(wait_interval = 1)
      promise = Concurrent::Promise.new do
        futures = execute
        until futures.all?(&:complete?)
          yield self if block_given?
          sleep wait_interval
        end
        errors = futures.map(&:reason).compact

        unless errors.empty?
          errors.each do |err|
            next if err.is_a?(DependencyUnsatisfied)
            Rukawa.logger.error(err)
          end
        end

        futures
      end
      promise.execute
    end

    def started_at
      @dag.nodes.min_by { |j| j.started_at ? j.started_at.to_i : Float::INFINITY }.started_at
    end

    def finished_at
      @dag.nodes.max_by { |j| j.finished_at.to_i }.finished_at
    end

    def toplevel?
      @parent_job_net.nil?
    end

    def subgraph?
      !toplevel?
    end

    def dataflows
      @dag.leveled_each.map(&:dataflow)
    end

    def state
      inject(Rukawa::State::Waiting) do |state, j|
        state.merge(j.state)
      end
    end

    def output_dot(filename, format: nil)
      if format && format != "dot"
        io = IO.popen(["#{Rukawa.config.dot_command}", "-T#{format}", "-o", filename], "w")
        io.write(to_dot)
        io.close
      else
        File.open(filename, 'w') { |f| f.write(to_dot) }
      end
    end

    def to_dot(subgraph = false)
      graphdef = subgraph ? "subgraph" : "digraph"
      buf = %Q|#{graphdef} "#{subgraph ? "cluster_" : ""}#{name}" {\n|
      buf += %Q{label = "#{graph_label}";\n}
      buf += Rukawa.config.graph.attrs
      buf += Rukawa.config.graph.node.attrs
      buf += "color = blue;\n" if subgraph
      dag.each do |j|
        buf += j.to_dot_def
      end

      dag.edges.each do |edge|
        buf += %Q|"#{edge.from.name}" -> "#{edge.to.name}";\n|
      end
      buf += "}\n"
    end

    def to_dot_def
      to_dot(true)
    end

    def jobs_as_to
      @dag.jobs.select { |j| j.in_comings.select { |edge| edge.cluster == self }.empty? && j.root? }
    end

    def jobs_as_from
      @dag.jobs.select { |j| j.out_goings.select { |edge| edge.cluster == self }.empty? && j.leaf? }
    end

    def each(&block)
      @dag.each(&block)
    end

    private

    def graph_label
      if @resume_job_classes.empty?
        name
      else
        "#{name} resume from (#{@resume_job_classes.join(", ")})"
      end
    end
  end
end