lib/opal/builder/scheduler/prefork.rb
# frozen_string_literal: true
require 'etc'
require 'set'
module Opal
class Builder
class Scheduler
class Prefork < Scheduler
# We hook into the process_requires method
def process_requires(rel_path, requires, autoloads, options)
return if requires.empty?
if @in_fork
io = @in_fork
io.send(:new_requires, rel_path, requires, autoloads, options)
else
processed = prefork_reactor(rel_path, requires, autoloads, options)
processed = OrderCorrector.correct_order(processed, requires, builder)
builder.processed.append(*processed)
end
end
private
# Prefork is not deterministic. This module corrects an order of processed
# files so that it would be exactly the same as if building sequentially.
# While for Ruby files it usually isn't a problem, because the real order
# stems from how `Opal.modules` array is accessed, the JavaScript files
# are executed verbatim and their order may be important. Also, having
# deterministic output is always a good thing.
module OrderCorrector
module_function
def correct_order(processed, requires, builder)
# Let's build a hash that maps a filename to an array of files it requires
requires_hash = processed.to_h do |i|
[i.filename, expand_requires(i.requires, builder)]
end
# Let's build an array with a correct order of requires
order_array = build_require_order_array(expand_requires(requires, builder), requires_hash)
# If a key is duplicated, remove the last duplicate
order_array = order_array.uniq
# Create a hash from this array: [a,b,c] => [a => 0, b => 1, c => 2]
order_hash = order_array.each_with_index.to_h
# Let's return a processed array that has elements in the order provided
processed.sort_by do |asset|
# If a filename isn't present somehow in our hash, let's put it at the end
order_hash[asset.filename] || order_array.length
end
end
# Expand a requires array, so that the requires filenames will be
# matching Builder::Processor#. Builder needs to be passed so that
# we can access an `expand_ext` function from its context.
def expand_requires(requires, builder)
requires.map { |i| builder.expand_ext(i) }
end
def build_require_order_array(requires, requires_hash, built_for = Set.new)
array = []
requires.each do |name|
next if built_for.include?(name)
built_for << name
asset_requires = requires_hash[name]
array += build_require_order_array(asset_requires, requires_hash, built_for) if asset_requires
array << name
end
array
end
end
class ForkSet < Array
def initialize(count, &block)
super([])
@count, @block = count, block
create_fork
end
def get_events(queue_length)
# Wait for anything to happen:
# - Either any of our workers return some data
# - Or any workers become ready to receive data
# - But only if we have enough work for them
ios = IO.select(
map(&:read_io),
sample(queue_length).map(&:write_io),
[]
)
return [[], []] unless ios
events = ios[0].map do |io|
io = from_io(io, :read_io)
[io, *io.recv]
end
idles = ios[1].map do |io|
from_io(io, :write_io)
end
# Progressively create forks, because we may not need all
# the workers at the time. The number 6 was picked due to
# some trial and error on a Ryzen machine.
#
# Do note that prefork may happen more than once.
create_fork if length < @count && rand(6) == 1
[events, idles]
end
def create_fork
self << Fork.new(self, &@block)
end
def from_io(io, type)
find { |i| i.__send__(type) == io }
end
def close
each(&:close)
end
def wait
each(&:wait)
end
end
class Fork
def initialize(forkset)
@parent_read, @child_write = IO.pipe(binmode: true)
@child_read, @parent_write = IO.pipe(binmode: true)
@forkset = forkset
@in_fork = false
@pid = fork do
@in_fork = true
begin
@parent_read.close
@parent_write.close
yield(self)
rescue => error
send(:exception, error)
ensure
send(:close) unless write_io.closed?
@child_write.close
end
end
@child_read.close
@child_write.close
end
def close
send(:close)
@parent_write.close
end
def goodbye
read_io.close unless read_io.closed?
end
def send_message(io, msg)
msg = Marshal.dump(msg)
io.write([msg.length].pack('Q') + msg)
end
def recv_message(io)
length, = *io.read(8).unpack('Q')
Marshal.load(io.read(length)) # rubocop:disable Security/MarshalLoad
end
def fork?
@in_fork
end
def read_io
fork? ? @child_read : @parent_read
end
def write_io
fork? ? @child_write : @parent_write
end
def eof?
write_io.closed?
end
def send(*msg)
send_message(write_io, msg)
end
def recv
recv_message(read_io)
end
def wait
Process.waitpid(@pid, Process::WNOHANG)
end
end
# By default we use 3/4 of CPU threads detected.
def fork_count
ENV['OPAL_PREFORK_THREADS']&.to_i || (Etc.nprocessors * 3 / 4.0).ceil
end
def prefork
@forks = ForkSet.new(fork_count, &method(:fork_entrypoint))
end
def fork_entrypoint(io)
# Ensure we can work with our forks async...
Fiber.set_scheduler(nil) if Fiber.respond_to? :set_scheduler
@in_fork = io
until io.eof?
$0 = 'opal/builder: idle'
type, *args = *io.recv
case type
when :compile
rel_path, req, autoloads, options = *args
$0 = "opal/builder: #{req}"
begin
asset = builder.process_require_threadsafely(req, autoloads, options)
io.send(:new_asset, asset)
rescue Builder::MissingRequire => error
io.send(:missing_require_exception, rel_path, error)
end
when :close
io.goodbye
break
end
end
rescue Errno::EPIPE
exit!
end
def prefork_reactor(rel_path, requires, autoloads, options)
prefork
processed = []
first = rel_path
queue = requires.map { |i| [rel_path, i, autoloads, options] }
awaiting = 0
built = 0
should_log = $stderr.tty? && !ENV['OPAL_DISABLE_PREFORK_LOGS']
$stderr.print "\r\e[K" if should_log
loop do
events, idles = @forks.get_events(queue.length)
idles.each do |io|
break if queue.empty?
rel_path, req, autoloads, options = *queue.shift
next if builder.already_processed.include?(req)
awaiting += 1
builder.already_processed << req
io.send(:compile, rel_path, req, autoloads, options)
end
events.each do |io, type, *args|
case type
when :new_requires
rel_path, requires, autoloads, options = *args
requires.each do |i|
queue << [rel_path, i, autoloads, options]
end
when :new_asset
asset, = *args
if !asset
# Do nothing, we received a nil which is expected.
else
processed << asset
end
built += 1
awaiting -= 1
when :missing_require_exception
rel_path, error = *args
raise error, "A file required by #{rel_path.inspect} wasn't found.\n#{error.message}", error.backtrace
when :exception
error, = *args
raise error
when :close
io.goodbye
end
end
if should_log
percent = (100.0 * built / (awaiting + built)).round(1)
str = format("[opal/builder] Building %<first>s... (%<percent>4.3g%%)\r", first: first, percent: percent)
$stderr.print str
end
break if awaiting == 0 && queue.empty?
end
processed
ensure
$stderr.print "\r\e[K\r" if should_log
@forks.close
@forks.wait
end
end
end
end
end