examples/worker
#!/usr/bin/env ruby
require 'bundler/setup'
require 'distribot'
require 'byebug'
require 'pp'
require 'active_support/core_ext/object'
require 'active_support/core_ext/array'
require 'active_support/json'
Distribot.configure do |config|
config.redis_url = ENV['DISTRIBOT_REDIS_URL']
config.rabbitmq_url = ENV['DISTRIBOT_RABBITMQ_URL']
end
class SimpleWorker
include Distribot::Worker
version '1.0.0'
enumerate_with :enumerate
process_tasks_with :process
def enumerate(_context, &callback)
jobs = [ ]
2.times do |chunk|
max = 5
jobs << (1..max).to_a.map{|n| {id: SecureRandom.uuid, args: [chunk, n] } }
end
callback.call(jobs.flatten)
end
def process(_context, job)
logger.info job.to_s
raise "Test Error!" if rand >= 0.9
job_time = Distribot.redis.get('difficulty').to_f
sleep job_time <= 0 ? 3 : job_time
end
end
class HardWorker < SimpleWorker;
version '1.1.0'
enumerate_with :enumerate
process_tasks_with :process
end
class GoodWorker < SimpleWorker;
version '1.0.0'
enumerate_with :enumerate
process_tasks_with :process
end
class FastWorker < SimpleWorker;
version '1.0.0'
enumerate_with :enumerate
process_tasks_with :process
end
class CheapWorker < SimpleWorker;
version '1.0.0'
enumerate_with :enumerate
process_tasks_with :process
end
class ForeignWorker < SimpleWorker;
version '1.0.0'
enumerate_with :enumerate
process_tasks_with :process
end
class SlowWorker < SimpleWorker;
version '1.0.0'
enumerate_with :enumerate
process_tasks_with :process
end
module News
class ArticleLister
include Distribot::Worker
version '1.0.1'
enumerate_with :enumerate
process_tasks_with :process
def enumerate(context, &callback)
@enumerations ||= 0
story_ids = (1..20).to_a
tasks = story_ids.map { |id| {story_id: "#{@enumerations}.#{id}"} }
@enumerations += 1
return tasks
end
def process(context, task)
json = { url: "https://infra/?story_id=#{task[:story_id]}" }.to_json
data_key = "flow.#{context.flow_id}.stories"
parsed = JSON.parse(json, symbolize_names: true) rescue {}
if parsed.key? :url
Distribot.redis.multi do |redis|
place = [data_key, task[:story_id]].join(':')
redis.set place, json
redis.sadd data_key, place
end
end
end
end
class ArticleDownloaderWorker
include Distribot::Worker
require 'wrest'
version '1.0.0'
enumerate_with :enumerate
process_tasks_with :process
def enumerate(context)
data_key = "flow.#{context.flow_id}.stories"
tasks = Distribot.redis.smembers(data_key).map { |location| {data_locator: location} }
Distribot.redis.del data_key
return tasks
end
def process(context, task)
data = JSON.parse(Distribot.redis.get(task[:data_locator]), symbolize_names: true)
puts data
# data[:url].to_uri(verify_mode: OpenSSL::SSL::VERIFY_NONE).get
Distribot.redis.del(task[:data_locator])
end
end
end
puts News::ArticleLister.new.run
puts News::ArticleDownloaderWorker.new.run
# Distribot.logger.info HardWorker.new.run
# Distribot.logger.info GoodWorker.new.run
# Distribot.logger.info FastWorker.new.run
# Distribot.logger.info CheapWorker.new.run
# Distribot.logger.info ForeignWorker.new.run
# Distribot.logger.info SlowWorker.new.run
puts "Worker up and running!"
sleep