cloudfoundry/warden

View on GitHub
em-posix-spawn/lib/em/posix/spawn/child.rb

Summary

Maintainability
C
1 day
Test Coverage
require 'eventmachine'
require 'posix/spawn'

module EventMachine

  module POSIX

    module Spawn

      include ::POSIX::Spawn

      class Child

        include Spawn
        include Deferrable

        # Spawn a new process, write all input and read all output. Supports
        # the standard spawn interface as described in the POSIX::Spawn module
        # documentation:
        #
        #   new([env], command, [argv1, ...], [options])
        #
        # The following options are supported in addition to the standard
        # POSIX::Spawn options:
        #
        #   :input   => str      Write str to the new process's standard input.
        #   :timeout => int      Maximum number of seconds to allow the process
        #                        to execute. When exceeded the process is sent
        #                        SIGTERM and, once it has been reaped, the
        #                        failure callback receives TimeoutExceeded.
        #   :max     => total    Maximum number of bytes of output (stdout and
        #                        stderr combined) to allow the process to
        #                        generate. When exceeded the process is sent
        #                        SIGTERM and, once it has been reaped, the
        #                        failure callback receives
        #                        MaximumOutputExceeded.
        #   :discard_output => bool  Do not accumulate the data read from
        #                        stdout/stderr in the out/err buffers.
        #   :prepend_stdout => str Data to prepend to stdout (copied; the
        #                        caller's string is never modified)
        #   :prepend_stderr => str Data to prepend to stderr (copied; the
        #                        caller's string is never modified)
        #   :close_others => bool  On Linux, replaced by an explicit close of
        #                        every descriptor above 2 that this process
        #                        has open (listed from /proc/<pid>/fd).
        #
        # Returns a new Child instance that is being executed. The object
        # includes the Deferrable module, and executes the success callback
        # when the process has exited, or the failure callback when the process
        # was killed because of exceeding the timeout, or exceeding the maximum
        # number of bytes to read from stdout and stderr combined. Once either
        # callback is triggered, this object's out, err, status and runtime
        # attributes are available. Clients can register callbacks to listen to
        # updates from out and err streams of the process.
        def initialize(*args)
          @env, @argv, options = extract_process_spawn_arguments(*args)
          @options = options.dup
          @input = @options.delete(:input)
          @timeout = @options.delete(:timeout)
          @max = @options.delete(:max)
          @discard_output = @options.delete(:discard_output)
          # These strings become the stdout/stderr read buffers, which are
          # appended to and re-encoded in place; copy them so a caller's
          # (possibly frozen or shared) string is never mutated.
          @prepend_stdout = (@options.delete(:prepend_stdout) || "").dup
          @prepend_stderr = (@options.delete(:prepend_stderr) || "").dup
          @options.delete(:chdir) if @options[:chdir].nil?

          exec!
        end

        # Output captured from the child's stdout and stderr streams (Strings).
        # Both are assigned only once the process has been reaped.
        attr_reader :out, :err

        # Process::Status of the reaped child; nil while it is still running.
        attr_reader :status

        # Wall-clock seconds elapsed between spawning and reaping the child.
        attr_reader :runtime

        # Process id of the spawned child.
        attr_reader :pid

        # True when the child has been reaped and exited with status zero,
        # false when it exited otherwise, nil while it is still running.
        def success?
          return @status unless @status

          @status.success?
        end

        # Whether the child has been reaped (i.e. its exit status is known).
        def terminated?
          @status ? true : false
        end

        # Ask the child to stop with SIGTERM, and schedule a SIGKILL after
        # +timeout+ seconds (nil is treated as 0).
        #
        # Returns true when the termination sequence was started, false when
        # the child was already reaped or a SIGKILL is already scheduled.
        def kill(timeout = 0)
          # Already gone, or a kill is already in progress.
          return false if @sigkill_timer || terminated?

          request_termination

          grace = timeout || 0
          @sigkill_timer = Timer.new(grace) do
            begin
              ::Process.kill('KILL', @pid)
            rescue
              nil
            end
          end

          true
        end

        # Send the SIGTERM signal to the process (best effort).
        #
        # Also cancels the pending :timeout timer, if any. Nothing is sent once
        # the child has been reaped: its pid may already belong to an
        # unrelated process by then.
        #
        # Returns the result of Process.kill (the number of processes
        # signalled), or nil when the child was already reaped or the signal
        # could not be delivered.
        def request_termination
          @sigterm_timer.cancel if @sigterm_timer
          return nil if terminated?

          begin
            ::Process.kill('TERM', @pid)
          rescue SystemCallError
            # e.g. ESRCH / EPERM: termination is best effort.
            nil
          end
        end

        # Register +listener+ on both the stdout and the stderr stream.
        # Returns the two listener handles, stdout's first.
        def add_streams_listener(&listener)
          [@cout, @cerr].map { |stream| stream.after_read(&listener) }
        end

        # Process-wide SIGCHLD dispatcher shared by every Child.
        #
        # A single CHLD trap writes a byte to a self-pipe. The reactor watches
        # the pipe's read end (SignalNotifier), which calls #signal. #signal
        # reaps every exited child and schedules the callback registered for
        # that pid on the next reactor tick.
        class SignalHandler

          # Install the handler on first use; later calls return the same
          # instance.
          def self.setup!
            @instance ||= begin
                            new.tap do |instance|
                              instance.setup!
                            end
                          end
          end

          # Restore the previous CHLD handler, stop watching the pipe and drop
          # the shared instance. No-op when setup! was never called.
          def self.teardown!
            if @instance
              @instance.teardown!
              @instance = nil
            end
          end

          # The shared instance, or nil when not set up.
          def self.instance
            @instance
          end

          def initialize
            # pid => block to run (via EM.next_tick) once that pid is reaped
            @pid_callback = {}
            # pid => Process::Status of reaped children, until fetched
            @pid_to_process_status = {}
          end

          # Create the self-pipe, watch its read end with the reactor and
          # install the CHLD trap.
          def setup!
            @pipe = ::IO.pipe
            @notifier = ::EM.watch @pipe[0], SignalNotifier, self
            @notifier.notify_readable = true

            @prev_handler = ::Signal.trap(:CHLD) do |signo|
              # Only wake the reactor here. If the pipe is full, unread bytes
              # are already pending, so dropping this one loses nothing.
              begin
                @pipe[1].write_nonblock("x")
              rescue IO::WaitWritable
              end

              # Signal.trap returns the previous handler either as a callable
              # or as one of the strings "DEFAULT", "SYSTEM_DEFAULT",
              # "IGNORE", "EXIT" (or nil). Only a callable can be chained:
              # calling #call on a String (or on nil, if a CHLD arrives before
              # @prev_handler is assigned) would raise inside the trap. Ruby
              # invokes trap handlers with the signal number, so pass it on.
              @prev_handler.call(signo) if @prev_handler.respond_to?(:call)
            end

            @prev_handler ||= lambda { |*_| ; }
          end

          # Restore the CHLD handler that was active before setup! and release
          # the self-pipe.
          def teardown!
            ::Signal.trap(:CHLD, @prev_handler)

            @notifier.detach if ::EM.reactor_running?
            @pipe[0].close rescue nil
            @pipe[1].close rescue nil
          end

          # Register +blk+ to be run on the next tick after +pid+ is reaped.
          def pid_callback(pid, &blk)
            @pid_callback[pid] = blk
          end

          # Fetch (and forget) the recorded Process::Status for +pid+.
          def pid_to_process_status(pid)
            @pid_to_process_status.delete(pid)
          end

          def signal
            # The SIGCHLD handler may not be called exactly once for every
            # child. I.e., multiple children exiting concurrently may trigger
            # only one SIGCHLD in the parent. Therefore, reap all processes
            # that can be reaped.
            # NOTE(review): this also reaps children not spawned through
            # Child; their statuses are recorded but never fetched.
            while pid = ::Process.wait(-1, ::Process::WNOHANG)
              @pid_to_process_status[pid] = $?
              blk = @pid_callback.delete(pid)
              EM.next_tick(&blk) if blk
            end
          rescue ::Errno::ECHILD
            # No children left to wait for.
          end

          # Reactor side of the self-pipe: drain the wake-up bytes and let the
          # handler reap.
          class SignalNotifier < ::EM::Connection
            def initialize(handler)
              @handler = handler
            end

            def notify_readable
              begin
                @io.read_nonblock(65536)
              rescue IO::WaitReadable
              end

              @handler.signal
            end
          end
        end

        # Execute command, write input, and read output. This is called
        # immediately when a new instance of this object is initialized.
        #
        # This method:
        # - spawns the child with popen4;
        # - attaches its stdin/stdout/stderr pipes to the reactor;
        # - arms the optional :max and :timeout guards;
        # - registers a SignalHandler callback for when the child is reaped.
        #
        # That callback records status, runtime, out and err. It then fires
        # the Deferrable failure callback (with the failure class set by a
        # guard) or the success callback.
        def exec!
          # The signal handler MUST be installed before spawning a new process
          SignalHandler.setup!

          if RUBY_PLATFORM =~ /linux/i && @options.delete(:close_others)
            @options[:in] = :in
            @options[:out] = :out
            @options[:err] = :err

            # Translate :close_others into an explicit :close for every
            # descriptor above stderr that this process currently has open.
            # The loop runs only for its side effect, so use #each.
            ::Dir.glob("/proc/%d/fd/*" % ::Process.pid).each do |file|
              fd = File.basename(file).to_i
              @options[fd] = :close if fd > 2
            end
          end

          @pid, stdin, stdout, stderr = popen4(@env, *(@argv + [@options]))
          @start = Time.now

          # Don't leak into processes spawned after us.
          [stdin, stdout, stderr].each { |io| io.close_on_exec = true }

          # watch fds
          @cin = EM.watch stdin, WritableStream, (@input || "").dup, "stdin"
          @cout = EM.watch stdout, ReadableStream, @prepend_stdout, "stdout", @discard_output
          @cerr = EM.watch stderr, ReadableStream, @prepend_stderr, "stderr", @discard_output

          # register events
          @cin.notify_writable = true
          @cout.notify_readable = true
          @cerr.notify_readable = true

          # Keep track of open fds. A stream removes itself from this list as
          # soon as it signals success (EOF on read; input fully written or
          # EPIPE on write).
          in_flight = [@cin, @cout, @cerr].compact
          in_flight.each { |io|
            # force binary encoding
            io.force_encoding

            # register finalize hook
            io.callback { in_flight.delete(io) }
          }

          # Set by the :max / :timeout guards; handed to set_deferred_failure.
          failure = nil

          # keep track of max output
          max = @max
          if max && max > 0
            check_buffer_size = lambda { |listener, _|
              if !terminated? && !listener.closed?
                if @cout.buffer.size + @cerr.buffer.size > max
                  failure = MaximumOutputExceeded
                  in_flight.each(&:close)
                  in_flight.clear
                  request_termination
                end
              end
            }

            @cout.after_read(&check_buffer_size)
            @cerr.after_read(&check_buffer_size)
          end

          # request termination of process when it doesn't terminate
          # in time
          timeout = @timeout
          if timeout && timeout > 0
            @sigterm_timer = Timer.new(timeout) {
              failure = TimeoutExceeded
              in_flight.each(&:close)
              in_flight.clear
              request_termination
            }
          end

          # run block when pid is reaped
          SignalHandler.instance.pid_callback(@pid) {
            @sigterm_timer.cancel if @sigterm_timer
            @sigkill_timer.cancel if @sigkill_timer
            @runtime = Time.now - @start
            @status = SignalHandler.instance.pid_to_process_status(@pid)

            # Iterate over a snapshot. The final read below may hit EOF, which
            # synchronously fires the stream's success callback and deletes it
            # from in_flight. Mutating the array being iterated would skip the
            # next stream, leaving it undrained and its descriptor open.
            in_flight.dup.each do |io|
              # Trigger one final read to pick up output still in the pipe
              if io.respond_to?(:notify_readable)
                io.notify_readable
              end

              io.close
            end

            in_flight.clear

            @out = @cout.buffer
            @err = @cerr.buffer

            if failure
              set_deferred_failure failure
            else
              set_deferred_success
            end
          }
        end

        # Base class for the reactor-watched pipe ends of the child process.
        # It pairs the pipe IO (@io, set when attached with EM.watch) with a
        # String buffer and a name ("stdin"/"stdout"/"stderr" as used by
        # Child#exec!). It is Deferrable so its owner can learn when the
        # stream has finished.
        class Stream < Connection

          include Deferrable

          # The String this stream reads into (ReadableStream) or writes from
          # (WritableStream).
          attr_reader :buffer

          # buffer - String used as this stream's data buffer.
          # name   - label identifying the stream.
          def initialize(buffer, name)
            @buffer = buffer
            @name = name
            @closed = false
          end

          # Switch both the pipe IO and the buffer to binary, so data is
          # handled as raw bytes. The respond_to? guard presumably keeps this
          # working on Rubies whose String has no encoding support.
          def force_encoding
            if @buffer.respond_to?(:force_encoding)
              @io.set_encoding('BINARY', 'BINARY')
              @buffer.force_encoding('BINARY')
            end
          end

          # Mark the stream closed immediately and schedule the detach and
          # the IO close for the next reactor tick. Repeated calls are no-ops.
          def close
            return if closed?


            # NB: Defer detach to the next tick, because EventMachine blows up
            #     when a file descriptor is attached and detached in the same
            #     tick. This can happen when the child process dies in the same
            #     tick it started, and the `#waitpid` loop in the signal
            #     handler picks it up afterwards. The signal handler, in turn,
            #     queues the child's callback to be executed via
            #     `EM#next_tick`. If the blocks queued by `EM#next_tick` are
            #     executed after that, still in the same tick, the child's file
            #     descriptors can be detached in the same tick they were
            #     attached.
            EM.next_tick do
              # NB: The ordering here is important. If we're using epoll,
              #     detach() attempts to deregister the associated fd via
              #     EPOLL_CTL_DEL and marks the EventableDescriptor for
              #     deletion upon completion of the iteration of the event
              #     loop. However, if the fd was closed before calling
              #     detach(), epoll_ctl() will sometimes return EBADF and fail
              #     to remove the fd. This can lead to epoll_wait() returning
              #     an event whose data pointer is invalid (since it was
              #     deleted in a prior iteration of the event loop).
              detach
              @io.close rescue nil
            end

            @closed = true
          end

          # True once #close has been called. The descriptor itself is only
          # released on the following tick.
          def closed?
            @closed
          end
        end

        class ReadableStream < Stream

          # Subscriber to a ReadableStream. It remembers how much of the
          # stream's buffer it has already been handed, so each invocation
          # receives only the newly read tail.
          class Listener

            attr_reader :name

            def initialize(name, &block)
              @name = name
              @block = block
              @offset = 0
            end

            # Hand the unsent tail of +buffer+ to the block (no-op once closed).
            def call(buffer)
              handler = @block
              return if handler.nil?

              handler.call(self, slice_from_buffer(buffer))
            end

            # Hand the unsent tail of +buffer+ to the block one last time and
            # detach it. Later #call / #close invocations do nothing.
            def close(buffer = "")
              handler = @block
              return if handler.nil?

              @block = nil
              handler.call(self, slice_from_buffer(buffer))
            end

            def closed?
              !@block
            end

            private

            # Everything in +buffer+ past the last delivered position. Moves
            # the position to the buffer's current end.
            def slice_from_buffer(buffer)
              from = @offset
              @offset = buffer.length
              buffer[from..-1] || ""
            end
          end

          # Maximum number of bytes requested from the pipe per read
          BUFSIZE = 64 * 1024

          # discard_output - when truthy, data read from the pipe is not
          #                  appended to the buffer.
          def initialize(buffer, name, discard_output = false, &block)
            super(buffer, name, &block)
            @after_read = []
            @discard_output = discard_output
          end

          # Close every listener, giving each the data it has not seen yet
          # (this covers listeners that attached just before the stream
          # closed). Then forget the listeners and close the stream itself.
          def close
            @after_read.each { |subscriber| subscriber.close(@buffer) }.clear

            super
          end

          # Register a block that is called as block.call(listener, new_data)
          # after each read. Returns the Listener, or nil when no block is
          # given.
          def after_read(&block)
            return unless block

            listener = Listener.new(@name, &block)

            # Catch a late subscriber up on the next reactor tick:
            # - a closed stream hands over its whole buffer and closes the
            #   listener;
            # - an open stream with buffered data hands over what has
            #   accumulated so far, so the listener need not wait for the
            #   next read.
            catch_up =
              if @closed
                :close
              elsif !@buffer.empty?
                :call
              end
            EM.next_tick { listener.public_send(catch_up, @buffer) } if catch_up

            @after_read << listener
            listener
          end

          # Reactor callback: read one chunk, append it unless output is
          # discarded, and notify listeners. On EOF, close the stream and mark
          # it succeeded.
          def notify_readable
            # Close and detach are decoupled, check if this notification is
            # supposed to go through.
            return if closed?

            chunk = @io.read_nonblock(BUFSIZE)
            @buffer << chunk unless @discard_output
            @after_read.each { |subscriber| subscriber.call(@buffer) }
          rescue Errno::EAGAIN, Errno::EINTR
            # Nothing readable right now; wait for the next notification.
          rescue EOFError
            close
            set_deferred_success
          end
        end

        class WritableStream < Stream

          # Reactor callback: push as much pending input as the pipe accepts.
          # The stream is closed and marked succeeded once everything has been
          # written, or once the pipe's read end is gone (EPIPE).
          def notify_writable
            # Close and detach are decoupled, check if this notification is
            # supposed to go through.
            return if closed?

            pipe_broken = false
            begin
              written = @io.write_nonblock(@buffer)
              @buffer = @buffer[written, @buffer.size]
            rescue Errno::EPIPE
              pipe_broken = true
            rescue Errno::EAGAIN, Errno::EINTR
              # Pipe full or call interrupted; retry on the next notification.
            end

            return unless pipe_broken || @buffer.empty?

            close
            set_deferred_success
          end
        end
      end
    end
  end
end