lib/quartz/go_process.rb
require 'multi_json'
class Quartz::GoProcess
attr_reader :seed, :socket_path, :temp_file_path
def self.processes
@processes ||= []
end
def self.clear_processes
@processes = []
end
def forked_mode!
if @forked.nil?
@forked = true
else
@forked = !@forked
end
new_socket! if @forked
end
def initialize(opts)
@seed = SecureRandom.hex
socket_dir = opts.fetch(:socket_dir) { '/tmp' }
@socket_path = opts[:socket_path] || File.join(socket_dir, "quartz_#{seed}.sock")
ENV['QUARTZ_SOCKET'] = @socket_path
if opts[:file_path]
Quartz::Validations.check_for_go
compile_and_run(opts[:file_path])
elsif opts[:bin_path]
@go_process = IO.popen(opts[:bin_path])
elsif opts[:socket_path]
@external_socket = true
else
raise Quartz::ConfigError, 'Missing go binary'
end
block_until_server_starts
self.class.processes << self
end
def compile_and_run(path)
@temp_file_path = "/tmp/quartz_runner_#{seed}"
unless system('go', 'build', '-o', @temp_file_path, path)
raise Quartz::ConfigError, 'Go compilation failed'
end
@go_process = IO.popen(@temp_file_path)
end
def pid
@go_process.pid
end
def new_socket!
Thread.current["quartz_socket_#{seed}".to_sym] = UNIXSocket.new(@socket_path)
end
def socket
Thread.current["quartz_socket_#{seed}".to_sym] ||= UNIXSocket.new(@socket_path)
end
def block_until_server_starts
max_retries = 20
retries = 0
delay = 0.001 # seconds
loop do
return if File.exists?(@socket_path)
raise Quartz::GoServerError, 'RPC server not starting' if retries > max_retries
sleep(delay * 2**retries)
retries += 1
end
end
def get_metadata
payload = {
'method' => 'Quartz.GetMetadata',
'params' => [],
# This parameter isn't needed because we use a different
# connection for each thread.
'id' => 1
}
socket.send(MultiJson.dump(payload), 0)
response = read
if response['error']
raise Quartz::GoResponseError, "Metadata error: #{response['error']}"
end
response['result']
end
def call(struct_name, method, args)
payload = {
'method' => "#{struct_name}.#{method}",
'params' => [args],
'id' => 1
}
begin
socket.send(MultiJson.dump(payload), 0)
rescue Errno::EPIPE
# Retry send with a new socket. We might trigger this if Go
# process restarted. There's a good chance that this raises the
# exact same error.
new_socket!
socket.send(MultiJson.dump(payload), 0)
end
read
end
if ['1.9.3', '2.0.0'].include?(RUBY_VERSION)
READ_EXCEPTION = IO::WaitReadable
else
READ_EXCEPTION = IO::EAGAINWaitReadable
end
MAX_MESSAGE_SIZE = 8192 # Bytes
def read
value = ''
loop do
begin
value << socket.recv_nonblock(MAX_MESSAGE_SIZE)
break if value.end_with?("\n")
rescue READ_EXCEPTION
IO.select([socket], [], [])
rescue Errno::EPIPE
new_socket!
end
end
MultiJson.load(value)
end
def cleanup
# If we've forked, there's no need to cleanup since the parent
# process will.
return if @forked
# If the Go process is managed externally, there's nothing to do.
return if @external_socket
unless @killed_go_process
Process.kill('SIGTERM', pid)
Process.wait(pid)
@killed_go_process = true
end
if @temp_file_path && File.exists?(@temp_file_path)
File.delete(@temp_file_path)
end
end
def self.cleanup
return unless @processes
@processes.each(&:cleanup)
@processes = []
end
end
at_exit { Quartz::GoProcess.cleanup }