karafka/karafka

View on GitHub
bin/benchmarks

Summary

Maintainability
Test Coverage
#!/usr/bin/env ruby

# Runner for running given benchmark cases
# Some of the cases require pre-populated data and we populate this in places that need it
# In other cases we generate this data in a background process, so the partitions data stream
# is consistent and we don't end up consuming huge batches of a single partition.

require 'open3'
require 'pathname'

$LOAD_PATH.unshift(File.dirname(__FILE__))
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..'))

ROOT_PATH = Pathname.new(File.expand_path(File.join(File.dirname(__FILE__), '../')))

# Topics used by the benchmarks, mapped to the number of partitions each one is created with.
# Frozen so this shared mapping cannot be mutated by accident.
BENCHMARK_TOPICS = {
  'benchmarks_00_01' => 1,
  'benchmarks_00_05' => 5,
  'benchmarks_01_05' => 5,
  'benchmarks_00_10' => 10
}.freeze

# Collect the paths of all the benchmark files located under spec/benchmarks
benchmarks = Dir[ROOT_PATH.join('spec/benchmarks/**/*.rb')]

# An optional first CLI argument narrows the list down to the paths that contain it
pattern = ARGV.first
benchmarks.select! { |path| path.include?(pattern) } if pattern

# Running without any matching benchmark makes no sense, so we fail early
raise ArgumentError, "No benchmarks with filter: #{pattern}" if benchmarks.empty?

# Seeding is opt-in via the SEED environment variable. Benchmarks do not commit offsets, so when
# they are executed multiple times the already seeded data can be reused and seeding skipped.
if ENV['SEED']
  require 'spec/benchmarks_helper'

  # We need to setup karafka here to have producer for data seeding
  setup_karafka

  # This takes some time but needs to run only once per benchmark session
  puts 'Seeding benchmarks data...'

  producer = Karafka::App.producer

  # We make our data json compatible so we can also benchmark serialization
  elements = Array.new(100_000) { { a: :b }.to_json }

  topics = Karafka::Admin.cluster_info.topics.map { |details| details.fetch(:topic_name) }

  # Recreate each benchmark topic: drop it if the cluster already has it, then create it with
  # the configured number of partitions
  BENCHMARK_TOPICS.each do |topic_name, partitions_count|
    ::Karafka::Admin.delete_topic(topic_name) if topics.include?(topic_name)
    ::Karafka::Admin.create_topic(topic_name, partitions_count, 1)
  end

  # Only the topics listed below are pre-populated. We do not populate benchmarks_00_10 as we
  # use it with live-stream data only, and benchmarks_01_05 is not seeded here either.
  %w[
    benchmarks_00_01
    benchmarks_00_05
  ].each do |topic_name|
    # Same mapping that was used to create the topic, so both always agree
    partitions_count = BENCHMARK_TOPICS.fetch(topic_name)

    partitions_count.times do |partition|
      puts "Seeding #{topic_name}:#{partition}"

      # Dispatch in slices so we never buffer all the messages of a partition at once
      elements.each_slice(10_000) do |data_slice|
        messages = data_slice.map do |payload|
          { topic: topic_name, payload: payload, partition: partition }
        end

        producer.buffer_many(messages)
        producer.flush_sync
      end
    end
  end
end

# Upper bound (in seconds) on waiting for the output readers once a benchmark process has
# exited. It is bounded so that anything still holding the pipes open cannot hang the runner.
drain_timeout = 5

# Runs the selected benchmarks one after another, each in its own process, and forwards
# whatever they print on stdout and stderr to our stdout
benchmarks.each do |benchmark_path|
  puts "Running #{benchmark_path.delete_prefix("#{ROOT_PATH}/spec/benchmarks/")}"

  # Argument list form, so the benchmark path is passed verbatim without any shell parsing
  command = ['bundle', 'exec', 'ruby', '-r', './spec/benchmarks_helper.rb', benchmark_path]

  Open3.popen3(*command) do |stdin, stdout, stderr, wait_thread|
    # The runner never sends any input to the benchmark
    stdin.close

    # One reader per stream, so neither pipe can fill up and block the benchmark
    readers = [stdout, stderr].map do |stream|
      Thread.new do
        stream.each_line { |line| puts(line) }
      rescue IOError
        # The stream got closed while we were reading from it, there is nothing more to forward
      end
    end

    # Wait for the benchmark to finish and then let the readers print what is still buffered.
    # Without joining them, the pipes could get closed before the remaining output is printed.
    wait_thread.join
    readers.each { |reader| reader.join(drain_timeout) }
  end
end