karafka/karafka

View on GitHub
bin/integrations

Summary

Maintainability
Test Coverage
#!/usr/bin/env ruby

# Runner to run integration specs in parallel

# Some of the integration specs run linearly (one at a time), without bundler.
# If we would run bundle exec when running this code, bundler would inject its own context
# into them, messing things up heavily
#
# Types of specs:
# - regular - can run in parallel, includes all the helpers
# - pristine - cannot run in parallel, uses custom bundler but includes helpers
# - poro - cannot run in parallel, uses custom bundler, does not include any helpers
# Abort early when Bundler is already loaded into this process, as its context would leak into
# the specs that are started from here
if Kernel.const_defined?(:Bundler)
  raise 'This code needs to be executed WITHOUT bundle exec'
end

require 'open3'
require 'fileutils'
require 'pathname'
require 'tmpdir'
require 'etc'

# Directory one level above the directory in which this file is located
ROOT_PATH = Pathname.new(File.expand_path('..', File.dirname(__FILE__)))

# How many child processes with integration specs do we want to run in parallel
# When the value is high, there's a problem with thread allocation on Github CI, that is why
# we limit it. Locally we can run a lot of those, as many of them have sleeps and do not use a lot
# of CPU. Locally we also cannot go beyond certain limit due to how often and how many topics we
# create in Kafka. With an overloaded system, we start getting timeouts.
CONCURRENCY = if ENV.key?('CI')
                5
              else
                Etc.nprocessors * 3
              end

# How many bytes do we want to keep from the stdout in the buffer for when we need to print it
MAX_BUFFER_OUTPUT = 300 * 1_024

