ainame/mediakit

View on GitHub
lib/mediakit/process/runner.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'open3'
require 'timeout'
require 'cool.io'
require 'logger'
require 'mediakit/process/shell_escape'
require 'mediakit/utils/null_logger'

module Mediakit
  module Process
    class Runner
      class CommandNotFoundError < StandardError;
      end
      class TimeoutError < StandardError;
      end

      DEFAULT_READ_TIMEOUT_INTERVAL = 30

      attr_reader(:logger)

      def initialize(timeout: nil, nice: 0, logger: Logger.new(STDOUT))
        @timeout = timeout
        @nice = nice
        @logger = logger || Mediakit::Utils::NullLogger.new
      end

      # @overload run(command, *args)
      #   @param command [String]
      #   @param args [Array] args as array for safety shellescape
      # @overload run(command, args)
      #   @param command [String] command name
      #   @param args [Array] args as string
      # @return out [String] stdout and stderr of command
      # @return exit_status [Boolean] is succeeded
      def run(bin, *args)
        command = build_command(bin, *args)
        begin
          stdin, stdout, stderr, wait_thread = Open3.popen3(command)
          stdin.close
          exit_status, output, error_output = wait(stdout, stderr, wait_thread)
        rescue Errno::ENOENT => e
          raise(CommandNotFoundError, "Can't find command - #{command}, #{e.meessage}")
        end

        [exit_status, output, error_output]
      end

      def wait(stdout, stderr, wait_thread)
        begin
          setup_watchers(stdout, stderr)
          loop_thread = Thread.new { run_loop }
          wait_thread.join
          exit_status = (wait_thread.value.exitstatus == 0)
        rescue Timeout::Error => error
          force_kill_process(wait_thread.pid)
          raise(error)
        ensure
          teardown_watchers
          loop_thread.join if loop_thread
          @out_watcher.read
          @err_watcher.read
        end

        [exit_status, @out_watcher.data, @err_watcher.data]
      end

      def build_command(bin, *args)
        command = build_command_without_options(bin, *args)
        if @nice == 0
          command
        else
          "nice -n #{ShellEscape.escape(@nice.to_s)} sh -c \"#{command}\""
        end
      end

      def build_command_without_options(bin, *args)
        escaped_args = ShellEscape.escape(*args)
        "#{bin} #{escaped_args}"
      end

      def setup_watchers(stdout, stderr)
        @timer = @timeout ? TimeoutTimer.new(@timeout, Thread.current) : nil
        @out_watcher = IOWatcher.new(stdout) { |data| @timer.update if @timer; logger.info(data); }
        @err_watcher = IOWatcher.new(stderr) { |data| @timer.update if @timer; logger.info(data); }
        @loop = Coolio::Loop.new
        @out_watcher.attach(@loop)
        @err_watcher.attach(@loop)
        @timer.attach(@loop) if @timer
      end

      def run_loop
        @loop.run
      rescue => e
        # workaround for ambiguous RuntimeError
        logger.warn(e.message)
        logger.warn(e.backtrace.join("\n"))
      end

      def teardown_watchers
        @loop.watchers.each { |w| w.detach  if w.attached? }
        @loop.stop if @loop.has_active_watchers?
      rescue RuntimeError => e
        logger.warn(e.message)
        logger.warn(e.backtrace.join("\n"))
      end

      def force_kill_process(pid)
        ::Process.kill('SIGKILL', pid)
      rescue Errno::ESRCH => e
        logger.warn("fail SIGKILL pid=#{pid} - #{e.message}, #{e.backtrace.join("\n")}")
      end

      class IOWatcher < Coolio::IO
        attr_reader(:io, :data)

        def initialize(io, &block)
          @io = io
          @block = block
          @data = ''
          super
        end

        def on_read(data)
          @block.call(data)
          @data << data
        end

        def on_close
          @block = nil
        end

        def read
          @data << @io.read unless @io.closed?
          @io.close unless @io.closed?
        end
      end

      class TimeoutTimer < Coolio::TimerWatcher
        DEFAULT_CHECK_INTERVAL = 0.1

        def initialize(duration, current_thread)
          @duration = duration
          @watched_at = Time.now
          @current_thread = current_thread
          super(DEFAULT_CHECK_INTERVAL, true)
        end

        def on_timer
          if timeout?
            @current_thread.raise(Timeout::Error, "wait timeout error with #{@duration} sec.")
          end
        end

        def update
          @watched_at = Time.now
        end

        private

        def timeout?
          # compare duration into first decimal place by integer
          ((Time.now - @watched_at) * 10).floor >= (@duration * 10).floor
        end
      end
    end
  end
end