bin/integrations
#!/usr/bin/env ruby
# Runner to run integration specs in parallel
# Part of integration specs run linear without bundler.
# If we would run bundle exec when running this code, bundler would inject its own context
# into them, messing things up heavily
#
# Types of specs:
# - regular - can run in parallel, includes all the helpers
# - pristine - cannot run in parallel, uses custom bundler but includes helpers
# - poro - cannot run in parallel, uses custom bundler, does not include any helpers
raise 'This code needs to be executed WITHOUT bundle exec' if Kernel.const_defined?(:Bundler)
require 'open3'
require 'fileutils'
require 'pathname'
require 'tmpdir'
require 'etc'
ROOT_PATH = Pathname.new(File.expand_path(File.join(File.dirname(__FILE__), '../')))
# How many child processes with integration specs do we want to run in parallel
# When the value is high, there's a problem with thread allocation on Github CI, that is why
# we limit it. Locally we can run a lot of those, as many of them have sleeps and do not use a lot
# of CPU. Locally we also cannot go beyond certain limit due to how often and how many topics we
# create in Kafka. With an overloaded system, we start getting timeouts.
CONCURRENCY = ENV.key?('CI') ? 5 : Etc.nprocessors * 3
# How may bytes do we want to keep from the stdout in the buffer for when we need to print it
MAX_BUFFER_OUTPUT = 307_200
# Abstraction around a single test scenario execution process
class Scenario
# How long a scenario can run before we kill it
# This is a fail-safe just in case something would hang
MAX_RUN_TIME = 5 * 60 # 5 minutes tops
# There are rare cases where Karafka may force shutdown for some of the integration cases
# This includes exactly those
EXIT_CODES = {
default: [0],
'consumption/worker_critical_error_behaviour_spec.rb' => [0, 2].freeze,
'shutdown/on_hanging_jobs_and_a_shutdown_spec.rb' => [2].freeze,
'shutdown/on_hanging_on_shutdown_job_and_a_shutdown_spec.rb' => [2].freeze,
'shutdown/on_hanging_listener_and_shutdown_spec.rb' => [2].freeze,
'swarm/forceful_shutdown_of_hanging_spec.rb' => [2].freeze,
'instrumentation/post_errors_instrumentation_error_spec.rb' => [1].freeze
}.freeze
private_constant :MAX_RUN_TIME, :EXIT_CODES
# Creates scenario instance and runs in the background process
#
# @param path [String] path to the scenarios file
def initialize(path)
@path = path
# First 1024 characters from stdout
@stdout_head = ''
# Last 1024 characters from stdout
@stdout_tail = ''
end
# Starts running given scenario in a separate process
def start
@stdin, @stdout, @stderr, @wait_thr = Open3.popen3(init_and_build_cmd)
@started_at = current_time
end
# @return [String] integration spec name
def name
@path.gsub("#{ROOT_PATH}/spec/integrations/", '')
end
# @return [Symbol] type of spec
def type
scenario_dir = File.dirname(@path)
return :poro if scenario_dir.include?('_poro')
return :pristine if scenario_dir.include?('_pristine')
:regular
end
# @return [Boolean] any spec that is not a regular one should not run in parallel with others
def linear?
type != :regular
end
# @return [Boolean] did this scenario finished or is it still running
def finished?
# If the thread is running too long, kill it
if current_time - @started_at > MAX_RUN_TIME
begin
Process.kill('TERM', pid)
# It may finish right after we want to kill it, that's why we ignore this
rescue Errno::ESRCH
end
end
# We read it so it won't grow as we use our default logger that prints to both test.log and
# to stdout. Otherwise after reaching the buffer size, it would hang
buffer = ''
@stdout.read_nonblock(MAX_BUFFER_OUTPUT, buffer, exception: false)
@stdout_head = buffer if @stdout_head.empty?
@stdout_tail << buffer
@stdout_tail = @stdout_tail[-MAX_BUFFER_OUTPUT..-1] || @stdout_tail
!@wait_thr.alive?
end
# @return [Boolean] did this scenario finish successfully or not
def success?
expected_exit_codes = EXIT_CODES[name] || EXIT_CODES[:default]
expected_exit_codes.include?(exit_code)
end
# @return [Integer] pid of the process of this scenario
def pid
@wait_thr.pid
end
# @return [Integer] exit code of the process running given scenario
def exit_code
# There may be no exit status if we killed the thread
@wait_thr.value&.exitstatus || 123
end
# @return [String] exit status of the process
def exit_status
@wait_thr.value.to_s
end
# Prints a status report when scenario is finished and stdout if it failed
def report
if success?
print "\e[#{32}m#{'.'}\e[0m"
else
buffer = ''
@stderr.read_nonblock(MAX_BUFFER_OUTPUT, buffer, exception: false)
puts
puts "\e[#{31}m#{'[FAILED]'}\e[0m #{name}"
puts "Time taken: #{current_time - @started_at} seconds"
puts "Exit code: #{exit_code}"
puts "Exit status: #{exit_status}"
puts @stdout_head
puts '...'
puts @stdout_tail
puts buffer
puts
end
end
# @return [Float] number of seconds that a given spec took to run
def time_taken
@finished_at - @started_at
end
# Close all the files that are open, so they do not pile up
def close
@finished_at = current_time
@stdin.close
@stdout.close
@stderr.close
end
private
# Sets up a proper environment for a given spec to run and returns the run command
# @return [String] run command
def init_and_build_cmd
case type
when :poro
scenario_dir = File.dirname(@path)
# We copy the spec into a temp dir, not to pollute the spec location with logs, etc
temp_dir = Dir.mktmpdir
file_name = File.basename(@path)
FileUtils.cp_r("#{scenario_dir}/.", temp_dir)
<<~CMD
cd #{temp_dir} &&
KARAFKA_GEM_DIR=#{ROOT_PATH} \
BUNDLE_AUTO_INSTALL=true \
PRISTINE_MODE=true \
bundle exec ruby #{file_name}
CMD
when :pristine
scenario_dir = File.dirname(@path)
# We copy the spec into a temp dir, not to pollute the spec location with logs, etc
temp_dir = Dir.mktmpdir
file_name = File.basename(@path)
FileUtils.cp_r("#{scenario_dir}/.", temp_dir)
<<~CMD
cd #{temp_dir} &&
KARAFKA_GEM_DIR=#{ROOT_PATH} \
BUNDLE_AUTO_INSTALL=true \
PRISTINE_MODE=true \
bundle exec ruby -r #{ROOT_PATH}/spec/integrations_helper.rb #{file_name}
CMD
else
<<~CMD
KARAFKA_GEM_DIR=#{ROOT_PATH} \
bundle exec ruby -r ./spec/integrations_helper.rb #{@path}
CMD
end
end
# @return [Float] current machine time
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
# Load all the specs
specs = Dir[ROOT_PATH.join('spec/integrations/**/*_spec.rb')]
FILTER_TYPE = ARGV[0] == '--exclude' ? 'exclude' : 'include'
# Remove the exclude flag
ARGV.shift if FILTER_TYPE == '--exclude'
# If filters is provided, apply
# Allows to provide several filters one after another and applies all of them
ARGV.each do |filter|
specs.delete_if do |name|
case FILTER_TYPE
when 'include'
!name.include?(filter)
when 'exclude'
name.include?(filter)
else
raise 'Invalid filter type'
end
end
end
raise ArgumentError, "No integration specs with filters: #{ARGV.join(', ')}" if specs.empty?
# Randomize order
seed = (ENV['SEED'] || rand(0..10_000)).to_i
puts "Random seed: #{seed}"
scenarios = specs
.shuffle(random: Random.new(seed))
.map { |integration_test| Scenario.new(integration_test) }
regulars = scenarios.reject(&:linear?)
linears = scenarios - regulars
active_scenarios = []
finished_scenarios = []
while finished_scenarios.size < scenarios.size
# If we have space to run another scenario, we add it
if active_scenarios.size < CONCURRENCY
scenario = nil
# We can run only one linear at the same time due to concurrency issues within bundler
# Since they usually take longer than others, we try to run them as fast as possible when there
# is a slot
scenario = linears.pop unless active_scenarios.any?(&:linear?)
scenario ||= regulars.pop
if scenario
scenario.start
active_scenarios << scenario
end
end
active_scenarios.select(&:finished?).each do |exited|
scenario = active_scenarios.delete(exited)
scenario.report
scenario.close
finished_scenarios << scenario
end
sleep(0.1)
end
# Report longest scenarios
puts
puts "\nLongest scenarios:\n\n"
finished_scenarios.sort_by(&:time_taken).reverse.first(10).each do |long_scenario|
puts "[#{'%6.2f' % long_scenario.time_taken}] #{long_scenario.name}"
end
failed_scenarios = finished_scenarios.reject(&:success?)
if failed_scenarios.empty?
puts
else
# Report once more on the failed jobs
# This will only list scenarios that failed without printing their stdout here.
puts
puts "\nFailed scenarios:\n\n"
failed_scenarios.each do |scenario|
puts "\e[#{31}m#{'[FAILED]'}\e[0m #{scenario.name}"
end
puts
# Exit with 1 if not all scenarios were successful
exit 1
end