# Abstraction around a single test scenario execution process
# Abstraction around a single test scenario execution process
#
# A scenario is started as a child process via `Open3.popen3`. While it runs, `#finished?` needs
# to be polled, as polling is what drains the stdout and stderr pipes of the child process.
class Scenario
  # How long a scenario can run before we kill it
  # This is a fail-safe just in case something would hang
  MAX_RUN_TIME = 5 * 60 # 5 minutes tops

  # There are rare cases where Karafka may force shutdown for some of the integration cases
  # This includes exactly those
  # Keys are spec names (as returned by `#name`), values are the exit codes considered a success
  EXIT_CODES = {
    default: [0].freeze,
    'consumption/worker_critical_error_behaviour_spec.rb' => [0, 2].freeze,
    'shutdown/on_hanging_jobs_and_a_shutdown_spec.rb' => [2].freeze,
    'shutdown/on_hanging_on_shutdown_job_and_a_shutdown_spec.rb' => [2].freeze,
    'shutdown/on_hanging_listener_and_shutdown_spec.rb' => [2].freeze,
    'swarm/forceful_shutdown_of_hanging_spec.rb' => [2].freeze,
    'instrumentation/post_errors_instrumentation_error_spec.rb' => [1].freeze
  }.freeze

  private_constant :MAX_RUN_TIME, :EXIT_CODES

  # Creates scenario instance. The process itself is started with `#start`
  #
  # @param path [String] path to the scenarios file
  def initialize(path)
    @path = path
    # First non-empty chunk that was read from stdout (one read takes up to MAX_BUFFER_OUTPUT)
    @stdout_head = ''
    # Most recent stdout output, trimmed to the last MAX_BUFFER_OUTPUT characters
    @stdout_tail = +''
    # Most recent stderr output, trimmed to the last MAX_BUFFER_OUTPUT characters
    @stderr_tail = +''
  end

  # Starts running given scenario in a separate process
  def start
    @stdin, @stdout, @stderr, @wait_thr = Open3.popen3(init_and_build_cmd)
    @started_at = current_time
  end

  # @return [String] integration spec name (path relative to the integrations specs directory)
  def name
    # Only the leading location has to be removed, hence `sub` and not `gsub`
    @path.sub("#{ROOT_PATH}/spec/integrations/", '')
  end

  # @return [Symbol] type of spec (`:poro`, `:pristine` or `:regular`), based on the name of the
  #   directory in which the spec is located
  def type
    scenario_dir = File.dirname(@path)

    return :poro if scenario_dir.include?('_poro')
    return :pristine if scenario_dir.include?('_pristine')

    :regular
  end

  # @return [Boolean] any spec that is not a regular one should not run in parallel with others
  def linear?
    type != :regular
  end

  # Checks the state of the process. As a side effect it sends TERM to a process that runs longer
  # than `MAX_RUN_TIME` and reads whatever the process has written to stdout and stderr so far.
  #
  # @return [Boolean] did this scenario finish or is it still running
  def finished?
    # If the process is running too long, kill it
    terminate if current_time - @started_at > MAX_RUN_TIME

    # Liveness is checked before draining the pipes. Checking it after the read could report
    # the scenario as finished while its last output was written after we have read the pipes
    exited = !@wait_thr.alive?

    drain_output

    exited
  end

  # @return [Boolean] did this scenario finish successfully or not
  def success?
    expected_exit_codes = EXIT_CODES[name] || EXIT_CODES[:default]

    expected_exit_codes.include?(exit_code)
  end

  # @return [Integer] pid of the process of this scenario
  def pid
    @wait_thr.pid
  end

  # @return [Integer] exit code of the process running given scenario
  def exit_code
    # There may be no exit status if we killed the process, 123 is reported in such a case
    @wait_thr.value&.exitstatus || 123
  end

  # @return [String] exit status of the process
  def exit_status
    @wait_thr.value.to_s
  end

  # Prints a status report when scenario is finished and stdout and stderr if it failed
  def report
    if success?
      print "\e[32m.\e[0m"
    else
      # Pick up anything that may still be waiting in the pipes
      drain_output

      puts
      puts "\e[31m[FAILED]\e[0m #{name}"
      puts "Time taken: #{current_time - @started_at} seconds"
      puts "Exit code: #{exit_code}"
      puts "Exit status: #{exit_status}"
      puts @stdout_head
      puts '...'
      puts @stdout_tail
      puts @stderr_tail
      puts
    end
  end

  # @return [Float] number of seconds that a given spec took to run (available after `#close`)
  def time_taken
    @finished_at - @started_at
  end

  # Close all the files that are open, so they do not pile up
  def close
    @finished_at = current_time
    @stdin.close
    @stdout.close
    @stderr.close
  end

  private

  # Sends TERM to the process of this scenario
  def terminate
    Process.kill('TERM', pid)
  rescue Errno::ESRCH
    # It may finish right after we want to kill it, that's why we ignore this
    nil
  end

  # Reads what is currently available in stdout and stderr of the process.
  #
  # We read it so the pipes won't fill up, as we use our default logger that prints to both
  # test.log and to stdout. Otherwise after reaching the pipe buffer size, the process would hang
  # on a write. The same applies to stderr, which is why both are drained here.
  def drain_output
    stdout_chunk = read_available(@stdout)

    @stdout_head = stdout_chunk if @stdout_head.empty?
    @stdout_tail = append_bounded(@stdout_tail, stdout_chunk)
    @stderr_tail = append_bounded(@stderr_tail, read_available(@stderr))
  end

  # @param io [IO] readable end of a pipe
  # @return [String] data available without blocking (up to MAX_BUFFER_OUTPUT), empty string
  #   when nothing could be read
  def read_available(io)
    chunk = +''
    # With `exception: false` neither "would block" nor end of file raises. In both cases the
    # chunk stays empty
    io.read_nonblock(MAX_BUFFER_OUTPUT, chunk, exception: false)
    chunk
  end

  # @param tail [String] output collected so far
  # @param chunk [String] freshly read output
  # @return [String] collected output limited to the last MAX_BUFFER_OUTPUT characters
  def append_bounded(tail, chunk)
    tail << chunk
    tail[-MAX_BUFFER_OUTPUT..-1] || tail
  end

  # Sets up a proper environment for a given spec to run and returns the run command
  # @return [String] run command
  def init_and_build_cmd
    case type
    when :poro
      isolated_cmd(File.basename(@path))
    when :pristine
      isolated_cmd("-r #{ROOT_PATH}/spec/integrations_helper.rb #{File.basename(@path)}")
    else
      <<~CMD
        KARAFKA_GEM_DIR=#{ROOT_PATH} \
        bundle exec ruby -r ./spec/integrations_helper.rb #{@path}
      CMD
    end
  end

  # Copies the directory of the spec into a new temporary directory and builds a command that
  # runs the spec from there with the pristine mode environment variables set
  #
  # @param ruby_args [String] arguments passed to `ruby` (spec file name with optional flags)
  # @return [String] run command
  def isolated_cmd(ruby_args)
    # We copy the spec into a temp dir, not to pollute the spec location with logs, etc
    temp_dir = Dir.mktmpdir

    FileUtils.cp_r("#{File.dirname(@path)}/.", temp_dir)

    <<~CMD
      cd #{temp_dir} &&
      KARAFKA_GEM_DIR=#{ROOT_PATH} \
      BUNDLE_AUTO_INSTALL=true \
      PRISTINE_MODE=true \
      bundle exec ruby #{ruby_args}
    CMD
  end

  # @return [Float] current monotonic time in seconds
  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# Load all the specs
