piotrmurach/tty-command

View on GitHub
lib/tty/command/process_runner.rb

Summary

Maintainability
A
2 hrs
Test Coverage
# frozen_string_literal: true
 
require "thread"
 
require_relative "child_process"
require_relative "result"
require_relative "truncator"
 
module TTY
class Command
class ProcessRunner
# the command to be spawned
attr_reader :cmd
 
# Initialize a Runner object
#
# @param [Printer] printer
# the printer to use for logging
#
# @api private
def initialize(cmd, printer, &block)
@cmd = cmd
@timeout = cmd.options[:timeout]
@input = cmd.options[:input]
@signal = cmd.options[:signal] || "SIGKILL"
@binmode = cmd.options[:binmode]
@printer = printer
@block = block
end
 
# Execute child process
#
# Write the input if provided to the child's stdin and read
# the contents of both the stdout and stderr.
#
# If a block is provided then yield the stdout and stderr content
# as its being read.
#
# @api public
def run!
@printer.print_command_start(cmd)
start = Time.now
 
pid, stdin, stdout, stderr = ChildProcess.spawn(cmd)
 
write_stream(stdin, @input)
 
stdout_data, stderr_data = read_streams(stdout, stderr)
 
status = waitpid(pid)
runtime = Time.now - start
 
@printer.print_command_exit(cmd, status, runtime)
 
Result.new(status, stdout_data, stderr_data, runtime)
ensure
[stdin, stdout, stderr].each { |fd| fd.close if fd && !fd.closed? }
if pid # Ensure no zombie processes
::Process.detach(pid)
terminate(pid)
end
end
 
# Stop a process marked by pid
#
# @param [Integer] pid
#
# @api public
def terminate(pid)
::Process.kill(@signal, pid) rescue nil
end
 
private
 
# The buffer size for reading stdout and stderr
BUFSIZE = 16 * 1024
 
# @api private
def handle_timeout(runtime)
return unless @timeout
 
t = @timeout - runtime
raise TimeoutExceeded if t < 0.0
end
 
# Write the input to the process stdin
#
# @api private
Method `write_stream` has a Cognitive Complexity of 11 (exceeds 5 allowed). Consider refactoring.
def write_stream(stream, input)
start = Time.now
writers = [input && stream].compact
 
while writers.any?
ready = IO.select(nil, writers, writers, @timeout)
raise TimeoutExceeded if ready.nil?
 
ready[1].each do |writer|
begin
err = nil
size = writer.write(@input)
input = input.byteslice(size..-1)
rescue IO::WaitWritable
rescue Errno::EPIPE => err
# The pipe closed before all input written
# Probably process exited prematurely
writer.close
writers.delete(writer)
end
if err || input.bytesize == 0
writer.close
writers.delete(writer)
end
 
# control total time spent writing
runtime = Time.now - start
handle_timeout(runtime)
end
end
end
 
# Read stdout & stderr streams in the background
#
# @param [IO] stdout
# @param [IO] stderr
#
# @api private
Method `read_streams` has a Cognitive Complexity of 6 (exceeds 5 allowed). Consider refactoring.
def read_streams(stdout, stderr)
stdout_data = []
stderr_data = Truncator.new
 
out_handler = ->(data) {
stdout_data << data
@printer.print_command_out_data(cmd, data)
@block.(data, nil) if @block
}
 
err_handler = ->(data) {
stderr_data << data
@printer.print_command_err_data(cmd, data)
@block.(nil, data) if @block
}
 
stdout_thread = read_stream(stdout, out_handler)
stderr_thread = read_stream(stderr, err_handler)
 
stdout_thread.join
stderr_thread.join
 
encoding = @binmode ? Encoding::BINARY : Encoding::UTF_8
 
[
stdout_data.join.force_encoding(encoding),
stderr_data.read.dup.force_encoding(encoding)
]
end
 
# Read stream and invoke handler when data becomes available
#
# @param [IO] stream
# the stream to read data from
# @param [Proc] handler
# the handler to call when data becomes available
#
# @api private
Method `read_stream` has a Cognitive Complexity of 11 (exceeds 5 allowed). Consider refactoring.
def read_stream(stream, handler)
Thread.new do
if Thread.current.respond_to?(:report_on_exception)
Thread.current.report_on_exception = false
end
Thread.current[:cmd_start] = Time.now
readers = [stream]
 
while readers.any?
ready = IO.select(readers, nil, readers, @timeout)
raise TimeoutExceeded if ready.nil?
 
ready[0].each do |reader|
begin
chunk = reader.readpartial(BUFSIZE)
handler.(chunk)
 
# control total time spent reading
runtime = Time.now - Thread.current[:cmd_start]
handle_timeout(runtime)
rescue Errno::EAGAIN, Errno::EINTR
rescue EOFError, Errno::EPIPE, Errno::EIO # thrown by PTY
readers.delete(reader)
reader.close
end
end
end
end
end
 
# @api private
def waitpid(pid)
_pid, status = ::Process.waitpid2(pid, ::Process::WUNTRACED)
status.exitstatus || status.termsig if _pid
rescue Errno::ECHILD
# In JRuby, waiting on a finished pid raises.
end
end # ProcessRunner
end # Command
end # TTY