# bin/benchmarks
#!/usr/bin/env ruby
# Runner for running given benchmark cases
# Some of the cases require pre-populated data and we populate this in places that need it
# In other cases we generate this data in a background process, so the partitions data stream
# is consistent and we don't end up consuming huge batches of a single partition.
require 'open3'
require 'pathname'

# Make both this script's directory and the repository root requireable
base_dir = File.dirname(__FILE__)
$LOAD_PATH.unshift(base_dir)
$LOAD_PATH.unshift(File.join(base_dir, '..'))

# Absolute path to the repository root (one level above this script)
ROOT_PATH = Pathname.new(File.expand_path('..', base_dir))
# Benchmark topics mapped to the number of partitions each of them should
# have. Frozen so this lookup table cannot be mutated at runtime.
BENCHMARK_TOPICS = {
  'benchmarks_00_01' => 1,
  'benchmarks_00_05' => 5,
  'benchmarks_01_05' => 5,
  'benchmarks_00_10' => 10
}.freeze
# Collect all benchmark files available in the suite
benchmarks = Dir[ROOT_PATH.join('spec/benchmarks/**/*.rb')]

# Narrow down to those matching the optional CLI filter (first argument)
filter = ARGV[0]
benchmarks.keep_if { |path| path.include?(filter) } if filter
raise ArgumentError, "No benchmarks with filter: #{filter}" if benchmarks.empty?
# We may skip seeding if we are running the benchmarks multiple times, then since we do not
# commit offsets we can skip generating more data
if ENV['SEED']
  require 'spec/benchmarks_helper'

  # We need to setup karafka here to have producer for data seeding
  setup_karafka

  # This takes some time but needs to run only once per benchmark session
  puts 'Seeding benchmarks data...'

  producer = Karafka::App.producer

  # We make our data json compatible so we can also benchmark serialization
  elements = Array.new(100_000) { { a: :b }.to_json }

  # Re-create the benchmark topics so every seeding session starts clean
  topics = Karafka::Admin.cluster_info.topics.map { |details| details.fetch(:topic_name) }

  BENCHMARK_TOPICS.each do |topic_name, partitions_count|
    ::Karafka::Admin.delete_topic(topic_name) if topics.include?(topic_name)
    ::Karafka::Admin.create_topic(topic_name, partitions_count, 1)
  end

  # We do not populate data of benchmarks_00_10 as we use it with live-stream data only
  %w[
    benchmarks_00_01
    benchmarks_00_05
  ].each do |topic_name|
    # Partitions count is encoded in the topic name suffix
    partitions_count = topic_name.split('_').last.to_i

    partitions_count.times do |partition|
      puts "Seeding #{topic_name}:#{partition}"

      # Dispatch in slices so we do not buffer all 100k messages at once
      elements.each_slice(10_000) do |data_slice|
        messages = data_slice.map do |payload|
          { topic: topic_name, payload: payload, partition: partition }
        end

        producer.buffer_many(messages)
        producer.flush_sync
      end
    end
  end
end
# Selects requested benchmarks and runs them one after another
benchmarks.each do |benchmark_path|
  puts "Running #{benchmark_path.gsub("#{ROOT_PATH}/spec/benchmarks/", '')}"

  benchmark = "bundle exec ruby -r ./spec/benchmarks_helper.rb #{benchmark_path}"

  Open3.popen3(benchmark) do |stdin, stdout, stderr, thread|
    # Benchmarks never read from stdin, so close it right away to avoid
    # the child blocking on input
    stdin.close

    # Pump both output streams in background threads so the child never
    # stalls on a full pipe buffer while we wait for it to finish
    pumps = [stdout, stderr].map do |stream|
      Thread.new do
        while (line = stream.gets)
          puts(line)
        end
      rescue IOError
        # Stream was closed while reading - nothing left to print
      end
    end

    thread.join
    # Join the pump threads so trailing output is flushed before we move
    # on to the next benchmark (they were fire-and-forget before, which
    # could drop the last lines of output)
    pumps.each(&:join)
  end
end