specs = Dir[ROOT_PATH.join('spec/integrations/**/*_spec.rb')]

# When the first argument is `--exclude`, specs matching the filters are skipped. Otherwise only
# specs that match the filters are being executed
FILTER_TYPE = ARGV[0] == '--exclude' ? 'exclude' : 'include'

# Remove the exclude flag, so it is not used as one of the filters
# Note that FILTER_TYPE holds the name of the mode and not the flag itself
ARGV.shift if FILTER_TYPE == 'exclude'

# If filters is provided, apply
# Allows to provide several filters one after another and applies all of them
ARGV.each do |filter|
  specs.delete_if do |name|
    matches = name.include?(filter)

    # In the include mode we drop what does not match, in the exclude mode what matches
    FILTER_TYPE == 'exclude' ? matches : !matches
  end
end

raise ArgumentError, "No integration specs with filters: #{ARGV.join(', ')}" if specs.empty?

# Specs order is randomized. SEED env variable allows to repeat a given order
seed = (ENV['SEED'] || rand(0..10_000)).to_i

puts "Random seed: #{seed}"

shuffled_specs = specs.shuffle(random: Random.new(seed))
scenarios = shuffled_specs.map { |spec_path| Scenario.new(spec_path) }

# Linear scenarios cannot run next to each other, regular ones can
linears, regulars = scenarios.partition(&:linear?)

active_scenarios = []
finished_scenarios = []

until finished_scenarios.size >= scenarios.size
  # At most one scenario is started per iteration and only when there is a free slot
  if active_scenarios.size < CONCURRENCY
    # We can run only one linear at the same time due to concurrency issues within bundler
    # Since they usually take longer than others, we try to run them as fast as possible when there
    # is a slot
    linear_running = active_scenarios.any?(&:linear?)
    candidate = linear_running ? nil : linears.pop
    candidate ||= regulars.pop

    unless candidate.nil?
      candidate.start
      active_scenarios << candidate
    end
  end

  # All active scenarios are polled first, then the ones that are done get reported and closed
  exited_scenarios = active_scenarios.select(&:finished?)

  exited_scenarios.each do |exited|
    active_scenarios.delete(exited)
    exited.report
    exited.close
    finished_scenarios << exited
  end

  sleep(0.1)
end

# Summary of the ten scenarios that took the most time, slowest first
puts
puts "\nLongest scenarios:\n\n"

slowest = finished_scenarios.sort_by(&:time_taken).last(10).reverse

slowest.each do |long_scenario|
  puts format('[%6.2f] %s', long_scenario.time_taken, long_scenario.name)
end

failed_scenarios = finished_scenarios.reject(&:success?)

# An empty line is printed regardless of the result
puts

unless failed_scenarios.empty?
  # Report once more on the failed jobs
  # This will only list scenarios that failed without printing their stdout here.
  puts "\nFailed scenarios:\n\n"

  failed_scenarios.each do |failed|
    puts "\e[31m[FAILED]\e[0m #{failed.name}"
  end

  puts

  # Exit with 1 if not all scenarios were successful
  exit 1
end