sensu/sensu-spawn

View on GitHub
lib/sensu/spawn.rb

Summary

Maintainability
B
4 hrs
Test Coverage
gem "em-worker", "0.0.2"
gem "childprocess", "0.5.8"
gem "ffi", "1.9.21"

require "eventmachine"
require "em/worker"
require "childprocess"
require "rbconfig"
require "timeout"

# Attempt an upfront loading of FFI and POSIX spawn libraries. These
# libraries may fail to load on certain platforms, load errors are
# silenced, and the libraries are not used by Sensu Spawn.
begin
  require "ffi"
  require "childprocess/unix/platform/#{ChildProcess.platform_name}"
  require "childprocess/unix/lib"
  require "childprocess/unix/posix_spawn_process"
rescue LoadError; end

module Sensu
  module Spawn
    POSIX_SPAWN_PLATFORMS = [:linux, :macosx].freeze
    POSIX_SPAWN_ARCHS = ["x86_64", "i386"].freeze

    @@mutex = Mutex.new

    class << self
      # Setup a spawn process worker, to limit the number of
      # concurrent child processes allowed at one time. This method
      # creates the spawn process worker instance variable:
      # `@process_worker`.
      #
      # @param [Hash] options to create a process worker with.
      # @option options [Integer] :limit max number of child processes
      #   at a time.
      def setup(options={})
        limit = options[:limit] || 12
        @process_worker ||= EM::Worker.new(:concurrency => limit)
      end

      # Spawn a child process. The EventMachine reactor (loop) must be
      # running for this method to work.
      #
      # @param [String] command to run.
      # @param [Hash] options to create a child process with.
      # @option options [String] :data to write to STDIN.
      # @option options [Integer] :timeout in seconds.
      # @param [Proc] callback called when the child process exits,
      #   its output and exit status are passed as parameters.
      def process(command, options={}, &callback)
        create = Proc.new do
          child_process(command, options)
        end
        setup(options) unless @process_worker
        @process_worker.enqueue(create, callback)
      end

      # Determine if POSIX Spawn is used to create child processes on
      # the current platform. ChildProcess supports POSIX Spawn for
      # several platforms (OSs & architectures), however, Sensu only
      # enables the use of POSIX Spawn on a select few.
      #
      # @return [TrueClass, FalseClass]
      def posix_spawn?
        return @posix_spawn unless @posix_spawn.nil?
        platform_supported = POSIX_SPAWN_PLATFORMS.include?(ChildProcess.os)
        arch_supported = POSIX_SPAWN_ARCHS.include?(ChildProcess.arch)
        @posix_spawn = platform_supported && arch_supported
      end

      # Determine if the current platform is Windows.
      #
      # @return [TrueClass, FalseClass]
      def on_windows?
        return @on_windows unless @on_windows.nil?
        @on_windows = ChildProcess.windows?
      end

      # Build a child process attached to a pipe, in order to capture
      # its output (STDERR, STDOUT). The child process will be a
      # platform dependent shell, that is responsible for executing
      # the provided command.
      #
      # @param [String] command to run.
      # @return [Array] child object, pipe reader, pipe writer.
      def build_child_process(command)
        reader, writer = IO.pipe
        shell = case
        when on_windows?
          ["cmd", "/c"]
        else
          ["sh", "-c"]
        end
        ChildProcess.posix_spawn = posix_spawn?
        shell_command = shell + [command]
        child = ChildProcess.build(*shell_command)
        child.io.stdout = child.io.stderr = writer
        child.leader = true
        [child, reader, writer]
      end

      # Write data to a stream/file and read a stream/file
      # until end of file (EOF).
      #
      # @param writer [Object] to write data to (optional).
      # @param reader [Object] to read contents of until EOF.
      # @param data [String] to be written to writer.
      # @return [String] the reader stream/file contents.
      def write_and_read(writer, reader, data)
        buffer = (data || "").dup
        output = ""
        loop do
          unless buffer.empty?
            writer.write(buffer.slice!(0, 8191))
            writer.close if buffer.empty?
          end
          begin
            readable, _ = IO.select([reader], nil, nil, 0)
            if readable || buffer.empty?
              output << reader.readpartial(8192)
            end
          rescue EOFError
            reader.close
            break
          end
        end
        output
      end

      # Create a child process, return its output (STDERR & STDOUT),
      # and exit status. The child process will have its own process
      # group, may accept data via STDIN, and have a timeout.
      # ChildProcess Unix POSIX spawn (`start()`) is not thread safe,
      # so a mutex is used to allow safe execution on Ruby runtimes
      # with real threads (JRuby).
      #
      # Using stdlib's Timeout instead of child.poll_for_exit to
      # avoid a deadlock, when the child output is greater than
      # the OS max buffer size.
      #
      # @param [String] command to run.
      # @param [Hash] options to create a child process with.
      # @option options [String] :data to write to STDIN.
      # @option options [Integer] :timeout in seconds.
      # @return [Array] child process output and exit status.
      def child_process(command, options={})
        child, reader, writer = build_child_process(command)
        child.duplex = true if options[:data]
        @@mutex.synchronize do
          child.start
        end
        writer.close
        output = ""
        if options[:timeout]
          Timeout::timeout(options[:timeout], ChildProcess::TimeoutError) do
            output = write_and_read(child.io.stdin, reader, options[:data])
            child.wait
          end
        else
          output = write_and_read(child.io.stdin, reader, options[:data])
          child.wait
        end
        [output, child.exit_code]
      rescue ChildProcess::TimeoutError
        output = "Execution timed out"
        begin
          child.stop
        rescue => error
          pid = child.pid rescue "?"
          output += " - Unable to TERM/KILL the process: ##{pid}, #{error}"
        end
        [output, 2]
      rescue => error
        child.stop rescue nil
        ["Unexpected error: #{error}", 3]
      end
    end
  end
end